package org.apache.cassandra.io.sstable.format.big;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.ChunkCache;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.concurrent.Transactional;
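/**
 * SSTableWriter for the legacy "big" format: streams partitions into the data file while
 * building the primary index, bloom filter and index summary alongside it, and supports
 * opening partially written sstables early for reads.
 */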
public class BigTableWriter extends SSTableWriter
{
private static final Logger logger = LoggerFactory.getLogger(BigTableWriter.class);
private final ColumnIndex columnIndexWriter;
private final IndexWriter iwriter;
private final FileHandle.Builder dbuilder;
protected final SequentialWriter dataFile;
private DecoratedKey lastWrittenKey;
private DataPosition dataMark;
private long lastEarlyOpenLength = 0;
private final Optional<ChunkCache> chunkCache = Optional.ofNullable(ChunkCache.instance);
private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder()
.trickleFsync(DatabaseDescriptor.getTrickleFsync())
.trickleFsyncByteInterval(DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024)
.build();
public BigTableWriter(Descriptor descriptor,
long keyCount,
long repairedAt,
CFMetaData metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
Collection<SSTableFlushObserver> observers,
LifecycleNewTracker lifecycleNewTracker)
{
super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers);
lifecycleNewTracker.trackNew(this);
if (compression)
{
dataFile = new CompressedSequentialWriter(new File(getFilename()),
descriptor.filenameFor(Component.COMPRESSION_INFO),
new File(descriptor.filenameFor(descriptor.digestComponent)),
writerOption,
metadata.params.compression,
metadataCollector);
}
else
{
dataFile = new ChecksummedSequentialWriter(new File(getFilename()),
new File(descriptor.filenameFor(Component.CRC)),
new File(descriptor.filenameFor(descriptor.digestComponent)),
writerOption);
}
dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(compression)
.mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap);
chunkCache.ifPresent(dbuilder::withChunkCache);
iwriter = new IndexWriter(keyCount);
columnIndexWriter = new ColumnIndex(this.header, dataFile, descriptor.version, this.observers, getRowIndexEntrySerializer().indexInfoSerializer());
}
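/**
 * Records the current position in both the data file and the primary index so that an
 * incomplete append can later be rolled back with resetAndTruncate().
 */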
public void mark()
{
dataMark = dataFile.mark();
iwriter.mark();
}
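/**
 * Rolls the data file and the primary index back to the positions captured by the last mark().
 */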
public void resetAndTruncate()
{
dataFile.resetAndTruncate(dataMark);
iwriter.resetAndTruncate();
}
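/**
 * Performs sanity checks on the key and returns the data file position at which the new
 * partition will start.
 */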
protected long beforeAppend(DecoratedKey decoratedKey)
{
assert decoratedKey != null : "Keys must not be null";
if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0)
throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename());
return (lastWrittenKey == null) ? 0 : dataFile.position();
}
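/**
 * Updates the metadata collector and the first/last keys, then appends the index entry for
 * the partition that was just written to the data file.
 */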
private void afterAppend(DecoratedKey decoratedKey, long dataEnd, RowIndexEntry index, ByteBuffer indexInfo) throws IOException
{
metadataCollector.addKey(decoratedKey.getKey());
lastWrittenKey = decoratedKey;
last = lastWrittenKey;
if (first == null)
first = lastWrittenKey;
if (logger.isTraceEnabled())
logger.trace("wrote {} at {}", decoratedKey, dataEnd);
iwriter.append(decoratedKey, index, dataEnd, indexInfo);
}
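/**
 * Appends partition data to this writer.
 *
 * @param iterator the partition to write
 * @return the created index entry if something was written, that is if {@code iterator}
 * wasn't empty, {@code null} otherwise
 * @throws FSWriteError if writing to the data file fails
 */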
public RowIndexEntry append(UnfilteredRowIterator iterator)
{
DecoratedKey key = iterator.partitionKey();
if (key.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
{
logger.error("Key size {} exceeds maximum of {}, skipping row", key.getKey().remaining(), FBUtilities.MAX_UNSIGNED_SHORT);
return null;
}
if (iterator.isEmpty())
return null;
long startPosition = beforeAppend(key);
observers.forEach((o) -> o.startPartition(key, iwriter.indexFile.position()));
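// Reuse the same column index writer for every partition.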
columnIndexWriter.reset();
try (UnfilteredRowIterator collecting = Transformation.apply(iterator, new StatsCollector(metadataCollector)))
{
columnIndexWriter.buildRowIndex(collecting);
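// afterAppend() writes the partition key before the index entry, so add its serialized
// size to the current index file position.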
long indexFilePosition = ByteBufferUtil.serializedSizeWithShortLength(key.getKey()) + iwriter.indexFile.position();
RowIndexEntry entry = RowIndexEntry.create(startPosition, indexFilePosition,
collecting.partitionLevelDeletion(),
columnIndexWriter.headerLength,
columnIndexWriter.columnIndexCount,
columnIndexWriter.indexInfoSerializedSize(),
columnIndexWriter.indexSamples(),
columnIndexWriter.offsets(),
getRowIndexEntrySerializer().indexInfoSerializer());
long endPosition = dataFile.position();
long rowSize = endPosition - startPosition;
maybeLogLargePartitionWarning(key, rowSize);
metadataCollector.addPartitionSizeInBytes(rowSize);
afterAppend(key, endPosition, entry, columnIndexWriter.buffer());
return entry;
}
catch (IOException e)
{
throw new FSWriteError(e, dataFile.getPath());
}
}
private RowIndexEntry.IndexSerializer<IndexInfo> getRowIndexEntrySerializer()
{
return (RowIndexEntry.IndexSerializer<IndexInfo>) rowIndexEntrySerializer;
}
private void maybeLogLargePartitionWarning(DecoratedKey key, long rowSize)
{
if (rowSize > DatabaseDescriptor.getCompactionLargePartitionWarningThreshold())
{
String keyString = metadata.getKeyValidator().getString(key.getKey());
logger.warn("Writing large partition {}/{}:{} ({}) to sstable {}", metadata.ksName, metadata.cfName, keyString, FBUtilities.prettyPrintMemory(rowSize), getFilename());
}
}
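/**
 * Transformation that feeds every row, range tombstone marker and deletion it sees into the
 * MetadataCollector while the partition is being written.
 */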
private static class StatsCollector extends Transformation
{
private final MetadataCollector collector;
private int cellCount;
StatsCollector(MetadataCollector collector)
{
this.collector = collector;
}
@Override
public Row applyToStatic(Row row)
{
if (!row.isEmpty())
cellCount += Rows.collectStats(row, collector);
return row;
}
@Override
public Row applyToRow(Row row)
{
collector.updateClusteringValues(row.clustering());
cellCount += Rows.collectStats(row, collector);
return row;
}
@Override
public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
{
collector.updateClusteringValues(marker.clustering());
if (marker.isBoundary())
{
RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker;
collector.update(bm.endDeletionTime());
collector.update(bm.startDeletionTime());
}
else
{
collector.update(((RangeTombstoneBoundMarker)marker).deletionTime());
}
return marker;
}
@Override
public void onPartitionClose()
{
collector.addCellPerPartitionCount(cellCount);
}
@Override
public DeletionTime applyToDeletion(DeletionTime deletionTime)
{
collector.update(deletionTime);
return deletionTime;
}
}
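/**
 * Opens a reader over the data written so far, up to the last boundary known to be fully
 * flushed to both the data and index files; returns null if no such boundary exists yet.
 */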
@SuppressWarnings("resource")
public SSTableReader openEarly()
{
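// find the max (exclusive) readable key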
IndexSummaryBuilder.ReadableBoundary boundary = iwriter.getMaxReadable();
if (boundary == null)
return null;
StatsMetadata stats = statsMetadata();
assert boundary.indexLength > 0 && boundary.dataLength > 0;
IndexSummary indexSummary = iwriter.summary.build(metadata.partitioner, boundary);
long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size());
FileHandle ifile = iwriter.builder.bufferSize(indexBufferSize).complete(boundary.indexLength);
if (compression)
dbuilder.withCompressionMetadata(((CompressedSequentialWriter) dataFile).open(boundary.dataLength));
int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
FileHandle dfile = dbuilder.bufferSize(dataBufferSize).complete(boundary.dataLength);
invalidateCacheAtBoundary(dfile);
SSTableReader sstable = SSTableReader.internalOpen(descriptor,
components, metadata,
ifile, dfile, indexSummary,
iwriter.bf.sharedCopy(), maxDataAge, stats, SSTableReader.OpenReason.EARLY, header);
sstable.first = getMinimalKey(first);
sstable.last = getMinimalKey(boundary.lastKey);
return sstable;
}
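/**
 * Invalidates the chunk cached at the previous early-open boundary once the file has grown
 * past it, since that chunk may have been cached before the data beyond the boundary was
 * written, then records the new readable length.
 */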
void invalidateCacheAtBoundary(FileHandle dfile)
{
chunkCache.ifPresent(cache -> {
if (lastEarlyOpenLength != 0 && dfile.dataLength() > lastEarlyOpenLength)
cache.invalidatePosition(dfile, lastEarlyOpenLength);
});
lastEarlyOpenLength = dfile.dataLength();
}
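/**
 * Ensures the data and index files are fully synced to disk, then opens a reader over the
 * complete contents with OpenReason.EARLY.
 */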
public SSTableReader openFinalEarly()
{
dataFile.sync();
iwriter.indexFile.sync();
return openFinal(SSTableReader.OpenReason.EARLY);
}
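/**
 * Builds the final index summary and file handles and opens a reader over the completed sstable.
 */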
@SuppressWarnings("resource")
private SSTableReader openFinal(SSTableReader.OpenReason openReason)
{
if (maxDataAge < 0)
maxDataAge = System.currentTimeMillis();
StatsMetadata stats = statsMetadata();
IndexSummary indexSummary = iwriter.summary.build(this.metadata.partitioner);
long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size());
FileHandle ifile = iwriter.builder.bufferSize(indexBufferSize).complete();
if (compression)
dbuilder.withCompressionMetadata(((CompressedSequentialWriter) dataFile).open(0));
FileHandle dfile = dbuilder.bufferSize(dataBufferSize).complete();
invalidateCacheAtBoundary(dfile);
SSTableReader sstable = SSTableReader.internalOpen(descriptor,
components,
this.metadata,
ifile,
dfile,
indexSummary,
iwriter.bf.sharedCopy(),
maxDataAge,
stats,
openReason,
header);
sstable.first = getMinimalKey(first);
sstable.last = getMinimalKey(last);
return sstable;
}
protected SSTableWriter.TransactionalProxy txnProxy()
{
return new TransactionalProxy();
}
class TransactionalProxy extends SSTableWriter.TransactionalProxy
{
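// Write out the remaining components (stats metadata, TOC) and, if requested via openResult,
// open the final reader before the transaction commits.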
protected void doPrepare()
{
iwriter.prepareToCommit();
dataFile.prepareToCommit();
writeMetadata(descriptor, finalizeMetadata());
SSTable.appendTOC(descriptor, components);
if (openResult)
finalReader = openFinal(SSTableReader.OpenReason.NORMAL);
}
protected Throwable doCommit(Throwable accumulate)
{
accumulate = dataFile.commit(accumulate);
accumulate = iwriter.commit(accumulate);
return accumulate;
}
@Override
protected Throwable doPostCleanup(Throwable accumulate)
{
accumulate = dbuilder.close(accumulate);
return accumulate;
}
protected Throwable doAbort(Throwable accumulate)
{
accumulate = iwriter.abort(accumulate);
accumulate = dataFile.abort(accumulate);
return accumulate;
}
}
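// Serializes the collected metadata components into the STATS component file.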
private void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
{
File file = new File(desc.filenameFor(Component.STATS));
try (SequentialWriter out = new SequentialWriter(file, writerOption))
{
desc.getMetadataSerializer().serialize(components, out, desc.version);
out.finish();
}
catch (IOException e)
{
throw new FSWriteError(e, file.getPath());
}
}
public long getFilePointer()
{
return dataFile.position();
}
public long getOnDiskFilePointer()
{
return dataFile.getOnDiskFilePointer();
}
public long getEstimatedOnDiskBytesWritten()
{
return dataFile.getEstimatedOnDiskBytesWritten();
}
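/**
 * Encapsulates writing the primary index and bloom filter for an sstable, keeping the index
 * summary in sync with what has been flushed to disk so far.
 */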
class IndexWriter extends AbstractTransactional implements Transactional
{
private final SequentialWriter indexFile;
public final FileHandle.Builder builder;
public final IndexSummaryBuilder summary;
public final IFilter bf;
private DataPosition mark;
IndexWriter(long keyCount)
{
indexFile = new SequentialWriter(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), writerOption);
builder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX)).mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap);
chunkCache.ifPresent(builder::withChunkCache);
summary = new IndexSummaryBuilder(keyCount, metadata.params.minIndexInterval, Downsampling.BASE_SAMPLING_LEVEL);
bf = FilterFactory.getFilter(keyCount, metadata.params.bloomFilterFpChance, true, descriptor.version.hasOldBfHashOrder());
indexFile.setPostFlushListener(() -> summary.markIndexSynced(indexFile.getLastFlushOffset()));
dataFile.setPostFlushListener(() -> summary.markDataSynced(dataFile.getLastFlushOffset()));
}
IndexSummaryBuilder.ReadableBoundary getMaxReadable()
{
return summary.getLastReadableBoundary();
}
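// Adds the key to the bloom filter, writes the key and its index entry to the primary index,
// and offers the entry to the index summary.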
public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd, ByteBuffer indexInfo) throws IOException
{
bf.add(key);
long indexStart = indexFile.position();
try
{
ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile);
rowIndexEntrySerializer.serialize(indexEntry, indexFile, indexInfo);
}
catch (IOException e)
{
throw new FSWriteError(e, indexFile.getPath());
}
long indexEnd = indexFile.position();
if (logger.isTraceEnabled())
logger.trace("wrote index entry: {} at {}", indexEntry, indexStart);
summary.maybeAddEntry(key, indexStart, indexEnd, dataEnd);
}
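// Serializes the bloom filter to its component file and syncs it to disk.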
void flushBf()
{
if (components.contains(Component.FILTER))
{
String path = descriptor.filenameFor(Component.FILTER);
try (FileOutputStream fos = new FileOutputStream(path);
DataOutputStreamPlus stream = new BufferedDataOutputStreamPlus(fos))
{
FilterFactory.serialize(bf, stream);
stream.flush();
SyncUtil.sync(fos);
}
catch (IOException e)
{
throw new FSWriteError(e, path);
}
}
}
public void mark()
{
mark = indexFile.mark();
}
public void resetAndTruncate()
{
indexFile.resetAndTruncate(mark);
}
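// Flushes the bloom filter, truncates the index file to its final length and saves the index summary.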
protected void doPrepare()
{
flushBf();
long position = indexFile.position();
indexFile.prepareToCommit();
FileUtils.truncate(indexFile.getPath(), position);
summary.prepareToCommit();
try (IndexSummary indexSummary = summary.build(getPartitioner()))
{
SSTableReader.saveSummary(descriptor, first, last, indexSummary);
}
}
protected Throwable doCommit(Throwable accumulate)
{
return indexFile.commit(accumulate);
}
protected Throwable doAbort(Throwable accumulate)
{
return indexFile.abort(accumulate);
}
@Override
protected Throwable doPostCleanup(Throwable accumulate)
{
accumulate = summary.close(accumulate);
accumulate = bf.close(accumulate);
accumulate = builder.close(accumulate);
return accumulate;
}
}
}