From 1d12490fa0add682d60469e2f2c21658f4a42d99 Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Thu, 28 May 2026 09:54:02 -0400 Subject: [PATCH 1/8] Add MpscRingBuffer for pre-allocated, recyclable slot rings Bounded multi-producer / single-consumer ring buffer of long-lived T instances. Producers mutate slots in place via callbacks; the consumer reads them the same way. No allocation per write/read after construction. BiConsumer/TriConsumer variants take context object(s) before the slot, matching the TagMap.forEach / Hashtable.forEach convention so callers can use static final non-capturing lambdas. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../trace/util/concurrent/MpscRingBuffer.java | 196 ++++++++++++++ .../util/concurrent/MpscRingBufferTest.java | 247 ++++++++++++++++++ 2 files changed, 443 insertions(+) create mode 100644 internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java create mode 100644 internal-api/src/test/java/datadog/trace/util/concurrent/MpscRingBufferTest.java diff --git a/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java b/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java new file mode 100644 index 00000000000..f6b9ee998ed --- /dev/null +++ b/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java @@ -0,0 +1,196 @@ +package datadog.trace.util.concurrent; + +import datadog.trace.api.function.TriConsumer; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongArray; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * Bounded multi-producer / single-consumer ring buffer of pre-allocated {@code T} instances. + * + *

Each slot is a long-lived {@code T} instance created at construction time via the supplied + * factory and recycled forever. Producers mutate slots in place via callbacks; the consumer reads + * them the same way. No allocation occurs per write/read after construction, which is the entire + * point of this class over a queue of references. + * + *

The {@code BiConsumer} and {@link TriConsumer} variants take their context object(s) + * before the slot, matching the convention of {@code TagMap.forEach} and {@code + * Hashtable.forEach}. That ordering lets callers declare the callback as a {@code static final} + * non-capturing lambda and pass per-call context at the call site without allocating a closure. + * + *

