From 873c4918e048d1fd01ef697145114525f9feb4e8 Mon Sep 17 00:00:00 2001 From: Shawn McKee Date: Wed, 8 Apr 2026 11:04:24 -0400 Subject: [PATCH] pool: emit firefly onStart marker from RemoteHttpDataTransferProtocol Move the firefly flow-start marker emission from AbstractMoverProtocolTransferService.MoverTask into RemoteHttpDataTransferProtocol, where the actual HTTP connection's local socket address (correct IP and port) is available. Previously, the start marker was emitted in MoverTask.run() before the HTTP connection was established, using NetworkUtils.getLocalAddress() to derive the local endpoint. This produced the wrong port (0) and could select the wrong interface on multi-homed hosts. Now, RemoteHttpTransferService passes the TransferLifeCycle to RemoteHttpDataTransferProtocol at construction time and sets the Subject via the overridden createMover(). The protocol calls onStart() in doGet() and sendFile() immediately after capturing the local endpoint from HttpInetConnection, which provides the real bound address and port. Signed-off-by: Shawn McKee --- .../AbstractMoverProtocolTransferService.java | 4 ++ .../classic/RemoteHttpTransferService.java | 21 +++++++++- .../RemoteHttpDataTransferProtocol.java | 39 +++++++++++++++++++ 3 files changed, 62 insertions(+), 2 deletions(-) diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java b/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java index 89a632bfaf8..0916adba0fe 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java @@ -73,6 +73,10 @@ public void setTransferLifeCycle(TransferLifeCycle transferLifeCycle) { _transferLifeCycle = transferLifeCycle; } + protected TransferLifeCycle getTransferLifeCycle() { + return _transferLifeCycle; + } + @Override public Mover createMover(ReplicaDescriptor handle, PoolIoFileMessage message, CellPath pathToDoor) diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/RemoteHttpTransferService.java b/modules/dcache/src/main/java/org/dcache/pool/classic/RemoteHttpTransferService.java index f7b815cd20e..89f8e4c9767 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/RemoteHttpTransferService.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/RemoteHttpTransferService.java @@ -19,6 +19,7 @@ import com.google.common.base.Splitter; import diskCacheV111.util.CacheException; +import diskCacheV111.vehicles.PoolIoFileMessage; import diskCacheV111.vehicles.ProtocolInfo; import diskCacheV111.vehicles.RemoteHttpDataTransferProtocolInfo; import diskCacheV111.vehicles.RemoteHttpsDataTransferProtocolInfo; @@ -58,8 +59,11 @@ import org.apache.http.impl.client.HttpClients; import org.apache.http.protocol.HttpContext; import org.apache.http.protocol.HttpRequestExecutor; +import org.dcache.pool.movers.Mover; import org.dcache.pool.movers.MoverProtocol; +import org.dcache.pool.movers.MoverProtocolMover; import org.dcache.pool.movers.RemoteHttpDataTransferProtocol; +import org.dcache.pool.repository.ReplicaDescriptor; import org.dcache.security.trust.AggregateX509TrustManager; import org.dcache.util.Version; import org.slf4j.Logger; @@ -69,6 +73,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static dmg.util.Exceptions.meaningfulMessage; +import dmg.cells.nucleus.CellPath; + public class RemoteHttpTransferService extends SecureRemoteTransferService { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteHttpTransferService.class); @@ -128,6 +134,17 @@ public HttpUriRequest getRedirect(final HttpRequest request, private X509TrustManager trustManager; private CloseableHttpClient sharedClient; + @Override + public Mover createMover(ReplicaDescriptor handle, PoolIoFileMessage message, + CellPath pathToDoor) throws CacheException { + Mover mover = super.createMover(handle, message, pathToDoor); + if (mover instanceof MoverProtocolMover mpm + && mpm.getMover() instanceof RemoteHttpDataTransferProtocol rhtp) { + rhtp.setSubject(message.getSubject()); + } + return mover; + } + @Override protected MoverProtocol createMoverProtocol(ProtocolInfo info) throws Exception { if (!(info instanceof RemoteHttpDataTransferProtocolInfo)) { @@ -145,7 +162,7 @@ protected MoverProtocol createMoverProtocol(ProtocolInfo info) throws Exception SSLContext context = buildSSLContext(credential.getKeyManager()); CloseableHttpClient client = createClient(context); - return new RemoteHttpDataTransferProtocol(client) { + return new RemoteHttpDataTransferProtocol(client, getTransferLifeCycle()) { @Override protected void afterTransfer() { super.afterTransfer(); @@ -159,7 +176,7 @@ protected void afterTransfer() { } } - return new RemoteHttpDataTransferProtocol(sharedClient); + return new RemoteHttpDataTransferProtocol(sharedClient, getTransferLifeCycle()); } @PostConstruct diff --git a/modules/dcache/src/main/java/org/dcache/pool/movers/RemoteHttpDataTransferProtocol.java b/modules/dcache/src/main/java/org/dcache/pool/movers/RemoteHttpDataTransferProtocol.java index 47d5f7eda11..837d4dfd663 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/movers/RemoteHttpDataTransferProtocol.java +++ b/modules/dcache/src/main/java/org/dcache/pool/movers/RemoteHttpDataTransferProtocol.java @@ -16,6 +16,7 @@ import diskCacheV111.util.CacheException; import diskCacheV111.util.ThirdPartyTransferFailedCacheException; +import diskCacheV111.vehicles.IpProtocolInfo; import diskCacheV111.vehicles.ProtocolInfo; import diskCacheV111.vehicles.RemoteHttpDataTransferProtocolInfo; import java.io.IOException; @@ -38,7 +39,9 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; +import javax.security.auth.Subject; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpInetConnection; @@ -213,9 +216,22 @@ private enum HeaderFlags { private Long _expectedTransferSize; private InetSocketAddress _localEndpoint; + private final TransferLifeCycle _transferLifeCycle; + private Subject _subject; + private boolean _startMarkerSent; public RemoteHttpDataTransferProtocol(CloseableHttpClient client) { + this(client, null); + } + + public RemoteHttpDataTransferProtocol(CloseableHttpClient client, + @Nullable TransferLifeCycle transferLifeCycle) { _client = requireNonNull(client); + _transferLifeCycle = transferLifeCycle; + } + + public void setSubject(Subject subject) { + _subject = subject; } private static void checkThat(boolean isOk, String message) throws CacheException { @@ -540,6 +556,7 @@ private CloseableHttpResponse doGet(final RemoteHttpDataTransferProtocolInfo inf CloseableHttpResponse response = _client.execute(get, context); _localEndpoint = localAddress().orElse(null); + startFlowMarker(); boolean isSuccessful = false; try { @@ -605,6 +622,7 @@ private void sendFile(RemoteHttpDataTransferProtocolInfo info) try (CloseableHttpResponse response = _client.execute(put, context)) { _localEndpoint = localAddress().orElse(null); + startFlowMarker(); StatusLine status = response.getStatusLine(); switch (status.getStatusCode()) { case 200: /* OK (not actually a valid response from PUT) */ @@ -981,6 +999,27 @@ public Long getBytesExpected() { return _expectedTransferSize; } + private void startFlowMarker() { + if (_startMarkerSent || _transferLifeCycle == null + || _localEndpoint == null || _subject == null) { + return; + } + + ProtocolInfo protocolInfo = _channel.getProtocolInfo(); + if (!(protocolInfo instanceof IpProtocolInfo ipInfo)) { + return; + } + + InetSocketAddress remoteEndpoint = ipInfo.getSocketAddress(); + if (remoteEndpoint == null) { + return; + } + + _transferLifeCycle.onStart(remoteEndpoint, _localEndpoint, + protocolInfo, _subject); + _startMarkerSent = true; + } + @Override public Optional getLocalEndpoint() { return Optional.ofNullable(_localEndpoint);