package io.vertx.sqlclient.impl;
import io.vertx.core.*;
import io.vertx.core.impl.CloseFuture;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.impl.command.CommandBase;
import io.vertx.sqlclient.impl.pool.ConnectionPool;
import io.vertx.sqlclient.impl.tracing.QueryTracer;
public abstract class PoolBase<P extends Pool> extends SqlClientBase<P> implements Pool, Closeable {
private final ContextInternal context;
private final VertxInternal vertx;
private final ConnectionFactory factory;
private final ConnectionPool pool;
private final CloseFuture closeFuture;
public PoolBase(ContextInternal context, ConnectionFactory factory, QueryTracer tracer, ClientMetrics metrics, PoolOptions poolOptions) {
super(tracer, metrics);
this.context = context;
this.vertx = context.owner();
this.factory = factory;
this.pool = new ConnectionPool(factory, context, poolOptions.getMaxSize(), poolOptions.getMaxWaitQueueSize());
this.closeFuture = new CloseFuture(this);
}
public CloseFuture closeFuture() {
return closeFuture;
}
@Override
protected <T> PromiseInternal<T> promise() {
return vertx.promise();
}
@Override
protected <T> PromiseInternal<T> promise(Handler<AsyncResult<T>> handler) {
return vertx.promise(handler);
}
public abstract void connect(Handler<AsyncResult<Connection>> completionHandler);
@Override
public void getConnection(Handler<AsyncResult<SqlConnection>> handler) {
Future<SqlConnection> fut = getConnection();
if (handler != null) {
fut.onComplete(handler);
}
}
@Override
public Future<SqlConnection> getConnection() {
ContextInternal current = vertx.getOrCreateContext();
Object metric;
if (metrics != null) {
metric = metrics.enqueueRequest();
} else {
metric = null;
}
Promise<Connection> promise = current.promise();
acquire(promise);
if (metrics != null) {
promise.future().onComplete(ar -> {
metrics.dequeueRequest(metric);
});
}
return promise.future().map(conn -> {
SqlConnectionImpl wrapper = wrap(current, conn);
conn.init(wrapper);
return wrapper;
});
}
@Override
public <R> void schedule(CommandBase<R> cmd, Promise<R> promise) {
Object metric;
if (metrics != null) {
metric = metrics.enqueueRequest();
} else {
metric = null;
}
acquire(new CommandWaiter() {
@Override
protected void onSuccess(Connection conn) {
if (metrics != null) {
metrics.dequeueRequest(metric);
}
conn.schedule(cmd, promise);
promise.future().onComplete(ar -> {
conn.close(this, Promise.promise());
});
}
@Override
protected void onFailure(Throwable cause) {
if (metrics != null) {
metrics.dequeueRequest(metric);
}
promise.fail(cause);
}
});
}
private void acquire(Handler<AsyncResult<Connection>> completionHandler) {
pool.acquire(completionHandler);
}
private static abstract class CommandWaiter implements Connection.Holder, Handler<AsyncResult<Connection>> {
protected abstract void onSuccess(Connection conn);
protected abstract void onFailure(Throwable cause);
@Override
public void handleEvent(Object event) {
}
@Override
public void handle(AsyncResult<Connection> ar) {
if (ar.succeeded()) {
Connection conn = ar.result();
conn.init(this);
onSuccess(conn);
} else {
onFailure(ar.cause());
}
}
@Override
public void handleClosed() {
}
@Override
public void handleException(Throwable err) {
}
}
protected abstract SqlConnectionImpl wrap(ContextInternal context, Connection conn);
@Override
public void close(Promise<Void> completion) {
doClose().onComplete(completion);
}
@Override
public Future<Void> close() {
Promise<Void> promise = vertx.promise();
closeFuture.close(promise);
return promise.future();
}
@Override
public void close(Handler<AsyncResult<Void>> handler) {
closeFuture.close(vertx.promise(handler));
}
private Future<Void> doClose() {
return pool.close().eventually(v -> {
PromiseInternal<Void> promise = context.promise();
factory.close(promise);
return promise;
}).onComplete(v -> {
if (metrics != null) {
metrics.close();
}
});
}
}