Producer cursor is CAS-claimed; visibility of a claimed slot to the consumer is gated by a + * per-slot publication-sequence array. Consumer cursor is updated with a volatile write so + * producers observe space being freed. + */ +public final class MpscRingBuffer { + + private final T[] slots; + + /** + * Per-slot publication sequence. Producers write the claimed sequence here as the last step of a + * publish (release write via {@link AtomicLongArray#set}); the consumer reads it (acquire read) + * to determine whether the slot at the next position is ready. A slot is considered published for + * sequence {@code s} iff {@code sequences[s & mask] == s}. + */ + private final AtomicLongArray publishedSequences; + + private final int capacity; + private final int mask; + + /** Next sequence to claim. Producers increment via CAS. */ + private final AtomicLong producerCursor = new AtomicLong(-1L); + + /** + * Highest sequence consumed. Volatile so producers see space freed up; only the consumer thread + * writes to it, so a plain field with a manual volatile-store would also work. + */ + private volatile long consumerCursor = -1L; + + @SuppressWarnings("unchecked") + public MpscRingBuffer(final Supplier factory, final int capacityHint) { + if (capacityHint < 1) { + throw new IllegalArgumentException("capacityHint must be >= 1, got " + capacityHint); + } + this.capacity = nextPowerOfTwo(capacityHint); + if (this.capacity < 1) { + throw new IllegalArgumentException("capacity overflow for hint " + capacityHint); + } + this.mask = capacity - 1; + this.slots = (T[]) new Object[capacity]; + this.publishedSequences = new AtomicLongArray(capacity); + for (int i = 0; i < capacity; i++) { + slots[i] = factory.get(); + // Initial: sentinel "no sequence published here yet" -- anything < 0 works since + // sequences are 0-based and monotonically increasing. 
+ publishedSequences.set(i, Long.MIN_VALUE); + } + } + + public int capacity() { + return capacity; + } + + /** Approximate count of slots holding unread items. May briefly exceed capacity under race. */ + public int size() { + final long p = producerCursor.get(); + final long c = consumerCursor; + final long diff = p - c; + if (diff <= 0) return 0; + if (diff > capacity) return capacity; + return (int) diff; + } + + public boolean isEmpty() { + return producerCursor.get() == consumerCursor; + } + + /** {@code true} if the slot was filled and published; {@code false} if the ring is full. */ + public boolean tryWrite(final Consumer filler) { + final long seq = claim(); + if (seq < 0L) return false; + filler.accept(slots[(int) (seq & mask)]); + publish(seq); + return true; + } + + public boolean tryWrite(final C context, final BiConsumer filler) { + final long seq = claim(); + if (seq < 0L) return false; + filler.accept(context, slots[(int) (seq & mask)]); + publish(seq); + return true; + } + + public boolean tryWrite( + final C1 context1, + final C2 context2, + final TriConsumer filler) { + final long seq = claim(); + if (seq < 0L) return false; + filler.accept(context1, context2, slots[(int) (seq & mask)]); + publish(seq); + return true; + } + + /** Drains all currently-available slots. Returns the count processed. 
*/ + public int drain(final Consumer handler) { + long cursor = consumerCursor; + int count = 0; + while (true) { + final long nextSeq = cursor + 1L; + final int idx = (int) (nextSeq & mask); + if (publishedSequences.get(idx) != nextSeq) break; + handler.accept(slots[idx]); + cursor = nextSeq; + count++; + } + if (count > 0) consumerCursor = cursor; + return count; + } + + public int drain(final C context, final BiConsumer handler) { + long cursor = consumerCursor; + int count = 0; + while (true) { + final long nextSeq = cursor + 1L; + final int idx = (int) (nextSeq & mask); + if (publishedSequences.get(idx) != nextSeq) break; + handler.accept(context, slots[idx]); + cursor = nextSeq; + count++; + } + if (count > 0) consumerCursor = cursor; + return count; + } + + public int drain( + final C1 context1, + final C2 context2, + final TriConsumer handler) { + long cursor = consumerCursor; + int count = 0; + while (true) { + final long nextSeq = cursor + 1L; + final int idx = (int) (nextSeq & mask); + if (publishedSequences.get(idx) != nextSeq) break; + handler.accept(context1, context2, slots[idx]); + cursor = nextSeq; + count++; + } + if (count > 0) consumerCursor = cursor; + return count; + } + + /** CAS-claim the next sequence, or return {@code -1} if the ring is full. */ + private long claim() { + while (true) { + final long current = producerCursor.get(); + // Stale read of consumerCursor is fine: a false "full" reading just causes a drop, and + // a real "full" reading is correctly identified because consumerCursor only advances. + final long consumed = consumerCursor; + if (current - consumed >= capacity) { + return -1L; + } + final long next = current + 1L; + if (producerCursor.compareAndSet(current, next)) { + return next; + } + // CAS failure -> another producer claimed; retry. + } + } + + /** Mark sequence {@code seq} as published. Release semantics via {@link AtomicLongArray#set}. 
*/ + private void publish(final long seq) { + publishedSequences.set((int) (seq & mask), seq); + } + + private static int nextPowerOfTwo(final int n) { + if (n <= 1) return 1; + // 32 - leadingZeros(n-1) gives the number of bits needed to represent n-1; shifting 1 by that + // gives the smallest power of two >= n. + final int bits = 32 - Integer.numberOfLeadingZeros(n - 1); + return 1 << bits; + } +} diff --git a/internal-api/src/test/java/datadog/trace/util/concurrent/MpscRingBufferTest.java b/internal-api/src/test/java/datadog/trace/util/concurrent/MpscRingBufferTest.java new file mode 100644 index 00000000000..43b89271621 --- /dev/null +++ b/internal-api/src/test/java/datadog/trace/util/concurrent/MpscRingBufferTest.java @@ -0,0 +1,247 @@ +package datadog.trace.util.concurrent; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.trace.api.function.TriConsumer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import org.junit.jupiter.api.Test; + +class MpscRingBufferTest { + + /** Mutable slot used by the tests; replaces the per-publish allocation a real consumer avoids. 
*/ + static final class Slot { + int value; + String tag; + } + + // ============ Construction ============ + + @Test + void capacityRoundsUpToPowerOfTwo() { + assertEquals(1, new MpscRingBuffer<>(Slot::new, 1).capacity()); + assertEquals(2, new MpscRingBuffer<>(Slot::new, 2).capacity()); + assertEquals(4, new MpscRingBuffer<>(Slot::new, 3).capacity()); + assertEquals(16, new MpscRingBuffer<>(Slot::new, 10).capacity()); + assertEquals(1024, new MpscRingBuffer<>(Slot::new, 1024).capacity()); + assertEquals(2048, new MpscRingBuffer<>(Slot::new, 1025).capacity()); + } + + @Test + void rejectsNonPositiveCapacityHint() { + assertThrows(IllegalArgumentException.class, () -> new MpscRingBuffer<>(Slot::new, 0)); + assertThrows(IllegalArgumentException.class, () -> new MpscRingBuffer<>(Slot::new, -1)); + } + + @Test + void slotsArePreAllocatedFromFactory() { + AtomicInteger calls = new AtomicInteger(); + MpscRingBuffer ring = + new MpscRingBuffer<>( + () -> { + calls.incrementAndGet(); + return new Slot(); + }, + 8); + assertEquals(8, ring.capacity()); + assertEquals(8, calls.get(), "factory must run capacity times during construction"); + } + + // ============ Basic produce / consume ============ + + @Test + void emptyRingIsEmpty() { + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 4); + assertTrue(ring.isEmpty()); + assertEquals(0, ring.size()); + assertEquals(0, ring.drain(s -> {})); + } + + @Test + void singleWriteThenDrain() { + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 4); + assertTrue( + ring.tryWrite( + s -> { + s.value = 42; + s.tag = "hello"; + })); + assertEquals(1, ring.size()); + + List seen = new ArrayList<>(); + int drained = ring.drain(s -> seen.add(s.value)); + assertEquals(1, drained); + assertEquals(Arrays.asList(42), seen); + assertTrue(ring.isEmpty()); + } + + @Test + void writesPreserveFIFOOrder() { + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8); + for (int i = 0; i < 6; i++) { + final int v = i; + 
assertTrue(ring.tryWrite(s -> s.value = v)); + } + List seen = new ArrayList<>(); + ring.drain(s -> seen.add(s.value)); + assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5), seen); + } + + // ============ Full / drop behavior ============ + + @Test + void writesBeyondCapacityFail() { + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 4); + for (int i = 0; i < 4; i++) { + assertTrue(ring.tryWrite(s -> s.value = 0)); + } + assertFalse(ring.tryWrite(s -> s.value = 0), "5th write must fail when capacity == 4"); + assertEquals(4, ring.size()); + } + + @Test + void writesResumeAfterDrain() { + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 4); + for (int i = 0; i < 4; i++) { + final int v = i; + assertTrue(ring.tryWrite(s -> s.value = v)); + } + assertFalse(ring.tryWrite(s -> {})); + + AtomicInteger sum = new AtomicInteger(); + ring.drain(s -> sum.addAndGet(s.value)); + assertEquals(0 + 1 + 2 + 3, sum.get()); + + // Now should accept another full round. + for (int i = 100; i < 104; i++) { + final int v = i; + assertTrue(ring.tryWrite(s -> s.value = v)); + } + assertFalse(ring.tryWrite(s -> {})); + } + + // ============ Context-passing variants (context first, slot last) ============ + + @Test + void writeAndDrainWithOneContext() { + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 4); + BiConsumer filler = (ctx, s) -> s.value = ctx; + assertTrue(ring.tryWrite(7, filler)); + assertTrue(ring.tryWrite(8, filler)); + + List seen = new ArrayList<>(); + BiConsumer, Slot> reader = (sink, s) -> sink.add(s.value); + assertEquals(2, ring.drain(seen, reader)); + assertEquals(Arrays.asList(7, 8), seen); + } + + @Test + void writeAndDrainWithTwoContexts() { + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 4); + TriConsumer filler = + (v, t, s) -> { + s.value = v; + s.tag = t; + }; + assertTrue(ring.tryWrite(1, "a", filler)); + assertTrue(ring.tryWrite(2, "b", filler)); + + List tags = new ArrayList<>(); + List vals = new ArrayList<>(); + TriConsumer, List, Slot> 
reader = + (ts, vs, s) -> { + ts.add(s.tag); + vs.add(s.value); + }; + assertEquals(2, ring.drain(tags, vals, reader)); + assertEquals(Arrays.asList("a", "b"), tags); + assertEquals(Arrays.asList(1, 2), vals); + } + + // ============ Concurrency ============ + + @Test + void manyProducersSingleConsumerSeesEveryWrittenValue() throws InterruptedException { + final int producers = 8; + final int perProducer = 50_000; + final int total = producers * perProducer; + + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 1024); + + ExecutorService producerPool = Executors.newFixedThreadPool(producers); + AtomicInteger dropped = new AtomicInteger(); + AtomicInteger written = new AtomicInteger(); + CountDownLatch start = new CountDownLatch(1); + + BiConsumer filler = (v, s) -> s.value = v; + + for (int p = 0; p < producers; p++) { + final int base = p * perProducer; + producerPool.submit( + () -> { + try { + start.await(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } + for (int i = 0; i < perProducer; i++) { + int v = base + i; + // Spin until the consumer makes room. We're testing correctness, not throughput. 
+ while (!ring.tryWrite(v, filler)) { + dropped.incrementAndGet(); + Thread.yield(); + } + written.incrementAndGet(); + } + }); + } + + Set seen = new HashSet<>(total); + BiConsumer, Slot> reader = (sink, s) -> sink.add(s.value); + + Thread consumer = + new Thread( + () -> { + while (seen.size() < total) { + if (ring.drain(seen, reader) == 0) Thread.yield(); + } + }, + "ring-consumer"); + consumer.start(); + + start.countDown(); + producerPool.shutdown(); + assertTrue(producerPool.awaitTermination(30, TimeUnit.SECONDS), "producers timed out"); + assertTrue(consumer.isAlive() || seen.size() == total); + consumer.join(30_000); + assertFalse(consumer.isAlive(), "consumer timed out"); + + assertEquals(total, written.get(), "every producer call should eventually succeed"); + assertEquals(total, seen.size(), "consumer must see every value exactly once"); + } + + @Test + void sizeReflectsOutstandingWrites() { + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8); + assertEquals(0, ring.size()); + ring.tryWrite(s -> {}); + assertEquals(1, ring.size()); + ring.tryWrite(s -> {}); + assertEquals(2, ring.size()); + ring.drain(s -> {}); + assertEquals(0, ring.size()); + } +} From 16b2ec6f0c472e497819af25c0773b6949827379 Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Thu, 28 May 2026 09:58:25 -0400 Subject: [PATCH 2/8] Add throughput benchmarks for MpscRingBuffer Five benchmarks: producer-only throughput at 1/8/16 threads with a background drainer, plus an e2e @Group bench pairing 8 producers with 1 consumer for system throughput. Ring capacity is a @Param so runs can sweep capacities without recompiling. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../concurrent/MpscRingBufferBenchmark.java | 146 ++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 internal-api/src/jmh/java/datadog/trace/util/concurrent/MpscRingBufferBenchmark.java diff --git a/internal-api/src/jmh/java/datadog/trace/util/concurrent/MpscRingBufferBenchmark.java b/internal-api/src/jmh/java/datadog/trace/util/concurrent/MpscRingBufferBenchmark.java new file mode 100644 index 00000000000..e9d084f3334 --- /dev/null +++ b/internal-api/src/jmh/java/datadog/trace/util/concurrent/MpscRingBufferBenchmark.java @@ -0,0 +1,146 @@ +package datadog.trace.util.concurrent; + +import static java.util.concurrent.TimeUnit.SECONDS; + +import java.util.function.BiConsumer; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.GroupThreads; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Throughput benchmarks for {@link MpscRingBuffer}. + * + *

