package org.apache.cassandra.db.commitlog;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.zip.CRC32;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.*;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputBufferFixed;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.CommitLogMetrics;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.security.EncryptionContext;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MBeanWrapper;
import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
import static org.apache.cassandra.db.commitlog.CommitLogSegment.CommitLogSegmentFileComparator;
import static org.apache.cassandra.db.commitlog.CommitLogSegment.ENTRY_OVERHEAD_SIZE;
import static org.apache.cassandra.utils.FBUtilities.updateChecksum;
import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
/**
 * Top-level handle on the commit log.
 * <p>
 * Serializes mutations into segments obtained from an {@link AbstractCommitLogSegmentManager}.
 * Hands finished writes to an {@link AbstractCommitLogService} (batch or periodic, depending on
 * configuration) for syncing. Replays segment files found on disk and exposes archive/restore
 * settings and size information through {@link CommitLogMBean}.
 */
public class CommitLog implements CommitLogMBean
{
    private static final Logger logger = LoggerFactory.getLogger(CommitLog.class);

    /** Process-wide instance; constructed, registered as an MBean and started during class initialization. */
    public static final CommitLog instance = CommitLog.construct();

    // Limit on serialized mutation size plus ENTRY_OVERHEAD_SIZE, enforced by add(Mutation).
    final long MAX_MUTATION_SIZE = DatabaseDescriptor.getMaxMutationSize();

    public final AbstractCommitLogSegmentManager segmentManager;
    public final CommitLogArchiver archiver;
    final CommitLogMetrics metrics;
    final AbstractCommitLogService executor;

    // Compression/encryption settings. Volatile because resetConfiguration() replaces the reference
    // and other threads must observe the new value.
    volatile Configuration configuration;

    /**
     * Creates the singleton instance.
     * <p>
     * Builds it with the archiver returned by {@link CommitLogArchiver#construct()}, registers it as
     * the {@code org.apache.cassandra.db:type=Commitlog} MBean, then starts it.
     */
    private static CommitLog construct()
    {
        CommitLog log = new CommitLog(CommitLogArchiver.construct());
        MBeanWrapper.instance.registerMBean(log, "org.apache.cassandra.db:type=Commitlog");
        return log.start();
    }

    /**
     * Builds a commit log from the current {@link DatabaseDescriptor} settings without starting it.
     * <p>
     * {@link #construct()} calls {@link #start()} afterwards.
     *
     * @param archiver archiver used for segment archiving and restore
     */
    @VisibleForTesting
    CommitLog(CommitLogArchiver archiver)
    {
        this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression(),
                                               DatabaseDescriptor.getEncryptionContext());
        DatabaseDescriptor.createAllDirectories();

        this.archiver = archiver;
        metrics = new CommitLogMetrics();

        // batch sync mode gets a BatchCommitLogService; every other mode uses the periodic service
        executor = DatabaseDescriptor.getCommitLogSync() == Config.CommitLogSync.batch
                ? new BatchCommitLogService(this)
                : new PeriodicCommitLogService(this);

        segmentManager = DatabaseDescriptor.isCDCEnabled()
                         ? new CommitLogSegmentManagerCDC(this, DatabaseDescriptor.getCommitLogLocation())
                         : new CommitLogSegmentManagerStandard(this, DatabaseDescriptor.getCommitLogLocation());

