From a7742a183cbf9d4a4506caf726d52a94c0e478e4 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Mon, 13 Apr 2026 04:15:51 +0900 Subject: [PATCH 1/9] store: add read-set validation to ApplyMutations for SSI Add readKeys parameter to ApplyMutations so that the store validates both write-write and read-write conflicts under the apply lock. This upgrades transaction isolation from Snapshot Isolation (SI) to Serializable Snapshot Isolation (SSI), preventing write skew anomalies where concurrent transactions read overlapping keys but write disjoint ones. The read set collected in the Redis adapter txnContext is threaded through the coordinator, FSM, and proto layers into the store, where each read key is checked for commits after startTS. --- adapter/distribution_server_test.go | 2 +- adapter/redis.go | 6 +++- adapter/s3_test.go | 2 +- distribution/catalog.go | 2 +- kv/coordinator.go | 11 ++++---- kv/coordinator_txn_test.go | 8 +++--- kv/fsm.go | 20 ++++++------- kv/leader_routed_store.go | 4 +-- kv/shard_store.go | 4 +-- kv/sharded_coordinator.go | 10 +++---- kv/transcoder.go | 3 ++ proto/internal.pb.go | 42 ++++++++++++++++++++-------- proto/internal.proto | 5 ++++ store/compact_txn_test.go | 2 +- store/lsm_store.go | 18 +++++++++++- store/lsm_store_test.go | 4 +-- store/lsm_store_txn_test.go | 16 +++++------ store/mvcc_store.go | 7 ++++- store/mvcc_store_concurrency_test.go | 4 +-- store/mvcc_store_snapshot_test.go | 4 +-- store/store.go | 9 +++--- 21 files changed, 118 insertions(+), 65 deletions(-) diff --git a/adapter/distribution_server_test.go b/adapter/distribution_server_test.go index 58065d1a..3eb51d8f 100644 --- a/adapter/distribution_server_test.go +++ b/adapter/distribution_server_test.go @@ -664,7 +664,7 @@ func (s *distributionCoordinatorStub) applyDispatch( startTS uint64, commitTS uint64, ) error { - if err := s.store.ApplyMutations(ctx, mutations, startTS, commitTS); err != nil { + if err := s.store.ApplyMutations(ctx, mutations, nil, 
startTS, commitTS); err != nil { return err } if s.afterDispatch != nil { diff --git a/adapter/redis.go b/adapter/redis.go index 70a0f9ce..9b4c0c7a 100644 --- a/adapter/redis.go +++ b/adapter/redis.go @@ -1923,7 +1923,11 @@ func (t *txnContext) commit() error { return nil } - group := &kv.OperationGroup[kv.OP]{IsTxn: true, Elems: elems, StartTS: t.startTS} + readKeys := make([][]byte, 0, len(t.readKeys)) + for _, k := range t.readKeys { + readKeys = append(readKeys, k) + } + group := &kv.OperationGroup[kv.OP]{IsTxn: true, Elems: elems, StartTS: t.startTS, ReadKeys: readKeys} ctx, cancel := context.WithTimeout(context.Background(), redisDispatchTimeout) defer cancel() if _, err := t.server.coordinator.Dispatch(ctx, group); err != nil { diff --git a/adapter/s3_test.go b/adapter/s3_test.go index 8fc7f073..d88260f6 100644 --- a/adapter/s3_test.go +++ b/adapter/s3_test.go @@ -1916,7 +1916,7 @@ func TestS3Server_BackwardCompatibility_NoBucketAclFieldIsPrivate(t *testing.T) commitTS := coord.Clock().Next() err = st.ApplyMutations(context.Background(), []*store.KVPairMutation{ {Op: store.OpTypePut, Key: s3keys.BucketMetaKey("legacy-bucket"), Value: legacyJSON}, - }, commitTS-1, commitTS) + }, nil, commitTS-1, commitTS) require.NoError(t, err) // Create a server WITH credentials; the legacy bucket has no acl field. 
diff --git a/distribution/catalog.go b/distribution/catalog.go index 78e1931d..28883bbc 100644 --- a/distribution/catalog.go +++ b/distribution/catalog.go @@ -627,7 +627,7 @@ func (s *CatalogStore) applySaveMutations(ctx context.Context, plan savePlan, mu if err != nil { return err } - if err := s.store.ApplyMutations(ctx, mutations, plan.readTS, commitTS); err != nil { + if err := s.store.ApplyMutations(ctx, mutations, nil, plan.readTS, commitTS); err != nil { if errors.Is(err, store.ErrWriteConflict) { return errors.WithStack(ErrCatalogVersionMismatch) } diff --git a/kv/coordinator.go b/kv/coordinator.go index 2fcfc8bb..930b6808 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -75,7 +75,7 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C } if reqs.IsTxn { - return c.dispatchTxn(reqs.Elems, reqs.StartTS, reqs.CommitTS) + return c.dispatchTxn(reqs.Elems, reqs.StartTS, reqs.CommitTS, reqs.ReadKeys) } return c.dispatchRaw(reqs.Elems) @@ -122,7 +122,7 @@ func (c *Coordinate) nextStartTS() uint64 { return c.clock.Next() } -func (c *Coordinate) dispatchTxn(reqs []*Elem[OP], startTS uint64, commitTS uint64) (*CoordinateResponse, error) { +func (c *Coordinate) dispatchTxn(reqs []*Elem[OP], startTS uint64, commitTS uint64, readKeys [][]byte) (*CoordinateResponse, error) { primary := primaryKeyForElems(reqs) if len(primary) == 0 { return nil, errors.WithStack(ErrTxnPrimaryKeyRequired) @@ -144,7 +144,7 @@ func (c *Coordinate) dispatchTxn(reqs []*Elem[OP], startTS uint64, commitTS uint } r, err := c.transactionManager.Commit([]*pb.Request{ - onePhaseTxnRequest(startTS, commitTS, primary, reqs), + onePhaseTxnRequest(startTS, commitTS, primary, reqs, readKeys), }) if err != nil { return nil, errors.WithStack(err) @@ -258,7 +258,7 @@ func (c *Coordinate) redirect(ctx context.Context, reqs *OperationGroup[OP]) (*C commitTS = 0 } requests = []*pb.Request{ - onePhaseTxnRequest(reqs.StartTS, commitTS, primary, reqs.Elems), + 
onePhaseTxnRequest(reqs.StartTS, commitTS, primary, reqs.Elems, reqs.ReadKeys), } } else { for _, req := range reqs.Elems { @@ -318,7 +318,7 @@ func elemToMutation(req *Elem[OP]) *pb.Mutation { panic("unreachable") } -func onePhaseTxnRequest(startTS, commitTS uint64, primaryKey []byte, reqs []*Elem[OP]) *pb.Request { +func onePhaseTxnRequest(startTS, commitTS uint64, primaryKey []byte, reqs []*Elem[OP], readKeys [][]byte) *pb.Request { muts := make([]*pb.Mutation, 0, len(reqs)+1) muts = append(muts, txnMetaMutation(primaryKey, 0, commitTS)) for _, req := range reqs { @@ -329,6 +329,7 @@ func onePhaseTxnRequest(startTS, commitTS uint64, primaryKey []byte, reqs []*Ele Phase: pb.Phase_NONE, Ts: startTS, Mutations: muts, + ReadKeys: readKeys, } } diff --git a/kv/coordinator_txn_test.go b/kv/coordinator_txn_test.go index 6ef319fd..e7159966 100644 --- a/kv/coordinator_txn_test.go +++ b/kv/coordinator_txn_test.go @@ -36,7 +36,7 @@ func TestCoordinateDispatchTxn_RejectsNonMonotonicCommitTS(t *testing.T) { _, err := c.dispatchTxn([]*Elem[OP]{ {Op: Put, Key: []byte("k"), Value: []byte("v")}, - }, startTS, 0) + }, startTS, 0, nil) require.ErrorIs(t, err, ErrTxnCommitTSRequired) require.Equal(t, 0, tx.commits) } @@ -52,7 +52,7 @@ func TestCoordinateDispatchTxn_RejectsMissingPrimaryKey(t *testing.T) { _, err := c.dispatchTxn([]*Elem[OP]{ {Op: Put, Key: nil, Value: []byte("v")}, - }, 1, 0) + }, 1, 0, nil) require.ErrorIs(t, err, ErrTxnPrimaryKeyRequired) require.Equal(t, 0, tx.commits) } @@ -70,7 +70,7 @@ func TestCoordinateDispatchTxn_UsesOnePhaseRequest(t *testing.T) { _, err := c.dispatchTxn([]*Elem[OP]{ {Op: Put, Key: []byte("b"), Value: []byte("v1")}, {Op: Del, Key: []byte("x")}, - }, startTS, 0) + }, startTS, 0, nil) require.NoError(t, err) require.Equal(t, 1, tx.commits) require.Len(t, tx.reqs, 1) @@ -107,7 +107,7 @@ func TestCoordinateDispatchTxn_UsesProvidedCommitTS(t *testing.T) { commitTS := uint64(25) _, err := c.dispatchTxn([]*Elem[OP]{ {Op: Put, Key: []byte("k"), 
Value: []byte("v")}, - }, startTS, commitTS) + }, startTS, commitTS, nil) require.NoError(t, err) require.Len(t, tx.reqs, 1) require.Len(t, tx.reqs[0], 1) diff --git a/kv/fsm.go b/kv/fsm.go index 0e817b49..c54673f3 100644 --- a/kv/fsm.go +++ b/kv/fsm.go @@ -184,7 +184,7 @@ func (f *kvFSM) handleRawRequest(ctx context.Context, r *pb.Request, commitTS ui } // Raw requests always commit against the latest state; use commitTS as both // the validation snapshot and the commit timestamp. - return errors.WithStack(f.store.ApplyMutations(ctx, muts, commitTS, commitTS)) + return errors.WithStack(f.store.ApplyMutations(ctx, muts, nil, commitTS, commitTS)) } // extractDelPrefix checks if the mutations contain a DEL_PREFIX operation. @@ -313,18 +313,16 @@ func (f *kvFSM) handlePrepareRequest(ctx context.Context, r *pb.Request) error { return err } - if err := f.store.ApplyMutations(ctx, storeMuts, startTS, startTS); err != nil { + if err := f.store.ApplyMutations(ctx, storeMuts, nil, startTS, startTS); err != nil { return errors.WithStack(err) } return nil } // handleOnePhaseTxnRequest applies a single-shard transaction atomically. -// The isolation level is Snapshot Isolation (SI): only write-write conflicts -// are detected via ApplyMutations. Read-write conflicts (write skew) are NOT -// prevented because the read-set is not tracked. Callers requiring -// Serializable Snapshot Isolation (SSI) must implement read-set validation -// at a higher layer. +// Both write-write and read-write conflicts are checked: the read set carried +// in r.ReadKeys is validated alongside the mutation keys inside +// ApplyMutations under the store's apply lock. 
func (f *kvFSM) handleOnePhaseTxnRequest(ctx context.Context, r *pb.Request, commitTS uint64) error { meta, muts, err := extractTxnMeta(r.Mutations) if err != nil { @@ -350,7 +348,7 @@ func (f *kvFSM) handleOnePhaseTxnRequest(ctx context.Context, r *pb.Request, com if err != nil { return err } - return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, startTS, commitTS)) + return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, r.ReadKeys, startTS, commitTS)) } func (f *kvFSM) handleCommitRequest(ctx context.Context, r *pb.Request) error { @@ -419,7 +417,7 @@ func (f *kvFSM) commitApplyStartTS(ctx context.Context, primaryKey []byte, start // The secondary-shard LatestCommitTS scan is intentionally deferred to the // write-conflict path so the hot (first-time) commit path pays no extra cost. func (f *kvFSM) applyCommitWithIdempotencyFallback(ctx context.Context, storeMuts []*store.KVPairMutation, uniq []*pb.Mutation, applyStartTS, commitTS uint64) error { - err := f.store.ApplyMutations(ctx, storeMuts, applyStartTS, commitTS) + err := f.store.ApplyMutations(ctx, storeMuts, nil, applyStartTS, commitTS) if err == nil { return nil } @@ -436,7 +434,7 @@ func (f *kvFSM) applyCommitWithIdempotencyFallback(ctx context.Context, storeMut return errors.WithStack(lErr) } if exists && latestTS >= commitTS { - return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, commitTS, commitTS)) + return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, nil, commitTS, commitTS)) } } return errors.WithStack(err) @@ -475,7 +473,7 @@ func (f *kvFSM) handleAbortRequest(ctx context.Context, r *pb.Request, abortTS u if len(storeMuts) == 0 { return nil } - return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, startTS, abortTS)) + return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, nil, startTS, abortTS)) } func (f *kvFSM) buildPrepareStoreMutations(ctx context.Context, muts []*pb.Mutation, primaryKey []byte, startTS, expireAt uint64) 
([]*store.KVPairMutation, error) { diff --git a/kv/leader_routed_store.go b/kv/leader_routed_store.go index 6202a76d..dbc60b6e 100644 --- a/kv/leader_routed_store.go +++ b/kv/leader_routed_store.go @@ -232,11 +232,11 @@ func (s *LeaderRoutedStore) LatestCommitTS(ctx context.Context, key []byte) (uin return s.proxyRawLatestCommitTS(ctx, key) } -func (s *LeaderRoutedStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, startTS, commitTS uint64) error { +func (s *LeaderRoutedStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error { if s == nil || s.local == nil { return errors.WithStack(store.ErrNotSupported) } - return errors.WithStack(s.local.ApplyMutations(ctx, mutations, startTS, commitTS)) + return errors.WithStack(s.local.ApplyMutations(ctx, mutations, readKeys, startTS, commitTS)) } func (s *LeaderRoutedStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error { diff --git a/kv/shard_store.go b/kv/shard_store.go index ef71966c..4c37e45a 100644 --- a/kv/shard_store.go +++ b/kv/shard_store.go @@ -1116,7 +1116,7 @@ func cleanupTSWithNow(startTS, now uint64) uint64 { // // All mutations must belong to the same shard. Cross-shard mutation batches are // not supported. 
-func (s *ShardStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, startTS, commitTS uint64) error { +func (s *ShardStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error { if len(mutations) == 0 { return nil } @@ -1135,7 +1135,7 @@ func (s *ShardStore) ApplyMutations(ctx context.Context, mutations []*store.KVPa return errors.WithStack(ErrCrossShardMutationBatchNotSupported) } } - return errors.WithStack(firstGroup.Store.ApplyMutations(ctx, mutations, startTS, commitTS)) + return errors.WithStack(firstGroup.Store.ApplyMutations(ctx, mutations, readKeys, startTS, commitTS)) } // DeletePrefixAt applies a prefix delete to every shard in the store. diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index 2420aece..5e375d16 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -90,7 +90,7 @@ func (c *ShardedCoordinator) Dispatch(ctx context.Context, reqs *OperationGroup[ } if reqs.IsTxn { - return c.dispatchTxn(reqs.StartTS, reqs.CommitTS, reqs.Elems) + return c.dispatchTxn(reqs.StartTS, reqs.CommitTS, reqs.Elems, reqs.ReadKeys) } logs, err := c.requestLogs(reqs) @@ -193,7 +193,7 @@ func (c *ShardedCoordinator) broadcastToAllGroups(requests []*pb.Request) (*Coor return &CoordinateResponse{CommitIndex: maxIndex.Load()}, nil } -func (c *ShardedCoordinator) dispatchTxn(startTS uint64, commitTS uint64, elems []*Elem[OP]) (*CoordinateResponse, error) { +func (c *ShardedCoordinator) dispatchTxn(startTS uint64, commitTS uint64, elems []*Elem[OP], readKeys [][]byte) (*CoordinateResponse, error) { grouped, gids, err := c.groupMutations(elems) if err != nil { return nil, err @@ -209,7 +209,7 @@ func (c *ShardedCoordinator) dispatchTxn(startTS uint64, commitTS uint64, elems } if len(gids) == 1 { - return c.dispatchSingleShardTxn(startTS, commitTS, primaryKey, gids[0], elems) + return c.dispatchSingleShardTxn(startTS, commitTS, primaryKey, gids[0], 
elems, readKeys) } prepared, err := c.prewriteTxn(startTS, commitTS, primaryKey, grouped, gids) @@ -241,13 +241,13 @@ func (c *ShardedCoordinator) resolveTxnCommitTS(startTS, commitTS uint64) (uint6 return commitTS, nil } -func (c *ShardedCoordinator) dispatchSingleShardTxn(startTS, commitTS uint64, primaryKey []byte, gid uint64, elems []*Elem[OP]) (*CoordinateResponse, error) { +func (c *ShardedCoordinator) dispatchSingleShardTxn(startTS, commitTS uint64, primaryKey []byte, gid uint64, elems []*Elem[OP], readKeys [][]byte) (*CoordinateResponse, error) { g, err := c.txnGroupForID(gid) if err != nil { return nil, err } resp, err := g.Txn.Commit([]*pb.Request{ - onePhaseTxnRequest(startTS, commitTS, primaryKey, elems), + onePhaseTxnRequest(startTS, commitTS, primaryKey, elems, readKeys), }) if err != nil { return nil, errors.WithStack(err) diff --git a/kv/transcoder.go b/kv/transcoder.go index 37eb598d..3088e879 100644 --- a/kv/transcoder.go +++ b/kv/transcoder.go @@ -29,4 +29,7 @@ type OperationGroup[T OP] struct { // CommitTS optionally pins the transaction commit timestamp. // Coordinators choose one automatically when this is zero. CommitTS uint64 + // ReadKeys carries the transaction's read set so the FSM can validate + // read-write conflicts atomically with the commit. + ReadKeys [][]byte } diff --git a/proto/internal.pb.go b/proto/internal.pb.go index 7e4d6e1e..391c4407 100644 --- a/proto/internal.pb.go +++ b/proto/internal.pb.go @@ -25,8 +25,13 @@ const ( type Op int32 const ( - Op_PUT Op = 0 - Op_DEL Op = 1 + Op_PUT Op = 0 + Op_DEL Op = 1 + // DEL_PREFIX deletes all visible keys matching the prefix stored in `key`. + // An empty key means "all keys". Transaction-internal keys (!txn|) are + // always excluded. This operation is applied locally by the FSM on each + // node, so the Raft log contains only a single mutation regardless of how + // many keys are deleted. 
Op_DEL_PREFIX Op = 2 ) @@ -184,11 +189,16 @@ func (x *Mutation) GetValue() []byte { } type Request struct { - state protoimpl.MessageState `protogen:"open.v1"` - IsTxn bool `protobuf:"varint,1,opt,name=is_txn,json=isTxn,proto3" json:"is_txn,omitempty"` - Phase Phase `protobuf:"varint,2,opt,name=phase,proto3,enum=Phase" json:"phase,omitempty"` - Ts uint64 `protobuf:"varint,3,opt,name=ts,proto3" json:"ts,omitempty"` - Mutations []*Mutation `protobuf:"bytes,4,rep,name=mutations,proto3" json:"mutations,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + IsTxn bool `protobuf:"varint,1,opt,name=is_txn,json=isTxn,proto3" json:"is_txn,omitempty"` + Phase Phase `protobuf:"varint,2,opt,name=phase,proto3,enum=Phase" json:"phase,omitempty"` + Ts uint64 `protobuf:"varint,3,opt,name=ts,proto3" json:"ts,omitempty"` + Mutations []*Mutation `protobuf:"bytes,4,rep,name=mutations,proto3" json:"mutations,omitempty"` + // read_keys carries the transaction's read set so that the FSM can validate + // read-write conflicts atomically with the commit. Each entry is a storage + // key that was read during the transaction; the FSM checks that none of them + // were written after ts (the transaction's start timestamp). 
+ ReadKeys [][]byte `protobuf:"bytes,5,rep,name=read_keys,json=readKeys,proto3" json:"read_keys,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -251,6 +261,13 @@ func (x *Request) GetMutations() []*Mutation { return nil } +func (x *Request) GetReadKeys() [][]byte { + if x != nil { + return x.ReadKeys + } + return nil +} + type RaftCommand struct { state protoimpl.MessageState `protogen:"open.v1"` Requests []*Request `protobuf:"bytes,1,rep,name=requests,proto3" json:"requests,omitempty"` @@ -504,12 +521,13 @@ const file_proto_internal_proto_rawDesc = "" + "\bMutation\x12\x13\n" + "\x02op\x18\x01 \x01(\x0e2\x03.OpR\x02op\x12\x10\n" + "\x03key\x18\x02 \x01(\fR\x03key\x12\x14\n" + - "\x05value\x18\x03 \x01(\fR\x05value\"w\n" + + "\x05value\x18\x03 \x01(\fR\x05value\"\x94\x01\n" + "\aRequest\x12\x15\n" + "\x06is_txn\x18\x01 \x01(\bR\x05isTxn\x12\x1c\n" + "\x05phase\x18\x02 \x01(\x0e2\x06.PhaseR\x05phase\x12\x0e\n" + "\x02ts\x18\x03 \x01(\x04R\x02ts\x12'\n" + - "\tmutations\x18\x04 \x03(\v2\t.MutationR\tmutations\"3\n" + + "\tmutations\x18\x04 \x03(\v2\t.MutationR\tmutations\x12\x1b\n" + + "\tread_keys\x18\x05 \x03(\fR\breadKeys\"3\n" + "\vRaftCommand\x12$\n" + "\brequests\x18\x01 \x03(\v2\b.RequestR\brequests\"M\n" + "\x0eForwardRequest\x12\x15\n" + @@ -522,10 +540,12 @@ const file_proto_internal_proto_rawDesc = "" + "\achannel\x18\x01 \x01(\fR\achannel\x12\x18\n" + "\amessage\x18\x02 \x01(\fR\amessage\"8\n" + "\x14RelayPublishResponse\x12 \n" + - "\vsubscribers\x18\x01 \x01(\x03R\vsubscribers*\x16\n" + + "\vsubscribers\x18\x01 \x01(\x03R\vsubscribers*&\n" + "\x02Op\x12\a\n" + "\x03PUT\x10\x00\x12\a\n" + - "\x03DEL\x10\x01*5\n" + + "\x03DEL\x10\x01\x12\x0e\n" + + "\n" + + "DEL_PREFIX\x10\x02*5\n" + "\x05Phase\x12\b\n" + "\x04NONE\x10\x00\x12\v\n" + "\aPREPARE\x10\x01\x12\n" + diff --git a/proto/internal.proto b/proto/internal.proto index 9bd32575..f0cfc7ed 100644 --- a/proto/internal.proto +++ b/proto/internal.proto @@ -39,6 +39,11 
@@ message Request { Phase phase = 2; uint64 ts = 3; repeated Mutation mutations = 4; + // read_keys carries the transaction's read set so that the FSM can validate + // read-write conflicts atomically with the commit. Each entry is a storage + // key that was read during the transaction; the FSM checks that none of them + // were written after ts (the transaction's start timestamp). + repeated bytes read_keys = 5; } message RaftCommand { diff --git a/store/compact_txn_test.go b/store/compact_txn_test.go index d7a39305..99a84a62 100644 --- a/store/compact_txn_test.go +++ b/store/compact_txn_test.go @@ -272,7 +272,7 @@ func TestPebbleStore_ApplyMutations_ConcurrentConflictDetection(t *testing.T) { err := s.ApplyMutations(ctx, []*KVPairMutation{ {Op: OpTypePut, Key: key, Value: []byte("val")}, - }, startTS, commitTS) + }, nil, startTS, commitTS) r := result{} if err == nil { r.success++ diff --git a/store/lsm_store.go b/store/lsm_store.go index 4e98c475..beac7354 100644 --- a/store/lsm_store.go +++ b/store/lsm_store.go @@ -861,6 +861,19 @@ func (s *pebbleStore) checkConflicts(ctx context.Context, mutations []*KVPairMut return nil } +func (s *pebbleStore) checkReadConflicts(ctx context.Context, readKeys [][]byte, startTS uint64) error { + for _, key := range readKeys { + ts, exists, err := s.latestCommitTS(ctx, key) + if err != nil { + return err + } + if exists && ts > startTS { + return NewWriteConflictError(key) + } + } + return nil +} + func (s *pebbleStore) applyMutationsBatch(b *pebble.Batch, mutations []*KVPairMutation, commitTS uint64) error { for _, mut := range mutations { k := encodeKey(mut.Key, commitTS) @@ -884,7 +897,7 @@ func (s *pebbleStore) applyMutationsBatch(b *pebble.Batch, mutations []*KVPairMu return nil } -func (s *pebbleStore) ApplyMutations(ctx context.Context, mutations []*KVPairMutation, startTS, commitTS uint64) error { +func (s *pebbleStore) ApplyMutations(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte,
startTS, commitTS uint64) error { s.dbMu.RLock() defer s.dbMu.RUnlock() @@ -899,6 +912,9 @@ func (s *pebbleStore) ApplyMutations(ctx context.Context, mutations []*KVPairMut if err := s.checkConflicts(ctx, mutations, startTS); err != nil { return err } + if err := s.checkReadConflicts(ctx, readKeys, startTS); err != nil { + return err + } if err := s.applyMutationsBatch(b, mutations, commitTS); err != nil { return err diff --git a/store/lsm_store_test.go b/store/lsm_store_test.go index 744008fa..a5411da3 100644 --- a/store/lsm_store_test.go +++ b/store/lsm_store_test.go @@ -892,7 +892,7 @@ func TestPebbleStore_ApplyMutations_ValueTooLarge(t *testing.T) { oversized := make([]byte, maxSnapshotValueSize+1) err = s.ApplyMutations(ctx, []*KVPairMutation{ {Op: OpTypePut, Key: []byte("k"), Value: oversized}, - }, 0, 1) + }, nil, 0, 1) require.Error(t, err) assert.ErrorIs(t, err, ErrValueTooLarge) } @@ -928,7 +928,7 @@ func TestMVCCStore_ApplyMutations_ValueTooLarge(t *testing.T) { oversized := make([]byte, maxSnapshotValueSize+1) err := s.ApplyMutations(ctx, []*KVPairMutation{ {Op: OpTypePut, Key: []byte("k"), Value: oversized}, - }, 0, 1) + }, nil, 0, 1) require.Error(t, err) assert.ErrorIs(t, err, ErrValueTooLarge) } diff --git a/store/lsm_store_txn_test.go b/store/lsm_store_txn_test.go index 228ec714..d4806af6 100644 --- a/store/lsm_store_txn_test.go +++ b/store/lsm_store_txn_test.go @@ -26,7 +26,7 @@ func TestPebbleStore_ApplyMutations_BasicPut(t *testing.T) { {Op: OpTypePut, Key: []byte("k2"), Value: []byte("v2")}, } - err = s.ApplyMutations(ctx, mutations, 0, 10) + err = s.ApplyMutations(ctx, mutations, nil, 0, 10) require.NoError(t, err) // Both keys should be readable at commitTS. 
@@ -58,7 +58,7 @@ func TestPebbleStore_ApplyMutations_Delete(t *testing.T) { mutations := []*KVPairMutation{ {Op: OpTypeDelete, Key: []byte("k1")}, } - err = s.ApplyMutations(ctx, mutations, 10, 20) + err = s.ApplyMutations(ctx, mutations, nil, 10, 20) require.NoError(t, err) // After the delete, key should be a tombstone. @@ -82,7 +82,7 @@ func TestPebbleStore_ApplyMutations_PutWithTTL(t *testing.T) { mutations := []*KVPairMutation{ {Op: OpTypePut, Key: []byte("k1"), Value: []byte("v1"), ExpireAt: 50}, } - err = s.ApplyMutations(ctx, mutations, 0, 10) + err = s.ApplyMutations(ctx, mutations, nil, 0, 10) require.NoError(t, err) // Visible before expiry. @@ -113,7 +113,7 @@ func TestPebbleStore_ApplyMutations_WriteConflict(t *testing.T) { mutations := []*KVPairMutation{ {Op: OpTypePut, Key: []byte("k1"), Value: []byte("v2")}, } - err = s.ApplyMutations(ctx, mutations, 10, 30) + err = s.ApplyMutations(ctx, mutations, nil, 10, 30) require.Error(t, err) assert.True(t, errors.Is(err, ErrWriteConflict), "expected ErrWriteConflict, got %v", err) @@ -143,7 +143,7 @@ func TestPebbleStore_ApplyMutations_NoConflictWhenStartTSGECommit(t *testing.T) mutations := []*KVPairMutation{ {Op: OpTypePut, Key: []byte("k1"), Value: []byte("v2")}, } - err = s.ApplyMutations(ctx, mutations, 10, 20) + err = s.ApplyMutations(ctx, mutations, nil, 10, 20) require.NoError(t, err) val, err := s.GetAt(ctx, []byte("k1"), 20) @@ -164,14 +164,14 @@ func TestPebbleStore_ApplyMutations_UpdatesLastCommitTS(t *testing.T) { mutations := []*KVPairMutation{ {Op: OpTypePut, Key: []byte("k1"), Value: []byte("v1")}, } - require.NoError(t, s.ApplyMutations(ctx, mutations, 0, 100)) + require.NoError(t, s.ApplyMutations(ctx, mutations, nil, 0, 100)) assert.Equal(t, uint64(100), s.LastCommitTS()) // A second apply with a higher commitTS advances lastCommitTS. 
mutations2 := []*KVPairMutation{ {Op: OpTypePut, Key: []byte("k2"), Value: []byte("v2")}, } - require.NoError(t, s.ApplyMutations(ctx, mutations2, 100, 200)) + require.NoError(t, s.ApplyMutations(ctx, mutations2, nil, 100, 200)) assert.Equal(t, uint64(200), s.LastCommitTS()) } @@ -191,7 +191,7 @@ func TestPebbleStore_ApplyMutations_Atomicity(t *testing.T) { {Op: OpTypePut, Key: []byte("k1"), Value: []byte("v1")}, {Op: OpTypePut, Key: []byte("k2"), Value: []byte("v2")}, } - err = s.ApplyMutations(ctx, mutations, 10, 60) + err = s.ApplyMutations(ctx, mutations, nil, 10, 60) require.Error(t, err) assert.True(t, errors.Is(err, ErrWriteConflict)) diff --git a/store/mvcc_store.go b/store/mvcc_store.go index 9e6ebdcb..d3135f32 100644 --- a/store/mvcc_store.go +++ b/store/mvcc_store.go @@ -475,7 +475,7 @@ func (s *mvccStore) LatestCommitTS(_ context.Context, key []byte) (uint64, bool, return ver.TS, true, nil } -func (s *mvccStore) ApplyMutations(ctx context.Context, mutations []*KVPairMutation, startTS, commitTS uint64) error { +func (s *mvccStore) ApplyMutations(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error { s.mtx.Lock() defer s.mtx.Unlock() @@ -484,6 +484,11 @@ func (s *mvccStore) ApplyMutations(ctx context.Context, mutations []*KVPairMutat return NewWriteConflictError(mut.Key) } } + for _, key := range readKeys { + if latestVer, ok := s.latestVersionLocked(key); ok && latestVer.TS > startTS { + return NewWriteConflictError(key) + } + } commitTS = s.alignCommitTS(commitTS) diff --git a/store/mvcc_store_concurrency_test.go b/store/mvcc_store_concurrency_test.go index 71e62e0b..3c42ca13 100644 --- a/store/mvcc_store_concurrency_test.go +++ b/store/mvcc_store_concurrency_test.go @@ -278,7 +278,7 @@ func TestMVCCConcurrentApplyMutations(t *testing.T) { Value: []byte(fmt.Sprintf("g%d-r%d", goroutineID, i)), }, } - err := st.ApplyMutations(ctx, mutations, startTS, commitTS) + err := st.ApplyMutations(ctx, mutations, nil, 
startTS, commitTS) if err == nil { successCount.Add(1) continue @@ -349,7 +349,7 @@ func TestMVCCConcurrentApplyMutationsMultiKey(t *testing.T) { {Op: OpTypePut, Key: keyA, Value: []byte(fmt.Sprintf("a-g%d-r%d", goroutineID, i))}, {Op: OpTypePut, Key: keyB, Value: []byte(fmt.Sprintf("b-g%d-r%d", goroutineID, i))}, } - err := st.ApplyMutations(ctx, mutations, startTS, commitTS) + err := st.ApplyMutations(ctx, mutations, nil, startTS, commitTS) if err == nil { successCount.Add(1) continue diff --git a/store/mvcc_store_snapshot_test.go b/store/mvcc_store_snapshot_test.go index f82837e9..75ce1489 100644 --- a/store/mvcc_store_snapshot_test.go +++ b/store/mvcc_store_snapshot_test.go @@ -68,7 +68,7 @@ func TestMVCCStore_ApplyMutations_WriteConflict(t *testing.T) { err := st.ApplyMutations(ctx, []*KVPairMutation{ {Op: OpTypePut, Key: []byte("k"), Value: []byte("v2")}, - }, 10, 30) + }, nil, 10, 30) require.ErrorIs(t, err, ErrWriteConflict) } @@ -80,7 +80,7 @@ func TestMVCCStore_ApplyMutations_UnknownOp(t *testing.T) { err := st.ApplyMutations(ctx, []*KVPairMutation{ {Op: OpType(99), Key: []byte("k"), Value: []byte("v")}, - }, 10, 20) + }, nil, 10, 20) require.ErrorIs(t, err, ErrUnknownOp) } diff --git a/store/store.go b/store/store.go index c418804b..b7652326 100644 --- a/store/store.go +++ b/store/store.go @@ -129,10 +129,11 @@ type MVCCStore interface { // The boolean reports whether the key has any version. LatestCommitTS(ctx context.Context, key []byte) (uint64, bool, error) // ApplyMutations atomically validates and appends the provided mutations. - // It must return ErrWriteConflict if any key has a newer commit timestamp - // than startTS. Note: only write-write conflicts are detected (Snapshot - // Isolation). Read-write conflicts (write skew) are not prevented. 
- ApplyMutations(ctx context.Context, mutations []*KVPairMutation, startTS, commitTS uint64) error + // It must return ErrWriteConflict if any mutation key or any read key has + // a newer commit timestamp than startTS. readKeys carries the transaction's + // read set for read-write conflict detection; pass nil when no read set + // validation is needed. + ApplyMutations(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error // DeletePrefixAt atomically deletes all visible (non-tombstone, non-expired) // keys matching prefix at commitTS by writing tombstone versions. An empty // prefix means "all keys". Keys matching excludePrefix are preserved. From 48a0ebe5308485dc1bd5ac18576fa07228ae94bf Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Mon, 13 Apr 2026 04:28:49 +0900 Subject: [PATCH 2/9] fix: address code review feedback on SSI read-set validation - Thread readKeys through multi-shard PREPARE phase so cross-shard transactions also validate read-write conflicts (Concurrency/Consistency review) - Remove redundant validateReadSet call in Redis adapter; the store-layer check under applyMu is sufficient and avoids double LatestCommitTS I/O (Performance review) - Extract checkConflictsLocked in mvcc_store to reduce ApplyMutations cyclomatic complexity below the cyclop threshold (Lint review) - Add dedicated read-write conflict tests: conflict detected, no conflict when read key is unchanged, nonexistent read key, and nil readKeys backward compatibility (Test Coverage review) --- adapter/redis.go | 6 +-- kv/fsm.go | 2 +- kv/sharded_coordinator.go | 5 +- store/lsm_store_txn_test.go | 94 +++++++++++++++++++++++++++++++++++++ store/mvcc_store.go | 25 ++++++---- 5 files changed, 117 insertions(+), 15 deletions(-) diff --git a/adapter/redis.go b/adapter/redis.go index 9b4c0c7a..b62f1a2f 100644 --- a/adapter/redis.go +++ b/adapter/redis.go @@ -2102,9 +2102,9 @@ func (r *RedisServer) runTransaction(queue 
[]redcon.Command) ([]redisResult, err nextResults = append(nextResults, res) } - if err := txn.validateReadSet(dispatchCtx); err != nil { - return err - } + // Read-set validation is performed inside ApplyMutations under the + // store's apply lock, so a separate validateReadSet call here would + // be redundant and double the LatestCommitTS I/O. if err := txn.commit(); err != nil { return err } diff --git a/kv/fsm.go b/kv/fsm.go index c54673f3..b7d3473b 100644 --- a/kv/fsm.go +++ b/kv/fsm.go @@ -313,7 +313,7 @@ func (f *kvFSM) handlePrepareRequest(ctx context.Context, r *pb.Request) error { return err } - if err := f.store.ApplyMutations(ctx, storeMuts, nil, startTS, startTS); err != nil { + if err := f.store.ApplyMutations(ctx, storeMuts, r.ReadKeys, startTS, startTS); err != nil { return errors.WithStack(err) } return nil diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index 5e375d16..e0dadf67 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -212,7 +212,7 @@ func (c *ShardedCoordinator) dispatchTxn(startTS uint64, commitTS uint64, elems return c.dispatchSingleShardTxn(startTS, commitTS, primaryKey, gids[0], elems, readKeys) } - prepared, err := c.prewriteTxn(startTS, commitTS, primaryKey, grouped, gids) + prepared, err := c.prewriteTxn(startTS, commitTS, primaryKey, grouped, gids, readKeys) if err != nil { return nil, err } @@ -263,7 +263,7 @@ type preparedGroup struct { keys []*pb.Mutation } -func (c *ShardedCoordinator) prewriteTxn(startTS, commitTS uint64, primaryKey []byte, grouped map[uint64][]*pb.Mutation, gids []uint64) ([]preparedGroup, error) { +func (c *ShardedCoordinator) prewriteTxn(startTS, commitTS uint64, primaryKey []byte, grouped map[uint64][]*pb.Mutation, gids []uint64, readKeys [][]byte) ([]preparedGroup, error) { prepareMeta := txnMetaMutation(primaryKey, defaultTxnLockTTLms, 0) prepared := make([]preparedGroup, 0, len(gids)) @@ -277,6 +277,7 @@ func (c *ShardedCoordinator) prewriteTxn(startTS, 
commitTS uint64, primaryKey [] Phase: pb.Phase_PREPARE, Ts: startTS, Mutations: append([]*pb.Mutation{prepareMeta}, grouped[gid]...), + ReadKeys: readKeys, } if _, err := g.Txn.Commit([]*pb.Request{req}); err != nil { c.abortPreparedTxn(startTS, primaryKey, prepared, abortTSFrom(startTS, commitTS)) diff --git a/store/lsm_store_txn_test.go b/store/lsm_store_txn_test.go index d4806af6..ef827834 100644 --- a/store/lsm_store_txn_test.go +++ b/store/lsm_store_txn_test.go @@ -464,3 +464,97 @@ func TestPebbleStore_Compact_MultipleKeys(t *testing.T) { require.NoError(t, err) assert.Equal(t, []byte("k2v8"), val) } + +// --------------------------------------------------------------------------- +// Read-write conflict detection (SSI) +// --------------------------------------------------------------------------- + +func TestPebbleStore_ApplyMutations_ReadConflict(t *testing.T) { + dir := t.TempDir() + s, err := NewPebbleStore(dir) + require.NoError(t, err) + defer s.Close() + + ctx := context.Background() + + // Commit k1 at TS=20. + require.NoError(t, s.PutAt(ctx, []byte("k1"), []byte("v1"), 20, 0)) + + // A transaction that started at TS=10 writes k2 but reads k1. + // k1 was committed at TS=20 > startTS=10 → read-write conflict. + mutations := []*KVPairMutation{ + {Op: OpTypePut, Key: []byte("k2"), Value: []byte("v2")}, + } + err = s.ApplyMutations(ctx, mutations, [][]byte{[]byte("k1")}, 10, 30) + require.Error(t, err) + assert.True(t, errors.Is(err, ErrWriteConflict)) + + conflictKey, ok := WriteConflictKey(err) + assert.True(t, ok) + assert.Equal(t, []byte("k1"), conflictKey) + + // k2 should NOT have been written. + _, err = s.GetAt(ctx, []byte("k2"), 30) + assert.ErrorIs(t, err, ErrKeyNotFound) +} + +func TestPebbleStore_ApplyMutations_ReadConflict_NoConflictWhenReadKeyUnchanged(t *testing.T) { + dir := t.TempDir() + s, err := NewPebbleStore(dir) + require.NoError(t, err) + defer s.Close() + + ctx := context.Background() + + // Commit k1 at TS=5. 
+ require.NoError(t, s.PutAt(ctx, []byte("k1"), []byte("v1"), 5, 0)) + + // Transaction starts at TS=10. k1 was committed at TS=5 <= startTS=10 → no conflict. + mutations := []*KVPairMutation{ + {Op: OpTypePut, Key: []byte("k2"), Value: []byte("v2")}, + } + err = s.ApplyMutations(ctx, mutations, [][]byte{[]byte("k1")}, 10, 20) + require.NoError(t, err) + + val, err := s.GetAt(ctx, []byte("k2"), 20) + require.NoError(t, err) + assert.Equal(t, []byte("v2"), val) +} + +func TestPebbleStore_ApplyMutations_ReadConflict_NonexistentReadKey(t *testing.T) { + dir := t.TempDir() + s, err := NewPebbleStore(dir) + require.NoError(t, err) + defer s.Close() + + ctx := context.Background() + + // Reading a key that doesn't exist should not cause a conflict. + mutations := []*KVPairMutation{ + {Op: OpTypePut, Key: []byte("k1"), Value: []byte("v1")}, + } + err = s.ApplyMutations(ctx, mutations, [][]byte{[]byte("nonexistent")}, 10, 20) + require.NoError(t, err) + + val, err := s.GetAt(ctx, []byte("k1"), 20) + require.NoError(t, err) + assert.Equal(t, []byte("v1"), val) +} + +func TestPebbleStore_ApplyMutations_ReadConflict_NilReadKeys(t *testing.T) { + dir := t.TempDir() + s, err := NewPebbleStore(dir) + require.NoError(t, err) + defer s.Close() + + ctx := context.Background() + + // Passing nil readKeys should behave as before (write-write only). 
+ require.NoError(t, s.PutAt(ctx, []byte("k1"), []byte("v1"), 20, 0)) + + mutations := []*KVPairMutation{ + {Op: OpTypePut, Key: []byte("k2"), Value: []byte("v2")}, + } + err = s.ApplyMutations(ctx, mutations, nil, 10, 30) + require.NoError(t, err) +} diff --git a/store/mvcc_store.go b/store/mvcc_store.go index d3135f32..18a9638f 100644 --- a/store/mvcc_store.go +++ b/store/mvcc_store.go @@ -479,15 +479,8 @@ func (s *mvccStore) ApplyMutations(ctx context.Context, mutations []*KVPairMutat s.mtx.Lock() defer s.mtx.Unlock() - for _, mut := range mutations { - if latestVer, ok := s.latestVersionLocked(mut.Key); ok && latestVer.TS > startTS { - return NewWriteConflictError(mut.Key) - } - } - for _, key := range readKeys { - if latestVer, ok := s.latestVersionLocked(key); ok && latestVer.TS > startTS { - return NewWriteConflictError(key) - } + if err := s.checkConflictsLocked(mutations, readKeys, startTS); err != nil { + return err } commitTS = s.alignCommitTS(commitTS) @@ -514,6 +507,20 @@ func (s *mvccStore) ApplyMutations(ctx context.Context, mutations []*KVPairMutat return nil } +func (s *mvccStore) checkConflictsLocked(mutations []*KVPairMutation, readKeys [][]byte, startTS uint64) error { + for _, mut := range mutations { + if latestVer, ok := s.latestVersionLocked(mut.Key); ok && latestVer.TS > startTS { + return NewWriteConflictError(mut.Key) + } + } + for _, key := range readKeys { + if latestVer, ok := s.latestVersionLocked(key); ok && latestVer.TS > startTS { + return NewWriteConflictError(key) + } + } + return nil +} + // DeletePrefixAt deletes all visible keys matching prefix by writing tombstones // at commitTS. An empty prefix deletes all keys. Keys matching excludePrefix // are preserved. 
From a9621016929c50ef1980eeeac98bdfa7ee3afec4 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Mon, 13 Apr 2026 04:37:42 +0900 Subject: [PATCH 3/9] fix: address second review round - Fix checkReadConflicts to propagate ctx instead of context.Background() (Concurrency review) - Route readKeys by shard ID in prewriteTxn so each shard only validates read keys that belong to it, and read keys on non-write shards are correctly checked by their owning shard (Consistency review) - Add mvccStore read-write conflict tests mirroring the pebbleStore coverage: conflict detected, no conflict when unchanged, nonexistent key, nil readKeys (Test Coverage review) --- kv/sharded_coordinator.go | 19 ++++++++- store/lsm_store.go | 4 +- store/mvcc_store_concurrency_test.go | 61 ++++++++++++++++++++++++++++ 3 files changed, 81 insertions(+), 3 deletions(-) diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index e0dadf67..d9e01367 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -267,6 +267,8 @@ func (c *ShardedCoordinator) prewriteTxn(startTS, commitTS uint64, primaryKey [] prepareMeta := txnMetaMutation(primaryKey, defaultTxnLockTTLms, 0) prepared := make([]preparedGroup, 0, len(gids)) + groupedReadKeys := c.groupReadKeysByShardID(readKeys) + for _, gid := range gids { g, err := c.txnGroupForID(gid) if err != nil { @@ -277,7 +279,7 @@ func (c *ShardedCoordinator) prewriteTxn(startTS, commitTS uint64, primaryKey [] Phase: pb.Phase_PREPARE, Ts: startTS, Mutations: append([]*pb.Mutation{prepareMeta}, grouped[gid]...), - ReadKeys: readKeys, + ReadKeys: groupedReadKeys[gid], } if _, err := g.Txn.Commit([]*pb.Request{req}); err != nil { c.abortPreparedTxn(startTS, primaryKey, prepared, abortTSFrom(startTS, commitTS)) @@ -587,6 +589,21 @@ func (c *ShardedCoordinator) engineGroupIDForKey(key []byte) uint64 { return route.GroupID } +func (c *ShardedCoordinator) groupReadKeysByShardID(readKeys [][]byte) map[uint64][][]byte { + if 
len(readKeys) == 0 { + return nil + } + grouped := make(map[uint64][][]byte) + for _, key := range readKeys { + gid := c.engineGroupIDForKey(key) + if gid == 0 { + continue + } + grouped[gid] = append(grouped[gid], key) + } + return grouped +} + var _ Coordinator = (*ShardedCoordinator)(nil) func validateOperationGroup(reqs *OperationGroup[OP]) error { diff --git a/store/lsm_store.go b/store/lsm_store.go index beac7354..1372a6a2 100644 --- a/store/lsm_store.go +++ b/store/lsm_store.go @@ -861,9 +861,9 @@ func (s *pebbleStore) checkConflicts(ctx context.Context, mutations []*KVPairMut return nil } -func (s *pebbleStore) checkReadConflicts(_ context.Context, readKeys [][]byte, startTS uint64) error { +func (s *pebbleStore) checkReadConflicts(ctx context.Context, readKeys [][]byte, startTS uint64) error { for _, key := range readKeys { - ts, exists, err := s.latestCommitTS(context.Background(), key) + ts, exists, err := s.latestCommitTS(ctx, key) if err != nil { return err } diff --git a/store/mvcc_store_concurrency_test.go b/store/mvcc_store_concurrency_test.go index 3c42ca13..aa98bd72 100644 --- a/store/mvcc_store_concurrency_test.go +++ b/store/mvcc_store_concurrency_test.go @@ -572,3 +572,64 @@ func TestMVCCConcurrentScanAtSnapshotConsistency(t *testing.T) { require.NoError(t, err) } } + +// --------------------------------------------------------------------------- +// Read-write conflict detection (SSI) for mvccStore +// --------------------------------------------------------------------------- + +func TestMVCCStore_ApplyMutations_ReadConflict(t *testing.T) { + ctx := context.Background() + st := newTestMVCCStore(t) + + // Commit k1 at TS=20. + require.NoError(t, st.PutAt(ctx, []byte("k1"), []byte("v1"), 20, 0)) + + // Write k2 but read k1. k1 committed at 20 > startTS=10 → conflict. 
+ mutations := []*KVPairMutation{ + {Op: OpTypePut, Key: []byte("k2"), Value: []byte("v2")}, + } + err := st.ApplyMutations(ctx, mutations, [][]byte{[]byte("k1")}, 10, 30) + require.Error(t, err) + require.True(t, errors.Is(err, ErrWriteConflict)) +} + +func TestMVCCStore_ApplyMutations_ReadConflict_NoConflictWhenUnchanged(t *testing.T) { + ctx := context.Background() + st := newTestMVCCStore(t) + + // Commit k1 at TS=5. + require.NoError(t, st.PutAt(ctx, []byte("k1"), []byte("v1"), 5, 0)) + + // startTS=10 >= k1's commit(5) → no conflict. + mutations := []*KVPairMutation{ + {Op: OpTypePut, Key: []byte("k2"), Value: []byte("v2")}, + } + err := st.ApplyMutations(ctx, mutations, [][]byte{[]byte("k1")}, 10, 20) + require.NoError(t, err) +} + +func TestMVCCStore_ApplyMutations_ReadConflict_NonexistentKey(t *testing.T) { + ctx := context.Background() + st := newTestMVCCStore(t) + + // Reading a nonexistent key should not conflict. + mutations := []*KVPairMutation{ + {Op: OpTypePut, Key: []byte("k1"), Value: []byte("v1")}, + } + err := st.ApplyMutations(ctx, mutations, [][]byte{[]byte("ghost")}, 10, 20) + require.NoError(t, err) +} + +func TestMVCCStore_ApplyMutations_ReadConflict_NilReadKeys(t *testing.T) { + ctx := context.Background() + st := newTestMVCCStore(t) + + require.NoError(t, st.PutAt(ctx, []byte("k1"), []byte("v1"), 20, 0)) + + // nil readKeys → only write-write check; k2 write has no conflict. + mutations := []*KVPairMutation{ + {Op: OpTypePut, Key: []byte("k2"), Value: []byte("v2")}, + } + err := st.ApplyMutations(ctx, mutations, nil, 10, 30) + require.NoError(t, err) +} From 07ef088fe9495949068d17908fd530f054bfaa9a Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Mon, 13 Apr 2026 04:45:33 +0900 Subject: [PATCH 4/9] fix: validate read keys on read-only shards in multi-shard txn Add validateReadOnlyShards to send read-set validation requests to shards that have read keys but no mutations. 
Previously, a read key on shard A would go unchecked if the transaction only wrote to shard B, allowing write skew across shards. Also fix checkReadConflicts to propagate the caller context (already addressed in prior commit for the function signature, this ensures consistent usage). Add partial-conflict test case where only some read keys conflict. --- kv/sharded_coordinator.go | 41 +++++++++++++++++++++++++++++++++++++ store/lsm_store_txn_test.go | 25 ++++++++++++++++++++++ 2 files changed, 66 insertions(+) diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index d9e01367..9143b691 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -288,6 +288,14 @@ func (c *ShardedCoordinator) prewriteTxn(startTS, commitTS uint64, primaryKey [] prepared = append(prepared, preparedGroup{gid: gid, keys: keyMutations(grouped[gid])}) } + // Validate read keys on read-only shards (shards that have read keys + // but no mutations in this transaction). Without this, a concurrent + // write to a read-only shard would go undetected. + if err := c.validateReadOnlyShards(groupedReadKeys, gids, startTS); err != nil { + c.abortPreparedTxn(startTS, primaryKey, prepared, abortTSFrom(startTS, commitTS)) + return nil, err + } + return prepared, nil } @@ -604,6 +612,39 @@ func (c *ShardedCoordinator) groupReadKeysByShardID(readKeys [][]byte) map[uint6 return grouped } +// validateReadOnlyShards checks read-write conflicts on shards that have +// read keys but no mutations in this transaction. writeGIDs is the set of +// shards that already received a PREPARE with their readKeys attached. 
+func (c *ShardedCoordinator) validateReadOnlyShards(groupedReadKeys map[uint64][][]byte, writeGIDs []uint64, startTS uint64) error { + if len(groupedReadKeys) == 0 { + return nil + } + writeSet := make(map[uint64]struct{}, len(writeGIDs)) + for _, gid := range writeGIDs { + writeSet[gid] = struct{}{} + } + for gid, keys := range groupedReadKeys { + if _, isWrite := writeSet[gid]; isWrite { + continue + } + g, ok := c.groups[gid] + if !ok { + continue + } + // Send a read-only validation request: no mutations, only readKeys. + req := &pb.Request{ + IsTxn: true, + Phase: pb.Phase_PREPARE, + Ts: startTS, + ReadKeys: keys, + } + if _, err := g.Txn.Commit([]*pb.Request{req}); err != nil { + return errors.WithStack(err) + } + } + return nil +} + var _ Coordinator = (*ShardedCoordinator)(nil) func validateOperationGroup(reqs *OperationGroup[OP]) error { diff --git a/store/lsm_store_txn_test.go b/store/lsm_store_txn_test.go index ef827834..feb2eb1d 100644 --- a/store/lsm_store_txn_test.go +++ b/store/lsm_store_txn_test.go @@ -558,3 +558,28 @@ func TestPebbleStore_ApplyMutations_ReadConflict_NilReadKeys(t *testing.T) { err = s.ApplyMutations(ctx, mutations, nil, 10, 30) require.NoError(t, err) } + +func TestPebbleStore_ApplyMutations_ReadConflict_PartialConflict(t *testing.T) { + dir := t.TempDir() + s, err := NewPebbleStore(dir) + require.NoError(t, err) + defer s.Close() + + ctx := context.Background() + + // k1 committed at TS=5 (before startTS), k2 committed at TS=20 (after startTS). + require.NoError(t, s.PutAt(ctx, []byte("k1"), []byte("v1"), 5, 0)) + require.NoError(t, s.PutAt(ctx, []byte("k2"), []byte("v2"), 20, 0)) + + // Reading both k1 and k2: k1 is fine but k2 conflicts. 
+ mutations := []*KVPairMutation{ + {Op: OpTypePut, Key: []byte("k3"), Value: []byte("v3")}, + } + err = s.ApplyMutations(ctx, mutations, [][]byte{[]byte("k1"), []byte("k2")}, 10, 30) + require.Error(t, err) + assert.True(t, errors.Is(err, ErrWriteConflict)) + + conflictKey, ok := WriteConflictKey(err) + assert.True(t, ok) + assert.Equal(t, []byte("k2"), conflictKey) +} From da3e2a27ae6caf200f252c857d41a0231d5b2ab5 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Mon, 13 Apr 2026 04:54:45 +0900 Subject: [PATCH 5/9] fix: use direct store check for read-only shard validation The previous approach sent a PREPARE request with empty mutations to read-only shards, but handlePrepareRequest rejects requests without mutations (extractTxnMeta returns error). Instead, validate read keys directly against each shard's store via LatestCommitTS. --- kv/sharded_coordinator.go | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index 9143b691..0599efd0 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -615,6 +615,10 @@ func (c *ShardedCoordinator) groupReadKeysByShardID(readKeys [][]byte) map[uint6 // validateReadOnlyShards checks read-write conflicts on shards that have // read keys but no mutations in this transaction. writeGIDs is the set of // shards that already received a PREPARE with their readKeys attached. +// +// Because these shards have no mutations, we cannot send a PREPARE request +// (the FSM rejects empty mutation lists). Instead we validate directly +// against each shard's store by checking LatestCommitTS for each read key. 
func (c *ShardedCoordinator) validateReadOnlyShards(groupedReadKeys map[uint64][][]byte, writeGIDs []uint64, startTS uint64) error { if len(groupedReadKeys) == 0 { return nil @@ -623,6 +627,7 @@ func (c *ShardedCoordinator) validateReadOnlyShards(groupedReadKeys map[uint64][ for _, gid := range writeGIDs { writeSet[gid] = struct{}{} } + ctx := context.Background() for gid, keys := range groupedReadKeys { if _, isWrite := writeSet[gid]; isWrite { continue @@ -631,15 +636,14 @@ func (c *ShardedCoordinator) validateReadOnlyShards(groupedReadKeys map[uint64][ if !ok { continue } - // Send a read-only validation request: no mutations, only readKeys. - req := &pb.Request{ - IsTxn: true, - Phase: pb.Phase_PREPARE, - Ts: startTS, - ReadKeys: keys, - } - if _, err := g.Txn.Commit([]*pb.Request{req}); err != nil { - return errors.WithStack(err) + for _, key := range keys { + ts, exists, err := g.Store.LatestCommitTS(ctx, key) + if err != nil { + return errors.WithStack(err) + } + if exists && ts > startTS { + return errors.WithStack(store.NewWriteConflictError(key)) + } } } return nil From 068c6da565a4ac3bbcb2e0681cdf17ba3c6bd10f Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Mon, 13 Apr 2026 14:47:39 +0900 Subject: [PATCH 6/9] fix: add linearizable read barrier for read-only shard validation Address critical review: validateReadOnlyShards was reading the local store directly, bypassing Raft linearization. A concurrent write that was committed in Raft but not yet applied to the FSM would be invisible. Fix: issue a linearizable read barrier (LinearizableRead) on each read-only shard before checking LatestCommitTS, ensuring the FSM has applied all committed entries. 
Also: - Propagate ctx through dispatchTxn/prewriteTxn/validateReadOnlyShards instead of using context.Background() (Medium review) - Document the remaining TOCTOU limitation for read-only shards in multi-shard transactions (the check is outside applyMu) - Add test: onePhaseTxnRequest includes ReadKeys in proto request - Add tests: readKey overlapping writeKey (conflict and no-conflict) --- kv/coordinator_txn_test.go | 19 ++++++++++++++++++ kv/sharded_coordinator.go | 34 +++++++++++++++++++++++--------- store/lsm_store_txn_test.go | 39 +++++++++++++++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 9 deletions(-) diff --git a/kv/coordinator_txn_test.go b/kv/coordinator_txn_test.go index e7159966..98f70c25 100644 --- a/kv/coordinator_txn_test.go +++ b/kv/coordinator_txn_test.go @@ -116,3 +116,22 @@ func TestCoordinateDispatchTxn_UsesProvidedCommitTS(t *testing.T) { require.NoError(t, err) require.Equal(t, commitTS, meta.CommitTS) } + +func TestCoordinateDispatchTxn_ReadKeysInRequest(t *testing.T) { + t.Parallel() + + tx := &stubTransactional{} + c := &Coordinate{ + transactionManager: tx, + clock: NewHLC(), + } + + readKeys := [][]byte{[]byte("rk1"), []byte("rk2")} + _, err := c.dispatchTxn([]*Elem[OP]{ + {Op: Put, Key: []byte("k"), Value: []byte("v")}, + }, 10, 0, readKeys) + require.NoError(t, err) + require.Len(t, tx.reqs, 1) + require.Len(t, tx.reqs[0], 1) + require.Equal(t, readKeys, tx.reqs[0][0].ReadKeys) +} diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index 0599efd0..ae77f923 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -90,7 +90,7 @@ func (c *ShardedCoordinator) Dispatch(ctx context.Context, reqs *OperationGroup[ } if reqs.IsTxn { - return c.dispatchTxn(reqs.StartTS, reqs.CommitTS, reqs.Elems, reqs.ReadKeys) + return c.dispatchTxn(ctx, reqs.StartTS, reqs.CommitTS, reqs.Elems, reqs.ReadKeys) } logs, err := c.requestLogs(reqs) @@ -193,7 +193,7 @@ func (c *ShardedCoordinator) 
broadcastToAllGroups(requests []*pb.Request) (*Coor return &CoordinateResponse{CommitIndex: maxIndex.Load()}, nil } -func (c *ShardedCoordinator) dispatchTxn(startTS uint64, commitTS uint64, elems []*Elem[OP], readKeys [][]byte) (*CoordinateResponse, error) { +func (c *ShardedCoordinator) dispatchTxn(ctx context.Context, startTS uint64, commitTS uint64, elems []*Elem[OP], readKeys [][]byte) (*CoordinateResponse, error) { grouped, gids, err := c.groupMutations(elems) if err != nil { return nil, err @@ -212,7 +212,7 @@ func (c *ShardedCoordinator) dispatchTxn(startTS uint64, commitTS uint64, elems return c.dispatchSingleShardTxn(startTS, commitTS, primaryKey, gids[0], elems, readKeys) } - prepared, err := c.prewriteTxn(startTS, commitTS, primaryKey, grouped, gids, readKeys) + prepared, err := c.prewriteTxn(ctx, startTS, commitTS, primaryKey, grouped, gids, readKeys) if err != nil { return nil, err } @@ -263,7 +263,7 @@ type preparedGroup struct { keys []*pb.Mutation } -func (c *ShardedCoordinator) prewriteTxn(startTS, commitTS uint64, primaryKey []byte, grouped map[uint64][]*pb.Mutation, gids []uint64, readKeys [][]byte) ([]preparedGroup, error) { +func (c *ShardedCoordinator) prewriteTxn(ctx context.Context, startTS, commitTS uint64, primaryKey []byte, grouped map[uint64][]*pb.Mutation, gids []uint64, readKeys [][]byte) ([]preparedGroup, error) { prepareMeta := txnMetaMutation(primaryKey, defaultTxnLockTTLms, 0) prepared := make([]preparedGroup, 0, len(gids)) @@ -291,7 +291,7 @@ func (c *ShardedCoordinator) prewriteTxn(startTS, commitTS uint64, primaryKey [] // Validate read keys on read-only shards (shards that have read keys // but no mutations in this transaction). Without this, a concurrent // write to a read-only shard would go undetected. 
- if err := c.validateReadOnlyShards(groupedReadKeys, gids, startTS); err != nil { + if err := c.validateReadOnlyShards(ctx, groupedReadKeys, gids, startTS); err != nil { c.abortPreparedTxn(startTS, primaryKey, prepared, abortTSFrom(startTS, commitTS)) return nil, err } @@ -617,9 +617,19 @@ func (c *ShardedCoordinator) groupReadKeysByShardID(readKeys [][]byte) map[uint6 // shards that already received a PREPARE with their readKeys attached. // // Because these shards have no mutations, we cannot send a PREPARE request -// (the FSM rejects empty mutation lists). Instead we validate directly -// against each shard's store by checking LatestCommitTS for each read key. -func (c *ShardedCoordinator) validateReadOnlyShards(groupedReadKeys map[uint64][][]byte, writeGIDs []uint64, startTS uint64) error { +// (the FSM rejects empty mutation lists). Instead we issue a linearizable +// read barrier on each read-only shard's Raft group (ensuring the local +// FSM has applied all committed log entries) and then check LatestCommitTS +// against the local store. +// +// NOTE: This check is performed outside the FSM's applyMu lock, so there +// is a small TOCTOU window between the linearizable read barrier and the +// LatestCommitTS check. A concurrent write that commits in this window may +// go undetected. Full SSI for read-only shards in multi-shard transactions +// would require a dedicated "read-validate" FSM request phase. For +// single-shard transactions and write-shard read keys, validation is fully +// atomic under applyMu. 
+func (c *ShardedCoordinator) validateReadOnlyShards(ctx context.Context, groupedReadKeys map[uint64][][]byte, writeGIDs []uint64, startTS uint64) error { if len(groupedReadKeys) == 0 { return nil } @@ -627,7 +637,6 @@ func (c *ShardedCoordinator) validateReadOnlyShards(groupedReadKeys map[uint64][ for _, gid := range writeGIDs { writeSet[gid] = struct{}{} } - ctx := context.Background() for gid, keys := range groupedReadKeys { if _, isWrite := writeSet[gid]; isWrite { continue @@ -636,6 +645,13 @@ func (c *ShardedCoordinator) validateReadOnlyShards(groupedReadKeys map[uint64][ if !ok { continue } + // Linearizable read barrier: wait until the shard's FSM has applied + // all Raft-committed entries so LatestCommitTS reflects the latest + // committed state. Without this, a concurrent write that is committed + // in Raft but not yet applied locally would be invisible. + if _, err := linearizableReadEngineCtx(ctx, engineForGroup(g)); err != nil { + return errors.WithStack(err) + } for _, key := range keys { ts, exists, err := g.Store.LatestCommitTS(ctx, key) if err != nil { diff --git a/store/lsm_store_txn_test.go b/store/lsm_store_txn_test.go index feb2eb1d..be985cc8 100644 --- a/store/lsm_store_txn_test.go +++ b/store/lsm_store_txn_test.go @@ -583,3 +583,42 @@ func TestPebbleStore_ApplyMutations_ReadConflict_PartialConflict(t *testing.T) { assert.True(t, ok) assert.Equal(t, []byte("k2"), conflictKey) } + +func TestPebbleStore_ApplyMutations_ReadKeyOverlapsWriteKey(t *testing.T) { + dir := t.TempDir() + s, err := NewPebbleStore(dir) + require.NoError(t, err) + defer s.Close() + + ctx := context.Background() + + // k1 committed at TS=20 (after startTS=10). + require.NoError(t, s.PutAt(ctx, []byte("k1"), []byte("v1"), 20, 0)) + + // Both writing and reading k1. The write-write check detects the conflict + // before the read-write check runs; either way the result is ErrWriteConflict. 
+ mutations := []*KVPairMutation{ + {Op: OpTypePut, Key: []byte("k1"), Value: []byte("v2")}, + } + err = s.ApplyMutations(ctx, mutations, [][]byte{[]byte("k1")}, 10, 30) + require.Error(t, err) + assert.True(t, errors.Is(err, ErrWriteConflict)) +} + +func TestPebbleStore_ApplyMutations_ReadKeyOverlapsWriteKey_NoConflict(t *testing.T) { + dir := t.TempDir() + s, err := NewPebbleStore(dir) + require.NoError(t, err) + defer s.Close() + + ctx := context.Background() + + // k1 committed at TS=5 (before startTS=10). No conflict expected. + require.NoError(t, s.PutAt(ctx, []byte("k1"), []byte("v1"), 5, 0)) + + mutations := []*KVPairMutation{ + {Op: OpTypePut, Key: []byte("k1"), Value: []byte("v2")}, + } + err = s.ApplyMutations(ctx, mutations, [][]byte{[]byte("k1")}, 10, 20) + require.NoError(t, err) +} From 17cbfbd77879e7f18a9caf95fa3e249ee089c3a3 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Mon, 13 Apr 2026 22:46:50 +0900 Subject: [PATCH 7/9] fix: validate read set pre-Raft for single-shard transactions Jepsen detected G2-item-realtime and PL-1 cycle violations caused by read-set validation inside the FSM (post-Raft-commit). When the FSM rejects a transaction due to a read conflict, the retry gets a new timestamp, shifting its position in the log and breaking realtime ordering of appends. Move read-set validation back to the adapter layer (pre-Raft) for single-shard transactions so that retries happen before the write enters the Raft log, preserving realtime ordering. Multi-shard PREPARE keeps FSM-level readKeys validation since PREPARE failure triggers a clean abort-and-retry cycle that does not suffer from the same ordering issue. 
--- adapter/redis.go | 6 +++--- kv/coordinator.go | 9 +++++++-- kv/coordinator_txn_test.go | 7 +++++-- kv/sharded_coordinator.go | 3 ++- 4 files changed, 17 insertions(+), 8 deletions(-) diff --git a/adapter/redis.go b/adapter/redis.go index b62f1a2f..9b4c0c7a 100644 --- a/adapter/redis.go +++ b/adapter/redis.go @@ -2102,9 +2102,9 @@ func (r *RedisServer) runTransaction(queue []redcon.Command) ([]redisResult, err nextResults = append(nextResults, res) } - // Read-set validation is performed inside ApplyMutations under the - // store's apply lock, so a separate validateReadSet call here would - // be redundant and double the LatestCommitTS I/O. + if err := txn.validateReadSet(dispatchCtx); err != nil { + return err + } if err := txn.commit(); err != nil { return err } diff --git a/kv/coordinator.go b/kv/coordinator.go index 930b6808..55dd7b9c 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -143,8 +143,13 @@ func (c *Coordinate) dispatchTxn(reqs []*Elem[OP], startTS uint64, commitTS uint return nil, errors.WithStack(ErrTxnCommitTSRequired) } + // Read-set validation for single-shard transactions is performed by the + // adapter BEFORE Raft submission (validateReadSet). Passing readKeys + // into the Raft log would cause the FSM to reject transactions after + // they are already committed in the log, forcing retries at a later + // timestamp and breaking realtime ordering of appends. 
r, err := c.transactionManager.Commit([]*pb.Request{ - onePhaseTxnRequest(startTS, commitTS, primary, reqs, readKeys), + onePhaseTxnRequest(startTS, commitTS, primary, reqs, nil), }) if err != nil { return nil, errors.WithStack(err) @@ -258,7 +263,7 @@ func (c *Coordinate) redirect(ctx context.Context, reqs *OperationGroup[OP]) (*C commitTS = 0 } requests = []*pb.Request{ - onePhaseTxnRequest(reqs.StartTS, commitTS, primary, reqs.Elems, reqs.ReadKeys), + onePhaseTxnRequest(reqs.StartTS, commitTS, primary, reqs.Elems, nil), } } else { for _, req := range reqs.Elems { diff --git a/kv/coordinator_txn_test.go b/kv/coordinator_txn_test.go index 98f70c25..8f1e7ee5 100644 --- a/kv/coordinator_txn_test.go +++ b/kv/coordinator_txn_test.go @@ -117,7 +117,7 @@ func TestCoordinateDispatchTxn_UsesProvidedCommitTS(t *testing.T) { require.Equal(t, commitTS, meta.CommitTS) } -func TestCoordinateDispatchTxn_ReadKeysInRequest(t *testing.T) { +func TestCoordinateDispatchTxn_ReadKeysNotInOnePhaseTxnRequest(t *testing.T) { t.Parallel() tx := &stubTransactional{} @@ -126,6 +126,9 @@ func TestCoordinateDispatchTxn_ReadKeysInRequest(t *testing.T) { clock: NewHLC(), } + // Single-shard transactions validate read keys pre-Raft (in the adapter), + // so readKeys must NOT be included in the Raft log entry to avoid + // post-commit rejections that break realtime ordering. 
readKeys := [][]byte{[]byte("rk1"), []byte("rk2")} _, err := c.dispatchTxn([]*Elem[OP]{ {Op: Put, Key: []byte("k"), Value: []byte("v")}, @@ -133,5 +136,5 @@ func TestCoordinateDispatchTxn_ReadKeysInRequest(t *testing.T) { require.NoError(t, err) require.Len(t, tx.reqs, 1) require.Len(t, tx.reqs[0], 1) - require.Equal(t, readKeys, tx.reqs[0][0].ReadKeys) + require.Nil(t, tx.reqs[0][0].ReadKeys) } diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index ae77f923..54de2319 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -246,8 +246,9 @@ func (c *ShardedCoordinator) dispatchSingleShardTxn(startTS, commitTS uint64, pr if err != nil { return nil, err } + // Single-shard: read-set validated pre-Raft by the adapter. resp, err := g.Txn.Commit([]*pb.Request{ - onePhaseTxnRequest(startTS, commitTS, primaryKey, elems, readKeys), + onePhaseTxnRequest(startTS, commitTS, primaryKey, elems, nil), }) if err != nil { return nil, errors.WithStack(err) From 71975866e65bb3842233f03ccbc48b92b0b170e5 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Mon, 13 Apr 2026 23:53:25 +0900 Subject: [PATCH 8/9] test: add comprehensive SSI tests and document isolation guarantees Add tests for previously untested SSI functions: - groupReadKeysByShardID: nil/empty, multi-shard grouping, unroutable keys - validateReadOnlyShards: conflict detection, write-shard skip, no-conflict, error propagation, empty input - Cross-shard PREPARE readKey routing verification - Single-shard readKeys omitted from Raft entry - readKey overlapping writeKey (conflict and no-conflict) - onePhaseTxnRequest ReadKeys propagation Document isolation guarantees in ApplyMutations interface comment: - Single-shard: pre-Raft validation with narrow TOCTOU window - Multi-shard write shards: full SSI under applyMu - Multi-shard read-only shards: linearizable barrier + TOCTOU window --- kv/sharded_coordinator_txn_test.go | 257 +++++++++++++++++++++++++++++ store/store.go | 19 +++ 2 
files changed, 276 insertions(+) diff --git a/kv/sharded_coordinator_txn_test.go b/kv/sharded_coordinator_txn_test.go index c6566bc6..6310a9b5 100644 --- a/kv/sharded_coordinator_txn_test.go +++ b/kv/sharded_coordinator_txn_test.go @@ -7,7 +7,9 @@ import ( "testing" "github.com/bootjp/elastickv/distribution" + "github.com/bootjp/elastickv/internal/raftengine" pb "github.com/bootjp/elastickv/proto" + "github.com/bootjp/elastickv/store" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" ) @@ -297,3 +299,258 @@ func TestCommitSecondaryWithRetry_ExhaustsRetries(t *testing.T) { require.Error(t, err) require.Len(t, txn.requests, txnSecondaryCommitRetryAttempts) } + +// --------------------------------------------------------------------------- +// groupReadKeysByShardID +// --------------------------------------------------------------------------- + +func TestGroupReadKeysByShardID_NilReturnsNil(t *testing.T) { + t.Parallel() + engine := distribution.NewEngine() + engine.UpdateRoute([]byte(""), nil, 1) + coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{1: {}}, 1, NewHLC(), nil) + require.Nil(t, coord.groupReadKeysByShardID(nil)) +} + +func TestGroupReadKeysByShardID_EmptyReturnsNil(t *testing.T) { + t.Parallel() + engine := distribution.NewEngine() + engine.UpdateRoute([]byte(""), nil, 1) + coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{1: {}}, 1, NewHLC(), nil) + require.Nil(t, coord.groupReadKeysByShardID([][]byte{})) +} + +func TestGroupReadKeysByShardID_GroupsByShardID(t *testing.T) { + t.Parallel() + engine := distribution.NewEngine() + engine.UpdateRoute([]byte("a"), []byte("m"), 1) + engine.UpdateRoute([]byte("m"), nil, 2) + coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{1: {}, 2: {}}, 1, NewHLC(), nil) + + grouped := coord.groupReadKeysByShardID([][]byte{ + []byte("b"), // shard 1 + []byte("c"), // shard 1 + []byte("x"), // shard 2 + }) + require.Len(t, grouped, 2) + require.Len(t, grouped[1], 2) + 
require.Equal(t, []byte("b"), grouped[1][0]) + require.Equal(t, []byte("c"), grouped[1][1]) + require.Len(t, grouped[2], 1) + require.Equal(t, []byte("x"), grouped[2][0]) +} + +func TestGroupReadKeysByShardID_SkipsUnroutableKeys(t *testing.T) { + t.Parallel() + // Only route "a"-"m" to shard 1. Keys outside this range are unroutable. + engine := distribution.NewEngine() + engine.UpdateRoute([]byte("a"), []byte("m"), 1) + coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{1: {}}, 1, NewHLC(), nil) + + grouped := coord.groupReadKeysByShardID([][]byte{ + []byte("b"), // routable → shard 1 + []byte("zzz"), // unroutable → skipped + }) + require.Len(t, grouped, 1) + require.Len(t, grouped[1], 1) + require.Equal(t, []byte("b"), grouped[1][0]) +} + +// --------------------------------------------------------------------------- +// validateReadOnlyShards +// --------------------------------------------------------------------------- + +// stubMVCCStore embeds store.MVCCStore and overrides LatestCommitTS to return preset timestamps or a preset error. +type stubMVCCStore struct { + store.MVCCStore + latestTS map[string]uint64 + returnErr error +} + +func (s *stubMVCCStore) LatestCommitTS(_ context.Context, key []byte) (uint64, bool, error) { + if s.returnErr != nil { + return 0, false, s.returnErr + } + ts, ok := s.latestTS[string(key)] + return ts, ok, nil +} + +// noopEngine satisfies raftengine.Engine for unit tests. +// LinearizableRead returns immediately (simulates an already-up-to-date FSM). 
+type noopEngine struct{} + +func (noopEngine) Propose(_ context.Context, _ []byte) (*raftengine.ProposalResult, error) { + return &raftengine.ProposalResult{}, nil +} +func (noopEngine) State() raftengine.State { return raftengine.StateLeader } +func (noopEngine) Leader() raftengine.LeaderInfo { return raftengine.LeaderInfo{} } +func (noopEngine) VerifyLeader(_ context.Context) error { return nil } +func (noopEngine) LinearizableRead(_ context.Context) (uint64, error) { return 0, nil } +func (noopEngine) Status() raftengine.Status { return raftengine.Status{} } +func (noopEngine) Configuration(_ context.Context) (raftengine.Configuration, error) { + return raftengine.Configuration{}, nil +} +func (noopEngine) Close() error { return nil } + +func TestValidateReadOnlyShards_DetectsConflictOnReadOnlyShard(t *testing.T) { + t.Parallel() + engine := distribution.NewEngine() + engine.UpdateRoute([]byte("a"), []byte("m"), 1) + engine.UpdateRoute([]byte("m"), nil, 2) + + readOnlyStore := &stubMVCCStore{latestTS: map[string]uint64{ + "x": 20, // committed at TS=20 + }} + coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{ + 1: {}, + 2: {Store: readOnlyStore, Engine: noopEngine{}}, + }, 1, NewHLC(), nil) + + groupedReadKeys := map[uint64][][]byte{ + 2: {[]byte("x")}, + } + // shard 2 is read-only (not in writeGIDs), key "x" committed at 20 > startTS 10 + err := coord.validateReadOnlyShards(context.Background(), groupedReadKeys, []uint64{1}, 10) + require.Error(t, err) + require.ErrorIs(t, err, store.ErrWriteConflict) +} + +func TestValidateReadOnlyShards_SkipsWriteShards(t *testing.T) { + t.Parallel() + engine := distribution.NewEngine() + engine.UpdateRoute([]byte("a"), []byte("m"), 1) + engine.UpdateRoute([]byte("m"), nil, 2) + + // shard 1 has a conflicting key, but it's a write shard — should be skipped + writeStore := &stubMVCCStore{latestTS: map[string]uint64{ + "b": 20, + }} + coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{ + 1: {Store: 
writeStore, Engine: noopEngine{}}, + 2: {}, + }, 1, NewHLC(), nil) + + groupedReadKeys := map[uint64][][]byte{ + 1: {[]byte("b")}, // write shard → skipped + } + err := coord.validateReadOnlyShards(context.Background(), groupedReadKeys, []uint64{1}, 10) + require.NoError(t, err) +} + +func TestValidateReadOnlyShards_NoConflictWhenKeyUnchanged(t *testing.T) { + t.Parallel() + engine := distribution.NewEngine() + engine.UpdateRoute([]byte("a"), []byte("m"), 1) + engine.UpdateRoute([]byte("m"), nil, 2) + + readOnlyStore := &stubMVCCStore{latestTS: map[string]uint64{ + "x": 5, // committed at TS=5 <= startTS=10 + }} + coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{ + 1: {}, + 2: {Store: readOnlyStore, Engine: noopEngine{}}, + }, 1, NewHLC(), nil) + + groupedReadKeys := map[uint64][][]byte{ + 2: {[]byte("x")}, + } + err := coord.validateReadOnlyShards(context.Background(), groupedReadKeys, []uint64{1}, 10) + require.NoError(t, err) +} + +func TestValidateReadOnlyShards_PropagatesStoreError(t *testing.T) { + t.Parallel() + engine := distribution.NewEngine() + engine.UpdateRoute([]byte("a"), []byte("m"), 1) + engine.UpdateRoute([]byte("m"), nil, 2) + + storeErr := errors.New("disk I/O error") + readOnlyStore := &stubMVCCStore{returnErr: storeErr} + coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{ + 1: {}, + 2: {Store: readOnlyStore, Engine: noopEngine{}}, + }, 1, NewHLC(), nil) + + groupedReadKeys := map[uint64][][]byte{ + 2: {[]byte("x")}, + } + err := coord.validateReadOnlyShards(context.Background(), groupedReadKeys, []uint64{1}, 10) + require.Error(t, err) + require.ErrorIs(t, err, storeErr) +} + +func TestValidateReadOnlyShards_EmptyGroupedReadKeys(t *testing.T) { + t.Parallel() + engine := distribution.NewEngine() + engine.UpdateRoute([]byte(""), nil, 1) + coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{1: {}}, 1, NewHLC(), nil) + err := coord.validateReadOnlyShards(context.Background(), nil, []uint64{1}, 10) + 
require.NoError(t, err) +} + +// --------------------------------------------------------------------------- +// Cross-shard: readKeys routed to PREPARE per shard +// --------------------------------------------------------------------------- + +func TestShardedCoordinatorDispatchTxn_ReadKeysRoutedToPrepareByShard(t *testing.T) { + t.Parallel() + + engine := distribution.NewEngine() + engine.UpdateRoute([]byte("a"), []byte("m"), 1) // shard 1: a-m + engine.UpdateRoute([]byte("m"), nil, 2) // shard 2: m+ + + g1Txn := &recordingTransactional{} + g2Txn := &recordingTransactional{} + coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{ + 1: {Txn: g1Txn}, + 2: {Txn: g2Txn}, + }, 1, NewHLC(), nil) + + _, err := coord.Dispatch(context.Background(), &OperationGroup[OP]{ + IsTxn: true, + StartTS: 10, + Elems: []*Elem[OP]{ + {Op: Put, Key: []byte("b"), Value: []byte("v1")}, // shard 1 + {Op: Put, Key: []byte("x"), Value: []byte("v2")}, // shard 2 + }, + ReadKeys: [][]byte{ + []byte("c"), // shard 1 read key + []byte("y"), // shard 2 read key + }, + }) + require.NoError(t, err) + + // PREPARE for shard 1 should have readKey "c" + g1Prepare := g1Txn.requests[0] + require.Equal(t, pb.Phase_PREPARE, g1Prepare.Phase) + require.Equal(t, [][]byte{[]byte("c")}, g1Prepare.ReadKeys) + + // PREPARE for shard 2 should have readKey "y" + g2Prepare := g2Txn.requests[0] + require.Equal(t, pb.Phase_PREPARE, g2Prepare.Phase) + require.Equal(t, [][]byte{[]byte("y")}, g2Prepare.ReadKeys) +} + +func TestShardedCoordinatorDispatchTxn_SingleShardOmitsReadKeysFromRaftEntry(t *testing.T) { + t.Parallel() + + engine := distribution.NewEngine() + engine.UpdateRoute([]byte(""), nil, 1) + + g1Txn := &recordingTransactional{} + coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{ + 1: {Txn: g1Txn}, + }, 1, NewHLC(), nil) + + _, err := coord.Dispatch(context.Background(), &OperationGroup[OP]{ + IsTxn: true, + StartTS: 10, + Elems: []*Elem[OP]{{Op: Put, Key: []byte("k"), Value: 
[]byte("v")}}, + ReadKeys: [][]byte{[]byte("rk1"), []byte("rk2")}, + }) + require.NoError(t, err) + require.Len(t, g1Txn.requests, 1) + // Single-shard: readKeys are validated pre-Raft by the adapter, + // so they must NOT be in the Raft log entry. + require.Nil(t, g1Txn.requests[0].ReadKeys) +} diff --git a/store/store.go b/store/store.go index b7652326..b9c6d87b 100644 --- a/store/store.go +++ b/store/store.go @@ -133,6 +133,25 @@ type MVCCStore interface { // a newer commit timestamp than startTS. readKeys carries the transaction's // read set for read-write conflict detection; pass nil when no read set // validation is needed. + // + // Isolation guarantees vary by transaction topology: + // + // Single-shard transactions: read-set validation is performed by the + // adapter layer BEFORE Raft submission (pre-Raft). readKeys is nil in + // the Raft log entry so the FSM does not re-validate. This avoids + // post-commit rejections that would break realtime ordering, but + // introduces a TOCTOU window between the adapter check and the FSM + // apply under applyMu. The window is narrow (single Raft round-trip) + // and matches the isolation level of the previous validateReadSet + // design. + // + // Multi-shard (2PC) write shards: readKeys are included in the + // PREPARE Raft entry and validated atomically under the FSM's applyMu + // lock. No TOCTOU window; full SSI. + // + // Multi-shard (2PC) read-only shards: validated via a linearizable + // read barrier followed by LatestCommitTS outside the FSM lock. A + // small TOCTOU window exists between the barrier and the check. ApplyMutations(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error // DeletePrefixAt atomically deletes all visible (non-tombstone, non-expired) // keys matching prefix at commitTS by writing tombstone versions. 
An empty From 05771181d3f8249a22a53840eba87c168f6b0dc0 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Tue, 14 Apr 2026 00:05:28 +0900 Subject: [PATCH 9/9] fix: address golangci-lint warnings - Remove unused readKeys parameter from Coordinate.dispatchTxn and ShardedCoordinator.dispatchSingleShardTxn (unparam) - Extract validateReadKeysOnShard to reduce validateReadOnlyShards cyclomatic complexity below cyclop threshold - Fix gci import ordering in sharded_coordinator_txn_test.go --- kv/coordinator.go | 4 +-- kv/coordinator_txn_test.go | 31 +++++---------------- kv/sharded_coordinator.go | 43 +++++++++++++++++------------- kv/sharded_coordinator_txn_test.go | 5 ++-- 4 files changed, 36 insertions(+), 47 deletions(-) diff --git a/kv/coordinator.go b/kv/coordinator.go index 55dd7b9c..898aa8c1 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -75,7 +75,7 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C } if reqs.IsTxn { - return c.dispatchTxn(reqs.Elems, reqs.StartTS, reqs.CommitTS, reqs.ReadKeys) + return c.dispatchTxn(reqs.Elems, reqs.StartTS, reqs.CommitTS) } return c.dispatchRaw(reqs.Elems) @@ -122,7 +122,7 @@ func (c *Coordinate) nextStartTS() uint64 { return c.clock.Next() } -func (c *Coordinate) dispatchTxn(reqs []*Elem[OP], startTS uint64, commitTS uint64, readKeys [][]byte) (*CoordinateResponse, error) { +func (c *Coordinate) dispatchTxn(reqs []*Elem[OP], startTS uint64, commitTS uint64) (*CoordinateResponse, error) { primary := primaryKeyForElems(reqs) if len(primary) == 0 { return nil, errors.WithStack(ErrTxnPrimaryKeyRequired) diff --git a/kv/coordinator_txn_test.go b/kv/coordinator_txn_test.go index 8f1e7ee5..c65f53b3 100644 --- a/kv/coordinator_txn_test.go +++ b/kv/coordinator_txn_test.go @@ -36,7 +36,7 @@ func TestCoordinateDispatchTxn_RejectsNonMonotonicCommitTS(t *testing.T) { _, err := c.dispatchTxn([]*Elem[OP]{ {Op: Put, Key: []byte("k"), Value: []byte("v")}, - }, startTS, 0, nil) + }, startTS, 
0) require.ErrorIs(t, err, ErrTxnCommitTSRequired) require.Equal(t, 0, tx.commits) } @@ -52,7 +52,7 @@ func TestCoordinateDispatchTxn_RejectsMissingPrimaryKey(t *testing.T) { _, err := c.dispatchTxn([]*Elem[OP]{ {Op: Put, Key: nil, Value: []byte("v")}, - }, 1, 0, nil) + }, 1, 0) require.ErrorIs(t, err, ErrTxnPrimaryKeyRequired) require.Equal(t, 0, tx.commits) } @@ -70,7 +70,7 @@ func TestCoordinateDispatchTxn_UsesOnePhaseRequest(t *testing.T) { _, err := c.dispatchTxn([]*Elem[OP]{ {Op: Put, Key: []byte("b"), Value: []byte("v1")}, {Op: Del, Key: []byte("x")}, - }, startTS, 0, nil) + }, startTS, 0) require.NoError(t, err) require.Equal(t, 1, tx.commits) require.Len(t, tx.reqs, 1) @@ -107,7 +107,7 @@ func TestCoordinateDispatchTxn_UsesProvidedCommitTS(t *testing.T) { commitTS := uint64(25) _, err := c.dispatchTxn([]*Elem[OP]{ {Op: Put, Key: []byte("k"), Value: []byte("v")}, - }, startTS, commitTS, nil) + }, startTS, commitTS) require.NoError(t, err) require.Len(t, tx.reqs, 1) require.Len(t, tx.reqs[0], 1) @@ -117,24 +117,5 @@ func TestCoordinateDispatchTxn_UsesProvidedCommitTS(t *testing.T) { require.Equal(t, commitTS, meta.CommitTS) } -func TestCoordinateDispatchTxn_ReadKeysNotInOnePhaseTxnRequest(t *testing.T) { - t.Parallel() - - tx := &stubTransactional{} - c := &Coordinate{ - transactionManager: tx, - clock: NewHLC(), - } - - // Single-shard transactions validate read keys pre-Raft (in the adapter), - // so readKeys must NOT be included in the Raft log entry to avoid - // post-commit rejections that break realtime ordering. - readKeys := [][]byte{[]byte("rk1"), []byte("rk2")} - _, err := c.dispatchTxn([]*Elem[OP]{ - {Op: Put, Key: []byte("k"), Value: []byte("v")}, - }, 10, 0, readKeys) - require.NoError(t, err) - require.Len(t, tx.reqs, 1) - require.Len(t, tx.reqs[0], 1) - require.Nil(t, tx.reqs[0][0].ReadKeys) -} +// ReadKeys omission from single-shard Raft entries is tested in +// TestShardedCoordinatorDispatchTxn_SingleShardOmitsReadKeysFromRaftEntry. 
diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index 54de2319..5ec5b1c5 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -209,7 +209,7 @@ func (c *ShardedCoordinator) dispatchTxn(ctx context.Context, startTS uint64, co } if len(gids) == 1 { - return c.dispatchSingleShardTxn(startTS, commitTS, primaryKey, gids[0], elems, readKeys) + return c.dispatchSingleShardTxn(startTS, commitTS, primaryKey, gids[0], elems) } prepared, err := c.prewriteTxn(ctx, startTS, commitTS, primaryKey, grouped, gids, readKeys) @@ -241,7 +241,7 @@ func (c *ShardedCoordinator) resolveTxnCommitTS(startTS, commitTS uint64) (uint6 return commitTS, nil } -func (c *ShardedCoordinator) dispatchSingleShardTxn(startTS, commitTS uint64, primaryKey []byte, gid uint64, elems []*Elem[OP], readKeys [][]byte) (*CoordinateResponse, error) { +func (c *ShardedCoordinator) dispatchSingleShardTxn(startTS, commitTS uint64, primaryKey []byte, gid uint64, elems []*Elem[OP]) (*CoordinateResponse, error) { g, err := c.txnGroupForID(gid) if err != nil { return nil, err @@ -642,25 +642,32 @@ func (c *ShardedCoordinator) validateReadOnlyShards(ctx context.Context, grouped if _, isWrite := writeSet[gid]; isWrite { continue } - g, ok := c.groups[gid] - if !ok { - continue + if err := c.validateReadKeysOnShard(ctx, gid, keys, startTS); err != nil { + return err } - // Linearizable read barrier: wait until the shard's FSM has applied - // all Raft-committed entries so LatestCommitTS reflects the latest - // committed state. Without this, a concurrent write that is committed - // in Raft but not yet applied locally would be invisible. 
- if _, err := linearizableReadEngineCtx(ctx, engineForGroup(g)); err != nil { + } + return nil +} + +func (c *ShardedCoordinator) validateReadKeysOnShard(ctx context.Context, gid uint64, keys [][]byte, startTS uint64) error { + g, ok := c.groups[gid] + if !ok { + return nil + } + // Linearizable read barrier: wait until the shard's FSM has applied + // all Raft-committed entries so LatestCommitTS reflects the latest + // committed state. Without this, a concurrent write that is committed + // in Raft but not yet applied locally would be invisible. + if _, err := linearizableReadEngineCtx(ctx, engineForGroup(g)); err != nil { + return errors.WithStack(err) + } + for _, key := range keys { + ts, exists, err := g.Store.LatestCommitTS(ctx, key) + if err != nil { return errors.WithStack(err) } - for _, key := range keys { - ts, exists, err := g.Store.LatestCommitTS(ctx, key) - if err != nil { - return errors.WithStack(err) - } - if exists && ts > startTS { - return errors.WithStack(store.NewWriteConflictError(key)) - } + if exists && ts > startTS { + return errors.WithStack(store.NewWriteConflictError(key)) } } return nil diff --git a/kv/sharded_coordinator_txn_test.go b/kv/sharded_coordinator_txn_test.go index 6310a9b5..a4ecd4e8 100644 --- a/kv/sharded_coordinator_txn_test.go +++ b/kv/sharded_coordinator_txn_test.go @@ -6,12 +6,13 @@ import ( "sync" "testing" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "github.com/bootjp/elastickv/distribution" "github.com/bootjp/elastickv/internal/raftengine" pb "github.com/bootjp/elastickv/proto" "github.com/bootjp/elastickv/store" - "github.com/stretchr/testify/require" - "google.golang.org/protobuf/proto" ) type recordingTransactional struct {