package io.vertx.mysqlclient.impl.codec;
import io.netty.buffer.ByteBuf;
import io.vertx.mysqlclient.MySQLClient;
import io.vertx.mysqlclient.impl.MySQLRowDesc;
import io.vertx.mysqlclient.impl.datatype.DataFormat;
import io.vertx.mysqlclient.impl.protocol.ColumnDefinition;
import io.vertx.mysqlclient.impl.util.BufferUtils;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.impl.RowDesc;
import io.vertx.sqlclient.impl.command.CommandResponse;
import io.vertx.sqlclient.impl.command.QueryCommandBase;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collector;
import static io.vertx.mysqlclient.impl.protocol.Packets.*;
/**
 * Shared decoding logic for query commands.
 *
 * <p>Incoming payloads are dispatched on {@link #commandHandlerState}: the initial packet is
 * delegated to the subclass, then column definitions are collected, then row payloads are fed
 * to a {@link RowResultDecoder} until an end packet (or an error packet) is seen. After each
 * result set the per-result-set state is reset to {@link CommandHandlerState#INIT}.
 *
 * @param <T> the type produced by the command's collector
 * @param <C> the concrete query command type
 */
abstract class QueryCommandBaseCodec<T, C extends QueryCommandBase<T>> extends CommandCodec<Boolean, C> {

  /** Data format handed to every {@link MySQLRowDesc} built by this codec. */
  private final DataFormat format;

  /** Current position in the decoding state machine. */
  protected CommandHandlerState commandHandlerState = CommandHandlerState.INIT;

  /** Column definitions of the result set currently being decoded; {@code null} between result sets. */
  protected ColumnDefinition[] columnDefinitions;

  /** Row decoder, created once all column definitions of a result set have been received. */
  protected RowResultDecoder<?, T> decoder;

  /** Index in {@link #columnDefinitions} where the next decoded definition will be stored. */
  private int nextColumnIndex;

  /**
   * @param cmd    the command this codec encodes and decodes
   * @param format the data format passed to the row descriptor
   */
  QueryCommandBaseCodec(C cmd, DataFormat format) {
    super(cmd);
    this.format = format;
  }

  /**
   * Produces the collector's result for zero rows by finishing a freshly supplied container.
   */
  private static <A, R> R emptyResult(Collector<Row, A, R> collector) {
    java.util.function.Function<A, R> finisher = collector.finisher();
    A emptyContainer = collector.supplier().get();
    return finisher.apply(emptyContainer);
  }

  /**
   * Routes a packet payload to the handler that matches the current state.
   *
   * @param payload       the packet payload
   * @param payloadLength the length of the payload
   */
  @Override
  void decodePayload(ByteBuf payload, int payloadLength) {
    switch (commandHandlerState) {
      case INIT:
        handleInitPacket(payload);
        break;

      case HANDLING_COLUMN_DEFINITION:
        handleResultsetColumnDefinitions(payload);
        break;

      case COLUMN_DEFINITIONS_DECODING_COMPLETED:
        // This state is only entered when the deprecate-EOF flag is not enabled
        // (see handleResultsetColumnDefinitions).
        skipEofPacketIfNeeded(payload);
        handleResultsetColumnDefinitionsDecodingCompleted();
        break;

      case HANDLING_ROW_DATA_OR_END_PACKET:
        handleRows(payload, payloadLength);
        break;
    }
  }

  /**
   * Handles the first packet of a response while in {@link CommandHandlerState#INIT}.
   *
   * @param payload the packet payload
   */
  protected abstract void handleInitPacket(ByteBuf payload);

  /**
   * Reads the column count from the payload, switches to column-definition handling and
   * allocates the array that will receive the definitions.
   *
   * @param payload the payload holding a length-encoded column count
   */
  protected void handleResultsetColumnCountPacketBody(ByteBuf payload) {
    // The length-encoded value is narrowed to int, exactly as the array size requires.
    int expectedColumns = (int) BufferUtils.readLengthEncodedInteger(payload);
    commandHandlerState = CommandHandlerState.HANDLING_COLUMN_DEFINITION;
    columnDefinitions = new ColumnDefinition[expectedColumns];
  }

  /**
   * Decodes one column definition and stores it. Once the array is full, either completes
   * column decoding immediately (deprecate-EOF flag enabled) or moves to
   * {@link CommandHandlerState#COLUMN_DEFINITIONS_DECODING_COMPLETED} to handle one more packet.
   *
   * @param payload the payload holding a column definition
   */
  protected void handleResultsetColumnDefinitions(ByteBuf payload) {
    ColumnDefinition definition = decodeColumnDefinitionPacketPayload(payload);
    int slot = nextColumnIndex++;
    columnDefinitions[slot] = definition;

    if (nextColumnIndex != columnDefinitions.length) {
      // More column definitions are still expected.
      return;
    }

    if (!isDeprecatingEofFlagEnabled()) {
      commandHandlerState = CommandHandlerState.COLUMN_DEFINITIONS_DECODING_COMPLETED;
    } else {
      handleResultsetColumnDefinitionsDecodingCompleted();
    }
  }

