diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/TracingExecutionInterceptor.java b/dd-java-agent/instrumentation/aws-java/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/TracingExecutionInterceptor.java index 4969eff41f6..d0a45c7562c 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/TracingExecutionInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/TracingExecutionInterceptor.java @@ -28,6 +28,7 @@ import software.amazon.awssdk.core.interceptor.Context.BeforeTransmission; import software.amazon.awssdk.core.interceptor.Context.FailedExecution; import software.amazon.awssdk.core.interceptor.Context.ModifyHttpRequest; +import software.amazon.awssdk.core.interceptor.Context.ModifyResponse; import software.amazon.awssdk.core.interceptor.ExecutionAttribute; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; @@ -111,6 +112,21 @@ public void beforeTransmission( } } + @Override + public SdkResponse modifyResponse( + final ModifyResponse context, final ExecutionAttributes executionAttributes) { + final SdkResponse response = context.response(); + if (!AWS_LEGACY_TRACING && isPollingRequest(context.request()) && isPollingResponse(response)) { + // Attach queueUrl to the unmarshalled response before the SDK rebuilds it with + // toBuilder().sdkHttpResponse(...).build(). + context + .request() + .getValueForField("QueueUrl", String.class) + .ifPresent(queueUrl -> responseQueueStore.put(response, queueUrl)); + } + return response; + } + @Override public void afterExecution( final AfterExecution context, final ExecutionAttributes executionAttributes) { @@ -124,13 +140,6 @@ public void afterExecution( DECORATE.beforeFinish(span); span.finish(); } - if (!AWS_LEGACY_TRACING && isPollingResponse(context.response())) { - // store queueUrl inside response for SqsReceiveResultInstrumentation - context - .request() - .getValueForField("QueueUrl", String.class) - .ifPresent(queueUrl -> responseQueueStore.put(context.response(), queueUrl)); - } } @Override diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsMd5ChecksumInterceptorInstrumentation.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsMd5ChecksumInterceptorInstrumentation.java new file mode 100644 index 00000000000..1bc61c8f7df --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsMd5ChecksumInterceptorInstrumentation.java @@ -0,0 +1,34 @@ +package datadog.trace.instrumentation.aws.v2.sqs; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; + +import datadog.trace.agent.tooling.Instrumenter; +import net.bytebuddy.asm.Advice; + +public final class SqsMd5ChecksumInterceptorInstrumentation + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + @Override + public String instrumentedType() { + return "software.amazon.awssdk.services.sqs.internal.MessageMD5ChecksumInterceptor"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod().and(named("afterExecution")), getClass().getName() + "$AfterExecutionAdvice"); + } + + public static class AfterExecutionAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter() { + SqsReceiveResponseInternalAccess.enter(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit() { + SqsReceiveResponseInternalAccess.exit(); + } + } +} diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsModule.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsModule.java index 5db7932aeef..4d2fe577bd1 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsModule.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsModule.java @@ -5,7 +5,6 @@ import datadog.trace.agent.tooling.InstrumenterModule; import datadog.trace.api.InstrumenterConfig; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -24,6 +23,7 @@ public String[] helperClassNames() { "datadog.trace.instrumentation.aws.v2.sqs.MessageAttributeInjector", "datadog.trace.instrumentation.aws.v2.sqs.MessageExtractAdapter", "datadog.trace.instrumentation.aws.v2.sqs.SqsDecorator", + "datadog.trace.instrumentation.aws.v2.sqs.SqsReceiveResponseInternalAccess", "datadog.trace.instrumentation.aws.v2.sqs.TracingIterator", "datadog.trace.instrumentation.aws.v2.sqs.TracingList", "datadog.trace.instrumentation.aws.v2.sqs.TracingListIterator" @@ -32,17 +32,25 @@ public String[] helperClassNames() { @Override public Map contextStore() { - return Collections.singletonMap( + Map contextStore = new java.util.HashMap<>(); + contextStore.put( "software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse", "java.lang.String"); + contextStore.put( + "software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse$BuilderImpl", + "java.lang.String"); + return contextStore; } @Override public List typeInstrumentations() { - final List ret = new ArrayList<>(4); + final List ret = new ArrayList<>(6); ret.add(new SqsClientInstrumentation()); ret.add(new SqsReceiveRequestInstrumentation()); // we don't need to instrument messages when we're doing legacy AWS-SDK tracing if (!InstrumenterConfig.get().isLegacyInstrumentationEnabled(false, "aws-sdk")) { + ret.add(new SqsMd5ChecksumInterceptorInstrumentation()); + ret.add(new SqsReceiveResponseBuilderInstrumentation()); + ret.add(new SqsReceiveResponseBuilderImplInstrumentation()); ret.add(new SqsReceiveResultInstrumentation()); } return ret; diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResponseBuilderImplInstrumentation.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResponseBuilderImplInstrumentation.java new file mode 100644 index 00000000000..4e4465309ce --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResponseBuilderImplInstrumentation.java @@ -0,0 +1,52 @@ +package datadog.trace.instrumentation.aws.v2.sqs; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesNoArguments; + +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.InstrumentationContext; +import net.bytebuddy.asm.Advice; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; + +public final class SqsReceiveResponseBuilderImplInstrumentation + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + private static final String BUILDER_IMPL = + "software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse$BuilderImpl"; + + @Override + public String instrumentedType() { + return BUILDER_IMPL; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod() + .and(named("build")) + .and(takesNoArguments()) + .and( + returns(named("software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse"))), + getClass().getName() + "$BuildAdvice"); + } + + public static class BuildAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.This Object builder, @Advice.Return ReceiveMessageResponse response) { + if (response == null) { + return; + } + ContextStore builderStore = + InstrumentationContext.get(BUILDER_IMPL, "java.lang.String"); + String queueUrl = builderStore.get(builder); + if (queueUrl != null) { + InstrumentationContext.get(ReceiveMessageResponse.class, String.class) + .put(response, queueUrl); + } + } + } +} diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResponseBuilderInstrumentation.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResponseBuilderInstrumentation.java new file mode 100644 index 00000000000..e7ca06f537a --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResponseBuilderInstrumentation.java @@ -0,0 +1,51 @@ +package datadog.trace.instrumentation.aws.v2.sqs; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesNoArguments; + +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.InstrumentationContext; +import net.bytebuddy.asm.Advice; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; + +public final class SqsReceiveResponseBuilderInstrumentation + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + private static final String BUILDER_IMPL = + "software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse$BuilderImpl"; + + @Override + public String instrumentedType() { + return "software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod() + .and(named("toBuilder")) + .and(takesNoArguments()) + .and( + returns( + named( + "software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse$Builder"))), + getClass().getName() + "$ToBuilderAdvice"); + } + + public static class ToBuilderAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.This ReceiveMessageResponse response, @Advice.Return Object builder) { + if (builder == null) { + return; + } + String queueUrl = + InstrumentationContext.get(ReceiveMessageResponse.class, String.class).get(response); + if (queueUrl != null) { + InstrumentationContext.get(BUILDER_IMPL, "java.lang.String").put(builder, queueUrl); + } + } + } +} diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResponseInternalAccess.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResponseInternalAccess.java new file mode 100644 index 00000000000..aa73af23641 --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResponseInternalAccess.java @@ -0,0 +1,20 @@ +package datadog.trace.instrumentation.aws.v2.sqs; + +import datadog.trace.bootstrap.CallDepthThreadLocalMap; + +public final class SqsReceiveResponseInternalAccess { + + private SqsReceiveResponseInternalAccess() {} + + public static void enter() { + CallDepthThreadLocalMap.incrementCallDepth(SqsReceiveResponseInternalAccess.class); + } + + public static void exit() { + CallDepthThreadLocalMap.decrementCallDepth(SqsReceiveResponseInternalAccess.class); + } + + public static boolean active() { + return CallDepthThreadLocalMap.getCallDepth(SqsReceiveResponseInternalAccess.class) > 0; + } +} diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResultInstrumentation.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResultInstrumentation.java index 148fd04479a..2a340623327 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResultInstrumentation.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResultInstrumentation.java @@ -29,6 +29,9 @@ public static class GetMessagesAdvice { public static void onExit( @Advice.This ReceiveMessageResponse result, @Advice.Return(readOnly = false) List messages) { + if (SqsReceiveResponseInternalAccess.active()) { + return; + } if (messages != null && !messages.isEmpty() && !(messages instanceof TracingList)) { String queueUrl = InstrumentationContext.get(ReceiveMessageResponse.class, String.class).get(result); diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy index 0b8bbdc9207..6e6385a1186 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy @@ -100,12 +100,12 @@ abstract class SqsClientTest extends VersionedNamingTestBase { }) def messages = client.receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueUrl).build()).messages() + messages.forEach {/* consume to create message spans */ } + if (isDataStreamsEnabled()) { TEST_DATA_STREAMS_WRITER.waitForGroups(2) } - messages.forEach {/* consume to create message spans */ } - then: def sendSpan assertTraces(2) { @@ -231,12 +231,12 @@ abstract class SqsClientTest extends VersionedNamingTestBase { }) def messages = client.receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueUrl).build()).messages() + messages.forEach {/* consume to create message spans */ } + if (isDataStreamsEnabled()) { TEST_DATA_STREAMS_WRITER.waitForGroups(2) } - messages.forEach {/* consume to create message spans */ } - then: def sendSpan assertTraces(2) { @@ -312,6 +312,100 @@ abstract class SqsClientTest extends VersionedNamingTestBase { client.close() } + def "sync receive activates consumer span for user handler iteration"() { + setup: + def client = SqsClient.builder() + .region(Region.EU_CENTRAL_1) + .endpointOverride(endpoint) + .credentialsProvider(credentialsProvider) + .build() + def queueUrl = client.createQueue(CreateQueueRequest.builder().queueName('somequeue').build()).queueUrl() + TEST_WRITER.clear() + + when: + TraceUtils.runUnderTrace('parent', { + client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody('sometext').build()) + }) + // The sync AWS SDK receive pipeline normally rebuilds the immutable response before user code + // sees it. If receive context stays only on the original response, the copied messages lose it + // and the handler span disconnects from the consumer span. + def messages = client.receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueUrl).build()).messages() + messages.forEach { + TraceUtils.runUnderTrace('handler') { + null + } + } + + then: + messages.size() == 1 + + def sendSpan + assertTraces(2) { + trace(2) { + basicSpan(it, 'parent') + span { + serviceName expectedService("Sqs", "SendMessage") + operationName expectedOperation("Sqs", "SendMessage") + resourceName "Sqs.SendMessage" + spanType DDSpanTypes.HTTP_CLIENT + errored false + measured true + childOf(span(0)) + tags { + "$Tags.COMPONENT" "java-aws-sdk" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT + "$Tags.HTTP_METHOD" "POST" + "$Tags.HTTP_STATUS" 200 + "$Tags.PEER_PORT" address.port + "$Tags.PEER_HOSTNAME" "localhost" + "aws.service" "Sqs" + "aws_service" "Sqs" + "aws.operation" "SendMessage" + "aws.agent" "java-aws-sdk" + "aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue" + "aws.requestId" "00000000-0000-0000-0000-000000000000" + if ({ isDataStreamsEnabled() }) { + "$DDTags.PATHWAY_HASH" { String } + } + urlTags("http://localhost:${address.port}/", ExpectedQueryParams.getExpectedQueryParams("SendMessage")) + serviceNameSource("java-aws-sdk") + defaultTags() + } + } + sendSpan = span(1) + } + trace(2) { + span { + serviceName expectedService("Sqs", "ReceiveMessage") + operationName expectedOperation("Sqs", "ReceiveMessage") + resourceName "Sqs.ReceiveMessage" + spanType DDSpanTypes.MESSAGE_CONSUMER + errored false + measured true + childOf(sendSpan) + tags { + "$Tags.COMPONENT" "java-aws-sdk" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER + "aws.service" "Sqs" + "aws_service" "Sqs" + "aws.operation" "ReceiveMessage" + "aws.agent" "java-aws-sdk" + "aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue" + "aws.requestId" "00000000-0000-0000-0000-000000000000" + if ({ isDataStreamsEnabled() }) { + "$DDTags.PATHWAY_HASH" { String } + } + defaultTags(true) + } + } + basicSpan(it, "handler", span(0)) + } + } + + cleanup: + client.close() + } + @IgnoreIf({instance.isDataStreamsEnabled()}) def "trace details propagated via embedded SQS message attribute (string)"() { setup: @@ -679,5 +773,3 @@ class SqsClientV0ContextSwapForkedTest extends SqsClientV0Test { injectSysConfig(TraceInstrumentationConfig.LEGACY_CONTEXT_MANAGER_ENABLED, "false") } } - -