diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index efbb91f4..69d2b121 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -5,6 +5,7 @@ import ( "context" "encoding/binary" "log/slog" + "path/filepath" "sort" "strconv" "sync" @@ -1317,6 +1318,11 @@ func (e *Engine) persistCreatedSnapshot(snap raftpb.Snapshot) error { if err := e.persist.Release(snap); err != nil { return errors.WithStack(err) } + + snapDir := filepath.Join(e.dataDir, snapDirName) + if purgeErr := purgeOldSnapFiles(snapDir); purgeErr != nil { + slog.Warn("failed to purge old snap files", "error", purgeErr) + } return nil } @@ -2262,6 +2268,10 @@ func (e *Engine) persistLocalSnapshotPayload(index uint64, payload []byte) error _, err = persistLocalSnapshotPayload(e.storage, e.persist, index, payload) switch { case err == nil: + snapDir := filepath.Join(e.dataDir, snapDirName) + if purgeErr := purgeOldSnapFiles(snapDir); purgeErr != nil { + slog.Warn("failed to purge old snap files", "error", purgeErr) + } return nil case errors.Is(err, etcdraft.ErrCompacted): return nil diff --git a/internal/raftengine/etcd/snapshot_spool.go b/internal/raftengine/etcd/snapshot_spool.go index 10f59903..2986f719 100644 --- a/internal/raftengine/etcd/snapshot_spool.go +++ b/internal/raftengine/etcd/snapshot_spool.go @@ -3,6 +3,7 @@ package etcd import ( "io" "os" + "path/filepath" "github.com/cockroachdb/errors" ) @@ -64,6 +65,24 @@ func (s *snapshotSpool) Reader() (io.Reader, error) { return s.file, nil } +// cleanupStaleSnapshotSpools removes orphaned snapshot spool files left behind +// by a previous engine instance that crashed before Close could run. +func cleanupStaleSnapshotSpools(dir string) error { + matches, err := filepath.Glob(filepath.Join(dir, snapshotSpoolPattern)) + if err != nil { + return errors.WithStack(err) + } + var combined error + for _, match := range matches { + removeErr := os.Remove(match) + if removeErr == nil || os.IsNotExist(removeErr) { + continue + } + combined = errors.CombineErrors(combined, errors.WithStack(removeErr)) + } + return errors.WithStack(combined) +} + func (s *snapshotSpool) Close() error { if s == nil { return nil diff --git a/internal/raftengine/etcd/snapshot_spool_test.go b/internal/raftengine/etcd/snapshot_spool_test.go new file mode 100644 index 00000000..bd63960d --- /dev/null +++ b/internal/raftengine/etcd/snapshot_spool_test.go @@ -0,0 +1,111 @@ +package etcd + +import ( + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCleanupStaleSnapshotSpools(t *testing.T) { + dir := t.TempDir() + + // Create several orphaned spool files matching the pattern. + for i := 0; i < 5; i++ { + f, err := os.CreateTemp(dir, snapshotSpoolPattern) + require.NoError(t, err) + require.NoError(t, f.Close()) + } + + // Create an unrelated file that must not be removed. + unrelated := filepath.Join(dir, "keep-me.txt") + require.NoError(t, os.WriteFile(unrelated, []byte("data"), 0o600)) + + matches, err := filepath.Glob(filepath.Join(dir, snapshotSpoolPattern)) + require.NoError(t, err) + require.Len(t, matches, 5) + + require.NoError(t, cleanupStaleSnapshotSpools(dir)) + + // All spool files should be gone. + matches, err = filepath.Glob(filepath.Join(dir, snapshotSpoolPattern)) + require.NoError(t, err) + require.Empty(t, matches) + + // Unrelated file should still exist. + _, err = os.Stat(unrelated) + require.NoError(t, err) +} + +func TestCleanupStaleSnapshotSpoolsEmptyDir(t *testing.T) { + dir := t.TempDir() + require.NoError(t, cleanupStaleSnapshotSpools(dir)) +} + +func TestCleanupStaleSnapshotSpoolsNonExistentDir(t *testing.T) { + require.NoError(t, cleanupStaleSnapshotSpools(filepath.Join(t.TempDir(), "no-such-dir"))) +} + +// createSnapFile creates a fake .snap file with the etcd naming convention. +func createSnapFile(t *testing.T, dir string, term, index uint64) { + t.Helper() + name := fmt.Sprintf("%016x-%016x.snap", term, index) + path := filepath.Join(dir, name) + require.NoError(t, os.WriteFile(path, []byte("fake"), 0o600)) +} + +func TestPurgeOldSnapFiles(t *testing.T) { + dir := t.TempDir() + + // Create 6 snap files at increasing indices. + for i := uint64(1); i <= 6; i++ { + createSnapFile(t, dir, 1, i*10000) + } + + // Create a non-snap file that must be preserved. + other := filepath.Join(dir, "db.tmp.12345") + require.NoError(t, os.WriteFile(other, []byte("x"), 0o600)) + + require.NoError(t, purgeOldSnapFiles(dir)) + + entries, err := os.ReadDir(dir) + require.NoError(t, err) + + var snaps []string + for _, e := range entries { + if filepath.Ext(e.Name()) == ".snap" { + snaps = append(snaps, e.Name()) + } + } + + // Only the newest 3 should remain. + require.Len(t, snaps, 3) + require.Equal(t, fmt.Sprintf("%016x-%016x.snap", 1, uint64(40000)), snaps[0]) + require.Equal(t, fmt.Sprintf("%016x-%016x.snap", 1, uint64(50000)), snaps[1]) + require.Equal(t, fmt.Sprintf("%016x-%016x.snap", 1, uint64(60000)), snaps[2]) + + // Non-snap file preserved. + _, err = os.Stat(other) + require.NoError(t, err) +} + +func TestPurgeOldSnapFilesUnderLimit(t *testing.T) { + dir := t.TempDir() + + // Only 2 files — under the limit of 3, nothing should be removed. + createSnapFile(t, dir, 1, 1000) + createSnapFile(t, dir, 1, 2000) + + require.NoError(t, purgeOldSnapFiles(dir)) + + entries, err := os.ReadDir(dir) + require.NoError(t, err) + require.Len(t, entries, 2) +} + +func TestPurgeOldSnapFilesEmptyDir(t *testing.T) { + dir := t.TempDir() + require.NoError(t, purgeOldSnapFiles(dir)) +} diff --git a/internal/raftengine/etcd/wal_store.go b/internal/raftengine/etcd/wal_store.go index 86d99dc5..2c847bbc 100644 --- a/internal/raftengine/etcd/wal_store.go +++ b/internal/raftengine/etcd/wal_store.go @@ -38,6 +38,10 @@ func openDiskState(cfg OpenConfig, peers []Peer) (*diskState, error) { return nil, errors.WithStack(err) } + if err := cleanupStaleSnapshotSpools(cfg.DataDir); err != nil { + return nil, errors.Wrap(err, "cleanup stale snapshot spools") + } + if wal.Exist(walDir) { return loadWalState(logger, walDir, snapDir, cfg.StateMachine) } @@ -336,6 +340,45 @@ func persistLocalSnapshotPayload(storage *etcdraft.MemoryStorage, persist etcdst return snapshot, nil } +// defaultMaxSnapFiles is the number of .snap files to retain in the snap +// directory. etcd itself purges old snap files via fileutil.PurgeFile; the +// elastickv etcd engine must do this explicitly. +const defaultMaxSnapFiles = 3 + +// purgeOldSnapFiles removes old .snap files from snapDir, keeping the most +// recent defaultMaxSnapFiles files. Snap file names encode term and index in +// hex and sort lexicographically from oldest to newest, matching etcd's +// Snapshotter convention. +func purgeOldSnapFiles(snapDir string) error { + entries, err := os.ReadDir(snapDir) + if err != nil { + return errors.WithStack(err) + } + + var snaps []string + for _, e := range entries { + if !e.IsDir() && filepath.Ext(e.Name()) == ".snap" { + snaps = append(snaps, e.Name()) + } + } + + if len(snaps) <= defaultMaxSnapFiles { + return nil + } + + // snaps is already sorted ascending (oldest first) because os.ReadDir + // returns entries in directory order which, for zero-padded hex names, + // equals chronological order. + + var combined error + for _, name := range snaps[:len(snaps)-defaultMaxSnapFiles] { + if removeErr := os.Remove(filepath.Join(snapDir, name)); removeErr != nil && !os.IsNotExist(removeErr) { + combined = errors.CombineErrors(combined, errors.WithStack(removeErr)) + } + } + return errors.WithStack(combined) +} + func buildLocalSnapshot(storage *etcdraft.MemoryStorage, applied uint64, payload []byte) (raftpb.Snapshot, error) { _, confState, err := storage.InitialState() if err != nil {