diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java b/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java index 89a632bfaf8..0916adba0fe 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java @@ -73,6 +73,10 @@ public void setTransferLifeCycle(TransferLifeCycle transferLifeCycle) { _transferLifeCycle = transferLifeCycle; } + protected TransferLifeCycle getTransferLifeCycle() { + return _transferLifeCycle; + } + @Override public Mover createMover(ReplicaDescriptor handle, PoolIoFileMessage message, CellPath pathToDoor) diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/RemoteHttpTransferService.java b/modules/dcache/src/main/java/org/dcache/pool/classic/RemoteHttpTransferService.java index f7b815cd20e..89f8e4c9767 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/RemoteHttpTransferService.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/RemoteHttpTransferService.java @@ -19,6 +19,7 @@ import com.google.common.base.Splitter; import diskCacheV111.util.CacheException; +import diskCacheV111.vehicles.PoolIoFileMessage; import diskCacheV111.vehicles.ProtocolInfo; import diskCacheV111.vehicles.RemoteHttpDataTransferProtocolInfo; import diskCacheV111.vehicles.RemoteHttpsDataTransferProtocolInfo; @@ -58,8 +59,11 @@ import org.apache.http.impl.client.HttpClients; import org.apache.http.protocol.HttpContext; import org.apache.http.protocol.HttpRequestExecutor; +import org.dcache.pool.movers.Mover; import org.dcache.pool.movers.MoverProtocol; +import org.dcache.pool.movers.MoverProtocolMover; import org.dcache.pool.movers.RemoteHttpDataTransferProtocol; +import org.dcache.pool.repository.ReplicaDescriptor; import org.dcache.security.trust.AggregateX509TrustManager; import org.dcache.util.Version; import org.slf4j.Logger; @@ -69,6 +73,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static dmg.util.Exceptions.meaningfulMessage; +import dmg.cells.nucleus.CellPath; + public class RemoteHttpTransferService extends SecureRemoteTransferService { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteHttpTransferService.class); @@ -128,6 +134,17 @@ public HttpUriRequest getRedirect(final HttpRequest request, private X509TrustManager trustManager; private CloseableHttpClient sharedClient; + @Override + public Mover createMover(ReplicaDescriptor handle, PoolIoFileMessage message, + CellPath pathToDoor) throws CacheException { + Mover mover = super.createMover(handle, message, pathToDoor); + if (mover instanceof MoverProtocolMover mpm + && mpm.getMover() instanceof RemoteHttpDataTransferProtocol rhtp) { + rhtp.setSubject(message.getSubject()); + } + return mover; + } + @Override protected MoverProtocol createMoverProtocol(ProtocolInfo info) throws Exception { if (!(info instanceof RemoteHttpDataTransferProtocolInfo)) { @@ -145,7 +162,7 @@ protected MoverProtocol createMoverProtocol(ProtocolInfo info) throws Exception SSLContext context = buildSSLContext(credential.getKeyManager()); CloseableHttpClient client = createClient(context); - return new RemoteHttpDataTransferProtocol(client) { + return new RemoteHttpDataTransferProtocol(client, getTransferLifeCycle()) { @Override protected void afterTransfer() { super.afterTransfer(); @@ -159,7 +176,7 @@ protected void afterTransfer() { } } - return new RemoteHttpDataTransferProtocol(sharedClient); + return new RemoteHttpDataTransferProtocol(sharedClient, getTransferLifeCycle()); } @PostConstruct diff --git a/modules/dcache/src/main/java/org/dcache/pool/movers/RemoteHttpDataTransferProtocol.java b/modules/dcache/src/main/java/org/dcache/pool/movers/RemoteHttpDataTransferProtocol.java index 47d5f7eda11..837d4dfd663 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/movers/RemoteHttpDataTransferProtocol.java +++ b/modules/dcache/src/main/java/org/dcache/pool/movers/RemoteHttpDataTransferProtocol.java @@ -16,6 +16,7 @@ import diskCacheV111.util.CacheException; import diskCacheV111.util.ThirdPartyTransferFailedCacheException; +import diskCacheV111.vehicles.IpProtocolInfo; import diskCacheV111.vehicles.ProtocolInfo; import diskCacheV111.vehicles.RemoteHttpDataTransferProtocolInfo; import java.io.IOException; @@ -38,7 +39,9 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; +import javax.security.auth.Subject; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpInetConnection; @@ -213,9 +216,22 @@ private enum HeaderFlags { private Long _expectedTransferSize; private InetSocketAddress _localEndpoint; + private final TransferLifeCycle _transferLifeCycle; + private Subject _subject; + private boolean _startMarkerSent; public RemoteHttpDataTransferProtocol(CloseableHttpClient client) { + this(client, null); + } + + public RemoteHttpDataTransferProtocol(CloseableHttpClient client, + @Nullable TransferLifeCycle transferLifeCycle) { _client = requireNonNull(client); + _transferLifeCycle = transferLifeCycle; + } + + public void setSubject(Subject subject) { + _subject = subject; } private static void checkThat(boolean isOk, String message) throws CacheException { @@ -540,6 +556,7 @@ private CloseableHttpResponse doGet(final RemoteHttpDataTransferProtocolInfo inf CloseableHttpResponse response = _client.execute(get, context); _localEndpoint = localAddress().orElse(null); + startFlowMarker(); boolean isSuccessful = false; try { @@ -605,6 +622,7 @@ private void sendFile(RemoteHttpDataTransferProtocolInfo info) try (CloseableHttpResponse response = _client.execute(put, context)) { _localEndpoint = localAddress().orElse(null); + startFlowMarker(); StatusLine status = response.getStatusLine(); switch (status.getStatusCode()) { case 200: /* OK (not actually a valid response from PUT) */ @@ -981,6 +999,27 @@ public Long getBytesExpected() { return _expectedTransferSize; } + private void startFlowMarker() { + if (_startMarkerSent || _transferLifeCycle == null + || _localEndpoint == null || _subject == null) { + return; + } + + ProtocolInfo protocolInfo = _channel.getProtocolInfo(); + if (!(protocolInfo instanceof IpProtocolInfo ipInfo)) { + return; + } + + InetSocketAddress remoteEndpoint = ipInfo.getSocketAddress(); + if (remoteEndpoint == null) { + return; + } + + _transferLifeCycle.onStart(remoteEndpoint, _localEndpoint, + protocolInfo, _subject); + _startMarkerSent = true; + } + @Override public Optional getLocalEndpoint() { return Optional.ofNullable(_localEndpoint);