package org.ehcache.clustered.common.internal.messages;
import org.ehcache.clustered.common.internal.exceptions.ClusterException;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse.PrepareForDestroy;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse.ResolveRequest;
import org.ehcache.clustered.common.internal.store.Chain;
import org.ehcache.clustered.common.internal.store.Util;
import org.terracotta.runnel.Struct;
import org.terracotta.runnel.StructBuilder;
import org.terracotta.runnel.decoding.ArrayDecoder;
import org.terracotta.runnel.decoding.Enm;
import org.terracotta.runnel.decoding.StructArrayDecoder;
import org.terracotta.runnel.decoding.StructDecoder;
import org.terracotta.runnel.encoding.ArrayEncoder;
import org.terracotta.runnel.encoding.StructEncoder;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import static java.nio.ByteBuffer.wrap;
import static org.ehcache.clustered.common.internal.messages.ChainCodec.CHAIN_STRUCT;
import static org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse.AllInvalidationDone;
import static org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse.ClientInvalidateAll;
import static org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse.ClientInvalidateHash;
import static org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse.HashInvalidationDone;
import static org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse.ServerInvalidateHash;
import static org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse.MapValue;
import static org.ehcache.clustered.common.internal.messages.EhcacheResponseType.EHCACHE_RESPONSE_TYPES_ENUM_MAPPING;
import static org.ehcache.clustered.common.internal.messages.EhcacheResponseType.RESPONSE_TYPE_FIELD_INDEX;
import static org.ehcache.clustered.common.internal.messages.EhcacheResponseType.RESPONSE_TYPE_FIELD_NAME;
import static org.ehcache.clustered.common.internal.messages.MessageCodecUtils.KEY_FIELD;
import static org.ehcache.clustered.common.internal.messages.StateRepositoryOpCodec.WHITELIST_PREDICATE;
import static org.terracotta.runnel.StructBuilder.newStructBuilder;
public class ResponseCodec {
private static final String EXCEPTION_FIELD = "exception";
private static final String INVALIDATION_ID_FIELD = "invalidationId";
private static final String CHAIN_FIELD = "chain";
private static final String MAP_VALUE_FIELD = "mapValue";
private static final String STORES_FIELD = "stores";
private static final Struct SUCCESS_RESPONSE_STRUCT = StructBuilder.newStructBuilder()
.enm(RESPONSE_TYPE_FIELD_NAME, RESPONSE_TYPE_FIELD_INDEX, EHCACHE_RESPONSE_TYPES_ENUM_MAPPING)
.build();
private static final Struct FAILURE_RESPONSE_STRUCT = StructBuilder.newStructBuilder()
.enm(RESPONSE_TYPE_FIELD_NAME, RESPONSE_TYPE_FIELD_INDEX, EHCACHE_RESPONSE_TYPES_ENUM_MAPPING)
.struct(EXCEPTION_FIELD, 20, ExceptionCodec.EXCEPTION_STRUCT)
.build();
private static final Struct GET_RESPONSE_STRUCT = StructBuilder.newStructBuilder()
.enm(RESPONSE_TYPE_FIELD_NAME, RESPONSE_TYPE_FIELD_INDEX, EHCACHE_RESPONSE_TYPES_ENUM_MAPPING)
.struct(CHAIN_FIELD, 20, CHAIN_STRUCT)
.build();
private static final Struct HASH_INVALIDATION_DONE_RESPONSE_STRUCT = StructBuilder.newStructBuilder()
.enm(RESPONSE_TYPE_FIELD_NAME, RESPONSE_TYPE_FIELD_INDEX, EHCACHE_RESPONSE_TYPES_ENUM_MAPPING)
.int64(KEY_FIELD, 20)
.build();
private static final Struct ALL_INVALIDATION_DONE_RESPONSE_STRUCT = StructBuilder.newStructBuilder()
.enm(RESPONSE_TYPE_FIELD_NAME, RESPONSE_TYPE_FIELD_INDEX, EHCACHE_RESPONSE_TYPES_ENUM_MAPPING)
.build();
private static final Struct CLIENT_INVALIDATE_HASH_RESPONSE_STRUCT = StructBuilder.newStructBuilder()
.enm(RESPONSE_TYPE_FIELD_NAME, RESPONSE_TYPE_FIELD_INDEX, EHCACHE_RESPONSE_TYPES_ENUM_MAPPING)
.int64(KEY_FIELD, 20)
.int32(INVALIDATION_ID_FIELD, 30)
.build();
private static final Struct CLIENT_INVALIDATE_ALL_RESPONSE_STRUCT = StructBuilder.newStructBuilder()
.enm(RESPONSE_TYPE_FIELD_NAME, RESPONSE_TYPE_FIELD_INDEX, EHCACHE_RESPONSE_TYPES_ENUM_MAPPING)
.int32(INVALIDATION_ID_FIELD, 20)
.build();
private static final Struct SERVER_INVALIDATE_HASH_RESPONSE_STRUCT = StructBuilder.newStructBuilder()
.enm(RESPONSE_TYPE_FIELD_NAME, RESPONSE_TYPE_FIELD_INDEX, EHCACHE_RESPONSE_TYPES_ENUM_MAPPING)
.int64(KEY_FIELD, 20)
.build();
private static final Struct MAP_VALUE_RESPONSE_STRUCT = StructBuilder.newStructBuilder()
.enm(RESPONSE_TYPE_FIELD_NAME, RESPONSE_TYPE_FIELD_INDEX, EHCACHE_RESPONSE_TYPES_ENUM_MAPPING)
.byteBuffer(MAP_VALUE_FIELD, 20)
.build();
private static final Struct PREPARE_FOR_DESTROY_RESPONSE_STRUCT = newStructBuilder()
.enm(RESPONSE_TYPE_FIELD_NAME, RESPONSE_TYPE_FIELD_INDEX, EHCACHE_RESPONSE_TYPES_ENUM_MAPPING)
.strings(STORES_FIELD, 20)
.build();
private static final Struct RESOLVE_REQUEST_RESPONSE_STRUCT = newStructBuilder()
.enm(RESPONSE_TYPE_FIELD_NAME, RESPONSE_TYPE_FIELD_INDEX, EHCACHE_RESPONSE_TYPES_ENUM_MAPPING)
.int64(KEY_FIELD, 20)
.struct(CHAIN_FIELD, 30, CHAIN_STRUCT)
.build();
private static final Struct LOCK_RESPONSE_STRUCT = newStructBuilder()
.enm(RESPONSE_TYPE_FIELD_NAME, RESPONSE_TYPE_FIELD_INDEX, EHCACHE_RESPONSE_TYPES_ENUM_MAPPING)
.struct(CHAIN_FIELD, 20, CHAIN_STRUCT)
.build();
private static final Struct ITERATOR_BATCH_STRUCT = newStructBuilder()
.enm(RESPONSE_TYPE_FIELD_NAME, RESPONSE_TYPE_FIELD_INDEX, EHCACHE_RESPONSE_TYPES_ENUM_MAPPING)
.string("id", 20)
.structs("chains", 30, CHAIN_STRUCT)
.bool("last", 40)
.build();
public byte[] encode(EhcacheEntityResponse response) {
switch (response.getResponseType()) {
case FAILURE:
final EhcacheEntityResponse.Failure failure = (EhcacheEntityResponse.Failure)response;
return FAILURE_RESPONSE_STRUCT.encoder()
.enm(RESPONSE_TYPE_FIELD_NAME, failure.getResponseType())
.struct(EXCEPTION_FIELD, failure.getCause(), ExceptionCodec::encode)
.encode().array();
case SUCCESS:
return SUCCESS_RESPONSE_STRUCT.encoder()
.enm(RESPONSE_TYPE_FIELD_NAME, response.getResponseType())
.encode().array();
case GET_RESPONSE:
final EhcacheEntityResponse.GetResponse getResponse = (EhcacheEntityResponse.GetResponse)response;
return GET_RESPONSE_STRUCT.encoder()
.enm(RESPONSE_TYPE_FIELD_NAME, getResponse.getResponseType())
.struct(CHAIN_FIELD, getResponse.getChain(), ChainCodec::encode)
.encode().array();
case HASH_INVALIDATION_DONE: {
HashInvalidationDone hashInvalidationDone = (HashInvalidationDone) response;
return HASH_INVALIDATION_DONE_RESPONSE_STRUCT.encoder()
.enm(RESPONSE_TYPE_FIELD_NAME, hashInvalidationDone.getResponseType())
.int64(KEY_FIELD, hashInvalidationDone.getKey())
.encode().array();
}
case ALL_INVALIDATION_DONE: {
AllInvalidationDone allInvalidationDone = (AllInvalidationDone) response;
return ALL_INVALIDATION_DONE_RESPONSE_STRUCT.encoder()
.enm(RESPONSE_TYPE_FIELD_NAME, allInvalidationDone.getResponseType())
.encode().array();
}
case CLIENT_INVALIDATE_HASH: {
ClientInvalidateHash clientInvalidateHash = (ClientInvalidateHash) response;
return CLIENT_INVALIDATE_HASH_RESPONSE_STRUCT.encoder()
.enm(RESPONSE_TYPE_FIELD_NAME, clientInvalidateHash.getResponseType())
.int64(KEY_FIELD, clientInvalidateHash.getKey())
.int32(INVALIDATION_ID_FIELD, clientInvalidateHash.getInvalidationId())
.encode().array();
}
case CLIENT_INVALIDATE_ALL: {
ClientInvalidateAll clientInvalidateAll = (ClientInvalidateAll) response;
return CLIENT_INVALIDATE_ALL_RESPONSE_STRUCT.encoder()
.enm(RESPONSE_TYPE_FIELD_NAME, clientInvalidateAll.getResponseType())
.int32(INVALIDATION_ID_FIELD, clientInvalidateAll.getInvalidationId())
.encode().array();
}
case SERVER_INVALIDATE_HASH: {
ServerInvalidateHash serverInvalidateHash = (ServerInvalidateHash) response;
return SERVER_INVALIDATE_HASH_RESPONSE_STRUCT.encoder()
.enm(RESPONSE_TYPE_FIELD_NAME, serverInvalidateHash.getResponseType())
.int64(KEY_FIELD, serverInvalidateHash.getKey())
.encode().array();
}
case MAP_VALUE: {
MapValue mapValue = (MapValue) response;
byte[] encodedMapValue = Util.marshall(mapValue.getValue());
return MAP_VALUE_RESPONSE_STRUCT.encoder()
.enm(RESPONSE_TYPE_FIELD_NAME, mapValue.getResponseType())
.byteBuffer(MAP_VALUE_FIELD, wrap(encodedMapValue))
.encode().array();
}
case PREPARE_FOR_DESTROY: {
PrepareForDestroy prepare = (PrepareForDestroy) response;
StructEncoder<Void> encoder = PREPARE_FOR_DESTROY_RESPONSE_STRUCT.encoder()
.enm(RESPONSE_TYPE_FIELD_NAME, prepare.getResponseType());
ArrayEncoder<String, StructEncoder<Void>> storesEncoder = encoder.strings(STORES_FIELD);
for (String storeName : prepare.getStores()) {
storesEncoder.value(storeName);
}
return encoder
.encode().array();
}
case RESOLVE_REQUEST: {
EhcacheEntityResponse.ResolveRequest resolve = (ResolveRequest) response;
return RESOLVE_REQUEST_RESPONSE_STRUCT.encoder()
.enm(RESPONSE_TYPE_FIELD_NAME, resolve.getResponseType())
.int64(KEY_FIELD, resolve.getKey())
.struct(CHAIN_FIELD, resolve.getChain(), ChainCodec::encode)
.encode().array();
}
case LOCK_SUCCESS: {
EhcacheEntityResponse.LockSuccess lockSuccess = (EhcacheEntityResponse.LockSuccess) response;
return LOCK_RESPONSE_STRUCT.encoder()
.enm(RESPONSE_TYPE_FIELD_NAME, lockSuccess.getResponseType())
.struct(CHAIN_FIELD, lockSuccess.getChain(), ChainCodec::encode)
.encode().array();
}
case LOCK_FAILURE: {
EhcacheEntityResponse.LockFailure lockFailure = (EhcacheEntityResponse.LockFailure) response;
return LOCK_RESPONSE_STRUCT.encoder()
.enm(RESPONSE_TYPE_FIELD_NAME, lockFailure.getResponseType())
.encode().array();
}
case ITERATOR_BATCH: {
EhcacheEntityResponse.IteratorBatch iteratorBatch = (EhcacheEntityResponse.IteratorBatch) response;
return ITERATOR_BATCH_STRUCT.encoder()
.enm(RESPONSE_TYPE_FIELD_NAME, iteratorBatch.getResponseType())
.string("id", iteratorBatch.getIdentity().toString())
.structs("chains", iteratorBatch.getChains(), ChainCodec::encode)
.bool("last", iteratorBatch.isLast())
.encode().array();
}
default:
throw new UnsupportedOperationException("The operation is not supported : " + response.getResponseType());
}
}
public EhcacheEntityResponse decode(byte[] payload) {
ByteBuffer buffer = wrap(payload);
StructDecoder<Void> decoder = SUCCESS_RESPONSE_STRUCT.decoder(buffer);
Enm<EhcacheResponseType> opCodeEnm = decoder.enm(RESPONSE_TYPE_FIELD_NAME);
if (!opCodeEnm.isFound()) {
throw new AssertionError("Got a response without an opCode");
}
if (!opCodeEnm.isValid()) {
return null;
}
EhcacheResponseType opCode = opCodeEnm.get();
buffer.rewind();
switch (opCode) {
case SUCCESS:
return EhcacheEntityResponse.success();
case FAILURE:
decoder = FAILURE_RESPONSE_STRUCT.decoder(buffer);
ClusterException exception = ExceptionCodec.decode(decoder.struct(EXCEPTION_FIELD));
return EhcacheEntityResponse.failure(exception.withClientStackTrace());
case GET_RESPONSE:
decoder = GET_RESPONSE_STRUCT.decoder(buffer);
return EhcacheEntityResponse.getResponse(ChainCodec.decode(decoder.struct(CHAIN_FIELD)));
case HASH_INVALIDATION_DONE: {
decoder = HASH_INVALIDATION_DONE_RESPONSE_STRUCT.decoder(buffer);
long key = decoder.int64(KEY_FIELD);
return EhcacheEntityResponse.hashInvalidationDone(key);
}
case ALL_INVALIDATION_DONE: {
return EhcacheEntityResponse.allInvalidationDone();
}
case CLIENT_INVALIDATE_HASH: {
decoder = CLIENT_INVALIDATE_HASH_RESPONSE_STRUCT.decoder(buffer);
long key = decoder.int64(KEY_FIELD);
int invalidationId = decoder.int32(INVALIDATION_ID_FIELD);
return EhcacheEntityResponse.clientInvalidateHash(key, invalidationId);
}
case CLIENT_INVALIDATE_ALL: {
decoder = CLIENT_INVALIDATE_ALL_RESPONSE_STRUCT.decoder(buffer);
int invalidationId = decoder.int32(INVALIDATION_ID_FIELD);
return EhcacheEntityResponse.clientInvalidateAll(invalidationId);
}
case SERVER_INVALIDATE_HASH: {
decoder = SERVER_INVALIDATE_HASH_RESPONSE_STRUCT.decoder(buffer);
long key = decoder.int64(KEY_FIELD);
return EhcacheEntityResponse.serverInvalidateHash(key);
}
case MAP_VALUE: {
decoder = MAP_VALUE_RESPONSE_STRUCT.decoder(buffer);
return EhcacheEntityResponse.mapValue(
Util.unmarshall(decoder.byteBuffer(MAP_VALUE_FIELD), WHITELIST_PREDICATE));
}
case PREPARE_FOR_DESTROY: {
decoder = PREPARE_FOR_DESTROY_RESPONSE_STRUCT.decoder(buffer);
ArrayDecoder<String, StructDecoder<Void>> storesDecoder = decoder.strings(STORES_FIELD);
Set<String> stores = new HashSet<>();
for (int i = 0; i < storesDecoder.length(); i++) {
stores.add(storesDecoder.value());
}
return EhcacheEntityResponse.prepareForDestroy(stores);
}
case RESOLVE_REQUEST: {
decoder = RESOLVE_REQUEST_RESPONSE_STRUCT.decoder(buffer);
long key = decoder.int64(KEY_FIELD);
Chain chain = ChainCodec.decode(decoder.struct(CHAIN_FIELD));
return EhcacheEntityResponse.resolveRequest(key, chain);
}
case LOCK_SUCCESS: {
decoder = LOCK_RESPONSE_STRUCT.decoder(buffer);
Chain chain = ChainCodec.decode(decoder.struct(CHAIN_FIELD));
return new EhcacheEntityResponse.LockSuccess(chain);
}
case LOCK_FAILURE: {
return EhcacheEntityResponse.lockFailure();
}
case ITERATOR_BATCH: {
decoder = ITERATOR_BATCH_STRUCT.decoder(buffer);
UUID id = UUID.fromString(decoder.string("id"));
StructArrayDecoder<StructDecoder<Void>> chainsDecoder = decoder.structs("chains");
List<Chain> chains = new ArrayList<>(chainsDecoder.length());
while (chainsDecoder.hasNext()) {
chains.add(ChainCodec.decode(chainsDecoder.next()));
}
boolean last = decoder.bool("last");
return new EhcacheEntityResponse.IteratorBatch(id, chains, last);
}
default:
throw new UnsupportedOperationException("The operation is not supported with opCode : " + opCode);
}
}
}