Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions internal/raftengine/etcd/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/binary"
"log/slog"
"path/filepath"
"sort"
"strconv"
"sync"
Expand Down Expand Up @@ -1317,6 +1318,11 @@ func (e *Engine) persistCreatedSnapshot(snap raftpb.Snapshot) error {
if err := e.persist.Release(snap); err != nil {
return errors.WithStack(err)
}

snapDir := filepath.Join(e.dataDir, snapDirName)
if purgeErr := purgeOldSnapFiles(snapDir); purgeErr != nil {
slog.Warn("failed to purge old snap files", "error", purgeErr)
}
return nil
}

Expand Down Expand Up @@ -2262,6 +2268,10 @@ func (e *Engine) persistLocalSnapshotPayload(index uint64, payload []byte) error
_, err = persistLocalSnapshotPayload(e.storage, e.persist, index, payload)
switch {
case err == nil:
snapDir := filepath.Join(e.dataDir, snapDirName)
if purgeErr := purgeOldSnapFiles(snapDir); purgeErr != nil {
slog.Warn("failed to purge old snap files", "error", purgeErr)
}
return nil
case errors.Is(err, etcdraft.ErrCompacted):
return nil
Expand Down
19 changes: 19 additions & 0 deletions internal/raftengine/etcd/snapshot_spool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package etcd
import (
"io"
"os"
"path/filepath"

"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -64,6 +65,24 @@ func (s *snapshotSpool) Reader() (io.Reader, error) {
return s.file, nil
}

// cleanupStaleSnapshotSpools removes orphaned snapshot spool files left behind
// by a previous engine instance that crashed before Close could run.
func cleanupStaleSnapshotSpools(dir string) error {
matches, err := filepath.Glob(filepath.Join(dir, snapshotSpoolPattern))
if err != nil {
return errors.WithStack(err)
}
var combined error
for _, match := range matches {
removeErr := os.Remove(match)
if removeErr == nil || os.IsNotExist(removeErr) {
continue
}
combined = errors.CombineErrors(combined, errors.WithStack(removeErr))
}
return errors.WithStack(combined)
}

func (s *snapshotSpool) Close() error {
if s == nil {
return nil
Expand Down
111 changes: 111 additions & 0 deletions internal/raftengine/etcd/snapshot_spool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package etcd

import (
"fmt"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
)

func TestCleanupStaleSnapshotSpools(t *testing.T) {
dir := t.TempDir()

// Create several orphaned spool files matching the pattern.
for i := 0; i < 5; i++ {
f, err := os.CreateTemp(dir, snapshotSpoolPattern)
require.NoError(t, err)
require.NoError(t, f.Close())
}

// Create an unrelated file that must not be removed.
unrelated := filepath.Join(dir, "keep-me.txt")
require.NoError(t, os.WriteFile(unrelated, []byte("data"), 0o600))

matches, err := filepath.Glob(filepath.Join(dir, snapshotSpoolPattern))
require.NoError(t, err)
require.Len(t, matches, 5)

require.NoError(t, cleanupStaleSnapshotSpools(dir))

// All spool files should be gone.
matches, err = filepath.Glob(filepath.Join(dir, snapshotSpoolPattern))
require.NoError(t, err)
require.Empty(t, matches)

// Unrelated file should still exist.
_, err = os.Stat(unrelated)
require.NoError(t, err)
}

func TestCleanupStaleSnapshotSpoolsEmptyDir(t *testing.T) {
dir := t.TempDir()
require.NoError(t, cleanupStaleSnapshotSpools(dir))
}

func TestCleanupStaleSnapshotSpoolsNonExistentDir(t *testing.T) {
require.NoError(t, cleanupStaleSnapshotSpools(filepath.Join(t.TempDir(), "no-such-dir")))
}

// createSnapFile creates a fake .snap file with the etcd naming convention.
func createSnapFile(t *testing.T, dir string, term, index uint64) {
t.Helper()
name := fmt.Sprintf("%016x-%016x.snap", term, index)
path := filepath.Join(dir, name)
require.NoError(t, os.WriteFile(path, []byte("fake"), 0o600))
}

func TestPurgeOldSnapFiles(t *testing.T) {
dir := t.TempDir()

// Create 6 snap files at increasing indices.
for i := uint64(1); i <= 6; i++ {
createSnapFile(t, dir, 1, i*10000)
}

// Create a non-snap file that must be preserved.
other := filepath.Join(dir, "db.tmp.12345")
require.NoError(t, os.WriteFile(other, []byte("x"), 0o600))

require.NoError(t, purgeOldSnapFiles(dir))

entries, err := os.ReadDir(dir)
require.NoError(t, err)

var snaps []string
for _, e := range entries {
if filepath.Ext(e.Name()) == ".snap" {
snaps = append(snaps, e.Name())
}
}

// Only the newest 3 should remain.
require.Len(t, snaps, 3)
require.Equal(t, fmt.Sprintf("%016x-%016x.snap", 1, uint64(40000)), snaps[0])
require.Equal(t, fmt.Sprintf("%016x-%016x.snap", 1, uint64(50000)), snaps[1])
require.Equal(t, fmt.Sprintf("%016x-%016x.snap", 1, uint64(60000)), snaps[2])

// Non-snap file preserved.
_, err = os.Stat(other)
require.NoError(t, err)
}

func TestPurgeOldSnapFilesUnderLimit(t *testing.T) {
dir := t.TempDir()

// Only 2 files — under the limit of 3, nothing should be removed.
createSnapFile(t, dir, 1, 1000)
createSnapFile(t, dir, 1, 2000)

require.NoError(t, purgeOldSnapFiles(dir))

entries, err := os.ReadDir(dir)
require.NoError(t, err)
require.Len(t, entries, 2)
}

func TestPurgeOldSnapFilesEmptyDir(t *testing.T) {
dir := t.TempDir()
require.NoError(t, purgeOldSnapFiles(dir))
}
43 changes: 43 additions & 0 deletions internal/raftengine/etcd/wal_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ func openDiskState(cfg OpenConfig, peers []Peer) (*diskState, error) {
return nil, errors.WithStack(err)
}

if err := cleanupStaleSnapshotSpools(cfg.DataDir); err != nil {
return nil, errors.Wrap(err, "cleanup stale snapshot spools")
}

if wal.Exist(walDir) {
return loadWalState(logger, walDir, snapDir, cfg.StateMachine)
}
Expand Down Expand Up @@ -336,6 +340,45 @@ func persistLocalSnapshotPayload(storage *etcdraft.MemoryStorage, persist etcdst
return snapshot, nil
}

// defaultMaxSnapFiles is the number of .snap files to retain in the snap
// directory. etcd itself purges old snap files via fileutil.PurgeFile; the
// elastickv etcd engine must do this explicitly.
const defaultMaxSnapFiles = 3

// purgeOldSnapFiles removes old .snap files from snapDir, keeping the most
// recent defaultMaxSnapFiles files. Snap file names encode term and index in
// hex and sort lexicographically from oldest to newest, matching etcd's
// Snapshotter convention.
func purgeOldSnapFiles(snapDir string) error {
entries, err := os.ReadDir(snapDir)
if err != nil {
return errors.WithStack(err)
}

var snaps []string
for _, e := range entries {
if !e.IsDir() && filepath.Ext(e.Name()) == ".snap" {
snaps = append(snaps, e.Name())
}
}

if len(snaps) <= defaultMaxSnapFiles {
return nil
}

// snaps is already sorted ascending (oldest first) because os.ReadDir
// returns entries in directory order which, for zero-padded hex names,
// equals chronological order.

var combined error
for _, name := range snaps[:len(snaps)-defaultMaxSnapFiles] {
if removeErr := os.Remove(filepath.Join(snapDir, name)); removeErr != nil && !os.IsNotExist(removeErr) {
combined = errors.CombineErrors(combined, errors.WithStack(removeErr))
}
}
return errors.WithStack(combined)
}

func buildLocalSnapshot(storage *etcdraft.MemoryStorage, applied uint64, payload []byte) (raftpb.Snapshot, error) {
_, confState, err := storage.InitialState()
if err != nil {
Expand Down
Loading