Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ public String failureReason() {

@Override
public boolean test(T t) {
if (this.expected instanceof CharSequence && t instanceof CharSequence) {
return this.expected.toString().equals(t.toString());
}
return this.expected.equals(t);
}
}
27 changes: 27 additions & 0 deletions dd-java-agent/instrumentation/rxjava/rxjava-3.0/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
muzzle {
pass {
group = "io.reactivex.rxjava3"
module = "rxjava"
versions = "[3.0.0,)"
assertInverse = true
}
}

apply from: "$rootDir/gradle/java.gradle"

addTestSuiteForDir('latestDepTest', 'test')

dependencies {
compileOnly group: 'org.reactivestreams', name: 'reactive-streams', version: '1.0.3'
compileOnly group: 'io.reactivex.rxjava3', name: 'rxjava', version: '3.0.0'

testImplementation project(':dd-java-agent:instrumentation:datadog:tracing:trace-annotation')
testImplementation project(':dd-java-agent:instrumentation:opentelemetry:opentelemetry-annotations-1.20')

testImplementation group: 'io.reactivex.rxjava3', name: 'rxjava', version: '3.1.10'
testImplementation group: 'io.opentelemetry.instrumentation', name: 'opentelemetry-instrumentation-annotations', version: '1.28.0'

testImplementation libs.junit.jupiter

latestDepTestImplementation group: 'io.reactivex.rxjava3', name: 'rxjava', version: '3.+'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package datadog.trace.instrumentation.rxjava3;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableObserver;
import net.bytebuddy.asm.Advice;

public final class CompletableInstrumentation
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

@Override
public String instrumentedType() {
return "io.reactivex.rxjava3.core.Completable";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice");
transformer.applyAdvice(
isMethod()
.and(named("subscribe"))
.and(takesArguments(1))
.and(takesArgument(0, named("io.reactivex.rxjava3.core.CompletableObserver"))),
getClass().getName() + "$PropagateParentSpanAdvice");
}

public static class CaptureParentSpanAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onConstruct(@Advice.This final Completable completable) {
Context parentContext = Java8BytecodeBridge.getCurrentContext();
if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) {
InstrumentationContext.get(Completable.class, Context.class)
.put(completable, parentContext);
}
}
}

