package io.vertx.mysqlclient.impl.codec;
import io.netty.buffer.ByteBuf;
import io.vertx.core.Future;
import io.vertx.mysqlclient.impl.datatype.DataFormat;
import io.vertx.mysqlclient.impl.protocol.CommandType;
import io.vertx.sqlclient.impl.command.SimpleQueryCommand;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import static io.vertx.mysqlclient.impl.protocol.Packets.*;
/**
 * Codec for a {@link SimpleQueryCommand}: encodes the SQL text as a {@code COM_QUERY} packet and
 * dispatches the first response packet to the matching handler (OK, error, local-infile request
 * or result set column count).
 *
 * @param <T> the result type produced by the command
 */
class SimpleQueryCommandCodec<T> extends QueryCommandBaseCodec<T, SimpleQueryCommand<T>> {

  /** First payload byte that routes the initial response packet to {@link #handleLocalInfile}. */
  private static final int LOCAL_INFILE_REQUEST_PACKET_HEADER = 0xFB;

  /**
   * Creates a codec for the given command; results are decoded using {@link DataFormat#TEXT}.
   *
   * @param cmd the simple query command to encode
   */
  SimpleQueryCommandCodec(SimpleQueryCommand<T> cmd) {
    super(cmd, DataFormat.TEXT);
  }

  /**
   * Runs the base-class encoding step and then writes the {@code COM_QUERY} packet.
   *
   * @param encoder the encoder this codec is attached to
   */
  @Override
  void encode(MySQLEncoder encoder) {
    super.encode(encoder);
    sendQueryCommand();
  }

  /**
   * Dispatches the first response packet according to its first payload byte.
   *
   * @param payload the packet payload; the first byte is only peeked here, not consumed
   */
  @Override
  protected void handleInitPacket(ByteBuf payload) {
    // Peek at the header byte without advancing the reader index; each handler consumes it itself.
    int firstByte = payload.getUnsignedByte(payload.readerIndex());
    if (firstByte == OK_PACKET_HEADER) {
      OkPacket okPacket = decodeOkPacketPayload(payload);
      handleSingleResultsetDecodingCompleted(okPacket.serverStatusFlags(), okPacket.affectedRows(), okPacket.lastInsertId());
    } else if (firstByte == ERROR_PACKET_HEADER) {
      handleErrorPacketPayload(payload);
    } else if (firstByte == LOCAL_INFILE_REQUEST_PACKET_HEADER) {
      handleLocalInfile(payload);
    } else {
      handleResultsetColumnCountPacketBody(payload);
    }
  }

  /**
   * Builds and sends the {@code COM_QUERY} packet carrying the command's SQL text.
   */
  private void sendQueryCommand() {
    ByteBuf packet = allocateBuffer();
    int packetStartIdx = packet.writerIndex();
    // Reserve the 3-byte payload length; it is patched below once the payload size is known.
    packet.writeMediumLE(0);
    packet.writeByte(sequenceId);
    packet.writeByte(CommandType.COM_QUERY);
    packet.writeCharSequence(cmd.sql(), encoder.encodingCharset);

    // Payload length excludes the 4 header bytes (3-byte length + 1-byte sequence id).
    int payloadLength = packet.writerIndex() - packetStartIdx - 4;
    packet.setMediumLE(packetStartIdx, payloadLength);

    sendPacket(packet, payloadLength);
  }

  /**
   * Handles a local-infile request: reads the requested file name from the payload, sends the file
   * content in packets of at most {@code PACKET_PAYLOAD_LENGTH_LIMIT} bytes each, and finally sends
   * an empty packet.
   *
   * <p>Offsets and the remaining length are tracked as {@code long} so that files of 2 GiB or more
   * are not truncated or turned into negative lengths by an {@code int} narrowing cast.
   *
   * @param payload the request payload: one header byte followed by the file name
   */
  private void handleLocalInfile(ByteBuf payload) {
    payload.skipBytes(1); // skip the header byte that was only peeked by handleInitPacket
    String filename = readRestOfPacketString(payload, StandardCharsets.UTF_8);
    // NOTE(review): the file name comes from the peer and is opened as-is; confirm that callers
    // only enable local infile for trusted servers or validate the path elsewhere.

    // File.length() yields 0 for a missing file, in which case only the empty packet is sent.
    long remaining = new File(filename).length();
    long offset = 0L;

    // Chain the sends so that each chunk is only started after the previous one has completed.
    Future<Void> cont = Future.succeededFuture();
    while (remaining >= PACKET_PAYLOAD_LENGTH_LIMIT) {
      final long chunkOffset = offset;
      cont = cont.flatMap(v -> sendFileInPacket(filename, chunkOffset, PACKET_PAYLOAD_LENGTH_LIMIT));
      remaining -= PACKET_PAYLOAD_LENGTH_LIMIT;
      offset += PACKET_PAYLOAD_LENGTH_LIMIT;
    }

    if (remaining > 0) {
      final long tailOffset = offset;
      final int tailLength = (int) remaining; // safe: strictly below PACKET_PAYLOAD_LENGTH_LIMIT here
      cont = cont.flatMap(v -> sendFileInPacket(filename, tailOffset, tailLength));
    }
    // NOTE(review): when the file size is an exact non-zero multiple of PACKET_PAYLOAD_LENGTH_LIMIT
    // the last chunk is a full-size packet; verify against the MySQL packet-splitting rules whether
    // an additional empty packet is required before the terminating one.

    // The empty packet is sent whether the chain succeeded or failed.
    cont.onComplete(v -> sendEmptyPacket());
  }

  /**
   * Writes a 4-byte packet header for {@code length} payload bytes and then sends that slice of
   * the file through the socket's {@code sendFile}.
   *
   * @param filename path of the file to send
   * @param offset position in the file where this packet's payload starts
   * @param length number of bytes to send in this packet
   * @return a future completed when the file slice has been sent
   */
  private Future<Void> sendFileInPacket(String filename, long offset, int length) {
    ByteBuf packetHeader = allocateBuffer(4);
    packetHeader.writeMediumLE(length);
    packetHeader.writeByte(sequenceId++);
    encoder.chctx.write(packetHeader);
    return encoder.socketConnection.socket().sendFile(filename, offset, length);
  }

  /**
   * Sends a packet consisting only of a header with a zero payload length.
   */
  private void sendEmptyPacket() {
    ByteBuf packet = allocateBuffer(4);
    packet.writeMediumLE(0);
    packet.writeByte(sequenceId);
    sendNonSplitPacket(packet);
  }
}