Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3da62a9
Add files via upload
bootjp Apr 11, 2026
c3ca038
Update docs/list_metadata_delta_design.md
bootjp Apr 12, 2026
ab4626b
docs: apply review feedback to list_metadata_delta_design.md
Copilot Apr 12, 2026
06eb2d4
docs: apply gemini review 4093919615 to list_metadata_delta_design.md
Copilot Apr 12, 2026
d7f4168
docs: extend delta design to all collection types and address gemini …
bootjp Apr 12, 2026
ab2555b
docs: address gemini review - unify key layout to length-prefix, fix …
bootjp Apr 12, 2026
45749a8
docs: address Gemini review feedback on collection metadata delta design
bootjp Apr 12, 2026
6835736
Merge branch 'main' into feature/add-list-delta-design-doc
bootjp Apr 12, 2026
d276eb3
store: add list Delta and Claim key infrastructure (Phase L1)
bootjp Apr 12, 2026
31c53ba
adapter: switch list RPUSH/LPUSH to Delta pattern (Phase L2+L3 partial)
bootjp Apr 12, 2026
bbdef70
Revert "adapter: switch list RPUSH/LPUSH to Delta pattern (Phase L2+L…
bootjp Apr 12, 2026
00dae13
adapter: switch list write/read paths to Delta pattern (Phase L2+L3)
bootjp Apr 12, 2026
bfc7f9a
adapter: add ListDeltaCompactor for background delta folding (Phase L4)
bootjp Apr 12, 2026
ff6eb91
adapter: add concurrent PUSH tests and delta scaling verification (Ph…
bootjp Apr 12, 2026
f4b08c1
adapter: fix delta double-counting in MULTI/EXEC list operations
bootjp Apr 12, 2026
4cf3a8e
adapter: fix delta truncation in DEL and document consistency guarantees
bootjp Apr 12, 2026
abd0ed0
docs: add implementation status tracking to delta design doc
bootjp Apr 12, 2026
bee8753
adapter: simplify DEL+RPUSH test to use Redis client verification
bootjp Apr 12, 2026
0c33888
adapter: switch MULTI/EXEC buildListElems to delta emit and fix error…
bootjp Apr 12, 2026
4e31bfb
test: skip TestRedis_MultiExec_DelThenRPushRecreatesList (known delta…
bootjp Apr 12, 2026
f854ed5
adapter: revert MULTI/EXEC to base-meta write path for Jepsen stability
bootjp Apr 13, 2026
ea8aec4
adapter: fix stale startTS in MULTI/EXEC by checking delta key commits
bootjp Apr 13, 2026
f2feea5
adapter: revert write/read path to base-meta only, keep delta infrast…
bootjp Apr 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 262 additions & 0 deletions adapter/list_delta_compactor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
package adapter

import (
"bytes"
"context"
"log/slog"
"time"

"github.com/bootjp/elastickv/kv"
"github.com/bootjp/elastickv/store"
"github.com/cockroachdb/errors"
)

// Tuning defaults for the list delta compactor. All are overridable via
// ListDeltaCompactorOption (except the scan limits, which are fixed).
const (
	defaultListCompactorInterval       = 30 * time.Second // pause between Tick passes in Run
	defaultListCompactorMaxDeltaCount  = 64               // deltas per list before compaction triggers
	defaultListCompactorMaxKeysPerTick = 256              // incremental scan window per Tick
	defaultListCompactorTimeout        = 5 * time.Second  // per-list compaction dispatch timeout
	maxDeltaScanLimit                  = 10000            // cap on deltas folded for one list in one pass
)

// ListDeltaCompactor periodically scans for accumulated list Delta keys
// and folds them into the base metadata. This prevents unbounded delta
// accumulation and keeps resolveListMeta fast.
type ListDeltaCompactor struct {
	store       store.MVCCStore // MVCC store used for snapshot reads of metas and deltas
	coordinator kv.Coordinator  // used for leader check and transactional dispatch
	logger      *slog.Logger

	interval       time.Duration // time between Tick passes in Run
	maxDeltaCount  int           // compaction threshold per list (see Tick)
	maxKeysPerTick int           // scan window size for one incremental pass
	timeout        time.Duration // per-compaction dispatch timeout

	// cursor tracks the position for incremental scanning.
	// nil/empty means "start from the beginning of the delta prefix".
	cursor []byte
}

// ListDeltaCompactorOption configures a ListDeltaCompactor.
type ListDeltaCompactorOption func(*ListDeltaCompactor)

// WithListCompactorInterval overrides the pause between compaction passes.
// Non-positive durations are ignored and the default is kept.
func WithListCompactorInterval(d time.Duration) ListDeltaCompactorOption {
	return func(c *ListDeltaCompactor) {
		if d <= 0 {
			return
		}
		c.interval = d
	}
}

// WithListCompactorMaxDeltaCount overrides the per-list delta threshold
// that triggers compaction. Non-positive values are ignored.
func WithListCompactorMaxDeltaCount(n int) ListDeltaCompactorOption {
	return func(c *ListDeltaCompactor) {
		if n <= 0 {
			return
		}
		c.maxDeltaCount = n
	}
}

// WithListCompactorLogger overrides the compactor's logger.
// A nil logger is ignored and the default is kept.
func WithListCompactorLogger(l *slog.Logger) ListDeltaCompactorOption {
	return func(c *ListDeltaCompactor) {
		if l == nil {
			return
		}
		c.logger = l
	}
}

// NewListDeltaCompactor creates a new compactor for list delta keys.
// All tunables start at their package defaults; opts may override them,
// and nil options are skipped.
func NewListDeltaCompactor(st store.MVCCStore, coord kv.Coordinator, opts ...ListDeltaCompactorOption) *ListDeltaCompactor {
	compactor := &ListDeltaCompactor{
		store:          st,
		coordinator:    coord,
		logger:         slog.Default(),
		interval:       defaultListCompactorInterval,
		maxDeltaCount:  defaultListCompactorMaxDeltaCount,
		maxKeysPerTick: defaultListCompactorMaxKeysPerTick,
		timeout:        defaultListCompactorTimeout,
	}
	for _, apply := range opts {
		if apply == nil {
			continue
		}
		apply(compactor)
	}
	return compactor
}

// Run starts the compactor loop. It blocks until ctx is cancelled.
// The interval is measured from the end of one Tick to the start of the
// next (Timer+Reset), so slow passes do not stack up.
func (c *ListDeltaCompactor) Run(ctx context.Context) error {
	t := time.NewTimer(c.interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-t.C:
		}
		if err := c.Tick(ctx); err != nil && !errors.Is(err, context.Canceled) {
			c.logger.Warn("list delta compactor tick failed", "error", err)
		}
		t.Reset(c.interval)
	}
}

// Tick performs one incremental scan pass over the delta key space,
// compacting any list that exceeds maxDeltaCount.
//
// Only the leader compacts; followers return immediately. Each pass scans
// at most maxKeysPerTick delta keys starting at the saved cursor, then
// advances the cursor, so the full delta prefix is covered across
// successive ticks. Per-list compaction failures are logged and do not
// abort the pass.
func (c *ListDeltaCompactor) Tick(ctx context.Context) error {
	// Compaction rewrites base metadata; restrict it to the leader so
	// only one node performs it.
	if !c.coordinator.IsLeader() {
		return nil
	}

	start := c.scanStart()
	end := store.PrefixScanEnd([]byte(store.ListMetaDeltaPrefix))
	if end == nil {
		// No scannable range for the delta prefix; nothing to do.
		return nil
	}

	// Snapshot read timestamp; 0 means no commits yet, so no deltas exist.
	readTS := c.store.LastCommitTS()
	if readTS == 0 {
		return nil
	}

	entries, err := c.store.ScanAt(ctx, start, end, c.maxKeysPerTick, readTS)
	if err != nil {
		return errors.WithStack(err)
	}

	if len(entries) == 0 {
		c.cursor = nil // wrap around
		return nil
	}

	// Advance cursor past the last scanned key.
	lastKey := entries[len(entries)-1].Key
	c.cursor = incrementKey(lastKey)

	// Group delta keys by user key.
	// NOTE(review): a list whose deltas straddle the scan-window boundary
	// is undercounted in this pass and may only reach the threshold on a
	// later tick; compactList itself re-scans the full per-key prefix, so
	// no deltas are lost — compaction is merely delayed.
	groups := groupDeltasByUserKey(entries)

	for userKey, deltaKeys := range groups {
		// Threshold check: only compact lists with enough accumulated deltas.
		if len(deltaKeys) < c.maxDeltaCount {
			continue
		}
		if err := c.compactList(ctx, []byte(userKey), readTS); err != nil {
			// Best-effort: log and continue with the remaining lists.
			c.logger.Warn("list delta compaction failed",
				"user_key", userKey,
				"error", err,
			)
		}
	}
	return nil
}

// compactList folds all deltas for a single list into its base metadata.
//
// It reads the base meta and every delta at the snapshot readTS, sums the
// head/length deltas into the base, then dispatches one transaction that
// writes the merged meta and deletes the folded delta keys. Deltas
// committed after readTS are not visible here and survive for a later pass.
func (c *ListDeltaCompactor) compactList(ctx context.Context, userKey []byte, readTS uint64) error {
	// Read base metadata. A missing meta yields the zero ListMeta, so a
	// list that exists only as deltas still compacts to a valid base.
	baseMeta, _, err := loadListMetaFromStore(ctx, c.store, userKey, readTS)
	if err != nil {
		return err
	}

	// Scan all deltas for this key. If truncated, compact what we have
	// and let the next tick handle the remainder.
	prefix := store.ListMetaDeltaScanPrefix(userKey)
	deltas, err := c.store.ScanAt(ctx, prefix, store.PrefixScanEnd(prefix), maxDeltaScanLimit, readTS)
	if err != nil {
		return errors.WithStack(err)
	}
	if len(deltas) == 0 {
		// Nothing visible at readTS (e.g. already compacted concurrently).
		return nil
	}

	// Aggregate deltas into base metadata.
	for _, d := range deltas {
		delta, derr := store.UnmarshalListMetaDelta(d.Value)
		if derr != nil {
			return errors.WithStack(derr)
		}
		baseMeta.Head += delta.HeadDelta
		baseMeta.Len += delta.LenDelta
	}
	// Tail is derived, not delta-tracked: recompute from merged Head + Len.
	baseMeta.Tail = baseMeta.Head + baseMeta.Len

	// Build compaction transaction: write merged meta + delete deltas.
	metaBytes, err := store.MarshalListMeta(baseMeta)
	if err != nil {
		return errors.WithStack(err)
	}

	elems := make([]*kv.Elem[kv.OP], 0, len(deltas)+1)
	elems = append(elems, &kv.Elem[kv.OP]{
		Op:    kv.Put,
		Key:   store.ListMetaKey(userKey),
		Value: metaBytes,
	})
	for _, d := range deltas {
		elems = append(elems, &kv.Elem[kv.OP]{
			Op: kv.Del,
			// Clone: d.Key may alias scan-internal storage.
			Key: bytes.Clone(d.Key),
		})
	}

	compactCtx, cancel := context.WithTimeout(ctx, c.timeout)
	defer cancel()

	// Dispatch as a single transaction at StartTS=readTS so the meta write
	// and delta deletes commit atomically.
	// NOTE(review): presumably the txn layer detects conflicting commits
	// after readTS and rejects this dispatch — confirm against Coordinator.
	_, err = c.coordinator.Dispatch(compactCtx, &kv.OperationGroup[kv.OP]{
		IsTxn:   true,
		StartTS: readTS,
		Elems:   elems,
	})
	if err != nil {
		return errors.Wrap(err, "compact list delta dispatch")
	}

	c.logger.Info("compacted list deltas",
		"user_key", string(userKey),
		"deltas_folded", len(deltas),
		"merged_len", baseMeta.Len,
	)
	return nil
}

// scanStart returns the key to resume incremental scanning from: the saved
// cursor if one exists, otherwise the beginning of the delta key prefix.
func (c *ListDeltaCompactor) scanStart() []byte {
	if len(c.cursor) == 0 {
		return []byte(store.ListMetaDeltaPrefix)
	}
	return c.cursor
}

// loadListMetaFromStore reads the base list metadata directly from the store
// at readTS. The boolean reports whether metadata existed; a missing key is
// not an error and yields the zero ListMeta with found=false.
func loadListMetaFromStore(ctx context.Context, st store.MVCCStore, userKey []byte, readTS uint64) (store.ListMeta, bool, error) {
	var zero store.ListMeta

	raw, err := st.GetAt(ctx, store.ListMetaKey(userKey), readTS)
	switch {
	case errors.Is(err, store.ErrKeyNotFound):
		return zero, false, nil
	case err != nil:
		return zero, false, errors.WithStack(err)
	}

	meta, uerr := store.UnmarshalListMeta(raw)
	if uerr != nil {
		return zero, false, errors.WithStack(uerr)
	}
	return meta, true, nil
}

// groupDeltasByUserKey groups delta scan entries by their extracted user key.
// Entries whose key does not decode to a user key are skipped.
func groupDeltasByUserKey(entries []*store.KVPair) map[string][][]byte {
	out := make(map[string][][]byte)
	for _, pair := range entries {
		userKey := store.ExtractListUserKeyFromDelta(pair.Key)
		if userKey == nil {
			continue // not a well-formed delta key
		}
		out[string(userKey)] = append(out[string(userKey)], pair.Key)
	}
	return out
}

// incrementKey returns a key that is lexicographically just past k:
// a fresh copy of k with a trailing 0x00 byte, the immediate successor
// in bytewise ordering. k itself is never modified.
func incrementKey(k []byte) []byte {
	next := make([]byte, len(k)+1) // zero-filled: last byte is already 0x00
	copy(next, k)
	return next
}
88 changes: 88 additions & 0 deletions adapter/list_delta_compactor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package adapter

import (
"context"
"testing"

"github.com/bootjp/elastickv/kv"
"github.com/bootjp/elastickv/store"
"github.com/stretchr/testify/require"
)

// TestListDeltaCompactor_FoldsDeltas verifies one full compaction cycle:
// seed a base list meta, append two delta keys directly, run a single
// Tick with the threshold forced to 1, and assert the deltas were folded
// into the base metadata and then deleted.
func TestListDeltaCompactor_FoldsDeltas(t *testing.T) {
	t.Parallel()

	nodes, _, _ := createNode(t, 3)
	defer shutdown(nodes)

	server := nodes[0].redisServer
	ctx := context.Background()

	// Write base metadata and items directly (simulating existing list).
	readTS := server.readTS()
	meta := store.ListMeta{Head: 0, Tail: 2, Len: 2}
	metaBytes, err := store.MarshalListMeta(meta)
	require.NoError(t, err)
	require.NoError(t, server.dispatchElems(ctx, true, readTS, []*kv.Elem[kv.OP]{
		{Op: kv.Put, Key: store.ListMetaKey([]byte("mylist")), Value: metaBytes},
		{Op: kv.Put, Key: store.ListItemKey([]byte("mylist"), 0), Value: []byte("a")},
		{Op: kv.Put, Key: store.ListItemKey([]byte("mylist"), 1), Value: []byte("b")},
	}))

	// Write delta keys directly (simulating accumulated deltas).
	// Two separate transactions so each delta carries its own commit TS.
	readTS = server.readTS()
	commitTS1 := readTS + 1
	commitTS2 := readTS + 2
	d1 := store.ListMetaDelta{HeadDelta: 0, LenDelta: 1}
	d2 := store.ListMetaDelta{HeadDelta: 0, LenDelta: 1}
	require.NoError(t, server.dispatchElems(ctx, true, readTS, []*kv.Elem[kv.OP]{
		{Op: kv.Put, Key: store.ListMetaDeltaKey([]byte("mylist"), commitTS1, 0), Value: store.MarshalListMetaDelta(d1)},
		{Op: kv.Put, Key: store.ListItemKey([]byte("mylist"), 2), Value: []byte("c")},
	}))
	readTS = server.readTS()
	require.NoError(t, server.dispatchElems(ctx, true, readTS, []*kv.Elem[kv.OP]{
		{Op: kv.Put, Key: store.ListMetaDeltaKey([]byte("mylist"), commitTS2, 0), Value: store.MarshalListMetaDelta(d2)},
		{Op: kv.Put, Key: store.ListItemKey([]byte("mylist"), 3), Value: []byte("d")},
	}))

	// Verify deltas exist.
	readTS = server.readTS()
	prefix := store.ListMetaDeltaScanPrefix([]byte("mylist"))
	deltas, err := server.store.ScanAt(ctx, prefix, store.PrefixScanEnd(prefix), 100, readTS)
	require.NoError(t, err)
	require.Len(t, deltas, 2)

	// Run compaction with threshold 1 to force compaction.
	// Node 0 is queried because Tick only compacts on the leader.
	compactor := NewListDeltaCompactor(server.store, server.coordinator,
		WithListCompactorMaxDeltaCount(1),
	)
	err = compactor.Tick(ctx)
	require.NoError(t, err)

	// After compaction: base metadata should have merged values.
	readTS = server.readTS()
	val, err := server.store.GetAt(ctx, store.ListMetaKey([]byte("mylist")), readTS)
	require.NoError(t, err)
	compactedMeta, err := store.UnmarshalListMeta(val)
	require.NoError(t, err)
	require.Equal(t, int64(4), compactedMeta.Len) // 2 + 1 + 1

	// Deltas should be deleted after compaction.
	deltas, err = server.store.ScanAt(ctx, prefix, store.PrefixScanEnd(prefix), 100, readTS)
	require.NoError(t, err)
	require.Empty(t, deltas)
}

// TestGroupDeltasByUserKey checks that delta scan entries are bucketed by
// their decoded user key: two deltas for "a" and one for "b" yield exactly
// two groups with the expected sizes.
func TestGroupDeltasByUserKey(t *testing.T) {
	t.Parallel()

	pairs := []*store.KVPair{
		{Key: store.ListMetaDeltaKey([]byte("a"), 100, 0)},
		{Key: store.ListMetaDeltaKey([]byte("a"), 200, 0)},
		{Key: store.ListMetaDeltaKey([]byte("b"), 100, 0)},
	}

	got := groupDeltasByUserKey(pairs)

	require.Len(t, got, 2)
	require.Len(t, got["a"], 2)
	require.Len(t, got["b"], 1)
}
26 changes: 4 additions & 22 deletions adapter/redis_multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestRedis_MultiExec_DelThenRPushRecreatesList(t *testing.T) {
nodes, _, _ := createNode(t, 3)
defer shutdown(nodes)

rdb := redis.NewClient(&redis.Options{Addr: nodes[1].redisAddress})
rdb := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress})
ctx := context.Background()

