package org.ehcache.clustered.common.internal.messages;
import org.ehcache.clustered.common.internal.store.Chain;
import org.ehcache.clustered.common.internal.store.Element;
import org.ehcache.clustered.common.internal.store.SequencedElement;
import org.terracotta.runnel.Struct;
import org.terracotta.runnel.StructBuilder;
import org.terracotta.runnel.decoding.StructArrayDecoder;
import org.terracotta.runnel.decoding.StructDecoder;
import org.terracotta.runnel.encoding.StructArrayEncoder;
import org.terracotta.runnel.encoding.StructEncoder;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import static org.ehcache.clustered.common.internal.util.ChainBuilder.chainFromList;
/**
 * Static helpers that translate a {@link Chain} to and from the Runnel struct
 * layout declared in this class.
 *
 * <p>A chain is written as an array field named {@code "elements"}; each entry
 * of that array carries a {@code "payload"} byte buffer and, for
 * {@link SequencedElement}s only, an {@code "sequence"} 64-bit integer.
 */
public final class ChainCodec {

  /** Layout of one chain element: {@code sequence} (index 10), {@code payload} (index 20). */
  private static final Struct ELEMENT_STRUCT = StructBuilder.newStructBuilder()
    .int64("sequence", 10)
    .byteBuffer("payload", 20)
    .build();

  /** Layout of a whole chain: an array of {@link #ELEMENT_STRUCT} named {@code elements} (index 10). */
  public static final Struct CHAIN_STRUCT = StructBuilder.newStructBuilder()
    .structs("elements", 10, ELEMENT_STRUCT)
    .build();

  /** Not instantiable: the class only exposes static methods. */
  private ChainCodec() {
  }

  /**
   * Encodes a chain as a standalone {@link #CHAIN_STRUCT} message.
   *
   * @param chain the chain to encode
   * @return the array backing the buffer produced by the root encoder
   */
  public static byte[] encode(Chain chain) {
    StructEncoder<Void> rootEncoder = CHAIN_STRUCT.encoder();
    encode(rootEncoder, chain);
    return rootEncoder.encode().array();
  }

  /**
   * Writes a chain into the {@code "elements"} array field of the given encoder,
   * one array entry per element and in the chain's iteration order.
   *
   * @param encoder the encoder that receives the {@code "elements"} field
   * @param chain the chain whose elements are written
   */
  public static void encode(StructEncoder<?> encoder, Chain chain) {
    StructArrayEncoder<? extends StructEncoder<?>> arrayEncoder = encoder.structs("elements");
    for (Element link : chain) {
      encodeElement(arrayEncoder.add(), link);
    }
    arrayEncoder.end();
  }

  /**
   * Decodes a chain from a standalone {@link #CHAIN_STRUCT} message.
   *
   * @param payload the encoded bytes
   * @return the decoded chain
   */
  public static Chain decode(byte[] payload) {
    return decode(CHAIN_STRUCT.decoder(ByteBuffer.wrap(payload)));
  }

  /**
   * Reads the {@code "elements"} array field from the given decoder and rebuilds
   * the chain from its entries.
   *
   * @param decoder the decoder to read the {@code "elements"} field from
   * @return a chain built from the decoded elements, in decoding order
   */
  public static Chain decode(StructDecoder<?> decoder) {
    StructArrayDecoder<? extends StructDecoder<?>> arrayDecoder = decoder.structs("elements");
    final List<Element> decoded = new ArrayList<>();
    int index = 0;
    while (index < arrayDecoder.length()) {
      decoded.add(decodeElement(arrayDecoder.next()));
      index++;
    }
    arrayDecoder.end();
    return chainFromList(decoded);
  }

  /** Writes a single element into its own struct entry and closes that entry. */
  private static void encodeElement(StructEncoder<?> entryEncoder, Element link) {
    // The sequence number is only available on sequenced elements; plain elements omit the field.
    if (link instanceof SequencedElement) {
      SequencedElement sequenced = (SequencedElement) link;
      entryEncoder.int64("sequence", sequenced.getSequenceNumber());
    }
    entryEncoder.byteBuffer("payload", link.getPayload());
    entryEncoder.end();
  }

  /**
   * Reads a single element from its struct entry and closes that entry. The result is a
   * {@link SequencedElement} when a sequence value was decoded, a plain {@link Element} otherwise.
   */
  private static Element decodeElement(StructDecoder<?> entryDecoder) {
    final Long sequence = entryDecoder.int64("sequence");
    final ByteBuffer content = entryDecoder.byteBuffer("payload");
    entryDecoder.end();

    if (sequence != null) {
      final long sequenceNumber = sequence;
      return new SequencedElement() {
        @Override
        public long getSequenceNumber() {
          return sequenceNumber;
        }

        @Override
        public ByteBuffer getPayload() {
          // A fresh read-only view is handed out on every call.
          return content.asReadOnlyBuffer();
        }
      };
    }
    // No sequence decoded: expose the payload only, as a fresh read-only view per call.
    return content::asReadOnlyBuffer;
  }
}