/*
* Copyright (c) 2011-2017 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.http.impl.pool;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.http.ConnectionPoolTooBusyException;
import io.vertx.core.impl.ContextInternal;
import java.io.File;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
The pool is a state machine that maintains a queue of waiters and a list of available connections.
Interactions with the pool modifies the pool state and then the pool will run tasks to make progress satisfying
the pool requests.
The pool maintains a few invariants:
- a connection in the
Pool<C>.available
set has its Holder.capacity
> 0
Pool<C>.weight
is the sum of all connection's Holder.weight
Pool<C>.capacity
is the sum of all connection's Holder.capacity
Pool<C>.connecting
is the number of the connections connecting but not yet connected
A connection is delivered to a Waiter
on the pool's context event loop thread, the waiter must take care of calling ContextInternal.executeFromIO
if necessary.
Calls to the pool are synchronized on the pool to avoid race conditions and maintain its invariants. This pool can
be called from different threads safely (although it is not encouraged for performance reasons, we benefit from biased
locking which makes the overhead of synchronized near zero), since it synchronizes on the pool.
Pool weight
To constrain the number of connections the pool maintains a Pool<C>.weight
field that must remain lesser than Pool<C>.maxWeight
to create a connection. Such weight is used instead of counting connection because the pool can mix connections with different concurrency (HTTP/1 and HTTP/2) and this flexibility is necessary. When a connection is created an Pool<C>.initialWeight
is added to the current weight. When the channel is connected the ConnectResult
callback value provides actual connection weight so it can be used to correct the pool weight. When the channel fails to connect the initial weight is used to correct the pool weight. Recycling a connection
When a connection is recycled and reaches its full capacity (i.e Holder#concurrency == Holder#capacity
, the behavior depends on the ConnectionListener.onRecycle(long)
event that releases this connection. When expirationTimestamp
is 0L
the connection is closed, otherwise it is maintained in the pool, letting the borrower define the expiration timestamp. The value is set according to the HTTP client connection keep alive timeout. Acquiring a connection
When a waiter wants to acquire a connection it is added to the Pool<C>.waitersQueue
and the request is handled by the pool asynchronously:
- when there is an available pooled connection, this connection is delivered to the waiter and it is removed
from the queue
- when there is no available connection a connection is created when
Pool<C>.weight
<
Pool<C>.maxWeight
- when the max number of waiters is reached, the request is failed
- otherwise the waiter remains in the queue until progress can be done (i.e a connection is recycled, etc...)
Waiter notifications happens on the event-loop thread to avoid races with connection event happening on the same thread.
Connection eviction
Connection can be evicted from the pool with ConnectionListener.onEvict()
, after this call, the connection is fully managed by the caller. This can be used for signalling a connection close or when the connection has been upgraded for an HTTP connection. Pool progress
When the pool state is modified, an asynchronous task is executed to make the pool state progress. The pool ensures that a single progress task is executed with the Pool<C>.checkInProgress
flag. Author: Julien Viet, Tim Fox
/**
* The pool is a state machine that maintains a queue of waiters and a list of available connections.
* <p/>
* Interactions with the pool modifies the pool state and then the pool will run tasks to make progress satisfying
* the pool requests.
* <p/>
* The pool maintains a few invariants:
* <ul>
* <li>a connection in the {@link #available} set has its {@link Holder#capacity}{@code > 0}</li>
* <li>{@link #weight} is the sum of all connection's {@link Holder#weight}</li>
* <li>{@link #capacity} is the sum of all connection's {@link Holder#capacity}</li>
* <li>{@link #connecting} is the number of the connections connecting but not yet connected</li>
* </ul>
*
* A connection is delivered to a {@link Waiter} on the pool's context event loop thread, the waiter must take care of
* calling {@link io.vertx.core.impl.ContextInternal#executeFromIO} if necessary.
* <p/>
* Calls to the pool are synchronized on the pool to avoid race conditions and maintain its invariants. This pool can
* be called from different threads safely (although it is not encouraged for performance reasons, we benefit from biased
* locking which makes the overhead of synchronized near zero), since it synchronizes on the pool.
*
* <h3>Pool weight</h3>
* To constrain the number of connections the pool maintains a {@link #weight} field that must remain lesser than
* {@link #maxWeight} to create a connection. Such weight is used instead of counting connection because the pool
* can mix connections with different concurrency (HTTP/1 and HTTP/2) and this flexibility is necessary.
* <p/>
* When a connection is created an {@link #initialWeight} is added to the current weight.
* When the channel is connected the {@link ConnectResult} callback value provides actual connection weight so it
* can be used to correct the pool weight. When the channel fails to connect the initial weight is used
* to correct the pool weight.
*
* <h3>Recycling a connection</h3>
* When a connection is recycled and reaches its full capacity (i.e {@code Holder#concurrency == Holder#capacity},
* the behavior depends on the {@link ConnectionListener#onRecycle(long)} event that releases this connection.
* When {@code expirationTimestamp} is {@code 0L} the connection is closed, otherwise it is maintained in the pool,
* letting the borrower define the expiration timestamp. The value is set according to the HTTP client connection
* keep alive timeout.
*
* <h3>Acquiring a connection</h3>
* When a waiter wants to acquire a connection it is added to the {@link #waitersQueue} and the request
* is handled by the pool asynchronously:
* <ul>
* <li>when there is an available pooled connection, this connection is delivered to the waiter and it is removed
* from the queue</li>
* <li>when there is no available connection a connection is created when {@link #weight}{@code <}{@link #maxWeight}</li>
* <li>when the max number of waiters is reached, the request is failed</li>
* <li>otherwise the waiter remains in the queue until progress can be done (i.e a connection is recycled, etc...)</li>
* </ul>
* Waiter notifications happens on the event-loop thread to avoid races with connection event happening on the same thread.
*
* <h3>Connection eviction</h3>
* Connection can be evicted from the pool with {@link ConnectionListener#onEvict()}, after this call, the connection
* is fully managed by the caller. This can be used for signalling a connection close or when the connection has
* been upgraded for an HTTP connection.
*
* <h3>Pool progress</h3>
* When the pool state is modified, an asynchronous task is executed to make the pool state progress. The pool ensures
* that a single progress task is executed with the {@link #checkInProgress} flag.
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class Pool<C> {
Pool state associated with a connection.
/**
* Pool state associated with a connection.
*/
public class Holder implements ConnectionListener<C> {
boolean removed; // Removed
C connection; // The connection instance
long concurrency; // How many times we can borrow from the connection
long capacity; // How many times the connection is currently borrowed (0 <= capacity <= concurrency)
long weight; // The weight that participates in the pool weight
long expirationTimestamp; // The expiration timestamp when (concurrency == capacity) otherwise -1L
private void init(long concurrency, C conn, long weight) {
this.concurrency = concurrency;
this.connection = conn;
this.weight = weight;
this.capacity = concurrency;
this.expirationTimestamp = -1L;
}
@Override
public void onConcurrencyChange(long concurrency) {
setConcurrency(this, concurrency);
}
@Override
public void onRecycle(long expirationTimestamp) {
recycle(this, expirationTimestamp);
}
@Override
public void onEvict() {
evicted(this);
}
void connect() {
connector.connect(this, context, ar -> {
if (ar.succeeded()) {
connectSucceeded(this, ar.result());
} else {
connectFailed(this, ar.cause());
}
});
}
@Override
public String toString() {
return "Holder[removed=" + removed + ",capacity=" + capacity + ",concurrency=" + concurrency + ",expirationTimestamp=" + expirationTimestamp + "]";
}
}
private final ContextInternal context;
private final ConnectionProvider<C> connector;
private final Consumer<C> connectionAdded;
private final Consumer<C> connectionRemoved;
private final LongSupplier clock;
private final int queueMaxSize; // the queue max size (does not include inflight waiters)
private final Deque<Waiter<C>> waitersQueue = new ArrayDeque<>(); // The waiters pending
private final Deque<Holder> available; // Available connections, i.e having capacity > 0
private final boolean fifo; // Recycling policy
private long capacity; // The total available connection capacity
private long connecting; // The number of connections in progress
private final long initialWeight; // The initial weight of a connection
private final long maxWeight; // The max weight (equivalent to max pool size)
private long weight; // The actual pool weight (equivalent to connection count)
private boolean checkInProgress; // A flag to avoid running un-necessary checks
private boolean closed;
private final Handler<Void> poolClosed;
public Pool(Context context,
ConnectionProvider<C> connector,
LongSupplier clock,
int queueMaxSize,
long initialWeight,
long maxWeight,
Handler<Void> poolClosed,
Consumer<C> connectionAdded,
Consumer<C> connectionRemoved,
boolean fifo) {
this.clock = clock;
this.context = (ContextInternal) context;
this.maxWeight = maxWeight;
this.initialWeight = initialWeight;
this.connector = connector;
this.queueMaxSize = queueMaxSize;
this.poolClosed = poolClosed;
this.available = new ArrayDeque<>();
this.connectionAdded = connectionAdded;
this.connectionRemoved = connectionRemoved;
this.fifo = fifo;
}
public synchronized int waitersInQueue() {
return waitersQueue.size();
}
public synchronized long weight() {
return weight;
}
public synchronized long capacity() {
return capacity;
}
Get a connection for a waiter asynchronously.
Params: - handler – the handler
Returns: whether the pool can satisfy the request
/**
* Get a connection for a waiter asynchronously.
*
* @param handler the handler
* @return whether the pool can satisfy the request
*/
public synchronized boolean getConnection(Handler<AsyncResult<C>> handler) {
if (closed) {
return false;
}
Waiter<C> waiter = new Waiter<>(handler);
waitersQueue.add(waiter);
checkProgress();
return true;
}
Close all unused connections with a timestamp
greater than expiration timestamp. Returns: the number of closed connections when calling this method
/**
* Close all unused connections with a {@code timestamp} greater than expiration timestamp.
*
* @return the number of closed connections when calling this method
*/
public synchronized void closeIdle() {
checkProgress();
}
Check whether the pool can make progress toward satisfying the waiters.
/**
* Check whether the pool can make progress toward satisfying the waiters.
*/
private void checkProgress() {
if (!checkInProgress && (canProgress() || canClose())) {
checkInProgress = true;
context.nettyEventLoop().execute(this::checkPendingTasks);
}
}
private boolean canProgress() {
if (waitersQueue.size() > 0) {
return (canAcquireConnection() || needToCreateConnection() || canEvictWaiter());
} else {
return capacity > 0L;
}
}
Run pending progress tasks.
/**
* Run pending progress tasks.
*/
private void checkPendingTasks() {
while (true) {
Runnable task;
synchronized (this) {
task = nextTask();
if (task == null) {
// => Can't make more progress
checkInProgress = false;
checkClose();
break;
}
}
task.run();
}
}
Returns: true
if a connection can be acquired from the pool
/**
* @return {@code true} if a connection can be acquired from the pool
*/
private boolean canAcquireConnection() {
return capacity > 0;
}
Returns: true
if a connection needs to be created
/**
* @return {@code true} if a connection needs to be created
*/
private boolean needToCreateConnection() {
return weight < maxWeight && (waitersQueue.size() - connecting) > 0;
}
Returns: true
if a waiter can be evicted from the queue
/**
* @return {@code true} if a waiter can be evicted from the queue
*/
private boolean canEvictWaiter() {
return queueMaxSize >= 0 && (waitersQueue.size() - connecting) > queueMaxSize;
}
private Runnable nextTask() {
if (waitersQueue.size() > 0) {
// Acquire a task that will deliver a connection
if (canAcquireConnection()) {
Holder conn = available.peek();
capacity--;
if (--conn.capacity == 0) {
conn.expirationTimestamp = -1L;
available.poll();
}
Waiter<C> waiter = waitersQueue.poll();
return () -> waiter.handler.handle(Future.succeededFuture(conn.connection));
} else if (needToCreateConnection()) {
connecting++;
weight += initialWeight;
Holder holder = new Holder();
return holder::connect;
} else if (canEvictWaiter()) {
Waiter<C> waiter = waitersQueue.removeLast();
return () -> waiter.handler.handle(Future.failedFuture(new ConnectionPoolTooBusyException("Connection pool reached max wait queue size of " + queueMaxSize)));
}
} else if (capacity > 0) {
long now = clock.getAsLong();
List<Holder> expired = null;
for (Iterator<Holder> it = available.iterator();it.hasNext();) {
Holder holder = it.next();
if (holder.capacity == holder.concurrency && (holder.expirationTimestamp == 0 || now >= holder.expirationTimestamp)) {
it.remove();
if (holder.capacity > 0) {
capacity -= holder.capacity;
}
holder.expirationTimestamp = -1L;
holder.capacity = 0;
if (expired == null) {
expired = new ArrayList<>();
}
expired.add(holder);
}
}
if (expired != null) {
List<Holder> toClose = expired;
return () -> {
toClose.forEach(holder -> {
connector.close(holder.connection);
});
};
}
}
return null;
}
private boolean canClose() {
return weight == 0 && waitersQueue.isEmpty();
}
private void checkClose() {
if (canClose()) {
// No waitersQueue and no connections - remove the ConnQueue
closed = true;
poolClosed.handle(null);
}
}
Handle connect success, a number of waiters will be satisfied according to the connection's concurrency.
/**
* Handle connect success, a number of waiters will be satisfied according to the connection's concurrency.
*/
private void connectSucceeded(Holder holder, ConnectResult<C> result) {
List<Waiter<C>> waiters;
synchronized (this) {
connecting--;
weight += initialWeight - result.weight();
holder.init(result.concurrency(), result.connection(), result.weight());
waiters = new ArrayList<>();
while (holder.capacity > 0 && waitersQueue.size() > 0) {
waiters.add(waitersQueue.poll());
holder.capacity--;
}
if (holder.capacity > 0) {
available.add(holder);
capacity += holder.capacity;
}
checkProgress();
}
connectionAdded.accept(holder.connection);
for (Waiter<C> waiter : waiters) {
waiter.handler.handle(Future.succeededFuture(holder.connection));
}
}
Handle connect failures, the first waiter is always failed to avoid infinite reconnection.
/**
* Handle connect failures, the first waiter is always failed to avoid infinite reconnection.
*/
private void connectFailed(Holder holder, Throwable cause) {
Waiter<C> waiter;
synchronized (this) {
connecting--;
waiter = waitersQueue.poll();
weight -= initialWeight;
holder.removed = true;
checkProgress();
}
if (waiter != null) {
waiter.handler.handle(Future.failedFuture(cause));
}
}
private synchronized void setConcurrency(Holder holder, long concurrency) {
if (concurrency < 0L) {
throw new IllegalArgumentException("Cannot set a negative concurrency value");
}
if (holder.removed) {
assert false : "Cannot recycle removed holder";
return;
}
if (holder.concurrency < concurrency) {
long diff = concurrency - holder.concurrency;
if (holder.capacity == 0) {
available.add(holder);
}
capacity += diff;
holder.capacity += diff;
holder.concurrency = concurrency;
checkProgress();
} else if (holder.concurrency > concurrency) {
throw new UnsupportedOperationException("Not yet implemented");
}
}
private void recycle(Holder holder, long timestamp) {
if (timestamp < 0L) {
throw new IllegalArgumentException("Invalid timestamp");
}
if (holder.removed) {
return;
}
C toClose;
synchronized (this) {
if (recycleConnection(holder, timestamp)) {
toClose = holder.connection;
} else {
toClose = null;
}
}
if (toClose != null) {
connector.close(holder.connection);
} else {
synchronized (this) {
checkProgress();
}
}
}
private synchronized void evicted(Holder holder) {
if (holder.removed) {
return;
}
evictConnection(holder);
checkProgress();
}
private void evictConnection(Holder holder) {
holder.removed = true;
connectionRemoved.accept(holder.connection);
if (holder.capacity > 0) {
capacity -= holder.capacity;
available.remove(holder);
holder.capacity = 0;
}
weight -= holder.weight;
}
// These methods assume to be called under synchronization
Recycles a connection.
Params: - holder – the connection to recycle
- timestamp – the expiration timestamp of the connection
Returns: true
if the connection shall be closed
/**
* Recycles a connection.
*
* @param holder the connection to recycle
* @param timestamp the expiration timestamp of the connection
* @return {@code true} if the connection shall be closed
*/
private boolean recycleConnection(Holder holder, long timestamp) {
long newCapacity = holder.capacity + 1;
if (newCapacity > holder.concurrency) {
throw new AssertionError("Attempt to recycle a connection more than permitted");
}
if (timestamp == 0L && newCapacity == holder.concurrency && capacity >= waitersQueue.size()) {
if (holder.capacity > 0) {
capacity -= holder.capacity;
available.remove(holder);
}
holder.expirationTimestamp = -1L;
holder.capacity = 0;
return true;
} else {
capacity++;
if (holder.capacity == 0) {
if (fifo) {
available.addLast(holder);
} else {
available.addFirst(holder);
}
}
holder.expirationTimestamp = timestamp;
holder.capacity++;
return false;
}
}
public String toString() {
StringBuilder sb = new StringBuilder();
synchronized (this) {
sb.append("Available:").append(File.separator);
available.forEach(holder -> {
sb.append(holder).append(File.separator);
});
sb.append("Waiters").append(File.separator);
waitersQueue.forEach(w -> {
sb.append(w.handler).append(File.separator);
});
sb.append("InitialWeight:").append(initialWeight).append(File.separator);
sb.append("MaxWeight:").append(maxWeight).append(File.separator);
sb.append("Weight:").append(weight).append(File.separator);
sb.append("Capacity:").append(capacity).append(File.separator);
sb.append("Connecting:").append(connecting).append(File.separator);
sb.append("CheckInProgress:").append(checkInProgress).append(File.separator);
sb.append("Closed:").append(closed).append(File.separator);
}
return sb.toString();
}
}