From 0cf1997bb2182e3cbe21550cd1539ee3f2a3641b Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Fri, 29 May 2026 13:41:02 -0700 Subject: [PATCH 1/2] Add regression test for sync SQS receive response copy context loss --- .../src/test/groovy/SqsClientTest.groovy | 96 ++++++++++++++++++- 1 file changed, 94 insertions(+), 2 deletions(-) diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy index 0b8bbdc9207..19ee1fb5638 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy @@ -312,6 +312,100 @@ abstract class SqsClientTest extends VersionedNamingTestBase { client.close() } + def "sync receive activates consumer span for user handler iteration"() { + setup: + def client = SqsClient.builder() + .region(Region.EU_CENTRAL_1) + .endpointOverride(endpoint) + .credentialsProvider(credentialsProvider) + .build() + def queueUrl = client.createQueue(CreateQueueRequest.builder().queueName('somequeue').build()).queueUrl() + TEST_WRITER.clear() + + when: + TraceUtils.runUnderTrace('parent', { + client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody('sometext').build()) + }) + // The sync AWS SDK receive pipeline normally rebuilds the immutable response before user code + // sees it. If receive context stays only on the original response, the copied messages lose it + // and the handler span disconnects from the consumer span. + def messages = client.receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueUrl).build()).messages() + messages.forEach { + TraceUtils.runUnderTrace('handler') { + null + } + } + + then: + messages.size() == 1 + + def sendSpan + assertTraces(2) { + trace(2) { + basicSpan(it, 'parent') + span { + serviceName expectedService("Sqs", "SendMessage") + operationName expectedOperation("Sqs", "SendMessage") + resourceName "Sqs.SendMessage" + spanType DDSpanTypes.HTTP_CLIENT + errored false + measured true + childOf(span(0)) + tags { + "$Tags.COMPONENT" "java-aws-sdk" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT + "$Tags.HTTP_METHOD" "POST" + "$Tags.HTTP_STATUS" 200 + "$Tags.PEER_PORT" address.port + "$Tags.PEER_HOSTNAME" "localhost" + "aws.service" "Sqs" + "aws_service" "Sqs" + "aws.operation" "SendMessage" + "aws.agent" "java-aws-sdk" + "aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue" + "aws.requestId" "00000000-0000-0000-0000-000000000000" + if ({ isDataStreamsEnabled() }) { + "$DDTags.PATHWAY_HASH" { String } + } + urlTags("http://localhost:${address.port}/", ExpectedQueryParams.getExpectedQueryParams("SendMessage")) + serviceNameSource("java-aws-sdk") + defaultTags() + } + } + sendSpan = span(1) + } + trace(2) { + span { + serviceName expectedService("Sqs", "ReceiveMessage") + operationName expectedOperation("Sqs", "ReceiveMessage") + resourceName "Sqs.ReceiveMessage" + spanType DDSpanTypes.MESSAGE_CONSUMER + errored false + measured true + childOf(sendSpan) + tags { + "$Tags.COMPONENT" "java-aws-sdk" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER + "aws.service" "Sqs" + "aws_service" "Sqs" + "aws.operation" "ReceiveMessage" + "aws.agent" "java-aws-sdk" + "aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue" + "aws.requestId" "00000000-0000-0000-0000-000000000000" + if ({ isDataStreamsEnabled() }) { + "$DDTags.PATHWAY_HASH" { String } + } + defaultTags(true) + } + } + basicSpan(it, "handler", span(0)) + } + } + + cleanup: + client.close() + } + @IgnoreIf({instance.isDataStreamsEnabled()}) def "trace details propagated via embedded SQS message attribute (string)"() { setup: @@ -679,5 +773,3 @@ class SqsClientV0ContextSwapForkedTest extends SqsClientV0Test { injectSysConfig(TraceInstrumentationConfig.LEGACY_CONTEXT_MANAGER_ENABLED, "false") } } - - From 2fffa01794ad6beb28602a53bd568a4f796cfaa6 Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Fri, 29 May 2026 18:15:19 -0700 Subject: [PATCH 2/2] Preserve SQS receive context across AWS SDK response rebuilding Store the receive queue URL before the SDK rebuilds ReceiveMessageResponse and propagate it through the response builder so messages() can still wrap the final list in TracingList. Also avoid wrapping messages during the SDK's internal MessageMD5ChecksumInterceptor pass to prevent creating consumer spans before user code actually consumes the messages. --- .../aws/v2/TracingExecutionInterceptor.java | 23 +++++--- ...Md5ChecksumInterceptorInstrumentation.java | 34 ++++++++++++ .../instrumentation/aws/v2/sqs/SqsModule.java | 14 +++-- ...iveResponseBuilderImplInstrumentation.java | 52 +++++++++++++++++++ ...ReceiveResponseBuilderInstrumentation.java | 51 ++++++++++++++++++ .../sqs/SqsReceiveResponseInternalAccess.java | 20 +++++++ .../sqs/SqsReceiveResultInstrumentation.java | 3 ++ .../src/test/groovy/SqsClientTest.groovy | 8 +-- 8 files changed, 191 insertions(+), 14 deletions(-) create mode 100644 dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsMd5ChecksumInterceptorInstrumentation.java create mode 100644 dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResponseBuilderImplInstrumentation.java create mode 100644 dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResponseBuilderInstrumentation.java create mode 100644 dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResponseInternalAccess.java diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/TracingExecutionInterceptor.java b/dd-java-agent/instrumentation/aws-java/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/TracingExecutionInterceptor.java index 4969eff41f6..d0a45c7562c 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/TracingExecutionInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/TracingExecutionInterceptor.java @@ -28,6 +28,7 @@ import software.amazon.awssdk.core.interceptor.Context.BeforeTransmission; import software.amazon.awssdk.core.interceptor.Context.FailedExecution; import software.amazon.awssdk.core.interceptor.Context.ModifyHttpRequest; +import software.amazon.awssdk.core.interceptor.Context.ModifyResponse; import software.amazon.awssdk.core.interceptor.ExecutionAttribute; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; @@ -111,6 +112,21 @@ public void beforeTransmission( } } + @Override + public SdkResponse modifyResponse( + final ModifyResponse context, final ExecutionAttributes executionAttributes) { + final SdkResponse response = context.response(); + if (!AWS_LEGACY_TRACING && isPollingRequest(context.request()) && isPollingResponse(response)) { + // Attach queueUrl to the unmarshalled response before the SDK rebuilds it with + // toBuilder().sdkHttpResponse(...).build(). + context + .request() + .getValueForField("QueueUrl", String.class) + .ifPresent(queueUrl -> responseQueueStore.put(response, queueUrl)); + } + return response; + } + @Override public void afterExecution( final AfterExecution context, final ExecutionAttributes executionAttributes) { @@ -124,13 +140,6 @@ public void afterExecution( DECORATE.beforeFinish(span); span.finish(); } - if (!AWS_LEGACY_TRACING && isPollingResponse(context.response())) { - // store queueUrl inside response for SqsReceiveResultInstrumentation - context - .request() - .getValueForField("QueueUrl", String.class) - .ifPresent(queueUrl -> responseQueueStore.put(context.response(), queueUrl)); - } } @Override diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsMd5ChecksumInterceptorInstrumentation.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsMd5ChecksumInterceptorInstrumentation.java new file mode 100644 index 00000000000..1bc61c8f7df --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsMd5ChecksumInterceptorInstrumentation.java @@ -0,0 +1,34 @@ +package datadog.trace.instrumentation.aws.v2.sqs; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; + +import datadog.trace.agent.tooling.Instrumenter; +import net.bytebuddy.asm.Advice; + +public final class SqsMd5ChecksumInterceptorInstrumentation + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + @Override + public String instrumentedType() { + return "software.amazon.awssdk.services.sqs.internal.MessageMD5ChecksumInterceptor"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod().and(named("afterExecution")), getClass().getName() + "$AfterExecutionAdvice"); + } + + public static class AfterExecutionAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter() { + SqsReceiveResponseInternalAccess.enter(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit() { + SqsReceiveResponseInternalAccess.exit(); + } + } +} diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsModule.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsModule.java index 5db7932aeef..4d2fe577bd1 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsModule.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsModule.java @@ -5,7 +5,6 @@ import datadog.trace.agent.tooling.InstrumenterModule; import datadog.trace.api.InstrumenterConfig; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -24,6 +23,7 @@ public String[] helperClassNames() { "datadog.trace.instrumentation.aws.v2.sqs.MessageAttributeInjector", "datadog.trace.instrumentation.aws.v2.sqs.MessageExtractAdapter", "datadog.trace.instrumentation.aws.v2.sqs.SqsDecorator", + "datadog.trace.instrumentation.aws.v2.sqs.SqsReceiveResponseInternalAccess", "datadog.trace.instrumentation.aws.v2.sqs.TracingIterator", "datadog.trace.instrumentation.aws.v2.sqs.TracingList", "datadog.trace.instrumentation.aws.v2.sqs.TracingListIterator" @@ -32,17 +32,25 @@ public String[] helperClassNames() { @Override public Map contextStore() { - return Collections.singletonMap( + Map contextStore = new java.util.HashMap<>(); + contextStore.put( "software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse", "java.lang.String"); + contextStore.put( + "software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse$BuilderImpl", + "java.lang.String"); + return contextStore; } @Override public List typeInstrumentations() { - final List ret = new ArrayList<>(4); + final List ret = new ArrayList<>(6); ret.add(new SqsClientInstrumentation()); ret.add(new SqsReceiveRequestInstrumentation()); // we don't need to instrument messages when we're doing legacy AWS-SDK tracing if (!InstrumenterConfig.get().isLegacyInstrumentationEnabled(false, "aws-sdk")) { + ret.add(new SqsMd5ChecksumInterceptorInstrumentation()); + ret.add(new SqsReceiveResponseBuilderInstrumentation()); + ret.add(new SqsReceiveResponseBuilderImplInstrumentation()); ret.add(new SqsReceiveResultInstrumentation()); } return ret; diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResponseBuilderImplInstrumentation.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResponseBuilderImplInstrumentation.java new file mode 100644 index 00000000000..4e4465309ce --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResponseBuilderImplInstrumentation.java @@ -0,0 +1,52 @@ +package datadog.trace.instrumentation.aws.v2.sqs; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesNoArguments; + +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.InstrumentationContext; +import net.bytebuddy.asm.Advice; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; + +public final class SqsReceiveResponseBuilderImplInstrumentation + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + private static final String BUILDER_IMPL = + "software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse$BuilderImpl"; + + @Override + public String instrumentedType() { + return BUILDER_IMPL; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod() + .and(named("build")) + .and(takesNoArguments()) + .and( + returns(named("software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse"))), + getClass().getName() + "$BuildAdvice"); + } + + public static class BuildAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.This Object builder, @Advice.Return ReceiveMessageResponse response) { + if (response == null) { + return; + } + ContextStore builderStore = + InstrumentationContext.get(BUILDER_IMPL, "java.lang.String"); + String queueUrl = builderStore.get(builder); + if (queueUrl != null) { + InstrumentationContext.get(ReceiveMessageResponse.class, String.class) + .put(response, queueUrl); + } + } + } +} diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResponseBuilderInstrumentation.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResponseBuilderInstrumentation.java new file mode 100644 index 00000000000..e7ca06f537a --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResponseBuilderInstrumentation.java @@ -0,0 +1,51 @@ +package datadog.trace.instrumentation.aws.v2.sqs; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesNoArguments; + +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.InstrumentationContext; +import net.bytebuddy.asm.Advice; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; + +public final class SqsReceiveResponseBuilderInstrumentation + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + private static final String BUILDER_IMPL = + "software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse$BuilderImpl"; + + @Override + public String instrumentedType() { + return "software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod() + .and(named("toBuilder")) + .and(takesNoArguments()) + .and( + returns( + named( + "software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse$Builder"))), + getClass().getName() + "$ToBuilderAdvice"); + } + + public static class ToBuilderAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.This ReceiveMessageResponse response, @Advice.Return Object builder) { + if (builder == null) { + return; + } + String queueUrl = + InstrumentationContext.get(ReceiveMessageResponse.class, String.class).get(response); + if (queueUrl != null) { + InstrumentationContext.get(BUILDER_IMPL, "java.lang.String").put(builder, queueUrl); + } + } + } +} diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResponseInternalAccess.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResponseInternalAccess.java new file mode 100644 index 00000000000..aa73af23641 --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResponseInternalAccess.java @@ -0,0 +1,20 @@ +package datadog.trace.instrumentation.aws.v2.sqs; + +import datadog.trace.bootstrap.CallDepthThreadLocalMap; + +public final class SqsReceiveResponseInternalAccess { + + private SqsReceiveResponseInternalAccess() {} + + public static void enter() { + CallDepthThreadLocalMap.incrementCallDepth(SqsReceiveResponseInternalAccess.class); + } + + public static void exit() { + CallDepthThreadLocalMap.decrementCallDepth(SqsReceiveResponseInternalAccess.class); + } + + public static boolean active() { + return CallDepthThreadLocalMap.getCallDepth(SqsReceiveResponseInternalAccess.class) > 0; + } +} diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResultInstrumentation.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResultInstrumentation.java index 148fd04479a..2a340623327 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResultInstrumentation.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResultInstrumentation.java @@ -29,6 +29,9 @@ public static class GetMessagesAdvice { public static void onExit( @Advice.This ReceiveMessageResponse result, @Advice.Return(readOnly = false) List messages) { + if (SqsReceiveResponseInternalAccess.active()) { + return; + } if (messages != null && !messages.isEmpty() && !(messages instanceof TracingList)) { String queueUrl = InstrumentationContext.get(ReceiveMessageResponse.class, String.class).get(result); diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy index 19ee1fb5638..6e6385a1186 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy @@ -100,12 +100,12 @@ abstract class SqsClientTest extends VersionedNamingTestBase { }) def messages = client.receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueUrl).build()).messages() + messages.forEach {/* consume to create message spans */ } + if (isDataStreamsEnabled()) { TEST_DATA_STREAMS_WRITER.waitForGroups(2) } - messages.forEach {/* consume to create message spans */ } - then: def sendSpan assertTraces(2) { @@ -231,12 +231,12 @@ abstract class SqsClientTest extends VersionedNamingTestBase { }) def messages = client.receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueUrl).build()).messages() + messages.forEach {/* consume to create message spans */ } + if (isDataStreamsEnabled()) { TEST_DATA_STREAMS_WRITER.waitForGroups(2) } - messages.forEach {/* consume to create message spans */ } - then: def sendSpan assertTraces(2) {