package io.vertx.redis.client.impl;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.redis.client.Response;
import io.vertx.redis.client.ResponseType;
import io.vertx.redis.client.impl.types.*;
/**
 * Incremental parser that turns incoming {@link Buffer} chunks into {@link Response} objects and
 * hands them to a {@link ParserHandler}.
 *
 * <p>Chunks are appended to an internal {@link ReadableBuffer}, so a message may be split across
 * any number of {@link #handle(Buffer)} calls. The parser alternates between two states:
 * <ul>
 *   <li>header state ({@code eol == true}): a type byte followed by a header line is expected;</li>
 *   <li>payload state ({@code eol == false}): {@code bytesNeeded} bytes of bulk payload are expected,
 *       followed by two more bytes that are consumed with {@code skipEOL()}.</li>
 * </ul>
 *
 * <p>Containers (multi, push, attribute) that still wait for elements are kept on an
 * {@link ArrayStack}; a container is only delivered to the handler once it and every container
 * nested in it reports {@code complete()}.
 */
public final class RESPParser implements Handler<Buffer> {

  // NOTE(review): presumably the RESP protocol version targeted by this parser; not read in this class.
  public static final String VERSION = "3";

  // 512MB (512 * 1024 * 1024 bytes): largest bulk payload length accepted by handleBulk
  private static final long MAX_STRING_LENGTH = 536870912;

  // receives every fully decoded top-level response and every fatal parsing error
  private final ParserHandler handler;
  // accumulates the received chunks; supports mark()/reset() to roll back a partial read
  private final ReadableBuffer buffer = new ReadableBuffer();
  // containers that are still waiting for elements, innermost on top
  private final ArrayStack stack;

  // parser state: true while the next thing to read is a type byte + header line
  private boolean eol = true;
  // parser state: payload size of the bulk currently being read (only meaningful when eol is false)
  private int bytesNeeded = 0;

  /**
   * @param handler  callback for decoded responses and fatal errors
   * @param maxStack value passed to the {@link ArrayStack} constructor
   */
  RESPParser(ParserHandler handler, int maxStack) {
    this.handler = handler;
    this.stack = new ArrayStack(maxStack);
  }

  /**
   * Appends {@code chunk} to the internal buffer and decodes as many messages as the buffered
   * data allows. Incomplete trailing data is left in the buffer for the next call.
   *
   * @param chunk the bytes that were just received
   */
  @Override
  public void handle(Buffer chunk) {
    buffer.append(chunk);

    // Header state needs at least 3 readable bytes; payload state needs the payload plus the
    // 2 trailing bytes consumed by skipEOL().
    while (buffer.readableBytes() >= (eol ? 3 : bytesNeeded != -1 ? bytesNeeded + 2 : 0)) {
      // rollback point used when the header line is not fully buffered yet
      buffer.mark();

      if (eol) {
        final byte type = buffer.readByte();
        final int start = buffer.offset();
        final int lineEnd = buffer.findLineEnd();

        // no line end buffered yet, or the line end coincides with the current offset:
        // rewind to before the type byte and wait for more data
        if (lineEnd == -1 || start == lineEnd) {
          buffer.reset();
          break;
        }

        switch (type) {
          case '+':
            handleSimpleString(start, lineEnd);
            break;
          case '-':
          case '!':
            handleError(lineEnd);
            break;
          case ':':
          case ',':
          case '(':
            handleNumber(type, lineEnd);
            break;
          case '$':
          case '=':
            handleBulk(lineEnd);
            break;
          case '*':
          case '%':
          case '~':
            handleMulti(type, lineEnd);
            break;
          case '_':
            handleNull();
            break;
          case '#':
            handleBoolean();
            break;
          case '|':
            handleAttribute(lineEnd);
            break;
          case '>':
            handlePush(lineEnd);
            break;
          default:
            handler.fatal(ErrorType.create("ILLEGAL_STATE Unknown RESP type " + (char) type));
            return;
        }
      } else {
        if (bytesNeeded == 0) {
          // empty payload: reuse the shared instance instead of reading zero bytes
          handleResponse(BulkType.EMPTY, false);
        } else {
          handleResponse(BulkType.create(buffer.readBytes(bytesNeeded)), false);
        }

        if (buffer.skipEOL()) {
          // payload done, the next bytes start a new message
          eol = true;
        } else {
          // The loop guard already ensured payload + 2 bytes are buffered, and nothing changes
          // between iterations, so rewinding and looping again would replay (and re-deliver)
          // the same payload forever. Report the broken framing and stop instead.
          handler.fatal(ErrorType.create("ILLEGAL_STATE Redis Bulk is missing its line terminator"));
          return;
        }
      }
    }
  }

