package org.ehcache.clustered.common.internal.messages;
import org.ehcache.clustered.common.internal.messages.ServerStoreOpMessage.AppendMessage;
import org.ehcache.clustered.common.internal.messages.ServerStoreOpMessage.ClearMessage;
import org.ehcache.clustered.common.internal.messages.ServerStoreOpMessage.ClientInvalidationAck;
import org.ehcache.clustered.common.internal.messages.ServerStoreOpMessage.ClientInvalidationAllAck;
import org.ehcache.clustered.common.internal.messages.ServerStoreOpMessage.GetAndAppendMessage;
import org.ehcache.clustered.common.internal.messages.ServerStoreOpMessage.GetMessage;
import org.ehcache.clustered.common.internal.messages.ServerStoreOpMessage.ReplaceAtHeadMessage;
import org.ehcache.clustered.common.internal.store.Chain;
import org.terracotta.runnel.Struct;
import org.terracotta.runnel.decoding.StructDecoder;
import java.nio.ByteBuffer;
import java.util.UUID;
import static org.ehcache.clustered.common.internal.messages.ChainCodec.CHAIN_STRUCT;
import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.EHCACHE_MESSAGE_TYPES_ENUM_MAPPING;
import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.MESSAGE_TYPE_FIELD_INDEX;
import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.MESSAGE_TYPE_FIELD_NAME;
import static org.ehcache.clustered.common.internal.messages.MessageCodecUtils.KEY_FIELD;
import static org.ehcache.clustered.common.internal.messages.MessageCodecUtils.encodeMandatoryFields;
import static org.terracotta.runnel.StructBuilder.newStructBuilder;
public class ServerStoreOpCodec {
private static final Struct GET_AND_APPEND_MESSAGE_STRUCT = newStructBuilder()
.enm(MESSAGE_TYPE_FIELD_NAME, MESSAGE_TYPE_FIELD_INDEX, EHCACHE_MESSAGE_TYPES_ENUM_MAPPING)
.int64(KEY_FIELD, 30)
.byteBuffer("payload", 40)
.build();
private static final Struct APPEND_MESSAGE_STRUCT = newStructBuilder()
.enm(MESSAGE_TYPE_FIELD_NAME, MESSAGE_TYPE_FIELD_INDEX, EHCACHE_MESSAGE_TYPES_ENUM_MAPPING)
.int64(KEY_FIELD, 30)
.byteBuffer("payload", 40)
.build();
private static final Struct REPLACE_MESSAGE_STRUCT = newStructBuilder()
.enm(MESSAGE_TYPE_FIELD_NAME, MESSAGE_TYPE_FIELD_INDEX, EHCACHE_MESSAGE_TYPES_ENUM_MAPPING)
.int64(KEY_FIELD, 30)
.struct("expect", 40, CHAIN_STRUCT)
.struct("update", 50, CHAIN_STRUCT)
.build();
private static final Struct CLIENT_INVALIDATION_ACK_MESSAGE_STRUCT = newStructBuilder()
.enm(MESSAGE_TYPE_FIELD_NAME, MESSAGE_TYPE_FIELD_INDEX, EHCACHE_MESSAGE_TYPES_ENUM_MAPPING)
.int64(KEY_FIELD, 20)
.int32("invalidationId", 30)
.build();
private static final Struct CLIENT_INVALIDATION_ALL_ACK_MESSAGE_STRUCT = newStructBuilder()
.enm(MESSAGE_TYPE_FIELD_NAME, MESSAGE_TYPE_FIELD_INDEX, EHCACHE_MESSAGE_TYPES_ENUM_MAPPING)
.int32("invalidationId", 40)
.build();
private static final Struct CLEAR_MESSAGE_STRUCT = newStructBuilder()
.enm(MESSAGE_TYPE_FIELD_NAME, MESSAGE_TYPE_FIELD_INDEX, EHCACHE_MESSAGE_TYPES_ENUM_MAPPING)
.build();
private static final Struct GET_MESSAGE_STRUCT = newStructBuilder()
.enm(MESSAGE_TYPE_FIELD_NAME, MESSAGE_TYPE_FIELD_INDEX, EHCACHE_MESSAGE_TYPES_ENUM_MAPPING)
.int64(KEY_FIELD, 30)
.build();
private static final Struct LOCK_STRUCT = newStructBuilder()
.enm(MESSAGE_TYPE_FIELD_NAME, MESSAGE_TYPE_FIELD_INDEX, EHCACHE_MESSAGE_TYPES_ENUM_MAPPING)
.int64("hash", 30)
.build();
private static final Struct ITERATOR_OPEN_STRUCT = newStructBuilder()
.enm(MESSAGE_TYPE_FIELD_NAME, MESSAGE_TYPE_FIELD_INDEX, EHCACHE_MESSAGE_TYPES_ENUM_MAPPING)
.int32("batchSize", 20)
.build();
private static final Struct ITERATOR_CLOSE_STRUCT = newStructBuilder()
.enm(MESSAGE_TYPE_FIELD_NAME, MESSAGE_TYPE_FIELD_INDEX, EHCACHE_MESSAGE_TYPES_ENUM_MAPPING)
.string("id", 20)
.build();
private static final Struct ITERATOR_ADVANCE_STRUCT = newStructBuilder()
.enm(MESSAGE_TYPE_FIELD_NAME, MESSAGE_TYPE_FIELD_INDEX, EHCACHE_MESSAGE_TYPES_ENUM_MAPPING)
.string("id", 20)
.int32("batchSize", 30)
.build();
public byte[] encode(ServerStoreOpMessage message) {
switch (message.getMessageType()) {
case GET_STORE:
GetMessage getMessage = (GetMessage) message;
return encodeMandatoryFields(GET_MESSAGE_STRUCT, message)
.int64(KEY_FIELD, getMessage.getKey())
.encode().array();
case APPEND:
AppendMessage appendMessage = (AppendMessage) message;
return encodeMandatoryFields(APPEND_MESSAGE_STRUCT, message)
.int64(KEY_FIELD, appendMessage.getKey())
.byteBuffer("payload", appendMessage.getPayload())
.encode().array();
case GET_AND_APPEND:
GetAndAppendMessage getAndAppendMessage = (GetAndAppendMessage) message;
return encodeMandatoryFields(GET_AND_APPEND_MESSAGE_STRUCT, message)
.int64(KEY_FIELD, getAndAppendMessage.getKey())
.byteBuffer("payload", getAndAppendMessage.getPayload())
.encode().array();
case REPLACE:
final ReplaceAtHeadMessage replaceAtHeadMessage = (ReplaceAtHeadMessage) message;
return encodeMandatoryFields(REPLACE_MESSAGE_STRUCT, message)
.int64(KEY_FIELD, replaceAtHeadMessage.getKey())
.struct("expect", replaceAtHeadMessage.getExpect(), ChainCodec::encode)
.struct("update", replaceAtHeadMessage.getUpdate(), ChainCodec::encode)
.encode().array();
case CLIENT_INVALIDATION_ACK:
ClientInvalidationAck clientInvalidationAckMessage = (ClientInvalidationAck) message;
return encodeMandatoryFields(CLIENT_INVALIDATION_ACK_MESSAGE_STRUCT, message)
.int64(KEY_FIELD, clientInvalidationAckMessage.getKey())
.int32("invalidationId", clientInvalidationAckMessage.getInvalidationId())
.encode().array();
case CLIENT_INVALIDATION_ALL_ACK:
ClientInvalidationAllAck clientInvalidationAllAckMessage = (ClientInvalidationAllAck) message;
return encodeMandatoryFields(CLIENT_INVALIDATION_ALL_ACK_MESSAGE_STRUCT, message)
.int32("invalidationId", clientInvalidationAllAckMessage.getInvalidationId())
.encode().array();
case CLEAR:
return encodeMandatoryFields(CLEAR_MESSAGE_STRUCT, message)
.encode().array();
case LOCK:
ServerStoreOpMessage.LockMessage lockMessage = (ServerStoreOpMessage.LockMessage) message;
return encodeMandatoryFields(LOCK_STRUCT, message)
.int64("hash", lockMessage.getHash())
.encode().array();
case UNLOCK:
ServerStoreOpMessage.UnlockMessage unlockMessage = (ServerStoreOpMessage.UnlockMessage) message;
return encodeMandatoryFields(LOCK_STRUCT, message)
.int64("hash", unlockMessage.getHash())
.encode().array();
case ITERATOR_OPEN:
return encodeMandatoryFields(ITERATOR_OPEN_STRUCT, message)
.int32("batchSize", ((ServerStoreOpMessage.IteratorOpenMessage) message).getBatchSize())
.encode().array();
case ITERATOR_CLOSE:
return encodeMandatoryFields(ITERATOR_CLOSE_STRUCT, message)
.string("id", ((ServerStoreOpMessage.IteratorCloseMessage) message).getIdentity().toString())
.encode().array();
case ITERATOR_ADVANCE:
return encodeMandatoryFields(ITERATOR_ADVANCE_STRUCT, message)
.string("id", ((ServerStoreOpMessage.IteratorAdvanceMessage) message).getIdentity().toString())
.int32("batchSize", ((ServerStoreOpMessage.IteratorAdvanceMessage) message).getBatchSize())
.encode().array();
default:
throw new RuntimeException("Unhandled message operation : " + message.getMessageType());
}
}
public EhcacheEntityMessage decode(EhcacheMessageType opCode, ByteBuffer messageBuffer) {
switch (opCode) {
case GET_STORE: {
StructDecoder<Void> decoder = GET_MESSAGE_STRUCT.decoder(messageBuffer);
Long key = decoder.int64(KEY_FIELD);
return new GetMessage(key);
}
case GET_AND_APPEND: {
StructDecoder<Void> decoder = GET_AND_APPEND_MESSAGE_STRUCT.decoder(messageBuffer);
Long key = decoder.int64(KEY_FIELD);
ByteBuffer payload = decoder.byteBuffer("payload");
return new GetAndAppendMessage(key, payload);
}
case APPEND: {
StructDecoder<Void> decoder = APPEND_MESSAGE_STRUCT.decoder(messageBuffer);
Long key = decoder.int64(KEY_FIELD);
ByteBuffer payload = decoder.byteBuffer("payload");
return new AppendMessage(key, payload);
}
case REPLACE: {
StructDecoder<Void> decoder = REPLACE_MESSAGE_STRUCT.decoder(messageBuffer);
Long key = decoder.int64(KEY_FIELD);
Chain expect = ChainCodec.decode(decoder.struct("expect"));
Chain update = ChainCodec.decode(decoder.struct("update"));
return new ReplaceAtHeadMessage(key, expect, update);
}
case CLIENT_INVALIDATION_ACK: {
StructDecoder<Void> decoder = CLIENT_INVALIDATION_ACK_MESSAGE_STRUCT.decoder(messageBuffer);
Long key = decoder.int64(KEY_FIELD);
Integer invalidationId = decoder.int32("invalidationId");
return new ClientInvalidationAck(key, invalidationId);
}
case CLIENT_INVALIDATION_ALL_ACK: {
StructDecoder<Void> decoder = CLIENT_INVALIDATION_ALL_ACK_MESSAGE_STRUCT.decoder(messageBuffer);
Integer invalidationId = decoder.int32("invalidationId");
return new ClientInvalidationAllAck(invalidationId);
}
case CLEAR: {
return new ClearMessage();
}
case LOCK: {
StructDecoder<Void> decoder = LOCK_STRUCT.decoder(messageBuffer);
long hash = decoder.int64("hash");
return new ServerStoreOpMessage.LockMessage(hash);
}
case UNLOCK: {
StructDecoder<Void> decoder = LOCK_STRUCT.decoder(messageBuffer);
long hash = decoder.int64("hash");
return new ServerStoreOpMessage.UnlockMessage(hash);
}
case ITERATOR_OPEN: {
StructDecoder<Void> decoder = ITERATOR_OPEN_STRUCT.decoder(messageBuffer);
int batchSize = decoder.int32("batchSize");
return new ServerStoreOpMessage.IteratorOpenMessage(batchSize);
}
case ITERATOR_CLOSE: {
StructDecoder<Void> decoder = ITERATOR_CLOSE_STRUCT.decoder(messageBuffer);
UUID identity = UUID.fromString(decoder.string("id"));
return new ServerStoreOpMessage.IteratorCloseMessage(identity);
}
case ITERATOR_ADVANCE: {
StructDecoder<Void> decoder = ITERATOR_ADVANCE_STRUCT.decoder(messageBuffer);
UUID identity = UUID.fromString(decoder.string("id"));
int batchSize = decoder.int32("batchSize");
return new ServerStoreOpMessage.IteratorAdvanceMessage(identity, batchSize);
}
default:
throw new RuntimeException("Unhandled message operation : " + opCode);
}
}
}