+ * + *

Run with {@code -p capacity=...} to override the default ring capacity. + */ +@State(Scope.Benchmark) +@Warmup(iterations = 2, time = 15, timeUnit = SECONDS) +@Measurement(iterations = 5, time = 15, timeUnit = SECONDS) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(SECONDS) +@Fork(value = 1) +public class MpscRingBufferBenchmark { + + /** + * Static filler so the lambda is non-capturing and the JIT can hoist it past the {@code tryWrite} + * call. Context arg comes first, slot last — matches {@code TagMap.forEach} convention. + */ + private static final BiConsumer FILLER = (v, slot) -> slot.value = v; + + /** Mutable slot. Replicates the per-publish allocation a real producer wants to avoid. */ + public static final class Slot { + long value; + } + + @Param({"1024"}) + public int capacity; + + /** + * Shared ring for the {@code write_*} benches. A background drainer keeps space available so + * producer benchmarks measure write throughput rather than full-ring drop throughput. + */ + MpscRingBuffer ring; + + private volatile boolean stopDrainer; + private Thread drainerThread; + + /** + * Separate ring for the {@code e2e_*} group benches. JMH drives both sides directly so we don't + * want our own background drainer for those. 
+ */ + MpscRingBuffer e2eRing; + + @Setup(Level.Trial) + public void setup() { + ring = new MpscRingBuffer<>(Slot::new, capacity); + e2eRing = new MpscRingBuffer<>(Slot::new, capacity); + stopDrainer = false; + drainerThread = + new Thread( + () -> { + while (!stopDrainer) { + if (ring.drain((Slot s) -> {}) == 0) Thread.yield(); + } + }, + "MpscRingBufferBenchmark-drainer"); + drainerThread.setDaemon(true); + drainerThread.start(); + } + + @TearDown(Level.Trial) + public void teardown() throws InterruptedException { + stopDrainer = true; + drainerThread.join(5_000); + } + + @State(Scope.Thread) + public static class ThreadState { + long counter; + } + + // ============ Write throughput with background drainer ============ + + @Threads(1) + @Benchmark + public boolean write_1p(ThreadState ts) { + return ring.tryWrite(ts.counter++, FILLER); + } + + @Threads(8) + @Benchmark + public boolean write_8p(ThreadState ts) { + return ring.tryWrite(ts.counter++, FILLER); + } + + @Threads(16) + @Benchmark + public boolean write_16p(ThreadState ts) { + return ring.tryWrite(ts.counter++, FILLER); + } + + // ============ End-to-end producer/consumer pairing ============ + // + // JMH runs both methods in the same trial: 8 producer threads + 1 consumer thread. Throughput + // is reported as ops/sec aggregated across all 9 threads, but the consumer's drain count + // dwarfs the producer ops since one call processes many slots -- in practice the bottleneck + // is the producer side (CAS contention), and that's what the number reflects. 
+ + private static final BiConsumer CONSUMER = (bh, slot) -> bh.consume(slot.value); + + @Group("e2e_8p") + @GroupThreads(8) + @Benchmark + public boolean e2e_producer(ThreadState ts) { + return e2eRing.tryWrite(ts.counter++, FILLER); + } + + @Group("e2e_8p") + @GroupThreads(1) + @Benchmark + public int e2e_consumer(Blackhole bh) { + int drained = e2eRing.drain(bh, CONSUMER); + if (drained == 0) Thread.yield(); + return drained; + } +} From e67dde72a50c9ea4e5870fe209d0164db737ffab Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Thu, 28 May 2026 10:19:54 -0400 Subject: [PATCH 3/8] Add RingVsQueueBenchmark for MpscRingBuffer vs MpscArrayQueue Head-to-head benches in dd-trace-core's jmh source set (which already depends on jctools): MpscRingBuffer mutating pre-allocated slots vs MpscArrayQueue with a fresh Slot allocated per publish -- the latter mirrors the existing SpanSnapshot pattern in the CSS code. write_ring_8p / write_queue_8p compare publish cost with a background drainer; e2e_ring_8p / e2e_queue_8p use @Group to pair 8 producers with 1 consumer for end-to-end throughput. Run with -prof gc to see per-op allocation rate where the ring's win shows up loudest. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../util/concurrent/RingVsQueueBenchmark.java | 191 ++++++++++++++++++ 1 file changed, 191 insertions(+) create mode 100644 dd-trace-core/src/jmh/java/datadog/trace/util/concurrent/RingVsQueueBenchmark.java diff --git a/dd-trace-core/src/jmh/java/datadog/trace/util/concurrent/RingVsQueueBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/util/concurrent/RingVsQueueBenchmark.java new file mode 100644 index 00000000000..6a1e3d7c802 --- /dev/null +++ b/dd-trace-core/src/jmh/java/datadog/trace/util/concurrent/RingVsQueueBenchmark.java @@ -0,0 +1,191 @@ +package datadog.trace.util.concurrent; + +import static java.util.concurrent.TimeUnit.SECONDS; + +import java.util.function.BiConsumer; +import org.jctools.queues.MpscArrayQueue; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.GroupThreads; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Head-to-head comparison of {@link MpscRingBuffer} (mutable pre-allocated slots) against the + * conventional approach of a jctools {@link MpscArrayQueue} with a fresh {@code Slot} allocated per + * publish. The latter is the pattern the current CSS code uses for {@code SpanSnapshot} on the + * producer side, so the delta between the two measures the actual allocation/handoff saving of the + * ring-buffer rewrite. + * + *

+ * + *

