From 6a6acb00eb93abc85cbaaa1a0746ad33d85a656b Mon Sep 17 00:00:00 2001 From: Vadim Skipin Date: Wed, 6 May 2026 13:47:09 +0000 Subject: [PATCH] Fix ShardedStackBase::refill to properly use CAS --- src/util/sharded-stack.cpp | 50 ++++++++++++++++++++++++++------------ 1 file changed, 34 insertions(+), 16 deletions(-) diff --git a/src/util/sharded-stack.cpp b/src/util/sharded-stack.cpp index 820cf99..e62a196 100644 --- a/src/util/sharded-stack.cpp +++ b/src/util/sharded-stack.cpp @@ -56,9 +56,7 @@ void ShardedStackBase::push(StackEntry * entry) noexcept // Release ordering publishes all writes to entry's content (e.g. deinitialisation) // before the entry becomes visible to a concurrent pop. // The matching acquire is in pop after the rseq commit. - TSAN_IGNORE_BEGIN(); StackEntry * oldHead = state->head.load(std::memory_order_relaxed); - TSAN_IGNORE_END(); entry->next.store(oldHead, std::memory_order_release); // Fast path: rseq critical section, zero atomic instructions. @@ -101,13 +99,9 @@ void ShardedStackBase::push(StackEntry * entry) noexcept void ShardedStackBase::flush(ProcessorState * state) noexcept { - // acq_rel: acquire observes any in-progress rseq commit; release - // publishes the null so subsequent rseq reads see an empty list. - // TSAN_IGNORE: head may have been written by rseq asm (hidden from TSan); - // wrapping the exchange keeps TSan's happens-before chain consistent. - TSAN_IGNORE_BEGIN(); + // acq_rel: acquire observes any in-progress rseq commit; + // release publishes the null so subsequent rseq reads see an empty list. StackEntry * head = state->head.exchange(nullptr, std::memory_order_acq_rel); - TSAN_IGNORE_END(); state->count.store(0, std::memory_order_relaxed); if (head) @@ -194,14 +188,38 @@ bool ShardedStackBase::refill(ProcessorState * state) noexcept return false; } - // Install the batch as the per-CPU list. If we migrated between entering - // the slow path and this store, the entries remain on cpu's list and are - // recovered by the next flush(). - // TSAN_IGNORE: keeps TSan's tracking consistent with the rseq asm writes. - TSAN_IGNORE_BEGIN(); - state->head.store(newHead, std::memory_order_relaxed); - TSAN_IGNORE_END(); - state->count.store(batchSize, std::memory_order_relaxed); + // Walk the chain to find the tail and actual count. Both are needed below: + // the tail for pushBatch on CAS failure, the count to correctly initialize + // ProcessorState::count (which may be less than batchSize if the FreeList + // was partially filled by an explicit flush rather than a full overflow). + StackEntry * tail = newHead; + uint32_t actualCount = 1; + while (StackEntry * next = tail->next.load(std::memory_order_relaxed)) + { + tail = next; + actualCount++; + } + + // Install the batch as the per-CPU list. Use CAS rather than a plain store: + // if we migrated between entering the slow path and reaching this point, + // another thread may have pushed to this CPU's head concurrently, and a + // plain store would clobber that entry. + StackEntry * expected = state->head.load(std::memory_order_relaxed); + for (;;) + { + if (expected) + { + pushBatch(newHead, tail); + break; + } + + if (state->head.compare_exchange_weak(expected, newHead, std::memory_order_relaxed, std::memory_order_relaxed)) + { + state->count.store(actualCount, std::memory_order_relaxed); + break; + } + } + return true; }