/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.index.internal.keys;
import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.index.internal.CassandraIndex;
import org.apache.cassandra.index.internal.CassandraIndexSearcher;
import org.apache.cassandra.utils.concurrent.OpOrder;
public class KeysSearcher extends CassandraIndexSearcher
{
private static final Logger logger = LoggerFactory.getLogger(KeysSearcher.class);
public KeysSearcher(ReadCommand command,
RowFilter.Expression expression,
CassandraIndex indexer)
{
super(command, expression, indexer);
}
protected UnfilteredPartitionIterator queryDataFromIndex(final DecoratedKey indexKey,
final RowIterator indexHits,
final ReadCommand command,
final ReadExecutionController executionController)
{
assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW;
return new UnfilteredPartitionIterator()
{
private UnfilteredRowIterator next;
public boolean isForThrift()
{
return command.isForThrift();
}
public CFMetaData metadata()
{
return command.metadata();
}
public boolean hasNext()
{
return prepareNext();
}
public UnfilteredRowIterator next()
{
if (next == null)
prepareNext();
UnfilteredRowIterator toReturn = next;
next = null;
return toReturn;
}
private boolean prepareNext()
{
while (next == null && indexHits.hasNext())
{
Row hit = indexHits.next();
DecoratedKey key = index.baseCfs.decorateKey(hit.clustering().get(0));
if (!command.selectsKey(key))
continue;
ColumnFilter extendedFilter = getExtendedFilter(command.columnFilter());
SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(),
index.baseCfs.metadata,
command.nowInSec(),
extendedFilter,
command.rowFilter(),
DataLimits.NONE,
key,
command.clusteringIndexFilter(key),
null);
@SuppressWarnings("resource") // filterIfStale closes it's iterator if either it materialize it or if it returns null.
// Otherwise, we close right away if empty, and if it's assigned to next it will be called either
// by the next caller of next, or through closing this iterator is this come before.
UnfilteredRowIterator dataIter = filterIfStale(dataCmd.queryMemtableAndDisk(index.baseCfs, executionController),
hit,
indexKey.getKey(),
executionController.writeOpOrderGroup(),
isForThrift(),
command.nowInSec());
if (dataIter != null)
{
if (dataIter.isEmpty())
dataIter.close();
else
next = dataIter;
}
}
return next != null;
}
public void remove()
{
throw new UnsupportedOperationException();
}
public void close()
{
indexHits.close();
if (next != null)
next.close();
}
};
}
private ColumnFilter getExtendedFilter(ColumnFilter initialFilter)
{
if (command.columnFilter().fetches(index.getIndexedColumn()))
return initialFilter;
ColumnFilter.Builder builder = ColumnFilter.selectionBuilder();
builder.addAll(initialFilter.fetchedColumns());
builder.add(index.getIndexedColumn());
return builder.build();
}
private UnfilteredRowIterator filterIfStale(UnfilteredRowIterator iterator,
Row indexHit,
ByteBuffer indexedValue,
OpOrder.Group writeOp,
boolean isForThrift,
int nowInSec)
{
if (isForThrift)
{
// The data we got has gone though ThrifResultsMerger, so we're looking for the row whose clustering
// is the indexed name and so we need to materialize the partition.
ImmutableBTreePartition result = ImmutableBTreePartition.create(iterator);
iterator.close();
Row data = result.getRow(Clustering.make(index.getIndexedColumn().name.bytes));
if (data == null)
return null;
// for thrift tables, we need to compare the index entry against the compact value column,
// not the column actually designated as the indexed column so we don't use the index function
// lib for the staleness check like we do in every other case
Cell baseData = data.getCell(index.baseCfs.metadata.compactValueColumn());
if (baseData == null || !baseData.isLive(nowInSec) || index.getIndexedColumn().type.compare(indexedValue, baseData.value()) != 0)
{
// Index is stale, remove the index entry and ignore
index.deleteStaleEntry(index.getIndexCfs().decorateKey(indexedValue),
Clustering.make(index.getIndexedColumn().name.bytes),
new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
writeOp);
return null;
}
else
{
if (command.columnFilter().fetches(index.getIndexedColumn()))
return result.unfilteredIterator();
// The query on the base table used an extended column filter to ensure that the
// indexed column was actually read for use in the staleness check, before
// returning the results we must filter the base table partition so that it
// contains only the originally requested columns. See CASSANDRA-11523
ClusteringComparator comparator = result.metadata().comparator;
Slices.Builder slices = new Slices.Builder(comparator);
for (ColumnDefinition selected : command.columnFilter().fetchedColumns())
slices.add(Slice.make(comparator, selected.name.bytes));
return result.unfilteredIterator(ColumnFilter.all(command.metadata()), slices.build(), false);
}
}
else
{
if (!iterator.metadata().isCompactTable())
{
logger.warn("Non-composite index was used on the table '{}' during the query. Starting from Cassandra 4.0, only " +
"composite indexes will be supported. If compact flags were dropped for this table, drop and re-create " +
"the index.", iterator.metadata().cfName);
}
Row data = iterator.staticRow();
if (index.isStale(data, indexedValue, nowInSec))
{
// Index is stale, remove the index entry and ignore
index.deleteStaleEntry(index.getIndexCfs().decorateKey(indexedValue),
makeIndexClustering(iterator.partitionKey().getKey(), Clustering.EMPTY),
new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
writeOp);
iterator.close();
return null;
}
else
{
return iterator;
}
}
}
}