Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions ddprof-lib/src/main/cpp/callTraceHashTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,19 @@ CallTrace *CallTraceHashTable::findCallTrace(LongHashTable *table, u64 hash) {

u32 slot = probe.slot();
while (true) {
if (keys[slot] == hash) {
return table->values()[slot].trace;
// Use atomic load: keys[] can be written concurrently via CAS in put()
// when a table is promoted to prev but still has in-flight insertions.
u64 key = __atomic_load_n(&keys[slot], __ATOMIC_ACQUIRE);
if (key == hash) {
// Use acquireTrace() to pair with the RELEASE store in setTrace().
// If still PREPARING, treat as not found: callers will create a new entry.
CallTrace *trace = table->values()[slot].acquireTrace();
if (trace == CallTraceSample::PREPARING) {
return nullptr;
}
return trace;
}
if (keys[slot] == 0) {
if (key == 0) {
return nullptr;
}
if (!probe.hasNext()) {
Expand Down
113 changes: 113 additions & 0 deletions ddprof-lib/src/test/cpp/stress_callTraceStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2014,6 +2014,119 @@ TEST_F(StressTestSuite, HashTableSpinWaitEdgeCasesTest) {
EXPECT_LT(drop_rate, 0.5) << "Excessive trace drop rate: " << drop_rate;
}

// Regression test: TSan data race in findCallTrace() — keys[] plain reads
// raced with atomic CAS writes from concurrent put(). When a table is
// promoted to prev after expansion, threads that loaded the old table
// pointer before the CAS swap still write into it while new threads call
// findCallTrace(prev, hash) — the read must be atomic.
//
// Memory-ordering races on 8-byte aligned values are benign on x86_64 (the
// CPU guarantees naturally-atomic loads), so the race is only reliably
// detectable under ThreadSanitizer. The test is therefore skipped outside
// TSan builds to avoid giving false confidence that the regression is covered.
//
// The dedup assertion (dedup_mismatches == 0) validates findCallTrace
// correctness across expansion boundaries independently of memory ordering;
// it catches logic regressions in all build configurations.
TEST_F(StressTestSuite, FindCallTraceAtomicReadRaceTest) {
#if !defined(__SANITIZE_THREAD__) && !(defined(__has_feature) && __has_feature(thread_sanitizer))
GTEST_SKIP() << "TSan-only race regression: re-run with -fsanitize=thread";
#endif

// Fill the table to just below the 75% expansion threshold
// (INITIAL_CAPACITY = 65536; 75% = 49152).
const int FILL_TARGET = 48800;
const int NUM_THREADS = 16;
const int OPS_PER_THREAD = 400;
// Number of pre-filled entries to re-insert concurrently as a dedup check.
// After expansion these will be looked up via findCallTrace(prev, hash).
const int VERIFY_COUNT = 200;

void* aligned_memory = std::aligned_alloc(alignof(CallTraceHashTable), sizeof(CallTraceHashTable));
ASSERT_NE(aligned_memory, nullptr);
auto hash_table_ptr = std::unique_ptr<CallTraceHashTable, void(*)(CallTraceHashTable*)>(
new(aligned_memory) CallTraceHashTable(),
[](CallTraceHashTable* ptr) {
ptr->~CallTraceHashTable();
std::free(ptr);
}
);
CallTraceHashTable& hash_table = *hash_table_ptr;
hash_table.setInstanceId(42);

// Pre-fill single-threaded up to just below the expansion threshold,
// recording trace_ids for the last VERIFY_COUNT entries.
std::vector<u64> expected_ids(VERIFY_COUNT);
std::vector<ASGCT_CallFrame> verify_frames(VERIFY_COUNT);

for (int i = 0; i < FILL_TARGET; ++i) {
ASGCT_CallFrame frame;
frame.bci = i + 1;
frame.method_id = reinterpret_cast<jmethodID>(0x10000 + i);
u64 id = hash_table.put(1, &frame, false, 1);
if (i >= FILL_TARGET - VERIFY_COUNT) {
int vi = i - (FILL_TARGET - VERIFY_COUNT);
expected_ids[vi] = id;
verify_frames[vi] = frame;
}
}

std::atomic<bool> go{false};
std::atomic<uint64_t> successes{0};
// Counts put() calls that returned a trace_id different from the originally
// recorded id for a known pre-filled entry. A non-zero value means
// findCallTrace(prev, hash) returned nullptr when it should have returned
// the entry — either a logic bug or a memory-ordering failure.
std::atomic<uint64_t> dedup_mismatches{0};
std::vector<std::thread> workers;

// Half the threads insert new distinct frames to trigger expansion. They
// load the old table pointer before the CAS swap and write keys[] in prev,
// creating the concurrent-write side of the race window.
for (int t = 0; t < NUM_THREADS / 2; ++t) {
workers.emplace_back([&, t]() {
while (!go.load(std::memory_order_acquire)) { /* spin */ }
for (int i = 0; i < OPS_PER_THREAD; ++i) {
ASGCT_CallFrame frame;
frame.bci = FILL_TARGET + 1 + t * OPS_PER_THREAD + i;
frame.method_id = reinterpret_cast<jmethodID>(0x80000 + t * OPS_PER_THREAD + i);
u64 id = hash_table.put(1, &frame, false, 1);
if (id != 0 && id != CallTraceStorage::DROPPED_TRACE_ID) {
successes.fetch_add(1, std::memory_order_relaxed);
}
}
});
}

// The other half re-insert pre-filled frames. After expansion the new
// table is empty for those hashes, so put() claims a new slot and calls
// findCallTrace(prev, hash) — the read side of the race window. A correct
// findCallTrace must return the original trace so dedup produces the same
// trace_id.
for (int t = NUM_THREADS / 2; t < NUM_THREADS; ++t) {
workers.emplace_back([&, t]() {
while (!go.load(std::memory_order_acquire)) { /* spin */ }
for (int i = 0; i < VERIFY_COUNT; ++i) {
u64 id = hash_table.put(1, &verify_frames[i], false, 1);
if (id != 0 && id != CallTraceStorage::DROPPED_TRACE_ID &&
id != expected_ids[i]) {
dedup_mismatches.fetch_add(1, std::memory_order_relaxed);
}
}
});
}

go.store(true, std::memory_order_release);
for (auto& w : workers) {
w.join();
}

EXPECT_GT(successes.load(), 0u) << "No successful insertions after expansion";
Comment thread
jbachorik marked this conversation as resolved.
EXPECT_EQ(dedup_mismatches.load(), 0u)
<< "findCallTrace returned wrong trace_id for pre-filled entries after expansion "
"(findCallTrace failed to locate entry in prev table)";
}

// Test 13: Hash Table Memory Allocation Failure Stress Test
TEST_F(StressTestSuite, HashTableAllocationFailureStressTest) {
const int NUM_THREADS = 8;
Expand Down
Loading