package org.apache.cassandra.cql3.statements;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.SortedSet;
import com.google.common.base.MoreObjects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.CFName;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.ColumnSpecification;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.ResultSet;
import org.apache.cassandra.cql3.SuperColumnCompatibility;
import org.apache.cassandra.cql3.Term;
import org.apache.cassandra.cql3.VariableSpecifications;
import org.apache.cassandra.cql3.WhereClause;
import org.apache.cassandra.cql3.functions.Function;
import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
import org.apache.cassandra.cql3.selection.RawSelector;
import org.apache.cassandra.cql3.selection.Selection;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.aggregation.AggregationSpecification;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UserType;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.rows.ComplexColumnData;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.view.View;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.exceptions.UnauthorizedException;
import org.apache.cassandra.index.SecondaryIndexManager;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.pager.AggregationQueryPager;
import org.apache.cassandra.service.pager.PagingState;
import org.apache.cassandra.service.pager.QueryPager;
import org.apache.cassandra.thrift.ThriftValidation;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkNull;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
import static org.apache.cassandra.utils.ByteBufferUtil.UNSET_BYTE_BUFFER;
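/**
 * Encapsulates a parsed and prepared SELECT query: the selected columns, the WHERE restrictions,
 * the ORDER BY comparator, the LIMIT/PER PARTITION LIMIT terms and any GROUP BY aggregation.
 * Instances are immutable once built and are turned into {@link ReadQuery} objects at execution
 * time from the bound {@link QueryOptions}.
 */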
public class SelectStatement implements CQLStatement
{
private static final Logger logger = LoggerFactory.getLogger(SelectStatement.class);
public static final int DEFAULT_PAGE_SIZE = 10000;
private final int boundTerms;
public final CFMetaData cfm;
public final Parameters parameters;
private final Selection selection;
private final Term limit;
private final Term perPartitionLimit;
private final StatementRestrictions restrictions;
private final boolean isReversed;
private final AggregationSpecification aggregationSpec;
private final Comparator<List<ByteBuffer>> orderingComparator;
private final ColumnFilter queriedColumns;
private static final Parameters defaultParameters = new Parameters(Collections.emptyMap(),
Collections.emptyList(),
false,
false,
false);
public SelectStatement(CFMetaData cfm,
int boundTerms,
Parameters parameters,
Selection selection,
StatementRestrictions restrictions,
boolean isReversed,
AggregationSpecification aggregationSpec,
Comparator<List<ByteBuffer>> orderingComparator,
Term limit,
Term perPartitionLimit)
{
this.cfm = cfm;
this.boundTerms = boundTerms;
this.selection = selection;
this.restrictions = restrictions;
this.isReversed = isReversed;
this.aggregationSpec = aggregationSpec;
this.orderingComparator = orderingComparator;
this.parameters = parameters;
this.limit = limit;
this.perPartitionLimit = perPartitionLimit;
this.queriedColumns = gatherQueriedColumns();
}
public Iterable<Function> getFunctions()
{
List<Function> functions = new ArrayList<>();
addFunctionsTo(functions);
return functions;
}
private void addFunctionsTo(List<Function> functions)
{
selection.addFunctionsTo(functions);
restrictions.addFunctionsTo(functions);
if (limit != null)
limit.addFunctionsTo(functions);
if (perPartitionLimit != null)
perPartitionLimit.addFunctionsTo(functions);
}
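/**
 * Computes the columns that must be fetched from replicas: every column for wildcard selections,
 * otherwise the selected non-primary-key columns plus any non-PK column restricted by the WHERE
 * clause (those are needed to evaluate the row filter).
 */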
private ColumnFilter gatherQueriedColumns()
{
if (selection.isWildcard())
return ColumnFilter.all(cfm);
ColumnFilter.Builder builder = ColumnFilter.allColumnsBuilder(cfm);
for (ColumnDefinition def : selection.getColumns())
if (!def.isPrimaryKeyColumn())
builder.add(def);
builder.addAll(restrictions.nonPKRestrictedColumns(true));
return builder.build();
}
public ColumnFilter queriedColumns()
{
return queriedColumns;
}
static SelectStatement forSelection(CFMetaData cfm, Selection selection)
{
return new SelectStatement(cfm,
0,
defaultParameters,
selection,
StatementRestrictions.empty(StatementType.SELECT, cfm),
false,
null,
null,
null,
null);
}
public ResultSet.ResultMetadata getResultMetadata()
{
return selection.getResultMetadata(parameters.isJson);
}
public int getBoundTerms()
{
return boundTerms;
}
public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException
{
if (cfm.isView())
{
CFMetaData baseTable = View.findBaseTable(keyspace(), columnFamily());
if (baseTable != null)
state.hasColumnFamilyAccess(baseTable, Permission.SELECT);
}
else
{
state.hasColumnFamilyAccess(cfm, Permission.SELECT);
}
for (Function function : getFunctions())
state.ensureHasPermission(Permission.EXECUTE, function);
}
public void validate(ClientState state) throws InvalidRequestException
{
}
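/**
 * Executes the statement against the cluster: validates the consistency level for reads, builds
 * the {@link ReadQuery} from the bound options, then either runs it in one shot (no aggregation
 * and the limit fits in a single page) or drives it through a pager.
 */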
public ResultMessage.Rows execute(QueryState state, QueryOptions options, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException
{
ConsistencyLevel cl = options.getConsistency();
checkNotNull(cl, "Invalid empty consistency level");
cl.validateForRead(keyspace());
int nowInSec = FBUtilities.nowInSeconds();
int userLimit = getLimit(options);
int userPerPartitionLimit = getPerPartitionLimit(options);
int pageSize = options.getPageSize();
ReadQuery query = getQuery(options, nowInSec, userLimit, userPerPartitionLimit, pageSize);
if (aggregationSpec == null && (pageSize <= 0 || (query.limits().count() <= pageSize)))
return execute(query, options, state, nowInSec, userLimit, queryStartNanoTime);
QueryPager pager = getPager(query, options);
return execute(Pager.forDistributedQuery(pager, cl, state.getClientState()), options, pageSize, nowInSec, userLimit, queryStartNanoTime);
}
public ReadQuery getQuery(QueryOptions options, int nowInSec) throws RequestValidationException
{
return getQuery(options, nowInSec, getLimit(options), getPerPartitionLimit(options), options.getPageSize());
}
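/**
 * Builds the read query for the given bound options and limits: a partition range command when
 * the partition key is a range or a secondary index is used, otherwise a group of
 * single-partition commands.
 */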
public ReadQuery getQuery(QueryOptions options, int nowInSec, int userLimit, int perPartitionLimit, int pageSize)
{
boolean isPartitionRangeQuery = restrictions.isKeyRange() || restrictions.usesSecondaryIndexing();
DataLimits limit = getDataLimits(userLimit, perPartitionLimit, pageSize);
if (isPartitionRangeQuery)
return getRangeCommand(options, limit, nowInSec);
return getSliceCommands(options, limit, nowInSec);
}
private ResultMessage.Rows execute(ReadQuery query,
QueryOptions options,
QueryState state,
int nowInSec,
int userLimit, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
{
try (PartitionIterator data = query.execute(options.getConsistency(), state.getClientState(), queryStartNanoTime))
{
return processResults(data, options, nowInSec, userLimit);
}
}
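/**
 * Small wrapper hiding whether pages are fetched through the distributed read path or through an
 * internal (local) execution controller, so the paged execution code can be shared between both.
 */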
private static abstract class Pager
{
protected QueryPager pager;
protected Pager(QueryPager pager)
{
this.pager = pager;
}
public static Pager forInternalQuery(QueryPager pager, ReadExecutionController executionController)
{
return new InternalPager(pager, executionController);
}
public static Pager forDistributedQuery(QueryPager pager, ConsistencyLevel consistency, ClientState clientState)
{
return new NormalPager(pager, consistency, clientState);
}
public boolean isExhausted()
{
return pager.isExhausted();
}
public PagingState state()
{
return pager.state();
}
public abstract PartitionIterator fetchPage(int pageSize, long queryStartNanoTime);
public static class NormalPager extends Pager
{
private final ConsistencyLevel consistency;
private final ClientState clientState;
private NormalPager(QueryPager pager, ConsistencyLevel consistency, ClientState clientState)
{
super(pager);
this.consistency = consistency;
this.clientState = clientState;
}
public PartitionIterator fetchPage(int pageSize, long queryStartNanoTime)
{
return pager.fetchPage(pageSize, consistency, clientState, queryStartNanoTime);
}
}
public static class InternalPager extends Pager
{
private final ReadExecutionController executionController;
private InternalPager(QueryPager pager, ReadExecutionController executionController)
{
super(pager);
this.executionController = executionController;
}
public PartitionIterator fetchPage(int pageSize, long queryStartNanoTime)
{
return pager.fetchPageInternal(pageSize, executionController);
}
}
}
private ResultMessage.Rows execute(Pager pager,
QueryOptions options,
int pageSize,
int nowInSec,
int userLimit,
long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
{
if (aggregationSpec != null)
{
if (!restrictions.hasPartitionKeyRestrictions())
{
warn("Aggregation query used without partition key");
}
else if (restrictions.keyIsInRelation())
{
warn("Aggregation query used on multiple partition keys (IN restriction)");
}
}
checkFalse(pageSize > 0 && needsPostQueryOrdering(),
"Cannot page queries with both ORDER BY and a IN restriction on the partition key;"
+ " you must either remove the ORDER BY or the IN and sort client side, or disable paging for this query");
ResultMessage.Rows msg;
try (PartitionIterator page = pager.fetchPage(pageSize, queryStartNanoTime))
{
msg = processResults(page, options, nowInSec, userLimit);
}
if (!pager.isExhausted())
msg.result.metadata.setHasMorePages(pager.state());
return msg;
}
private void warn(String msg)
{
logger.warn(msg);
ClientWarn.instance.warn(msg);
}
private ResultMessage.Rows processResults(PartitionIterator partitions,
QueryOptions options,
int nowInSec,
int userLimit) throws RequestValidationException
{
ResultSet rset = process(partitions, options, nowInSec, userLimit);
return new ResultMessage.Rows(rset);
}
public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
{
return executeInternal(state, options, FBUtilities.nowInSeconds(), System.nanoTime());
}
public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, int nowInSec, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException
{
int userLimit = getLimit(options);
int userPerPartitionLimit = getPerPartitionLimit(options);
int pageSize = options.getPageSize();
ReadQuery query = getQuery(options, nowInSec, userLimit, userPerPartitionLimit, pageSize);
try (ReadExecutionController executionController = query.executionController())
{
if (aggregationSpec == null && (pageSize <= 0 || (query.limits().count() <= pageSize)))
{
try (PartitionIterator data = query.executeInternal(executionController))
{
return processResults(data, options, nowInSec, userLimit);
}
}
else
{
QueryPager pager = getPager(query, options);
return execute(Pager.forInternalQuery(pager, executionController), options, pageSize, nowInSec, userLimit, queryStartNanoTime);
}
}
}
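/**
 * Returns the pager for the query, wrapping it in an {@link AggregationQueryPager} when the
 * statement aggregates rows (GROUP BY or aggregate functions).
 */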
private QueryPager getPager(ReadQuery query, QueryOptions options)
{
QueryPager pager = query.getPager(options.getPagingState(), options.getProtocolVersion());
if (aggregationSpec == null || query == ReadQuery.EMPTY)
return pager;
return new AggregationQueryPager(pager, query.limits());
}
public ResultSet process(PartitionIterator partitions, int nowInSec) throws InvalidRequestException
{
return process(partitions, QueryOptions.DEFAULT, nowInSec, getLimit(QueryOptions.DEFAULT));
}
public String keyspace()
{
return cfm.ksName;
}
public String columnFamily()
{
return cfm.cfName;
}
public Selection getSelection()
{
return selection;
}
public StatementRestrictions getRestrictions()
{
return restrictions;
}
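/**
 * Builds one {@link SinglePartitionReadCommand} per restricted partition key and groups them into
 * a single query; returns {@link ReadQuery#EMPTY} when no key or no row can match.
 */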
private ReadQuery getSliceCommands(QueryOptions options, DataLimits limit, int nowInSec) throws RequestValidationException
{
Collection<ByteBuffer> keys = restrictions.getPartitionKeys(options);
if (keys.isEmpty())
return ReadQuery.EMPTY;
ClusteringIndexFilter filter = makeClusteringIndexFilter(options);
if (filter == null)
return ReadQuery.EMPTY;
RowFilter rowFilter = getRowFilter(options);
List<SinglePartitionReadCommand> commands = new ArrayList<>(keys.size());
for (ByteBuffer key : keys)
{
QueryProcessor.validateKey(key);
DecoratedKey dk = cfm.decorateKey(ByteBufferUtil.clone(key));
ColumnFilter cf = (cfm.isSuper() && cfm.isDense()) ? SuperColumnCompatibility.getColumnFilter(cfm, options, restrictions.getSuperColumnRestrictions()) : queriedColumns;
commands.add(SinglePartitionReadCommand.create(cfm, nowInSec, cf, rowFilter, limit, dk, filter));
}
return new SinglePartitionReadCommand.Group(commands, limit);
}
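/**
 * Returns the clustering slices selected by this statement (converting a names filter into the
 * equivalent single-row slices), assuming an internal call with no bound values.
 */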
public Slices clusteringIndexFilterAsSlices()
{
QueryOptions options = QueryOptions.forInternalCalls(Collections.emptyList());
ClusteringIndexFilter filter = makeClusteringIndexFilter(options);
if (filter instanceof ClusteringIndexSliceFilter)
return ((ClusteringIndexSliceFilter)filter).requestedSlices();
Slices.Builder builder = new Slices.Builder(cfm.comparator);
for (Clustering clustering: ((ClusteringIndexNamesFilter)filter).requestedRows())
builder.add(Slice.make(clustering));
return builder.build();
}
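/**
 * Returns an unlimited single-partition read command selecting everything this statement would
 * read for the given key; intended for internal reads on behalf of materialized views.
 */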
public SinglePartitionReadCommand internalReadForView(DecoratedKey key, int nowInSec)
{
QueryOptions options = QueryOptions.forInternalCalls(Collections.emptyList());
ClusteringIndexFilter filter = makeClusteringIndexFilter(options);
RowFilter rowFilter = getRowFilter(options);
return SinglePartitionReadCommand.create(cfm, nowInSec, queriedColumns, rowFilter, DataLimits.NONE, key, filter);
}
public RowFilter rowFilterForInternalCalls()
{
return getRowFilter(QueryOptions.forInternalCalls(Collections.emptyList()));
}
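/**
 * Builds the partition range read command for key-range and secondary-index queries, validating
 * any index involved; returns {@link ReadQuery#EMPTY} when the bounds or the clustering filter
 * select nothing.
 */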
private ReadQuery getRangeCommand(QueryOptions options, DataLimits limit, int nowInSec) throws RequestValidationException
{
ClusteringIndexFilter clusteringIndexFilter = makeClusteringIndexFilter(options);
if (clusteringIndexFilter == null)
return ReadQuery.EMPTY;
RowFilter rowFilter = getRowFilter(options);
AbstractBounds<PartitionPosition> keyBounds = restrictions.getPartitionKeyBounds(options);
if (keyBounds == null)
return ReadQuery.EMPTY;
PartitionRangeReadCommand command =
PartitionRangeReadCommand.create(false, cfm, nowInSec, queriedColumns, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter));
command.maybeValidateIndex();
return command;
}
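/**
 * Builds the clustering filter for a single partition: a full-slice filter for DISTINCT queries
 * (the DISTINCT limits then keep only the first row per partition), a slice filter for clustering
 * ranges, and a names filter when the clustering columns are fully specified. Returns null when
 * nothing can be selected.
 */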
private ClusteringIndexFilter makeClusteringIndexFilter(QueryOptions options)
throws InvalidRequestException
{
if (parameters.isDistinct)
{
return new ClusteringIndexSliceFilter(Slices.ALL, false);
}
if (restrictions.isColumnRange())
{
Slices slices = makeSlices(options);
if (slices == Slices.NONE && !selection.containsStaticColumns())
return null;
return new ClusteringIndexSliceFilter(slices, isReversed);
}
else
{
NavigableSet<Clustering> clusterings = getRequestedRows(options);
if (clusterings.isEmpty() && queriedColumns.fetchedColumns().statics.isEmpty())
return null;
return new ClusteringIndexNamesFilter(clusterings, isReversed);
}
}
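/**
 * Converts the clustering restrictions into {@link Slices}, pairing start and end bounds in order
 * and dropping any slice whose start sorts after its end.
 */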
private Slices makeSlices(QueryOptions options)
throws InvalidRequestException
{
SortedSet<ClusteringBound> startBounds = restrictions.getClusteringColumnsBounds(Bound.START, options);
SortedSet<ClusteringBound> endBounds = restrictions.getClusteringColumnsBounds(Bound.END, options);
assert startBounds.size() == endBounds.size();
if (startBounds.size() == 1)
{
ClusteringBound start = startBounds.first();
ClusteringBound end = endBounds.first();
return cfm.comparator.compare(start, end) > 0
? Slices.NONE
: Slices.with(cfm.comparator, Slice.make(start, end));
}
Slices.Builder builder = new Slices.Builder(cfm.comparator, startBounds.size());
Iterator<ClusteringBound> startIter = startBounds.iterator();
Iterator<ClusteringBound> endIter = endBounds.iterator();
while (startIter.hasNext() && endIter.hasNext())
{
ClusteringBound start = startIter.next();
ClusteringBound end = endIter.next();
if (cfm.comparator.compare(start, end) > 0)
continue;
builder.add(start, end);
}
return builder.build();
}
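/**
 * Combines the user LIMIT, PER PARTITION LIMIT, DISTINCT, GROUP BY and page size into the
 * {@link DataLimits} applied server-side. When post-query ordering is needed the row limit is not
 * pushed down; rows are trimmed after sorting in {@link #process}.
 */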
private DataLimits getDataLimits(int userLimit, int perPartitionLimit, int pageSize)
{
int cqlRowLimit = DataLimits.NO_LIMIT;
int cqlPerPartitionLimit = DataLimits.NO_LIMIT;
if (aggregationSpec != AggregationSpecification.AGGREGATE_EVERYTHING)
{
if (!needsPostQueryOrdering())
cqlRowLimit = userLimit;
cqlPerPartitionLimit = perPartitionLimit;
}
if (pageSize <= 0)
pageSize = DEFAULT_PAGE_SIZE;
if (aggregationSpec != null && aggregationSpec != AggregationSpecification.AGGREGATE_EVERYTHING)
{
if (parameters.isDistinct)
return DataLimits.distinctLimits(cqlRowLimit);
return DataLimits.groupByLimits(cqlRowLimit,
cqlPerPartitionLimit,
pageSize,
aggregationSpec);
}
if (parameters.isDistinct)
return cqlRowLimit == DataLimits.NO_LIMIT ? DataLimits.DISTINCT_NONE : DataLimits.distinctLimits(cqlRowLimit);
return DataLimits.cqlLimits(cqlRowLimit, cqlPerPartitionLimit);
}
public int getLimit(QueryOptions options)
{
return getLimit(limit, options);
}
public int getPerPartitionLimit(QueryOptions options)
{
return getLimit(perPartitionLimit, options);
}
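/**
 * Binds and validates a LIMIT/PER PARTITION LIMIT term; an unset bind marker leaves the limit at
 * {@link DataLimits#NO_LIMIT}, and a bound value must be a strictly positive int.
 */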
private int getLimit(Term limit, QueryOptions options)
{
int userLimit = DataLimits.NO_LIMIT;
if (limit != null)
{
ByteBuffer b = checkNotNull(limit.bindAndGet(options), "Invalid null value of limit");
if (b != UNSET_BYTE_BUFFER)
{
try
{
Int32Type.instance.validate(b);
userLimit = Int32Type.instance.compose(b);
checkTrue(userLimit > 0, "LIMIT must be strictly positive");
}
catch (MarshalException e)
{
throw new InvalidRequestException("Invalid limit value");
}
}
}
return userLimit;
}
private NavigableSet<Clustering> getRequestedRows(QueryOptions options) throws InvalidRequestException
{
assert !restrictions.isColumnRange();
return restrictions.getClusteringColumns(options);
}
public RowFilter getRowFilter(QueryOptions options) throws InvalidRequestException
{
ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(columnFamily());
SecondaryIndexManager secondaryIndexManager = cfs.indexManager;
RowFilter filter = restrictions.getRowFilter(secondaryIndexManager, options);
return filter;
}
private ResultSet process(PartitionIterator partitions,
QueryOptions options,
int nowInSec,
int userLimit) throws InvalidRequestException
{
Selection.ResultSetBuilder result = selection.resultSetBuilder(options, parameters.isJson, aggregationSpec);
while (partitions.hasNext())
{
try (RowIterator partition = partitions.next())
{
processPartition(partition, options, result, nowInSec);
}
}
ResultSet cqlRows = result.build();
orderResults(cqlRows);
cqlRows.trim(userLimit);
return cqlRows;
}
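/**
 * Splits a decorated partition key into its CQL components (a single buffer for non-composite
 * partition keys).
 */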
public static ByteBuffer[] getComponents(CFMetaData cfm, DecoratedKey dk)
{
ByteBuffer key = dk.getKey();
if (cfm.getKeyValidator() instanceof CompositeType)
{
return ((CompositeType)cfm.getKeyValidator()).split(key);
}
else
{
return new ByteBuffer[]{ key };
}
}
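/**
 * Adds the rows of one partition to the result builder. A partition with no regular rows still
 * produces a single result row from its static columns when the query selects whole partitions
 * (or the table is a static compact table).
 */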
void processPartition(RowIterator partition, QueryOptions options, Selection.ResultSetBuilder result, int nowInSec)
throws InvalidRequestException
{
if (cfm.isSuper() && cfm.isDense())
{
SuperColumnCompatibility.processPartition(cfm, selection, partition, result, options.getProtocolVersion(), restrictions.getSuperColumnRestrictions(), options);
return;
}
ProtocolVersion protocolVersion = options.getProtocolVersion();
ByteBuffer[] keyComponents = getComponents(cfm, partition.partitionKey());
Row staticRow = partition.staticRow();
if (!partition.hasNext())
{
if (!staticRow.isEmpty() && (queriesFullPartitions() || cfm.isStaticCompactTable()))
{
result.newRow(partition.partitionKey(), staticRow.clustering());
for (ColumnDefinition def : selection.getColumns())
{
switch (def.kind)
{
case PARTITION_KEY:
result.add(keyComponents[def.position()]);
break;
case STATIC:
addValue(result, def, staticRow, nowInSec, protocolVersion);
break;
default:
result.add((ByteBuffer)null);
}
}
}
return;
}
while (partition.hasNext())
{
Row row = partition.next();
result.newRow(partition.partitionKey(), row.clustering());
for (ColumnDefinition def : selection.getColumns())
{
switch (def.kind)
{
case PARTITION_KEY:
result.add(keyComponents[def.position()]);
break;
case CLUSTERING:
result.add(row.clustering().get(def.position()));
break;
case REGULAR:
addValue(result, def, row, nowInSec, protocolVersion);
break;
case STATIC:
addValue(result, def, staticRow, nowInSec, protocolVersion);
break;
}
}
}
}
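/**
 * Whether the query selects whole partitions, i.e. has no restriction on clustering or regular
 * columns.
 */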
private boolean queriesFullPartitions()
{
return !restrictions.hasClusteringColumnsRestrictions() && !restrictions.hasRegularColumnsRestrictions();
}
private static void addValue(Selection.ResultSetBuilder result, ColumnDefinition def, Row row, int nowInSec, ProtocolVersion protocolVersion)
{
if (def.isComplex())
{
assert def.type.isMultiCell();
ComplexColumnData complexData = row.getComplexColumnData(def);
if (complexData == null)
result.add(null);
else if (def.type.isCollection())
result.add(((CollectionType) def.type).serializeForNativeProtocol(complexData.iterator(), protocolVersion));
else
result.add(((UserType) def.type).serializeForNativeProtocol(complexData.iterator(), protocolVersion));
}
else
{
result.add(row.getCell(def), nowInSec);
}
}
private boolean needsPostQueryOrdering()
{
return restrictions.keyIsInRelation() && !parameters.orderings.isEmpty();
}
private void orderResults(ResultSet cqlRows)
{
if (cqlRows.size() == 0 || !needsPostQueryOrdering())
return;
Collections.sort(cqlRows.rows, orderingComparator);
}
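/**
 * The parser-produced form of a SELECT. {@link #prepare} validates it against the schema and
 * builds the executable {@link SelectStatement} with its restrictions, ordering and limits.
 */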
public static class RawStatement extends CFStatement
{
public final Parameters parameters;
public final List<RawSelector> selectClause;
public final WhereClause whereClause;
public final Term.Raw limit;
public final Term.Raw perPartitionLimit;
public RawStatement(CFName cfName, Parameters parameters,
List<RawSelector> selectClause,
WhereClause whereClause,
Term.Raw limit,
Term.Raw perPartitionLimit)
{
super(cfName);
this.parameters = parameters;
this.selectClause = selectClause;
this.whereClause = whereClause;
this.limit = limit;
this.perPartitionLimit = perPartitionLimit;
}
public ParsedStatement.Prepared prepare(ClientState clientState) throws InvalidRequestException
{
return prepare(false, clientState);
}
public ParsedStatement.Prepared prepare(boolean forView, ClientState clientState) throws InvalidRequestException
{
CFMetaData cfm = ThriftValidation.validateColumnFamilyWithCompactMode(keyspace(), columnFamily(), clientState.isNoCompactMode());
VariableSpecifications boundNames = getBoundVariables();
Selection selection = prepareSelection(cfm, boundNames);
StatementRestrictions restrictions = prepareRestrictions(cfm, boundNames, selection, forView);
if (parameters.isDistinct)
{
checkNull(perPartitionLimit, "PER PARTITION LIMIT is not allowed with SELECT DISTINCT queries");
validateDistinctSelection(cfm, selection, restrictions);
}
AggregationSpecification aggregationSpec = getAggregationSpecification(cfm,
selection,
restrictions,
parameters.isDistinct);
checkFalse(aggregationSpec == AggregationSpecification.AGGREGATE_EVERYTHING && perPartitionLimit != null,
"PER PARTITION LIMIT is not allowed with aggregate queries.");
Comparator<List<ByteBuffer>> orderingComparator = null;
boolean isReversed = false;
if (!parameters.orderings.isEmpty())
{
assert !forView;
verifyOrderingIsAllowed(restrictions);
orderingComparator = getOrderingComparator(cfm, selection, restrictions, parameters.isJson);
isReversed = isReversed(cfm);
if (isReversed)
orderingComparator = Collections.reverseOrder(orderingComparator);
}
checkNeedsFiltering(restrictions);
SelectStatement stmt = new SelectStatement(cfm,
boundNames.size(),
parameters,
selection,
restrictions,
isReversed,
aggregationSpec,
orderingComparator,
prepareLimit(boundNames, limit, keyspace(), limitReceiver()),
prepareLimit(boundNames, perPartitionLimit, keyspace(), perPartitionLimitReceiver()));
return new ParsedStatement.Prepared(stmt, boundNames, boundNames.getPartitionKeyBindIndexes(cfm));
}
private Selection prepareSelection(CFMetaData cfm, VariableSpecifications boundNames)
{
boolean hasGroupBy = !parameters.groups.isEmpty();
if (selectClause.isEmpty())
return hasGroupBy ? Selection.wildcardWithGroupBy(cfm, boundNames) : Selection.wildcard(cfm);
return Selection.fromSelectors(cfm, selectClause, boundNames, hasGroupBy);
}
private StatementRestrictions prepareRestrictions(CFMetaData cfm,
VariableSpecifications boundNames,
Selection selection,
boolean forView) throws InvalidRequestException
{
return new StatementRestrictions(StatementType.SELECT,
cfm,
whereClause,
boundNames,
selection.containsOnlyStaticColumns(),
selection.containsAComplexColumn(),
parameters.allowFiltering,
forView);
}
private Term prepareLimit(VariableSpecifications boundNames, Term.Raw limit,
String keyspace, ColumnSpecification limitReceiver) throws InvalidRequestException
{
if (limit == null)
return null;
Term prepLimit = limit.prepare(keyspace, limitReceiver);
prepLimit.collectMarkerSpecification(boundNames);
return prepLimit;
}
private static void verifyOrderingIsAllowed(StatementRestrictions restrictions) throws InvalidRequestException
{
checkFalse(restrictions.usesSecondaryIndexing(), "ORDER BY with 2ndary indexes is not supported.");
checkFalse(restrictions.isKeyRange(), "ORDER BY is only supported when the partition key is restricted by an EQ or an IN.");
}
private static void validateDistinctSelection(CFMetaData cfm,
Selection selection,
StatementRestrictions restrictions)
throws InvalidRequestException
{
checkFalse(restrictions.hasClusteringColumnsRestrictions() ||
(restrictions.hasNonPrimaryKeyRestrictions() && !restrictions.nonPKRestrictedColumns(true).stream().allMatch(ColumnDefinition::isStatic)),
"SELECT DISTINCT with WHERE clause only supports restriction by partition key and/or static columns.");
Collection<ColumnDefinition> requestedColumns = selection.getColumns();
for (ColumnDefinition def : requestedColumns)
checkFalse(!def.isPartitionKey() && !def.isStatic(),
"SELECT DISTINCT queries must only request partition key columns and/or static columns (not %s)",
def.name);
if (!restrictions.isKeyRange())
return;
for (ColumnDefinition def : cfm.partitionKeyColumns())
checkTrue(requestedColumns.contains(def),
"SELECT DISTINCT queries must request all the partition key columns (missing %s)", def.name);
}
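/**
 * Returns the aggregation specification: {@code AGGREGATE_EVERYTHING} for aggregate selections
 * without GROUP BY, a primary-key-prefix specification for GROUP BY queries (grouped columns must
 * follow the primary key order, with any skipped column restricted by an equality), and null when
 * no aggregation is involved.
 */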
private AggregationSpecification getAggregationSpecification(CFMetaData cfm,
Selection selection,
StatementRestrictions restrictions,
boolean isDistinct)
{
if (parameters.groups.isEmpty())
return selection.isAggregate() ? AggregationSpecification.AGGREGATE_EVERYTHING
: null;
int clusteringPrefixSize = 0;
Iterator<ColumnDefinition> pkColumns = cfm.primaryKeyColumns().iterator();
for (ColumnDefinition.Raw raw : parameters.groups)
{
ColumnDefinition def = raw.prepare(cfm);
checkTrue(def.isPartitionKey() || def.isClusteringColumn(),
"Group by is currently only supported on the columns of the PRIMARY KEY, got %s", def.name);
while (true)
{
checkTrue(pkColumns.hasNext(),
"Group by currently only support groups of columns following their declared order in the PRIMARY KEY");
ColumnDefinition pkColumn = pkColumns.next();
if (pkColumn.isClusteringColumn())
clusteringPrefixSize++;
if (pkColumn.equals(def))
break;
checkTrue(restrictions.isColumnRestrictedByEq(pkColumn),
"Group by currently only support groups of columns following their declared order in the PRIMARY KEY");
}
}
checkFalse(pkColumns.hasNext() && pkColumns.next().isPartitionKey(),
"Group by is not supported on only a part of the partition key");
checkFalse(clusteringPrefixSize > 0 && isDistinct,
"Grouping on clustering columns is not allowed for SELECT DISTINCT queries");
return AggregationSpecification.aggregatePkPrefix(cfm.comparator, clusteringPrefixSize);
}
private Comparator<List<ByteBuffer>> getOrderingComparator(CFMetaData cfm,
Selection selection,
StatementRestrictions restrictions,
boolean isJson)
throws InvalidRequestException
{
if (!restrictions.keyIsInRelation())
return null;
Map<ColumnDefinition, Integer> orderingIndexes = getOrderingIndex(cfm, selection, isJson);
List<Integer> idToSort = new ArrayList<>();
List<Comparator<ByteBuffer>> sorters = new ArrayList<>();
for (ColumnDefinition.Raw raw : parameters.orderings.keySet())
{
ColumnDefinition orderingColumn = raw.prepare(cfm);
idToSort.add(orderingIndexes.get(orderingColumn));
sorters.add(orderingColumn.type);
}
return idToSort.size() == 1 ? new SingleColumnComparator(idToSort.get(0), sorters.get(0))
: new CompositeComparator(sorters, idToSort);
}
private Map<ColumnDefinition, Integer> getOrderingIndex(CFMetaData cfm, Selection selection, boolean isJson)
throws InvalidRequestException
{
for (ColumnDefinition.Raw raw : parameters.orderings.keySet())
{
final ColumnDefinition def = raw.prepare(cfm);
selection.addColumnForOrdering(def);
}
return selection.getOrderingIndex(isJson);
}
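/**
 * Checks the ORDER BY clause against the clustering order and returns whether the query must be
 * executed in reverse clustering order; mixing directions relative to the declared order is
 * rejected.
 */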
private boolean isReversed(CFMetaData cfm) throws InvalidRequestException
{
Boolean[] reversedMap = new Boolean[cfm.clusteringColumns().size()];
int i = 0;
for (Map.Entry<ColumnDefinition.Raw, Boolean> entry : parameters.orderings.entrySet())
{
ColumnDefinition def = entry.getKey().prepare(cfm);
boolean reversed = entry.getValue();
checkTrue(def.isClusteringColumn(),
"Order by is currently only supported on the clustered columns of the PRIMARY KEY, got %s", def.name);
checkTrue(i++ == def.position(),
"Order by currently only support the ordering of columns following their declared order in the PRIMARY KEY");
reversedMap[def.position()] = (reversed != def.isReversedType());
}
Boolean isReversed = null;
for (Boolean b : reversedMap)
{
if (b == null)
continue;
if (isReversed == null)
{
isReversed = b;
continue;
}
checkTrue(isReversed.equals(b), "Unsupported order by relation");
}
assert isReversed != null;
return isReversed;
}
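/**
 * Rejects key-range and secondary-index queries that would require filtering unless
 * ALLOW FILTERING was specified.
 */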
private void checkNeedsFiltering(StatementRestrictions restrictions) throws InvalidRequestException
{
if (!parameters.allowFiltering && (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing()))
{
checkFalse(restrictions.needFiltering(), StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE);
}
}
private ColumnSpecification limitReceiver()
{
return new ColumnSpecification(keyspace(), columnFamily(), new ColumnIdentifier("[limit]", true), Int32Type.instance);
}
private ColumnSpecification perPartitionLimitReceiver()
{
return new ColumnSpecification(keyspace(), columnFamily(), new ColumnIdentifier("[per_partition_limit]", true), Int32Type.instance);
}
@Override
public String toString()
{
return MoreObjects.toStringHelper(this)
.add("name", cfName)
.add("selectClause", selectClause)
.add("whereClause", whereClause)
.add("isDistinct", parameters.isDistinct)
.toString();
}
}
public static class Parameters
{
public final Map<ColumnDefinition.Raw, Boolean> orderings;
public final List<ColumnDefinition.Raw> groups;
public final boolean isDistinct;
public final boolean allowFiltering;
public final boolean isJson;
public Parameters(Map<ColumnDefinition.Raw, Boolean> orderings,
List<ColumnDefinition.Raw> groups,
boolean isDistinct,
boolean allowFiltering,
boolean isJson)
{
this.orderings = orderings;
this.groups = groups;
this.isDistinct = isDistinct;
this.allowFiltering = allowFiltering;
this.isJson = isJson;
}
}
private static abstract class ColumnComparator<T> implements Comparator<T>
{
protected final int compare(Comparator<ByteBuffer> comparator, ByteBuffer aValue, ByteBuffer bValue)
{
if (aValue == null)
return bValue == null ? 0 : -1;
return bValue == null ? 1 : comparator.compare(aValue, bValue);
}
}
private static class SingleColumnComparator extends ColumnComparator<List<ByteBuffer>>
{
private final int index;
private final Comparator<ByteBuffer> comparator;
public SingleColumnComparator(int columnIndex, Comparator<ByteBuffer> orderer)
{
index = columnIndex;
comparator = orderer;
}
public int compare(List<ByteBuffer> a, List<ByteBuffer> b)
{
return compare(comparator, a.get(index), b.get(index));
}
}
private static class CompositeComparator extends ColumnComparator<List<ByteBuffer>>
{
private final List<Comparator<ByteBuffer>> orderTypes;
private final List<Integer> positions;
private CompositeComparator(List<Comparator<ByteBuffer>> orderTypes, List<Integer> positions)
{
this.orderTypes = orderTypes;
this.positions = positions;
}
public int compare(List<ByteBuffer> a, List<ByteBuffer> b)
{
for (int i = 0; i < positions.size(); i++)
{
Comparator<ByteBuffer> type = orderTypes.get(i);
int columnPos = positions.get(i);
int comparison = compare(type, a.get(columnPos), b.get(columnPos));
if (comparison != 0)
return comparison;
}
return 0;
}
}
}