diff --git a/dd-trace-core/src/jmh/java/datadog/trace/util/concurrent/RingVsQueueBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/util/concurrent/RingVsQueueBenchmark.java
new file mode 100644
index 00000000000..6a1e3d7c802
--- /dev/null
+++ b/dd-trace-core/src/jmh/java/datadog/trace/util/concurrent/RingVsQueueBenchmark.java
@@ -0,0 +1,191 @@
+package datadog.trace.util.concurrent;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.function.BiConsumer;
+import org.jctools.queues.MpscArrayQueue;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Group;
+import org.openjdk.jmh.annotations.GroupThreads;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * Head-to-head comparison of {@link MpscRingBuffer} (mutable pre-allocated slots) against the
+ * conventional approach of a jctools {@link MpscArrayQueue} with a fresh {@code Slot} allocated per
+ * publish. The latter is the pattern the current CSS code uses for {@code SpanSnapshot} on the
+ * producer side, so the delta between the two measures the actual allocation/handoff saving of the
+ * ring-buffer rewrite.
+ *
+ *
+ * - {@code write_*_8p} — 8 producers, background drainer keeps the structure empty so the
+ * measurement reflects publish cost, not full-structure drop cost. Pair-compare ring vs queue
+ * at matched capacity.
+ *
- {@code e2e_*_8p} — JMH {@code @Group} pairing 8 producers with 1 consumer for each
+ * structure. End-to-end ops/s under realistic backpressure.
+ *
+ *
+ * Run with {@code -prof gc} to also see per-op allocation rate — that's where the ring's win is
+ * loudest, since the queue allocates one {@code Slot} per publish and the ring allocates none.
+ */
+@State(Scope.Benchmark)
+@Warmup(iterations = 2, time = 15, timeUnit = SECONDS)
+@Measurement(iterations = 5, time = 15, timeUnit = SECONDS)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(SECONDS)
+@Fork(value = 1)
+public class RingVsQueueBenchmark {
+
+ /**
+ * Shared slot type. {@code MpscRingBuffer} pre-allocates these and the producer mutates in place;
+ * the queue path allocates a fresh one per publish and offers the reference. Two constructors so
+ * both styles read naturally.
+ */
+ public static final class Slot {
+ long value;
+
+ Slot() {}
+
+ Slot(final long value) {
+ this.value = value;
+ }
+ }
+
+ // Static (non-capturing) handlers. Passing ts/bh as context lets the JIT keep these as
+ // singleton functions and avoid per-call lambda allocation.
+ private static final BiConsumer RING_FILLER =
+ (ts, slot) -> {
+ slot.value = ts.counter;
+ ts.counter++;
+ };
+
+ private static final BiConsumer RING_CONSUMER =
+ (bh, slot) -> bh.consume(slot.value);
+
+ @Param({"1024"})
+ public int capacity;
+
+ /** Write-side benchmark structures. Drained by a background thread so they never fill. */
+ MpscRingBuffer ring;
+
+ MpscArrayQueue queue;
+
+ /** E2e benchmark structures. JMH drives both sides via {@code @Group}; no background drainer. */
+ MpscRingBuffer e2eRing;
+
+ MpscArrayQueue e2eQueue;
+
+ private volatile boolean stopDrainers;
+ private Thread ringDrainer;
+ private Thread queueDrainer;
+
+ @Setup(Level.Trial)
+ public void setup() {
+ ring = new MpscRingBuffer<>(Slot::new, capacity);
+ queue = new MpscArrayQueue<>(capacity);
+ e2eRing = new MpscRingBuffer<>(Slot::new, capacity);
+ e2eQueue = new MpscArrayQueue<>(capacity);
+
+ stopDrainers = false;
+ ringDrainer =
+ new Thread(
+ () -> {
+ while (!stopDrainers) {
+ if (ring.drain((Slot s) -> {}) == 0) Thread.yield();
+ }
+ },
+ "RingVsQueueBenchmark-ringDrainer");
+ ringDrainer.setDaemon(true);
+ ringDrainer.start();
+
+ queueDrainer =
+ new Thread(
+ () -> {
+ while (!stopDrainers) {
+ Slot s = queue.poll();
+ if (s == null) Thread.yield();
+ }
+ },
+ "RingVsQueueBenchmark-queueDrainer");
+ queueDrainer.setDaemon(true);
+ queueDrainer.start();
+ }
+
+ @TearDown(Level.Trial)
+ public void teardown() throws InterruptedException {
+ stopDrainers = true;
+ ringDrainer.join(5_000);
+ queueDrainer.join(5_000);
+ }
+
+ @State(Scope.Thread)
+ public static class ThreadState {
+ long counter;
+ }
+
+ // ============ Write-side throughput ============
+
+ @Threads(8)
+ @Benchmark
+ public boolean write_ring_8p(final ThreadState ts) {
+ return ring.tryWrite(ts, RING_FILLER);
+ }
+
+ /** Mirrors the SpanSnapshot pattern: allocate a fresh instance per publish, offer it. */
+ @Threads(8)
+ @Benchmark
+ public boolean write_queue_8p(final ThreadState ts) {
+ return queue.offer(new Slot(ts.counter++));
+ }
+
+ // ============ End-to-end producer/consumer ============
+
+ @Group("e2e_ring_8p")
+ @GroupThreads(8)
+ @Benchmark
+ public boolean e2e_ring_producer(final ThreadState ts) {
+ return e2eRing.tryWrite(ts, RING_FILLER);
+ }
+
+ @Group("e2e_ring_8p")
+ @GroupThreads(1)
+ @Benchmark
+ public int e2e_ring_consumer(final Blackhole bh) {
+ int drained = e2eRing.drain(bh, RING_CONSUMER);
+ if (drained == 0) Thread.yield();
+ return drained;
+ }
+
+ @Group("e2e_queue_8p")
+ @GroupThreads(8)
+ @Benchmark
+ public boolean e2e_queue_producer(final ThreadState ts) {
+ return e2eQueue.offer(new Slot(ts.counter++));
+ }
+
+ @Group("e2e_queue_8p")
+ @GroupThreads(1)
+ @Benchmark
+ public int e2e_queue_consumer(final Blackhole bh) {
+ int drained = 0;
+ Slot slot;
+ while ((slot = e2eQueue.poll()) != null) {
+ bh.consume(slot.value);
+ drained++;
+ }
+ if (drained == 0) Thread.yield();
+ return drained;
+ }
+}
diff --git a/internal-api/src/jmh/java/datadog/trace/util/concurrent/MpscRingBufferBenchmark.java b/internal-api/src/jmh/java/datadog/trace/util/concurrent/MpscRingBufferBenchmark.java
new file mode 100644
index 00000000000..e9d084f3334
--- /dev/null
+++ b/internal-api/src/jmh/java/datadog/trace/util/concurrent/MpscRingBufferBenchmark.java
@@ -0,0 +1,146 @@
+package datadog.trace.util.concurrent;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.function.BiConsumer;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Group;
+import org.openjdk.jmh.annotations.GroupThreads;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * Throughput benchmarks for {@link MpscRingBuffer}.
+ *
+ *
+ * - {@code write_1p / write_8p / write_16p} — producer-side throughput with a background
+ * drainer consuming what's published. Measures the cost of one {@code tryWrite} including CAS
+ * contention on the producer cursor at the given thread count.
+ *
- {@code e2e_8p} — JMH {@code @Group} pairing 8 producers with 1 consumer. Aggregate
+ * throughput reflects whichever side is the bottleneck under realistic pressure.
+ *
+ *
+ * Run with {@code -p capacity=...} to override the default ring capacity.
+ */
+@State(Scope.Benchmark)
+@Warmup(iterations = 2, time = 15, timeUnit = SECONDS)
+@Measurement(iterations = 5, time = 15, timeUnit = SECONDS)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(SECONDS)
+@Fork(value = 1)
+public class MpscRingBufferBenchmark {
+
+ /**
+ * Static filler so the lambda is non-capturing and the JIT can hoist it past the {@code tryWrite}
+ * call. Context arg comes first, slot last — matches {@code TagMap.forEach} convention.
+ */
+ private static final BiConsumer FILLER = (v, slot) -> slot.value = v;
+
+ /** Mutable slot. Replicates the per-publish allocation a real producer wants to avoid. */
+ public static final class Slot {
+ long value;
+ }
+
+ @Param({"1024"})
+ public int capacity;
+
+ /**
+ * Shared ring for the {@code write_*} benches. A background drainer keeps space available so
+ * producer benchmarks measure write throughput rather than full-ring drop throughput.
+ */
+ MpscRingBuffer ring;
+
+ private volatile boolean stopDrainer;
+ private Thread drainerThread;
+
+ /**
+ * Separate ring for the {@code e2e_*} group benches. JMH drives both sides directly so we don't
+ * want our own background drainer for those.
+ */
+ MpscRingBuffer e2eRing;
+
+ @Setup(Level.Trial)
+ public void setup() {
+ ring = new MpscRingBuffer<>(Slot::new, capacity);
+ e2eRing = new MpscRingBuffer<>(Slot::new, capacity);
+ stopDrainer = false;
+ drainerThread =
+ new Thread(
+ () -> {
+ while (!stopDrainer) {
+ if (ring.drain((Slot s) -> {}) == 0) Thread.yield();
+ }
+ },
+ "MpscRingBufferBenchmark-drainer");
+ drainerThread.setDaemon(true);
+ drainerThread.start();
+ }
+
+ @TearDown(Level.Trial)
+ public void teardown() throws InterruptedException {
+ stopDrainer = true;
+ drainerThread.join(5_000);
+ }
+
+ @State(Scope.Thread)
+ public static class ThreadState {
+ long counter;
+ }
+
+ // ============ Write throughput with background drainer ============
+
+ @Threads(1)
+ @Benchmark
+ public boolean write_1p(ThreadState ts) {
+ return ring.tryWrite(ts.counter++, FILLER);
+ }
+
+ @Threads(8)
+ @Benchmark
+ public boolean write_8p(ThreadState ts) {
+ return ring.tryWrite(ts.counter++, FILLER);
+ }
+
+ @Threads(16)
+ @Benchmark
+ public boolean write_16p(ThreadState ts) {
+ return ring.tryWrite(ts.counter++, FILLER);
+ }
+
+ // ============ End-to-end producer/consumer pairing ============
+ //
+ // JMH runs both methods in the same trial: 8 producer threads + 1 consumer thread. Throughput
+ // is reported as ops/sec aggregated across all 9 threads, but the consumer's drain count
+ // dwarfs the producer ops since one call processes many slots -- in practice the bottleneck
+ // is the producer side (CAS contention), and that's what the number reflects.
+
+ private static final BiConsumer CONSUMER = (bh, slot) -> bh.consume(slot.value);
+
+ @Group("e2e_8p")
+ @GroupThreads(8)
+ @Benchmark
+ public boolean e2e_producer(ThreadState ts) {
+ return e2eRing.tryWrite(ts.counter++, FILLER);
+ }
+
+ @Group("e2e_8p")
+ @GroupThreads(1)
+ @Benchmark
+ public int e2e_consumer(Blackhole bh) {
+ int drained = e2eRing.drain(bh, CONSUMER);
+ if (drained == 0) Thread.yield();
+ return drained;
+ }
+}
diff --git a/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java b/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java
new file mode 100644
index 00000000000..4cefa43cac0
--- /dev/null
+++ b/internal-api/src/main/java/datadog/trace/util/concurrent/MpscRingBuffer.java
@@ -0,0 +1,434 @@
+package datadog.trace.util.concurrent;
+
+import datadog.trace.api.function.TriConsumer;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+// Disruptor-style cache-line padding around the cursors. The two cursors live on different cache
+// lines so consumer-side writes to consumerCursor don't invalidate the line producers read for
+// producerCursor (and vice versa). Each padding class declares 7 longs (56 bytes); combined with
+// the cursor's own 8 bytes plus the JVM object header, each cursor + its surrounding pad fills
+// a 64-byte cache line. The HotSpot field-layout strategy preserves the declaration order across
+// the class hierarchy, so this pattern is reliable on all production JVMs we target.
+
+abstract class MpscRingBufferPad0 {
+ long p01, p02, p03, p04, p05, p06, p07;
+}
+
+abstract class MpscRingBufferProducerCursor extends MpscRingBufferPad0 {
+ /** Next sequence to claim. Producers increment via CAS through {@code PRODUCER_CURSOR}. */
+ volatile long producerCursor = -1L;
+}
+
+abstract class MpscRingBufferPad1 extends MpscRingBufferProducerCursor {
+ long p11, p12, p13, p14, p15, p16, p17;
+}
+
+abstract class MpscRingBufferConsumerCursor extends MpscRingBufferPad1 {
+ /** Highest sequence consumed. Only the consumer thread writes; producers read volatile. */
+ volatile long consumerCursor = -1L;
+}
+
+abstract class MpscRingBufferPad2 extends MpscRingBufferConsumerCursor {
+ long p21, p22, p23, p24, p25, p26, p27;
+}
+
+/**
+ * Bounded multi-producer / single-consumer ring buffer of pre-allocated {@code T} instances.
+ *
+ * Each slot is a long-lived {@code T} instance created at construction time via the supplied
+ * factory and recycled forever. Producers mutate slots in place via callbacks; the consumer reads
+ * them the same way. No allocation occurs per write/read after construction, which is the entire
+ * point of this class over a queue of references.
+ *
+ *
The {@code BiConsumer} and {@link TriConsumer} variants take their context object(s)
+ * before the slot, matching the convention of {@code TagMap.forEach} and {@code
+ * Hashtable.forEach}. That ordering lets callers declare the callback as a {@code static final}
+ * non-capturing lambda and pass per-call context at the call site without allocating a closure.
+ *
+ *
Thread safety contract
+ *
+ * The ring buffer is thread-safe for any number of producer threads plus exactly one consumer
+ * thread. Calling {@code drain} from multiple threads concurrently is not supported and will
+ * corrupt state.
+ *
+ *
For the slot type {@code T}:
+ *
+ *
+ * - Slot fields can be plain ({@code int}, {@code long}, object references) -- they do
+ * not need to be {@code volatile} or guarded by synchronization. Happens-before
+ * between the producer's slot mutation and the consumer's slot read is provided by the ring's
+ * internal publication-sequence machinery: a release write on the per-slot sequence inside
+ * {@code tryWrite}, paired with an acquire read inside {@code drain}.
+ *
- Don't retain slot references past your handler's return. Once a {@code tryWrite}
+ * filler returns, the slot becomes visible to the consumer; once a {@code drain} handler
+ * returns, the slot may be reclaimed by another producer and its fields overwritten. If the
+ * consumer needs to keep any state from a slot, it must extract by value (or copy references)
+ * before returning.
+ *
- Don't expose slot references outside the ring. Treat {@code T} as ring-buffer-owned;
+ * sharing a slot reference with code that doesn't follow the same discipline breaks the
+ * happens-before story.
+ *
+ *
+ * For producer fillers:
+ *
+ *
+ * - Filler invocations on the same slot are serialized (one producer wins the sequence CAS), so
+ * the filler can write fields without synchronization.
+ *
- If a filler throws, the slot is published anyway (with whatever the filler had
+ * written so far) and the exception propagates to the caller. This prevents the consumer from
+ * getting stuck waiting for an unfinished slot; the cost is that the consumer may observe a
+ * partially-filled or stale-fielded slot. Fillers should be written to either not throw or to
+ * leave the slot in a state the consumer can recognize and skip.
+ *
+ *
+ * Implementation
+ *
+ * Producer cursor is CAS-claimed; visibility of a claimed slot to the consumer is gated by a
+ * per-slot publication-sequence array. Consumer cursor is updated with a volatile write so
+ * producers observe space being freed. Cursors are cache-line-padded against each other (see the
+ * {@code MpscRingBufferPad*} hierarchy at the top of this file) and the publication-sequence array
+ * is strided so each logical entry occupies a distinct cache line.
+ */
+public final class MpscRingBuffer extends MpscRingBufferPad2 {
+
+ /**
+ * Cache line size in {@code long}-units. 64-byte cache lines on every common CPU we ship to (x86,
+ * ARM); 8 bytes per long. Each logical slot in {@link #publishedSequences} is spread out by this
+ * stride so adjacent logical sequences don't share a cache line and don't ping-pong between
+ * producer cores under heavy contention.
+ */
+ private static final int CACHE_LINE_LONGS = 8;
+
+ @SuppressWarnings("rawtypes") // AtomicLongFieldUpdater can't take a parameterized class
+ private static final AtomicLongFieldUpdater PRODUCER_CURSOR =
+ AtomicLongFieldUpdater.newUpdater(MpscRingBufferProducerCursor.class, "producerCursor");
+
+ private final T[] slots;
+
+ /**
+ * Per-slot publication sequence, strided by {@link #CACHE_LINE_LONGS} to avoid false sharing.
+ * Producers write the claimed sequence here as the last step of a publish (release write via
+ * {@link AtomicLongArray#set}); the consumer reads it (acquire read) to determine whether the
+ * slot at the next position is ready. A slot is considered published for sequence {@code s} iff
+ * {@code publishedSequences[(s & mask) * CACHE_LINE_LONGS] == s}.
+ */
+ private final AtomicLongArray publishedSequences;
+
+ private final int capacity;
+ private final int mask;
+
+ @SuppressWarnings("unchecked")
+ public MpscRingBuffer(final Supplier factory, final int capacityHint) {
+ if (capacityHint < 1) {
+ throw new IllegalArgumentException("capacityHint must be >= 1, got " + capacityHint);
+ }
+ this.capacity = nextPowerOfTwo(capacityHint);
+ if (this.capacity < 1) {
+ throw new IllegalArgumentException("capacity overflow for hint " + capacityHint);
+ }
+ this.mask = capacity - 1;
+ this.slots = (T[]) new Object[capacity];
+ this.publishedSequences = new AtomicLongArray(capacity * CACHE_LINE_LONGS);
+ for (int i = 0; i < capacity; i++) {
+ slots[i] = factory.get();
+ // Initial: sentinel "no sequence published here yet" -- anything < 0 works since
+ // sequences are 0-based and monotonically increasing.
+ publishedSequences.set(i * CACHE_LINE_LONGS, Long.MIN_VALUE);
+ }
+ }
+
+ public int capacity() {
+ return capacity;
+ }
+
+ /** Approximate count of slots holding unread items. May briefly exceed capacity under race. */
+ public int size() {
+ final long p = producerCursor;
+ final long c = consumerCursor;
+ final long diff = p - c;
+ if (diff <= 0) return 0;
+ if (diff > capacity) return capacity;
+ return (int) diff;
+ }
+
+ public boolean isEmpty() {
+ return producerCursor == consumerCursor;
+ }
+
+ /** {@code true} if the slot was filled and published; {@code false} if the ring is full. */
+ public boolean tryWrite(final Consumer super T> filler) {
+ final long seq = claim();
+ if (seq < 0L) return false;
+ // publish in finally so a throwing filler doesn't leave the slot un-published -- the
+ // consumer would otherwise wait at that sequence forever. See class javadoc.
+ try {
+ filler.accept(slots[(int) (seq & mask)]);
+ } finally {
+ publish(seq);
+ }
+ return true;
+ }
+
+ public boolean tryWrite(final C context, final BiConsumer super C, ? super T> filler) {
+ final long seq = claim();
+ if (seq < 0L) return false;
+ try {
+ filler.accept(context, slots[(int) (seq & mask)]);
+ } finally {
+ publish(seq);
+ }
+ return true;
+ }
+
+ public boolean tryWrite(
+ final C1 context1,
+ final C2 context2,
+ final TriConsumer super C1, ? super C2, ? super T> filler) {
+ final long seq = claim();
+ if (seq < 0L) return false;
+ try {
+ filler.accept(context1, context2, slots[(int) (seq & mask)]);
+ } finally {
+ publish(seq);
+ }
+ return true;
+ }
+
+ /** Drains all currently-available slots. Returns the count processed. */
+ public int drain(final Consumer super T> handler) {
+ long cursor = consumerCursor;
+ int count = 0;
+ while (true) {
+ final long nextSeq = cursor + 1L;
+ final int idx = (int) (nextSeq & mask);
+ if (publishedSequences.get(idx * CACHE_LINE_LONGS) != nextSeq) break;
+ handler.accept(slots[idx]);
+ cursor = nextSeq;
+ count++;
+ }
+ if (count > 0) consumerCursor = cursor;
+ return count;
+ }
+
+ public int drain(final C context, final BiConsumer super C, ? super T> handler) {
+ long cursor = consumerCursor;
+ int count = 0;
+ while (true) {
+ final long nextSeq = cursor + 1L;
+ final int idx = (int) (nextSeq & mask);
+ if (publishedSequences.get(idx * CACHE_LINE_LONGS) != nextSeq) break;
+ handler.accept(context, slots[idx]);
+ cursor = nextSeq;
+ count++;
+ }
+ if (count > 0) consumerCursor = cursor;
+ return count;
+ }
+
+ public int drain(
+ final C1 context1,
+ final C2 context2,
+ final TriConsumer super C1, ? super C2, ? super T> handler) {
+ long cursor = consumerCursor;
+ int count = 0;
+ while (true) {
+ final long nextSeq = cursor + 1L;
+ final int idx = (int) (nextSeq & mask);
+ if (publishedSequences.get(idx * CACHE_LINE_LONGS) != nextSeq) break;
+ handler.accept(context1, context2, slots[idx]);
+ cursor = nextSeq;
+ count++;
+ }
+ if (count > 0) consumerCursor = cursor;
+ return count;
+ }
+
+ /**
+ * Try to claim a contiguous range of {@code n} sequences in a single CAS. Returns {@code null} if
+ * the ring doesn't have room for the whole batch -- the caller treats that as "drop all {@code
+ * n}", which is the natural shape for callers that batch by a higher-level unit (e.g. one CSS
+ * publish per completed trace). When the caller has a list of N items to write, this amortizes
+ * producer-cursor contention from O(N) CASes to O(1) per call.
+ *
+ * The returned {@link Batch} must be filled via {@link Batch#fillAndPublish} exactly {@code n}
+ * times. Under-publishing leaves the ring stuck at the unfilled sequence -- the consumer waits
+ * there forever. Over-publishing throws {@link IllegalStateException}.
+ *
+ * @throws IllegalArgumentException if {@code n < 1}
+ */
+ public Batch tryClaim(final int n) {
+ if (n < 1) {
+ throw new IllegalArgumentException("n must be >= 1, got " + n);
+ }
+ while (true) {
+ final long current = producerCursor;
+ // Stale read of consumerCursor is fine: a false "full" reading just causes a drop, and a
+ // real one is correctly identified because consumerCursor only advances.
+ final long consumed = consumerCursor;
+ final long next = current + n;
+ if (next - consumed > capacity) {
+ return null;
+ }
+ if (PRODUCER_CURSOR.compareAndSet(this, current, next)) {
+ // Claimed sequences [current + 1, next] inclusive (== n sequences total).
+ return new Batch(current + 1L, n);
+ }
+ // CAS failure -> another producer claimed; retry.
+ }
+ }
+
+ /** CAS-claim the next sequence, or return {@code -1} if the ring is full. */
+ private long claim() {
+ while (true) {
+ final long current = producerCursor;
+ // Stale read of consumerCursor is fine: a false "full" reading just causes a drop, and
+ // a real "full" reading is correctly identified because consumerCursor only advances.
+ final long consumed = consumerCursor;
+ if (current - consumed >= capacity) {
+ return -1L;
+ }
+ final long next = current + 1L;
+ if (PRODUCER_CURSOR.compareAndSet(this, current, next)) {
+ return next;
+ }
+ // CAS failure -> another producer claimed; retry.
+ }
+ }
+
+ /**
+ * Handle returned by {@link MpscRingBuffer#tryClaim}. Holds a contiguous range of pre-claimed
+ * sequences belonging to the producer thread that called {@code tryClaim}; the caller must fill
+ * and publish each via {@link #fillAndPublish}.
+ *
+ *
Not thread-safe -- the producer thread owns it for the lifetime of the call. Do not
+ * share across threads.
+ */
+ public final class Batch {
+ private final long startSeq;
+ private final int size;
+ private int published;
+
+ Batch(final long startSeq, final int size) {
+ this.startSeq = startSeq;
+ this.size = size;
+ }
+
+ /** Total slots in this batch (the {@code n} passed to {@code tryClaim}). */
+ public int size() {
+ return size;
+ }
+
+ /** Slots not yet filled. */
+ public int remaining() {
+ return size - published;
+ }
+
+ public void fillAndPublish(final Consumer super T> filler) {
+ final long seq = nextSeq();
+ final int idx = (int) (seq & mask);
+ try {
+ filler.accept(slots[idx]);
+ } finally {
+ publishedSequences.set(idx * CACHE_LINE_LONGS, seq);
+ }
+ }
+
+ public void fillAndPublish(final C context, final BiConsumer super C, ? super T> filler) {
+ final long seq = nextSeq();
+ final int idx = (int) (seq & mask);
+ try {
+ filler.accept(context, slots[idx]);
+ } finally {
+ publishedSequences.set(idx * CACHE_LINE_LONGS, seq);
+ }
+ }
+
+ public void fillAndPublish(
+ final C1 context1,
+ final C2 context2,
+ final TriConsumer super C1, ? super C2, ? super T> filler) {
+ final long seq = nextSeq();
+ final int idx = (int) (seq & mask);
+ try {
+ filler.accept(context1, context2, slots[idx]);
+ } finally {
+ publishedSequences.set(idx * CACHE_LINE_LONGS, seq);
+ }
+ }
+
+ private long nextSeq() {
+ if (published >= size) {
+ throw new IllegalStateException(
+ "Batch over-published: size=" + size + " published=" + published);
+ }
+ final long seq = startSeq + published;
+ published++;
+ return seq;
+ }
+ }
+
+ // ============ Low-level primitives ============
+ //
+ // These three methods (tryClaimRange / slotAt / publish) expose the producer-side machinery
+ // directly. Callers manage the claimed sequence range themselves -- no per-call handle
+ // allocated, no callback dispatched. This is intended for hot paths where the higher-level
+ // tryWrite/tryClaim+Batch APIs would allocate per call (Iterator on the iterable being
+ // batched, or the Batch object itself) that escape analysis doesn't eliminate.
+ //
+ // Misuse hazards: every claimed sequence MUST be published exactly once; sequences must be
+ // published in [start, start+n) range only; the slot returned by slotAt() must not escape past
+ // the next publish() of any sequence (the producer "owns" the slot until publish).
+
+ /**
+ * Try to claim a contiguous range of {@code n} sequences in a single CAS. Returns the
+ * start sequence on success (a non-negative long) or {@code -1L} if the ring doesn't have
+ * room for the whole batch. The caller must publish each sequence in {@code [start, start + n)}
+ * exactly once via {@link #publish(long)}.
+ *
+ * @throws IllegalArgumentException if {@code n < 1}
+ */
+ public long tryClaimRange(final int n) {
+ if (n < 1) {
+ throw new IllegalArgumentException("n must be >= 1, got " + n);
+ }
+ while (true) {
+ final long current = producerCursor;
+ final long consumed = consumerCursor;
+ final long next = current + n;
+ if (next - consumed > capacity) {
+ return -1L;
+ }
+ if (PRODUCER_CURSOR.compareAndSet(this, current, next)) {
+ return current + 1L;
+ }
+ }
+ }
+
+ /**
+ * Slot for sequence {@code seq}. Only safe when {@code seq} is in a range currently owned by the
+ * caller (claimed via {@link #tryClaimRange} and not yet published).
+ */
+ public T slotAt(final long seq) {
+ return slots[(int) (seq & mask)];
+ }
+
+ /**
+ * Publish sequence {@code seq}, making the slot at {@code slotAt(seq)} visible to the consumer.
+ * Release semantics via {@link AtomicLongArray#set}.
+ */
+ public void publish(final long seq) {
+ publishedSequences.set(((int) (seq & mask)) * CACHE_LINE_LONGS, seq);
+ }
+
+ private static int nextPowerOfTwo(final int n) {
+ if (n <= 1) return 1;
+ // 32 - leadingZeros(n-1) gives the number of bits needed to represent n-1; shifting 1 by that
+ // gives the smallest power of two >= n.
+ final int bits = 32 - Integer.numberOfLeadingZeros(n - 1);
+ return 1 << bits;
+ }
+}
diff --git a/internal-api/src/test/java/datadog/trace/util/concurrent/MpscRingBufferTest.java b/internal-api/src/test/java/datadog/trace/util/concurrent/MpscRingBufferTest.java
new file mode 100644
index 00000000000..6aeb635abd0
--- /dev/null
+++ b/internal-api/src/test/java/datadog/trace/util/concurrent/MpscRingBufferTest.java
@@ -0,0 +1,498 @@
+package datadog.trace.util.concurrent;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import datadog.trace.api.function.TriConsumer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import org.junit.jupiter.api.Test;
+
+class MpscRingBufferTest {
+
+ /** Mutable slot used by the tests; replaces the per-publish allocation a real consumer avoids. */
+ static final class Slot {
+ int value;
+ String tag;
+ }
+
+ // ============ Construction ============
+
+ @Test
+ void capacityRoundsUpToPowerOfTwo() {
+ assertEquals(1, new MpscRingBuffer<>(Slot::new, 1).capacity());
+ assertEquals(2, new MpscRingBuffer<>(Slot::new, 2).capacity());
+ assertEquals(4, new MpscRingBuffer<>(Slot::new, 3).capacity());
+ assertEquals(16, new MpscRingBuffer<>(Slot::new, 10).capacity());
+ assertEquals(1024, new MpscRingBuffer<>(Slot::new, 1024).capacity());
+ assertEquals(2048, new MpscRingBuffer<>(Slot::new, 1025).capacity());
+ }
+
+ @Test
+ void rejectsNonPositiveCapacityHint() {
+ assertThrows(IllegalArgumentException.class, () -> new MpscRingBuffer<>(Slot::new, 0));
+ assertThrows(IllegalArgumentException.class, () -> new MpscRingBuffer<>(Slot::new, -1));
+ }
+
+ @Test
+ void slotsArePreAllocatedFromFactory() {
+ AtomicInteger calls = new AtomicInteger();
+ MpscRingBuffer ring =
+ new MpscRingBuffer<>(
+ () -> {
+ calls.incrementAndGet();
+ return new Slot();
+ },
+ 8);
+ assertEquals(8, ring.capacity());
+ assertEquals(8, calls.get(), "factory must run capacity times during construction");
+ }
+
+ // ============ Basic produce / consume ============
+
+ @Test
+ void emptyRingIsEmpty() {
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 4);
+ assertTrue(ring.isEmpty());
+ assertEquals(0, ring.size());
+ assertEquals(0, ring.drain(s -> {}));
+ }
+
+ @Test
+ void singleWriteThenDrain() {
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 4);
+ assertTrue(
+ ring.tryWrite(
+ s -> {
+ s.value = 42;
+ s.tag = "hello";
+ }));
+ assertEquals(1, ring.size());
+
+ List seen = new ArrayList<>();
+ int drained = ring.drain(s -> seen.add(s.value));
+ assertEquals(1, drained);
+ assertEquals(Arrays.asList(42), seen);
+ assertTrue(ring.isEmpty());
+ }
+
+ @Test
+ void writesPreserveFIFOOrder() {
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8);
+ for (int i = 0; i < 6; i++) {
+ final int v = i;
+ assertTrue(ring.tryWrite(s -> s.value = v));
+ }
+ List seen = new ArrayList<>();
+ ring.drain(s -> seen.add(s.value));
+ assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5), seen);
+ }
+
+ // ============ Full / drop behavior ============
+
+ @Test
+ void writesBeyondCapacityFail() {
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 4);
+ for (int i = 0; i < 4; i++) {
+ assertTrue(ring.tryWrite(s -> s.value = 0));
+ }
+ assertFalse(ring.tryWrite(s -> s.value = 0), "5th write must fail when capacity == 4");
+ assertEquals(4, ring.size());
+ }
+
+ @Test
+ void writesResumeAfterDrain() {
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 4);
+ for (int i = 0; i < 4; i++) {
+ final int v = i;
+ assertTrue(ring.tryWrite(s -> s.value = v));
+ }
+ assertFalse(ring.tryWrite(s -> {}));
+
+ AtomicInteger sum = new AtomicInteger();
+ ring.drain(s -> sum.addAndGet(s.value));
+ assertEquals(0 + 1 + 2 + 3, sum.get());
+
+ // Now should accept another full round.
+ for (int i = 100; i < 104; i++) {
+ final int v = i;
+ assertTrue(ring.tryWrite(s -> s.value = v));
+ }
+ assertFalse(ring.tryWrite(s -> {}));
+ }
+
+ // ============ Context-passing variants (context first, slot last) ============
+
+ @Test
+ void writeAndDrainWithOneContext() {
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 4);
+ BiConsumer filler = (ctx, s) -> s.value = ctx;
+ assertTrue(ring.tryWrite(7, filler));
+ assertTrue(ring.tryWrite(8, filler));
+
+ List seen = new ArrayList<>();
+ BiConsumer, Slot> reader = (sink, s) -> sink.add(s.value);
+ assertEquals(2, ring.drain(seen, reader));
+ assertEquals(Arrays.asList(7, 8), seen);
+ }
+
+ @Test
+ void writeAndDrainWithTwoContexts() {
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 4);
+ TriConsumer filler =
+ (v, t, s) -> {
+ s.value = v;
+ s.tag = t;
+ };
+ assertTrue(ring.tryWrite(1, "a", filler));
+ assertTrue(ring.tryWrite(2, "b", filler));
+
+ List tags = new ArrayList<>();
+ List vals = new ArrayList<>();
+ TriConsumer, List, Slot> reader =
+ (ts, vs, s) -> {
+ ts.add(s.tag);
+ vs.add(s.value);
+ };
+ assertEquals(2, ring.drain(tags, vals, reader));
+ assertEquals(Arrays.asList("a", "b"), tags);
+ assertEquals(Arrays.asList(1, 2), vals);
+ }
+
+ // ============ Concurrency ============
+
+ @Test
+ void manyProducersSingleConsumerSeesEveryWrittenValue() throws InterruptedException {
+ final int producers = 8;
+ final int perProducer = 50_000;
+ final int total = producers * perProducer;
+
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 1024);
+
+ ExecutorService producerPool = Executors.newFixedThreadPool(producers);
+ AtomicInteger dropped = new AtomicInteger();
+ AtomicInteger written = new AtomicInteger();
+ CountDownLatch start = new CountDownLatch(1);
+
+ BiConsumer filler = (v, s) -> s.value = v;
+
+ for (int p = 0; p < producers; p++) {
+ final int base = p * perProducer;
+ producerPool.submit(
+ () -> {
+ try {
+ start.await();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ for (int i = 0; i < perProducer; i++) {
+ int v = base + i;
+ // Spin until the consumer makes room. We're testing correctness, not throughput.
+ while (!ring.tryWrite(v, filler)) {
+ dropped.incrementAndGet();
+ Thread.yield();
+ }
+ written.incrementAndGet();
+ }
+ });
+ }
+
+ Set seen = new HashSet<>(total);
+ BiConsumer, Slot> reader = (sink, s) -> sink.add(s.value);
+
+ Thread consumer =
+ new Thread(
+ () -> {
+ while (seen.size() < total) {
+ if (ring.drain(seen, reader) == 0) Thread.yield();
+ }
+ },
+ "ring-consumer");
+ consumer.start();
+
+ start.countDown();
+ producerPool.shutdown();
+ assertTrue(producerPool.awaitTermination(30, TimeUnit.SECONDS), "producers timed out");
+ assertTrue(consumer.isAlive() || seen.size() == total);
+ consumer.join(30_000);
+ assertFalse(consumer.isAlive(), "consumer timed out");
+
+ assertEquals(total, written.get(), "every producer call should eventually succeed");
+ assertEquals(total, seen.size(), "consumer must see every value exactly once");
+ }
+
+ @Test
+ void sizeReflectsOutstandingWrites() {
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8);
+ assertEquals(0, ring.size());
+ ring.tryWrite(s -> {});
+ assertEquals(1, ring.size());
+ ring.tryWrite(s -> {});
+ assertEquals(2, ring.size());
+ ring.drain(s -> {});
+ assertEquals(0, ring.size());
+ }
+
+ @Test
+ void throwingFillerStillPublishesSoConsumerDoesntHang() {
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 4);
+
+ // First write succeeds.
+ assertTrue(ring.tryWrite(s -> s.value = 1));
+
+ // Second write throws midway through filling. Slot must still be published so the consumer's
+ // drain can advance past it.
+ RuntimeException boom = new RuntimeException("boom");
+ RuntimeException thrown =
+ assertThrows(
+ RuntimeException.class,
+ () ->
+ ring.tryWrite(
+ s -> {
+ s.value = 2;
+ throw boom;
+ }));
+ assertSame(boom, thrown);
+
+ // Third write proves the ring's cursors are still healthy after the throw.
+ assertTrue(ring.tryWrite(s -> s.value = 3));
+
+ // Consumer drains all three: it must not hang on the partially-filled slot.
+ List seen = new ArrayList<>();
+ int drained = ring.drain(s -> seen.add(s.value));
+ assertEquals(3, drained, "consumer must advance past the throwing slot");
+ assertEquals(Arrays.asList(1, 2, 3), seen, "throwing slot keeps whatever filler had written");
+ }
+
+ // ============ Batch claim (tryClaim) ============
+
+ @Test
+ void tryClaimReturnsBatchOfRequestedSize() {
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8);
+ MpscRingBuffer.Batch batch = ring.tryClaim(3);
+
+ assertNotNull(batch);
+ assertEquals(3, batch.size());
+ assertEquals(3, batch.remaining());
+ }
+
+ @Test
+ void tryClaimRejectsZeroOrNegative() {
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8);
+ assertThrows(IllegalArgumentException.class, () -> ring.tryClaim(0));
+ assertThrows(IllegalArgumentException.class, () -> ring.tryClaim(-1));
+ }
+
+ @Test
+ void tryClaimReturnsNullWhenRingCantFitBatch() {
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 4);
+ assertNotNull(ring.tryClaim(3));
+ // Only 1 slot left; claiming 2 must fail wholesale.
+ assertNull(ring.tryClaim(2), "all-or-nothing: partial batches are not allowed");
+ // But one more slot does fit.
+ assertNotNull(ring.tryClaim(1));
+ // Now full.
+ assertNull(ring.tryClaim(1));
+ }
+
+ @Test
+ void tryClaimFillAndPublishDeliversAllToDrain() {
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8);
+ MpscRingBuffer.Batch batch = ring.tryClaim(5);
+
+ for (int i = 0; i < 5; i++) {
+ final int v = i;
+ batch.fillAndPublish(s -> s.value = v);
+ }
+ assertEquals(0, batch.remaining());
+
+ List seen = new ArrayList<>();
+ int drained = ring.drain(s -> seen.add(s.value));
+ assertEquals(5, drained);
+ assertEquals(Arrays.asList(0, 1, 2, 3, 4), seen, "batch publishes in order");
+ }
+
+ @Test
+ void overPublishingBatchThrows() {
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8);
+ MpscRingBuffer.Batch batch = ring.tryClaim(2);
+
+ batch.fillAndPublish(s -> s.value = 1);
+ batch.fillAndPublish(s -> s.value = 2);
+ assertThrows(IllegalStateException.class, () -> batch.fillAndPublish(s -> s.value = 3));
+ }
+
+ @Test
+ void batchSupportsContextFillers() {
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8);
+ MpscRingBuffer.Batch batch = ring.tryClaim(3);
+
+ BiConsumer oneCtx = (v, s) -> s.value = v;
+ TriConsumer twoCtx =
+ (v, t, s) -> {
+ s.value = v;
+ s.tag = t;
+ };
+
+ batch.fillAndPublish(s -> s.value = 1);
+ batch.fillAndPublish(2, oneCtx);
+ batch.fillAndPublish(3, "three", twoCtx);
+
+ List seen = new ArrayList<>();
+ ring.drain(s -> seen.add(s.value + "/" + s.tag));
+ assertEquals(Arrays.asList("1/null", "2/null", "3/three"), seen);
+ }
+
+ @Test
+ void batchFillerThrowStillPublishesAndAdvances() {
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8);
+ MpscRingBuffer.Batch batch = ring.tryClaim(3);
+
+ batch.fillAndPublish(s -> s.value = 1);
+ RuntimeException boom = new RuntimeException("boom");
+ assertThrows(
+ RuntimeException.class,
+ () ->
+ batch.fillAndPublish(
+ s -> {
+ s.value = 2;
+ throw boom;
+ }));
+ // The throwing slot's sequence has already been consumed; published counter advanced.
+ assertEquals(1, batch.remaining(), "throwing slot still counts as published");
+ batch.fillAndPublish(s -> s.value = 3);
+
+ List seen = new ArrayList<>();
+ int drained = ring.drain(s -> seen.add(s.value));
+ assertEquals(3, drained);
+ assertEquals(Arrays.asList(1, 2, 3), seen);
+ }
+
+ // ============ Low-level primitives (tryClaimRange / slotAt / publish) ============
+
+ @Test
+ void tryClaimRangeReturnsStartSequence() {
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8);
+ long start1 = ring.tryClaimRange(3);
+ long start2 = ring.tryClaimRange(2);
+
+ assertEquals(0L, start1, "first range starts at sequence 0");
+ assertEquals(3L, start2, "second range begins immediately after the first");
+ }
+
+ @Test
+ void tryClaimRangeRejectsZeroOrNegative() {
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8);
+ assertThrows(IllegalArgumentException.class, () -> ring.tryClaimRange(0));
+ assertThrows(IllegalArgumentException.class, () -> ring.tryClaimRange(-1));
+ }
+
+ @Test
+ void tryClaimRangeReturnsMinusOneWhenFull() {
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 4);
+ assertTrue(ring.tryClaimRange(3) >= 0);
+ assertEquals(-1L, ring.tryClaimRange(2), "all-or-nothing");
+ assertTrue(ring.tryClaimRange(1) >= 0);
+ assertEquals(-1L, ring.tryClaimRange(1));
+ }
+
+ @Test
+ void slotAtAndPublishRoundTrip() {
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 8);
+ long start = ring.tryClaimRange(3);
+ assertTrue(start >= 0);
+
+ for (int i = 0; i < 3; i++) {
+ long seq = start + i;
+ Slot slot = ring.slotAt(seq);
+ slot.value = (int) (seq + 100);
+ ring.publish(seq);
+ }
+
+ List seen = new ArrayList<>();
+ int drained = ring.drain(s -> seen.add(s.value));
+ assertEquals(3, drained);
+ assertEquals(Arrays.asList(100, 101, 102), seen);
+ }
+
+ @Test
+ void slotAtReturnsSameInstanceForSameModuloPosition() {
+ // After publish+drain wraps around, the slot at sequence N and sequence N+capacity are the
+ // same physical object (this is the whole point of the ring).
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 4);
+ Slot firstSlot = ring.slotAt(0L);
+ Slot wrappedSlot = ring.slotAt(4L); // 4 & mask(3) == 0
+ assertSame(firstSlot, wrappedSlot);
+ }
+
+ @Test
+ void concurrentBatchClaimsAreOrderedAndDontInterleave() throws InterruptedException {
+ final int producers = 8;
+ final int batchesPerProducer = 200;
+ final int batchSize = 16;
+ final int total = producers * batchesPerProducer * batchSize;
+
+ MpscRingBuffer ring = new MpscRingBuffer<>(Slot::new, 256);
+ ExecutorService pool = Executors.newFixedThreadPool(producers);
+ AtomicInteger writes = new AtomicInteger();
+ CountDownLatch start = new CountDownLatch(1);
+
+ for (int p = 0; p < producers; p++) {
+ final int base = p * batchesPerProducer * batchSize;
+ pool.submit(
+ () -> {
+ try {
+ start.await();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ for (int b = 0; b < batchesPerProducer; b++) {
+ MpscRingBuffer.Batch batch;
+ while ((batch = ring.tryClaim(batchSize)) == null) {
+ Thread.yield();
+ }
+ for (int i = 0; i < batchSize; i++) {
+ final int v = base + b * batchSize + i;
+ batch.fillAndPublish(s -> s.value = v);
+ }
+ writes.addAndGet(batchSize);
+ }
+ });
+ }
+
+ Set seen = new HashSet<>(total);
+ Thread consumer =
+ new Thread(
+ () -> {
+ while (seen.size() < total) {
+ if (ring.drain((Slot s) -> seen.add(s.value)) == 0) Thread.yield();
+ }
+ },
+ "ring-batch-consumer");
+ consumer.start();
+
+ start.countDown();
+ pool.shutdown();
+ assertTrue(pool.awaitTermination(30, TimeUnit.SECONDS), "producers timed out");
+ consumer.join(30_000);
+ assertFalse(consumer.isAlive(), "consumer timed out");
+ assertEquals(total, writes.get());
+ assertEquals(total, seen.size(), "consumer must see every value exactly once");
+ }
+}