package org.apache.cassandra.index.sasi.disk;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer;
import org.apache.cassandra.index.sasi.conf.ColumnIndex;
import org.apache.cassandra.index.sasi.utils.CombinedTermIterator;
import org.apache.cassandra.index.sasi.utils.TypeUtil;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PerSSTableIndexWriter implements SSTableFlushObserver
{
private static final Logger logger = LoggerFactory.getLogger(PerSSTableIndexWriter.class);
private static final int POOL_SIZE = 8;
private static final ThreadPoolExecutor INDEX_FLUSHER_MEMTABLE;
private static final ThreadPoolExecutor INDEX_FLUSHER_GENERAL;
static
{
INDEX_FLUSHER_GENERAL = new JMXEnabledThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(),
new NamedThreadFactory("SASI-General"),
"internal");
INDEX_FLUSHER_GENERAL.allowCoreThreadTimeOut(true);
INDEX_FLUSHER_MEMTABLE = new JMXEnabledThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(),
new NamedThreadFactory("SASI-Memtable"),
"internal");
INDEX_FLUSHER_MEMTABLE.allowCoreThreadTimeOut(true);
}
private final int nowInSec = FBUtilities.nowInSeconds();
private final Descriptor descriptor;
private final OperationType source;
private final AbstractType<?> keyValidator;
@VisibleForTesting
protected final Map<ColumnDefinition, Index> indexes;
private DecoratedKey currentKey;
private long currentKeyPosition;
private boolean isComplete;
public PerSSTableIndexWriter(AbstractType<?> keyValidator,
Descriptor descriptor,
OperationType source,
Map<ColumnDefinition, ColumnIndex> supportedIndexes)
{
this.keyValidator = keyValidator;
this.descriptor = descriptor;
this.source = source;
this.indexes = new HashMap<>();
for (Map.Entry<ColumnDefinition, ColumnIndex> entry : supportedIndexes.entrySet())
indexes.put(entry.getKey(), newIndex(entry.getValue()));
}
public void begin()
{}
public void startPartition(DecoratedKey key, long curPosition)
{
currentKey = key;
currentKeyPosition = curPosition;
}
public void nextUnfilteredCluster(Unfiltered unfiltered)
{
if (!unfiltered.isRow())
return;
Row row = (Row) unfiltered;
indexes.forEach((column, index) -> {
ByteBuffer value = ColumnIndex.getValueOf(column, row, nowInSec);
if (value == null)
return;
if (index == null)
throw new IllegalArgumentException("No index exists for column " + column.name.toString());
index.add(value.duplicate(), currentKey, currentKeyPosition);
});
}
public void complete()
{
if (isComplete)
return;
currentKey = null;
try
{
CountDownLatch latch = new CountDownLatch(indexes.size());
for (Index index : indexes.values())
index.complete(latch);
Uninterruptibles.awaitUninterruptibly(latch);
}
finally
{
indexes.clear();
isComplete = true;
}
}
public Index getIndex(ColumnDefinition columnDef)
{
return indexes.get(columnDef);
}
public Descriptor getDescriptor()
{
return descriptor;
}
@VisibleForTesting
protected Index newIndex(ColumnIndex columnIndex)
{
return new Index(columnIndex);
}
@VisibleForTesting
protected class Index
{
@VisibleForTesting
protected final String outputFile;
private final ColumnIndex columnIndex;
private final AbstractAnalyzer analyzer;
private final long maxMemorySize;
@VisibleForTesting
protected final Set<Future<OnDiskIndex>> segments;
private int segmentNumber = 0;
private OnDiskIndexBuilder currentBuilder;
public Index(ColumnIndex columnIndex)
{
this.columnIndex = columnIndex;
this.outputFile = descriptor.filenameFor(columnIndex.getComponent());
this.analyzer = columnIndex.getAnalyzer();
this.segments = new HashSet<>();
this.maxMemorySize = maxMemorySize(columnIndex);
this.currentBuilder = newIndexBuilder();
}
public void add(ByteBuffer term, DecoratedKey key, long keyPosition)
{
if (term.remaining() == 0)
return;
boolean isAdded = false;
analyzer.reset(term);
while (analyzer.hasNext())
{
ByteBuffer token = analyzer.next();
int size = token.remaining();
if (token.remaining() >= OnDiskIndexBuilder.MAX_TERM_SIZE)
{
logger.info("Rejecting value (size {}, maximum {}) for column {} (analyzed {}) at {} SSTable.",
FBUtilities.prettyPrintMemory(term.remaining()),
FBUtilities.prettyPrintMemory(OnDiskIndexBuilder.MAX_TERM_SIZE),
columnIndex.getColumnName(),
columnIndex.getMode().isAnalyzed,
descriptor);
continue;
}
if (!TypeUtil.isValid(token, columnIndex.getValidator()))
{
if ((token = TypeUtil.tryUpcast(token, columnIndex.getValidator())) == null)
{
logger.info("({}) Failed to add {} to index for key: {}, value size was {}, validator is {}.",
outputFile,
columnIndex.getColumnName(),
keyValidator.getString(key.getKey()),
FBUtilities.prettyPrintMemory(size),
columnIndex.getValidator());
continue;
}
}
currentBuilder.add(token, key, keyPosition);
isAdded = true;
}
if (!isAdded || currentBuilder.estimatedMemoryUse() < maxMemorySize)
return;
segments.add(getExecutor().submit(scheduleSegmentFlush(false)));
}
@VisibleForTesting
protected Callable<OnDiskIndex> scheduleSegmentFlush(final boolean isFinal)
{
final OnDiskIndexBuilder builder = currentBuilder;
currentBuilder = newIndexBuilder();
final String segmentFile = filename(isFinal);
return () -> {
long start = System.nanoTime();
try
{
File index = new File(segmentFile);
return builder.finish(index) ? new OnDiskIndex(index, columnIndex.getValidator(), null) : null;
}
catch (Exception | FSError e)
{
logger.error("Failed to build index segment {}", segmentFile, e);
return null;
}
finally
{
if (!isFinal)
logger.info("Flushed index segment {}, took {} ms.", segmentFile, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
}
};
}
public void complete(final CountDownLatch latch)
{
logger.info("Scheduling index flush to {}", outputFile);
getExecutor().submit((Runnable) () -> {
long start1 = System.nanoTime();
OnDiskIndex[] parts = new OnDiskIndex[segments.size() + 1];
try
{
if (segments.isEmpty())
{
scheduleSegmentFlush(true).call();
return;
}
if (!currentBuilder.isEmpty())
{
@SuppressWarnings("resource")
OnDiskIndex last = scheduleSegmentFlush(false).call();
segments.add(Futures.immediateFuture(last));
}
int index = 0;
ByteBuffer combinedMin = null, combinedMax = null;
for (Future<OnDiskIndex> f : segments)
{
@SuppressWarnings("resource")
OnDiskIndex part = f.get();
if (part == null)
continue;
parts[index++] = part;
combinedMin = (combinedMin == null || keyValidator.compare(combinedMin, part.minKey()) > 0) ? part.minKey() : combinedMin;
combinedMax = (combinedMax == null || keyValidator.compare(combinedMax, part.maxKey()) < 0) ? part.maxKey() : combinedMax;
}
OnDiskIndexBuilder builder = newIndexBuilder();
builder.finish(Pair.create(combinedMin, combinedMax),
new File(outputFile),
new CombinedTermIterator(parts));
}
catch (Exception | FSError e)
{
logger.error("Failed to flush index {}.", outputFile, e);
FileUtils.delete(outputFile);
}
finally
{
logger.info("Index flush to {} took {} ms.", outputFile, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start1));
for (int segment = 0; segment < segmentNumber; segment++)
{
@SuppressWarnings("resource")
OnDiskIndex part = parts[segment];
if (part != null)
FileUtils.closeQuietly(part);
FileUtils.delete(outputFile + "_" + segment);
}
latch.countDown();
}
});
}
private ExecutorService getExecutor()
{
return source == OperationType.FLUSH ? INDEX_FLUSHER_MEMTABLE : INDEX_FLUSHER_GENERAL;
}
private OnDiskIndexBuilder newIndexBuilder()
{
return new OnDiskIndexBuilder(keyValidator, columnIndex.getValidator(), columnIndex.getMode().mode);
}
public String filename(boolean isFinal)
{
return outputFile + (isFinal ? "" : "_" + segmentNumber++);
}
}
protected long maxMemorySize(ColumnIndex columnIndex)
{
return source == OperationType.FLUSH ? 1073741824L : columnIndex.getMode().maxCompactionFlushMemoryInMb;
}
public int hashCode()
{
return descriptor.hashCode();
}
public boolean equals(Object o)
{
return !(o == null || !(o instanceof PerSSTableIndexWriter)) && descriptor.equals(((PerSSTableIndexWriter) o).descriptor);
}
}