observer, @Nonnull final Context parentContext) {
+ this.observer = observer;
+ this.parentContext = parentContext;
+ }
+
+ @Override
+ public void onSubscribe(final Disposable d) {
+ observer.onSubscribe(d);
+ }
+
+ @Override
+ public void onSuccess(final T value) {
+ try (final ContextScope scope = parentContext.attach()) {
+ observer.onSuccess(value);
+ }
+ }
+
+ @Override
+ public void onError(final Throwable e) {
+ try (final ContextScope scope = parentContext.attach()) {
+ observer.onError(e);
+ }
+ }
+}
diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingSubscriber.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingSubscriber.java
new file mode 100644
index 00000000000..95698c80853
--- /dev/null
+++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingSubscriber.java
@@ -0,0 +1,52 @@
+package datadog.trace.instrumentation.rxjava3;
+
+import datadog.context.Context;
+import datadog.context.ContextScope;
+import io.reactivex.rxjava3.core.FlowableSubscriber;
+import javax.annotation.Nonnull;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+/**
+ * Wrapper that makes sure spans from subscriber events treat the captured span as their parent.
+ *
+ * Implements both {@link Subscriber} and {@link FlowableSubscriber} so it can be assigned back
+ * to argument slots of either {@code Flowable#subscribe(Subscriber)} or {@code
+ * Flowable#subscribe(FlowableSubscriber)} from the same Advice class.
+ */
+public final class TracingSubscriber implements FlowableSubscriber, Subscriber {
+ private final Subscriber subscriber;
+ private final Context parentContext;
+
+ public TracingSubscriber(
+ @Nonnull final Subscriber subscriber, @Nonnull final Context parentContext) {
+ this.subscriber = subscriber;
+ this.parentContext = parentContext;
+ }
+
+ @Override
+ public void onSubscribe(final Subscription subscription) {
+ subscriber.onSubscribe(subscription);
+ }
+
+ @Override
+ public void onNext(final T value) {
+ try (final ContextScope scope = parentContext.attach()) {
+ subscriber.onNext(value);
+ }
+ }
+
+ @Override
+ public void onError(final Throwable e) {
+ try (final ContextScope scope = parentContext.attach()) {
+ subscriber.onError(e);
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ try (final ContextScope scope = parentContext.attach()) {
+ subscriber.onComplete();
+ }
+ }
+}
diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/RxJava3ResultExtensionTest.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/RxJava3ResultExtensionTest.java
new file mode 100644
index 00000000000..93af25d4866
--- /dev/null
+++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/RxJava3ResultExtensionTest.java
@@ -0,0 +1,191 @@
+import static datadog.trace.agent.test.assertions.Matchers.matches;
+import static datadog.trace.agent.test.assertions.SpanMatcher.span;
+import static datadog.trace.agent.test.assertions.TagsMatcher.defaultTags;
+import static datadog.trace.agent.test.assertions.TagsMatcher.error;
+import static datadog.trace.agent.test.assertions.TagsMatcher.tag;
+import static datadog.trace.agent.test.assertions.TraceMatcher.trace;
+import static datadog.trace.bootstrap.instrumentation.api.Tags.COMPONENT;
+import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+
+import annotatedsample.RxJava3TracedMethods;
+import datadog.trace.agent.test.AbstractInstrumentationTest;
+import datadog.trace.junit.utils.config.WithConfig;
+import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Maybe;
+import io.reactivex.rxjava3.core.Observable;
+import io.reactivex.rxjava3.core.Single;
+import io.reactivex.rxjava3.disposables.Disposable;
+import java.util.concurrent.CountDownLatch;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Verifies that {@code @WithSpan}-annotated methods returning RxJava 3 reactive types produce a
+ * span whose duration spans until the reactive value completes, errors, or is cancelled.
+ */
+@WithConfig(key = "trace.otel.enabled", value = "true")
+@WithConfig(key = "integration.opentelemetry-annotations-1.20.enabled", value = "true")
+class RxJava3ResultExtensionTest extends AbstractInstrumentationTest {
+
+ static Stream reactiveTypeArguments() {
+ return Stream.of(
+ arguments("Completable", "blockingAwait"),
+ arguments("Maybe", "blockingGet"),
+ arguments("Single", "blockingGet"),
+ arguments("Observable", "blockingLast"),
+ arguments("Flowable", "blockingLast"));
+ }
+
+ @ParameterizedTest(name = "WithSpan annotated async method ''{0}''")
+ @MethodSource("reactiveTypeArguments")
+ void withSpanAnnotatedAsyncMethod(String type, String operation) throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(1);
+ String method = "traceAsync" + type;
+ Object asyncType = invokeFactory(method, latch, null);
+
+ assertEquals(0, writer.size());
+
+ latch.countDown();
+ consume(asyncType, operation);
+
+ assertTraces(
+ trace(
+ span()
+ .root()
+ .operationName(Pattern.compile(Pattern.quote("RxJava3TracedMethods." + method)))
+ .resourceName(cs -> ("RxJava3TracedMethods." + method).contentEquals(cs))
+ .tags(
+ defaultTags(),
+ tag(COMPONENT, matches("opentelemetry")),
+ tag(SPAN_KIND, matches("internal")))));
+ }
+
+ @ParameterizedTest(name = "WithSpan annotated async method failing ''{0}''")
+ @MethodSource("reactiveTypeArguments")
+ void withSpanAnnotatedAsyncMethodFailing(String type, String operation)
+ throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(1);
+ IllegalStateException expected = new IllegalStateException("Test exception");
+ String method = "traceAsyncFailing" + type;
+ Object asyncType = invokeFactory(method, latch, expected);
+
+ assertEquals(0, writer.size());
+
+ latch.countDown();
+ assertThrows(IllegalStateException.class, () -> consume(asyncType, operation));
+
+ assertTraces(
+ trace(
+ span()
+ .root()
+ .operationName(Pattern.compile(Pattern.quote("RxJava3TracedMethods." + method)))
+ .resourceName(cs -> ("RxJava3TracedMethods." + method).contentEquals(cs))
+ .error()
+ .tags(
+ defaultTags(),
+ tag(COMPONENT, matches("opentelemetry")),
+ tag(SPAN_KIND, matches("internal")),
+ error(IllegalStateException.class, "Test exception"))));
+ }
+
+ @ParameterizedTest(name = "WithSpan annotated async method cancelled ''{0}''")
+ @MethodSource("reactiveTypeArguments")
+ void withSpanAnnotatedAsyncMethodCancelled(String type, String operation)
+ throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(1);
+ String method = "traceAsync" + type;
+ Object asyncType = invokeFactory(method, latch, null);
+
+ assertEquals(0, writer.size());
+
+ latch.countDown();
+ Disposable disposable = subscribe(asyncType);
+ disposable.dispose();
+
+ assertTraces(
+ trace(
+ span()
+ .root()
+ .operationName(Pattern.compile(Pattern.quote("RxJava3TracedMethods." + method)))
+ .resourceName(cs -> ("RxJava3TracedMethods." + method).contentEquals(cs))
+ .tags(
+ defaultTags(),
+ tag(COMPONENT, matches("opentelemetry")),
+ tag(SPAN_KIND, matches("internal")))));
+ }
+
+ private static Object invokeFactory(String method, CountDownLatch latch, Exception ex) {
+ switch (method) {
+ case "traceAsyncCompletable":
+ return RxJava3TracedMethods.traceAsyncCompletable(latch);
+ case "traceAsyncFailingCompletable":
+ return RxJava3TracedMethods.traceAsyncFailingCompletable(latch, ex);
+ case "traceAsyncMaybe":
+ return RxJava3TracedMethods.traceAsyncMaybe(latch);
+ case "traceAsyncFailingMaybe":
+ return RxJava3TracedMethods.traceAsyncFailingMaybe(latch, ex);
+ case "traceAsyncSingle":
+ return RxJava3TracedMethods.traceAsyncSingle(latch);
+ case "traceAsyncFailingSingle":
+ return RxJava3TracedMethods.traceAsyncFailingSingle(latch, ex);
+ case "traceAsyncObservable":
+ return RxJava3TracedMethods.traceAsyncObservable(latch);
+ case "traceAsyncFailingObservable":
+ return RxJava3TracedMethods.traceAsyncFailingObservable(latch, ex);
+ case "traceAsyncFlowable":
+ return RxJava3TracedMethods.traceAsyncFlowable(latch);
+ case "traceAsyncFailingFlowable":
+ return RxJava3TracedMethods.traceAsyncFailingFlowable(latch, ex);
+ default:
+ throw new IllegalArgumentException("Unknown method: " + method);
+ }
+ }
+
+ private static Object consume(Object reactive, String operation) {
+ if (reactive instanceof Completable) {
+ ((Completable) reactive).blockingAwait();
+ return null;
+ }
+ if (reactive instanceof Maybe) {
+ return ((Maybe>) reactive).blockingGet();
+ }
+ if (reactive instanceof Single) {
+ return ((Single>) reactive).blockingGet();
+ }
+ if (reactive instanceof Observable) {
+ return ((Observable>) reactive).blockingLast();
+ }
+ if (reactive instanceof Flowable) {
+ return ((Flowable>) reactive).blockingLast();
+ }
+ throw new IllegalArgumentException(
+ "Unsupported reactive type: " + reactive.getClass().getName());
+ }
+
+ private static Disposable subscribe(Object reactive) {
+ if (reactive instanceof Completable) {
+ return ((Completable) reactive).subscribe(() -> {}, t -> {});
+ }
+ if (reactive instanceof Maybe) {
+ return ((Maybe>) reactive).subscribe(v -> {}, t -> {});
+ }
+ if (reactive instanceof Single) {
+ return ((Single>) reactive).subscribe(v -> {}, t -> {});
+ }
+ if (reactive instanceof Observable) {
+ return ((Observable>) reactive).subscribe(v -> {}, t -> {});
+ }
+ if (reactive instanceof Flowable) {
+ return ((Flowable>) reactive).subscribe(v -> {}, t -> {});
+ }
+ throw new IllegalArgumentException(
+ "Unsupported reactive type: " + reactive.getClass().getName());
+ }
+}
diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/RxJava3Test.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/RxJava3Test.java
new file mode 100644
index 00000000000..0f637089a85
--- /dev/null
+++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/RxJava3Test.java
@@ -0,0 +1,687 @@
+import static datadog.trace.agent.test.assertions.Matchers.matches;
+import static datadog.trace.agent.test.assertions.SpanMatcher.span;
+import static datadog.trace.agent.test.assertions.TagsMatcher.defaultTags;
+import static datadog.trace.agent.test.assertions.TagsMatcher.error;
+import static datadog.trace.agent.test.assertions.TagsMatcher.tag;
+import static datadog.trace.agent.test.assertions.TraceMatcher.SORT_BY_START_TIME;
+import static datadog.trace.agent.test.assertions.TraceMatcher.trace;
+import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
+import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
+import static datadog.trace.bootstrap.instrumentation.api.Tags.COMPONENT;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+
+import datadog.trace.agent.test.AbstractInstrumentationTest;
+import datadog.trace.agent.test.assertions.SpanMatcher;
+import datadog.trace.api.Trace;
+import datadog.trace.bootstrap.instrumentation.api.AgentScope;
+import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Maybe;
+import io.reactivex.rxjava3.core.Observable;
+import io.reactivex.rxjava3.core.Single;
+import io.reactivex.rxjava3.schedulers.Schedulers;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+/**
+ * Verifies that the five RxJava 3 reactive types (Observable, Flowable, Single, Maybe, Completable)
+ * propagate the context captured at subscription time so that downstream operators and callbacks
+ * become children of the assembling span.
+ */
+class RxJava3Test extends AbstractInstrumentationTest {
+
+ static {
+ // Delayed operators (Maybe.delay) run on a scheduler thread; spans may outlive the
+ // subscribing scope, causing the pending-trace reference count to go negative when
+ // strictTraceWrites is on. Mirror RxJava2Test's useStrictTraceWrites() = false.
+ testConfig.strictTraceWrites(false);
+ }
+
+ private static final String EXCEPTION_MESSAGE = "test exception";
+
+ private static final Function ADD_ONE = RxJava3Test::addOneFunc;
+
+ private static final Function ADD_TWO = RxJava3Test::addTwoFunc;
+
+ private static final Function THROW_EXCEPTION =
+ i -> {
+ throw new RuntimeException(EXCEPTION_MESSAGE);
+ };
+
+ static Stream publisherArguments() {
+ return Stream.of(
+ arguments("basic maybe", 2, 1, (Callable