package io.vertx.mysqlclient.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.mysqlclient.MySQLAuthOptions;
import io.vertx.mysqlclient.MySQLConnectOptions;
import io.vertx.mysqlclient.MySQLConnection;
import io.vertx.mysqlclient.MySQLSetOption;
import io.vertx.mysqlclient.impl.command.*;
import io.vertx.sqlclient.impl.Connection;
import io.vertx.sqlclient.impl.SqlConnectionImpl;
import io.vertx.sqlclient.impl.tracing.QueryTracer;
/**
 * {@link MySQLConnection} implementation built on top of {@link SqlConnectionImpl}.
 *
 * <p>Every MySQL-specific operation comes in two flavours: a {@link Future}-returning method that
 * schedules the matching command on the connection, and a callback overload that delegates to the
 * future-returning method and attaches the supplied handler when it is not {@code null}.
 */
public class MySQLConnectionImpl extends SqlConnectionImpl<MySQLConnectionImpl> implements MySQLConnection {

  /**
   * Opens a new MySQL connection using the given context and options.
   *
   * <p>The returned future fails without attempting a connection when the options request a domain
   * socket but the owning Vert.x instance has no native transport enabled, or when the connection
   * factory constructor throws. Otherwise the factory is registered as a close hook on {@code ctx}
   * and the connection attempt is dispatched through {@code ctx.emit}.
   *
   * @param ctx     the context used to create futures/promises and to dispatch the connect call
   * @param options the connect options handed to the connection factory and the query tracer
   * @return a future completed with the connection, or failed with the cause
   */
  public static Future<MySQLConnection> connect(ContextInternal ctx, MySQLConnectOptions options) {
    if (options.isUsingDomainSocket()) {
      if (!ctx.owner().isNativeTransportEnabled()) {
        return ctx.failedFuture("Native transport is not available");
      }
    }

    final MySQLConnectionFactory connectionFactory;
    try {
      connectionFactory = new MySQLConnectionFactory(ctx, options);
    } catch (Exception cause) {
      return ctx.failedFuture(cause);
    }
    ctx.addCloseHook(connectionFactory);

    // A tracer is only created when the context exposes one.
    final QueryTracer queryTracer;
    if (ctx.tracer() == null) {
      queryTracer = null;
    } else {
      queryTracer = new QueryTracer(ctx.tracer(), options);
    }

    Promise<MySQLConnection> connected = ctx.promise();
    // No client metrics are supplied on this path (null is passed through).
    ctx.emit(ignored -> connect(connectionFactory, ctx, queryTracer, null, connected));
    return connected.future();
  }

  /**
   * Asks the factory for a raw connection, wraps it in a {@link MySQLConnectionImpl}, initialises
   * the raw connection with that wrapper and completes {@code promise} with the outcome.
   */
  private static void connect(MySQLConnectionFactory client, ContextInternal ctx, QueryTracer tracer, ClientMetrics metrics, Promise<MySQLConnection> promise) {
    Future<MySQLConnection> wrapped = client.connect().map(rawConnection -> {
      MySQLConnectionImpl wrapper = new MySQLConnectionImpl(client, ctx, rawConnection, tracer, metrics);
      rawConnection.init(wrapper);
      return (MySQLConnection) wrapper;
    });
    wrapped.onComplete(promise);
  }

  /** Attaches {@code handler} to {@code future} unless the handler is {@code null}. */
  private static <T> void notifyWhenDone(Future<T> future, Handler<AsyncResult<T>> handler) {
    if (handler == null) {
      return;
    }
    future.onComplete(handler);
  }

  /** Factory passed in at construction time; it is stored but not read anywhere in this class. */
  private final MySQLConnectionFactory factory;

  /**
   * Creates the connection wrapper.
   *
   * @param factory the factory that produced {@code conn}
   * @param context the context forwarded to the superclass
   * @param conn    the underlying connection forwarded to the superclass
   * @param tracer  the query tracer forwarded to the superclass, may be {@code null}
   * @param metrics the client metrics forwarded to the superclass, may be {@code null}
   */
  public MySQLConnectionImpl(MySQLConnectionFactory factory, ContextInternal context, Connection conn, QueryTracer tracer, ClientMetrics metrics) {
    super(context, conn, tracer, metrics);
    this.factory = factory;
  }

  /** Callback variant of {@link #ping()}; a {@code null} handler is ignored. */
  @Override
  public MySQLConnection ping(Handler<AsyncResult<Void>> handler) {
    notifyWhenDone(ping(), handler);
    return this;
  }

  /** Schedules a {@code PingCommand} and returns the future of the promise handed to it. */
  @Override
  public Future<Void> ping() {
    Promise<Void> completion = promise();
    schedule(new PingCommand(), completion);
    return completion.future();
  }

  /** Callback variant of {@link #specifySchema(String)}; a {@code null} handler is ignored. */
  @Override
  public MySQLConnection specifySchema(String schemaName, Handler<AsyncResult<Void>> handler) {
    notifyWhenDone(specifySchema(schemaName), handler);
    return this;
  }