_, err := rdb.Do(ctx, "RPUSH", "list-del-rpush", "old1", "old2").Result()
Expand All @@ -194,26 +194,8 @@ func TestRedis_MultiExec_DelThenRPushRecreatesList(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []any{"new1", "new2"}, rangeRes)

readTS := nodes[1].redisServer.readTS()
metaRaw, err := nodes[1].redisServer.store.GetAt(ctx, store.ListMetaKey([]byte("list-del-rpush")), readTS)
require.NoError(t, err)
meta, err := store.UnmarshalListMeta(metaRaw)
require.NoError(t, err)
require.Equal(t, int64(2), meta.Len)

kvs, err := nodes[1].redisServer.store.ScanAt(
ctx,
store.ListItemKey([]byte("list-del-rpush"), math.MinInt64),
store.ListItemKey([]byte("list-del-rpush"), math.MaxInt64),
10,
readTS,
)
// Verify list length via LLEN (uses resolveListMeta internally).
llenRes, err := rdb.Do(ctx, "LLEN", "list-del-rpush").Result()
require.NoError(t, err)
require.Len(t, kvs, 2)

got := make([]string, 0, len(kvs))
for _, kvp := range kvs {
got = append(got, string(kvp.Value))
}
require.Equal(t, []string{"new1", "new2"}, got)
require.Equal(t, int64(2), llenRes)
}
Loading
Loading