package io.vertx.sqlclient.impl;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.impl.command.CommandBase;
import io.vertx.core.*;
import io.vertx.core.impl.NoStackTraceThrowable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
public class ConnectionPool {
private final Consumer<Handler<AsyncResult<Connection>>> connector;
private final int maxSize;
private final ArrayDeque<Promise<Connection>> waiters = new ArrayDeque<>();
private final Set<PooledConnection> all = new HashSet<>();
private final ArrayDeque<PooledConnection> available = new ArrayDeque<>();
private int size;
private final int maxWaitQueueSize;
private boolean checkInProgress;
private boolean closed;
public ConnectionPool(Consumer<Handler<AsyncResult<Connection>>> connector) {
this(connector, PoolOptions.DEFAULT_MAX_SIZE, PoolOptions.DEFAULT_MAX_WAIT_QUEUE_SIZE);
}
public ConnectionPool(Consumer<Handler<AsyncResult<Connection>>> connector, int maxSize) {
this(connector, maxSize, PoolOptions.DEFAULT_MAX_WAIT_QUEUE_SIZE);
}
public ConnectionPool(Consumer<Handler<AsyncResult<Connection>>> connector, int maxSize, int maxWaitQueueSize) {
this.maxSize = maxSize;
this.maxWaitQueueSize = maxWaitQueueSize;
this.connector = connector;
}
public int available() {
return available.size();
}
public int size() {
return size;
}
public void acquire(Handler<AsyncResult<Connection>> holder) {
if (closed) {
throw new IllegalStateException("Connection pool closed");
}
Promise<Connection> promise = Promise.promise();
promise.future().setHandler(holder);
waiters.add(promise);
check();
}
public void close() {
if (closed) {
throw new IllegalStateException("Connection pool already closed");
}
closed = true;
for (PooledConnection pooled : new ArrayList<>(all)) {
pooled.close();
}
Future<Connection> failure = Future.failedFuture("Connection pool closed");
for (Promise<Connection> pending : waiters) {
try {
pending.handle(failure);
} catch (Exception ignore) {
}
}
}
private class PooledConnection implements Connection, Connection.Holder {
private final Connection conn;
private Holder holder;
PooledConnection(Connection conn) {
this.conn = conn;
}
@Override
public boolean isSsl() {
return conn.isSsl();
}
@Override
public void schedule(CommandBase<?> cmd) {
conn.schedule(cmd);
}
private void close() {
conn.close(this);
}
@Override
public void init(Holder holder) {
if (this.holder != null) {
throw new IllegalStateException();
}
this.holder = holder;
}
@Override
public void close(Holder holder) {
if (holder != this.holder) {
throw new IllegalStateException();
}
this.holder = null;
release(this);
}
@Override
public void handleClosed() {
if (all.remove(this)) {
size--;
if (holder == null) {
available.remove(this);
} else {
holder.handleClosed();
}
check();
} else {
throw new IllegalStateException();
}
}
@Override
public void handleNotification(int processId, String channel, String payload) {
if (holder != null) {
holder.handleNotification(processId, channel, payload);
}
}
@Override
public void handleException(Throwable err) {
if (holder != null) {
holder.handleException(err);
}
}
@Override
public int getProcessId() {
return conn.getProcessId();
}
@Override
public int getSecretKey() {
return conn.getSecretKey();
}
}
private void release(PooledConnection proxy) {
if (all.contains(proxy)) {
available.add(proxy);
check();
}
}
private void check() {
if (closed) {
return;
}
if (!checkInProgress) {
checkInProgress = true;
try {
while (waiters.size() > 0) {
if (available.size() > 0) {
PooledConnection proxy = available.poll();
Promise<Connection> waiter = waiters.poll();
waiter.complete(proxy);
} else {
if (size < maxSize) {
Promise<Connection> waiter = waiters.poll();
size++;
connector.accept(ar -> {
if (ar.succeeded()) {
Connection conn = ar.result();
PooledConnection proxy = new PooledConnection(conn);
all.add(proxy);
conn.init(proxy);
waiter.complete(proxy);
} else {
size--;
waiter.fail(ar.cause());
check();
}
});
} else {
if (maxWaitQueueSize >= 0) {
int numInProgress = size - all.size();
int numToFail = waiters.size() - (maxWaitQueueSize + numInProgress);
while (numToFail-- > 0) {
Promise<Connection> waiter = waiters.pollLast();
waiter.fail(new NoStackTraceThrowable("Max waiter size reached"));
}
}
break;
}
}
}
} finally {
checkInProgress = false;
}
}
}
}