  /**
   * Reads the numeric header line and delivers it as a {@link NumberType}.
   * Any {@link RuntimeException} raised while reading or delivering is reported as fatal.
   *
   * @param type one of {@code ':'}, {@code ','} or {@code '('}; selects the numeric representation
   * @param eol  line end position returned by {@code findLineEnd()}
   */
  private void handleNumber(byte type, int eol) {
    try {
      switch (type) {
        case ':':
          handleResponse(NumberType.create(buffer.readNumber(eol, ReadableBuffer.NumericType.INTEGER)), false);
          break;
        case ',':
          handleResponse(NumberType.create(buffer.readNumber(eol, ReadableBuffer.NumericType.DECIMAL)), false);
          break;
        case '(':
          handleResponse(NumberType.create(buffer.readNumber(eol, ReadableBuffer.NumericType.BIGINTEGER)), false);
          break;
      }
    } catch (RuntimeException e) {
      handler.fatal(e);
    }
  }

  /**
   * Reads the length prefix shared by bulk, multi, push and attribute headers.
   *
   * <p>A length of exactly {@code -1} is delivered as a {@code null} response. Any other negative
   * length, a length above {@link Integer#MAX_VALUE}, or a failure to read the number is reported
   * as fatal.
   *
   * @param eol line end position returned by {@code findLineEnd()}
   * @return the length ({@code >= 0}), or {@code -1} when the header has already been fully dealt
   *         with (null delivered or fatal reported) and the caller must do nothing more
   */
  private long handleLength(int eol) {
    final long integer;
    try {
      integer = buffer.readLong(eol);
    } catch (RuntimeException e) {
      handler.fatal(e);
      return -1;
    }

    if (integer > Integer.MAX_VALUE) {
      handler.fatal(ErrorType.create("ILLEGAL_STATE Redis Multi cannot be larger 2GB elements"));
      return -1;
    }

    if (integer < 0) {
      if (integer == -1L) {
        // -1 length: deliver null in place of the value
        handleResponse(null, false);
        return -1;
      }
      handler.fatal(ErrorType.create("ILLEGAL_STATE Redis Multi cannot have negative length"));
      return -1;
    }

    return integer;
  }

  /**
   * Handles a push header ({@code '>'}): opens a {@link PushType} container of the announced
   * size. A zero-length push is reported as fatal.
   *
   * @param eol line end position returned by {@code findLineEnd()}
   */
  private void handlePush(int eol) {
    long len = handleLength(eol);
    if (len >= 0) {
      if (len == 0L) {
        handler.fatal(ErrorType.create("ILLEGAL_STATE Redis Push must have at least 1 element"));
      } else {
        handleResponse(PushType.create(len), true);
      }
    }
  }

  /**
   * Handles an attribute header ({@code '|'}): opens an {@link AttributeType} container of the
   * announced size. A zero-length attribute is reported as fatal.
   *
   * @param eol line end position returned by {@code findLineEnd()}
   */
  private void handleAttribute(int eol) {
    long len = handleLength(eol);
    if (len >= 0L) {
      if (len == 0L) {
        // the message names the Attribute frame (it used to say "Push", copied from handlePush)
        handler.fatal(ErrorType.create("ILLEGAL_STATE Redis Attribute must have at least 1 element"));
      } else {
        handleResponse(AttributeType.create(len), true);
      }
    }
  }

  /**
   * Handles a boolean ({@code '#'}): the byte after the type marker must be {@code 't'} or
   * {@code 'f'}; anything else is reported as fatal.
   */
  private void handleBoolean() {
    byte value = buffer.readByte();
    switch (value) {
      case 't':
        buffer.skipEOL();
        handleResponse(BooleanType.TRUE, false);
        break;
      case 'f':
        buffer.skipEOL();
        handleResponse(BooleanType.FALSE, false);
        break;
      default:
        handler.fatal(ErrorType.create("Invalid boolean value: " + ((char) value)));
    }
  }

