package io.vertx.ext.jdbc.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.shareddata.LocalMap;
import io.vertx.core.shareddata.Shareable;
import io.vertx.core.spi.metrics.PoolMetrics;
import io.vertx.core.spi.metrics.VertxMetrics;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.jdbc.impl.actions.AbstractJDBCAction;
import io.vertx.ext.jdbc.impl.actions.JDBCQuery;
import io.vertx.ext.jdbc.impl.actions.JDBCStatementHelper;
import io.vertx.ext.jdbc.impl.actions.JDBCUpdate;
import io.vertx.ext.jdbc.spi.DataSourceProvider;
import io.vertx.ext.sql.ResultSet;
import io.vertx.ext.sql.SQLClient;
import io.vertx.ext.sql.SQLConnection;
import io.vertx.ext.sql.UpdateResult;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * {@link JDBCClient} implementation that acquires JDBC connections from a {@link DataSource} on a
 * dedicated executor and hands the results to Vert.x handlers.
 */
public class JDBCClientImpl implements JDBCClient {
// Name of the Vert.x local shared-data map that stores data source holders keyed by data source name.
private static final String DS_LOCAL_MAP_NAME = "__vertx.JDBCClient.datasources";
// Vert.x instance supplied to the constructor; also used as the lock object when looking up holders.
private final Vertx vertx;
// Holder owning the data source, executor and metrics used by this client.
private final DataSourceHolder holder;
// Executor obtained from the holder; connection acquisition tasks are submitted to it.
private final ExecutorService exec;
// Data source connections are taken from (either caller-supplied or created by the holder).
private final DataSource ds;
// Pool metrics taken from the holder; may be null.
private final PoolMetrics metrics;
// Statement helper passed to the actions and connections this client creates.
private final JDBCStatementHelper helper;
public JDBCClientImpl(Vertx vertx, DataSource dataSource) {
Objects.requireNonNull(vertx);
Objects.requireNonNull(dataSource);
this.vertx = vertx;
this.holder = new DataSourceHolder((VertxInternal) vertx, dataSource);
this.exec = holder.exec();
this.ds = dataSource;
this.metrics = holder.metrics;
this.helper = new JDBCStatementHelper();
setupCloseHook();
}
public JDBCClientImpl(Vertx vertx, JsonObject config, String datasourceName) {
Objects.requireNonNull(vertx);
Objects.requireNonNull(config);
Objects.requireNonNull(datasourceName);
this.vertx = vertx;
this.holder = lookupHolder(datasourceName, config);
this.exec = holder.exec();
this.ds = holder.ds();
this.metrics = holder.metrics;
this.helper = new JDBCStatementHelper(config);
setupCloseHook();
}
/**
 * Registers the holder's close method as a close hook on the current Vert.x context, but only when
 * there is a current context and it is owned by this client's Vert.x instance.
 */
private void setupCloseHook() {
  final Context current = Vertx.currentContext();
  if (current == null || current.owner() != vertx) {
    return;
  }
  current.addCloseHook(holder::close);
}
/**
 * Releases this client's reference on its holder without requesting a completion callback
 * (a {@code null} handler is passed to the holder).
 */
@Override
public void close() {
holder.close(null);
}
/**
 * Releases this client's reference on its holder and forwards the completion handler to it.
 *
 * @param completionHandler handler passed through to the holder's close method
 */
@Override
public void close(Handler<AsyncResult<Void>> completionHandler) {
holder.close(completionHandler);
}
/**
 * Executes an update statement without parameters on a one-off connection.
 *
 * @param sql           the statement to run
 * @param resultHandler receives the update result or the failure
 * @return this client
 */
@Override
public JDBCClient update(String sql, Handler<AsyncResult<UpdateResult>> resultHandler) {
  final ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
  final JDBCUpdate action = new JDBCUpdate(vertx, helper, null, context, sql, null);
  executeDirect(context, action, resultHandler);
  return this;
}
/**
 * Executes an update statement with the given parameters on a one-off connection.
 *
 * @param sql           the statement to run
 * @param in            the statement parameters
 * @param resultHandler receives the update result or the failure
 * @return this client
 */
@Override
public JDBCClient updateWithParams(String sql, JsonArray in, Handler<AsyncResult<UpdateResult>> resultHandler) {
  final ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
  final JDBCUpdate action = new JDBCUpdate(vertx, helper, null, context, sql, in);
  executeDirect(context, action, resultHandler);
  return this;
}
/**
 * Executes a query without parameters on a one-off connection.
 *
 * @param sql           the query to run
 * @param resultHandler receives the result set or the failure
 * @return this client
 */
@Override
public JDBCClient query(String sql, Handler<AsyncResult<ResultSet>> resultHandler) {
  final ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
  final JDBCQuery action = new JDBCQuery(vertx, helper, null, context, sql, null);
  executeDirect(context, action, resultHandler);
  return this;
}
/**
 * Executes a query with the given parameters on a one-off connection.
 *
 * @param sql           the query to run
 * @param in            the query parameters
 * @param resultHandler receives the result set or the failure
 * @return this client
 */
@Override
public JDBCClient queryWithParams(String sql, JsonArray in, Handler<AsyncResult<ResultSet>> resultHandler) {
  final ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
  final JDBCQuery action = new JDBCQuery(vertx, helper, null, context, sql, in);
  executeDirect(context, action, resultHandler);
  return this;
}
/**
 * Acquires a connection, runs {@code action} on it, closes the connection and reports the outcome
 * to {@code handler} on {@code ctx}.
 *
 * <p>The action runs inside the callback of {@link #getConnection(Context, Handler)}. The
 * connection is closed whether the action succeeds or fails; a failure while closing is only
 * logged and does not change the reported outcome.
 *
 * @param ctx     context on which {@code handler} is invoked
 * @param action  the JDBC action to execute against the raw connection
 * @param handler receives the action result, the action failure, or the connection failure
 * @param <T>     the action's result type
 */
private <T> void executeDirect(Context ctx, AbstractJDBCAction<T> action, Handler<AsyncResult<T>> handler) {
  getConnection(ctx, ar1 -> {
    Promise<T> promise = Promise.promise();
    // Whatever the outcome, hand it to the caller's handler on the supplied context.
    promise.future().setHandler(ar2 -> ctx.runOnContext(v -> handler.handle(ar2)));
    if (ar1.succeeded()) {
      JDBCConnectionImpl conn = (JDBCConnectionImpl) ar1.result();
      try {
        T result = action.execute(conn.conn);
        promise.complete(result);
      } catch (Exception e) {
        promise.fail(e);
      } finally {
        if (metrics != null) {
          metrics.end(conn.metric, true);
        }
        try {
          conn.conn.close();
        } catch (Exception e) {
          // Log the close failure itself: in this branch ar1 succeeded, so ar1.cause() is null
          // and would hide the real reason the close failed.
          JDBCConnectionImpl.log.error("Failure in closing connection", e);
        }
      }
    } else {
      promise.fail(ar1.cause());
    }
  });
}
/**
 * Acquires a connection from the data source on the client's executor and passes the outcome to
 * {@code handler}. The handler is invoked on the executor thread, not on {@code ctx}.
 *
 * <p>When pool metrics are present and enabled, the task is recorded as submitted before it is
 * queued, as begun once a connection has been obtained, and as rejected if acquisition fails.
 *
 * <p>Both {@link SQLException} and unchecked exceptions raised during acquisition are reported as
 * a failed result. Without this an unchecked exception would escape the executor task and the
 * handler would never be called.
 *
 * @param ctx     context handed to the created connection wrapper
 * @param handler receives the wrapped connection or the failure
 */
private void getConnection(Context ctx, Handler<AsyncResult<SQLConnection>> handler) {
  boolean enabled = metrics != null && metrics.isEnabled();
  Object queueMetric = enabled ? metrics.submitted() : null;
  // Null when metrics are absent or disabled, so the task below only needs a null check.
  PoolMetrics poolMetrics = enabled ? metrics : null;
  exec.execute(() -> {
    Promise<SQLConnection> res = Promise.promise();
    res.future().setHandler(handler);
    SQLConnection connection;
    try {
      Connection conn = ds.getConnection();
      Object execMetric = null;
      if (poolMetrics != null) {
        execMetric = poolMetrics.begin(queueMetric);
      }
      connection = new JDBCConnectionImpl(ctx, helper, conn, poolMetrics, execMetric);
    } catch (SQLException | RuntimeException e) {
      if (poolMetrics != null) {
        poolMetrics.rejected(queueMetric);
      }
      res.fail(e);
      return;
    }
    // Complete outside the try block so an exception thrown by the handler itself is not
    // mistaken for an acquisition failure (which would try to fail an already completed promise).
    res.complete(connection);
  });
}
/**
 * Acquires a connection and delivers it (or the failure) to {@code handler} on the context
 * obtained from {@code vertx.getOrCreateContext()}.
 *
 * @param handler receives the connection or the failure
 * @return this client
 */
@Override
public SQLClient getConnection(Handler<AsyncResult<SQLConnection>> handler) {
  final Context callerContext = vertx.getOrCreateContext();
  final Handler<AsyncResult<SQLConnection>> onCallerContext =
    outcome -> callerContext.runOnContext(ignored -> handler.handle(outcome));
  getConnection(callerContext, onCallerContext);
  return this;
}
/**
 * Returns the holder registered under {@code datasourceName} in the shared local map, taking an
 * extra reference on it, or creates a new holder from {@code config} when none is registered.
 * The whole lookup runs while holding the lock on the Vert.x instance.
 *
 * @param datasourceName key of the holder in the shared local map
 * @param config         configuration used only when a new holder has to be created
 * @return the existing or newly created holder
 */
private DataSourceHolder lookupHolder(String datasourceName, JsonObject config) {
  synchronized (vertx) {
    final LocalMap<String, DataSourceHolder> holders = vertx.sharedData().getLocalMap(DS_LOCAL_MAP_NAME);
    final DataSourceHolder existing = holders.get(datasourceName);
    if (existing != null) {
      existing.incRefCount();
      return existing;
    }
    // The holder's constructor registers the new instance in the map under datasourceName.
    return new DataSourceHolder((VertxInternal) vertx, config, holders, datasourceName);
  }
}
/**
 * Reference-counted owner of a data source together with the executor and pool metrics that go
 * with it. A holder either wraps a caller-supplied data source (no provider, not registered in
 * any map) or lazily creates one through a {@link DataSourceProvider} and registers itself in a
 * shared local map under its name.
 */
private class DataSourceHolder implements Shareable {
  private final VertxInternal vertx;
  // Shared map this holder is registered in; null for holders wrapping a caller-supplied data source.
  private final LocalMap<String, DataSourceHolder> map;
  // Provider that created the data source; stays null when the data source was supplied directly.
  DataSourceProvider provider;
  JsonObject config;
  DataSource ds;
  PoolMetrics metrics;
  ExecutorService exec;
  // Number of users of this holder; starts at 1 for the creating client.
  private int refCount = 1;
  // Key in the shared map and pool name used for metrics; null for caller-supplied data sources.
  private final String name;

