Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import software.amazon.awssdk.core.interceptor.Context.BeforeTransmission;
import software.amazon.awssdk.core.interceptor.Context.FailedExecution;
import software.amazon.awssdk.core.interceptor.Context.ModifyHttpRequest;
import software.amazon.awssdk.core.interceptor.Context.ModifyResponse;
import software.amazon.awssdk.core.interceptor.ExecutionAttribute;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
Expand Down Expand Up @@ -111,6 +112,21 @@ public void beforeTransmission(
}
}

@Override
public SdkResponse modifyResponse(
final ModifyResponse context, final ExecutionAttributes executionAttributes) {
final SdkResponse response = context.response();
if (!AWS_LEGACY_TRACING && isPollingRequest(context.request()) && isPollingResponse(response)) {
// Attach queueUrl to the unmarshalled response before the SDK rebuilds it with
// toBuilder().sdkHttpResponse(...).build().
context
.request()
.getValueForField("QueueUrl", String.class)
.ifPresent(queueUrl -> responseQueueStore.put(response, queueUrl));
}
return response;
}

@Override
public void afterExecution(
final AfterExecution context, final ExecutionAttributes executionAttributes) {
Expand All @@ -124,13 +140,6 @@ public void afterExecution(
DECORATE.beforeFinish(span);
span.finish();
}
if (!AWS_LEGACY_TRACING && isPollingResponse(context.response())) {
// store queueUrl inside response for SqsReceiveResultInstrumentation
context
.request()
.getValueForField("QueueUrl", String.class)
.ifPresent(queueUrl -> responseQueueStore.put(context.response(), queueUrl));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package datadog.trace.instrumentation.aws.v2.sqs;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import datadog.trace.agent.tooling.Instrumenter;
import net.bytebuddy.asm.Advice;

public final class SqsMd5ChecksumInterceptorInstrumentation
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

@Override
public String instrumentedType() {
return "software.amazon.awssdk.services.sqs.internal.MessageMD5ChecksumInterceptor";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod().and(named("afterExecution")), getClass().getName() + "$AfterExecutionAdvice");
}

public static class AfterExecutionAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter() {
SqsReceiveResponseInternalAccess.enter();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit() {
SqsReceiveResponseInternalAccess.exit();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.InstrumenterConfig;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand All @@ -24,6 +23,7 @@ public String[] helperClassNames() {
"datadog.trace.instrumentation.aws.v2.sqs.MessageAttributeInjector",
"datadog.trace.instrumentation.aws.v2.sqs.MessageExtractAdapter",
"datadog.trace.instrumentation.aws.v2.sqs.SqsDecorator",
"datadog.trace.instrumentation.aws.v2.sqs.SqsReceiveResponseInternalAccess",
"datadog.trace.instrumentation.aws.v2.sqs.TracingIterator",
"datadog.trace.instrumentation.aws.v2.sqs.TracingList",
"datadog.trace.instrumentation.aws.v2.sqs.TracingListIterator"
Expand All @@ -32,17 +32,25 @@ public String[] helperClassNames() {

@Override
public Map<String, String> contextStore() {
return Collections.singletonMap(
Map<String, String> contextStore = new java.util.HashMap<>();
contextStore.put(
"software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse", "java.lang.String");
contextStore.put(
"software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse$BuilderImpl",
"java.lang.String");
return contextStore;
}

@Override
public List<Instrumenter> typeInstrumentations() {
final List<Instrumenter> ret = new ArrayList<>(4);
final List<Instrumenter> ret = new ArrayList<>(6);
ret.add(new SqsClientInstrumentation());
ret.add(new SqsReceiveRequestInstrumentation());
// we don't need to instrument messages when we're doing legacy AWS-SDK tracing
if (!InstrumenterConfig.get().isLegacyInstrumentationEnabled(false, "aws-sdk")) {
ret.add(new SqsMd5ChecksumInterceptorInstrumentation());
ret.add(new SqsReceiveResponseBuilderInstrumentation());
ret.add(new SqsReceiveResponseBuilderImplInstrumentation());
ret.add(new SqsReceiveResultInstrumentation());
}
return ret;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package datadog.trace.instrumentation.aws.v2.sqs;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesNoArguments;

import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import net.bytebuddy.asm.Advice;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

public final class SqsReceiveResponseBuilderImplInstrumentation
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

private static final String BUILDER_IMPL =
"software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse$BuilderImpl";

@Override
public String instrumentedType() {
return BUILDER_IMPL;
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod()
.and(named("build"))
.and(takesNoArguments())
.and(
returns(named("software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse"))),
getClass().getName() + "$BuildAdvice");
}

public static class BuildAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(
@Advice.This Object builder, @Advice.Return ReceiveMessageResponse response) {
if (response == null) {
return;
}
ContextStore<Object, String> builderStore =
InstrumentationContext.get(BUILDER_IMPL, "java.lang.String");
String queueUrl = builderStore.get(builder);
if (queueUrl != null) {
InstrumentationContext.get(ReceiveMessageResponse.class, String.class)
.put(response, queueUrl);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package datadog.trace.instrumentation.aws.v2.sqs;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesNoArguments;

import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import net.bytebuddy.asm.Advice;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

public final class SqsReceiveResponseBuilderInstrumentation
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

private static final String BUILDER_IMPL =
"software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse$BuilderImpl";

@Override
public String instrumentedType() {
return "software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod()
.and(named("toBuilder"))
.and(takesNoArguments())
.and(
returns(
named(
"software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse$Builder"))),
getClass().getName() + "$ToBuilderAdvice");
}

public static class ToBuilderAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(
@Advice.This ReceiveMessageResponse response, @Advice.Return Object builder) {
if (builder == null) {
return;
}
String queueUrl =
InstrumentationContext.get(ReceiveMessageResponse.class, String.class).get(response);
if (queueUrl != null) {
InstrumentationContext.get(BUILDER_IMPL, "java.lang.String").put(builder, queueUrl);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package datadog.trace.instrumentation.aws.v2.sqs;

import datadog.trace.bootstrap.CallDepthThreadLocalMap;

public final class SqsReceiveResponseInternalAccess {

private SqsReceiveResponseInternalAccess() {}

public static void enter() {
CallDepthThreadLocalMap.incrementCallDepth(SqsReceiveResponseInternalAccess.class);
}

public static void exit() {
CallDepthThreadLocalMap.decrementCallDepth(SqsReceiveResponseInternalAccess.class);
}

public static boolean active() {
return CallDepthThreadLocalMap.getCallDepth(SqsReceiveResponseInternalAccess.class) > 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public static class GetMessagesAdvice {
public static void onExit(
@Advice.This ReceiveMessageResponse result,
@Advice.Return(readOnly = false) List<Message> messages) {
if (SqsReceiveResponseInternalAccess.active()) {
return;
}
if (messages != null && !messages.isEmpty() && !(messages instanceof TracingList)) {
String queueUrl =
InstrumentationContext.get(ReceiveMessageResponse.class, String.class).get(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ abstract class SqsClientTest extends VersionedNamingTestBase {
})
def messages = client.receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueUrl).build()).messages()

messages.forEach {/* consume to create message spans */ }

if (isDataStreamsEnabled()) {
TEST_DATA_STREAMS_WRITER.waitForGroups(2)
}

messages.forEach {/* consume to create message spans */ }

then:
def sendSpan
assertTraces(2) {
Expand Down Expand Up @@ -231,12 +231,12 @@ abstract class SqsClientTest extends VersionedNamingTestBase {
})
def messages = client.receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueUrl).build()).messages()

messages.forEach {/* consume to create message spans */ }

if (isDataStreamsEnabled()) {
TEST_DATA_STREAMS_WRITER.waitForGroups(2)
}

messages.forEach {/* consume to create message spans */ }

then:
def sendSpan
assertTraces(2) {
Expand Down Expand Up @@ -312,6 +312,100 @@ abstract class SqsClientTest extends VersionedNamingTestBase {
client.close()
}

def "sync receive activates consumer span for user handler iteration"() {
setup:
def client = SqsClient.builder()
.region(Region.EU_CENTRAL_1)
.endpointOverride(endpoint)
.credentialsProvider(credentialsProvider)
.build()
def queueUrl = client.createQueue(CreateQueueRequest.builder().queueName('somequeue').build()).queueUrl()
TEST_WRITER.clear()

when:
TraceUtils.runUnderTrace('parent', {
client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody('sometext').build())
})
// The sync AWS SDK receive pipeline normally rebuilds the immutable response before user code
// sees it. If receive context stays only on the original response, the copied messages lose it
// and the handler span disconnects from the consumer span.
def messages = client.receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueUrl).build()).messages()
messages.forEach {
TraceUtils.runUnderTrace('handler') {
null
}
}

then:
messages.size() == 1

def sendSpan
assertTraces(2) {
trace(2) {
basicSpan(it, 'parent')
span {
serviceName expectedService("Sqs", "SendMessage")
operationName expectedOperation("Sqs", "SendMessage")
resourceName "Sqs.SendMessage"
spanType DDSpanTypes.HTTP_CLIENT
errored false
measured true
childOf(span(0))
tags {
"$Tags.COMPONENT" "java-aws-sdk"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.HTTP_METHOD" "POST"
"$Tags.HTTP_STATUS" 200
"$Tags.PEER_PORT" address.port
"$Tags.PEER_HOSTNAME" "localhost"
"aws.service" "Sqs"
"aws_service" "Sqs"
"aws.operation" "SendMessage"
"aws.agent" "java-aws-sdk"
"aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue"
"aws.requestId" "00000000-0000-0000-0000-000000000000"
if ({ isDataStreamsEnabled() }) {
"$DDTags.PATHWAY_HASH" { String }
}
urlTags("http://localhost:${address.port}/", ExpectedQueryParams.getExpectedQueryParams("SendMessage"))
serviceNameSource("java-aws-sdk")
defaultTags()
}
}
sendSpan = span(1)
}
trace(2) {
span {
serviceName expectedService("Sqs", "ReceiveMessage")
operationName expectedOperation("Sqs", "ReceiveMessage")
resourceName "Sqs.ReceiveMessage"
spanType DDSpanTypes.MESSAGE_CONSUMER
errored false
measured true
childOf(sendSpan)
tags {
"$Tags.COMPONENT" "java-aws-sdk"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER
"aws.service" "Sqs"
"aws_service" "Sqs"
"aws.operation" "ReceiveMessage"
"aws.agent" "java-aws-sdk"
"aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue"
"aws.requestId" "00000000-0000-0000-0000-000000000000"
if ({ isDataStreamsEnabled() }) {
"$DDTags.PATHWAY_HASH" { String }
}
defaultTags(true)
}
}
basicSpan(it, "handler", span(0))
}
}

cleanup:
client.close()
}

@IgnoreIf({instance.isDataStreamsEnabled()})
def "trace details propagated via embedded SQS message attribute (string)"() {
setup:
Expand Down Expand Up @@ -679,5 +773,3 @@ class SqsClientV0ContextSwapForkedTest extends SqsClientV0Test {
injectSysConfig(TraceInstrumentationConfig.LEGACY_CONTEXT_MANAGER_ENABLED, "false")
}
}


Loading