package org.apache.cassandra.index.sasi;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
import org.apache.cassandra.index.sasi.disk.Token;
import org.apache.cassandra.index.sasi.plan.Expression;
import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
import org.apache.cassandra.index.sasi.utils.RangeIterator;
import org.apache.cassandra.io.util.FileUtils;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TermIterator extends RangeIterator<Long, Token>
{
private static final Logger logger = LoggerFactory.getLogger(TermIterator.class);
private static final FastThreadLocal<ExecutorService> SEARCH_EXECUTOR = new FastThreadLocal<ExecutorService>()
{
public ExecutorService initialValue()
{
final String currentThread = Thread.currentThread().getName();
final int concurrencyFactor = DatabaseDescriptor.searchConcurrencyFactor();
logger.info("Search Concurrency Factor is set to {} for {}", concurrencyFactor, currentThread);
return (concurrencyFactor <= 1)
? MoreExecutors.newDirectExecutorService()
: Executors.newFixedThreadPool(concurrencyFactor, new ThreadFactory()
{
public final AtomicInteger count = new AtomicInteger();
public Thread newThread(Runnable task)
{
return NamedThreadFactory.createThread(task, currentThread + "-SEARCH-" + count.incrementAndGet(), true);
}
});
}
};
private final Expression expression;
private final RangeIterator<Long, Token> union;
private final Set<SSTableIndex> referencedIndexes;
private TermIterator(Expression e,
RangeIterator<Long, Token> union,
Set<SSTableIndex> referencedIndexes)
{
super(union.getMinimum(), union.getMaximum(), union.getCount());
this.expression = e;
this.union = union;
this.referencedIndexes = referencedIndexes;
}
@SuppressWarnings("resource")
public static TermIterator build(final Expression e, Set<SSTableIndex> perSSTableIndexes)
{
final List<RangeIterator<Long, Token>> tokens = new CopyOnWriteArrayList<>();
final AtomicLong tokenCount = new AtomicLong(0);
RangeIterator<Long, Token> memtableIterator = e.index.searchMemtable(e);
if (memtableIterator != null)
{
tokens.add(memtableIterator);
tokenCount.addAndGet(memtableIterator.getCount());
}
final Set<SSTableIndex> referencedIndexes = new CopyOnWriteArraySet<>();
try
{
final CountDownLatch latch = new CountDownLatch(perSSTableIndexes.size());
final ExecutorService searchExecutor = SEARCH_EXECUTOR.get();
for (final SSTableIndex index : perSSTableIndexes)
{
if (e.getOp() == Expression.Op.PREFIX &&
index.mode() == OnDiskIndexBuilder.Mode.CONTAINS && !index.hasMarkedPartials())
throw new UnsupportedOperationException(String.format("The index %s has not yet been upgraded " +
"to support prefix queries in CONTAINS mode. " +
"Wait for compaction or rebuild the index.",
index.getPath()));
if (!index.reference())
{
latch.countDown();
continue;
}
referencedIndexes.add(index);
searchExecutor.submit((Runnable) () -> {
try
{
e.checkpoint();
RangeIterator<Long, Token> keyIterator = index.search(e);
if (keyIterator == null)
{
releaseIndex(referencedIndexes, index);
return;
}
tokens.add(keyIterator);
tokenCount.getAndAdd(keyIterator.getCount());
}
catch (Throwable e1)
{
releaseIndex(referencedIndexes, index);
if (logger.isDebugEnabled())
logger.debug(String.format("Failed search an index %s, skipping.", index.getPath()), e1);
}
finally
{
latch.countDown();
}
});
}
Uninterruptibles.awaitUninterruptibly(latch);
e.checkpoint();
RangeIterator<Long, Token> ranges = RangeUnionIterator.build(tokens);
return new TermIterator(e, ranges, referencedIndexes);
}
catch (Throwable ex)
{
referencedIndexes.forEach(TermIterator::releaseQuietly);
throw ex;
}
}
protected Token computeNext()
{
try
{
return union.hasNext() ? union.next() : endOfData();
}
finally
{
expression.checkpoint();
}
}
protected void performSkipTo(Long nextToken)
{
try
{
union.skipTo(nextToken);
}
finally
{
expression.checkpoint();
}
}
public void close()
{
FileUtils.closeQuietly(union);
referencedIndexes.forEach(TermIterator::releaseQuietly);
referencedIndexes.clear();
}
private static void releaseIndex(Set<SSTableIndex> indexes, SSTableIndex index)
{
indexes.remove(index);
releaseQuietly(index);
}
private static void releaseQuietly(SSTableIndex index)
{
try
{
index.release();
}
catch (Throwable e)
{
logger.error(String.format("Failed to release index %s", index.getPath()), e);
}
}
}