public static class PropagateParentSpanAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static ContextScope onSubscribe(
@Advice.This final Completable completable,
@Advice.Argument(value = 0, readOnly = false) CompletableObserver observer) {
if (observer != null) {
Context parentContext =
InstrumentationContext.get(Completable.class, Context.class).get(completable);
if (parentContext != null) {
// wrap the observer so spans from its events treat the captured span as their parent
observer = new TracingCompletableObserver(observer, parentContext);
// attach the context here in case additional observers are created during subscribe
return parentContext.attach();
}
}
return null;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void closeScope(@Advice.Enter final ContextScope scope) {
if (scope != null) {
scope.close();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package datadog.trace.instrumentation.rxjava3;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge;
import io.reactivex.rxjava3.core.Flowable;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.implementation.bytecode.assign.Assigner;
import org.reactivestreams.Subscriber;

public final class FlowableInstrumentation
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

@Override
public String instrumentedType() {
return "io.reactivex.rxjava3.core.Flowable";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice");
// Hook both the org.reactivestreams.Subscriber overload and the RxJava-3-specific
// FlowableSubscriber overload. DYNAMIC typing allows the same Advice class to write back a
// TracingSubscriber (which implements both interfaces) into either argument slot.
transformer.applyAdvice(
isMethod()
.and(named("subscribe"))
.and(takesArguments(1))
.and(takesArgument(0, named("org.reactivestreams.Subscriber"))),
getClass().getName() + "$PropagateParentSpanAdvice");
transformer.applyAdvice(
isMethod()
.and(named("subscribe"))
.and(takesArguments(1))
.and(takesArgument(0, named("io.reactivex.rxjava3.core.FlowableSubscriber"))),
getClass().getName() + "$PropagateParentSpanAdvice");
}

public static class CaptureParentSpanAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onConstruct(@Advice.This final Flowable<?> flowable) {
Context parentContext = Java8BytecodeBridge.getCurrentContext();
if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) {
InstrumentationContext.get(Flowable.class, Context.class).put(flowable, parentContext);
}
}
}

public static class PropagateParentSpanAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static ContextScope onSubscribe(
@Advice.This final Flowable<?> flowable,
@Advice.Argument(value = 0, readOnly = false, typing = Assigner.Typing.DYNAMIC)
Subscriber<?> subscriber) {
if (subscriber != null) {
Context parentContext =
InstrumentationContext.get(Flowable.class, Context.class).get(flowable);
if (parentContext != null) {
// wrap the subscriber so spans from its events treat the captured span as their parent
subscriber = new TracingSubscriber<>(subscriber, parentContext);
// attach the context here in case additional subscribers are created during subscribe
return parentContext.attach();
}
}
return null;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void closeScope(@Advice.Enter final ContextScope scope) {
if (scope != null) {
scope.close();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package datadog.trace.instrumentation.rxjava3;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.MaybeObserver;
import net.bytebuddy.asm.Advice;

public final class MaybeInstrumentation
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
@Override
public String instrumentedType() {
return "io.reactivex.rxjava3.core.Maybe";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice");
transformer.applyAdvice(
isMethod()
.and(named("subscribe"))
.and(takesArguments(1))
.and(takesArgument(0, named("io.reactivex.rxjava3.core.MaybeObserver"))),
getClass().getName() + "$PropagateParentSpanAdvice");
}

public static class CaptureParentSpanAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onConstruct(@Advice.This final Maybe<?> maybe) {
Context parentContext = Java8BytecodeBridge.getCurrentContext();
if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) {
InstrumentationContext.get(Maybe.class, Context.class).put(maybe, parentContext);
}
}
}

public static class PropagateParentSpanAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static ContextScope onSubscribe(
@Advice.This final Maybe<?> maybe,
@Advice.Argument(value = 0, readOnly = false) MaybeObserver<?> observer) {
if (observer != null) {
Context parentContext = InstrumentationContext.get(Maybe.class, Context.class).get(maybe);
if (parentContext != null) {
// wrap the observer so spans from its events treat the captured span as their parent
observer = new TracingMaybeObserver<>(observer, parentContext);
// attach the context here in case additional observers are created during subscribe
return parentContext.attach();
}
}
return null;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void closeScope(@Advice.Enter final ContextScope scope) {
if (scope != null) {
scope.close();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package datadog.trace.instrumentation.rxjava3;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import net.bytebuddy.asm.Advice;

public final class ObservableInstrumentation
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
@Override
public String instrumentedType() {
return "io.reactivex.rxjava3.core.Observable";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice");
transformer.applyAdvice(
isMethod()
.and(named("subscribe"))
.and(takesArguments(1))
.and(takesArgument(0, named("io.reactivex.rxjava3.core.Observer"))),
getClass().getName() + "$PropagateParentSpanAdvice");
}

public static class CaptureParentSpanAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onConstruct(@Advice.This final Observable<?> observable) {
Context parentContext = Java8BytecodeBridge.getCurrentContext();
if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) {
InstrumentationContext.get(Observable.class, Context.class).put(observable, parentContext);
}
}
}

public static class PropagateParentSpanAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static ContextScope onSubscribe(
@Advice.This final Observable<?> observable,
@Advice.Argument(value = 0, readOnly = false) Observer<?> observer) {
if (observer != null) {
Context parentContext =
InstrumentationContext.get(Observable.class, Context.class).get(observable);
if (parentContext != null) {
// wrap the observer so spans from its events treat the captured span as their parent
observer = new TracingObserver<>(observer, parentContext);
// attach the context here in case additional observers are created during subscribe
return parentContext.attach();
}
}
return null;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void closeScope(@Advice.Enter final ContextScope scope) {
if (scope != null) {
scope.close();
}
}
}
}
Loading