  /**
   * Wraps a caller-supplied data source. Pool metrics are created immediately, under a random
   * UUID name and with a maximum pool size of -1, when a metrics SPI is available.
   */
  DataSourceHolder(VertxInternal vertx, DataSource ds) {
    this.ds = ds;
    this.metrics = vertx.metricsSPI() != null ? vertx.metricsSPI().createPoolMetrics("datasource", UUID.randomUUID().toString(), -1) : null;
    this.vertx = vertx;
    this.map = null;
    this.name = null;
  }

  /**
   * Creates a holder whose data source is built lazily from {@code config}, and registers it in
   * {@code map} under {@code name}.
   */
  DataSourceHolder(VertxInternal vertx, JsonObject config, LocalMap<String, DataSourceHolder> map, String name) {
    this.config = config;
    this.map = map;
    this.vertx = vertx;
    this.name = name;
    map.put(name, this);
  }

  /**
   * Returns the data source, creating it on first use through the provider class named by the
   * {@code provider_class} config entry (or the default provider class when that entry is absent).
   * Pool metrics are created at the same time when a metrics SPI is available.
   *
   * @return the data source
   * @throws RuntimeException wrapping any reflective or SQL failure during creation
   */
  synchronized DataSource ds() {
    if (ds == null) {
      String providerClass = config.getString("provider_class");
      if (providerClass == null) {
        providerClass = DEFAULT_PROVIDER_CLASS;
      }
      VertxMetrics vertxMetrics = vertx.metricsSPI();
      try {
        Class<?> clazz = loadProviderClass(providerClass);
        provider = (DataSourceProvider) clazz.getDeclaredConstructor().newInstance();
        ds = provider.getDataSource(config);
        int poolSize = provider.maximumPoolSize(ds, config);
        metrics = vertxMetrics != null ? vertxMetrics.createPoolMetrics("datasource", name, poolSize) : null;
      } catch (ReflectiveOperationException | SQLException e) {
        throw new RuntimeException(e);
      }
    }
    return ds;
  }

  /**
   * Loads the provider class, trying the thread context class loader first (when there is one)
   * and falling back to the class loader of this holder class.
   *
   * @throws ClassNotFoundException if neither class loader can find the class
   */
  private Class<?> loadProviderClass(String providerClass) throws ClassNotFoundException {
    ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
    if (contextLoader != null) {
      try {
        return contextLoader.loadClass(providerClass);
      } catch (ClassNotFoundException ignored) {
        // Not visible to the context class loader; fall through to this class's own loader.
      }
    }
    return getClass().getClassLoader().loadClass(providerClass);
  }

  /**
   * Returns the executor used to acquire connections, creating it on first use: a pool of exactly
   * one thread with an unbounded queue, so submitted tasks run one at a time.
   */
  synchronized ExecutorService exec() {
    if (exec == null) {
      exec = new ThreadPoolExecutor(1, 1,
        1000L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(),
        (r -> new Thread(r, "vertx-jdbc-service-get-connection-thread")));
    }
    return exec;
  }

  /**
   * Takes an additional reference on this holder. The method does not lock by itself;
   * {@code lookupHolder} calls it while holding the lock on the Vert.x instance, the same lock
   * {@link #close(Handler)} holds when it decrements the count.
   */
  void incRefCount() {
    refCount++;
  }

  /**
   * Drops one reference. When the count reaches zero the metrics are closed, the provider (if
   * any) closes the data source in a blocking task, the executor is shut down and the holder is
   * removed from the shared map, which is itself closed once empty. Otherwise the handler is
   * completed successfully straight away.
   *
   * @param completionHandler notified when the close has finished; may be {@code null}
   */
  void close(Handler<AsyncResult<Void>> completionHandler) {
    synchronized (vertx) {
      if (--refCount == 0) {
        if (metrics != null) {
          metrics.close();
        }
        // f1 tracks the synchronous clean-up below, f2 the data source close done by the provider.
        Promise<Void> f1 = Promise.promise();
        Promise<Void> f2 = Promise.promise();
        if (completionHandler != null) {
          CompositeFuture.all(f1.future(), f2.future()).<Void>map(f -> null).setHandler(completionHandler);
        }
        if (provider != null) {
          vertx.executeBlocking(future -> {
            try {
              provider.close(ds);
              future.complete();
            } catch (SQLException e) {
              future.fail(e);
            }
          }, f2);
        } else {
          // Without a provider there is nothing to close on the data source side.
          f2.complete();
        }
        try {
          if (exec != null) {
            exec.shutdown();
          }
          if (map != null) {
            map.remove(name);
            if (map.isEmpty()) {
              map.close();
            }
          }
          f1.complete();
        } catch (Throwable t) {
          f1.fail(t);
        }
      } else {
        if (completionHandler != null) {
          completionHandler.handle(Future.succeededFuture());
        }
      }
    }
  }
}
}