From 720554dea07b7b29a9544ee0e77b8f3c19460546 Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Sun, 1 Mar 2026 09:09:02 +0100 Subject: [PATCH 1/2] Inject Clock into connection pools to make time-based behavior testable --- .../org/apache/hc/core5/pool/LaxConnPool.java | 54 ++++-- .../hc/core5/pool/RouteSegmentedConnPool.java | 31 ++- .../apache/hc/core5/pool/StrictConnPool.java | 53 ++++-- .../apache/hc/core5/pool/PoolTestSupport.java | 112 +++++++++++ .../pool/TestConnPoolClockInjection.java | 179 ++++++++++++++++++ 5 files changed, 396 insertions(+), 33 deletions(-) create mode 100644 httpcore5/src/test/java/org/apache/hc/core5/pool/PoolTestSupport.java create mode 100644 httpcore5/src/test/java/org/apache/hc/core5/pool/TestConnPoolClockInjection.java diff --git a/httpcore5/src/main/java/org/apache/hc/core5/pool/LaxConnPool.java b/httpcore5/src/main/java/org/apache/hc/core5/pool/LaxConnPool.java index 9478fdcfa..5692ea60f 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/pool/LaxConnPool.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/pool/LaxConnPool.java @@ -26,6 +26,7 @@ */ package org.apache.hc.core5.pool; +import java.time.Clock; import java.util.Deque; import java.util.HashSet; import java.util.Iterator; @@ -50,6 +51,7 @@ import org.apache.hc.core5.concurrent.Cancellable; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.function.Callback; +import org.apache.hc.core5.function.Supplier; import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.io.ModalCloseable; import org.apache.hc.core5.util.Args; @@ -78,6 +80,9 @@ public class LaxConnPool implements ManagedConnPool private final ConcurrentMap> routeToPool; private final AtomicBoolean isShutDown; + private final Clock clock; + private final Supplier currentTimeSupplier; + private volatile int defaultMaxPerRoute; /** @@ -89,6 +94,16 @@ public LaxConnPool( final PoolReusePolicy policy, final DisposalCallback disposalCallback, final ConnPoolListener connPoolListener) { + this(defaultMaxPerRoute, timeToLive, policy, disposalCallback, connPoolListener, Clock.systemUTC()); + } + + LaxConnPool( + final int defaultMaxPerRoute, + final TimeValue timeToLive, + final PoolReusePolicy policy, + final DisposalCallback disposalCallback, + final ConnPoolListener connPoolListener, + final Clock clock) { super(); Args.positive(defaultMaxPerRoute, "Max per route value"); this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive); @@ -98,6 +113,8 @@ public LaxConnPool( this.routeToPool = new ConcurrentHashMap<>(); this.isShutDown = new AtomicBoolean(); this.defaultMaxPerRoute = defaultMaxPerRoute; + this.clock = Args.notNull(clock, "clock"); + this.currentTimeSupplier = this.clock::millis; } /** @@ -145,7 +162,9 @@ private PerRoutePool getPool(final T route) { policy, this, disposalCallback, - connPoolListener); + connPoolListener, + clock, + currentTimeSupplier); routePool = routeToPool.putIfAbsent(route, newRoutePool); if (routePool == null) { routePool = newRoutePool; @@ -266,7 +285,7 @@ public void enumLeased(final Callback> callback) { @Override public void closeIdle(final TimeValue idleTime) { - final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0); + final long deadline = clock.millis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0); enumAvailable(entry -> { if (entry.getUpdated() <= deadline) { entry.discardConnection(CloseMode.GRACEFUL); @@ -276,7 +295,7 @@ public void closeIdle(final TimeValue idleTime) { @Override public void closeExpired() { - final long now = System.currentTimeMillis(); + final long now = clock.millis(); enumAvailable(entry -> { if (entry.getExpiryDeadline().isBefore(now)) { entry.discardConnection(CloseMode.GRACEFUL); @@ -306,11 +325,11 @@ static class LeaseRequest implements Cancellable { LeaseRequest( final Object state, - final Timeout requestTimeout, + final Deadline deadline, final BasicFuture> future) { super(); this.state = state; - this.deadline = Deadline.calculate(requestTimeout); + this.deadline = deadline; this.future = future; } @@ -362,6 +381,9 @@ private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL } private final AtomicInteger allocated; private final AtomicLong releaseSeqNum; + private final Clock clock; + private final Supplier currentTimeSupplier; + private volatile int max; PerRoutePool( @@ -371,7 +393,9 @@ private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL } final PoolReusePolicy policy, final ConnPoolStats connPoolStats, final DisposalCallback disposalCallback, - final ConnPoolListener connPoolListener) { + final ConnPoolListener connPoolListener, + final Clock clock, + final Supplier currentTimeSupplier) { super(); this.route = route; this.timeToLive = timeToLive; @@ -386,6 +410,8 @@ private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL } this.allocated = new AtomicInteger(0); this.releaseSeqNum = new AtomicLong(0); this.max = max; + this.clock = clock; + this.currentTimeSupplier = currentTimeSupplier; } public void shutdown(final CloseMode closeMode) { @@ -412,7 +438,7 @@ private PoolEntry createPoolEntry() { prev = allocated.get(); next = (prev < poolMax) ? prev + 1 : prev; } while (!allocated.compareAndSet(prev, next)); - return prev < next ? new PoolEntry<>(route, timeToLive, disposalCallback) : null; + return prev < next ? new PoolEntry<>(route, timeToLive, disposalCallback, currentTimeSupplier) : null; } private void deallocatePoolEntry() { @@ -437,12 +463,13 @@ private void removeLeased(final PoolEntry entry) { } private PoolEntry getAvailableEntry(final Object state) { + final long now = clock.millis(); for (final Iterator>> it = available.iterator(); it.hasNext(); ) { final AtomicMarkableReference> ref = it.next(); final PoolEntry entry = ref.getReference(); if (ref.compareAndSet(entry, entry, false, true)) { it.remove(); - if (entry.getExpiryDeadline().isExpired() || !Objects.equals(entry.getState(), state)) { + if (entry.getExpiryDeadline().isBefore(now) || !Objects.equals(entry.getState(), state)) { entry.discardConnection(CloseMode.GRACEFUL); deallocatePoolEntry(); } else { @@ -484,7 +511,7 @@ public PoolEntry get( addLeased(entry); future.completed(entry); } else { - pending.add(new LeaseRequest<>(state, requestTimeout, future)); + pending.add(new LeaseRequest<>(state, Deadline.calculate(clock.millis(), requestTimeout), future)); if (releaseState != releaseSeqNum.get()) { servicePendingRequest(); } @@ -494,7 +521,8 @@ public PoolEntry get( public void release(final PoolEntry releasedEntry, final boolean reusable) { removeLeased(releasedEntry); - if (!reusable || releasedEntry.getExpiryDeadline().isExpired()) { + final long now = clock.millis(); + if (!reusable || releasedEntry.getExpiryDeadline().isBefore(now)) { releasedEntry.discardConnection(CloseMode.GRACEFUL); } if (releasedEntry.hasConnection()) { @@ -529,8 +557,9 @@ private void servicePendingRequests(final RequestServiceStrategy serviceStrategy } final Object state = leaseRequest.getState(); final Deadline deadline = leaseRequest.getDeadline(); + final long now = clock.millis(); - if (deadline.isExpired()) { + if (deadline.isBefore(now)) { leaseRequest.failed(DeadlineTimeoutException.from(deadline)); } else { final long releaseState = releaseSeqNum.get(); @@ -558,6 +587,7 @@ private void servicePendingRequests(final RequestServiceStrategy serviceStrategy } public void validatePendingRequests() { + final long now = clock.millis(); final Iterator> it = pending.iterator(); while (it.hasNext()) { final LeaseRequest request = it.next(); @@ -566,7 +596,7 @@ public void validatePendingRequests() { it.remove(); } else { final Deadline deadline = request.getDeadline(); - if (deadline.isExpired()) { + if (deadline.isBefore(now)) { request.failed(DeadlineTimeoutException.from(deadline)); } if (request.isDone()) { diff --git a/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java b/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java index 667519137..13e83d9b4 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java @@ -27,6 +27,7 @@ package org.apache.hc.core5.pool; import java.io.IOException; +import java.time.Clock; import java.util.ArrayDeque; import java.util.Deque; import java.util.HashSet; @@ -54,6 +55,7 @@ import org.apache.hc.core5.annotation.Experimental; import org.apache.hc.core5.annotation.ThreadingBehavior; import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.function.Supplier; import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.io.ModalCloseable; import org.apache.hc.core5.util.Args; @@ -104,6 +106,9 @@ public final class RouteSegmentedConnPool implement private final ScheduledThreadPoolExecutor timeouts; + private final Clock clock; + private final Supplier currentTimeSupplier; + /** * Dedicated executor for asynchronous, best-effort disposal. * Bounded queue; on saturation we fall back to IMMEDIATE close on the caller thread. @@ -131,6 +136,17 @@ public RouteSegmentedConnPool( final PoolReusePolicy reusePolicy, final DisposalCallback disposal, final ConnPoolListener connPoolListener) { + this(defaultMaxPerRoute, maxTotal, timeToLive, reusePolicy, disposal, connPoolListener, Clock.systemUTC()); + } + + RouteSegmentedConnPool( + final int defaultMaxPerRoute, + final int maxTotal, + final TimeValue timeToLive, + final PoolReusePolicy reusePolicy, + final DisposalCallback disposal, + final ConnPoolListener connPoolListener, + final Clock clock) { this.defaultMaxPerRoute.set(defaultMaxPerRoute > 0 ? defaultMaxPerRoute : 5); this.maxTotal.set(maxTotal > 0 ? maxTotal : 25); @@ -148,6 +164,9 @@ public RouteSegmentedConnPool( this.disposal = Args.notNull(disposal, "disposal"); this.connPoolListener = connPoolListener; + this.clock = Args.notNull(clock, "clock"); + this.currentTimeSupplier = this.clock::millis; + final ThreadFactory tf = r -> { final Thread t = new Thread(r, "seg-pool-timeouts"); t.setDaemon(true); @@ -231,7 +250,7 @@ public Future> lease( if (hit == null) { break; } - final long now = System.currentTimeMillis(); + final long now = clock.millis(); if (hit.getExpiryDeadline().isBefore(now) || isPastTtl(hit, now)) { discardAndDecr(hit, CloseMode.GRACEFUL); continue; @@ -248,7 +267,7 @@ public Future> lease( // 2) Try to allocate new within caps if (tryAllocateOne(route, seg)) { - final PoolEntry entry = new PoolEntry<>(route, timeToLive, disposal); + final PoolEntry entry = new PoolEntry<>(route, timeToLive, disposal, currentTimeSupplier); fireOnLease(route); if (callback != null) { callback.completed(entry); @@ -326,7 +345,7 @@ public void release(final PoolEntry entry, final boolean reusable) { return; } - final long now = System.currentTimeMillis(); + final long now = clock.millis(); final boolean stillValid = reusable && !isPastTtl(entry, now) && !entry.getExpiryDeadline().isBefore(now); if (stillValid) { @@ -391,7 +410,7 @@ public void close(final CloseMode closeMode) { @Override public void closeIdle(final TimeValue idleTime) { - final long cutoff = System.currentTimeMillis() + final long cutoff = clock.millis() - Math.max(0L, idleTime != null ? idleTime.toMilliseconds() : 0L); for (final Map.Entry e : segments.entrySet()) { @@ -417,7 +436,7 @@ public void closeIdle(final TimeValue idleTime) { @Override public void closeExpired() { - final long now = System.currentTimeMillis(); + final long now = clock.millis(); for (final Map.Entry e : segments.entrySet()) { final R route = e.getKey(); @@ -731,7 +750,7 @@ private void serveRoundRobin(final int budget) { seg.allocated.decrementAndGet(); totalAllocated.decrementAndGet(); } else { - final PoolEntry entry = new PoolEntry<>(route, timeToLive, disposal); + final PoolEntry entry = new PoolEntry<>(route, timeToLive, disposal, currentTimeSupplier); cancelTimeout(w); w.complete(entry); fireOnLease(w.route); diff --git a/httpcore5/src/main/java/org/apache/hc/core5/pool/StrictConnPool.java b/httpcore5/src/main/java/org/apache/hc/core5/pool/StrictConnPool.java index f5c3e4285..794fa3233 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/pool/StrictConnPool.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/pool/StrictConnPool.java @@ -26,6 +26,7 @@ */ package org.apache.hc.core5.pool; +import java.time.Clock; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -47,6 +48,7 @@ import org.apache.hc.core5.concurrent.BasicFuture; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.function.Callback; +import org.apache.hc.core5.function.Supplier; import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.io.ModalCloseable; import org.apache.hc.core5.util.Args; @@ -80,6 +82,9 @@ public class StrictConnPool implements ManagedConnP private final ReentrantLock lock; private final AtomicBoolean isShutDown; + private final Clock clock; + private final Supplier currentTimeSupplier; + private volatile int defaultMaxPerRoute; private volatile int maxTotal; @@ -93,6 +98,17 @@ public StrictConnPool( final PoolReusePolicy policy, final DisposalCallback disposalCallback, final ConnPoolListener connPoolListener) { + this(defaultMaxPerRoute, maxTotal, timeToLive, policy, disposalCallback, connPoolListener, Clock.systemUTC()); + } + + StrictConnPool( + final int defaultMaxPerRoute, + final int maxTotal, + final TimeValue timeToLive, + final PoolReusePolicy policy, + final DisposalCallback disposalCallback, + final ConnPoolListener connPoolListener, + final Clock clock) { super(); Args.positive(defaultMaxPerRoute, "Max per route value"); Args.positive(maxTotal, "Max total value"); @@ -110,6 +126,8 @@ public StrictConnPool( this.isShutDown = new AtomicBoolean(); this.defaultMaxPerRoute = defaultMaxPerRoute; this.maxTotal = maxTotal; + this.clock = Args.notNull(clock, "clock"); + this.currentTimeSupplier = this.clock::millis; } /** @@ -157,7 +175,7 @@ public void close() { } private PerRoutePool getPool(final T route) { - return this.routeToPool.computeIfAbsent(route, r -> new PerRoutePool<>(route, this.disposalCallback, this.policy)); + return this.routeToPool.computeIfAbsent(route, r -> new PerRoutePool<>(r, this.disposalCallback, this.policy, this.currentTimeSupplier)); } @Override @@ -168,7 +186,7 @@ public Future> lease( Args.notNull(route, "Route"); Args.notNull(requestTimeout, "Request timeout"); Asserts.check(!this.isShutDown.get(), "Connection pool shut down"); - final Deadline deadline = Deadline.calculate(requestTimeout); + final Deadline deadline = Deadline.calculate(this.clock.millis(), requestTimeout); final BasicFuture> future = new BasicFuture>(callback) { @Override @@ -200,7 +218,7 @@ public PoolEntry get( if (acquiredLock) { try { - final LeaseRequest request = new LeaseRequest<>(route, state, requestTimeout, future); + final LeaseRequest request = new LeaseRequest<>(route, state, deadline, future); final boolean completed = processPendingRequest(request); if (!request.isDone() && !completed) { this.pendingRequests.add(request); @@ -312,8 +330,9 @@ private boolean processPendingRequest(final LeaseRequest request) { final T route = request.getRoute(); final Object state = request.getState(); final Deadline deadline = request.getDeadline(); + final long now = this.clock.millis(); - if (deadline.isExpired()) { + if (deadline.isBefore(now)) { request.failed(DeadlineTimeoutException.from(deadline)); return false; } @@ -325,7 +344,7 @@ private boolean processPendingRequest(final LeaseRequest request) { if (entry == null) { break; } - if (entry.getExpiryDeadline().isExpired()) { + if (entry.getExpiryDeadline().isBefore(now)) { entry.discardConnection(CloseMode.GRACEFUL); this.available.remove(entry); pool.free(entry, false); @@ -408,7 +427,7 @@ private void fireCallbacks() { public void validatePendingRequests() { this.lock.lock(); try { - final long now = System.currentTimeMillis(); + final long now = this.clock.millis(); final ListIterator> it = this.pendingRequests.listIterator(); while (it.hasNext()) { final LeaseRequest request = it.next(); @@ -512,11 +531,12 @@ public int getMaxPerRoute(final T route) { public PoolStats getTotalStats() { this.lock.lock(); try { + final long now = this.clock.millis(); int pendingCount = 0; for (final LeaseRequest request: pendingRequests) { if (!request.isDone()) { final Deadline deadline = request.getDeadline(); - if (!deadline.isExpired()) { + if (!deadline.isBefore(now)) { pendingCount++; } } @@ -536,12 +556,13 @@ public PoolStats getStats(final T route) { Args.notNull(route, "Route"); this.lock.lock(); try { + final long now = this.clock.millis(); final PerRoutePool pool = getPool(route); int pendingCount = 0; for (final LeaseRequest request: pendingRequests) { if (!request.isDone() && Objects.equals(route, request.getRoute())) { final Deadline deadline = request.getDeadline(); - if (!deadline.isExpired()) { + if (!deadline.isBefore(now)) { pendingCount++; } } @@ -628,7 +649,7 @@ private void purgePoolMap() { @Override public void closeIdle(final TimeValue idleTime) { - final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0); + final long deadline = this.clock.millis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0); enumAvailable(entry -> { if (entry.getUpdated() <= deadline) { entry.discardConnection(CloseMode.GRACEFUL); @@ -638,7 +659,7 @@ public void closeIdle(final TimeValue idleTime) { @Override public void closeExpired() { - final long now = System.currentTimeMillis(); + final long now = this.clock.millis(); enumAvailable(entry -> { if (entry.getExpiryDeadline().isBefore(now)) { entry.discardConnection(CloseMode.GRACEFUL); @@ -677,18 +698,18 @@ static class LeaseRequest { * * @param route route * @param state state - * @param requestTimeout timeout to wait in a request queue until kicked off + * @param deadline deadline * @param future future callback */ public LeaseRequest( final T route, final Object state, - final Timeout requestTimeout, + final Deadline deadline, final BasicFuture> future) { super(); this.route = route; this.state = state; - this.deadline = Deadline.calculate(requestTimeout); + this.deadline = deadline; this.future = future; this.completed = new AtomicBoolean(); } @@ -756,14 +777,16 @@ static class PerRoutePool { private final LinkedList> available; private final DisposalCallback disposalCallback; private final PoolReusePolicy policy; + private final Supplier currentTimeSupplier; - PerRoutePool(final T route, final DisposalCallback disposalCallback, final PoolReusePolicy policy) { + PerRoutePool(final T route, final DisposalCallback disposalCallback, final PoolReusePolicy policy, final Supplier currentTimeSupplier) { super(); this.route = route; this.disposalCallback = disposalCallback; this.policy = policy; this.leased = new HashSet<>(); this.available = new LinkedList<>(); + this.currentTimeSupplier = currentTimeSupplier; } public final T getRoute() { @@ -834,7 +857,7 @@ public void free(final PoolEntry entry, final boolean reusable) { } public PoolEntry createEntry(final TimeValue timeToLive) { - final PoolEntry entry = new PoolEntry<>(this.route, timeToLive, disposalCallback); + final PoolEntry entry = new PoolEntry<>(this.route, timeToLive, disposalCallback, currentTimeSupplier); this.leased.add(entry); return entry; } diff --git a/httpcore5/src/test/java/org/apache/hc/core5/pool/PoolTestSupport.java b/httpcore5/src/test/java/org/apache/hc/core5/pool/PoolTestSupport.java new file mode 100644 index 000000000..feac80669 --- /dev/null +++ b/httpcore5/src/test/java/org/apache/hc/core5/pool/PoolTestSupport.java @@ -0,0 +1,112 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.pool; + +import java.io.IOException; + +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.io.ModalCloseable; +import org.apache.hc.core5.util.TimeValue; + +final class PoolTestSupport { + + private PoolTestSupport() { + } + + static final class DummyConn implements ModalCloseable { + + private volatile boolean closed; + + @Override + public void close(final CloseMode closeMode) { + this.closed = true; + } + + @Override + public void close() throws IOException { + close(CloseMode.GRACEFUL); + } + + boolean isClosed() { + return closed; + } + + } + + static final DisposalCallback DISPOSAL = DummyConn::close; + + enum PoolType { + STRICT, + LAX, + OFFLOCK; + + ManagedConnPool createPool( + final int defaultMaxPerRoute, + final int maxTotal) { + + final TimeValue ttl = TimeValue.NEG_ONE_MILLISECOND; + final PoolReusePolicy reusePolicy = PoolReusePolicy.LIFO; + + switch (this) { + case STRICT: { + return new StrictConnPool<>( + defaultMaxPerRoute, + maxTotal, + ttl, + reusePolicy, + DISPOSAL, + null); + } + case LAX: { + return new LaxConnPool<>( + defaultMaxPerRoute, + ttl, + reusePolicy, + DISPOSAL, + null); + } + case OFFLOCK: { + return new RouteSegmentedConnPool<>( + defaultMaxPerRoute, + maxTotal, + ttl, + reusePolicy, + DISPOSAL); + } + default: { + throw new IllegalStateException("Unexpected: " + this); + } + } + } + + boolean hasHardTotalLimit() { + return this != LAX; + } + + } + +} \ No newline at end of file diff --git a/httpcore5/src/test/java/org/apache/hc/core5/pool/TestConnPoolClockInjection.java b/httpcore5/src/test/java/org/apache/hc/core5/pool/TestConnPoolClockInjection.java new file mode 100644 index 000000000..51c3bda6f --- /dev/null +++ b/httpcore5/src/test/java/org/apache/hc/core5/pool/TestConnPoolClockInjection.java @@ -0,0 +1,179 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.pool; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Test; + +final class TestConnPoolClockInjection { + + static final class TestClock extends Clock { + + private final ZoneId zoneId; + private final AtomicLong millis; + + TestClock(final long initialMillis) { + this.zoneId = ZoneId.of("UTC"); + this.millis = new AtomicLong(initialMillis); + } + + void advanceMillis(final long deltaMillis) { + millis.addAndGet(deltaMillis); + } + + @Override + public ZoneId getZone() { + return zoneId; + } + + @Override + public Clock withZone(final ZoneId zone) { + return this; + } + + @Override + public long millis() { + return millis.get(); + } + + @Override + public Instant instant() { + return Instant.ofEpochMilli(millis()); + } + } + + @Test + void strictPoolCloseIdleUsesInjectedClock() throws Exception { + final TestClock clock = new TestClock(0L); + + final StrictConnPool pool = new StrictConnPool<>( + 1, + 1, + TimeValue.NEG_ONE_MILLISECOND, + PoolReusePolicy.LIFO, + PoolTestSupport.DISPOSAL, + null, + clock); + + try { + final Future> f1 = + pool.lease("r1", null, Timeout.ofSeconds(1), null); + final PoolEntry e1 = f1.get(1, TimeUnit.SECONDS); + assertNotNull(e1); + e1.assignConnection(new PoolTestSupport.DummyConn()); + pool.release(e1, true); + + clock.advanceMillis(10_000L); + + pool.closeIdle(TimeValue.ofMilliseconds(1)); + + assertEquals(0, pool.getTotalStats().getAvailable()); + assertEquals(0, pool.getTotalStats().getLeased()); + assertEquals(0, pool.getTotalStats().getPending()); + } finally { + pool.close(CloseMode.IMMEDIATE); + } + } + + @Test + void laxPoolCloseIdleUsesInjectedClock() throws Exception { + final TestClock clock = new TestClock(0L); + + final LaxConnPool pool = new LaxConnPool<>( + 1, + TimeValue.NEG_ONE_MILLISECOND, + PoolReusePolicy.LIFO, + PoolTestSupport.DISPOSAL, + null, + clock); + + try { + final Future> f1 = + pool.lease("r1", null, Timeout.ofSeconds(1), null); + final PoolEntry e1 = f1.get(1, TimeUnit.SECONDS); + assertNotNull(e1); + e1.assignConnection(new PoolTestSupport.DummyConn()); + pool.release(e1, true); + + clock.advanceMillis(10_000L); + + pool.closeIdle(TimeValue.ofMilliseconds(1)); + + assertEquals(0, pool.getTotalStats().getAvailable()); + assertEquals(0, pool.getTotalStats().getLeased()); + assertEquals(0, pool.getTotalStats().getPending()); + } finally { + pool.close(CloseMode.IMMEDIATE); + } + } + + @Test + void offlockPoolCloseIdleUsesInjectedClock() throws Exception { + final TestClock clock = new TestClock(0L); + + final RouteSegmentedConnPool pool = new RouteSegmentedConnPool<>( + 1, + 1, + TimeValue.NEG_ONE_MILLISECOND, + PoolReusePolicy.LIFO, + PoolTestSupport.DISPOSAL, + null, + clock); + + try { + final Future> f1 = + pool.lease("r1", null, Timeout.ofSeconds(1), null); + final PoolEntry e1 = f1.get(1, TimeUnit.SECONDS); + assertNotNull(e1); + e1.assignConnection(new PoolTestSupport.DummyConn()); + pool.release(e1, true); + + clock.advanceMillis(10_000L); + + pool.closeIdle(TimeValue.ofMilliseconds(1)); + + assertEquals(0, pool.getTotalStats().getAvailable()); + assertEquals(0, pool.getTotalStats().getLeased()); + assertEquals(0, pool.getTotalStats().getPending()); + } finally { + pool.close(CloseMode.IMMEDIATE); + } + } + +} \ No newline at end of file From bfd05b998c3a53a63b0235d4437f8452215ebd5c Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Sun, 1 Mar 2026 16:23:20 +0100 Subject: [PATCH 2/2] StrictConnPool: inject Clock to make time-based tests deterministic --- .../apache/hc/core5/pool/PoolTestSupport.java | 44 ++++ .../core5/pool/TestConnPoolTimeContract.java | 198 ++++++++++++++++++ .../hc/core5/pool/TestStrictConnPool.java | 101 +++++++-- 3 files changed, 326 insertions(+), 17 deletions(-) create mode 100644 httpcore5/src/test/java/org/apache/hc/core5/pool/TestConnPoolTimeContract.java diff --git a/httpcore5/src/test/java/org/apache/hc/core5/pool/PoolTestSupport.java b/httpcore5/src/test/java/org/apache/hc/core5/pool/PoolTestSupport.java index feac80669..ed9b9494c 100644 --- a/httpcore5/src/test/java/org/apache/hc/core5/pool/PoolTestSupport.java +++ b/httpcore5/src/test/java/org/apache/hc/core5/pool/PoolTestSupport.java @@ -27,6 +27,10 @@ package org.apache.hc.core5.pool; import java.io.IOException; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.io.ModalCloseable; @@ -109,4 +113,44 @@ boolean hasHardTotalLimit() { } + @FunctionalInterface + interface PoolFactory { + ManagedConnPool create(final Clock clock); + } + static final class TestClock extends Clock { + + private final ZoneId zoneId; + private final AtomicLong millis; + + TestClock(final long initialMillis) { + this.zoneId = ZoneId.of("UTC"); + this.millis = new AtomicLong(initialMillis); + } + + void advanceMillis(final long deltaMillis) { + this.millis.addAndGet(deltaMillis); + } + + @Override + public ZoneId getZone() { + return this.zoneId; + } + + @Override + public Clock withZone(final ZoneId zone) { + return this; + } + + @Override + public long millis() { + return this.millis.get(); + } + + @Override + public Instant instant() { + return Instant.ofEpochMilli(millis()); + } + } + + } \ No newline at end of file diff --git a/httpcore5/src/test/java/org/apache/hc/core5/pool/TestConnPoolTimeContract.java b/httpcore5/src/test/java/org/apache/hc/core5/pool/TestConnPoolTimeContract.java new file mode 100644 index 000000000..5a8431535 --- /dev/null +++ b/httpcore5/src/test/java/org/apache/hc/core5/pool/TestConnPoolTimeContract.java @@ -0,0 +1,198 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.pool; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import org.apache.hc.core5.http.HttpConnection; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +class TestConnPoolTimeContract { + + private static Stream pools() { + return Stream.of( + Arguments.of("STRICT", (PoolTestSupport.PoolFactory) clock -> + new StrictConnPool<>( + 2, + 2, + TimeValue.NEG_ONE_MILLISECOND, + PoolReusePolicy.LIFO, + null, + null, + clock)) + ); + } + + @ParameterizedTest(name = "{0} closeExpired is deterministic") + @MethodSource("pools") + void closeExpired_isDeterministic( + final String name, + final PoolTestSupport.PoolFactory factory) throws Exception { + + final HttpConnection conn1 = Mockito.mock(HttpConnection.class); + final HttpConnection conn2 = Mockito.mock(HttpConnection.class); + + final PoolTestSupport.TestClock clock = new PoolTestSupport.TestClock(0L); + + try (final ManagedConnPool pool = factory.create(clock)) { + + final Future> f1 = + pool.lease("somehost", null, Timeout.DISABLED, null); + final Future> f2 = + pool.lease("somehost", null, Timeout.DISABLED, null); + + Assertions.assertTrue(f1.isDone()); + final PoolEntry e1 = f1.get(); + Assertions.assertNotNull(e1); + e1.assignConnection(conn1); + + Assertions.assertTrue(f2.isDone()); + final PoolEntry e2 = f2.get(); + Assertions.assertNotNull(e2); + e2.assignConnection(conn2); + + e1.updateExpiry(TimeValue.of(1, TimeUnit.MILLISECONDS)); + pool.release(e1, true); + + clock.advanceMillis(200L); + + e2.updateExpiry(TimeValue.of(1000, TimeUnit.SECONDS)); + pool.release(e2, true); + + pool.closeExpired(); + + Mockito.verify(conn1).close(CloseMode.GRACEFUL); + Mockito.verify(conn2, Mockito.never()).close(ArgumentMatchers.any()); + + final PoolStats totals = pool.getTotalStats(); + Assertions.assertEquals(1, totals.getAvailable()); + Assertions.assertEquals(0, totals.getLeased()); + Assertions.assertEquals(0, totals.getPending()); + } + } + + @ParameterizedTest(name = "{0} closeIdle is deterministic") + @MethodSource("pools") + void closeIdle_isDeterministic( + final String name, + final PoolTestSupport.PoolFactory factory) throws Exception { + + final HttpConnection conn1 = Mockito.mock(HttpConnection.class); + final HttpConnection conn2 = Mockito.mock(HttpConnection.class); + + final PoolTestSupport.TestClock clock = new PoolTestSupport.TestClock(0L); + + try (final ManagedConnPool pool = factory.create(clock)) { + + final Future> f1 = + pool.lease("somehost", null, Timeout.DISABLED, null); + final Future> f2 = + pool.lease("somehost", null, Timeout.DISABLED, null); + + Assertions.assertTrue(f1.isDone()); + final PoolEntry e1 = f1.get(); + Assertions.assertNotNull(e1); + e1.assignConnection(conn1); + + Assertions.assertTrue(f2.isDone()); + final PoolEntry e2 = f2.get(); + Assertions.assertNotNull(e2); + e2.assignConnection(conn2); + + // IMPORTANT: updateState drives "updated" timestamp. + e1.updateState(null); + pool.release(e1, true); + + clock.advanceMillis(200L); + + e2.updateState(null); + pool.release(e2, true); + + pool.closeIdle(TimeValue.ofMilliseconds(50)); + + Mockito.verify(conn1).close(CloseMode.GRACEFUL); + Mockito.verify(conn2, Mockito.never()).close(ArgumentMatchers.any()); + + pool.closeIdle(TimeValue.ofMilliseconds(-1)); + + Mockito.verify(conn2).close(CloseMode.GRACEFUL); + } + } + + @ParameterizedTest(name = "{0} request timeout validation is deterministic") + @MethodSource("pools") + void validatePendingRequests_isDeterministic( + final String name, + final PoolTestSupport.PoolFactory factory) throws Exception { + + final HttpConnection conn1 = Mockito.mock(HttpConnection.class); + final PoolTestSupport.TestClock clock = new PoolTestSupport.TestClock(0L); + + try (final ManagedConnPool pool = factory.create(clock)) { + + // IMPORTANT: force 1/1 so second lease becomes pending. + pool.setDefaultMaxPerRoute(1); + pool.setMaxTotal(1); + + final Future> f1 = + pool.lease("somehost", null, Timeout.ofMilliseconds(0), null); + final Future> f2 = + pool.lease("somehost", null, Timeout.ofMilliseconds(0), null); + final Future> f3 = + pool.lease("somehost", null, Timeout.ofMilliseconds(10), null); + + Assertions.assertTrue(f1.isDone()); + final PoolEntry e1 = f1.get(); + Assertions.assertNotNull(e1); + e1.assignConnection(conn1); + + Assertions.assertFalse(f2.isDone()); + Assertions.assertFalse(f3.isDone()); + + clock.advanceMillis(100L); + + Assertions.assertInstanceOf(StrictConnPool.class, pool); + ((StrictConnPool) pool).validatePendingRequests(); + + // f2 has Timeout=0 (= DISABLED) so it must remain pending + Assertions.assertFalse(f2.isDone()); + // f3 must time out + Assertions.assertTrue(f3.isDone()); + } + } + +} \ No newline at end of file diff --git a/httpcore5/src/test/java/org/apache/hc/core5/pool/TestStrictConnPool.java b/httpcore5/src/test/java/org/apache/hc/core5/pool/TestStrictConnPool.java index 201964c87..3776892c4 100644 --- a/httpcore5/src/test/java/org/apache/hc/core5/pool/TestStrictConnPool.java +++ b/httpcore5/src/test/java/org/apache/hc/core5/pool/TestStrictConnPool.java @@ -26,6 +26,9 @@ */ package org.apache.hc.core5.pool; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; import java.util.Collections; import java.util.Random; import java.util.concurrent.CancellationException; @@ -36,6 +39,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.hc.core5.http.HttpConnection; @@ -50,6 +54,55 @@ class TestStrictConnPool { + static final class TestClock extends Clock { + + private final ZoneId zoneId; + private final AtomicLong millis; + + TestClock(final long initialMillis) { + this.zoneId = ZoneId.of("UTC"); + this.millis = new AtomicLong(initialMillis); + } + + void advanceMillis(final long deltaMillis) { + this.millis.addAndGet(deltaMillis); + } + + @Override + public ZoneId getZone() { + return this.zoneId; + } + + @Override + public Clock withZone(final ZoneId zone) { + return this; + } + + @Override + public long millis() { + return this.millis.get(); + } + + @Override + public Instant instant() { + return Instant.ofEpochMilli(millis()); + } + } + + private static StrictConnPool newPool( + final int defaultMaxPerRoute, + final int maxTotal, + final Clock clock) { + return new StrictConnPool<>( + defaultMaxPerRoute, + maxTotal, + TimeValue.NEG_ONE_MILLISECOND, + PoolReusePolicy.LIFO, + null, + null, + clock); + } + @Test void testEmptyPool() { try (final StrictConnPool pool = new StrictConnPool<>(2, 10)) { @@ -128,9 +181,15 @@ void testLeaseReleaseMultiThreaded() throws Exception { try { while (n.decrementAndGet() > 0) { try { - final Future> future = pool.lease("somehost", null); - final PoolEntry poolEntry = future.get(1, TimeUnit.MINUTES); - Thread.sleep(rnd.nextInt(1)); + final Future> future = + pool.lease("somehost", null); + final PoolEntry poolEntry = + future.get(1, TimeUnit.MINUTES); + // No sleeping; just a tiny bit of jitter to vary interleavings. + if ((rnd.nextInt() & 0x3) == 0) { + Thread.yield(); + } + pool.release(poolEntry, false); } catch (final Exception ex) { Assertions.fail(ex.getMessage(), ex); @@ -357,8 +416,8 @@ void testStatefulConnectionRedistributionOnPerRouteMaxLimit() throws Exception { Assertions.assertTrue(future1.isDone()); final PoolEntry entry1 = future1.get(); - entry1.assignConnection(conn1); Assertions.assertNotNull(entry1); + entry1.assignConnection(conn1); Assertions.assertTrue(future2.isDone()); final PoolEntry entry2 = future2.get(); Assertions.assertNotNull(entry2); @@ -377,7 +436,7 @@ void testStatefulConnectionRedistributionOnPerRouteMaxLimit() throws Exception { final Future> future3 = pool.lease("somehost", "some-stuff"); final Future> future4 = pool.lease("somehost", "some-stuff"); - Assertions.assertTrue(future1.isDone()); + Assertions.assertTrue(future3.isDone()); final PoolEntry entry3 = future3.get(); Assertions.assertNotNull(entry3); Assertions.assertSame(conn2, entry3.getConnection()); @@ -394,7 +453,8 @@ void testStatefulConnectionRedistributionOnPerRouteMaxLimit() throws Exception { Assertions.assertEquals(0, totals.getLeased()); Assertions.assertEquals(0, totals.getPending()); - final Future> future5 = pool.lease("somehost", "some-other-stuff"); + final Future> future5 = + pool.lease("somehost", "some-other-stuff"); Assertions.assertTrue(future5.isDone()); @@ -410,8 +470,9 @@ void testStatefulConnectionRedistributionOnPerRouteMaxLimit() throws Exception { @Test void testCreateNewIfExpired() throws Exception { final HttpConnection conn1 = Mockito.mock(HttpConnection.class); + final TestClock clock = new TestClock(0L); - try (final StrictConnPool pool = new StrictConnPool<>(2, 2)) { + try (final StrictConnPool pool = newPool(2, 2, clock)) { final Future> future1 = pool.lease("somehost", null); @@ -423,7 +484,7 @@ void testCreateNewIfExpired() throws Exception { entry1.updateExpiry(TimeValue.of(1, TimeUnit.MILLISECONDS)); pool.release(entry1, true); - Thread.sleep(200L); + clock.advanceMillis(200L); final Future> future2 = pool.lease("somehost", null); @@ -445,8 +506,9 @@ void testCreateNewIfExpired() throws Exception { void testCloseExpired() throws Exception { final HttpConnection conn1 = Mockito.mock(HttpConnection.class); final HttpConnection conn2 = Mockito.mock(HttpConnection.class); + final TestClock clock = new TestClock(0L); - try (final StrictConnPool pool = new StrictConnPool<>(2, 2)) { + try (final StrictConnPool pool = newPool(2, 2, clock)) { final Future> future1 = pool.lease("somehost", null); final Future> future2 = pool.lease("somehost", null); @@ -463,7 +525,7 @@ void testCloseExpired() throws Exception { entry1.updateExpiry(TimeValue.of(1, TimeUnit.MILLISECONDS)); pool.release(entry1, true); - Thread.sleep(200); + clock.advanceMillis(200L); entry2.updateExpiry(TimeValue.of(1000, TimeUnit.SECONDS)); pool.release(entry2, true); @@ -488,8 +550,9 @@ void testCloseExpired() throws Exception { void testCloseIdle() throws Exception { final HttpConnection conn1 = Mockito.mock(HttpConnection.class); final HttpConnection conn2 = Mockito.mock(HttpConnection.class); + final TestClock clock = new TestClock(0L); - try (final StrictConnPool pool = new StrictConnPool<>(2, 2)) { + try (final StrictConnPool pool = newPool(2, 2, clock)) { final Future> future1 = pool.lease("somehost", null); final Future> future2 = pool.lease("somehost", null); @@ -506,7 +569,7 @@ void testCloseIdle() throws Exception { entry1.updateState(null); pool.release(entry1, true); - Thread.sleep(200L); + clock.advanceMillis(200L); entry2.updateState(null); pool.release(entry2, true); @@ -543,12 +606,16 @@ void testCloseIdle() throws Exception { @Test void testLeaseRequestTimeout() throws Exception { final HttpConnection conn1 = Mockito.mock(HttpConnection.class); + final TestClock clock = new TestClock(0L); - try (final StrictConnPool pool = new StrictConnPool<>(1, 1)) { + try (final StrictConnPool pool = newPool(1, 1, clock)) { - final Future> future1 = pool.lease("somehost", null, Timeout.ofMilliseconds(0), null); - final Future> future2 = pool.lease("somehost", null, Timeout.ofMilliseconds(0), null); - final Future> future3 = pool.lease("somehost", null, Timeout.ofMilliseconds(10), null); + final Future> future1 = + pool.lease("somehost", null, Timeout.ofMilliseconds(0), null); + final Future> future2 = + pool.lease("somehost", null, Timeout.ofMilliseconds(0), null); + final Future> future3 = + pool.lease("somehost", null, Timeout.ofMilliseconds(10), null); Assertions.assertTrue(future1.isDone()); final PoolEntry entry1 = future1.get(); @@ -557,7 +624,7 @@ void testLeaseRequestTimeout() throws Exception { Assertions.assertFalse(future2.isDone()); Assertions.assertFalse(future3.isDone()); - Thread.sleep(100); + clock.advanceMillis(100L); pool.validatePendingRequests();