package io.vertx.jdbcclient.impl.actions;
import io.vertx.ext.jdbc.impl.actions.AbstractJDBCAction;
import io.vertx.ext.jdbc.impl.actions.JDBCStatementHelper;
import io.vertx.ext.sql.SQLOptions;
import io.vertx.jdbcclient.impl.JDBCRow;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.Tuple;
import io.vertx.sqlclient.impl.RowDesc;
import java.math.BigDecimal;
import java.sql.*;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
public abstract class JDBCQueryAction<C, R> extends AbstractJDBCAction<JDBCResponse<R>> {
private final Collector<Row, C, R> collector;
public JDBCQueryAction(JDBCStatementHelper helper, SQLOptions options, Collector<Row, C, R> collector) {
super(helper, options);
this.collector = collector;
}
protected JDBCResponse<R> decode(Statement statement, boolean returnedResultSet, boolean returnedKeys, List<Integer> out) throws SQLException {
final JDBCResponse<R> response = new JDBCResponse<>(statement.getUpdateCount());
if (returnedResultSet) {
while (returnedResultSet) {
try (ResultSet rs = statement.getResultSet()) {
decodeResultSet(rs, response);
}
returnedResultSet = statement.getMoreResults();
}
} else {
collector.accumulator();
C container = collector.supplier().get();
response.empty(collector.finisher().apply(container));
}
if (returnedKeys) {
decodeReturnedKeys(statement, response);
}
if (out.size() > 0) {
decodeOutput((CallableStatement) statement, out, response);
}
return response;
}
protected JDBCResponse<R> decode(Statement statement, int[] returnedBatchResult, boolean returnedKeys) throws SQLException {
final JDBCResponse<R> response = new JDBCResponse<>(returnedBatchResult.length);
BiConsumer<C, Row> accumulator = collector.accumulator();
RowDesc desc = new RowDesc(Collections.emptyList());
C container = collector.supplier().get();
for (int result : returnedBatchResult) {
Row row = new JDBCRow(desc);
row.addValue(result);
accumulator.accept(container, row);
}
response
.push(collector.finisher().apply(container), desc, returnedBatchResult.length);
if (returnedKeys) {
decodeReturnedKeys(statement, response);
}
return response;
}
private void decodeResultSet(ResultSet rs, JDBCResponse<R> response) throws SQLException {
BiConsumer<C, Row> accumulator = collector.accumulator();
List<String> columnNames = new ArrayList<>();
RowDesc desc = new RowDesc(columnNames);
C container = collector.supplier().get();
int size = 0;
ResultSetMetaData metaData = rs.getMetaData();
int cols = metaData.getColumnCount();
for (int i = 1; i <= cols; i++) {
columnNames.add(metaData.getColumnLabel(i));
}
while (rs.next()) {
size++;
Row row = new JDBCRow(desc);
for (int i = 1; i <= cols; i++) {
Object res = convertSqlValue(rs.getObject(i));
row.addValue(res);
}
accumulator.accept(container, row);
}
response
.push(collector.finisher().apply(container), desc, size);
}
private R decodeRawResultSet(ResultSet rs) throws SQLException {
BiConsumer<C, Row> accumulator = collector.accumulator();
List<String> columnNames = new ArrayList<>();
RowDesc desc = new RowDesc(columnNames);
C container = collector.supplier().get();
ResultSetMetaData metaData = rs.getMetaData();
int cols = metaData.getColumnCount();
for (int i = 1; i <= cols; i++) {
columnNames.add(metaData.getColumnLabel(i));
}
while (rs.next()) {
Row row = new JDBCRow(desc);
for (int i = 1; i <= cols; i++) {
Object res = convertSqlValue(rs.getObject(i));
row.addValue(res);
}
accumulator.accept(container, row);
}
return collector.finisher().apply(container);
}
private void decodeOutput(CallableStatement cs, List<Integer> out, JDBCResponse<R> output) throws SQLException {
BiConsumer<C, Row> accumulator = collector.accumulator();
C container = collector.supplier().get();
Row row = new JDBCRow(new RowDesc(Collections.emptyList()));
for (Integer idx : out) {
if (cs.getObject(idx) instanceof ResultSet) {
row.addValue(decodeRawResultSet((ResultSet) cs.getObject(idx)));
} else {
Object res = convertSqlValue(cs.getObject(idx));
row.addValue(res);
}
}
accumulator.accept(container, row);
R result = collector.finisher().apply(container);
output.outputs(result, null, 1);
}
private void decodeReturnedKeys(Statement statement, JDBCResponse<R> response) throws SQLException {
Row keys = null;
ResultSet keysRS = statement.getGeneratedKeys();
if (keysRS != null) {
List<String> keysColumnNames = new ArrayList<>();
RowDesc keysDesc = new RowDesc(keysColumnNames);
ResultSetMetaData metaData = keysRS.getMetaData();
int cols = metaData.getColumnCount();
for (int i = 1; i <= cols; i++) {
keysColumnNames.add(metaData.getColumnLabel(i));
}
if (keysRS.next()) {
keys = new JDBCRow(keysDesc);
for (int i = 1; i <= cols; i++) {
Object res = convertSqlValue(keysRS.getObject(i));
keys.addValue(res);
}
}
}
response.returnedKeys(keys);
}
public static Object convertSqlValue(Object value) throws SQLException {
if (value == null) {
return null;
}
if (value instanceof Boolean || value instanceof String || value instanceof byte[]) {
return value;
}
if (value instanceof Number) {
if (value instanceof BigDecimal) {
BigDecimal d = (BigDecimal) value;
if (d.scale() == 0) {
return ((BigDecimal) value).toBigInteger();
} else {
return ((BigDecimal) value).doubleValue();
}
}
return value;
}
if (value instanceof Time) {
return ((Time) value).toLocalTime();
}
if (value instanceof Date) {
return ((Date) value).toLocalDate();
}
if (value instanceof Timestamp) {
return ((Timestamp) value).toInstant().atOffset(ZoneOffset.UTC);
}
if (value instanceof Clob) {
Clob c = (Clob) value;
try {
return c.getSubString(1, (int) c.length());
} finally {
try {
c.free();
} catch (AbstractMethodError | SQLFeatureNotSupportedException e) {
}
}
}
if (value instanceof Blob) {
Blob b = (Blob) value;
try {
return b.getBytes(1, (int) b.length());
} finally {
try {
b.free();
} catch (AbstractMethodError | SQLFeatureNotSupportedException e) {
}
}
}
if (value instanceof Array) {
Array a = (Array) value;
try {
Object[] arr = (Object[]) a.getArray();
if (arr != null) {
Object[] castedArray = new Object[arr.length];
for (int i = 0; i < arr.length; i++) {
castedArray[i] = convertSqlValue(arr[i]);
}
return castedArray;
}
} finally {
a.free();
}
}
if (value instanceof RowId) {
return ((RowId) value).getBytes();
}
if (value instanceof Struct) {
return Tuple.of(((Struct) value).getAttributes());
}
return value.toString();
}
}