package io.vertx.ext.asyncsql.impl.pool;
import com.github.mauricio.async.db.Configuration;
import com.github.mauricio.async.db.Connection;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.asyncsql.impl.ScalaUtils;
import io.vertx.ext.asyncsql.impl.VertxEventLoopExecutionContext;
import java.util.ArrayDeque;
import java.util.Deque;
public abstract class AsyncConnectionPool {
public static final int DEFAULT_MAX_POOL_SIZE = 10;
public static final int DEFAULT_MAX_CONNECTION_RETRIES = 0;
public static final int DEFAULT_CONNECTION_RETRY_DELAY = 5_000;
private static final Logger logger = LoggerFactory.getLogger(AsyncConnectionPool.class);
private final int maxPoolSize;
private final int maxConnectionRetries;
private final int connectionRetryDelay;
protected final Configuration connectionConfig;
protected final Vertx vertx;
private int poolSize = 0;
private final Deque<Connection> availableConnections = new ArrayDeque<>();
private final Deque<Handler<AsyncResult<Connection>>> waiters = new ArrayDeque<>();
public AsyncConnectionPool(Vertx vertx, JsonObject globalConfig, Configuration connectionConfig) {
this.vertx = vertx;
this.maxPoolSize = globalConfig.getInteger("maxPoolSize", DEFAULT_MAX_POOL_SIZE);
this.maxConnectionRetries = globalConfig.getInteger("maxConnectionRetries", DEFAULT_MAX_CONNECTION_RETRIES);
this.connectionRetryDelay = globalConfig.getInteger("connectionRetryDelay", DEFAULT_CONNECTION_RETRY_DELAY);
this.connectionConfig = connectionConfig;
}
protected abstract Connection create();
private synchronized void createConnection(Handler<AsyncResult<Connection>> handler) {
poolSize += 1;
createAndConnect(new Handler<AsyncResult<Connection>>() {
int retries = 0;
@Override
public void handle(AsyncResult<Connection> connectionResult) {
if (connectionResult.succeeded()) {
handler.handle(connectionResult);
} else if (maxConnectionRetries < 0 || retries < maxConnectionRetries) {
retries++;
logger.debug("Error creating connection. Waiting " + connectionRetryDelay + " ms for retry " +
retries + (maxConnectionRetries >= 0 ? " of " + maxConnectionRetries : ""));
vertx.setTimer(connectionRetryDelay, timerId ->
createAndConnect(this)
);
} else {
poolSize -= 1;
notifyWaitersAboutAvailableConnection();
handler.handle(connectionResult);
}
}
});
}
private synchronized void createAndConnect(Handler<AsyncResult<Connection>> handler) {
try {
create()
.connect()
.onComplete(ScalaUtils.toFunction1(handler), VertxEventLoopExecutionContext.create(vertx));
} catch (Throwable e) {
logger.info("creating a connection went wrong", e);
handler.handle(Future.failedFuture(e));
}
}
private synchronized void waitForAvailableConnection(Handler<AsyncResult<Connection>> handler) {
waiters.add(handler);
}
private synchronized void createOrWaitForAvailableConnection(Handler<AsyncResult<Connection>> handler) {
if (poolSize < maxPoolSize) {
createConnection(handler);
} else {
waitForAvailableConnection(handler);
}
}
public synchronized void take(Handler<AsyncResult<Connection>> handler) {
Connection connection = availableConnections.poll();
if (connection == null) {
createOrWaitForAvailableConnection(handler);
} else {
if (connection.isConnected()) {
handler.handle(Future.succeededFuture(connection));
} else {
poolSize -= 1;
take(handler);
}
}
}
private synchronized void notifyWaitersAboutAvailableConnection() {
Handler<AsyncResult<Connection>> handler = waiters.poll();
if (handler != null) {
take(handler);
}
}
public synchronized void giveBack(Connection connection) {
if (connection.isConnected()) {
availableConnections.add(connection);
} else {
poolSize -= 1;
}
notifyWaitersAboutAvailableConnection();
}
public synchronized void close() {
availableConnections.forEach(Connection::disconnect);
}
public synchronized void close(Handler<AsyncResult<Void>> handler) {
close();
if (handler != null) {
handler.handle(Future.succeededFuture());
}
}
}