  /**
   * Schedules an {@code InitDbCommand} for the given schema name.
   *
   * @param schemaName the schema name passed to the command
   * @return the future of the promise handed to the scheduled command
   */
  @Override
  public Future<Void> specifySchema(String schemaName) {
    Promise<Void> completion = promise();
    schedule(new InitDbCommand(schemaName), completion);
    return completion.future();
  }

  /** Callback variant of {@link #getInternalStatistics()}; a {@code null} handler is ignored. */
  @Override
  public MySQLConnection getInternalStatistics(Handler<AsyncResult<String>> handler) {
    notifyWhenDone(getInternalStatistics(), handler);
    return this;
  }

  /** Schedules a {@code StatisticsCommand} and returns the future of the promise handed to it. */
  @Override
  public Future<String> getInternalStatistics() {
    Promise<String> completion = promise();
    schedule(new StatisticsCommand(), completion);
    return completion.future();
  }

  /** Callback variant of {@link #setOption(MySQLSetOption)}; a {@code null} handler is ignored. */
  @Override
  public MySQLConnection setOption(MySQLSetOption option, Handler<AsyncResult<Void>> handler) {
    notifyWhenDone(setOption(option), handler);
    return this;
  }

  /**
   * Schedules a {@code SetOptionCommand} carrying the given option.
   *
   * @param option the option passed to the command
   * @return the future of the promise handed to the scheduled command
   */
  @Override
  public Future<Void> setOption(MySQLSetOption option) {
    Promise<Void> completion = promise();
    schedule(new SetOptionCommand(option), completion);
    return completion.future();
  }

  /** Callback variant of {@link #resetConnection()}; a {@code null} handler is ignored. */
  @Override
  public MySQLConnection resetConnection(Handler<AsyncResult<Void>> handler) {
    notifyWhenDone(resetConnection(), handler);
    return this;
  }

  /** Schedules a {@code ResetConnectionCommand} and returns the future of the promise handed to it. */
  @Override
  public Future<Void> resetConnection() {
    Promise<Void> completion = promise();
    schedule(new ResetConnectionCommand(), completion);
    return completion.future();
  }

  /** Callback variant of {@link #debug()}; a {@code null} handler is ignored. */
  @Override
  public MySQLConnection debug(Handler<AsyncResult<Void>> handler) {
    notifyWhenDone(debug(), handler);
    return this;
  }

  /** Schedules a {@code DebugCommand} and returns the future of the promise handed to it. */
  @Override
  public Future<Void> debug() {
    Promise<Void> completion = promise();
    schedule(new DebugCommand(), completion);
    return completion.future();
  }

  /** Callback variant of {@link #changeUser(MySQLAuthOptions)}; a {@code null} handler is ignored. */
  @Override
  public MySQLConnection changeUser(MySQLAuthOptions options, Handler<AsyncResult<Void>> handler) {
    notifyWhenDone(changeUser(options), handler);
    return this;
  }

  /**
   * Schedules a {@code ChangeUserCommand} built from the given authentication options.
   *
   * <p>The collation is resolved first, then the server RSA public key; only afterwards is the
   * command created and scheduled.
   *
   * <p>NOTE(review): collation resolution and the key-file read run synchronously before the
   * command is scheduled and are not guarded, so an exception raised there propagates to the
   * caller instead of failing the returned future — confirm this is intended.
   *
   * @param options the authentication options (user, password, database, collation/charset,
   *                server RSA public key, properties)
   * @return the future of the promise handed to the scheduled command
   */
  @Override
  public Future<Void> changeUser(MySQLAuthOptions options) {
    MySQLCollation collation = resolveCollation(options);
    Buffer serverRsaPublicKey = loadServerRsaPublicKey(options);
    ChangeUserCommand changeUser = new ChangeUserCommand(
      options.getUser(),
      options.getPassword(),
      options.getDatabase(),
      collation,
      serverRsaPublicKey,
      options.getProperties());
    Promise<Void> completion = promise();
    schedule(changeUser, completion);
    return completion.future();
  }

  /**
   * Picks the collation for a change-user request: an explicitly configured collation wins, then
   * the default collation of the configured charset, then {@code MySQLCollation.DEFAULT_COLLATION}.
   */
  private static MySQLCollation resolveCollation(MySQLAuthOptions options) {
    String collationName = options.getCollation();
    if (collationName != null) {
      return MySQLCollation.valueOfName(collationName);
    }
    String charsetName = options.getCharset();
    if (charsetName == null) {
      return MySQLCollation.DEFAULT_COLLATION;
    }
    return MySQLCollation.valueOfName(MySQLCollation.getDefaultCollationFromCharsetName(charsetName));
  }

  /**
   * Returns the server RSA public key for a change-user request: the in-memory value when set,
   * otherwise the content of the configured path read via {@code readFileBlocking}, otherwise
   * {@code null}.
   */
  private Buffer loadServerRsaPublicKey(MySQLAuthOptions options) {
    Buffer inlineKey = options.getServerRsaPublicKeyValue();
    if (inlineKey != null) {
      return inlineKey;
    }
    String keyPath = options.getServerRsaPublicKeyPath();
    if (keyPath == null) {
      return null;
    }
    // Synchronous file read: this blocks the calling thread until the key has been loaded.
    return context.owner().fileSystem().readFileBlocking(keyPath);
  }
}