Run with {@code -prof gc} to also see per-op allocation rate — that's where the ring's win is + * loudest, since the queue allocates one {@code Slot} per publish and the ring allocates none. + */ +@State(Scope.Benchmark) +@Warmup(iterations = 2, time = 15, timeUnit = SECONDS) +@Measurement(iterations = 5, time = 15, timeUnit = SECONDS) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(SECONDS) +@Fork(value = 1) +public class RingVsQueueBenchmark { + + /** + * Shared slot type. {@code MpscRingBuffer} pre-allocates these and the producer mutates in place; + * the queue path allocates a fresh one per publish and offers the reference. Two constructors so + * both styles read naturally. + */ + public static final class Slot { + long value; + + Slot() {} + + Slot(final long value) { + this.value = value; + } + } + + // Static (non-capturing) handlers. Passing ts/bh as context lets the JIT keep these as + // singleton functions and avoid per-call lambda allocation. + private static final BiConsumer RING_FILLER = + (ts, slot) -> { + slot.value = ts.counter; + ts.counter++; + }; + + private static final BiConsumer RING_CONSUMER = + (bh, slot) -> bh.consume(slot.value); + + @Param({"1024"}) + public int capacity; + + /** Write-side benchmark structures. Drained by a background thread so they never fill. */ + MpscRingBuffer ring; + + MpscArrayQueue queue; + + /** E2e benchmark structures. JMH drives both sides via {@code @Group}; no background drainer. 
*/ + MpscRingBuffer e2eRing; + + MpscArrayQueue e2eQueue; + + private volatile boolean stopDrainers; + private Thread ringDrainer; + private Thread queueDrainer; + + @Setup(Level.Trial) + public void setup() { + ring = new MpscRingBuffer<>(Slot::new, capacity); + queue = new MpscArrayQueue<>(capacity); + e2eRing = new MpscRingBuffer<>(Slot::new, capacity); + e2eQueue = new MpscArrayQueue<>(capacity); + + stopDrainers = false; + ringDrainer = + new Thread( + () -> { + while (!stopDrainers) { + if (ring.drain((Slot s) -> {}) == 0) Thread.yield(); + } + }, + "RingVsQueueBenchmark-ringDrainer"); + ringDrainer.setDaemon(true); + ringDrainer.start(); + + queueDrainer = + new Thread( + () -> { + while (!stopDrainers) { + Slot s = queue.poll(); + if (s == null) Thread.yield(); + } + }, + "RingVsQueueBenchmark-queueDrainer"); + queueDrainer.setDaemon(true); + queueDrainer.start(); + } + + @TearDown(Level.Trial) + public void teardown() throws InterruptedException { + stopDrainers = true; + ringDrainer.join(5_000); + queueDrainer.join(5_000); + } + + @State(Scope.Thread) + public static class ThreadState { + long counter; + } + + // ============ Write-side throughput ============ + + @Threads(8) + @Benchmark + public boolean write_ring_8p(final ThreadState ts) { + return ring.tryWrite(ts, RING_FILLER); + } + + /** Mirrors the SpanSnapshot pattern: allocate a fresh instance per publish, offer it. 
*/ + @Threads(8) + @Benchmark + public boolean write_queue_8p(final ThreadState ts) { + return queue.offer(new Slot(ts.counter++)); + } + + // ============ End-to-end producer/consumer ============ + + @Group("e2e_ring_8p") + @GroupThreads(8) + @Benchmark + public boolean e2e_ring_producer(final ThreadState ts) { + return e2eRing.tryWrite(ts, RING_FILLER); + } + + @Group("e2e_ring_8p") + @GroupThreads(1) + @Benchmark + public int e2e_ring_consumer(final Blackhole bh) { + int drained = e2eRing.drain(bh, RING_CONSUMER); + if (drained == 0) Thread.yield(); + return drained; + } + + @Group("e2e_queue_8p") + @GroupThreads(8) + @Benchmark + public boolean e2e_queue_producer(final ThreadState ts) { + return e2eQueue.offer(new Slot(ts.counter++)); + } + + @Group("e2e_queue_8p") + @GroupThreads(1) + @Benchmark + public int e2e_queue_consumer(final Blackhole bh) { + int drained = 0; + Slot slot; + while ((slot = e2eQueue.poll()) != null) { + bh.consume(slot.value); + drained++; + } + if (drained == 0) Thread.yield(); + return drained; + } +} From cec6abf082cdd018fc2bb73dc348de00cbcc16b0 Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Thu, 28 May 2026 11:16:24 -0400 Subject: [PATCH 4/8] Document MpscRingBuffer thread-safety contract; publish on filler throw Spell out the contract that slot users rely on: plain (non-volatile) fields, no retention past handler return, slot-reference-not-shared. Producer fillers must not throw if possible -- and if they do, the slot is now published anyway (try/finally) so the consumer can't deadlock waiting for an unfinished sequence. Test covers the throw-then-recover path; the ring's cursors stay healthy. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../trace/util/concurrent/MpscRingBuffer.java | 61 +++++++++++++++++-- .../util/concurrent/MpscRingBufferTest.java | 32 ++++++++++ 2 files changed, 87 insertions(+), 6 deletions(-) diff --git a/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java b/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java index f6b9ee998ed..121c5717a1d 100644 --- a/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java +++ b/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java @@ -20,6 +20,44 @@ * Hashtable.forEach}. That ordering lets callers declare the callback as a {@code static final} * non-capturing lambda and pass per-call context at the call site without allocating a closure. * + *

Thread safety contract

+ * + *

The ring buffer is thread-safe for any number of producer threads plus exactly one consumer + * thread. Calling {@code drain} from multiple threads concurrently is not supported and will + * corrupt state. + * + *

For the slot type {@code T}: + * + *

+ * + *

For producer fillers: + * + *

+ * + *

Implementation

+ * *

Producer cursor is CAS-claimed; visibility of a claimed slot to the consumer is gated by a * per-slot publication-sequence array. Consumer cursor is updated with a volatile write so * producers observe space being freed. @@ -90,16 +128,24 @@ public boolean isEmpty() { public boolean tryWrite(final Consumer filler) { final long seq = claim(); if (seq < 0L) return false; - filler.accept(slots[(int) (seq & mask)]); - publish(seq); + // publish in finally so a throwing filler doesn't leave the slot un-published -- the + // consumer would otherwise wait at that sequence forever. See class javadoc. + try { + filler.accept(slots[(int) (seq & mask)]); + } finally { + publish(seq); + } return true; } public boolean tryWrite(final C context, final BiConsumer filler) { final long seq = claim(); if (seq < 0L) return false; - filler.accept(context, slots[(int) (seq & mask)]); - publish(seq); + try { + filler.accept(context, slots[(int) (seq & mask)]); + } finally { + publish(seq); + } return true; } @@ -109,8 +155,11 @@ public boolean tryWrite( final TriConsumer filler) { final long seq = claim(); if (seq < 0L) return false; - filler.accept(context1, context2, slots[(int) (seq & mask)]); - publish(seq); + try { + filler.accept(context1, context2, slots[(int) (seq & mask)]); + } finally { + publish(seq); + } return true; } diff --git a/internal-api/src/test/java/datadog/trace/util/concurrent/MpscRingBufferTest.java b/internal-api/src/test/java/datadog/trace/util/concurrent/MpscRingBufferTest.java index 43b89271621..d48c872883f 100644 --- a/internal-api/src/test/java/datadog/trace/util/concurrent/MpscRingBufferTest.java +++ b/internal-api/src/test/java/datadog/trace/util/concurrent/MpscRingBufferTest.java @@ -2,6 +2,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static 
org.junit.jupiter.api.Assertions.assertTrue; @@ -244,4 +245,35 @@ void sizeReflectsOutstandingWrites() { ring.drain(s -> {}); assertEquals(0, ring.size()); } + + @Test + void throwingFillerStillPublishesSoConsumerDoesntHang() { + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 4); + + // First write succeeds. + assertTrue(ring.tryWrite(s -> s.value = 1)); + + // Second write throws midway through filling. Slot must still be published so the consumer's + // drain can advance past it. + RuntimeException boom = new RuntimeException("boom"); + RuntimeException thrown = + assertThrows( + RuntimeException.class, + () -> + ring.tryWrite( + s -> { + s.value = 2; + throw boom; + })); + assertSame(boom, thrown); + + // Third write proves the ring's cursors are still healthy after the throw. + assertTrue(ring.tryWrite(s -> s.value = 3)); + + // Consumer drains all three: it must not hang on the partially-filled slot. + List seen = new ArrayList<>(); + int drained = ring.drain(s -> seen.add(s.value)); + assertEquals(3, drained, "consumer must advance past the throwing slot"); + assertEquals(Arrays.asList(1, 2, 3), seen, "throwing slot keeps whatever filler had written"); + } } From 9162a1ef472e5e4a09db85a72f6512ac346d48d0 Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Thu, 28 May 2026 11:25:10 -0400 Subject: [PATCH 5/8] Use AtomicLongFieldUpdater for the producer cursor Saves one wrapper allocation per ring instance: producerCursor becomes a volatile long on the instance, paired with a static AtomicLongFieldUpdater for CAS. Same memory ordering as the prior AtomicLong (volatile field + field-updater CAS), but no per-instance wrapper object. publishedSequences stays AtomicLongArray -- the field updater approach doesn't apply to array element access. consumerCursor was already a plain volatile long with no wrapper. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../trace/util/concurrent/MpscRingBuffer.java | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java b/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java index 121c5717a1d..77cc8d08435 100644 --- a/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java +++ b/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java @@ -1,8 +1,8 @@ package datadog.trace.util.concurrent; import datadog.trace.api.function.TriConsumer; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongArray; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; @@ -77,12 +77,17 @@ public final class MpscRingBuffer { private final int capacity; private final int mask; - /** Next sequence to claim. Producers increment via CAS. */ - private final AtomicLong producerCursor = new AtomicLong(-1L); + /** Next sequence to claim. Producers increment via CAS through {@link #PRODUCER_CURSOR}. */ + @SuppressWarnings("unused") // accessed via PRODUCER_CURSOR field updater + private volatile long producerCursor = -1L; + + @SuppressWarnings("rawtypes") // AtomicLongFieldUpdater cannot reference a parameterized type + private static final AtomicLongFieldUpdater PRODUCER_CURSOR = + AtomicLongFieldUpdater.newUpdater(MpscRingBuffer.class, "producerCursor"); /** * Highest sequence consumed. Volatile so producers see space freed up; only the consumer thread - * writes to it, so a plain field with a manual volatile-store would also work. + * writes to it. */ private volatile long consumerCursor = -1L; @@ -112,7 +117,7 @@ public int capacity() { /** Approximate count of slots holding unread items. May briefly exceed capacity under race. 
*/ public int size() { - final long p = producerCursor.get(); + final long p = producerCursor; final long c = consumerCursor; final long diff = p - c; if (diff <= 0) return 0; @@ -121,7 +126,7 @@ public int size() { } public boolean isEmpty() { - return producerCursor.get() == consumerCursor; + return producerCursor == consumerCursor; } /** {@code true} if the slot was filled and published; {@code false} if the ring is full. */ @@ -215,7 +220,7 @@ public int drain( /** CAS-claim the next sequence, or return {@code -1} if the ring is full. */ private long claim() { while (true) { - final long current = producerCursor.get(); + final long current = producerCursor; // Stale read of consumerCursor is fine: a false "full" reading just causes a drop, and // a real "full" reading is correctly identified because consumerCursor only advances. final long consumed = consumerCursor; @@ -223,7 +228,7 @@ private long claim() { return -1L; } final long next = current + 1L; - if (producerCursor.compareAndSet(current, next)) { + if (PRODUCER_CURSOR.compareAndSet(this, current, next)) { return next; } // CAS failure -> another producer claimed; retry. From e8ead019ba9cfd9f88fc07284654690c9123701d Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Thu, 28 May 2026 13:44:44 -0400 Subject: [PATCH 6/8] Pad MpscRingBuffer cursors and stride publishedSequences Two related cache-line fixes for the producer hot path under heavy contention: 1. Stride publishedSequences by 8 longs (one cache line). Without this, adjacent logical slots share cache lines and concurrent producers writing nearby sequences ping-pong the same line between cores. The array grows by 8x but the upfront cost is bounded by the ring's capacity (e.g. 8 MB at the CSS default cap=131072). 2. Cache-line-pad the producerCursor and consumerCursor against each other using the standard Disruptor class-hierarchy pattern. 
Every consumer-side advance of consumerCursor would otherwise invalidate the line producers read for producerCursor (and vice versa). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../trace/util/concurrent/MpscRingBuffer.java | 82 +++++++++++++------ 1 file changed, 56 insertions(+), 26 deletions(-) diff --git a/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java b/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java index 77cc8d08435..b84ee54922f 100644 --- a/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java +++ b/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java @@ -7,6 +7,35 @@ import java.util.function.Consumer; import java.util.function.Supplier; +// Disruptor-style cache-line padding around the cursors. The two cursors live on different cache +// lines so consumer-side writes to consumerCursor don't invalidate the line producers read for +// producerCursor (and vice versa). Each padding class declares 7 longs (56 bytes); combined with +// the cursor's own 8 bytes plus the JVM object header, each cursor + its surrounding pad fills +// a 64-byte cache line. The HotSpot field-layout strategy preserves the declaration order across +// the class hierarchy, so this pattern is reliable on all production JVMs we target. + +abstract class MpscRingBufferPad0 { + long p01, p02, p03, p04, p05, p06, p07; +} + +abstract class MpscRingBufferProducerCursor extends MpscRingBufferPad0 { + /** Next sequence to claim. Producers increment via CAS through {@code PRODUCER_CURSOR}. */ + volatile long producerCursor = -1L; +} + +abstract class MpscRingBufferPad1 extends MpscRingBufferProducerCursor { + long p11, p12, p13, p14, p15, p16, p17; +} + +abstract class MpscRingBufferConsumerCursor extends MpscRingBufferPad1 { + /** Highest sequence consumed. Only the consumer thread writes; producers read volatile. 
*/ + volatile long consumerCursor = -1L; +} + +abstract class MpscRingBufferPad2 extends MpscRingBufferConsumerCursor { + long p21, p22, p23, p24, p25, p26, p27; +} + /** * Bounded multi-producer / single-consumer ring buffer of pre-allocated {@code T} instances. * @@ -60,37 +89,38 @@ * *

Producer cursor is CAS-claimed; visibility of a claimed slot to the consumer is gated by a * per-slot publication-sequence array. Consumer cursor is updated with a volatile write so - * producers observe space being freed. + * producers observe space being freed. Cursors are cache-line-padded against each other (see the + * {@code MpscRingBufferPad*} hierarchy at the top of this file) and the publication-sequence array + * is strided so each logical entry occupies a distinct cache line. */ -public final class MpscRingBuffer { +public final class MpscRingBuffer extends MpscRingBufferPad2 { + + /** + * Cache line size in {@code long}-units. 64-byte cache lines on every common CPU we ship to (x86, + * ARM); 8 bytes per long. Each logical slot in {@link #publishedSequences} is spread out by this + * stride so adjacent logical sequences don't share a cache line and don't ping-pong between + * producer cores under heavy contention. + */ + private static final int CACHE_LINE_LONGS = 8; + + @SuppressWarnings("rawtypes") // AtomicLongFieldUpdater can't take a parameterized class + private static final AtomicLongFieldUpdater PRODUCER_CURSOR = + AtomicLongFieldUpdater.newUpdater(MpscRingBufferProducerCursor.class, "producerCursor"); private final T[] slots; /** - * Per-slot publication sequence. Producers write the claimed sequence here as the last step of a - * publish (release write via {@link AtomicLongArray#set}); the consumer reads it (acquire read) - * to determine whether the slot at the next position is ready. A slot is considered published for - * sequence {@code s} iff {@code sequences[s & mask] == s}. + * Per-slot publication sequence, strided by {@link #CACHE_LINE_LONGS} to avoid false sharing. + * Producers write the claimed sequence here as the last step of a publish (release write via + * {@link AtomicLongArray#set}); the consumer reads it (acquire read) to determine whether the + * slot at the next position is ready. 
A slot is considered published for sequence {@code s} iff + * {@code publishedSequences[(s & mask) * CACHE_LINE_LONGS] == s}. */ private final AtomicLongArray publishedSequences; private final int capacity; private final int mask; - /** Next sequence to claim. Producers increment via CAS through {@link #PRODUCER_CURSOR}. */ - @SuppressWarnings("unused") // accessed via PRODUCER_CURSOR field updater - private volatile long producerCursor = -1L; - - @SuppressWarnings("rawtypes") // AtomicLongFieldUpdater cannot reference a parameterized type - private static final AtomicLongFieldUpdater PRODUCER_CURSOR = - AtomicLongFieldUpdater.newUpdater(MpscRingBuffer.class, "producerCursor"); - - /** - * Highest sequence consumed. Volatile so producers see space freed up; only the consumer thread - * writes to it. - */ - private volatile long consumerCursor = -1L; - @SuppressWarnings("unchecked") public MpscRingBuffer(final Supplier factory, final int capacityHint) { if (capacityHint < 1) { @@ -102,12 +132,12 @@ public MpscRingBuffer(final Supplier factory, final int capacityHint) { } this.mask = capacity - 1; this.slots = (T[]) new Object[capacity]; - this.publishedSequences = new AtomicLongArray(capacity); + this.publishedSequences = new AtomicLongArray(capacity * CACHE_LINE_LONGS); for (int i = 0; i < capacity; i++) { slots[i] = factory.get(); // Initial: sentinel "no sequence published here yet" -- anything < 0 works since // sequences are 0-based and monotonically increasing. 
- publishedSequences.set(i, Long.MIN_VALUE); + publishedSequences.set(i * CACHE_LINE_LONGS, Long.MIN_VALUE); } } @@ -175,7 +205,7 @@ public int drain(final Consumer handler) { while (true) { final long nextSeq = cursor + 1L; final int idx = (int) (nextSeq & mask); - if (publishedSequences.get(idx) != nextSeq) break; + if (publishedSequences.get(idx * CACHE_LINE_LONGS) != nextSeq) break; handler.accept(slots[idx]); cursor = nextSeq; count++; @@ -190,7 +220,7 @@ public int drain(final C context, final BiConsumer han while (true) { final long nextSeq = cursor + 1L; final int idx = (int) (nextSeq & mask); - if (publishedSequences.get(idx) != nextSeq) break; + if (publishedSequences.get(idx * CACHE_LINE_LONGS) != nextSeq) break; handler.accept(context, slots[idx]); cursor = nextSeq; count++; @@ -208,7 +238,7 @@ public int drain( while (true) { final long nextSeq = cursor + 1L; final int idx = (int) (nextSeq & mask); - if (publishedSequences.get(idx) != nextSeq) break; + if (publishedSequences.get(idx * CACHE_LINE_LONGS) != nextSeq) break; handler.accept(context1, context2, slots[idx]); cursor = nextSeq; count++; @@ -237,7 +267,7 @@ private long claim() { /** Mark sequence {@code seq} as published. Release semantics via {@link AtomicLongArray#set}. */ private void publish(final long seq) { - publishedSequences.set((int) (seq & mask), seq); + publishedSequences.set(((int) (seq & mask)) * CACHE_LINE_LONGS, seq); } private static int nextPowerOfTwo(final int n) { From 7e6f49713b17ea9e3dc53ea2496610e392f1a232 Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Thu, 28 May 2026 16:17:22 -0400 Subject: [PATCH 7/8] Add MpscRingBuffer.tryClaim(n) batch-claim API A single CAS claims n contiguous slots, returning a Batch handle that the caller fills via fillAndPublish(slot...). Designed for callers whose work has a natural batch boundary (e.g. CSS publishing a trace's worth of metrics-eligible spans in one shot): cuts producer-cursor contention from O(N) CASes to O(1) per call. 
All-or-nothing: tryClaim(n) returns null if the ring can't fit the whole batch. The Batch is single-threaded (owned by the claiming thread), short-lived (scoped to one publish call), and has no thread-shutdown hazard -- the batch is fully consumed before returning. Filler-throw safety matches the existing tryWrite contract: the slot is published in a finally block so the consumer can advance, and the batch's published counter increments either way. Tests cover: requested size, capacity rejection, all-or-nothing, three filler overloads, over-publish IllegalStateException, throw recovery, and 8-producer concurrency (200 batches/thread x 16 size = 25600 items, single consumer sees every value exactly once). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../trace/util/concurrent/MpscRingBuffer.java | 106 ++++++++++++ .../util/concurrent/MpscRingBufferTest.java | 162 ++++++++++++++++++ 2 files changed, 268 insertions(+) diff --git a/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java b/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java index b84ee54922f..cce1b46aa6f 100644 --- a/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java +++ b/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java @@ -247,6 +247,40 @@ public int drain( return count; } + /** + * Try to claim a contiguous range of {@code n} sequences in a single CAS. Returns {@code null} if + * the ring doesn't have room for the whole batch -- the caller treats that as "drop all {@code + * n}", which is the natural shape for callers that batch by a higher-level unit (e.g. one CSS + * publish per completed trace). When the caller has a list of N items to write, this amortizes + * producer-cursor contention from O(N) CASes to O(1) per call. + * + *

The returned {@link Batch} must be filled via {@link Batch#fillAndPublish} exactly {@code n} + * times. Under-publishing leaves the ring stuck at the unfilled sequence -- the consumer waits + * there forever. Over-publishing throws {@link IllegalStateException}. + * + * @throws IllegalArgumentException if {@code n < 1} + */ + public Batch tryClaim(final int n) { + if (n < 1) { + throw new IllegalArgumentException("n must be >= 1, got " + n); + } + while (true) { + final long current = producerCursor; + // Stale read of consumerCursor is fine: a false "full" reading just causes a drop, and a + // real one is correctly identified because consumerCursor only advances. + final long consumed = consumerCursor; + final long next = current + n; + if (next - consumed > capacity) { + return null; + } + if (PRODUCER_CURSOR.compareAndSet(this, current, next)) { + // Claimed sequences [current + 1, next] inclusive (== n sequences total). + return new Batch(current + 1L, n); + } + // CAS failure -> another producer claimed; retry. + } + } + /** CAS-claim the next sequence, or return {@code -1} if the ring is full. */ private long claim() { while (true) { @@ -265,6 +299,78 @@ private long claim() { } } + /** + * Handle returned by {@link MpscRingBuffer#tryClaim}. Holds a contiguous range of pre-claimed + * sequences belonging to the producer thread that called {@code tryClaim}; the caller must fill + * and publish each via {@link #fillAndPublish}. + * + *

Not thread-safe -- the producer thread owns it for the lifetime of the call. Do not + * share across threads. + */ + public final class Batch { + private final long startSeq; + private final int size; + private int published; + + Batch(final long startSeq, final int size) { + this.startSeq = startSeq; + this.size = size; + } + + /** Total slots in this batch (the {@code n} passed to {@code tryClaim}). */ + public int size() { + return size; + } + + /** Slots not yet filled. */ + public int remaining() { + return size - published; + } + + public void fillAndPublish(final Consumer filler) { + final long seq = nextSeq(); + final int idx = (int) (seq & mask); + try { + filler.accept(slots[idx]); + } finally { + publishedSequences.set(idx * CACHE_LINE_LONGS, seq); + } + } + + public void fillAndPublish(final C context, final BiConsumer filler) { + final long seq = nextSeq(); + final int idx = (int) (seq & mask); + try { + filler.accept(context, slots[idx]); + } finally { + publishedSequences.set(idx * CACHE_LINE_LONGS, seq); + } + } + + public void fillAndPublish( + final C1 context1, + final C2 context2, + final TriConsumer filler) { + final long seq = nextSeq(); + final int idx = (int) (seq & mask); + try { + filler.accept(context1, context2, slots[idx]); + } finally { + publishedSequences.set(idx * CACHE_LINE_LONGS, seq); + } + } + + private long nextSeq() { + if (published >= size) { + throw new IllegalStateException( + "Batch over-published: size=" + size + " published=" + published); + } + final long seq = startSeq + published; + published++; + return seq; + } + } + /** Mark sequence {@code seq} as published. Release semantics via {@link AtomicLongArray#set}. 
*/ private void publish(final long seq) { publishedSequences.set(((int) (seq & mask)) * CACHE_LINE_LONGS, seq); diff --git a/internal-api/src/test/java/datadog/trace/util/concurrent/MpscRingBufferTest.java b/internal-api/src/test/java/datadog/trace/util/concurrent/MpscRingBufferTest.java index d48c872883f..48ab2ea4415 100644 --- a/internal-api/src/test/java/datadog/trace/util/concurrent/MpscRingBufferTest.java +++ b/internal-api/src/test/java/datadog/trace/util/concurrent/MpscRingBufferTest.java @@ -2,6 +2,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -276,4 +278,164 @@ void throwingFillerStillPublishesSoConsumerDoesntHang() { assertEquals(3, drained, "consumer must advance past the throwing slot"); assertEquals(Arrays.asList(1, 2, 3), seen, "throwing slot keeps whatever filler had written"); } + + // ============ Batch claim (tryClaim) ============ + + @Test + void tryClaimReturnsBatchOfRequestedSize() { + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8); + MpscRingBuffer.Batch batch = ring.tryClaim(3); + + assertNotNull(batch); + assertEquals(3, batch.size()); + assertEquals(3, batch.remaining()); + } + + @Test + void tryClaimRejectsZeroOrNegative() { + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8); + assertThrows(IllegalArgumentException.class, () -> ring.tryClaim(0)); + assertThrows(IllegalArgumentException.class, () -> ring.tryClaim(-1)); + } + + @Test + void tryClaimReturnsNullWhenRingCantFitBatch() { + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 4); + assertNotNull(ring.tryClaim(3)); + // Only 1 slot left; claiming 2 must fail wholesale. 
+ assertNull(ring.tryClaim(2), "all-or-nothing: partial batches are not allowed"); + // But one more slot does fit. + assertNotNull(ring.tryClaim(1)); + // Now full. + assertNull(ring.tryClaim(1)); + } + + @Test + void tryClaimFillAndPublishDeliversAllToDrain() { + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8); + MpscRingBuffer.Batch batch = ring.tryClaim(5); + + for (int i = 0; i < 5; i++) { + final int v = i; + batch.fillAndPublish(s -> s.value = v); + } + assertEquals(0, batch.remaining()); + + List seen = new ArrayList<>(); + int drained = ring.drain(s -> seen.add(s.value)); + assertEquals(5, drained); + assertEquals(Arrays.asList(0, 1, 2, 3, 4), seen, "batch publishes in order"); + } + + @Test + void overPublishingBatchThrows() { + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8); + MpscRingBuffer.Batch batch = ring.tryClaim(2); + + batch.fillAndPublish(s -> s.value = 1); + batch.fillAndPublish(s -> s.value = 2); + assertThrows(IllegalStateException.class, () -> batch.fillAndPublish(s -> s.value = 3)); + } + + @Test + void batchSupportsContextFillers() { + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8); + MpscRingBuffer.Batch batch = ring.tryClaim(3); + + BiConsumer oneCtx = (v, s) -> s.value = v; + TriConsumer twoCtx = + (v, t, s) -> { + s.value = v; + s.tag = t; + }; + + batch.fillAndPublish(s -> s.value = 1); + batch.fillAndPublish(2, oneCtx); + batch.fillAndPublish(3, "three", twoCtx); + + List seen = new ArrayList<>(); + ring.drain(s -> seen.add(s.value + "/" + s.tag)); + assertEquals(Arrays.asList("1/null", "2/null", "3/three"), seen); + } + + @Test + void batchFillerThrowStillPublishesAndAdvances() { + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8); + MpscRingBuffer.Batch batch = ring.tryClaim(3); + + batch.fillAndPublish(s -> s.value = 1); + RuntimeException boom = new RuntimeException("boom"); + assertThrows( + RuntimeException.class, + () -> + batch.fillAndPublish( + s -> { + s.value = 2; + throw boom; + })); + 
// The throwing slot's sequence has already been consumed; published counter advanced. + assertEquals(1, batch.remaining(), "throwing slot still counts as published"); + batch.fillAndPublish(s -> s.value = 3); + + List seen = new ArrayList<>(); + int drained = ring.drain(s -> seen.add(s.value)); + assertEquals(3, drained); + assertEquals(Arrays.asList(1, 2, 3), seen); + } + + @Test + void concurrentBatchClaimsAreOrderedAndDontInterleave() throws InterruptedException { + final int producers = 8; + final int batchesPerProducer = 200; + final int batchSize = 16; + final int total = producers * batchesPerProducer * batchSize; + + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 256); + ExecutorService pool = Executors.newFixedThreadPool(producers); + AtomicInteger writes = new AtomicInteger(); + CountDownLatch start = new CountDownLatch(1); + + for (int p = 0; p < producers; p++) { + final int base = p * batchesPerProducer * batchSize; + pool.submit( + () -> { + try { + start.await(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } + for (int b = 0; b < batchesPerProducer; b++) { + MpscRingBuffer.Batch batch; + while ((batch = ring.tryClaim(batchSize)) == null) { + Thread.yield(); + } + for (int i = 0; i < batchSize; i++) { + final int v = base + b * batchSize + i; + batch.fillAndPublish(s -> s.value = v); + } + writes.addAndGet(batchSize); + } + }); + } + + Set seen = new HashSet<>(total); + Thread consumer = + new Thread( + () -> { + while (seen.size() < total) { + if (ring.drain((Slot s) -> seen.add(s.value)) == 0) Thread.yield(); + } + }, + "ring-batch-consumer"); + consumer.start(); + + start.countDown(); + pool.shutdown(); + assertTrue(pool.awaitTermination(30, TimeUnit.SECONDS), "producers timed out"); + consumer.join(30_000); + assertFalse(consumer.isAlive(), "consumer timed out"); + assertEquals(total, writes.get()); + assertEquals(total, seen.size(), "consumer must see every value exactly once"); + } } From 
780a20428af3690682b25c104b990d52e230f196 Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Thu, 28 May 2026 16:28:52 -0400 Subject: [PATCH 8/8] Add low-level tryClaimRange/slotAt/publish primitives to MpscRingBuffer The Batch handle from tryClaim was supposed to be scalar-replaced by escape analysis, but JMH measurements showed it's not -- the inner-class implicit this$0 plus the CAS-retry inside tryClaim block scalarization on HotSpot. Result: ~24 bytes of Batch + cursor state allocated per publish on the hot path, ~50% throughput drop on single-element claims in CSS-style benches. Add three sequence-based primitives that callers manage directly: long tryClaimRange(int n) -> start sequence or -1L T slotAt(long seq) -> slot for that sequence void publish(long seq) -> release the slot to the consumer No per-call allocation, no callback dispatch. Callers handle the sequence arithmetic themselves and trade safety (forget to publish -> ring stuck) for hot-path predictability. The Batch API stays for safer use cases. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../trace/util/concurrent/MpscRingBuffer.java | 52 ++++++++++++++++- .../util/concurrent/MpscRingBufferTest.java | 57 +++++++++++++++++++ 2 files changed, 107 insertions(+), 2 deletions(-) diff --git a/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java b/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java index cce1b46aa6f..4cefa43cac0 100644 --- a/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java +++ b/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java @@ -371,8 +371,56 @@ private long nextSeq() { } } - /** Mark sequence {@code seq} as published. Release semantics via {@link AtomicLongArray#set}. 
*/ - private void publish(final long seq) { + // ============ Low-level primitives ============ + // + // These three methods (tryClaimRange / slotAt / publish) expose the producer-side machinery + // directly. Callers manage the claimed sequence range themselves -- no per-call handle + // allocated, no callback dispatched. This is intended for hot paths where the higher-level + // tryWrite/tryClaim+Batch APIs would allocate objects per call (an Iterator over the iterable + // being batched, or the Batch object itself) that escape analysis doesn't eliminate. + // + // Misuse hazards: every claimed sequence MUST be published exactly once; only sequences in the + // claimed [start, start+n) range may be published; the slot returned by slotAt() must not escape + // past the next publish() of any sequence (the producer "owns" the slot until publish). + + /** + * Try to claim a contiguous range of {@code n} sequences in a single CAS. Returns the + * start sequence on success (a non-negative long) or {@code -1L} if the ring doesn't have + * room for the whole batch. The caller must publish each sequence in {@code [start, start + n)} + * exactly once via {@link #publish(long)}. + * + * @throws IllegalArgumentException if {@code n < 1} + */ + public long tryClaimRange(final int n) { + if (n < 1) { + throw new IllegalArgumentException("n must be >= 1, got " + n); + } + while (true) { + final long current = producerCursor; + final long consumed = consumerCursor; + final long next = current + n; + if (next - consumed > capacity) { + return -1L; + } + if (PRODUCER_CURSOR.compareAndSet(this, current, next)) { + return current + 1L; + } + } + } + + /** + * Slot for sequence {@code seq}. Only safe when {@code seq} is in a range currently owned by the + * caller (claimed via {@link #tryClaimRange} and not yet published). + */ + public T slotAt(final long seq) { + return slots[(int) (seq & mask)]; + } + + /** + * Publish sequence {@code seq}, making the slot at {@code slotAt(seq)} visible to the consumer. 
+ * Release semantics via {@link AtomicLongArray#set}. + */ + public void publish(final long seq) { publishedSequences.set(((int) (seq & mask)) * CACHE_LINE_LONGS, seq); } diff --git a/internal-api/src/test/java/datadog/trace/util/concurrent/MpscRingBufferTest.java b/internal-api/src/test/java/datadog/trace/util/concurrent/MpscRingBufferTest.java index 48ab2ea4415..6aeb635abd0 100644 --- a/internal-api/src/test/java/datadog/trace/util/concurrent/MpscRingBufferTest.java +++ b/internal-api/src/test/java/datadog/trace/util/concurrent/MpscRingBufferTest.java @@ -383,6 +383,63 @@ void batchFillerThrowStillPublishesAndAdvances() { assertEquals(Arrays.asList(1, 2, 3), seen); } + // ============ Low-level primitives (tryClaimRange / slotAt / publish) ============ + + @Test + void tryClaimRangeReturnsStartSequence() { + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8); + long start1 = ring.tryClaimRange(3); + long start2 = ring.tryClaimRange(2); + + assertEquals(0L, start1, "first range starts at sequence 0"); + assertEquals(3L, start2, "second range begins immediately after the first"); + } + + @Test + void tryClaimRangeRejectsZeroOrNegative() { + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8); + assertThrows(IllegalArgumentException.class, () -> ring.tryClaimRange(0)); + assertThrows(IllegalArgumentException.class, () -> ring.tryClaimRange(-1)); + } + + @Test + void tryClaimRangeReturnsMinusOneWhenFull() { + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 4); + assertTrue(ring.tryClaimRange(3) >= 0); + assertEquals(-1L, ring.tryClaimRange(2), "all-or-nothing"); + assertTrue(ring.tryClaimRange(1) >= 0); + assertEquals(-1L, ring.tryClaimRange(1)); + } + + @Test + void slotAtAndPublishRoundTrip() { + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8); + long start = ring.tryClaimRange(3); + assertTrue(start >= 0); + + for (int i = 0; i < 3; i++) { + long seq = start + i; + Slot slot = ring.slotAt(seq); + slot.value = (int) (seq + 100); + 
ring.publish(seq); + } + + List seen = new ArrayList<>(); + int drained = ring.drain(s -> seen.add(s.value)); + assertEquals(3, drained); + assertEquals(Arrays.asList(100, 101, 102), seen); + } + + @Test + void slotAtReturnsSameInstanceForSameModuloPosition() { + // Sequence N and sequence N+capacity map to the same ring position, so slotAt returns the + // same physical object for both (this is the whole point of the ring). + MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 4); + Slot firstSlot = ring.slotAt(0L); + Slot wrappedSlot = ring.slotAt(4L); // 4 & mask(3) == 0 + assertSame(firstSlot, wrappedSlot); + } + + @Test + void concurrentBatchClaimsAreOrderedAndDontInterleave() throws InterruptedException { + final int producers = 8;