Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion adapter/distribution_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ func (s *distributionCoordinatorStub) applyDispatch(
startTS uint64,
commitTS uint64,
) error {
if err := s.store.ApplyMutations(ctx, mutations, startTS, commitTS); err != nil {
if err := s.store.ApplyMutations(ctx, mutations, nil, startTS, commitTS); err != nil {
return err
}
if s.afterDispatch != nil {
Expand Down
12 changes: 8 additions & 4 deletions adapter/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -1923,7 +1923,11 @@ func (t *txnContext) commit() error {
return nil
}

group := &kv.OperationGroup[kv.OP]{IsTxn: true, Elems: elems, StartTS: t.startTS}
readKeys := make([][]byte, 0, len(t.readKeys))
for _, k := range t.readKeys {
readKeys = append(readKeys, k)
}
group := &kv.OperationGroup[kv.OP]{IsTxn: true, Elems: elems, StartTS: t.startTS, ReadKeys: readKeys}
ctx, cancel := context.WithTimeout(context.Background(), redisDispatchTimeout)
defer cancel()
if _, err := t.server.coordinator.Dispatch(ctx, group); err != nil {
Expand Down Expand Up @@ -2098,9 +2102,9 @@ func (r *RedisServer) runTransaction(queue []redcon.Command) ([]redisResult, err
nextResults = append(nextResults, res)
}

if err := txn.validateReadSet(dispatchCtx); err != nil {
return err
}
// Read-set validation is performed inside ApplyMutations under the
// store's apply lock, so a separate validateReadSet call here would
// be redundant and double the LatestCommitTS I/O.
if err := txn.commit(); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion adapter/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1916,7 +1916,7 @@ func TestS3Server_BackwardCompatibility_NoBucketAclFieldIsPrivate(t *testing.T)
commitTS := coord.Clock().Next()
err = st.ApplyMutations(context.Background(), []*store.KVPairMutation{
{Op: store.OpTypePut, Key: s3keys.BucketMetaKey("legacy-bucket"), Value: legacyJSON},
}, commitTS-1, commitTS)
}, nil, commitTS-1, commitTS)
require.NoError(t, err)

// Create a server WITH credentials; the legacy bucket has no acl field.
Expand Down
2 changes: 1 addition & 1 deletion distribution/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ func (s *CatalogStore) applySaveMutations(ctx context.Context, plan savePlan, mu
if err != nil {
return err
}
if err := s.store.ApplyMutations(ctx, mutations, plan.readTS, commitTS); err != nil {
if err := s.store.ApplyMutations(ctx, mutations, nil, plan.readTS, commitTS); err != nil {
if errors.Is(err, store.ErrWriteConflict) {
return errors.WithStack(ErrCatalogVersionMismatch)
}
Expand Down
11 changes: 6 additions & 5 deletions kv/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C
}

if reqs.IsTxn {
return c.dispatchTxn(reqs.Elems, reqs.StartTS, reqs.CommitTS)
return c.dispatchTxn(reqs.Elems, reqs.StartTS, reqs.CommitTS, reqs.ReadKeys)
}

return c.dispatchRaw(reqs.Elems)
Expand Down Expand Up @@ -122,7 +122,7 @@ func (c *Coordinate) nextStartTS() uint64 {
return c.clock.Next()
}

func (c *Coordinate) dispatchTxn(reqs []*Elem[OP], startTS uint64, commitTS uint64) (*CoordinateResponse, error) {
func (c *Coordinate) dispatchTxn(reqs []*Elem[OP], startTS uint64, commitTS uint64, readKeys [][]byte) (*CoordinateResponse, error) {
primary := primaryKeyForElems(reqs)
if len(primary) == 0 {
return nil, errors.WithStack(ErrTxnPrimaryKeyRequired)
Expand All @@ -144,7 +144,7 @@ func (c *Coordinate) dispatchTxn(reqs []*Elem[OP], startTS uint64, commitTS uint
}

r, err := c.transactionManager.Commit([]*pb.Request{
onePhaseTxnRequest(startTS, commitTS, primary, reqs),
onePhaseTxnRequest(startTS, commitTS, primary, reqs, readKeys),
})
if err != nil {
return nil, errors.WithStack(err)
Expand Down Expand Up @@ -258,7 +258,7 @@ func (c *Coordinate) redirect(ctx context.Context, reqs *OperationGroup[OP]) (*C
commitTS = 0
}
requests = []*pb.Request{
onePhaseTxnRequest(reqs.StartTS, commitTS, primary, reqs.Elems),
onePhaseTxnRequest(reqs.StartTS, commitTS, primary, reqs.Elems, reqs.ReadKeys),
}
} else {
for _, req := range reqs.Elems {
Expand Down Expand Up @@ -318,7 +318,7 @@ func elemToMutation(req *Elem[OP]) *pb.Mutation {
panic("unreachable")
}

func onePhaseTxnRequest(startTS, commitTS uint64, primaryKey []byte, reqs []*Elem[OP]) *pb.Request {
func onePhaseTxnRequest(startTS, commitTS uint64, primaryKey []byte, reqs []*Elem[OP], readKeys [][]byte) *pb.Request {
muts := make([]*pb.Mutation, 0, len(reqs)+1)
muts = append(muts, txnMetaMutation(primaryKey, 0, commitTS))
for _, req := range reqs {
Expand All @@ -329,6 +329,7 @@ func onePhaseTxnRequest(startTS, commitTS uint64, primaryKey []byte, reqs []*Ele
Phase: pb.Phase_NONE,
Ts: startTS,
Mutations: muts,
ReadKeys: readKeys,
}
}

Expand Down
27 changes: 23 additions & 4 deletions kv/coordinator_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestCoordinateDispatchTxn_RejectsNonMonotonicCommitTS(t *testing.T) {

_, err := c.dispatchTxn([]*Elem[OP]{
{Op: Put, Key: []byte("k"), Value: []byte("v")},
}, startTS, 0)
}, startTS, 0, nil)
require.ErrorIs(t, err, ErrTxnCommitTSRequired)
require.Equal(t, 0, tx.commits)
}
Expand All @@ -52,7 +52,7 @@ func TestCoordinateDispatchTxn_RejectsMissingPrimaryKey(t *testing.T) {

_, err := c.dispatchTxn([]*Elem[OP]{
{Op: Put, Key: nil, Value: []byte("v")},
}, 1, 0)
}, 1, 0, nil)
require.ErrorIs(t, err, ErrTxnPrimaryKeyRequired)
require.Equal(t, 0, tx.commits)
}
Expand All @@ -70,7 +70,7 @@ func TestCoordinateDispatchTxn_UsesOnePhaseRequest(t *testing.T) {
_, err := c.dispatchTxn([]*Elem[OP]{
{Op: Put, Key: []byte("b"), Value: []byte("v1")},
{Op: Del, Key: []byte("x")},
}, startTS, 0)
}, startTS, 0, nil)
require.NoError(t, err)
require.Equal(t, 1, tx.commits)
require.Len(t, tx.reqs, 1)
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestCoordinateDispatchTxn_UsesProvidedCommitTS(t *testing.T) {
commitTS := uint64(25)
_, err := c.dispatchTxn([]*Elem[OP]{
{Op: Put, Key: []byte("k"), Value: []byte("v")},
}, startTS, commitTS)
}, startTS, commitTS, nil)
require.NoError(t, err)
require.Len(t, tx.reqs, 1)
require.Len(t, tx.reqs[0], 1)
Expand All @@ -116,3 +116,22 @@ func TestCoordinateDispatchTxn_UsesProvidedCommitTS(t *testing.T) {
require.NoError(t, err)
require.Equal(t, commitTS, meta.CommitTS)
}

func TestCoordinateDispatchTxn_ReadKeysInRequest(t *testing.T) {
t.Parallel()

tx := &stubTransactional{}
c := &Coordinate{
transactionManager: tx,
clock: NewHLC(),
}

readKeys := [][]byte{[]byte("rk1"), []byte("rk2")}
_, err := c.dispatchTxn([]*Elem[OP]{
{Op: Put, Key: []byte("k"), Value: []byte("v")},
}, 10, 0, readKeys)
require.NoError(t, err)
require.Len(t, tx.reqs, 1)
require.Len(t, tx.reqs[0], 1)
require.Equal(t, readKeys, tx.reqs[0][0].ReadKeys)
}
20 changes: 9 additions & 11 deletions kv/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (f *kvFSM) handleRawRequest(ctx context.Context, r *pb.Request, commitTS ui
}
// Raw requests always commit against the latest state; use commitTS as both
// the validation snapshot and the commit timestamp.
return errors.WithStack(f.store.ApplyMutations(ctx, muts, commitTS, commitTS))
return errors.WithStack(f.store.ApplyMutations(ctx, muts, nil, commitTS, commitTS))
}

// extractDelPrefix checks if the mutations contain a DEL_PREFIX operation.
Expand Down Expand Up @@ -313,18 +313,16 @@ func (f *kvFSM) handlePrepareRequest(ctx context.Context, r *pb.Request) error {
return err
}

if err := f.store.ApplyMutations(ctx, storeMuts, startTS, startTS); err != nil {
if err := f.store.ApplyMutations(ctx, storeMuts, r.ReadKeys, startTS, startTS); err != nil {
return errors.WithStack(err)
}
return nil
}

// handleOnePhaseTxnRequest applies a single-shard transaction atomically.
// The isolation level is Snapshot Isolation (SI): only write-write conflicts
// are detected via ApplyMutations. Read-write conflicts (write skew) are NOT
// prevented because the read-set is not tracked. Callers requiring
// Serializable Snapshot Isolation (SSI) must implement read-set validation
// at a higher layer.
// Both write-write and read-write conflicts are checked: the read set carried
// in r.ReadKeys is validated alongside the mutation keys inside
// ApplyMutations under the store's apply lock.
func (f *kvFSM) handleOnePhaseTxnRequest(ctx context.Context, r *pb.Request, commitTS uint64) error {
meta, muts, err := extractTxnMeta(r.Mutations)
if err != nil {
Expand All @@ -350,7 +348,7 @@ func (f *kvFSM) handleOnePhaseTxnRequest(ctx context.Context, r *pb.Request, com
if err != nil {
return err
}
return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, startTS, commitTS))
return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, r.ReadKeys, startTS, commitTS))
}

func (f *kvFSM) handleCommitRequest(ctx context.Context, r *pb.Request) error {
Expand Down Expand Up @@ -419,7 +417,7 @@ func (f *kvFSM) commitApplyStartTS(ctx context.Context, primaryKey []byte, start
// The secondary-shard LatestCommitTS scan is intentionally deferred to the
// write-conflict path so the hot (first-time) commit path pays no extra cost.
func (f *kvFSM) applyCommitWithIdempotencyFallback(ctx context.Context, storeMuts []*store.KVPairMutation, uniq []*pb.Mutation, applyStartTS, commitTS uint64) error {
err := f.store.ApplyMutations(ctx, storeMuts, applyStartTS, commitTS)
err := f.store.ApplyMutations(ctx, storeMuts, nil, applyStartTS, commitTS)
if err == nil {
return nil
}
Expand All @@ -436,7 +434,7 @@ func (f *kvFSM) applyCommitWithIdempotencyFallback(ctx context.Context, storeMut
return errors.WithStack(lErr)
}
if exists && latestTS >= commitTS {
return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, commitTS, commitTS))
return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, nil, commitTS, commitTS))
}
}
return errors.WithStack(err)
Expand Down Expand Up @@ -475,7 +473,7 @@ func (f *kvFSM) handleAbortRequest(ctx context.Context, r *pb.Request, abortTS u
if len(storeMuts) == 0 {
return nil
}
return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, startTS, abortTS))
return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, nil, startTS, abortTS))
}

