Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 42 additions & 12 deletions httpcore5/src/main/java/org/apache/hc/core5/pool/LaxConnPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
*/
package org.apache.hc.core5.pool;

import java.time.Clock;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -50,6 +51,7 @@
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.io.ModalCloseable;
import org.apache.hc.core5.util.Args;
Expand Down Expand Up @@ -78,6 +80,9 @@ public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool
private final ConcurrentMap<T, PerRoutePool<T, C>> routeToPool;
private final AtomicBoolean isShutDown;

private final Clock clock;
private final Supplier<Long> currentTimeSupplier;

private volatile int defaultMaxPerRoute;

/**
Expand All @@ -89,6 +94,16 @@ public LaxConnPool(
final PoolReusePolicy policy,
final DisposalCallback<C> disposalCallback,
final ConnPoolListener<T> connPoolListener) {
this(defaultMaxPerRoute, timeToLive, policy, disposalCallback, connPoolListener, Clock.systemUTC());
}

LaxConnPool(
final int defaultMaxPerRoute,
final TimeValue timeToLive,
final PoolReusePolicy policy,
final DisposalCallback<C> disposalCallback,
final ConnPoolListener<T> connPoolListener,
final Clock clock) {
super();
Args.positive(defaultMaxPerRoute, "Max per route value");
this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
Expand All @@ -98,6 +113,8 @@ public LaxConnPool(
this.routeToPool = new ConcurrentHashMap<>();
this.isShutDown = new AtomicBoolean();
this.defaultMaxPerRoute = defaultMaxPerRoute;
this.clock = Args.notNull(clock, "clock");
this.currentTimeSupplier = this.clock::millis;
}

/**
Expand Down Expand Up @@ -145,7 +162,9 @@ private PerRoutePool<T, C> getPool(final T route) {
policy,
this,
disposalCallback,
connPoolListener);
connPoolListener,
clock,
currentTimeSupplier);
routePool = routeToPool.putIfAbsent(route, newRoutePool);
if (routePool == null) {
routePool = newRoutePool;
Expand Down Expand Up @@ -266,7 +285,7 @@ public void enumLeased(final Callback<PoolEntry<T, C>> callback) {

@Override
public void closeIdle(final TimeValue idleTime) {
final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
final long deadline = clock.millis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
enumAvailable(entry -> {
if (entry.getUpdated() <= deadline) {
entry.discardConnection(CloseMode.GRACEFUL);
Expand All @@ -276,7 +295,7 @@ public void closeIdle(final TimeValue idleTime) {

@Override
public void closeExpired() {
final long now = System.currentTimeMillis();
final long now = clock.millis();
enumAvailable(entry -> {
if (entry.getExpiryDeadline().isBefore(now)) {
entry.discardConnection(CloseMode.GRACEFUL);
Expand Down Expand Up @@ -306,11 +325,11 @@ static class LeaseRequest<T, C extends ModalCloseable> implements Cancellable {

LeaseRequest(
final Object state,
final Timeout requestTimeout,
final Deadline deadline,
final BasicFuture<PoolEntry<T, C>> future) {
super();
this.state = state;
this.deadline = Deadline.calculate(requestTimeout);
this.deadline = deadline;
this.future = future;
}

Expand Down Expand Up @@ -362,6 +381,9 @@ private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL }
private final AtomicInteger allocated;
private final AtomicLong releaseSeqNum;

private final Clock clock;
private final Supplier<Long> currentTimeSupplier;

private volatile int max;

PerRoutePool(
Expand All @@ -371,7 +393,9 @@ private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL }
final PoolReusePolicy policy,
final ConnPoolStats<T> connPoolStats,
final DisposalCallback<C> disposalCallback,
final ConnPoolListener<T> connPoolListener) {
final ConnPoolListener<T> connPoolListener,
final Clock clock,
final Supplier<Long> currentTimeSupplier) {
super();
this.route = route;
this.timeToLive = timeToLive;
Expand All @@ -386,6 +410,8 @@ private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL }
this.allocated = new AtomicInteger(0);
this.releaseSeqNum = new AtomicLong(0);
this.max = max;
this.clock = clock;
this.currentTimeSupplier = currentTimeSupplier;
}

public void shutdown(final CloseMode closeMode) {
Expand All @@ -412,7 +438,7 @@ private PoolEntry<T, C> createPoolEntry() {
prev = allocated.get();
next = (prev < poolMax) ? prev + 1 : prev;
} while (!allocated.compareAndSet(prev, next));
return prev < next ? new PoolEntry<>(route, timeToLive, disposalCallback) : null;
return prev < next ? new PoolEntry<>(route, timeToLive, disposalCallback, currentTimeSupplier) : null;
}

private void deallocatePoolEntry() {
Expand All @@ -437,12 +463,13 @@ private void removeLeased(final PoolEntry<T, C> entry) {
}

private PoolEntry<T, C> getAvailableEntry(final Object state) {
final long now = clock.millis();
for (final Iterator<AtomicMarkableReference<PoolEntry<T, C>>> it = available.iterator(); it.hasNext(); ) {
final AtomicMarkableReference<PoolEntry<T, C>> ref = it.next();
final PoolEntry<T, C> entry = ref.getReference();
if (ref.compareAndSet(entry, entry, false, true)) {
it.remove();
if (entry.getExpiryDeadline().isExpired() || !Objects.equals(entry.getState(), state)) {
if (entry.getExpiryDeadline().isBefore(now) || !Objects.equals(entry.getState(), state)) {
entry.discardConnection(CloseMode.GRACEFUL);
deallocatePoolEntry();
} else {
Expand Down Expand Up @@ -484,7 +511,7 @@ public PoolEntry<T, C> get(
addLeased(entry);
future.completed(entry);
} else {
pending.add(new LeaseRequest<>(state, requestTimeout, future));
pending.add(new LeaseRequest<>(state, Deadline.calculate(clock.millis(), requestTimeout), future));
if (releaseState != releaseSeqNum.get()) {
servicePendingRequest();
}
Expand All @@ -494,7 +521,8 @@ public PoolEntry<T, C> get(

public void release(final PoolEntry<T, C> releasedEntry, final boolean reusable) {
removeLeased(releasedEntry);
if (!reusable || releasedEntry.getExpiryDeadline().isExpired()) {
final long now = clock.millis();
if (!reusable || releasedEntry.getExpiryDeadline().isBefore(now)) {
releasedEntry.discardConnection(CloseMode.GRACEFUL);
}
if (releasedEntry.hasConnection()) {
Expand Down Expand Up @@ -529,8 +557,9 @@ private void servicePendingRequests(final RequestServiceStrategy serviceStrategy
}
final Object state = leaseRequest.getState();
final Deadline deadline = leaseRequest.getDeadline();
final long now = clock.millis();

if (deadline.isExpired()) {
if (deadline.isBefore(now)) {
leaseRequest.failed(DeadlineTimeoutException.from(deadline));
} else {
final long releaseState = releaseSeqNum.get();
Expand Down Expand Up @@ -558,6 +587,7 @@ private void servicePendingRequests(final RequestServiceStrategy serviceStrategy
}

public void validatePendingRequests() {
final long now = clock.millis();
final Iterator<LeaseRequest<T, C>> it = pending.iterator();
while (it.hasNext()) {
final LeaseRequest<T, C> request = it.next();
Expand All @@ -566,7 +596,7 @@ public void validatePendingRequests() {
it.remove();
} else {
final Deadline deadline = request.getDeadline();
if (deadline.isExpired()) {
if (deadline.isBefore(now)) {
request.failed(DeadlineTimeoutException.from(deadline));
}
if (request.isDone()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
package org.apache.hc.core5.pool;

import java.io.IOException;
import java.time.Clock;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
Expand Down Expand Up @@ -54,6 +55,7 @@
import org.apache.hc.core5.annotation.Experimental;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.io.ModalCloseable;
import org.apache.hc.core5.util.Args;
Expand Down Expand Up @@ -104,6 +106,9 @@ public final class RouteSegmentedConnPool<R, C extends ModalCloseable> implement

private final ScheduledThreadPoolExecutor timeouts;

private final Clock clock;
private final Supplier<Long> currentTimeSupplier;

/**
* Dedicated executor for asynchronous, best-effort disposal.
* Bounded queue; on saturation we fall back to IMMEDIATE close on the caller thread.
Expand Down Expand Up @@ -131,6 +136,17 @@ public RouteSegmentedConnPool(
final PoolReusePolicy reusePolicy,
final DisposalCallback<C> disposal,
final ConnPoolListener<R> connPoolListener) {
this(defaultMaxPerRoute, maxTotal, timeToLive, reusePolicy, disposal, connPoolListener, Clock.systemUTC());
}

RouteSegmentedConnPool(
final int defaultMaxPerRoute,
final int maxTotal,
final TimeValue timeToLive,
final PoolReusePolicy reusePolicy,
final DisposalCallback<C> disposal,
final ConnPoolListener<R> connPoolListener,
final Clock clock) {

this.defaultMaxPerRoute.set(defaultMaxPerRoute > 0 ? defaultMaxPerRoute : 5);
this.maxTotal.set(maxTotal > 0 ? maxTotal : 25);
Expand All @@ -148,6 +164,9 @@ public RouteSegmentedConnPool(
this.disposal = Args.notNull(disposal, "disposal");
this.connPoolListener = connPoolListener;

this.clock = Args.notNull(clock, "clock");
this.currentTimeSupplier = this.clock::millis;

final ThreadFactory tf = r -> {
final Thread t = new Thread(r, "seg-pool-timeouts");
t.setDaemon(true);
Expand Down Expand Up @@ -231,7 +250,7 @@ public Future<PoolEntry<R, C>> lease(
if (hit == null) {
break;
}
final long now = System.currentTimeMillis();
final long now = clock.millis();
if (hit.getExpiryDeadline().isBefore(now) || isPastTtl(hit, now)) {
discardAndDecr(hit, CloseMode.GRACEFUL);
continue;
Expand All @@ -248,7 +267,7 @@ public Future<PoolEntry<R, C>> lease(

// 2) Try to allocate new within caps
if (tryAllocateOne(route, seg)) {
final PoolEntry<R, C> entry = new PoolEntry<>(route, timeToLive, disposal);
final PoolEntry<R, C> entry = new PoolEntry<>(route, timeToLive, disposal, currentTimeSupplier);
fireOnLease(route);
if (callback != null) {
callback.completed(entry);
Expand Down Expand Up @@ -326,7 +345,7 @@ public void release(final PoolEntry<R, C> entry, final boolean reusable) {
return;
}

final long now = System.currentTimeMillis();
final long now = clock.millis();
final boolean stillValid = reusable && !isPastTtl(entry, now) && !entry.getExpiryDeadline().isBefore(now);

if (stillValid) {
Expand Down Expand Up @@ -391,7 +410,7 @@ public void close(final CloseMode closeMode) {

@Override
public void closeIdle(final TimeValue idleTime) {
final long cutoff = System.currentTimeMillis()
final long cutoff = clock.millis()
- Math.max(0L, idleTime != null ? idleTime.toMilliseconds() : 0L);

for (final Map.Entry<R, Segment> e : segments.entrySet()) {
Expand All @@ -417,7 +436,7 @@ public void closeIdle(final TimeValue idleTime) {

@Override
public void closeExpired() {
final long now = System.currentTimeMillis();
final long now = clock.millis();

for (final Map.Entry<R, Segment> e : segments.entrySet()) {
final R route = e.getKey();
Expand Down Expand Up @@ -731,7 +750,7 @@ private void serveRoundRobin(final int budget) {
seg.allocated.decrementAndGet();
totalAllocated.decrementAndGet();
} else {
final PoolEntry<R, C> entry = new PoolEntry<>(route, timeToLive, disposal);
final PoolEntry<R, C> entry = new PoolEntry<>(route, timeToLive, disposal, currentTimeSupplier);
cancelTimeout(w);
w.complete(entry);
fireOnLease(w.route);
Expand Down
Loading
Loading