package org.apache.cassandra.db;
import java.io.*;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.io.ForwardingVersionedSerializer;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.thrift.ThriftResultsMerger;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
public abstract class ReadResponse
{
public static final IVersionedSerializer<ReadResponse> serializer = new Serializer();
public static final IVersionedSerializer<ReadResponse> legacyRangeSliceReplySerializer = new LegacyRangeSliceReplySerializer();
public static final IVersionedSerializer<ReadResponse> rangeSliceSerializer = new ForwardingVersionedSerializer<ReadResponse>()
{
@Override
protected IVersionedSerializer<ReadResponse> delegate(int version)
{
return version < MessagingService.VERSION_30
? legacyRangeSliceReplySerializer
: serializer;
}
};
private final ReadCommand command;
protected ReadResponse(ReadCommand command)
{
this.command = command;
}
public static ReadResponse createDataResponse(UnfilteredPartitionIterator data, ReadCommand command)
{
return new LocalDataResponse(data, command);
}
@VisibleForTesting
public static ReadResponse createRemoteDataResponse(UnfilteredPartitionIterator data, ReadCommand command)
{
return new RemoteDataResponse(LocalDataResponse.build(data, command.columnFilter()));
}
public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data, ReadCommand command)
{
return new DigestResponse(makeDigest(data, command));
}
public abstract UnfilteredPartitionIterator makeIterator(ReadCommand command);
public abstract ByteBuffer digest(ReadCommand command);
public abstract boolean isDigestResponse();
public String toDebugString(ReadCommand command, DecoratedKey key)
{
if (isDigestResponse())
return "Digest:0x" + ByteBufferUtil.bytesToHex(digest(command));
try (UnfilteredPartitionIterator iter = makeIterator(command))
{
while (iter.hasNext())
{
try (UnfilteredRowIterator partition = iter.next())
{
if (partition.partitionKey().equals(key))
return toDebugString(partition, command.metadata());
}
}
}
return "<key " + key + " not found>";
}
private String toDebugString(UnfilteredRowIterator partition, CFMetaData metadata)
{
StringBuilder sb = new StringBuilder();
sb.append(String.format("[%s.%s] key=%s partition_deletion=%s columns=%s",
metadata.ksName,
metadata.cfName,
metadata.getKeyValidator().getString(partition.partitionKey().getKey()),
partition.partitionLevelDeletion(),
partition.columns()));
if (partition.staticRow() != Rows.EMPTY_STATIC_ROW)
sb.append("\n ").append(partition.staticRow().toString(metadata, true));
while (partition.hasNext())
sb.append("\n ").append(partition.next().toString(metadata, true));
return sb.toString();
}
protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator, ReadCommand command)
{
MessageDigest digest = FBUtilities.threadLocalMD5Digest();
UnfilteredPartitionIterators.digest(command, iterator, digest, command.digestVersion());
return ByteBuffer.wrap(digest.digest());
}
private static class DigestResponse extends ReadResponse
{
private final ByteBuffer digest;
private DigestResponse(ByteBuffer digest)
{
super(null);
assert digest.hasRemaining();
this.digest = digest;
}
public UnfilteredPartitionIterator makeIterator(ReadCommand command)
{
throw new UnsupportedOperationException();
}
public ByteBuffer digest(ReadCommand command)
{
return digest;
}
public boolean isDigestResponse()
{
return true;
}
}
private static class LocalDataResponse extends DataResponse
{
private LocalDataResponse(UnfilteredPartitionIterator iter, ReadCommand command)
{
super(command, build(iter, command.columnFilter()), SerializationHelper.Flag.LOCAL);
}
private static ByteBuffer build(UnfilteredPartitionIterator iter, ColumnFilter selection)
{
try (DataOutputBuffer buffer = new DataOutputBuffer())
{
UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter, selection, buffer, MessagingService.current_version);
return buffer.buffer();
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
}
private static class RemoteDataResponse extends DataResponse
{
protected RemoteDataResponse(ByteBuffer data)
{
super(null, data, SerializationHelper.Flag.FROM_REMOTE);
}
}
static abstract class DataResponse extends ReadResponse
{
private final ByteBuffer data;
private final SerializationHelper.Flag flag;
protected DataResponse(ReadCommand command, ByteBuffer data, SerializationHelper.Flag flag)
{
super(command);
this.data = data;
this.flag = flag;
}
public UnfilteredPartitionIterator makeIterator(ReadCommand command)
{
try (DataInputBuffer in = new DataInputBuffer(data, true))
{
return UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in,
MessagingService.current_version,
command.metadata(),
command.columnFilter(),
flag);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
public ByteBuffer digest(ReadCommand command)
{
try (UnfilteredPartitionIterator iterator = makeIterator(command))
{
return makeDigest(iterator, command);
}
}
public boolean isDigestResponse()
{
return false;
}
}
@VisibleForTesting
static class LegacyRemoteDataResponse extends ReadResponse
{
private final List<ImmutableBTreePartition> partitions;
@VisibleForTesting
LegacyRemoteDataResponse(List<ImmutableBTreePartition> partitions)
{
super(null);
this.partitions = partitions;
}
public UnfilteredPartitionIterator makeIterator(final ReadCommand command)
{
boolean skipFirst = false;
boolean skipLast = false;
if (!partitions.isEmpty() && command instanceof PartitionRangeReadCommand)
{
AbstractBounds<PartitionPosition> keyRange = ((PartitionRangeReadCommand)command).dataRange().keyRange();
boolean isExcludingBounds = keyRange instanceof ExcludingBounds;
skipFirst = isExcludingBounds && !keyRange.contains(partitions.get(0).partitionKey());
skipLast = (isExcludingBounds || keyRange instanceof IncludingExcludingBounds) && !keyRange.contains(partitions.get(partitions.size() - 1).partitionKey());
}
final List<ImmutableBTreePartition> toReturn;
if (skipFirst || skipLast)
{
toReturn = partitions.size() == 1
? Collections.emptyList()
: partitions.subList(skipFirst ? 1 : 0, skipLast ? partitions.size() - 1 : partitions.size());
}
else
{
toReturn = partitions;
}
return new AbstractUnfilteredPartitionIterator()
{
private int idx;
public boolean isForThrift()
{
return true;
}
public CFMetaData metadata()
{
return command.metadata();
}
public boolean hasNext()
{
return idx < toReturn.size();
}
public UnfilteredRowIterator next()
{
ImmutableBTreePartition partition = toReturn.get(idx++);
ClusteringIndexFilter filter = command.clusteringIndexFilter(partition.partitionKey());
UnfilteredRowIterator iterator = partition.unfilteredIterator(command.columnFilter(), filter.getSlices(command.metadata()), filter.isReversed());
if (command.isForThrift())
return ThriftResultsMerger.maybeWrap(iterator, command.nowInSec());
else
return iterator;
}
};
}
public ByteBuffer digest(ReadCommand command)
{
try (UnfilteredPartitionIterator iterator = makeIterator(command))
{
return makeDigest(iterator, command);
}
}
public boolean isDigestResponse()
{
return false;
}
}
private static class Serializer implements IVersionedSerializer<ReadResponse>
{
public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException
{
boolean isDigest = response instanceof DigestResponse;
ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER;
if (version < MessagingService.VERSION_30)
{
out.writeInt(digest.remaining());
out.write(digest);
out.writeBoolean(isDigest);
if (!isDigest)
{
assert response.command != null;
try (UnfilteredPartitionIterator iter = response.makeIterator(response.command))
{
assert iter.hasNext();
try (UnfilteredRowIterator partition = iter.next())
{
ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out);
LegacyLayout.serializeAsLegacyPartition(response.command, partition, out, version);
}
assert !iter.hasNext();
}
}
return;
}
ByteBufferUtil.writeWithVIntLength(digest, out);
if (!isDigest)
{
ByteBuffer data = ((DataResponse)response).data;
ByteBufferUtil.writeWithVIntLength(data, out);
}
}
public ReadResponse deserialize(DataInputPlus in, int version) throws IOException
{
if (version < MessagingService.VERSION_30)
{
byte[] digest = null;
int digestSize = in.readInt();
if (digestSize > 0)
{
digest = new byte[digestSize];
in.readFully(digest, 0, digestSize);
}
boolean isDigest = in.readBoolean();
assert isDigest == digestSize > 0;
if (isDigest)
{
assert digest != null;
return new DigestResponse(ByteBuffer.wrap(digest));
}
ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
try (UnfilteredRowIterator rowIterator = LegacyLayout.deserializeLegacyPartition(in, version, SerializationHelper.Flag.FROM_REMOTE, key))
{
if (rowIterator == null)
return new LegacyRemoteDataResponse(Collections.emptyList());
return new LegacyRemoteDataResponse(Collections.singletonList(ImmutableBTreePartition.create(rowIterator)));
}
}
ByteBuffer digest = ByteBufferUtil.readWithVIntLength(in);
if (digest.hasRemaining())
return new DigestResponse(digest);
assert version >= MessagingService.VERSION_30;
ByteBuffer data = ByteBufferUtil.readWithVIntLength(in);
return new RemoteDataResponse(data);
}
public long serializedSize(ReadResponse response, int version)
{
boolean isDigest = response instanceof DigestResponse;
ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER;
if (version < MessagingService.VERSION_30)
{
long size = TypeSizes.sizeof(digest.remaining())
+ digest.remaining()
+ TypeSizes.sizeof(isDigest);
if (!isDigest)
{
assert response.command != null;
try (UnfilteredPartitionIterator iter = response.makeIterator(response.command))
{
assert iter.hasNext();
try (UnfilteredRowIterator partition = iter.next())
{
size += ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey());
size += LegacyLayout.serializedSizeAsLegacyPartition(response.command, partition, version);
}
assert !iter.hasNext();
}
}
return size;
}
long size = ByteBufferUtil.serializedSizeWithVIntLength(digest);
if (!isDigest)
{
assert version >= MessagingService.VERSION_30;
ByteBuffer data = ((DataResponse)response).data;
size += ByteBufferUtil.serializedSizeWithVIntLength(data);
}
return size;
}
}
private static class LegacyRangeSliceReplySerializer implements IVersionedSerializer<ReadResponse>
{
public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException
{
assert version < MessagingService.VERSION_30;
int numPartitions = 0;
assert response.command != null;
try (UnfilteredPartitionIterator iterator = response.makeIterator(response.command))
{
while (iterator.hasNext())
{
try (UnfilteredRowIterator atomIterator = iterator.next())
{
numPartitions++;
while (atomIterator.hasNext())
atomIterator.next();
}
}
}
out.writeInt(numPartitions);
try (UnfilteredPartitionIterator iterator = response.makeIterator(response.command))
{
while (iterator.hasNext())
{
try (UnfilteredRowIterator partition = iterator.next())
{
ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out);
LegacyLayout.serializeAsLegacyPartition(response.command, partition, out, version);
}
}
}
}
public ReadResponse deserialize(DataInputPlus in, int version) throws IOException
{
assert version < MessagingService.VERSION_30;
int partitionCount = in.readInt();
ArrayList<ImmutableBTreePartition> partitions = new ArrayList<>(partitionCount);
for (int i = 0; i < partitionCount; i++)
{
ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
try (UnfilteredRowIterator partition = LegacyLayout.deserializeLegacyPartition(in, version, SerializationHelper.Flag.FROM_REMOTE, key))
{
partitions.add(ImmutableBTreePartition.create(partition));
}
}
return new LegacyRemoteDataResponse(partitions);
}
public long serializedSize(ReadResponse response, int version)
{
assert version < MessagingService.VERSION_30;
long size = TypeSizes.sizeof(0);
assert response.command != null;
try (UnfilteredPartitionIterator iterator = response.makeIterator(response.command))
{
while (iterator.hasNext())
{
try (UnfilteredRowIterator partition = iterator.next())
{
size += ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey());
size += LegacyLayout.serializedSizeAsLegacyPartition(response.command, partition, version);
}
}
}
return size;
}
}
}