diff --git a/communication/build.gradle.kts b/communication/build.gradle.kts index b5f31fadb68..5ade2a96a78 100644 --- a/communication/build.gradle.kts +++ b/communication/build.gradle.kts @@ -20,15 +20,26 @@ dependencies { api(libs.okio) api(libs.okhttp) api(libs.moshi) + implementation(libs.netty.codec.http) + implementation(libs.netty.handler) + implementation(libs.netty.handler.proxy) + implementation(libs.apache.httpclient5) + implementation(libs.jetty.httpclient) + implementation(libs.jetty.unixsocket) { + exclude(group = "org.eclipse.jetty", module = "jetty-server") + exclude(group = "javax.servlet", module = "javax.servlet-api") + } // metrics-lib is needed rather than metrics-api to change the default port of StatsD connection manager // TODO Could help decoupling it later to only depend on metrics-api implementation(project(":products:metrics:metrics-lib")) testImplementation(project(":utils:test-utils")) testImplementation(libs.bundles.junit5) + testImplementation(libs.junit.platform.suite) testImplementation(libs.bytebuddy) testImplementation("org.msgpack:msgpack-core:0.8.20") testImplementation("org.msgpack:jackson-dataformat-msgpack:0.8.20") + testImplementation(libs.jnr.unixsocket) testImplementation( group = "com.squareup.okhttp3", name = "mockwebserver", diff --git a/communication/src/main/java/datadog/communication/http/HttpRetryPolicy.java b/communication/src/main/java/datadog/communication/http/HttpRetryPolicy.java index 3578f0e1c93..fc450e38d29 100644 --- a/communication/src/main/java/datadog/communication/http/HttpRetryPolicy.java +++ b/communication/src/main/java/datadog/communication/http/HttpRetryPolicy.java @@ -48,6 +48,13 @@ public class HttpRetryPolicy implements AutoCloseable { private static final int MAX_ALLOWED_WAIT_TIME_SECONDS = 10; private static final int RATE_LIMIT_DELAY_RANDOM_COMPONENT_MAX_MILLIS = 401; + public interface Response { + int code(); + + @Nullable + String header(String name); + } + private int retriesLeft; private long delay; private boolean interrupted; @@ -82,6 +89,10 @@ public boolean shouldRetry(Exception e) { } public boolean shouldRetry(@Nullable okhttp3.Response response) { + return shouldRetry(response == null ? null : new OkHttpResponseAdapter(response)); + } + + public boolean shouldRetry(@Nullable Response response) { if (retriesLeft == 0) { return false; } @@ -114,6 +125,10 @@ public boolean shouldRetry(@Nullable okhttp3.Response response) { } private long getRateLimitResetTime(okhttp3.Response response) { + return getRateLimitResetTime(new OkHttpResponseAdapter(response)); + } + + private long getRateLimitResetTime(Response response) { String rateLimitHeader = response.header(X_RATELIMIT_RESET_HTTP_HEADER); if (rateLimitHeader == null) { return RATE_LIMIT_RESET_TIME_UNDEFINED; @@ -181,4 +196,22 @@ public HttpRetryPolicy create() { return new HttpRetryPolicy(maxRetries, initialDelay, delayFactor, retryInterrupts); } } + + private static final class OkHttpResponseAdapter implements Response { + private final okhttp3.Response delegate; + + private OkHttpResponseAdapter(okhttp3.Response delegate) { + this.delegate = delegate; + } + + @Override + public int code() { + return delegate.code(); + } + + @Override + public @Nullable String header(String name) { + return delegate.header(name); + } + } } diff --git a/communication/src/main/java/datadog/communication/http/client/HttpClient.java b/communication/src/main/java/datadog/communication/http/client/HttpClient.java new file mode 100644 index 00000000000..cf5db03ff71 --- /dev/null +++ b/communication/src/main/java/datadog/communication/http/client/HttpClient.java @@ -0,0 +1,12 @@ +package datadog.communication.http.client; + +import java.io.IOException; + +/** Facade for synchronous HTTP calls independent from the underlying client implementation. */ +public interface HttpClient extends AutoCloseable { + + HttpClientResponse execute(HttpClientRequest request) throws IOException; + + @Override + void close(); +} diff --git a/communication/src/main/java/datadog/communication/http/client/HttpClientBuilder.java b/communication/src/main/java/datadog/communication/http/client/HttpClientBuilder.java new file mode 100644 index 00000000000..8b360dbe1e3 --- /dev/null +++ b/communication/src/main/java/datadog/communication/http/client/HttpClientBuilder.java @@ -0,0 +1,29 @@ +package datadog.communication.http.client; + +import datadog.communication.http.HttpRetryPolicy; +import javax.annotation.Nullable; + +/** + * Common builder contract for facade HTTP clients, independent from the underlying implementation. + */ +public interface HttpClientBuilder> { + + B transport(HttpTransport transport); + + B unixDomainSocketPath(@Nullable String unixDomainSocketPath); + + B namedPipe(@Nullable String namedPipe); + + B connectTimeoutMillis(long connectTimeoutMillis); + + B requestTimeoutMillis(long requestTimeoutMillis); + + B proxy(String proxyHost, int proxyPort); + + B proxy( + String proxyHost, int proxyPort, @Nullable String proxyUsername, @Nullable String proxyPassword); + + B retryPolicyFactory(HttpRetryPolicy.Factory retryPolicyFactory); + + HttpClient build(); +} diff --git a/communication/src/main/java/datadog/communication/http/client/HttpClientFactory.java b/communication/src/main/java/datadog/communication/http/client/HttpClientFactory.java new file mode 100644 index 00000000000..ba4c86244d1 --- /dev/null +++ b/communication/src/main/java/datadog/communication/http/client/HttpClientFactory.java @@ -0,0 +1,7 @@ +package datadog.communication.http.client; + +/** Factory for creating transport-specific HTTP client facades. */ +public interface HttpClientFactory { + + HttpClient create(); +} diff --git a/communication/src/main/java/datadog/communication/http/client/HttpClientRequest.java b/communication/src/main/java/datadog/communication/http/client/HttpClientRequest.java new file mode 100644 index 00000000000..c92c0ab71ca --- /dev/null +++ b/communication/src/main/java/datadog/communication/http/client/HttpClientRequest.java @@ -0,0 +1,100 @@ +package datadog.communication.http.client; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** Immutable request model used by client facades. */ +public final class HttpClientRequest { + private final URI uri; + private final String method; + private final Map> headers; + private final byte[] body; + + private HttpClientRequest( + URI uri, String method, Map> headers, byte[] body) { + this.uri = uri; + this.method = method; + this.headers = headers; + this.body = body; + } + + public URI uri() { + return uri; + } + + public String method() { + return method; + } + + public Map> headers() { + return headers; + } + + public byte[] body() { + return body; + } + + public static Builder builder(URI uri, String method) { + return new Builder(uri, method); + } + + public static final class Builder { + private final URI uri; + private final String method; + private final Map> headers = new LinkedHashMap<>(); + private byte[] body = new byte[0]; + + private Builder(URI uri, String method) { + this.uri = uri; + this.method = method; + } + + public Builder addHeader(String name, String value) { + headers.computeIfAbsent(name, ignored -> new ArrayList<>()).add(value); + return this; + } + + public Builder headers(Map values) { + for (Map.Entry entry : values.entrySet()) { + addHeader(entry.getKey(), entry.getValue()); + } + return this; + } + + public Builder body(byte[] bytes) { + this.body = bytes != null ? bytes : new byte[0]; + return this; + } + + public Builder body(List buffers) { + int totalLength = 0; + for (ByteBuffer buffer : buffers) { + totalLength += buffer.remaining(); + } + byte[] merged = new byte[totalLength]; + int offset = 0; + for (ByteBuffer buffer : buffers) { + int remaining = buffer.remaining(); + buffer.duplicate().get(merged, offset, remaining); + offset += remaining; + } + this.body = merged; + return this; + } + + public HttpClientRequest build() { + Map> frozenHeaders = new LinkedHashMap<>(); + for (Map.Entry> entry : headers.entrySet()) { + frozenHeaders.put( + entry.getKey(), Collections.unmodifiableList(new ArrayList<>(entry.getValue()))); + } + return new HttpClientRequest( + uri, method, Collections.unmodifiableMap(frozenHeaders), body.clone()); + } + } +} diff --git a/communication/src/main/java/datadog/communication/http/client/HttpClientResponse.java b/communication/src/main/java/datadog/communication/http/client/HttpClientResponse.java new file mode 100644 index 00000000000..663e80f8ce7 --- /dev/null +++ b/communication/src/main/java/datadog/communication/http/client/HttpClientResponse.java @@ -0,0 +1,59 @@ +package datadog.communication.http.client; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import javax.annotation.Nullable; + +/** Immutable in-memory HTTP response model used by client facades. */ +public final class HttpClientResponse implements AutoCloseable { + private final int statusCode; + private final Map> headers; + private final byte[] body; + + public HttpClientResponse(int statusCode, Map> headers, byte[] body) { + this.statusCode = statusCode; + + Map> frozenHeaders = new LinkedHashMap<>(); + for (Map.Entry> entry : headers.entrySet()) { + frozenHeaders.put( + entry.getKey().toLowerCase(Locale.ROOT), + Collections.unmodifiableList(new ArrayList<>(entry.getValue()))); + } + + this.headers = Collections.unmodifiableMap(frozenHeaders); + this.body = body.clone(); + } + + public int statusCode() { + return statusCode; + } + + public Map> headers() { + return headers; + } + + @Nullable + public String header(String name) { + List values = headers.get(name.toLowerCase(Locale.ROOT)); + return values == null || values.isEmpty() ? null : values.get(0); + } + + public byte[] body() { + return body.clone(); + } + + public InputStream bodyStream() { + return new ByteArrayInputStream(body); + } + + @Override + public void close() { + // no-op for an in-memory response + } +} diff --git a/communication/src/main/java/datadog/communication/http/client/HttpTransport.java b/communication/src/main/java/datadog/communication/http/client/HttpTransport.java new file mode 100644 index 00000000000..4d794287d70 --- /dev/null +++ b/communication/src/main/java/datadog/communication/http/client/HttpTransport.java @@ -0,0 +1,7 @@ +package datadog.communication.http.client; + +public enum HttpTransport { + TCP, + UNIX_DOMAIN_SOCKET, + NAMED_PIPE +} diff --git a/communication/src/main/java/datadog/communication/http/client/ahc/ApacheAsyncHttpClient.java b/communication/src/main/java/datadog/communication/http/client/ahc/ApacheAsyncHttpClient.java new file mode 100644 index 00000000000..3c41c192492 --- /dev/null +++ b/communication/src/main/java/datadog/communication/http/client/ahc/ApacheAsyncHttpClient.java @@ -0,0 +1,211 @@ +package datadog.communication.http.client.ahc; + +import datadog.communication.http.HttpRetryPolicy; +import datadog.communication.http.client.HttpClient; +import datadog.communication.http.client.HttpClientRequest; +import datadog.communication.http.client.HttpClientResponse; +import datadog.communication.http.client.HttpTransport; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.auth.AuthScope; +import org.apache.hc.client5.http.auth.UsernamePasswordCredentials; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.util.Timeout; + +final class ApacheAsyncHttpClient implements HttpClient { + private final long requestTimeoutMillis; + private final HttpRetryPolicy.Factory retryPolicyFactory; + private final CloseableHttpAsyncClient client; + private final boolean closeClientOnClose; + private final Set> inFlightRequests = ConcurrentHashMap.newKeySet(); + private volatile boolean closed; + + ApacheAsyncHttpClient( + HttpTransport transport, + long connectTimeoutMillis, + long requestTimeoutMillis, + long responseTimeoutMillis, + @Nullable String proxyHost, + @Nullable Integer proxyPort, + @Nullable String proxyUsername, + @Nullable String proxyPassword, + HttpRetryPolicy.Factory retryPolicyFactory, + @Nullable CloseableHttpAsyncClient externallyProvidedClient, + boolean closeClientOnClose) { + if (transport != HttpTransport.TCP) { + throw new IllegalArgumentException("Apache async client supports TCP transport only"); + } + + this.requestTimeoutMillis = requestTimeoutMillis; + this.retryPolicyFactory = retryPolicyFactory; + + if (externallyProvidedClient != null) { + this.client = externallyProvidedClient; + this.closeClientOnClose = closeClientOnClose; + this.client.start(); + return; + } + + RequestConfig.Builder configBuilder = + RequestConfig.custom() + .setConnectTimeout(Timeout.ofMilliseconds(connectTimeoutMillis)) + .setResponseTimeout(Timeout.ofMilliseconds(responseTimeoutMillis)) + .setConnectionRequestTimeout(Timeout.ofMilliseconds(requestTimeoutMillis)); + + BasicCredentialsProvider credentialsProvider = null; + if (proxyHost != null && proxyPort != null) { + configBuilder.setProxy(new HttpHost("http", proxyHost, proxyPort)); + if (proxyUsername != null) { + credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + new AuthScope(proxyHost, proxyPort), + new UsernamePasswordCredentials(proxyUsername, asChars(proxyPassword))); + } + } + + org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder clientBuilder = + HttpAsyncClients.custom().setDefaultRequestConfig(configBuilder.build()); + if (credentialsProvider != null) { + clientBuilder.setDefaultCredentialsProvider(credentialsProvider); + } + + this.client = clientBuilder.build(); + this.client.start(); + this.closeClientOnClose = true; + } + + @Override + public HttpClientResponse execute(HttpClientRequest request) throws IOException { + if (closed) { + throw new IOException("http client is closed"); + } + + try (HttpRetryPolicy retryPolicy = retryPolicyFactory.create()) { + while (true) { + try { + HttpClientResponse response = executeOnce(request); + if (!retryPolicy.shouldRetry(new ResponseAdapter(response))) { + return response; + } + } catch (Exception e) { + IOException io = e instanceof IOException ? (IOException) e : new IOException(e); + if (!retryPolicy.shouldRetry(io)) { + throw io; + } + } + retryPolicy.backoff(); + } + } + } + + private HttpClientResponse executeOnce(HttpClientRequest request) throws IOException { + SimpleHttpRequest httpRequest = toApacheRequest(request); + + Future future = client.execute(httpRequest, null); + inFlightRequests.add(future); + try { + SimpleHttpResponse response = future.get(requestTimeoutMillis, TimeUnit.MILLISECONDS); + return toFacadeResponse(response); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("request interrupted", e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + throw cause instanceof IOException ? (IOException) cause : new IOException(cause); + } catch (TimeoutException e) { + future.cancel(true); + throw new IOException("request timeout", e); + } finally { + inFlightRequests.remove(future); + } + } + + private static SimpleHttpRequest toApacheRequest(HttpClientRequest request) { + URI uri = request.uri(); + SimpleRequestBuilder requestBuilder = SimpleRequestBuilder.create(request.method()).setUri(uri); + + for (Map.Entry> header : request.headers().entrySet()) { + for (String value : header.getValue()) { + requestBuilder.addHeader(header.getKey(), value); + } + } + + byte[] body = request.body(); + if (body.length > 0) { + requestBuilder.setBody(body, ContentType.APPLICATION_OCTET_STREAM); + } + + return requestBuilder.build(); + } + + private static HttpClientResponse toFacadeResponse(SimpleHttpResponse response) { + Map> headers = new LinkedHashMap<>(); + for (Header header : response.getHeaders()) { + headers + .computeIfAbsent(header.getName(), ignored -> new ArrayList<>()) + .add(header.getValue()); + } + + byte[] body = response.getBodyBytes() != null ? response.getBodyBytes() : new byte[0]; + return new HttpClientResponse(response.getCode(), headers, body); + } + + @Override + public void close() { + closed = true; + + for (Future future : inFlightRequests) { + future.cancel(true); + } + inFlightRequests.clear(); + + if (closeClientOnClose) { + try { + client.close(); + } catch (IOException ignored) { + } + } + } + + private static char[] asChars(@Nullable String value) { + return value == null ? new char[0] : value.toCharArray(); + } + + private static final class ResponseAdapter implements HttpRetryPolicy.Response { + private final HttpClientResponse response; + + private ResponseAdapter(HttpClientResponse response) { + this.response = response; + } + + @Override + public int code() { + return response.statusCode(); + } + + @Override + public @Nullable String header(String name) { + return response.header(name); + } + } +} diff --git a/communication/src/main/java/datadog/communication/http/client/ahc/ApacheAsyncHttpClientBuilder.java b/communication/src/main/java/datadog/communication/http/client/ahc/ApacheAsyncHttpClientBuilder.java new file mode 100644 index 00000000000..a13318bf02e --- /dev/null +++ b/communication/src/main/java/datadog/communication/http/client/ahc/ApacheAsyncHttpClientBuilder.java @@ -0,0 +1,131 @@ +package datadog.communication.http.client.ahc; + +import datadog.communication.http.HttpRetryPolicy; +import datadog.communication.http.client.HttpClient; +import datadog.communication.http.client.HttpClientBuilder; +import datadog.communication.http.client.HttpTransport; +import javax.annotation.Nullable; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; + +/** Builder used to configure and create {@link ApacheAsyncHttpClient}. */ +public final class ApacheAsyncHttpClientBuilder + implements HttpClientBuilder { + private HttpTransport transport = HttpTransport.TCP; + private long connectTimeoutMillis = 10_000; + private long requestTimeoutMillis = 10_000; + private long responseTimeoutMillis = 10_000; + private String proxyHost; + private Integer proxyPort; + private String proxyUsername; + private String proxyPassword; + private HttpRetryPolicy.Factory retryPolicyFactory = HttpRetryPolicy.Factory.NEVER_RETRY; + private CloseableHttpAsyncClient client; + private boolean closeClientOnClose = true; + + @Override + public ApacheAsyncHttpClientBuilder transport(HttpTransport transport) { + if (transport != HttpTransport.TCP) { + throw new IllegalArgumentException( + "Apache async client supports only TCP transport, got: " + transport); + } + this.transport = transport; + return this; + } + + @Override + public ApacheAsyncHttpClientBuilder unixDomainSocketPath(@Nullable String unixDomainSocketPath) { + if (unixDomainSocketPath != null && !unixDomainSocketPath.isEmpty()) { + throw new UnsupportedOperationException( + "Apache async client does not support Unix Domain Socket transport"); + } + return this; + } + + @Override + public ApacheAsyncHttpClientBuilder namedPipe(@Nullable String namedPipe) { + if (namedPipe != null && !namedPipe.isEmpty()) { + throw new UnsupportedOperationException( + "Apache async client does not support named pipe transport"); + } + return this; + } + + @Override + public ApacheAsyncHttpClientBuilder connectTimeoutMillis(long connectTimeoutMillis) { + this.connectTimeoutMillis = connectTimeoutMillis; + return this; + } + + @Override + public ApacheAsyncHttpClientBuilder requestTimeoutMillis(long requestTimeoutMillis) { + this.requestTimeoutMillis = requestTimeoutMillis; + return this; + } + + public ApacheAsyncHttpClientBuilder responseTimeoutMillis(long responseTimeoutMillis) { + this.responseTimeoutMillis = responseTimeoutMillis; + return this; + } + + @Override + public ApacheAsyncHttpClientBuilder proxy(String proxyHost, int proxyPort) { + return proxy(proxyHost, proxyPort, null, null); + } + + @Override + public ApacheAsyncHttpClientBuilder proxy( + String proxyHost, + int proxyPort, + @Nullable String proxyUsername, + @Nullable String proxyPassword) { + this.proxyHost = proxyHost; + this.proxyPort = proxyPort; + this.proxyUsername = proxyUsername; + this.proxyPassword = proxyPassword; + return this; + } + + @Override + public ApacheAsyncHttpClientBuilder retryPolicyFactory( + HttpRetryPolicy.Factory retryPolicyFactory) { + this.retryPolicyFactory = retryPolicyFactory; + return this; + } + + public ApacheAsyncHttpClientBuilder client( + CloseableHttpAsyncClient client, boolean closeClientOnClose) { + this.client = client; + this.closeClientOnClose = closeClientOnClose; + return this; + } + + @Override + public HttpClient build() { + validate(); + return new ApacheAsyncHttpClient( + transport, + connectTimeoutMillis, + requestTimeoutMillis, + responseTimeoutMillis, + proxyHost, + proxyPort, + proxyUsername, + proxyPassword, + retryPolicyFactory, + client, + closeClientOnClose); + } + + private void validate() { + if (transport != HttpTransport.TCP) { + throw new IllegalArgumentException( + "Apache async client currently supports TCP transport only"); + } + if (connectTimeoutMillis <= 0 || requestTimeoutMillis <= 0 || responseTimeoutMillis <= 0) { + throw new IllegalArgumentException("timeouts must be strictly positive"); + } + if (proxyHost != null && proxyPort == null) { + throw new IllegalArgumentException("proxy port must be provided when proxy host is set"); + } + } +} diff --git a/communication/src/main/java/datadog/communication/http/client/jetty/JettyHttpClient.java b/communication/src/main/java/datadog/communication/http/client/jetty/JettyHttpClient.java new file mode 100644 index 00000000000..410015bcf20 --- /dev/null +++ b/communication/src/main/java/datadog/communication/http/client/jetty/JettyHttpClient.java @@ -0,0 +1,207 @@ +package datadog.communication.http.client.jetty; + +import datadog.communication.http.HttpRetryPolicy; +import datadog.communication.http.client.HttpClient; +import datadog.communication.http.client.HttpClientRequest; +import datadog.communication.http.client.HttpClientResponse; +import datadog.communication.http.client.HttpTransport; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; +import org.eclipse.jetty.client.HttpClientTransport; +import org.eclipse.jetty.client.HttpProxy; +import org.eclipse.jetty.client.api.Authentication; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.util.BasicAuthentication; +import org.eclipse.jetty.client.util.BytesContentProvider; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.unixsocket.client.HttpClientTransportOverUnixSockets; + +final class JettyHttpClient implements HttpClient { + private final long requestTimeoutMillis; + private final HttpRetryPolicy.Factory retryPolicyFactory; + private final org.eclipse.jetty.client.HttpClient client; + private final boolean closeClientOnClose; + private final Set inFlightRequests = ConcurrentHashMap.newKeySet(); + private volatile boolean closed; + + JettyHttpClient( + HttpTransport transport, + @Nullable String unixDomainSocketPath, + long connectTimeoutMillis, + long requestTimeoutMillis, + long responseTimeoutMillis, + @Nullable String proxyHost, + @Nullable Integer proxyPort, + @Nullable String proxyUsername, + @Nullable String proxyPassword, + HttpRetryPolicy.Factory retryPolicyFactory, + @Nullable org.eclipse.jetty.client.HttpClient externallyProvidedClient, + boolean closeClientOnClose) { + if (transport == HttpTransport.NAMED_PIPE) { + throw new IllegalArgumentException("Jetty HTTP client supports only TCP and UDS transport"); + } + + this.requestTimeoutMillis = requestTimeoutMillis; + this.retryPolicyFactory = retryPolicyFactory; + + if (externallyProvidedClient != null) { + this.client = externallyProvidedClient; + this.closeClientOnClose = closeClientOnClose; + startClient(this.client); + return; + } + + org.eclipse.jetty.client.HttpClient client = createClient(transport, unixDomainSocketPath); + client.setConnectTimeout(connectTimeoutMillis); + client.setIdleTimeout(responseTimeoutMillis); + if (proxyHost != null && proxyPort != null) { + client.getProxyConfiguration().getProxies().add(new HttpProxy(proxyHost, proxyPort)); + if (proxyUsername != null) { + URI proxyUri = URI.create("http://" + proxyHost + ":" + proxyPort); + client + .getAuthenticationStore() + .addAuthentication( + new BasicAuthentication( + proxyUri, Authentication.ANY_REALM, proxyUsername, proxyPassword)); + } + } + startClient(client); + this.client = client; + this.closeClientOnClose = true; + } + + @Override + public HttpClientResponse execute(HttpClientRequest request) throws IOException { + if (closed) { + throw new IOException("http client is closed"); + } + + try (HttpRetryPolicy retryPolicy = retryPolicyFactory.create()) { + while (true) { + try { + HttpClientResponse response = executeOnce(request); + if (!retryPolicy.shouldRetry(new ResponseAdapter(response))) { + return response; + } + } catch (Exception e) { + IOException io = e instanceof IOException ? (IOException) e : new IOException(e); + if (!retryPolicy.shouldRetry(io)) { + throw io; + } + } + retryPolicy.backoff(); + } + } + } + + private HttpClientResponse executeOnce(HttpClientRequest request) throws IOException { + Request jettyRequest = toJettyRequest(request); + inFlightRequests.add(jettyRequest); + try { + ContentResponse response = + jettyRequest.timeout(requestTimeoutMillis, TimeUnit.MILLISECONDS).send(); + return toFacadeResponse(response); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("request interrupted", e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + throw cause instanceof IOException ? (IOException) cause : new IOException(cause); + } catch (TimeoutException e) { + throw new IOException("request timeout", e); + } finally { + inFlightRequests.remove(jettyRequest); + } + } + + private Request toJettyRequest(HttpClientRequest request) { + Request jettyRequest = client.newRequest(request.uri()).method(request.method()); + for (Map.Entry> header : request.headers().entrySet()) { + for (String value : header.getValue()) { + jettyRequest.header(header.getKey(), value); + } + } + + byte[] body = request.body(); + if (body.length > 0) { + jettyRequest.content(new BytesContentProvider(body)); + } + return jettyRequest; + } + + private static HttpClientResponse toFacadeResponse(ContentResponse response) { + Map> headers = new LinkedHashMap<>(); + for (HttpField header : response.getHeaders()) { + headers + .computeIfAbsent(header.getName(), ignored -> new ArrayList<>()) + .add(header.getValue()); + } + return new HttpClientResponse(response.getStatus(), headers, response.getContent()); + } + + @Override + public void close() { + closed = true; + + for (Request request : inFlightRequests) { + request.abort(new IOException("http client is closed")); + } + inFlightRequests.clear(); + + if (closeClientOnClose) { + try { + client.stop(); + } catch (Exception ignored) { + } + } + } + + private static void startClient(org.eclipse.jetty.client.HttpClient client) { + if (!client.isStarted()) { + try { + client.start(); + } catch (Exception e) { + throw new IllegalStateException("Unable to start Jetty HTTP client", e); + } + } + } + + private static org.eclipse.jetty.client.HttpClient createClient( + HttpTransport transport, @Nullable String unixDomainSocketPath) { + if (transport == HttpTransport.UNIX_DOMAIN_SOCKET) { + HttpClientTransport udsTransport = + new HttpClientTransportOverUnixSockets(unixDomainSocketPath); + return new org.eclipse.jetty.client.HttpClient(udsTransport, null); + } + return new org.eclipse.jetty.client.HttpClient(); + } + + private static final class ResponseAdapter implements HttpRetryPolicy.Response { + private final HttpClientResponse response; + + private ResponseAdapter(HttpClientResponse response) { + this.response = response; + } + + @Override + public int code() { + return response.statusCode(); + } + + @Override + public @Nullable String header(String name) { + return response.header(name); + } + } +} diff --git a/communication/src/main/java/datadog/communication/http/client/jetty/JettyHttpClientBuilder.java b/communication/src/main/java/datadog/communication/http/client/jetty/JettyHttpClientBuilder.java new file mode 100644 index 00000000000..2fcd40906cd --- /dev/null +++ b/communication/src/main/java/datadog/communication/http/client/jetty/JettyHttpClientBuilder.java @@ -0,0 +1,131 @@ +package datadog.communication.http.client.jetty; + +import datadog.communication.http.HttpRetryPolicy; +import datadog.communication.http.client.HttpClient; +import datadog.communication.http.client.HttpClientBuilder; +import datadog.communication.http.client.HttpTransport; +import javax.annotation.Nullable; + +/** Builder used to configure and create {@link JettyHttpClient}. */ +public final class JettyHttpClientBuilder implements HttpClientBuilder { + private HttpTransport transport = HttpTransport.TCP; + private String unixDomainSocketPath; + private long connectTimeoutMillis = 10_000; + private long requestTimeoutMillis = 10_000; + private long responseTimeoutMillis = 10_000; + private String proxyHost; + private Integer proxyPort; + private String proxyUsername; + private String proxyPassword; + private HttpRetryPolicy.Factory retryPolicyFactory = HttpRetryPolicy.Factory.NEVER_RETRY; + private org.eclipse.jetty.client.HttpClient client; + private boolean closeClientOnClose = true; + + @Override + public JettyHttpClientBuilder transport(HttpTransport transport) { + if (transport == HttpTransport.NAMED_PIPE) { + throw new IllegalArgumentException( + "Jetty HTTP client does not support named pipe transport, got: " + transport); + } + this.transport = transport; + return this; + } + + @Override + public JettyHttpClientBuilder unixDomainSocketPath(@Nullable String unixDomainSocketPath) { + this.unixDomainSocketPath = unixDomainSocketPath; + return this; + } + + @Override + public JettyHttpClientBuilder namedPipe(@Nullable String namedPipe) { + if (namedPipe != null && !namedPipe.isEmpty()) { + throw new UnsupportedOperationException( + "Jetty HTTP client does not support named pipe transport"); + } + return this; + } + + @Override + public JettyHttpClientBuilder connectTimeoutMillis(long connectTimeoutMillis) { + this.connectTimeoutMillis = connectTimeoutMillis; + return this; + } + + @Override + public JettyHttpClientBuilder requestTimeoutMillis(long requestTimeoutMillis) { + this.requestTimeoutMillis = requestTimeoutMillis; + return this; + } + + public JettyHttpClientBuilder responseTimeoutMillis(long responseTimeoutMillis) { + this.responseTimeoutMillis = responseTimeoutMillis; + return this; + } + + @Override + public JettyHttpClientBuilder proxy(String proxyHost, int proxyPort) { + return proxy(proxyHost, proxyPort, null, null); + } + + @Override + public JettyHttpClientBuilder proxy( + String proxyHost, + int proxyPort, + @Nullable String proxyUsername, + @Nullable String proxyPassword) { + this.proxyHost = proxyHost; + this.proxyPort = proxyPort; + this.proxyUsername = proxyUsername; + this.proxyPassword = proxyPassword; + return this; + } + + @Override + public JettyHttpClientBuilder retryPolicyFactory(HttpRetryPolicy.Factory retryPolicyFactory) { + this.retryPolicyFactory = retryPolicyFactory; + return this; + } + + public JettyHttpClientBuilder client( + org.eclipse.jetty.client.HttpClient client, boolean closeClientOnClose) { + this.client = client; + this.closeClientOnClose = closeClientOnClose; + return this; + } + + @Override + public HttpClient build() { + validate(); + return new JettyHttpClient( + transport, + unixDomainSocketPath, + connectTimeoutMillis, + requestTimeoutMillis, + responseTimeoutMillis, + proxyHost, + proxyPort, + proxyUsername, + proxyPassword, + retryPolicyFactory, + client, + closeClientOnClose); + } + + private void validate() { + if (connectTimeoutMillis <= 0 || requestTimeoutMillis <= 0 || responseTimeoutMillis <= 0) { + throw new IllegalArgumentException("timeouts must be strictly positive"); + } + if (proxyHost != null && proxyPort == null) { + throw new IllegalArgumentException("proxy port must be provided when proxy host is set"); + } + if (proxyHost != null && transport != HttpTransport.TCP) { + throw new IllegalArgumentException("proxy is currently supported only for TCP transport"); + } + if (transport == HttpTransport.UNIX_DOMAIN_SOCKET + && (unixDomainSocketPath == null || unixDomainSocketPath.isEmpty())) { + throw new IllegalArgumentException( + "unix domain socket path must be set for UNIX_DOMAIN_SOCKET transport"); + } + } +} diff --git a/communication/src/main/java/datadog/communication/http/client/netty/NettyHttpClient.java b/communication/src/main/java/datadog/communication/http/client/netty/NettyHttpClient.java new file mode 100644 index 00000000000..aec069c9520 --- /dev/null +++ b/communication/src/main/java/datadog/communication/http/client/netty/NettyHttpClient.java @@ -0,0 +1,528 @@ +package datadog.communication.http.client.netty; + +import datadog.common.socket.NamedPipeSocketFactory; +import datadog.common.socket.UnixDomainSocketFactory; +import datadog.communication.http.HttpRetryPolicy; +import datadog.communication.http.client.HttpClient; +import datadog.communication.http.client.HttpClientRequest; +import datadog.communication.http.client.HttpClientResponse; +import datadog.communication.http.client.HttpTransport; +import datadog.trace.util.AgentProxySelector; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.oio.OioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.proxy.HttpProxyHandler; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Proxy; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; +import javax.net.SocketFactory; + +final class NettyHttpClient implements HttpClient { + private final HttpTransport transport; + private final long connectTimeoutMillis; + private final long readTimeoutMillis; + private final long writeTimeoutMillis; + private final long requestTimeoutMillis; + private final int maxResponseSizeBytes; + private final String proxyHost; + private final Integer proxyPort; + private final String proxyUsername; + private final String proxyPassword; + private final HttpRetryPolicy.Factory retryPolicyFactory; + private final EventLoopGroup eventLoopGroup; + private final boolean closeEventLoopGroupOnClose; + private final SslContext sslContext; + private final SocketFactory socketFactory; + private final Set inFlightChannels = + Collections.newSetFromMap(new ConcurrentHashMap()); + private volatile boolean closed; + + NettyHttpClient( + HttpTransport transport, + @Nullable String unixDomainSocketPath, + @Nullable String namedPipe, + long connectTimeoutMillis, + long readTimeoutMillis, + long writeTimeoutMillis, + long requestTimeoutMillis, + int maxResponseSizeBytes, + @Nullable String proxyHost, + @Nullable Integer proxyPort, + @Nullable String proxyUsername, + @Nullable String proxyPassword, + HttpRetryPolicy.Factory retryPolicyFactory, + @Nullable EventLoopGroup externallyProvidedEventLoopGroup, + boolean closeEventLoopGroupOnClose, + ThreadFactory threadFactory) { + this.transport = transport; + this.connectTimeoutMillis = connectTimeoutMillis; + this.readTimeoutMillis = readTimeoutMillis; + this.writeTimeoutMillis = writeTimeoutMillis; + this.requestTimeoutMillis = requestTimeoutMillis; + this.maxResponseSizeBytes = maxResponseSizeBytes; + this.proxyHost = proxyHost; + this.proxyPort = proxyPort; + this.proxyUsername = proxyUsername; + this.proxyPassword = proxyPassword; + this.retryPolicyFactory = retryPolicyFactory; + + if (externallyProvidedEventLoopGroup != null) { + this.eventLoopGroup = externallyProvidedEventLoopGroup; + this.closeEventLoopGroupOnClose = closeEventLoopGroupOnClose; + } else { + if (transport == HttpTransport.TCP) { + this.eventLoopGroup = new NioEventLoopGroup(1, threadFactory); + } else { + this.eventLoopGroup = new OioEventLoopGroup(1, threadFactory); + } + this.closeEventLoopGroupOnClose = true; + } + + this.sslContext = buildSslContext(); + this.socketFactory = socketFactoryForTransport(transport, unixDomainSocketPath, namedPipe); + } + + @Override + public HttpClientResponse execute(HttpClientRequest request) throws IOException { + if (closed) { + throw new IOException("http client is closed"); + } + try (HttpRetryPolicy retryPolicy = retryPolicyFactory.create()) { + while (true) { + try { + HttpClientResponse response = executeOnce(request); + if (!retryPolicy.shouldRetry(new ResponseAdapter(response))) { + return response; + } + } catch (Exception e) { + if (!retryPolicy.shouldRetry( + e instanceof IOException ? (IOException) e : new IOException(e))) { + if (e instanceof IOException) { + throw (IOException) e; + } + throw new IOException(e); + } + } + retryPolicy.backoff(); + } + } + } + + private HttpClientResponse executeOnce(HttpClientRequest request) + throws IOException, InterruptedException { + if (transport != HttpTransport.TCP) { + return executeWithSocketTransport(request); + } + + URI uri = request.uri(); + int port = port(uri); + String host = uri.getHost() != null ? uri.getHost() : "localhost"; + + Bootstrap bootstrap = + new Bootstrap() + .group(eventLoopGroup) + .channel(NioSocketChannel.class) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) connectTimeoutMillis); + + CompletableFuture responseFuture = new CompletableFuture<>(); + bootstrap.handler( + new ClientInitializer(uri, readTimeoutMillis, writeTimeoutMillis, responseFuture)); + + ChannelFuture connectFuture = bootstrap.connect(host, port); + try { + if (!connectFuture.await(connectTimeoutMillis, TimeUnit.MILLISECONDS)) { + throw new IOException("connection timeout"); + } + if (!connectFuture.isSuccess()) { + throw new IOException("connection failed", connectFuture.cause()); + } + + Channel channel = connectFuture.channel(); + inFlightChannels.add(channel); + channel + .closeFuture() + .addListener( + ignored -> responseFuture.completeExceptionally(new IOException("channel closed"))); + if (closed) { + throw new IOException("http client is closed"); + } + channel.writeAndFlush(toNettyRequest(request, uri, host, port)); + + return responseFuture.get(requestTimeoutMillis, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + throw cause instanceof IOException ? (IOException) cause : new IOException(cause); + } catch (TimeoutException e) { + throw new IOException("request timeout", e); + } finally { + if (connectFuture.channel() != null) { + inFlightChannels.remove(connectFuture.channel()); + connectFuture.channel().close().awaitUninterruptibly(); + } + } + } + + private HttpClientResponse executeWithSocketTransport(HttpClientRequest request) + throws IOException, InterruptedException { + URI uri = request.uri(); + String host = uri.getHost() != null ? uri.getHost() : "localhost"; + int port = port(uri); + + try (Socket socket = createSocket(socketFactory)) { + socket.connect(new InetSocketAddress(host, port), (int) connectTimeoutMillis); + socket.setSoTimeout((int) readTimeoutMillis); + + try (BufferedOutputStream output = new BufferedOutputStream(socket.getOutputStream()); + BufferedInputStream input = new BufferedInputStream(socket.getInputStream())) { + byte[] body = request.body(); + String requestLine = request.method() + " " + rawPathAndQuery(uri) + " HTTP/1.1\r\n"; + output.write(requestLine.getBytes(java.nio.charset.StandardCharsets.US_ASCII)); + output.write( + ("Host: " + hostHeaderValue(uri, host, port) + "\r\n") + .getBytes(java.nio.charset.StandardCharsets.US_ASCII)); + output.write("Connection: close\r\n".getBytes(java.nio.charset.StandardCharsets.US_ASCII)); + for (Map.Entry> header : request.headers().entrySet()) { + for (String value : header.getValue()) { + output.write( + (header.getKey() + ": " + value + "\r\n") + .getBytes(java.nio.charset.StandardCharsets.US_ASCII)); + } + } + output.write( + ("Content-Length: " + body.length + "\r\n\r\n") + .getBytes(java.nio.charset.StandardCharsets.US_ASCII)); + output.write(body); + output.flush(); + + return readSocketHttpResponse(input); + } + } + } + + private HttpClientResponse readSocketHttpResponse(BufferedInputStream input) throws IOException { + String statusLine = readAsciiLine(input); + if (statusLine == null || !statusLine.startsWith("HTTP/")) { + throw new IOException("Invalid HTTP response: missing status line"); + } + String[] statusParts = statusLine.split(" "); + if (statusParts.length < 2) { + throw new IOException("Invalid HTTP response status line: " + statusLine); + } + int statusCode = Integer.parseInt(statusParts[1]); + + Map> headers = new LinkedHashMap<>(); + int contentLength = -1; + while (true) { + String line = readAsciiLine(input); + if (line == null || line.isEmpty()) { + break; + } + int separator = line.indexOf(':'); + if (separator > 0) { + String name = line.substring(0, separator).trim(); + String value = line.substring(separator + 1).trim(); + headers.computeIfAbsent(name, ignored -> new ArrayList<>()).add(value); + if ("Content-Length".equalsIgnoreCase(name)) { + contentLength = Integer.parseInt(value); + } + } + } + + byte[] body; + if (contentLength >= 0) { + body = new byte[contentLength]; + int offset = 0; + while (offset < contentLength) { + int read = input.read(body, offset, contentLength - offset); + if (read < 0) { + break; + } + offset += read; + } + if (offset < contentLength) { + byte[] truncated = new byte[offset]; + System.arraycopy(body, 0, truncated, 0, offset); + body = truncated; + } + } else { + ByteArrayOutputStream bodyBuffer = new ByteArrayOutputStream(); + byte[] buffer = new byte[8 * 1024]; + int read; + while ((read = input.read(buffer)) != -1) { + bodyBuffer.write(buffer, 0, read); + } + body = bodyBuffer.toByteArray(); + } + return new HttpClientResponse(statusCode, headers, body); + } + + private static @Nullable String readAsciiLine(BufferedInputStream input) throws IOException { + ByteArrayOutputStream lineBuffer = new ByteArrayOutputStream(); + int previous = -1; + while (true) { + int current = input.read(); + if (current == -1) { + break; + } + if (previous == '\r' && current == '\n') { + byte[] bytes = lineBuffer.toByteArray(); + return new String( + bytes, 0, Math.max(0, bytes.length - 1), java.nio.charset.StandardCharsets.US_ASCII); + } + lineBuffer.write(current); + previous = current; + } + return lineBuffer.size() == 0 + ? null + : new String(lineBuffer.toByteArray(), java.nio.charset.StandardCharsets.US_ASCII); + } + + private DefaultFullHttpRequest toNettyRequest( + HttpClientRequest request, URI uri, String host, int port) { + HttpMethod method = HttpMethod.valueOf(request.method()); + byte[] body = request.body(); + + DefaultFullHttpRequest nettyRequest = + new DefaultFullHttpRequest( + HttpVersion.HTTP_1_1, method, rawPathAndQuery(uri), Unpooled.wrappedBuffer(body)); + + for (Map.Entry> entry : request.headers().entrySet()) { + for (String value : entry.getValue()) { + nettyRequest.headers().add(entry.getKey(), value); + } + } + + if (!nettyRequest.headers().contains(HttpHeaderNames.HOST)) { + nettyRequest.headers().set(HttpHeaderNames.HOST, hostHeaderValue(uri, host, port)); + } + if (!nettyRequest.headers().contains(HttpHeaderNames.CONNECTION)) { + nettyRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); + } + if (!nettyRequest.headers().contains(HttpHeaderNames.CONTENT_LENGTH)) { + nettyRequest.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, body.length); + } + + return nettyRequest; + } + + private static String rawPathAndQuery(URI uri) { + String rawPath = uri.getRawPath(); + if (rawPath == null || rawPath.isEmpty()) { + rawPath = "/"; + } + String rawQuery = uri.getRawQuery(); + return rawQuery == null || rawQuery.isEmpty() ? rawPath : rawPath + "?" + rawQuery; + } + + private String hostHeaderValue(URI uri, String host, int port) { + if (uri.getPort() == -1 || uri.getPort() == defaultPort(uri.getScheme())) { + return host; + } + return host + ':' + port; + } + + private int port(URI uri) { + if (uri.getPort() != -1) { + return uri.getPort(); + } + return defaultPort(uri.getScheme()); + } + + private static int defaultPort(@Nullable String scheme) { + if ("https".equalsIgnoreCase(scheme)) { + return 443; + } + return 80; + } + + private SslContext buildSslContext() { + try { + return SslContextBuilder.forClient().build(); + } catch (Exception e) { + throw new IllegalStateException("Unable to create Netty SSL context", e); + } + } + + private static SocketFactory socketFactoryForTransport( + HttpTransport transport, @Nullable String unixDomainSocketPath, @Nullable String namedPipe) { + if (transport == HttpTransport.UNIX_DOMAIN_SOCKET) { + return new UnixDomainSocketFactory(new java.io.File(unixDomainSocketPath)); + } + if (transport == HttpTransport.NAMED_PIPE) { + return new NamedPipeSocketFactory(namedPipe); + } + return SocketFactory.getDefault(); + } + + private static Socket createSocket(SocketFactory factory) { + try { + return factory.createSocket(); + } catch (IOException e) { + throw new IllegalStateException("Unable to create socket", e); + } + } + + @Override + public void close() { + closed = true; + for (Channel channel : inFlightChannels) { + try { + channel.close().awaitUninterruptibly(); + } catch (Exception ignored) { + } + } + inFlightChannels.clear(); + + if (closeEventLoopGroupOnClose) { + eventLoopGroup.shutdownGracefully(0, 1, TimeUnit.SECONDS).awaitUninterruptibly(); + } + } + + private final class ClientInitializer extends ChannelInitializer { + private final URI uri; + private final long readTimeoutMillis; + private final long writeTimeoutMillis; + private final CompletableFuture responseFuture; + + private ClientInitializer( + URI uri, + long readTimeoutMillis, + long writeTimeoutMillis, + CompletableFuture responseFuture) { + this.uri = uri; + this.readTimeoutMillis = readTimeoutMillis; + this.writeTimeoutMillis = writeTimeoutMillis; + this.responseFuture = responseFuture; + } + + @Override + protected void initChannel(Channel channel) throws Exception { + InetSocketAddress proxyAddress = resolveProxyAddress(uri); + if (proxyAddress != null) { + channel + .pipeline() + .addLast( + proxyUsername != null + ? new HttpProxyHandler(proxyAddress, proxyUsername, proxyPassword) + : new HttpProxyHandler(proxyAddress)); + } + + if ("https".equalsIgnoreCase(uri.getScheme())) { + channel + .pipeline() + .addLast(sslContext.newHandler(channel.alloc(), uri.getHost(), port(uri))); + } + + channel + .pipeline() + .addLast(new HttpClientCodec()) + .addLast(new HttpObjectAggregator(maxResponseSizeBytes)) + .addLast(new ReadTimeoutHandler(readTimeoutMillis, TimeUnit.MILLISECONDS)) + .addLast(new WriteTimeoutHandler(writeTimeoutMillis, TimeUnit.MILLISECONDS)) + .addLast(new ResponseHandler(responseFuture)); + } + } + + private @Nullable InetSocketAddress resolveProxyAddress(URI uri) { + if (proxyHost != null) { + return new InetSocketAddress(proxyHost, proxyPort); + } + try { + URI selectorUri = new URI(uri.getScheme(), null, uri.getHost(), port(uri), null, null, null); + for (Proxy proxy : AgentProxySelector.INSTANCE.select(selectorUri)) { + if (proxy != null + && proxy.type() == Proxy.Type.HTTP + && proxy.address() instanceof InetSocketAddress) { + return (InetSocketAddress) proxy.address(); + } + } + } catch (URISyntaxException ignored) { + // fall back to no proxy + } + return null; + } + + private static final class ResponseHandler extends SimpleChannelInboundHandler { + private final CompletableFuture responseFuture; + + private ResponseHandler(CompletableFuture responseFuture) { + this.responseFuture = responseFuture; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) { + Map> headers = new LinkedHashMap<>(); + for (Map.Entry entry : msg.headers()) { + headers.computeIfAbsent(entry.getKey(), ignored -> new ArrayList<>()).add(entry.getValue()); + } + + byte[] body = new byte[msg.content().readableBytes()]; + msg.content().readBytes(body); + responseFuture.complete(new HttpClientResponse(msg.status().code(), headers, body)); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + responseFuture.completeExceptionally(cause); + ctx.close(); + } + } + + private static final class ResponseAdapter implements HttpRetryPolicy.Response { + private final HttpClientResponse response; + + private ResponseAdapter(HttpClientResponse response) { + this.response = response; + } + + @Override + public int code() { + return response.statusCode(); + } + + @Override + public @Nullable String header(String name) { + return response.header(name); + } + } +} diff --git a/communication/src/main/java/datadog/communication/http/client/netty/NettyHttpClientBuilder.java b/communication/src/main/java/datadog/communication/http/client/netty/NettyHttpClientBuilder.java new file mode 100644 index 00000000000..93243bf4a1e --- /dev/null +++ b/communication/src/main/java/datadog/communication/http/client/netty/NettyHttpClientBuilder.java @@ -0,0 +1,168 @@ +package datadog.communication.http.client.netty; + +import datadog.communication.http.HttpRetryPolicy; +import datadog.communication.http.client.HttpClient; +import datadog.communication.http.client.HttpClientBuilder; +import datadog.communication.http.client.HttpTransport; +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.concurrent.ThreadFactory; +import javax.annotation.Nullable; + +/** Builder used to configure and create {@link NettyHttpClient}. */ +public final class NettyHttpClientBuilder + implements HttpClientBuilder { + private HttpTransport transport = HttpTransport.TCP; + private String unixDomainSocketPath; + private String namedPipe; + private long connectTimeoutMillis = 10_000; + private long readTimeoutMillis = 10_000; + private long writeTimeoutMillis = 10_000; + private long requestTimeoutMillis = 10_000; + private int maxResponseSizeBytes = 8 * 1024 * 1024; + private String proxyHost; + private Integer proxyPort; + private String proxyUsername; + private String proxyPassword; + private HttpRetryPolicy.Factory retryPolicyFactory = HttpRetryPolicy.Factory.NEVER_RETRY; + private boolean daemonThreads = true; + private String threadNamePrefix = "dd-netty-http"; + private EventLoopGroup eventLoopGroup; + private boolean closeEventLoopGroupOnClose = true; + + @Override + public NettyHttpClientBuilder transport(HttpTransport transport) { + this.transport = transport; + return this; + } + + @Override + public NettyHttpClientBuilder unixDomainSocketPath(@Nullable String unixDomainSocketPath) { + this.unixDomainSocketPath = unixDomainSocketPath; + return this; + } + + @Override + public NettyHttpClientBuilder namedPipe(@Nullable String namedPipe) { + this.namedPipe = namedPipe; + return this; + } + + @Override + public NettyHttpClientBuilder connectTimeoutMillis(long connectTimeoutMillis) { + this.connectTimeoutMillis = connectTimeoutMillis; + return this; + } + + public NettyHttpClientBuilder readTimeoutMillis(long readTimeoutMillis) { + this.readTimeoutMillis = readTimeoutMillis; + return this; + } + + public NettyHttpClientBuilder writeTimeoutMillis(long writeTimeoutMillis) { + this.writeTimeoutMillis = writeTimeoutMillis; + return this; + } + + @Override + public NettyHttpClientBuilder requestTimeoutMillis(long requestTimeoutMillis) { + this.requestTimeoutMillis = requestTimeoutMillis; + return this; + } + + public NettyHttpClientBuilder maxResponseSizeBytes(int maxResponseSizeBytes) { + this.maxResponseSizeBytes = maxResponseSizeBytes; + return this; + } + + @Override + public NettyHttpClientBuilder proxy(String proxyHost, int proxyPort) { + return proxy(proxyHost, proxyPort, null, null); + } + + @Override + public NettyHttpClientBuilder proxy( + String proxyHost, + int proxyPort, + @Nullable String proxyUsername, + @Nullable String proxyPassword) { + this.proxyHost = proxyHost; + this.proxyPort = proxyPort; + this.proxyUsername = proxyUsername; + this.proxyPassword = proxyPassword; + return this; + } + + @Override + public NettyHttpClientBuilder retryPolicyFactory(HttpRetryPolicy.Factory retryPolicyFactory) { + this.retryPolicyFactory = retryPolicyFactory; + return this; + } + + public NettyHttpClientBuilder daemonThreads(boolean daemonThreads) { + this.daemonThreads = daemonThreads; + return this; + } + + public NettyHttpClientBuilder threadNamePrefix(String threadNamePrefix) { + this.threadNamePrefix = threadNamePrefix; + return this; + } + + public NettyHttpClientBuilder eventLoopGroup( + EventLoopGroup eventLoopGroup, boolean closeEventLoopGroupOnClose) { + this.eventLoopGroup = eventLoopGroup; + this.closeEventLoopGroupOnClose = closeEventLoopGroupOnClose; + return this; + } + + @Override + public HttpClient build() { + validate(); + + ThreadFactory threadFactory = new DefaultThreadFactory(threadNamePrefix, daemonThreads); + return new NettyHttpClient( + transport, + unixDomainSocketPath, + namedPipe, + connectTimeoutMillis, + readTimeoutMillis, + writeTimeoutMillis, + requestTimeoutMillis, + maxResponseSizeBytes, + proxyHost, + proxyPort, + proxyUsername, + proxyPassword, + retryPolicyFactory, + eventLoopGroup, + closeEventLoopGroupOnClose, + threadFactory); + } + + private void validate() { + if (connectTimeoutMillis <= 0 + || readTimeoutMillis <= 0 + || writeTimeoutMillis <= 0 + || requestTimeoutMillis <= 0) { + throw new IllegalArgumentException("timeouts must be strictly positive"); + } + if (maxResponseSizeBytes <= 0) { + throw new IllegalArgumentException("maxResponseSizeBytes must be strictly positive"); + } + if (proxyHost != null && proxyPort == null) { + throw new IllegalArgumentException("proxy port must be provided when proxy host is set"); + } + if (proxyHost != null && transport != HttpTransport.TCP) { + throw new IllegalArgumentException("proxy is currently supported only for TCP transport"); + } + if (transport == HttpTransport.UNIX_DOMAIN_SOCKET + && (unixDomainSocketPath == null || unixDomainSocketPath.isEmpty())) { + throw new IllegalArgumentException( + "unix domain socket path must be set for UNIX_DOMAIN_SOCKET transport"); + } + if (transport == HttpTransport.NAMED_PIPE && (namedPipe == null || namedPipe.isEmpty())) { + throw new IllegalArgumentException("named pipe must be set for NAMED_PIPE transport"); + } + } +} diff --git a/communication/src/test/java/datadog/communication/http/client/HttpClientContractTest.java b/communication/src/test/java/datadog/communication/http/client/HttpClientContractTest.java new file mode 100644 index 00000000000..b7996e01531 --- /dev/null +++ b/communication/src/test/java/datadog/communication/http/client/HttpClientContractTest.java @@ -0,0 +1,356 @@ +package datadog.communication.http.client; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeFalse; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +import datadog.communication.http.HttpRetryPolicy; +import datadog.communication.http.client.ahc.ApacheAsyncHttpClientBuilder; +import datadog.communication.http.client.jetty.JettyHttpClientBuilder; +import datadog.communication.http.client.netty.NettyHttpClientBuilder; +import io.netty.channel.nio.NioEventLoopGroup; +import java.io.Closeable; +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.SocketPolicy; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.opentest4j.TestAbortedException; + +@ExtendWith(HttpClientContractTest.ClientImplementationExtension.class) +public class HttpClientContractTest { + public static final String CLIENT_IMPL_PARAMETER = "datadog.http.client.impl"; + public static final String NETTY = "netty"; + public static final String AHC = "ahc"; + public static final String JETTY = "jetty"; + + private final List closeables = new ArrayList<>(); + private MockWebServer server; + private HttpClientBuilder builder; + + private void useBuilder(HttpClientBuilder implementation) { + this.builder = implementation; + } + + private String clientName() { + return builder.getClass().getSimpleName(); + } + + private boolean supportsUnixDomainSocket() { + return isNetty() || isJetty(); + } + + private boolean isNetty() { + return clientName().equals(NettyHttpClientBuilder.class.getSimpleName()); + } + + private boolean isJetty() { + return clientName().equals(JettyHttpClientBuilder.class.getSimpleName()); + } + + @AfterEach + void afterEach() { + if (server != null) { + try { + server.shutdown(); + } catch (Exception ignored) { + } + } + + for (int i = closeables.size() - 1; i >= 0; i--) { + try { + closeables.get(i).close(); + } catch (Exception ignored) { + } + } + closeables.clear(); + } + + @Test + void shouldSendRequestAndReceiveResponse() throws Exception { + server = new MockWebServer(); + server.enqueue(new MockResponse().setResponseCode(200).setBody("pong")); + server.start(); + + URI uri = server.url("/ping").uri(); + HttpClientRequest request = + HttpClientRequest.builder(uri, "GET").addHeader("x-test", "1").build(); + + try (HttpClient client = builder.build()) { + HttpClientResponse response = client.execute(request); + assertEquals(200, response.statusCode(), clientName()); + assertArrayEquals("pong".getBytes(StandardCharsets.UTF_8), response.body(), clientName()); + } + } + + @Test + void shouldRetryOnServerError() throws Exception { + server = new MockWebServer(); + server.enqueue(new MockResponse().setResponseCode(500)); + server.enqueue(new MockResponse().setResponseCode(200).setBody("ok")); + server.start(); + + URI uri = server.url("/retry").uri(); + HttpClientRequest request = HttpClientRequest.builder(uri, "POST").body(new byte[] {1}).build(); + + HttpRetryPolicy.Factory retryPolicy = new HttpRetryPolicy.Factory(1, 1, 1.0); + try (HttpClient client = builder.retryPolicyFactory(retryPolicy).build()) { + HttpClientResponse response = client.execute(request); + assertEquals(200, response.statusCode(), clientName()); + assertEquals(2, server.getRequestCount(), clientName()); + } + } + + @Test + void shouldRetryOn429WithRateLimitReset() throws Exception { + server = new MockWebServer(); + server.enqueue(new MockResponse().setResponseCode(429).setHeader("x-ratelimit-reset", "0")); + server.enqueue(new MockResponse().setResponseCode(200).setBody("ok")); + server.start(); + + URI uri = server.url("/retry-429").uri(); + HttpClientRequest request = HttpClientRequest.builder(uri, "POST").body(new byte[] {1}).build(); + + HttpRetryPolicy.Factory retryPolicy = new HttpRetryPolicy.Factory(2, 1, 1.0); + try (HttpClient client = builder.retryPolicyFactory(retryPolicy).build()) { + HttpClientResponse response = client.execute(request); + assertEquals(200, response.statusCode(), clientName()); + assertEquals(2, server.getRequestCount(), clientName()); + } + } + + @Test + void shouldRetryAfterConnectionIssue() throws Exception { + ServerSocket reservedPort = new ServerSocket(0, 1, InetAddress.getByName("127.0.0.1")); + int port = reservedPort.getLocalPort(); + reservedPort.close(); + + URI uri = new URI("http://127.0.0.1:" + port + "/retry-connection"); + HttpClientRequest request = HttpClientRequest.builder(uri, "GET").build(); + + HttpRetryPolicy.Factory retryPolicy = new HttpRetryPolicy.Factory(2, 200, 1.0); + try (HttpClient client = + builder + .retryPolicyFactory(retryPolicy) + .connectTimeoutMillis(100) + .requestTimeoutMillis(2_000) + .build()) { + long startNanos = System.nanoTime(); + IOException exception = assertThrows(IOException.class, () -> client.execute(request)); + long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + assertTrue(elapsedMillis >= 100, "retry backoff should have been applied for " + clientName()); + assertNotNull(exception.getMessage(), clientName()); + } + } + + @Test + void shouldSendRequestThroughProxy() throws Exception { + server = new MockWebServer(); + server.enqueue(new MockResponse().setResponseCode(200).setBody("proxied")); + server.start(); + + MockProxy proxy = new MockProxy(); + closeables.add(proxy); + proxy.start(); + + URI uri = server.url("/proxy").uri(); + HttpClientRequest request = HttpClientRequest.builder(uri, "GET").build(); + + try (HttpClient client = builder.proxy("127.0.0.1", proxy.port()).build()) { + HttpClientResponse response = client.execute(request); + assertEquals(200, response.statusCode(), clientName()); + assertArrayEquals("proxied".getBytes(StandardCharsets.UTF_8), response.body(), clientName()); + } + + assertTrue(proxy.awaitConnect(5, TimeUnit.SECONDS), clientName()); + assertTrue( + proxy.connectRequestLine().startsWith("CONNECT ") + || proxy.connectRequestLine().startsWith("GET "), + clientName()); + assertEquals(1, server.getRequestCount(), clientName()); + } + + @Test + void shouldSendProxyAuthorizationWhenCredentialsProvided() throws Exception { + server = new MockWebServer(); + server.enqueue(new MockResponse().setResponseCode(200).setBody("proxied-auth")); + server.start(); + + MockProxy proxy = new MockProxy(true); + closeables.add(proxy); + proxy.start(); + + URI uri = server.url("/proxy-auth").uri(); + HttpClientRequest request = HttpClientRequest.builder(uri, "GET").build(); + + try (HttpClient client = + builder.proxy("127.0.0.1", proxy.port(), "test-user", "test-pass").build()) { + HttpClientResponse response = client.execute(request); + assertEquals(200, response.statusCode(), clientName()); + assertArrayEquals( + "proxied-auth".getBytes(StandardCharsets.UTF_8), response.body(), clientName()); + } + + assertTrue(proxy.awaitConnect(5, TimeUnit.SECONDS), clientName()); + String authHeader = proxy.connectHeader("Proxy-Authorization"); + assertNotNull(authHeader, clientName()); + String expectedAuth = + "Basic " + + Base64.getEncoder() + .encodeToString("test-user:test-pass".getBytes(StandardCharsets.US_ASCII)); + assertEquals(expectedAuth, authHeader, clientName()); + } + + @Test + void shouldTimeoutWhenServerDoesNotRespond() throws Exception { + server = new MockWebServer(); + server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.NO_RESPONSE)); + server.start(); + + URI uri = server.url("/timeout").uri(); + HttpClientRequest request = HttpClientRequest.builder(uri, "GET").build(); + + try (HttpClient client = + builder.requestTimeoutMillis(250).connectTimeoutMillis(250).build()) { + IOException exception = assertThrows(IOException.class, () -> client.execute(request)); + assertTrue( + exception.getMessage().contains("timeout") + || (exception.getCause() != null + && exception.getCause().getMessage() != null + && exception.getCause().getMessage().contains("timeout")), + clientName()); + } + } + + @Test + void shouldFailInFlightRequestWhenClientClosed() throws Exception { + server = new MockWebServer(); + server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.NO_RESPONSE)); + server.start(); + + URI uri = server.url("/close-in-flight").uri(); + HttpClientRequest request = HttpClientRequest.builder(uri, "GET").build(); + + try (HttpClient client = builder.requestTimeoutMillis(30_000).connectTimeoutMillis(500).build()) { + CompletableFuture responseFuture = + CompletableFuture.supplyAsync( + () -> { + try { + return client.execute(request); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + Thread.sleep(200); + client.close(); + + ExecutionException exception = + assertThrows(ExecutionException.class, () -> responseFuture.get(5, TimeUnit.SECONDS)); + assertNotNull(exception.getCause(), clientName()); + assertTrue(exception.getCause() instanceof RuntimeException, clientName()); + assertTrue(exception.getCause().getCause() instanceof IOException, clientName()); + } + } + + @Test + void shouldNotCloseExternallyManagedEventLoopGroupWhenSupported() { + assumeTrue(isNetty(), "Netty-specific lifecycle test"); + + NioEventLoopGroup externalGroup = new NioEventLoopGroup(1); + try (HttpClient ignored = + new NettyHttpClientBuilder().eventLoopGroup(externalGroup, false).build()) { + // no-op + } + assertFalse(externalGroup.isShuttingDown()); + externalGroup.shutdownGracefully().awaitUninterruptibly(); + } + + @Test + @DisabledOnOs(OS.WINDOWS) + void shouldSendRequestThroughUnixDomainSocketWhenSupported() throws Exception { + assumeTrue(supportsUnixDomainSocket(), "UDS supported only by implementations that expose it"); + + Path socketPath = Files.createTempFile("http-client-contract", ".sock"); + Files.deleteIfExists(socketPath); + server = new MockWebServer(); + server.setServerSocketFactory(new UnixDomainServerSocketFactory(socketPath)); + server.enqueue(new MockResponse().setResponseCode(200).setBody("uds-ok")); + server.start(); + + URI uri = new URI("http://localhost/uds"); + HttpClientRequest request = HttpClientRequest.builder(uri, "GET").build(); + + try (HttpClient client = + builder + .transport(HttpTransport.UNIX_DOMAIN_SOCKET) + .unixDomainSocketPath(socketPath.toString()) + .build()) { + HttpClientResponse response = client.execute(request); + assertEquals(200, response.statusCode(), clientName()); + assertArrayEquals("uds-ok".getBytes(StandardCharsets.UTF_8), response.body(), clientName()); + } + } + + @Test + void shouldFailImmediatelyWhenUnixDomainSocketIsUnsupported() { + assumeFalse(supportsUnixDomainSocket(), "Only run for implementations without UDS support"); + + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> builder.transport(HttpTransport.UNIX_DOMAIN_SOCKET)); + assertTrue(exception.getMessage().contains("TCP"), clientName()); + } + + static final class ClientImplementationExtension implements BeforeEachCallback { + @Override + public void beforeEach(ExtensionContext context) { + String implementationValue = context.getConfigurationParameter(CLIENT_IMPL_PARAMETER).orElse(null); + if (implementationValue == null || implementationValue.isEmpty()) { + throw new TestAbortedException( + "Skipping contract test: missing JUnit configuration parameter: " + + CLIENT_IMPL_PARAMETER); + } + switch (implementationValue) { + case NETTY: + ((HttpClientContractTest) context.getRequiredTestInstance()) + .useBuilder(new NettyHttpClientBuilder()); + break; + case AHC: + ((HttpClientContractTest) context.getRequiredTestInstance()) + .useBuilder(new ApacheAsyncHttpClientBuilder()); + break; + case JETTY: + ((HttpClientContractTest) context.getRequiredTestInstance()) + .useBuilder(new JettyHttpClientBuilder()); + break; + default: + throw new IllegalStateException("Unsupported implementation: " + implementationValue); + } + } + } +} diff --git a/communication/src/test/java/datadog/communication/http/client/HttpClientFacadeSuites.java b/communication/src/test/java/datadog/communication/http/client/HttpClientFacadeSuites.java new file mode 100644 index 00000000000..f537ede163a --- /dev/null +++ b/communication/src/test/java/datadog/communication/http/client/HttpClientFacadeSuites.java @@ -0,0 +1,33 @@ +package datadog.communication.http.client; + +import org.junit.platform.suite.api.ConfigurationParameter; +import org.junit.platform.suite.api.SelectClasses; +import org.junit.platform.suite.api.Suite; +import org.junit.platform.suite.api.SuiteDisplayName; + +import static datadog.communication.http.client.HttpClientContractTest.AHC; +import static datadog.communication.http.client.HttpClientContractTest.CLIENT_IMPL_PARAMETER; +import static datadog.communication.http.client.HttpClientContractTest.JETTY; +import static datadog.communication.http.client.HttpClientContractTest.NETTY; + +public final class HttpClientFacadeSuites { + private HttpClientFacadeSuites() {} + + @Suite + @SuiteDisplayName("HTTP Facade Suite [netty]") + @SelectClasses(HttpClientContractTest.class) + @ConfigurationParameter(key = CLIENT_IMPL_PARAMETER, value = NETTY) + public static class NettyHttpClientTest {} + + @Suite + @SuiteDisplayName("HTTP Facade Suite [apache-async-http-client5]") + @SelectClasses(HttpClientContractTest.class) + @ConfigurationParameter(key = CLIENT_IMPL_PARAMETER, value = AHC) + public static class ApacheAsyncHttpClientTest {} + + @Suite + @SuiteDisplayName("HTTP Facade Suite [jetty-http-client]") + @SelectClasses(HttpClientContractTest.class) + @ConfigurationParameter(key = CLIENT_IMPL_PARAMETER, value = JETTY) + public static class JettyHttpClientTest {} +} diff --git a/communication/src/test/java/datadog/communication/http/client/MockProxy.java b/communication/src/test/java/datadog/communication/http/client/MockProxy.java new file mode 100644 index 00000000000..7c8331fe022 --- /dev/null +++ b/communication/src/test/java/datadog/communication/http/client/MockProxy.java @@ -0,0 +1,225 @@ +package datadog.communication.http.client; + +import java.io.*; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +final class MockProxy implements Closeable { + private final ServerSocket serverSocket; + private final CountDownLatch connectLatch = new CountDownLatch(1); + private final AtomicReference connectLine = new AtomicReference(); + private final AtomicReference> connectHeaders = + new AtomicReference>(Collections.emptyMap()); + private volatile boolean running = true; + private final boolean requireAuth; + + public MockProxy() throws IOException { + this(false); + } + + public MockProxy(boolean requireAuth) throws IOException { + this.requireAuth = requireAuth; + serverSocket = new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1")); + } + + public int port() { + return serverSocket.getLocalPort(); + } + + public void start() { + Thread acceptThread = + new Thread( + () -> { + while (running) { + try { + Socket clientSocket = serverSocket.accept(); + handleClient(clientSocket); + } catch (IOException ignored) { + return; + } + } + }, + "proxy-tunnel-server"); + acceptThread.setDaemon(true); + acceptThread.start(); + } + + private void handleClient(Socket clientSocket) { + Thread handler = + new Thread( + () -> { + try (Socket client = clientSocket) { + BufferedReader reader = + new BufferedReader( + new InputStreamReader( + client.getInputStream(), StandardCharsets.US_ASCII)); + OutputStream clientOut = client.getOutputStream(); + + String firstLine = reader.readLine(); + connectLine.set(firstLine); + connectLatch.countDown(); + + if (firstLine == null || !firstLine.startsWith("CONNECT ")) { + handleForwardProxyRequest(firstLine, reader, clientOut); + return; + } + + Map headers = new LinkedHashMap(); + while (true) { + String line = reader.readLine(); + if (line == null || line.isEmpty()) { + break; + } + int idx = line.indexOf(':'); + if (idx > 0) { + headers.put(line.substring(0, idx).trim(), line.substring(idx + 1).trim()); + } + } + connectHeaders.set(headers); + if (requireAuth && connectHeaderFrom(headers, "Proxy-Authorization") == null) { + clientOut.write( + "HTTP/1.1 407 Proxy Authentication Required\r\n" + .getBytes(StandardCharsets.US_ASCII)); + clientOut.write( + "Proxy-Authenticate: Basic realm=\"test\"\r\n\r\n" + .getBytes(StandardCharsets.US_ASCII)); + clientOut.flush(); + return; + } + + String authority = firstLine.split(" ")[1]; + String[] hostPort = authority.split(":"); + try (Socket upstream = new Socket(hostPort[0], Integer.parseInt(hostPort[1]))) { + clientOut.write( + "HTTP/1.1 200 Connection established\r\nProxy-Agent: test\r\n\r\n" + .getBytes(StandardCharsets.US_ASCII)); + clientOut.flush(); + + Thread upstreamToClient = + new Thread( + () -> transfer(upstream, client), "proxy-upstream-to-client-thread"); + upstreamToClient.setDaemon(true); + upstreamToClient.start(); + transfer(client, upstream); + } + } catch (Exception ignored) { + } + }, + "proxy-client-handler"); + handler.setDaemon(true); + handler.start(); + } + + private void handleForwardProxyRequest( + String firstLine, BufferedReader reader, OutputStream clientOut) throws IOException { + Map headers = new LinkedHashMap(); + while (true) { + String line = reader.readLine(); + if (line == null || line.isEmpty()) { + break; + } + int idx = line.indexOf(':'); + if (idx > 0) { + headers.put(line.substring(0, idx).trim(), line.substring(idx + 1).trim()); + } + } + connectHeaders.set(headers); + if (requireAuth && connectHeaderFrom(headers, "Proxy-Authorization") == null) { + clientOut.write( + "HTTP/1.1 407 Proxy Authentication Required\r\n".getBytes(StandardCharsets.US_ASCII)); + clientOut.write( + "Proxy-Authenticate: Basic realm=\"test\"\r\n\r\n".getBytes(StandardCharsets.US_ASCII)); + clientOut.flush(); + return; + } + + String[] parts = firstLine.split(" "); + URI target = URI.create(parts[1]); + int port = target.getPort() == -1 ? 80 : target.getPort(); + String path = target.getRawPath(); + if (path == null || path.isEmpty()) { + path = "/"; + } + if (target.getRawQuery() != null && !target.getRawQuery().isEmpty()) { + path += "?" + target.getRawQuery(); + } + + try (Socket upstream = new Socket(target.getHost(), port)) { + OutputStream upstreamOut = upstream.getOutputStream(); + upstreamOut.write((parts[0] + " " + path + " HTTP/1.1\r\n").getBytes(StandardCharsets.US_ASCII)); + for (Map.Entry header : headers.entrySet()) { + upstreamOut.write( + (header.getKey() + ": " + header.getValue() + "\r\n") + .getBytes(StandardCharsets.US_ASCII)); + } + upstreamOut.write("\r\n".getBytes(StandardCharsets.US_ASCII)); + upstreamOut.flush(); + + transfer(upstream, clientOut); + } + } + + private static void transfer(Socket source, Socket destination) { + try { + InputStream inputStream = source.getInputStream(); + OutputStream outputStream = destination.getOutputStream(); + byte[] buffer = new byte[8 * 1024]; + int read; + while ((read = inputStream.read(buffer)) != -1) { + outputStream.write(buffer, 0, read); + outputStream.flush(); + } + } catch (IOException ignored) { + } + } + + private static void transfer(Socket source, OutputStream destination) { + try { + InputStream inputStream = source.getInputStream(); + byte[] buffer = new byte[8 * 1024]; + int read; + while ((read = inputStream.read(buffer)) != -1) { + destination.write(buffer, 0, read); + destination.flush(); + } + } catch (IOException ignored) { + } + } + + public boolean awaitConnect(long timeout, TimeUnit unit) throws InterruptedException { + return connectLatch.await(timeout, unit); + } + + public String connectRequestLine() { + return connectLine.get(); + } + + public String connectHeader(String name) { + Map headers = connectHeaders.get(); + return connectHeaderFrom(headers, name); + } + + private static String connectHeaderFrom(Map headers, String name) { + for (Map.Entry entry : headers.entrySet()) { + if (entry.getKey().equalsIgnoreCase(name)) { + return entry.getValue(); + } + } + return null; + } + + @Override + public void close() throws IOException { + running = false; + serverSocket.close(); + } +} diff --git a/communication/src/test/java/datadog/communication/http/client/UnixDomainServerSocketFactory.java b/communication/src/test/java/datadog/communication/http/client/UnixDomainServerSocketFactory.java new file mode 100644 index 00000000000..c03846c1387 --- /dev/null +++ b/communication/src/test/java/datadog/communication/http/client/UnixDomainServerSocketFactory.java @@ -0,0 +1,81 @@ +package datadog.communication.http.client; + +import jnr.unixsocket.UnixServerSocketChannel; +import jnr.unixsocket.UnixSocketAddress; + +import javax.net.ServerSocketFactory; +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; + +final class UnixDomainServerSocketFactory extends ServerSocketFactory { + private final Path socketPath; + + UnixDomainServerSocketFactory(Path socketPath) { + this.socketPath = socketPath; + } + + @Override + public ServerSocket createServerSocket() throws IOException { + Path socketPath1 = socketPath; + return new ServerSocket() { + private final Path socketPath = socketPath1; + private UnixServerSocketChannel serverChannel; + + @Override + public void bind(SocketAddress endpoint, int backlog) throws IOException { + if (serverChannel != null) { + return; + } + Files.deleteIfExists(socketPath); + serverChannel = UnixServerSocketChannel.open(); + serverChannel.socket().bind(new UnixSocketAddress(socketPath.toFile())); + } + + @Override + public Socket accept() throws IOException { + if (serverChannel == null) { + throw new IOException("socket is not bound"); + } + return serverChannel.accept().socket(); + } + + @Override + public int getLocalPort() { + // Dummy port value expected by MockWebServer. + return 1; + } + + @Override + public synchronized void close() throws IOException { + try { + if (serverChannel != null) { + serverChannel.close(); + } + } finally { + Files.deleteIfExists(socketPath); + } + } + }; + } + + @Override + public ServerSocket createServerSocket(int port) throws IOException { + return createServerSocket(); + } + + @Override + public ServerSocket createServerSocket(int port, int backlog) throws IOException { + return createServerSocket(); + } + + @Override + public ServerSocket createServerSocket(int port, int backlog, InetAddress ifAddress) + throws IOException { + return createServerSocket(); + } +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 8c6bcb2ec13..8b251cccf01 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -42,6 +42,10 @@ jafar = "0.16.0" # Web & Network jnr-unixsocket = "0.38.24" +netty = "4.1.122.Final" +apache-httpclient5 = "5.5" +jetty-httpclient = "9.4.56.v20240826" +jetty-unixsocket = "9.4.56.v20240826" okhttp-legacy = "[3.0,3.12.12]" # 3.12.x is last version to support Java7 okio = "1.17.6" # Datadog fork @@ -122,6 +126,12 @@ jafar-tools = { module = "io.btrace:jafar-tools", version.ref = "jafar" } # Web & Network okio = { module = "com.datadoghq.okio:okio", version.ref = "okio" } jnr-unixsocket = { module = "com.github.jnr:jnr-unixsocket", version.ref = "jnr-unixsocket"} +netty-codec-http = { module = "io.netty:netty-codec-http", version.ref = "netty" } +netty-handler = { module = "io.netty:netty-handler", version.ref = "netty" } +netty-handler-proxy = { module = "io.netty:netty-handler-proxy", version.ref = "netty" } +apache-httpclient5 = { module = "org.apache.httpcomponents.client5:httpclient5", version.ref = "apache-httpclient5" } +jetty-httpclient = { module = "org.eclipse.jetty:jetty-client", version.ref = "jetty-httpclient" } +jetty-unixsocket = { module = "org.eclipse.jetty:jetty-unixsocket", version.ref = "jetty-unixsocket" } # Cryptography cafe-crypto-ed25519 = { module = "cafe.cryptography:ed25519-elisabeth", version.ref = "cafe_crypto" } @@ -155,6 +165,7 @@ junit-jupiter = { module = "org.junit.jupiter:junit-jupiter", version.ref = "jun junit-jupiter-params = { module = "org.junit.jupiter:junit-jupiter-params", version.ref = "junit5" } junit-jupiter-engine = { module = "org.junit.jupiter:junit-jupiter-engine", version.ref = "junit5" } junit-platform-launcher = { module = "org.junit.platform:junit-platform-launcher", version.ref = "junit-platform" } +junit-platform-suite = { module = "org.junit.platform:junit-platform-suite", version.ref = "junit-platform" } mokito-core = { module = "org.mockito:mockito-core", version.ref = "mockito" } mokito-junit-jupiter = { module = "org.mockito:mockito-junit-jupiter", version.ref = "mockito" } objenesis = { module = "org.objenesis:objenesis", version = "3.3" } # Used by Spock for mocking: