package io.vertx.ext.jdbc.impl;
import io.vertx.core.*;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.metrics.PoolMetrics;
import io.vertx.core.spi.metrics.VertxMetrics;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.jdbc.impl.actions.AbstractJDBCAction;
import io.vertx.ext.jdbc.impl.actions.JDBCQuery;
import io.vertx.ext.jdbc.impl.actions.JDBCStatementHelper;
import io.vertx.ext.jdbc.impl.actions.JDBCUpdate;
import io.vertx.ext.jdbc.spi.DataSourceProvider;
import io.vertx.ext.sql.ResultSet;
import io.vertx.ext.sql.SQLClient;
import io.vertx.ext.sql.SQLConnection;
import io.vertx.ext.sql.UpdateResult;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class JDBCClientImpl implements JDBCClient, Closeable {
private static final String DS_LOCAL_MAP_NAME = "__vertx.JDBCClient.datasources";
private final VertxInternal vertx;
private final String datasourceName;
private final JsonObject config;
private final Map<String, DataSourceHolder> holders;
private final JDBCStatementHelper helper;
private boolean closed;
public JDBCClientImpl(Vertx vertx, DataSource dataSource) {
Objects.requireNonNull(vertx);
Objects.requireNonNull(dataSource);
this.vertx = (VertxInternal) vertx;
datasourceName = UUID.randomUUID().toString();
config = null;
holders = vertx.sharedData().getLocalMap(DS_LOCAL_MAP_NAME);
DataSourceHolder holder = new DataSourceHolder(dataSource, createExecutor(), createMetrics(datasourceName, -1));
holders.put(datasourceName, holder);
this.helper = new JDBCStatementHelper();
setupCloseHook();
}
public JDBCClientImpl(Vertx vertx, JsonObject config, String datasourceName) {
Objects.requireNonNull(vertx);
Objects.requireNonNull(config);
Objects.requireNonNull(datasourceName);
this.vertx = (VertxInternal) vertx;
this.datasourceName = datasourceName;
this.config = config;
holders = vertx.sharedData().getLocalMap(DS_LOCAL_MAP_NAME);
DataSourceProvider provider = createProvider();
holders.compute(datasourceName, (k, h) -> h == null ? new DataSourceHolder(provider) : h.increment());
this.helper = new JDBCStatementHelper(config);
setupCloseHook();
}
public JDBCClientImpl(Vertx vertx, DataSourceProvider dataSourceProvider) {
Objects.requireNonNull(vertx);
Objects.requireNonNull(dataSourceProvider);
this.vertx = (VertxInternal) vertx;
this.datasourceName = UUID.randomUUID().toString();
this.config = new JsonObject();
holders = vertx.sharedData().getLocalMap(DS_LOCAL_MAP_NAME);
holders.compute(datasourceName, (k, h) -> h == null ? new DataSourceHolder(dataSourceProvider) : h.increment());
this.helper = new JDBCStatementHelper(config);
setupCloseHook();
}
private void setupCloseHook() {
ContextInternal ctx = vertx.getContext();
if (ctx != null) {
ctx.addCloseHook(this);
}
}
public JDBCStatementHelper getHelper() {
return helper;
}
@Override
public void close(Promise<Void> completion) {
close((Handler<AsyncResult<Void>>) completion);
}
@Override
public void close() {
close(null);
}
@Override
public void close(Handler<AsyncResult<Void>> completionHandler) {
if (raiseCloseFlag()) {
do {
DataSourceHolder current = holders.get(datasourceName);
DataSourceHolder next = current.decrement();
if (next.refCount == 0) {
if (holders.remove(datasourceName, current)) {
if (current.dataSource != null) {
doClose(current, completionHandler);
return;
}
break;
}
} else if (holders.replace(datasourceName, current, next)) {
break;
}
} while (true);
}
if (completionHandler != null) {
completionHandler.handle(Future.succeededFuture());
}
}
private synchronized boolean raiseCloseFlag() {
if (!closed) {
closed = true;
return true;
}
return false;
}
@Override
public JDBCClient update(String sql, Handler<AsyncResult<UpdateResult>> resultHandler) {
executeDirect(new JDBCUpdate(helper, null, sql, null), resultHandler);
return this;
}
@Override
protected void finalize() throws Throwable {
close();
super.finalize();
}
@Override
public JDBCClient updateWithParams(String sql, JsonArray in, Handler<AsyncResult<UpdateResult>> resultHandler) {
executeDirect(new JDBCUpdate(helper, null, sql, in), resultHandler);
return this;
}
@Override
public JDBCClient query(String sql, Handler<AsyncResult<ResultSet>> resultHandler) {
executeDirect(new JDBCQuery(helper, null, sql, null), resultHandler);
return this;
}
@Override
public JDBCClient queryWithParams(String sql, JsonArray in, Handler<AsyncResult<ResultSet>> resultHandler) {
executeDirect(new JDBCQuery(helper, null, sql, in), resultHandler);
return this;
}
private <T> void executeDirect(AbstractJDBCAction<T> action, Handler<AsyncResult<T>> handler) {
getConnection().flatMap(sqlConnection -> {
JDBCConnectionImpl conn = (JDBCConnectionImpl) sqlConnection;
return conn.schedule(action).onComplete(v -> conn.close());
}).onComplete(handler);
}
public Future<SQLConnection> getConnection() {
return getConnection(vertx.getOrCreateContext());
}
public Future<SQLConnection> getConnection(ContextInternal ctx) {
return getDataSourceHolder(ctx).flatMap(holder -> {
Promise<SQLConnection> res = ctx.promise();
boolean enabled = holder.metrics != null;
Object queueMetric = enabled ? holder.metrics.submitted() : null;
holder.exec.execute(() -> {
try {
Connection conn = holder.dataSource.getConnection();
Object execMetric = enabled ? holder.metrics.begin(queueMetric) : null;
res.complete(new JDBCConnectionImpl(ctx, helper, conn, holder.metrics, execMetric));
} catch (SQLException e) {
if (enabled) {
holder.metrics.rejected(queueMetric);
}
res.fail(e);
}
});
return res.future();
});
}
private synchronized Future<DataSourceHolder> getDataSourceHolder(ContextInternal ctx) {
if (closed) {
return ctx.failedFuture("Client is closed");
}
DataSourceHolder holder = holders.get(datasourceName);
if (holder.dataSource != null) {
return ctx.succeededFuture(holder);
}
return ctx.executeBlocking(promise -> {
createDataSource(promise);
}, holder.creationQueue);
}
private void createDataSource(Promise<DataSourceHolder> promise) {
DataSourceHolder current = holders.get(datasourceName);
if (current == null) {
promise.fail("Client closed while connecting");
return;
}
if (current.dataSource != null) {
promise.complete(current);
return;
}
DataSourceProvider provider = current.provider;
DataSource dataSource;
int poolSize;
try {
dataSource = provider.getDataSource(config);
poolSize = provider.maximumPoolSize(dataSource, config);
} catch (SQLException e) {
promise.fail(e);
return;
}
ExecutorService exec = createExecutor();
PoolMetrics metrics = createMetrics(datasourceName, poolSize);
current = holders.compute(datasourceName, (k, h) -> h == null ? null : h.created(dataSource, exec, metrics));
if (current != null) {
promise.complete(current);
} else {
if (metrics != null) {
metrics.close();
}
exec.shutdown();
try {
provider.close(dataSource);
} catch (SQLException ignored) {
}
promise.fail("Client closed while connecting");
}
}
@Override
public SQLClient getConnection(Handler<AsyncResult<SQLConnection>> handler) {
getConnection().onComplete(handler);
return this;
}
private DataSourceProvider createProvider() {
String providerClass = config.getString("provider_class");
if (providerClass == null) {
providerClass = JDBCClient.DEFAULT_PROVIDER_CLASS;
}
if (Thread.currentThread().getContextClassLoader() != null) {
try {
Class clazz = Thread.currentThread().getContextClassLoader().loadClass(providerClass);
return (DataSourceProvider) clazz.newInstance();
} catch (ClassNotFoundException e) {
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
try {
Class clazz = this.getClass().getClassLoader().loadClass(providerClass);
return (DataSourceProvider) clazz.newInstance();
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
private PoolMetrics createMetrics(String poolName, int maxPoolSize) {
VertxMetrics metricsSPI = vertx.metricsSPI();
return metricsSPI != null ? metricsSPI.createPoolMetrics("datasource", poolName, maxPoolSize) : null;
}
private ExecutorService createExecutor() {
return new ThreadPoolExecutor(1, 1,
1000L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
(r -> new Thread(r, "vertx-jdbc-service-get-connection-thread")));
}
private void doClose(DataSourceHolder holder, Handler<AsyncResult<Void>> completionHandler) {
if (holder.metrics != null) {
holder.metrics.close();
}
vertx.<Void>executeBlocking(promise -> {
try {
if (holder.provider != null) {
holder.provider.close(holder.dataSource);
}
promise.complete();
} catch (SQLException e) {
promise.fail(e);
}
}, false, ar -> {
holder.exec.shutdown();
if (completionHandler != null) {
completionHandler.handle(Future.succeededFuture());
}
});
}
}