Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;

import datadog.context.Context;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.function.Function;
import reactor.core.publisher.Mono;
Expand All @@ -11,14 +12,15 @@ public class CaptureConnectSpan
implements Function<Mono<? extends Connection>, Mono<? extends Connection>> {

static final String CONNECT_SPAN = "datadog.connect.span";
static final String CONNECT_CONTEXT = "datadog.connect.context";

@Override
public Mono<? extends Connection> apply(Mono<? extends Connection> mono) {
return mono.contextWrite(
context -> {
final AgentSpan span = activeSpan();
if (null != span) {
return context.put(CONNECT_SPAN, span);
return context.put(CONNECT_SPAN, span).put(CONNECT_CONTEXT, Context.current());
} else {
return context;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.captureSpan;
import static datadog.trace.instrumentation.netty41.AttributeKeys.CONNECT_PARENT_CONTINUATION_ATTRIBUTE_KEY;
import static datadog.trace.instrumentation.reactor.netty.CaptureConnectSpan.CONNECT_CONTEXT;
import static datadog.trace.instrumentation.reactor.netty.CaptureConnectSpan.CONNECT_SPAN;

import datadog.context.Context;
import datadog.trace.bootstrap.instrumentation.api.AgentScope.Continuation;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.function.BiConsumer;
Expand All @@ -14,7 +16,13 @@ public class TransferConnectSpan implements BiConsumer<HttpClientRequest, Connec
@Override
public void accept(HttpClientRequest httpClientRequest, Connection connection) {
final AgentSpan span = httpClientRequest.currentContextView().getOrDefault(CONNECT_SPAN, null);
final Continuation continuation = null == span ? null : captureSpan(span);
if (null == span) {
return;
}
final Context capturedContext =
httpClientRequest.currentContextView().getOrDefault(CONNECT_CONTEXT, null);
final Continuation continuation =
capturedContext != null ? captureSpan(span, capturedContext) : captureSpan(span);
if (null != continuation) {
Continuation current =
connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.ExternalAgentLauncher;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.context.Context;
import datadog.context.propagation.Propagators;
import datadog.environment.ThreadSupport;
import datadog.logging.RatelimitedLogger;
Expand Down Expand Up @@ -1163,6 +1164,11 @@ public AgentScope.Continuation captureSpan(final AgentSpan span) {
return scopeManager.captureSpan(span);
}

@Override
public AgentScope.Continuation captureSpan(final AgentSpan span, final Context capturedContext) {
return scopeManager.captureSpan(span, capturedContext);
}

@Override
public boolean isAsyncPropagationEnabled() {
return scopeManager.isAsyncPropagationEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ public AgentScope.Continuation captureSpan(final AgentSpan span) {
return captureSpan(context, INSTRUMENTATION, span);
}

public AgentScope.Continuation captureSpan(final AgentSpan span, final Context capturedContext) {
if (span == null) {
return AgentTracer.noopContinuation();
}
return captureSpan(capturedContext.with(span), INSTRUMENTATION, span);
}

private AgentScope.Continuation captureSpan(Context context, byte source, AgentSpan span) {
AgentTraceCollector traceCollector = span.context().getTraceCollector();
return new ScopeContinuation(this, context, source, traceCollector).register();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.bootstrap.instrumentation.api;

import datadog.context.Context;
import datadog.trace.api.ConfigDefaults;
import datadog.trace.api.DDTraceId;
import datadog.trace.api.EndpointCheckpointer;
Expand Down Expand Up @@ -104,6 +105,23 @@ public static AgentScope.Continuation captureSpan(final AgentSpan span) {
return get().captureSpan(span);
}

/**
* Like {@link #captureSpan(AgentSpan)} but uses an explicitly supplied context instead of the
* current thread's scope-stack context. Use this when the context with baggage was captured on a
* different thread (e.g. a Reactor subscription thread) and needs to be carried into the
* continuation created on an I/O thread that has no active scope.
*
* @param span the span to include in the continuation
* @param capturedContext the context (e.g. containing baggage) captured from the originating
* thread
* @return Continuation with the supplied context, no-op continuation if the span is null.
*/
@Nonnull
public static AgentScope.Continuation captureSpan(
final AgentSpan span, final Context capturedContext) {
return get().captureSpan(span, capturedContext);
}

/**
* Checkpoints the active scope. A subsequent call to {@link #rollbackActiveToCheckpoint()} closes
* outstanding scopes up to but not including the most recent checkpointed scope.
Expand Down Expand Up @@ -329,6 +347,8 @@ AgentSpan startSpan(

AgentScope.Continuation captureSpan(AgentSpan span);

AgentScope.Continuation captureSpan(AgentSpan span, Context capturedContext);

void checkpointActiveForRollback();

void rollbackActiveToCheckpoint();
Expand Down Expand Up @@ -488,6 +508,12 @@ public AgentScope.Continuation captureSpan(final AgentSpan span) {
return NoopContinuation.INSTANCE;
}

@Override
public AgentScope.Continuation captureSpan(
final AgentSpan span, final Context capturedContext) {
return NoopContinuation.INSTANCE;
}

@Override
public boolean isAsyncPropagationEnabled() {
return false;
Expand Down