  /**
   * Handles a simple string ({@code '+'}). When the span between {@code start} and {@code eol}
   * is two bytes long and holds {@code "OK"}, the shared {@link SimpleStringType#OK} instance is
   * delivered; otherwise the line is read from the buffer and wrapped in a new instance.
   *
   * @param start buffer offset right after the type byte
   * @param eol   line end position returned by {@code findLineEnd()}
   */
  private void handleSimpleString(int start, int eol) {
    final int length = eol - start;
    if (length == 2 && buffer.getByte(start) == 'O' && buffer.getByte(start + 1) == 'K') {
      // NOTE(review): this branch delivers the cached OK without calling any read method on the
      // buffer, unlike the readLine() branch below - confirm against findLineEnd()'s contract.
      handleResponse(SimpleStringType.OK, false);
    } else {
      handleResponse(SimpleStringType.create(buffer.readLine(eol)), false);
    }
  }

  /**
   * Handles an error line ({@code '-'} or {@code '!'}): the line is read and delivered as an
   * {@link ErrorType} response (not as a fatal error).
   *
   * @param eol line end position returned by {@code findLineEnd()}
   */
  private void handleError(int eol) {
    handleResponse(ErrorType.create(buffer.readLine(eol)), false);
  }

  /**
   * Handles a bulk header ({@code '$'} or {@code '='}): validates the announced length and
   * switches the parser to payload state so that {@link #handle(Buffer)} reads the payload next.
   * Lengths above {@code MAX_STRING_LENGTH} are reported as fatal.
   *
   * @param eol line end position returned by {@code findLineEnd()}
   */
  private void handleBulk(int eol) {
    long len = handleLength(eol);
    if (len >= 0L) {
      if (len > MAX_STRING_LENGTH) {
        handler.fatal(ErrorType.create("ILLEGAL_STATE Redis Bulk cannot be larger than 512MB"));
        return;
      }
      // safe narrowing: len is within [0, MAX_STRING_LENGTH]
      bytesNeeded = (int) len;
      this.eol = false;
    }
  }

  /**
   * Handles a multi header ({@code '*'}, {@code '%'} or {@code '~'}). A zero-length container is
   * delivered right away using a shared empty instance; otherwise a new {@link MultiType} is
   * opened and waits for its elements.
   *
   * @param type the type byte; {@code '%'} selects the map flavour
   * @param eol  line end position returned by {@code findLineEnd()}
   */
  private void handleMulti(byte type, int eol) {
    long len = handleLength(eol);
    if (len >= 0L) {
      if (len == 0L) {
        handleResponse(type == '%' ? MultiType.EMPTY_MAP : MultiType.EMPTY_MULTI, false);
      } else {
        handleResponse(MultiType.create(len, type == '%'), true);
      }
    }
  }

  /**
   * Handles a null ({@code '_'}): skips the line end and delivers {@code null}.
   */
  private void handleNull() {
    buffer.skipEOL();
    handleResponse(null, false);
  }

  /**
   * Routes a decoded response either into the container currently open on the stack or straight
   * to the handler.
   *
   * <p>With a container open, the response is added to it. If {@code push} is set the response is
   * itself opened on the stack; otherwise every container that became complete is popped, and
   * when the stack empties the outermost one is delivered - unless its type is
   * {@link ResponseType#ATTRIBUTE}, in which case it is dropped. With no container open, the
   * response is either opened on the stack ({@code push}) or delivered directly.
   *
   * @param response the decoded response, possibly {@code null}
   * @param push     {@code true} when {@code response} is a container that still expects elements
   */
  private void handleResponse(Response response, boolean push) {
    final Multi multi = stack.peek();

    if (multi != null) {
      multi.add(response);

      if (push) {
        stack.push(response);
      } else {
        // completing an element may complete its parent, grandparent, ... - unwind them all
        Multi m = multi;
        while (m.complete()) {
          stack.pop();

          if (stack.empty()) {
            // outermost container finished: deliver it, except for attributes
            if (m.type() != ResponseType.ATTRIBUTE) {
              handler.handle(m);
            }
            return;
          }

          m = stack.peek();
          if (m == null) {
            // a non-empty stack must yield a container; anything else is a corrupted state
            handler.fatal(ErrorType.create("ILLEGAL_STATE Multi can't be null"));
            return;
          }
        }
      }
    } else {
      if (push) {
        stack.push(response);
      } else {
        // nothing is open, so the response is a complete top-level message
        handler.handle(response);
      }
    }
  }
}