  /**
   * Switches to row handling and creates the row decoder from the command's collector and a
   * row descriptor built from the collected column definitions.
   */
  protected void handleResultsetColumnDefinitionsDecodingCompleted() {
    commandHandlerState = CommandHandlerState.HANDLING_ROW_DATA_OR_END_PACKET;
    decoder = new RowResultDecoder<>(
      cmd.collector(),
      new MySQLRowDesc(columnDefinitions, format));
  }

  /**
   * Handles a payload received while rows are expected: an error packet, the packet ending
   * the result set, or a row.
   *
   * @param payload       the packet payload
   * @param payloadLength the length of the payload
   */
  protected void handleRows(ByteBuf payload, int payloadLength) {
    int header = payload.getUnsignedByte(payload.readerIndex());

    if (header == ERROR_PACKET_HEADER) {
      handleErrorPacketPayload(payload);
      return;
    }

    // A payload starting with the EOF header only ends the result set when it is shorter than
    // 0xFFFFFF; otherwise it is decoded as a row.
    // NOTE(review): presumably because row data may itself start with that byte - confirm
    // against the protocol documentation.
    boolean endsResultset = header == EOF_PACKET_HEADER && payloadLength < 0xFFFFFF;
    if (!endsResultset) {
      decoder.handleRow(columnDefinitions.length, payload);
      return;
    }

    final int serverStatusFlags;
    final long affectedRows;
    final long lastInsertId;
    if (isDeprecatingEofFlagEnabled()) {
      // With the deprecate-EOF flag the end packet is decoded as an OK packet.
      OkPacket endPacket = decodeOkPacketPayload(payload);
      serverStatusFlags = endPacket.serverStatusFlags();
      affectedRows = endPacket.affectedRows();
      lastInsertId = endPacket.lastInsertId();
    } else {
      // Only the status flags are taken from an EOF packet; the other two values are
      // reported as -1.
      serverStatusFlags = decodeEofPacketPayload(payload).serverStatusFlags();
      affectedRows = -1;
      lastInsertId = -1;
    }
    handleSingleResultsetDecodingCompleted(serverStatusFlags, affectedRows, lastInsertId);
  }

  /**
   * Publishes the finished result set, resets the per-result-set state and, when the status
   * flags indicate that nothing more follows, completes the command.
   *
   * @param serverStatusFlags the status flags from the end packet
   * @param affectedRows      the affected row count to report
   * @param lastInsertId      the last insert id to report
   */
  protected void handleSingleResultsetDecodingCompleted(int serverStatusFlags, long affectedRows, long lastInsertId) {
    publishResultset(serverStatusFlags, affectedRows, lastInsertId);

    // Get ready for a possible next result set.
    commandHandlerState = CommandHandlerState.INIT;
    columnDefinitions = null;
    nextColumnIndex = 0;

    if (isDecodingCompleted(serverStatusFlags)) {
      handleAllResultsetDecodingCompleted();
    }
  }

  /**
   * @param serverStatusFlags the status flags from the end packet
   * @return {@code true} when the {@code SERVER_MORE_RESULTS_EXISTS} bit is not set
   */
  protected boolean isDecodingCompleted(int serverStatusFlags) {
    int moreResultsBit = serverStatusFlags & ServerStatusFlags.SERVER_MORE_RESULTS_EXISTS;
    return moreResultsBit == 0;
  }

  /**
   * Records the command result flag and passes the decoded result set (or an empty result when
   * no decoder exists) to the command's result handler, together with the last inserted id.
   */
  private void publishResultset(int serverStatusFlags, long affectedRows, long lastInsertId) {
    // The command result is true when the SERVER_STATUS_LAST_ROW_SENT bit is not set.
    int lastRowSentBit = serverStatusFlags & ServerStatusFlags.SERVER_STATUS_LAST_ROW_SENT;
    this.result = lastRowSentBit == 0;

    final T collected;
    final Throwable error;
    final int rowCount;
    final RowDesc description;
    if (decoder == null) {
      // No decoder was created: report an empty result.
      collected = emptyResult(cmd.collector());
      error = null;
      rowCount = 0;
      description = null;
    } else {
      error = decoder.complete();
      collected = decoder.result();
      description = decoder.rowDesc;
      rowCount = decoder.size();
      decoder.reset();
    }

    cmd.resultHandler().handleResult((int) affectedRows, rowCount, description, collected, error);
    cmd.resultHandler().addProperty(MySQLClient.LAST_INSERTED_ID, lastInsertId);
  }

  /**
   * Notifies the completion handler with a failure response when a failure has been recorded,
   * or with a success response carrying the command result otherwise.
   */
  protected void handleAllResultsetDecodingCompleted() {
    final CommandResponse<Boolean> response;
    if (this.failure == null) {
      response = CommandResponse.success(this.result);
    } else {
      response = CommandResponse.failure(this.failure);
    }
    completionHandler.handle(response);
  }

  /** States of the response decoding state machine. */
  protected enum CommandHandlerState {
    /** Waiting for the first packet of a response; handled by {@code handleInitPacket}. */
    INIT,
    /** Receiving column definition packets. */
    HANDLING_COLUMN_DEFINITION,
    /** All column definitions received; one more packet is handled before rows. */
    COLUMN_DEFINITIONS_DECODING_COMPLETED,
    /** Receiving row packets, or the packet that ends the result set. */
    HANDLING_ROW_DATA_OR_END_PACKET
  }
}