package com.datastax.oss.driver.internal.core.protocol;
import com.datastax.oss.protocol.internal.Frame;
import com.datastax.oss.protocol.internal.FrameCodec;
import com.datastax.oss.protocol.internal.Segment;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.util.ArrayList;
import java.util.List;
import net.jcip.annotations.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Netty inbound handler that turns {@link Segment} messages into {@link Frame} objects.
 *
 * <p>A segment is handled in one of two ways, depending on {@code segment.isSelfContained}:
 *
 * <ul>
 *   <li>self-contained: the payload is decoded into one or more complete frames, which are all
 *       emitted immediately;
 *   <li>otherwise: the payload is treated as one slice of a larger frame. Slices are accumulated
 *       until their combined length reaches the length computed from the first slice, at which
 *       point they are assembled into a single buffer and decoded into one frame.
 * </ul>
 *
 * <p>In both cases this handler takes care of releasing the segment payloads once they have been
 * decoded, including when decoding fails.
 *
 * <p>Instances hold mutable reassembly state and are not thread-safe.
 */
@NotThreadSafe
public class SegmentToFrameDecoder extends MessageToMessageDecoder<Segment<ByteBuf>> {

  private static final Logger LOG = LoggerFactory.getLogger(SegmentToFrameDecoder.class);

  /** Marker value of {@link #targetLength} when no sliced frame is currently being reassembled. */
  private static final int UNKNOWN_LENGTH = Integer.MIN_VALUE;

  private final FrameCodec<ByteBuf> frameCodec;
  private final String logPrefix;

  // State of the sliced frame currently being reassembled (if any):
  /** Total encoded length of the frame being reassembled, or {@link #UNKNOWN_LENGTH}. */
  private int targetLength = UNKNOWN_LENGTH;
  /** Slices received so far for the frame being reassembled, in arrival order. */
  private final List<ByteBuf> accumulatedSlices = new ArrayList<>();
  /** Sum of the readable bytes of {@link #accumulatedSlices}. */
  private int accumulatedLength;

  /**
   * @param frameCodec the codec used to decode frames (and to read the body size from the first
   *     slice of a sliced frame).
   * @param logPrefix the prefix inserted in every log message emitted by this handler.
   */
  public SegmentToFrameDecoder(@NonNull FrameCodec<ByteBuf> frameCodec, @NonNull String logPrefix) {
    this.logPrefix = logPrefix;
    this.frameCodec = frameCodec;
  }

  /**
   * Dispatches the segment to the self-contained or sliced decoding path.
   *
   * @param ctx the handler context; its allocator is used to assemble sliced frames.
   * @param segment the incoming segment.
   * @param out the list that decoded frames are appended to.
   */
  @Override
  protected void decode(
      @NonNull ChannelHandlerContext ctx,
      @NonNull Segment<ByteBuf> segment,
      @NonNull List<Object> out) {
    if (segment.isSelfContained) {
      decodeSelfContained(segment, out);
    } else {
      decodeSlice(segment, ctx.alloc(), out);
    }
  }

  /**
   * Decodes every frame contained in a self-contained segment and appends them to {@code out}.
   *
   * <p>At least one frame is always decoded; decoding continues while the payload has readable
   * bytes. The payload is released when this method exits, whether normally or exceptionally.
   */
  private void decodeSelfContained(Segment<ByteBuf> segment, List<Object> out) {
    ByteBuf payload = segment.payload;
    int frameCount = 0;
    try {
      do {
        Frame frame = frameCodec.decode(payload);
        LOG.trace(
            "[{}] Decoded response frame {} from self-contained segment",
            logPrefix,
            frame.streamId);
        out.add(frame);
        frameCount += 1;
      } while (payload.isReadable());
    } finally {
      // Release even if the codec throws, otherwise the payload would leak.
      payload.release();
    }
    LOG.trace("[{}] Done processing self-contained segment ({} frames)", logPrefix, frameCount);
  }

  /**
   * Accumulates one slice of a larger frame, and decodes the frame once all of its bytes have
   * been received.
   *
   * <p>The first slice of a sequence is used to compute the total length to wait for. When the
   * accumulated length reaches that target, the slices are combined into a composite buffer,
   * decoded into a single frame that is appended to {@code out}, and the reassembly state is
   * reset so that the next slice starts a new sequence.
   */
  private void decodeSlice(Segment<ByteBuf> segment, ByteBufAllocator allocator, List<Object> out) {
    // Either we are in the middle of a sequence, or all the state is cleared; never a mix of both.
    assert targetLength != UNKNOWN_LENGTH ^ (accumulatedSlices.isEmpty() && accumulatedLength == 0);
    ByteBuf slice = segment.payload;
    if (targetLength == UNKNOWN_LENGTH) {
      // First slice of a sequence: read the body size to find out how many bytes to wait for.
      boolean sized = false;
      try {
        targetLength = FrameCodec.V3_ENCODED_HEADER_SIZE + frameCodec.decodeBodySize(slice);
        sized = true;
      } finally {
        if (!sized) {
          // The slice is not tracked in accumulatedSlices yet, so nothing else would release it.
          slice.release();
        }
      }
    }
    accumulatedSlices.add(slice);
    accumulatedLength += slice.readableBytes();
    LOG.trace(
        "[{}] Decoded slice {}, {}/{} bytes",
        logPrefix,
        accumulatedSlices.size(),
        accumulatedLength,
        targetLength);
    assert accumulatedLength <= targetLength;
    if (accumulatedLength == targetLength) {
      // We have received enough data to reassemble the whole frame.
      // Capture the count now: the state is reset before the trace message below is emitted.
      int sliceCount = accumulatedSlices.size();
      CompositeByteBuf encodedFrame = allocator.compositeBuffer(sliceCount);
      // The composite buffer now references the slices as its components; releasing it below is
      // what frees them.
      encodedFrame.addComponents(true, accumulatedSlices);
      Frame frame;
      try {
        frame = frameCodec.decode(encodedFrame);
      } finally {
        // Reset our state unconditionally, so that a decoding failure does not corrupt the
        // reassembly of the next sliced frame.
        targetLength = UNKNOWN_LENGTH;
        accumulatedSlices.clear();
        accumulatedLength = 0;
        // Same as for self-contained payloads: the buffer is not needed once decoding is done.
        encodedFrame.release();
      }
      LOG.trace(
          "[{}] Decoded response frame {} from {} slices", logPrefix, frame.streamId, sliceCount);
      out.add(frame);
    }
  }
}