package io.vertx.sqlclient.impl;
import io.vertx.core.Promise;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.SqlResult;
import io.vertx.sqlclient.Tuple;
import io.vertx.sqlclient.impl.command.CommandScheduler;
import io.vertx.sqlclient.impl.command.ExtendedQueryCommand;
import io.vertx.sqlclient.impl.command.SimpleQueryCommand;
import io.vertx.sqlclient.impl.tracing.QueryTracer;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collector;
class QueryExecutor<T, R extends SqlResultBase<T>, L extends SqlResult<T>> {
private final io.vertx.core.spi.metrics.ClientMetrics metrics;
private final QueryTracer tracer;
private final Function<T, R> factory;
private final Collector<Row, ?, T> collector;
public QueryExecutor(QueryTracer tracer,
ClientMetrics metrics,
Function<T, R> factory,
Collector<Row, ?, T> collector) {
this.tracer = tracer;
this.metrics = metrics;
this.factory = factory;
this.collector = collector;
}
QueryTracer tracer() {
return tracer;
}
ClientMetrics metrics() {
return metrics;
}
private QueryResultBuilder<T, R, L> createHandler(PromiseInternal<L> promise, Object payload) {
return createHandler(promise, payload, null);
}
private QueryResultBuilder<T, R, L> createHandler(PromiseInternal<L> promise, Object payload, Object metric) {
return new QueryResultBuilder<>(factory, tracer, payload, metrics, metric, promise);
}
void executeSimpleQuery(CommandScheduler scheduler,
String sql,
boolean autoCommit,
boolean singleton,
PromiseInternal<L> promise) {
ContextInternal context = promise.context();
Object payload;
if (tracer != null) {
payload = tracer.sendRequest(context, sql);
} else {
payload = null;
}
Object metric;
if (metrics != null) {
metric = metrics.requestBegin(sql, sql);
metrics.requestEnd(metric);
} else {
metric = null;
}
QueryResultBuilder handler = createHandler(promise, payload, metric);
scheduler.schedule(new SimpleQueryCommand<>(sql, singleton, autoCommit, collector, handler), handler);
}
QueryResultBuilder<T, R, L> executeExtendedQuery(CommandScheduler scheduler,
PreparedStatement preparedStatement,
boolean autoCommit,
Tuple arguments,
int fetch,
String cursorId,
boolean suspended,
PromiseInternal<L> promise) {
ContextInternal context = promise.context();
Object payload;
if (tracer != null) {
payload = tracer.sendRequest(context, preparedStatement.sql(), arguments);
} else {
payload = null;
}
Object metric;
if (metrics != null) {
metric = metrics.requestBegin(preparedStatement.sql(), preparedStatement.sql());
metrics.requestEnd(metric);
} else {
metric = null;
}
QueryResultBuilder handler = createHandler(promise, payload, metric);
String msg = preparedStatement.prepare((TupleInternal) arguments);
if (msg != null) {
handler.fail(msg);
return null;
}
ExtendedQueryCommand<T> cmd = ExtendedQueryCommand.createQuery(
preparedStatement.sql(),
preparedStatement,
arguments,
fetch,
cursorId,
suspended,
autoCommit,
collector,
handler);
scheduler.schedule(cmd, handler);
return handler;
}
void executeExtendedQuery(CommandScheduler scheduler, String sql, boolean autoCommit, Tuple arguments, PromiseInternal<L> promise) {
ContextInternal context = (ContextInternal) promise.context();
Object payload;
if (tracer != null) {
payload = tracer.sendRequest(context, sql, arguments);
} else {
payload = null;
}
Object metric;
if (metrics != null) {
metric = metrics.requestBegin(sql, sql);
metrics.requestEnd(metric);
} else {
metric = null;
}
QueryResultBuilder handler = this.createHandler(promise, payload, metric);
ExtendedQueryCommand cmd = createExtendedQueryCommand(sql, autoCommit, arguments, handler);
scheduler.schedule(cmd, handler);
}
private ExtendedQueryCommand<T> createExtendedQueryCommand(String sql,
boolean autoCommit,
Tuple tuple,
QueryResultBuilder<T, R, L> handler) {
return ExtendedQueryCommand.createQuery(
sql,
null,
tuple,
autoCommit,
collector,
handler);
}
void executeBatchQuery(CommandScheduler scheduler,
PreparedStatement preparedStatement,
boolean autoCommit,
List<Tuple> batch,
PromiseInternal<L> promise) {
ContextInternal context = (ContextInternal) promise.context();
Object payload;
if (tracer != null) {
payload = tracer.sendRequest(context, preparedStatement.sql(), batch);
} else {
payload = null;
}
Object metric;
if (metrics != null) {
metric = metrics.requestBegin(preparedStatement.sql(), preparedStatement.sql());
metrics.requestEnd(metric);
} else {
metric = null;
}
QueryResultBuilder handler = createHandler(promise, payload, metric);
for (Tuple args : batch) {
String msg = preparedStatement.prepare((TupleInternal)args);
if (msg != null) {
handler.fail(msg);
return;
}
}
ExtendedQueryCommand<T> cmd = ExtendedQueryCommand.createBatch(preparedStatement.sql(), preparedStatement, batch, autoCommit, collector, handler);
scheduler.schedule(cmd, handler);
}
void executeBatchQuery(CommandScheduler scheduler, String sql, boolean autoCommit, List<Tuple> batch, PromiseInternal<L> promise) {
ContextInternal context = (ContextInternal) promise.context();
Object payload;
if (tracer != null) {
payload = tracer.sendRequest(context, sql, batch);
} else {
payload = null;
}
Object metric;
if (metrics != null) {
metric = metrics.requestBegin(sql, sql);
metrics.requestEnd(metric);
} else {
metric = null;
}
QueryResultBuilder handler = createHandler(promise, payload, metric);
ExtendedQueryCommand<T> cmd = createBatchQueryCommand(sql, autoCommit, batch, handler);
scheduler.schedule(cmd, handler);
}
private ExtendedQueryCommand<T> createBatchQueryCommand(String sql,
boolean autoCommit,
List<Tuple> argsList,
QueryResultBuilder<T, R, L> handler) {
return ExtendedQueryCommand.createBatch(sql, null, argsList, autoCommit, collector, handler);
}
}