        metrics.attach(executor, segmentManager);
    }

    /**
     * Starts the segment manager, then the sync service.
     *
     * @return {@code this}, for chaining
     */
    CommitLog start()
    {
        segmentManager.start();
        executor.start();
        return this;
    }

    /**
     * Replays the replayable commit log segments currently present in the commit log directory.
     * <p>
     * The steps are:
     * <ol>
     *   <li>Each matching file is passed to the archiver, waiting for each archive task.</li>
     *   <li>Any configured restore runs via {@link CommitLogArchiver#maybeRestoreArchive()}.</li>
     *   <li>The directory is listed again and the matching files are replayed in
     *       {@link CommitLogSegmentFileComparator} order.</li>
     *   <li>Each replayed file is handed to
     *       {@link AbstractCommitLogSegmentManager#handleReplayedSegment(File)}.</li>
     * </ol>
     *
     * @return the number of replayed mutations, or 0 when no segment files are found
     * @throws IOException if the commit log directory cannot be listed, or if replay fails
     */
    public int recoverSegmentsOnDisk() throws IOException
    {
        FilenameFilter unmanagedFilesFilter = (dir, name) -> CommitLogDescriptor.isValid(name) && CommitLogSegment.shouldReplay(name);

        for (File file : listCommitLogFiles(unmanagedFilesFilter))
        {
            archiver.maybeArchive(file.getPath(), file.getName());
            archiver.maybeWaitForArchiving(file.getName());
        }

        assert archiver.archivePending.isEmpty() : "Not all commit log archive tasks were completed before restore";
        archiver.maybeRestoreArchive();

        // List again rather than reusing the first listing; presumably the restore step can add
        // segments to the directory.
        File[] files = listCommitLogFiles(unmanagedFilesFilter);
        int replayed = 0;
        if (files.length == 0)
        {
            logger.info("No commitlog files found; skipping replay");
        }
        else
        {
            Arrays.sort(files, new CommitLogSegmentFileComparator());
            logger.info("Replaying {}", StringUtils.join(files, ", "));
            replayed = recoverFiles(files);
            logger.info("Log replay complete, {} replayed mutations", replayed);

            for (File f : files)
                segmentManager.handleReplayedSegment(f);
        }
        return replayed;
    }

    /**
     * Lists the files in the commit log storage directory that are accepted by {@code filter}.
     * <p>
     * {@link File#listFiles(FilenameFilter)} returns {@code null} instead of throwing when the
     * directory is missing or an I/O error occurs. That case is reported here as an
     * {@link IOException} rather than surfacing later as a NullPointerException.
     */
    private File[] listCommitLogFiles(FilenameFilter filter) throws IOException
    {
        File directory = new File(segmentManager.storageDirectory);
        File[] files = directory.listFiles(filter);
        if (files == null)
            throw new IOException("Unable to list commit log directory " + directory);
        return files;
    }

    /**
     * Replays the given segment files and blocks until the replayed writes complete.
     *
     * @param clogs commit log segment files to replay
     * @return the value of {@code CommitLogReplayer.blockForWrites()}, which
     *         {@link #recoverSegmentsOnDisk()} reports as the number of replayed mutations
     * @throws IOException if replay fails
     */
    public int recoverFiles(File... clogs) throws IOException
    {
        CommitLogReplayer replayer = CommitLogReplayer.construct(this);
        replayer.replayFiles(clogs);
        return replayer.blockForWrites();
    }

    /**
     * Replays a single commit log path and blocks until the replayed writes complete.
     *
     * @param path path of the commit log file to replay
     * @throws IOException if replay fails
     */
    public void recoverPath(String path) throws IOException
    {
        CommitLogReplayer replayer = CommitLogReplayer.construct(this);
        // NOTE(review): meaning of the boolean flag is defined by CommitLogReplayer.replayPath — confirm there
        replayer.replayPath(new File(path), false);
        replayer.blockForWrites();
    }

    /**
     * Equivalent to {@link #recoverPath(String)}.
     *
     * @param path path of the commit log file to replay
     * @throws IOException if replay fails
     */
    public void recover(String path) throws IOException
    {
        recoverPath(path);
    }

    /** @return the current position reported by the segment manager */
    public CommitLogPosition getCurrentPosition()
    {
        return segmentManager.getCurrentPosition();
    }

    /**
     * Asks the segment manager to recycle all segments.
     *
     * @param droppedCfs table ids forwarded to {@link AbstractCommitLogSegmentManager#forceRecycleAll}
     */
    public void forceRecycleAllSegments(Iterable<UUID> droppedCfs)
    {
        segmentManager.forceRecycleAll(droppedCfs);
    }

    /** Asks the segment manager to recycle all segments, passing an empty list of dropped tables. */
    public void forceRecycleAllSegments()
    {
        segmentManager.forceRecycleAll(Collections.<UUID>emptyList());
    }

    /**
     * Syncs the active segments through the segment manager.
     *
     * @param flush forwarded to {@link AbstractCommitLogSegmentManager#sync(boolean)}
     * @throws IOException if the segment manager's sync fails
     */
    public void sync(boolean flush) throws IOException
    {
        segmentManager.sync(flush);
    }

    /** Forwards an extra-sync request to the sync service. */
    public void requestExtraSync()
    {
        executor.requestExtraSync();
    }

    /**
     * Serializes {@code mutation} and appends it to the commit log.
     * <p>
     * Each entry is written as four parts, in order:
     * <ol>
     *   <li>the serialized length (int)</li>
     *   <li>a CRC32 of that length (int)</li>
     *   <li>the serialized bytes</li>
     *   <li>a CRC32 covering the length and the bytes (int); the checksum is not reset between the
     *       two updates</li>
     * </ol>
     *
     * @param mutation mutation to log; must not be null
     * @return the commit log position of the allocation the mutation was written into
     * @throws IllegalArgumentException if the serialized size plus entry overhead exceeds
     *                                  {@link #MAX_MUTATION_SIZE}
     * @throws WriteTimeoutException presumably raised while allocating from the segment manager — confirm
     * @throws FSWriteError if serialization or writing into the allocated buffer fails
     */
    public CommitLogPosition add(Mutation mutation) throws WriteTimeoutException
    {
        assert mutation != null;

        try (DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get())
        {
            Mutation.serializer.serialize(mutation, dob, MessagingService.current_version);
            int size = dob.getLength();

            int totalSize = size + ENTRY_OVERHEAD_SIZE;
            if (totalSize > MAX_MUTATION_SIZE)
            {
                throw new IllegalArgumentException(String.format("Mutation of %s is too large for the maximum size of %s",
                                                                 FBUtilities.prettyPrintMemory(totalSize),
                                                                 FBUtilities.prettyPrintMemory(MAX_MUTATION_SIZE)));
            }

            Allocation alloc = segmentManager.allocate(mutation, totalSize);

            CRC32 checksum = new CRC32();
            final ByteBuffer buffer = alloc.getBuffer();
            // dos writes through to the same ByteBuffer that the checksums are put into directly
            try (BufferedDataOutputStreamPlus dos = new DataOutputBufferFixed(buffer))
            {
                dos.writeInt(size);
                updateChecksumInt(checksum, size);
                buffer.putInt((int) checksum.getValue());

                dos.write(dob.getData(), 0, size);
                updateChecksum(checksum, buffer, buffer.position() - size, size);
                buffer.putInt((int) checksum.getValue());
            }
            catch (IOException e)
            {
                throw new FSWriteError(e, alloc.getSegment().getPath());
            }
            finally
            {
                // always mark the allocation written, even on failure
                alloc.markWritten();
            }

            executor.finishWriteFor(alloc);
            return alloc.getCommitLogPosition();
        }
        catch (IOException e)
        {
            throw new FSWriteError(e, segmentManager.allocatingFrom().getPath());
        }
    }

    /**
     * Marks the given position range clean for table {@code cfId} in each active segment.
     * <p>
     * Segments are processed in the order returned by the segment manager. Segments that become
     * unused are archived and discarded. Processing stops after the segment that contains
     * {@code upperBound}.
     *
     * @param cfId       table id whose data is now persisted elsewhere
     * @param lowerBound lower bound of the clean range
     * @param upperBound upper bound of the clean range
     */
    public void discardCompletedSegments(final UUID cfId, final CommitLogPosition lowerBound, final CommitLogPosition upperBound)
    {
        logger.trace("discard completed log segments for {}-{}, table {}", lowerBound, upperBound, cfId);

        for (Iterator<CommitLogSegment> iter = segmentManager.getActiveSegments().iterator(); iter.hasNext();)
        {
            CommitLogSegment segment = iter.next();
            segment.markClean(cfId, lowerBound, upperBound);

            if (segment.isUnused())
            {
                logger.debug("Commit log segment {} is unused", segment);
                segmentManager.archiveAndDiscard(segment);
            }
            else
            {
                // the last segment in the iteration is logged as the "active" one
                if (logger.isTraceEnabled())
                    logger.trace("Not safe to delete{} commit log segment {}; dirty is {}",
                                 (iter.hasNext() ? "" : " active"), segment, segment.dirtyString());
            }

            if (segment.contains(upperBound))
                break;
        }
    }

    @Override
    public String getArchiveCommand()
    {
        return archiver.archiveCommand;
    }

    @Override
    public String getRestoreCommand()
    {
        return archiver.restoreCommand;
    }

    @Override
    public String getRestoreDirectories()
    {
        return archiver.restoreDirectories;
    }

    @Override
    public long getRestorePointInTime()
    {
        return archiver.restorePointInTime;
    }

    @Override
    public String getRestorePrecision()
    {
        return archiver.precision.toString();
    }

    /** @return the names of all active segments, in the segment manager's order */
    public List<String> getActiveSegmentNames()
    {
        List<String> segmentNames = new ArrayList<>();
        for (CommitLogSegment seg : segmentManager.getActiveSegments())
            segmentNames.add(seg.getName());
        return segmentNames;
    }

    /** @return a snapshot of the segment names with archive tasks still pending */
    public List<String> getArchivingSegmentNames()
    {
        return new ArrayList<>(archiver.archivePending.keySet());
    }

    /** @return the sum of {@code contentSize()} over all active segments */
    @Override
    public long getActiveContentSize()
    {
        long size = 0;
        for (CommitLogSegment seg : segmentManager.getActiveSegments())
            size += seg.contentSize();
        return size;
    }

    /** @return the on-disk size reported by the segment manager */
    @Override
    public long getActiveOnDiskSize()
    {
        return segmentManager.onDiskSize();
    }

    /**
     * Computes, per active segment name, the ratio of on-disk size to content size.
     * <p>
     * The result map is sorted by name. Because this uses double division, a segment with zero
     * content size yields Infinity, or NaN if its on-disk size is also zero.
     */
    @Override
    public Map<String, Double> getActiveSegmentCompressionRatios()
    {
        Map<String, Double> segmentRatios = new TreeMap<>();
        for (CommitLogSegment seg : segmentManager.getActiveSegments())
            segmentRatios.put(seg.getName(), 1.0 * seg.onDiskSize() / seg.contentSize());
        return segmentRatios;
    }

    /**
     * Shuts down the sync service and then the segment manager, waiting for each to terminate.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public void shutdownBlocking() throws InterruptedException
    {
        executor.shutdown();
        executor.awaitTermination();
        segmentManager.shutdown();
        segmentManager.awaitTermination();
    }

    /**
     * Stops the commit log, reloads its configuration, restarts it and replays segments on disk.
     *
     * @param deleteSegments forwarded to {@link #stopUnsafe(boolean)}
     * @return the number of mutations replayed by {@link #restartUnsafe()}
     * @throws IOException if replay fails
     */
    public int resetUnsafe(boolean deleteSegments) throws IOException
    {
        stopUnsafe(deleteSegments);
        resetConfiguration();
        return restartUnsafe();
    }

    /** Rebuilds {@link #configuration} from the current {@link DatabaseDescriptor} settings. */
    public void resetConfiguration()
    {
        configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression(),
                                          DatabaseDescriptor.getEncryptionContext());
    }

    /**
     * Stops the sync service and the segment manager, and resets the segment replay limit.
     * <p>
     * When {@code deleteSegments} is true and CDC is enabled, it also deletes every file in the
     * CDC log directory.
     *
     * @param deleteSegments forwarded to the segment manager; also controls CDC file deletion
     * @throws RuntimeException wrapping an InterruptedException if interrupted while waiting for
     *                          the sync service; the thread's interrupt status is restored
     */
    public void stopUnsafe(boolean deleteSegments)
    {
        executor.shutdown();
        try
        {
            executor.awaitTermination();
        }
        catch (InterruptedException e)
        {
            // restore the interrupt status so code further up the stack can still observe it
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        segmentManager.stopUnsafe(deleteSegments);
        CommitLogSegment.resetReplayLimit();
        if (DatabaseDescriptor.isCDCEnabled() && deleteSegments)
        {
            // listFiles() returns null when the directory is missing or unreadable; nothing to delete then
            File[] cdcFiles = new File(DatabaseDescriptor.getCDCLogLocation()).listFiles();
            if (cdcFiles != null)
            {
                for (File f : cdcFiles)
                    FileUtils.deleteWithConfirm(f);
            }
        }
    }

    /**
     * Starts the commit log again and replays the segments on disk.
     *
     * @return the number of replayed mutations
     * @throws IOException if replay fails
     */
    public int restartUnsafe() throws IOException
    {
        return start().recoverSegmentsOnDisk();
    }

    /**
     * Applies the configured commit failure policy to an error.
     * <p>
     * The throwable is first passed to {@link JVMStabilityInspector#inspectCommitLogThrowable}
     * (presumably where the {@code die} policy is acted upon — confirm). Then, depending on the
     * policy:
     * <ul>
     *   <li>{@code die} and {@code stop} stop client transports, then log and return {@code false}.</li>
     *   <li>{@code stop_commit} logs and returns {@code false}.</li>
     *   <li>{@code ignore} logs and returns {@code true}.</li>
     * </ul>
     *
     * @param message description of the failure
     * @param t       the failure
     * @return {@code true} only when the policy is {@code ignore}
     */
    @VisibleForTesting
    public static boolean handleCommitError(String message, Throwable t)
    {
        JVMStabilityInspector.inspectCommitLogThrowable(t);
        switch (DatabaseDescriptor.getCommitFailurePolicy())
        {
            case die:
            case stop:
                StorageService.instance.stopTransports();
                // $FALL-THROUGH$ intentionally: die/stop also log and terminate like stop_commit
            case stop_commit:
                logger.error(String.format("%s. Commit disk failure policy is %s; terminating thread", message, DatabaseDescriptor.getCommitFailurePolicy()), t);
                return false;
            case ignore:
                logger.error(message, t);
                return true;
            default:
                throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy());
        }
    }

    /** Immutable bundle of the compression and encryption settings used for commit log segments. */
    public static final class Configuration
    {
        private final ParameterizedClass compressorClass;
        // null when no compressor class is configured
        private final ICompressor compressor;
        private final EncryptionContext encryptionContext;

        /**
         * @param compressorClass   compressor to instantiate, or {@code null} for no compression
         * @param encryptionContext encryption settings; {@link #useEncryption()} dereferences it
         */
        public Configuration(ParameterizedClass compressorClass, EncryptionContext encryptionContext)
        {
            this.compressorClass = compressorClass;
            this.compressor = compressorClass != null ? CompressionParams.createCompressor(compressorClass) : null;
            this.encryptionContext = encryptionContext;
        }

        /** @return whether a compressor was configured */
        public boolean useCompression()
        {
            return compressor != null;
        }

        /** @return whether the encryption context reports encryption as enabled */
        public boolean useEncryption()
        {
            return encryptionContext.isEnabled();
        }

        /** @return the compressor instance, or {@code null} if compression is not used */
        public ICompressor getCompressor()
        {
            return compressor;
        }

        /** @return the configured compressor class, or {@code null} */
        public ParameterizedClass getCompressorClass()
        {
            return compressorClass;
        }

        /** @return the compressor's simple class name, or {@code "none"} without compression */
        public String getCompressorName()
        {
            return useCompression() ? compressor.getClass().getSimpleName() : "none";
        }

        /** @return the encryption settings passed at construction */
        public EncryptionContext getEncryptionContext()
        {
            return encryptionContext;
        }
    }
}