/*
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.net.impl.clientconnection;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.http.ConnectionPoolTooBusyException;
import io.vertx.core.impl.ContextInternal;
import java.io.File;
import java.util.*;
import java.util.function.Consumer;
The pool is a state machine that maintains a queue of waiters and a list of available connections.
Interactions with the pool modifies the pool state and then the pool will run tasks to make progress satisfying
the pool requests.
The pool maintains a few invariants:
- a connection in the
Pool<C>.available
set has its Holder.capacity
> 0
Pool<C>.weight
is the sum of all connection's Holder.weight
Pool<C>.capacity
is the sum of all connection's Holder.capacity
Pool<C>.connecting
is the number of the connections connecting but not yet connected
A connection is delivered to a Waiter
on the pool's context event loop thread, the waiter must take care of calling ContextInternal.execute
if necessary.
Calls to the pool are synchronized on the pool to avoid race conditions and maintain its invariants. This pool can
be called from different threads safely (although it is not encouraged for performance reasons, we benefit from biased
locking which makes the overhead of synchronized near zero), since it synchronizes on the pool.
Pool weight
To constrain the number of connections the pool maintains a Pool<C>.weight
field that must remain lesser than Pool<C>.maxWeight
to create a connection. Such weight is used instead of counting connection because the pool can mix connections with different concurrency (HTTP/1 and HTTP/2) and this flexibility is necessary. When a connection is created an Pool<C>.initialWeight
is added to the current weight. When the channel is connected the ConnectResult
callback value provides actual connection weight so it can be used to correct the pool weight. When the channel fails to connect the initial weight is used to correct the pool weight. Recycling a connection
When a connection is recycled and reaches its full capacity (i.e Holder#concurrency == Holder#capacity
, then it is put back in the pool so it can be borrowed again. Acquiring a connection
When a waiter wants to acquire a connection it is added to the Pool<C>.waitersQueue
and the request is handled by the pool asynchronously:
- when there is an available pooled connection, this connection is delivered to the waiter and it is removed
from the queue
- when there is no available connection a connection is created when
Pool<C>.weight
<
Pool<C>.maxWeight
- when the max number of waiters is reached, the request is failed
- otherwise the waiter remains in the queue until progress can be done (i.e a connection is recycled, etc...)
Waiter notifications happens on the event-loop thread to avoid races with connection event happening on the same thread.
Connection eviction
Connection can be evicted from the pool with ConnectionListener.onEvict()
, after this call, the connection is fully managed by the caller. This can be used for signaling a connection close or when the connection has been upgraded for an HTTP connection. Idle closing
Closing idle connection can be achieved by calling closeIdle
. This will check every available connection (i.e that are not borrowed) by calling the ConnectionProvider.isValid
predicate. When ConnectionProvider.isValid
return false
then the connection is closed. Pool progress
When the pool state is modified, an asynchronous task is executed to make the pool state progress. The pool ensures that a single progress task is executed with the Pool<C>.checkInProgress
flag. Author: Julien Viet, Tim Fox
/**
* The pool is a state machine that maintains a queue of waiters and a list of available connections.
* <p/>
* Interactions with the pool modifies the pool state and then the pool will run tasks to make progress satisfying
* the pool requests.
* <p/>
* The pool maintains a few invariants:
* <ul>
* <li>a connection in the {@link #available} set has its {@link Holder#capacity}{@code > 0}</li>
* <li>{@link #weight} is the sum of all connection's {@link Holder#weight}</li>
* <li>{@link #capacity} is the sum of all connection's {@link Holder#capacity}</li>
* <li>{@link #connecting} is the number of the connections connecting but not yet connected</li>
* </ul>
*
* A connection is delivered to a {@link Waiter} on the pool's context event loop thread, the waiter must take care of
* calling {@link io.vertx.core.impl.ContextInternal#execute} if necessary.
* <p/>
* Calls to the pool are synchronized on the pool to avoid race conditions and maintain its invariants. This pool can
* be called from different threads safely (although it is not encouraged for performance reasons, we benefit from biased
* locking which makes the overhead of synchronized near zero), since it synchronizes on the pool.
*
* <h3>Pool weight</h3>
* To constrain the number of connections the pool maintains a {@link #weight} field that must remain lesser than
* {@link #maxWeight} to create a connection. Such weight is used instead of counting connection because the pool
* can mix connections with different concurrency (HTTP/1 and HTTP/2) and this flexibility is necessary.
* <p/>
* When a connection is created an {@link #initialWeight} is added to the current weight.
* When the channel is connected the {@link ConnectResult} callback value provides actual connection weight so it
* can be used to correct the pool weight. When the channel fails to connect the initial weight is used
* to correct the pool weight.
*
* <h3>Recycling a connection</h3>
* When a connection is recycled and reaches its full capacity (i.e {@code Holder#concurrency == Holder#capacity},
* then it is put back in the pool so it can be borrowed again.
*
* <h3>Acquiring a connection</h3>
* When a waiter wants to acquire a connection it is added to the {@link #waitersQueue} and the request
* is handled by the pool asynchronously:
* <ul>
* <li>when there is an available pooled connection, this connection is delivered to the waiter and it is removed
* from the queue</li>
* <li>when there is no available connection a connection is created when {@link #weight}{@code <}{@link #maxWeight}</li>
* <li>when the max number of waiters is reached, the request is failed</li>
* <li>otherwise the waiter remains in the queue until progress can be done (i.e a connection is recycled, etc...)</li>
* </ul>
* Waiter notifications happens on the event-loop thread to avoid races with connection event happening on the same thread.
*
* <h3>Connection eviction</h3>
* Connection can be evicted from the pool with {@link ConnectionListener#onEvict()}, after this call, the connection
* is fully managed by the caller. This can be used for signaling a connection close or when the connection has
* been upgraded for an HTTP connection.
*
* <h3>Idle closing</h3>
* Closing idle connection can be achieved by calling {@link #closeIdle}. This will check every available connection
* (i.e that are not borrowed) by calling the {@link ConnectionProvider#isValid} predicate. When {@link ConnectionProvider#isValid}
* return {@code false} then the connection is closed.
*
* <h3>Pool progress</h3>
* When the pool state is modified, an asynchronous task is executed to make the pool state progress. The pool ensures
* that a single progress task is executed with the {@link #checkInProgress} flag.
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class Pool<C> {
Pool state associated with a connection.
/**
* Pool state associated with a connection.
*/
private class Holder implements ConnectionListener<C> {
boolean initialized; // Initialized
boolean removed; // Removed
C connection; // The connection instance
long concurrency; // How many times we can borrow from the connection
long capacity; // How many times the connection is currently borrowed (0 <= capacity <= concurrency)
long weight; // The weight that participates in the pool weight
private void init(long concurrency, C conn, long weight) {
this.concurrency = concurrency;
this.connection = conn;
this.weight = weight;
this.capacity = concurrency;
}
@Override
public void onConcurrencyChange(long concurrency) {
setConcurrency(this, concurrency);
}
@Override
public void onRecycle() {
recycle(this);
}
@Override
public void onEvict() {
evicted(this);
}
void connect() {
connector.connect(this, context, ar -> {
if (ar.succeeded()) {
connectSucceeded(this, ar.result());
} else {
connectFailed(this, ar.cause());
}
});
}
@Override
public String toString() {
return "Holder[removed=" + removed + ",capacity=" + capacity + ",concurrency=" + concurrency + "]";
}
}
private final ContextInternal context;
private final ConnectionProvider<C> connector;
private final Consumer<C> connectionAdded;
private final Consumer<C> connectionRemoved;
private final int queueMaxSize; // the queue max size (does not include inflight waiters)
private final Deque<Waiter<C>> waitersQueue = new ArrayDeque<>(); // The waiters pending
private final Deque<Holder> available; // Available connections, i.e having capacity > 0
private final boolean fifo; // Recycling policy
private long capacity; // The total available connection capacity
private long connecting; // The number of connections in progress
private final long initialWeight; // The initial weight of a connection
private final long maxWeight; // The max weight (equivalent to max pool size)
private long weight; // The actual pool weight (equivalent to connection count)
private boolean checkInProgress; // A flag to avoid running un-necessary checks
public Pool(Context context,
ConnectionProvider<C> connector,
int queueMaxSize,
long initialWeight,
long maxWeight,
Consumer<C> connectionAdded,
Consumer<C> connectionRemoved,
boolean fifo) {
this.context = (ContextInternal) context;
this.weight = 0;
this.maxWeight = maxWeight;
this.initialWeight = initialWeight;
this.connector = connector;
this.queueMaxSize = queueMaxSize;
this.available = new ArrayDeque<>();
this.connectionAdded = connectionAdded;
this.connectionRemoved = connectionRemoved;
this.fifo = fifo;
}
public synchronized int waitersInQueue() {
return waitersQueue.size();
}
public synchronized long weight() {
return weight;
}
public synchronized long capacity() {
return capacity;
}
Get a connection for a waiter asynchronously.
Params: - handler – the handler
/**
* Get a connection for a waiter asynchronously.
*
* @param handler the handler
*/
public synchronized void getConnection(Handler<AsyncResult<C>> handler) {
Waiter<C> waiter = new Waiter<>(handler);
waitersQueue.add(waiter);
checkProgress();
}
Close all connections returning false
when ConnectionProvider.isValid
is called. /**
* Close all connections returning {@code false} when {@link ConnectionProvider#isValid} is called.
*/
public synchronized void closeIdle() {
checkProgress();
}
Check whether the pool can make progress toward satisfying the waiters.
/**
* Check whether the pool can make progress toward satisfying the waiters.
*/
private void checkProgress() {
if (!checkInProgress && canProgress()) {
checkInProgress = true;
context.nettyEventLoop().execute(this::checkPendingTasks);
}
}
private boolean canProgress() {
if (waitersQueue.size() > 0) {
return (canAcquireConnection() || needToCreateConnection() || canEvictWaiter());
} else {
// To check idle connections
return capacity > 0L;
}
}
Run pending progress tasks.
/**
* Run pending progress tasks.
*/
private void checkPendingTasks() {
while (true) {
Runnable task;
synchronized (this) {
task = nextTask();
if (task == null) {
// => Can't make more progress
checkInProgress = false;
break;
}
}
task.run();
}
}
Returns: true
if a connection can be acquired from the pool
/**
* @return {@code true} if a connection can be acquired from the pool
*/
private boolean canAcquireConnection() {
return capacity > 0;
}
Returns: true
if a connection needs to be created
/**
* @return {@code true} if a connection needs to be created
*/
private boolean needToCreateConnection() {
return weight < maxWeight && (waitersQueue.size() - connecting) > 0;
}
Returns: true
if a waiter can be evicted from the queue
/**
* @return {@code true} if a waiter can be evicted from the queue
*/
private boolean canEvictWaiter() {
return queueMaxSize >= 0 && (waitersQueue.size() - connecting) > queueMaxSize;
}
private Runnable nextTask() {
if (waitersQueue.size() > 0) {
// Acquire a task that will deliver a connection
if (canAcquireConnection()) {
Holder conn = available.peek();
capacity--;
if (--conn.capacity == 0) {
available.poll();
}
Waiter<C> waiter = waitersQueue.poll();
return () -> waiter.handler.handle(Future.succeededFuture(conn.connection));
} else if (needToCreateConnection()) {
connecting++;
weight += initialWeight;
Holder holder = new Holder();
return holder::connect;
} else if (canEvictWaiter()) {
Waiter<C> waiter = waitersQueue.removeLast();
return () -> waiter.handler.handle(Future.failedFuture(new ConnectionPoolTooBusyException("Connection pool reached max wait queue size of " + queueMaxSize)));
}
} else if (capacity > 0) {
List<Holder> expired = null;
for (Iterator<Holder> it = available.iterator();it.hasNext();) {
Holder holder = it.next();
if (holder.capacity == holder.concurrency && !connector.isValid(holder.connection)) {
it.remove();
if (holder.capacity > 0) {
capacity -= holder.capacity;
}
holder.capacity = 0;
if (expired == null) {
expired = new ArrayList<>();
}
expired.add(holder);
}
}
if (expired != null) {
List<Holder> toClose = expired;
return () -> {
toClose.forEach(holder -> {
connector.close(holder.connection);
});
};
}
}
return null;
}
Sanity check of pool invariants.
Throws: - IllegalStateException – when an invariant is invalid
/**
* Sanity check of pool invariants.
*
* @throws IllegalStateException when an invariant is invalid
*/
public synchronized void checkInvariants() {
int weight = 0;
int capacity = 0;
for (Holder holder : available) {
weight += holder.weight;
capacity += holder.capacity;
if (holder.capacity < 1) {
throw new IllegalStateException("Holder capacity must be > 0");
}
}
if (weight != this.weight) {
throw new IllegalStateException("Weight invariant");
}
if (capacity != this.capacity) {
throw new IllegalStateException("Capacity invariant");
}
}
Handle connect success, a number of waiters will be satisfied according to the connection's concurrency.
/**
* Handle connect success, a number of waiters will be satisfied according to the connection's concurrency.
*/
private void connectSucceeded(Holder holder, ConnectResult<C> result) {
connector.init(result.connection());
List<Waiter<C>> waiters;
synchronized (this) {
connecting--;
weight -= initialWeight;
if (holder.removed) {
checkProgress();
return;
}
holder.initialized = true;
weight += result.weight();
holder.init(result.concurrency(), result.connection(), result.weight());
waiters = new ArrayList<>();
while (holder.capacity > 0 && waitersQueue.size() > 0) {
waiters.add(waitersQueue.poll());
holder.capacity--;
}
if (holder.capacity > 0) {
available.add(holder);
capacity += holder.capacity;
}
checkProgress();
}
connectionAdded.accept(holder.connection);
for (Waiter<C> waiter : waiters) {
waiter.handler.handle(Future.succeededFuture(holder.connection));
}
}
Handle connect failures, the first waiter is always failed to avoid infinite reconnection.
/**
* Handle connect failures, the first waiter is always failed to avoid infinite reconnection.
*/
private void connectFailed(Holder holder, Throwable cause) {
Waiter<C> waiter;
synchronized (this) {
connecting--;
waiter = waitersQueue.poll();
weight -= initialWeight;
holder.removed = true;
checkProgress();
}
if (waiter != null) {
waiter.handler.handle(Future.failedFuture(cause));
}
}
private synchronized void setConcurrency(Holder holder, long concurrency) {
if (concurrency < 0L) {
throw new IllegalArgumentException("Cannot set a negative concurrency value");
}
if (holder.removed) {
assert false : "Cannot recycle removed holder";
return;
}
if (holder.concurrency < concurrency) {
long diff = concurrency - holder.concurrency;
if (holder.capacity == 0) {
available.add(holder);
}
capacity += diff;
holder.capacity += diff;
holder.concurrency = concurrency;
checkProgress();
} else if (holder.concurrency > concurrency) {
throw new UnsupportedOperationException("Not yet implemented");
}
}
private synchronized void recycle(Holder holder) {
if (holder.removed) {
return;
}
recycleConnection(holder);
checkProgress();
}
private synchronized void evicted(Holder holder) {
if (holder.removed) {
return;
}
holder.removed = true;
if (holder.initialized) {
evictConnection(holder);
checkProgress();
}
}
private void evictConnection(Holder holder) {
connectionRemoved.accept(holder.connection);
if (holder.capacity > 0) {
capacity -= holder.capacity;
holder.capacity = 0;
available.remove(holder);
}
weight -= holder.weight;
}
// These methods assume to be called under synchronization
Recycles a connection.
Params: - holder – the connection to recycle
/**
* Recycles a connection.
*
* @param holder the connection to recycle
*/
private void recycleConnection(Holder holder) {
long newCapacity = holder.capacity + 1;
if (newCapacity > holder.concurrency) {
throw new AssertionError("Attempt to recycle a connection more than permitted");
}
capacity++;
if (holder.capacity == 0) {
if (fifo) {
available.addLast(holder);
} else {
available.addFirst(holder);
}
}
holder.capacity++;
}
public String toString() {
StringBuilder sb = new StringBuilder();
synchronized (this) {
sb.append("Available:").append(File.separator);
available.forEach(holder -> {
sb.append(holder).append(File.separator);
});
sb.append("Waiters").append(File.separator);
waitersQueue.forEach(w -> {
sb.append(w.handler).append(File.separator);
});
sb.append("InitialWeight:").append(initialWeight).append(File.separator);
sb.append("MaxWeight:").append(maxWeight).append(File.separator);
sb.append("Weight:").append(weight).append(File.separator);
sb.append("Capacity:").append(capacity).append(File.separator);
sb.append("Connecting:").append(connecting).append(File.separator);
sb.append("CheckInProgress:").append(checkInProgress).append(File.separator);
}
return sb.toString();
}
private static final class Waiter<C> {
private final Handler<AsyncResult<C>> handler;
Waiter(Handler<AsyncResult<C>> handler) {
this.handler = handler;
}
}
}