Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 34 additions & 16 deletions src/util/sharded-stack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ void ShardedStackBase::push(StackEntry * entry) noexcept
// Release ordering publishes all writes to entry's content (e.g. deinitialisation)
// before the entry becomes visible to a concurrent pop.
// The matching acquire is in pop after the rseq commit.
TSAN_IGNORE_BEGIN();
StackEntry * oldHead = state->head.load(std::memory_order_relaxed);
TSAN_IGNORE_END();
entry->next.store(oldHead, std::memory_order_release);

// Fast path: rseq critical section, zero atomic instructions.
Expand Down Expand Up @@ -101,13 +99,9 @@ void ShardedStackBase::push(StackEntry * entry) noexcept

void ShardedStackBase::flush(ProcessorState * state) noexcept
{
// acq_rel: acquire observes any in-progress rseq commit; release
// publishes the null so subsequent rseq reads see an empty list.
// TSAN_IGNORE: head may have been written by rseq asm (hidden from TSan);
// wrapping the exchange keeps TSan's happens-before chain consistent.
TSAN_IGNORE_BEGIN();
// acq_rel: acquire observes any in-progress rseq commit;
// release publishes the null so subsequent rseq reads see an empty list.
StackEntry * head = state->head.exchange(nullptr, std::memory_order_acq_rel);
TSAN_IGNORE_END();
state->count.store(0, std::memory_order_relaxed);

if (head)
Expand Down Expand Up @@ -194,14 +188,38 @@ bool ShardedStackBase::refill(ProcessorState * state) noexcept
return false;
}

// Install the batch as the per-CPU list. If we migrated between entering
// the slow path and this store, the entries remain on cpu's list and are
// recovered by the next flush().
// TSAN_IGNORE: keeps TSan's tracking consistent with the rseq asm writes.
TSAN_IGNORE_BEGIN();
state->head.store(newHead, std::memory_order_relaxed);
TSAN_IGNORE_END();
state->count.store(batchSize, std::memory_order_relaxed);
// Walk the chain to find the tail and actual count. Both are needed below:
// the tail for pushBatch on CAS failure, the count to correctly initialize
// ProcessorState::count (which may be less than batchSize if the FreeList
// was partially filled by an explicit flush rather than a full overflow).
StackEntry * tail = newHead;
uint32_t actualCount = 1;
while (StackEntry * next = tail->next.load(std::memory_order_relaxed))
{
tail = next;
actualCount++;
}

// Install the batch as the per-CPU list. Use CAS rather than a plain store:
// if we migrated between entering the slow path and reaching this point,
// another thread may have pushed to this CPU's head concurrently, and a
// plain store would clobber that entry.
StackEntry * expected = state->head.load(std::memory_order_relaxed);
for (;;)
{
if (expected)
{
pushBatch(newHead, tail);
break;
}

if (state->head.compare_exchange_weak(expected, newHead, std::memory_order_relaxed, std::memory_order_relaxed))
{
state->count.store(actualCount, std::memory_order_relaxed);
break;
}
}

return true;
}

Expand Down
Loading