func (f *kvFSM) buildPrepareStoreMutations(ctx context.Context, muts []*pb.Mutation, primaryKey []byte, startTS, expireAt uint64) ([]*store.KVPairMutation, error) {
Expand Down
4 changes: 2 additions & 2 deletions kv/leader_routed_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,11 @@ func (s *LeaderRoutedStore) LatestCommitTS(ctx context.Context, key []byte) (uin
return s.proxyRawLatestCommitTS(ctx, key)
}

func (s *LeaderRoutedStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, startTS, commitTS uint64) error {
func (s *LeaderRoutedStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error {
if s == nil || s.local == nil {
return errors.WithStack(store.ErrNotSupported)
}
return errors.WithStack(s.local.ApplyMutations(ctx, mutations, startTS, commitTS))
return errors.WithStack(s.local.ApplyMutations(ctx, mutations, readKeys, startTS, commitTS))
}

func (s *LeaderRoutedStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error {
Expand Down
4 changes: 2 additions & 2 deletions kv/shard_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1116,7 +1116,7 @@ func cleanupTSWithNow(startTS, now uint64) uint64 {
//
// All mutations must belong to the same shard. Cross-shard mutation batches are
// not supported.
func (s *ShardStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, startTS, commitTS uint64) error {
func (s *ShardStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error {
if len(mutations) == 0 {
return nil
}
Expand All @@ -1135,7 +1135,7 @@ func (s *ShardStore) ApplyMutations(ctx context.Context, mutations []*store.KVPa
return errors.WithStack(ErrCrossShardMutationBatchNotSupported)
}
}
return errors.WithStack(firstGroup.Store.ApplyMutations(ctx, mutations, startTS, commitTS))
return errors.WithStack(firstGroup.Store.ApplyMutations(ctx, mutations, readKeys, startTS, commitTS))
}

// DeletePrefixAt applies a prefix delete to every shard in the store.
Expand Down
Loading
Loading