package org.apache.cassandra.db.commitlog;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.*;
import org.apache.commons.lang3.StringUtils;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
public class CommitLogReplayer implements CommitLogReadHandler
{
/**
 * Byte threshold checked by {@link #handleMutation}: while {@link #pendingMutationBytes} is above this
 * value, the caller waits on queued futures. Read from the
 * {@code cassandra.commitlog_max_outstanding_replay_bytes} system property, defaulting to
 * 1024 * 1024 * 64 bytes. Non-final, presumably so tests can override it.
 */
@VisibleForTesting
public static long MAX_OUTSTANDING_REPLAY_BYTES = Long.getLong("cassandra.commitlog_max_outstanding_replay_bytes", 1024 * 1024 * 64);
/** Strategy used by {@link #handleMutation} to submit each mutation read from the log; replaceable (non-final). */
@VisibleForTesting
public static MutationInitiator mutationInitiator = new MutationInitiator();
/**
 * Name of the boolean system property consulted by {@link #shouldSkipSegmentOnError}: when it is true,
 * a non-permissible replay error is logged instead of being passed to the commit error handler.
 */
static final String IGNORE_REPLAY_ERRORS_PROPERTY = Config.PROPERTY_PREFIX + "commitlog.ignorereplayerrors";
private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
/** Queue-length threshold checked by {@link #handleMutation}; read from a system property, defaulting to 1024. */
private static final int MAX_OUTSTANDING_REPLAY_COUNT = Integer.getInteger(Config.PROPERTY_PREFIX + "commitlog_max_outstanding_replay_count", 1024);
/** Keyspaces that had at least one rebuilt mutation applied; these are flushed by {@link #blockForWrites}. */
private final Set<Keyspace> keyspacesReplayed;
/** Futures of submitted replay tasks, in submission order; each yields the Integer that is subtracted from {@link #pendingMutationBytes}. */
private final Queue<Future<Integer>> futures;
/** Incremented once per partition update added to a rebuilt mutation; its value is returned by {@link #blockForWrites}. */
private final AtomicInteger replayedCount;
/** Per-table (keyed by cfId) intervals of commit log positions; a position contained in a table's set is not replayed for that table. */
private final Map<UUID, IntervalSet<CommitLogPosition>> cfPersisted;
/** Position handed to the reader by {@link #replayPath} and {@link #replayFiles}. */
private final CommitLogPosition globalPosition;
/** Running total: sizes passed to {@link #handleMutation} minus the values returned by futures already waited on. */
private long pendingMutationBytes = 0;
/** Selects which partition updates of a mutation are candidates for replay. */
private final ReplayFilter replayFilter;
/** Taken from the owning CommitLog; supplies {@code restorePointInTime} and {@code precision} for {@link #pointInTimeExceeded}. */
private final CommitLogArchiver archiver;
/** Reader that parses segments and calls back into this handler; non-final and protected, annotated for test access. */
@VisibleForTesting
protected CommitLogReader commitLogReader;
/**
 * Creates a replayer from replay state that has already been computed by the caller.
 *
 * @param commitLog      commit log whose {@code archiver} is used for point-in-time checks
 * @param globalPosition position passed to the reader when segments are replayed
 * @param cfPersisted    per-table (by cfId) intervals of positions that must not be replayed
 * @param replayFilter   filter choosing which partition updates are candidates for replay
 */
CommitLogReplayer(CommitLog commitLog,
                  CommitLogPosition globalPosition,
                  Map<UUID, IntervalSet<CommitLogPosition>> cfPersisted,
                  ReplayFilter replayFilter)
{
    // State supplied by the caller.
    this.globalPosition = globalPosition;
    this.cfPersisted = cfPersisted;
    this.replayFilter = replayFilter;
    this.archiver = commitLog.archiver;

    // Fresh bookkeeping for this replay run.
    this.futures = new ArrayDeque<>();
    this.keyspacesReplayed = new NonBlockingHashSet<>();
    this.replayedCount = new AtomicInteger();
    this.commitLogReader = new CommitLogReader();
}
/**
 * Builds a replayer by computing, for every known column family store, the commit log intervals that
 * are already covered (by live sstables and, unless cleared, by a truncation record), and deriving the
 * global replay position from them.
 *
 * If a table has a truncation record whose timestamp is later than the archiver's restore point in
 * time, and the replay filter includes that table, the truncation record is removed and ignored.
 *
 * @param commitLog commit log whose archiver supplies the restore point in time
 * @return a replayer initialised with the computed intervals, global position and replay filter
 */
public static CommitLogReplayer construct(CommitLog commitLog)
{
    final Map<UUID, IntervalSet<CommitLogPosition>> persistedByTable = new HashMap<>();
    final ReplayFilter tableFilter = ReplayFilter.create();

    for (ColumnFamilyStore store : ColumnFamilyStore.all())
    {
        final UUID tableId = store.metadata.cfId;
        CommitLogPosition truncationPoint = SystemKeyspace.getTruncatedPosition(tableId);

        if (truncationPoint != null)
        {
            long restoreTime = commitLog.archiver.restorePointInTime;
            long truncatedTime = SystemKeyspace.getTruncatedAt(tableId);

            // Only drop the truncation record for tables that are actually going to be replayed.
            if (truncatedTime > restoreTime && tableFilter.includes(store.metadata))
            {
                logger.info("Restore point in time is before latest truncation of table {}.{}. Clearing truncation record.",
                            store.metadata.ksName,
                            store.metadata.cfName);
                SystemKeyspace.removeTruncationRecord(tableId);
                truncationPoint = null;
            }
        }

        persistedByTable.put(tableId, persistedIntervals(store.getLiveSSTables(), truncationPoint));
    }

    final CommitLogPosition replayStart = firstNotCovered(persistedByTable.values());
    logger.debug("Global replay position is {} from columnfamilies {}", replayStart, FBUtilities.toString(persistedByTable));
    return new CommitLogReplayer(commitLog, replayStart, persistedByTable, tableFilter);
}
/**
 * Replays a single commit log segment file by handing it to the reader with this object as the handler.
 * The reader is given the global replay position and {@code CommitLogReader.ALL_MUTATIONS} as the mutation limit.
 *
 * @param file               the segment file to read
 * @param tolerateTruncation forwarded unchanged to the reader
 * @throws IOException if the reader fails
 */
public void replayPath(File file, boolean tolerateTruncation) throws IOException
{
commitLogReader.readCommitLogSegment(this, file, globalPosition, CommitLogReader.ALL_MUTATIONS, tolerateTruncation);
}
/**
 * Replays a set of commit log files by handing them to the reader with this object as the handler
 * and the global replay position.
 *
 * @param clogs the commit log files to read
 * @throws IOException if the reader fails
 */
public void replayFiles(File[] clogs) throws IOException
{
commitLogReader.readAllFiles(this, clogs, globalPosition);
}
/**
 * Waits for all queued replay tasks to finish, then flushes every keyspace that received replayed data
 * and waits for those flushes as well.
 *
 * Before waiting, one warning is logged per entry the reader reports as an invalid mutation. If the
 * system keyspace was not among the replayed keyspaces, its BATCHES table is flushed explicitly.
 *
 * @return the current value of the replayed partition-update counter
 */
public int blockForWrites()
{
    for (Map.Entry<UUID, AtomicInteger> skipped : commitLogReader.getInvalidMutations())
        logger.warn("Skipped {} mutations from unknown (probably removed) CF with id {}", skipped.getValue(), skipped.getKey());

    // Drain the replay tasks submitted by handleMutation.
    FBUtilities.waitOnFutures(this.futures);
    logger.trace("Finished waiting on mutations from recovery");
    this.futures.clear();

    // Collect flush futures separately from the (now empty) replay queue.
    final List<Future<?>> flushes = new ArrayList<>();
    boolean systemKeyspaceFlushed = false;
    for (Keyspace replayed : keyspacesReplayed)
    {
        systemKeyspaceFlushed |= replayed.getName().equals(SchemaConstants.SYSTEM_KEYSPACE_NAME);
        flushes.addAll(replayed.flush());
    }

    if (!systemKeyspaceFlushed)
    {
        ColumnFamilyStore batches = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.BATCHES);
        flushes.add(batches.forceFlush());
    }

    FBUtilities.waitOnFutures(flushes);
    return replayedCount.get();
}
/**
 * Submits mutations read from the commit log to the mutation stage. Held in a replaceable static
 * field on the replayer, so a subclass can override {@link #initiateMutation}.
 */
@VisibleForTesting
public static class MutationInitiator
{
    /**
     * Schedules a task that rebuilds {@code mutation} from the partition updates that still need
     * replaying and applies the result.
     *
     * The task does nothing when the mutation's keyspace is not in the schema or when the replayer
     * reports the restore point in time as exceeded. Otherwise it keeps only the updates that pass the
     * replay filter, whose table is still in the schema, and whose position the replayer says should be
     * replayed. Each kept update increments the replayed counter.
     *
     * @param mutation          the mutation read from the log
     * @param segmentId         id of the segment the mutation came from
     * @param serializedSize    passed to the stage as the result value of the submitted task
     * @param entryLocation     location of the entry; combined with {@code segmentId} into a position
     * @param commitLogReplayer the replayer whose filter, intervals and counters are used
     * @return the future returned by the mutation stage for the submitted task
     */
    protected Future<Integer> initiateMutation(final Mutation mutation,
                                               final long segmentId,
                                               final int serializedSize,
                                               final int entryLocation,
                                               final CommitLogReplayer commitLogReplayer)
    {
        Runnable replayTask = new WrappedRunnable()
        {
            public void runMayThrow()
            {
                final String keyspaceName = mutation.getKeyspaceName();

                // Nothing to do for an unknown keyspace or a mutation past the restore target.
                if (Schema.instance.getKSMetaData(keyspaceName) == null
                    || commitLogReplayer.pointInTimeExceeded(mutation))
                    return;

                final Keyspace keyspace = Keyspace.open(keyspaceName);

                // Created lazily: stays null when no update qualifies.
                Mutation rebuilt = null;
                for (PartitionUpdate update : commitLogReplayer.replayFilter.filter(mutation))
                {
                    final UUID tableId = update.metadata().cfId;

                    // Table no longer present in the schema.
                    if (Schema.instance.getCF(tableId) == null)
                        continue;

                    // Position already covered for this table.
                    if (!commitLogReplayer.shouldReplay(tableId, new CommitLogPosition(segmentId, entryLocation)))
                        continue;

                    if (rebuilt == null)
                        rebuilt = new Mutation(keyspaceName, mutation.key());
                    rebuilt.add(update);
                    commitLogReplayer.replayedCount.incrementAndGet();
                }

                if (rebuilt == null)
                    return;

                assert !rebuilt.isEmpty();
                Keyspace.open(rebuilt.getKeyspaceName()).apply(rebuilt, false, true, false);
                commitLogReplayer.keyspacesReplayed.add(keyspace);
            }
        };
        return StageManager.getStage(Stage.MUTATION).submit(replayTask, serializedSize);
    }
}
/**
 * Collects the commit log intervals recorded in the metadata of the given sstables and, when a
 * truncation position is supplied, additionally the interval from {@code CommitLogPosition.NONE} up
 * to that position.
 *
 * @param onDisk      sstables whose {@code commitLogIntervals} are merged
 * @param truncatedAt truncation position, or {@code null} if there is none to apply
 * @return the set built from all collected intervals
 */
public static IntervalSet<CommitLogPosition> persistedIntervals(Iterable<SSTableReader> onDisk, CommitLogPosition truncatedAt)
{
    final IntervalSet.Builder<CommitLogPosition> covered = new IntervalSet.Builder<>();

    onDisk.forEach(sstable -> covered.addAll(sstable.getSSTableMetadata().commitLogIntervals));

    // Everything up to the truncation point counts as covered.
    if (truncatedAt != null)
        covered.add(CommitLogPosition.NONE, truncatedAt);

    return covered.build();
}
/**
 * Takes the first end position of each interval set (or {@code CommitLogPosition.NONE} when a set has
 * no ends) and returns the smallest of them by natural ordering.
 *
 * @param ranges interval sets to inspect; must contain at least one element
 * @return the minimum of the per-set first end positions
 * @throws NoSuchElementException if {@code ranges} is empty
 */
public static CommitLogPosition firstNotCovered(Collection<IntervalSet<CommitLogPosition>> ranges)
{
    Optional<CommitLogPosition> earliestFirstEnd =
        ranges.stream()
              .map(covered -> Iterables.getFirst(covered.ends(), CommitLogPosition.NONE))
              .min(Ordering.natural());

    // Deliberately unchecked: an empty collection surfaces as NoSuchElementException.
    return earliestFirstEnd.get();
}
/**
 * Decides which partition updates of a mutation, and which tables, take part in commit log replay.
 */
abstract static class ReplayFilter
{
    /**
     * @param mutation the mutation read from the commit log
     * @return the partition updates of {@code mutation} that are candidates for replay
     */
    public abstract Iterable<PartitionUpdate> filter(Mutation mutation);

    /**
     * @param metadata metadata of the table in question
     * @return whether the table is covered by this filter
     */
    public abstract boolean includes(CFMetaData metadata);

    /**
     * Creates the filter selected by the {@code cassandra.replayList} system property.
     *
     * When the property is unset, every update is replayed. Otherwise the property is parsed as a
     * comma-separated list of {@code keyspace.table} names, each of which must resolve to a known
     * keyspace and table.
     *
     * @return an always-replay filter, or a filter restricted to the listed tables
     * @throws IllegalArgumentException if an entry is not of the form {@code keyspace.table}, or names
     *                                  a keyspace or table that cannot be resolved
     */
    public static ReplayFilter create()
    {
        // Read the property once so the null check and the parse see the same value.
        final String replayList = System.getProperty("cassandra.replayList");
        if (replayList == null)
            return new AlwaysReplayFilter();

        Multimap<String, String> toReplay = HashMultimap.create();
        for (String rawPair : replayList.split(","))
        {
            String[] pair = StringUtils.split(rawPair.trim(), '.');
            if (pair.length != 2)
                throw new IllegalArgumentException("Each table to be replayed must be fully qualified with keyspace name, e.g., 'system.peers'");

            Keyspace ks = Schema.instance.getKeyspaceInstance(pair[0]);
            if (ks == null)
                throw new IllegalArgumentException("Unknown keyspace " + pair[0]);

            ColumnFamilyStore cfs = ks.getColumnFamilyStore(pair[1]);
            if (cfs == null)
                throw new IllegalArgumentException(String.format("Unknown table %s.%s", pair[0], pair[1]));

            toReplay.put(pair[0], pair[1]);
        }
        return new CustomReplayFilter(toReplay);
    }
}
private static class AlwaysReplayFilter extends ReplayFilter
{
public Iterable<PartitionUpdate> filter(Mutation mutation)
{
return mutation.getPartitionUpdates();
}
public boolean includes(CFMetaData metadata)
{
return true;
}
}
private static class CustomReplayFilter extends ReplayFilter
{
private Multimap<String, String> toReplay;
public CustomReplayFilter(Multimap<String, String> toReplay)
{
this.toReplay = toReplay;
}
public Iterable<PartitionUpdate> filter(Mutation mutation)
{
final Collection<String> cfNames = toReplay.get(mutation.getKeyspaceName());
if (cfNames == null)
return Collections.emptySet();
return Iterables.filter(mutation.getPartitionUpdates(), new Predicate<PartitionUpdate>()
{
public boolean apply(PartitionUpdate upd)
{
return cfNames.contains(upd.metadata().cfName);
}
});
}
public boolean includes(CFMetaData metadata)
{
return toReplay.containsEntry(metadata.ksName, metadata.cfName);
}
}
/**
 * Tells whether an entry at {@code position} still has to be replayed for the given table, i.e. whether
 * the position is NOT contained in the intervals recorded for that table in {@code cfPersisted}.
 *
 * NOTE(review): a {@code cfId} with no entry in {@code cfPersisted} would cause a NullPointerException
 * here; callers appear to rely on the map covering every table they pass in - confirm.
 *
 * @param cfId     id of the table the update belongs to
 * @param position commit log position of the entry
 * @return {@code true} if the position is outside the table's recorded intervals
 */
private boolean shouldReplay(UUID cfId, CommitLogPosition position)
{
return !cfPersisted.get(cfId).contains(position);
}
/**
 * Checks whether any partition update of the mutation carries a maximum timestamp that, converted to
 * milliseconds with the archiver's precision, lies after the archiver's restore point in time.
 *
 * @param fm the mutation to inspect
 * @return {@code true} as soon as one update exceeds the restore target, {@code false} if none does
 */
protected boolean pointInTimeExceeded(Mutation fm)
{
    final long restoreTarget = archiver.restorePointInTime;

    boolean exceeded = false;
    Iterator<PartitionUpdate> updates = fm.getPartitionUpdates().iterator();
    // Stop at the first update past the target; later updates are not examined.
    while (!exceeded && updates.hasNext())
        exceeded = archiver.precision.toMillis(updates.next().maxTimestamp()) > restoreTarget;

    return exceeded;
}
/**
 * Submits a mutation read from the commit log for replay and applies back-pressure.
 *
 * After queuing the new task, the oldest queued futures are waited on for as long as the queue is
 * longer than the outstanding-count limit, the pending byte total is above the outstanding-bytes
 * limit, or the future at the head of the queue has already completed. Each value returned by a
 * waited-on future is subtracted from the pending byte total.
 *
 * @param m             the mutation to replay
 * @param size          size of the mutation, added to the pending byte total
 * @param entryLocation location of the entry within its segment
 * @param desc          descriptor of the segment; its {@code id} identifies the segment
 */
public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc)
{
    pendingMutationBytes += size;

    Future<Integer> submitted = mutationInitiator.initiateMutation(m, desc.id, size, entryLocation, this);
    futures.offer(submitted);

    while (true)
    {
        boolean overLimit = futures.size() > MAX_OUTSTANDING_REPLAY_COUNT
                            || pendingMutationBytes > MAX_OUTSTANDING_REPLAY_BYTES;
        // The head is inspected only when no limit is exceeded, as in a short-circuit ||.
        boolean headFinished = !overLimit && !futures.isEmpty() && futures.peek().isDone();
        if (!overLimit && !headFinished)
            break;

        pendingMutationBytes -= FBUtilities.waitOnFuture(futures.poll());
    }
}
/**
 * Decides how a read error during replay is handled. This method never asks for the segment to be
 * skipped: it either returns {@code false} or throws.
 *
 * Permissible errors are logged and ignored. Other errors are logged and ignored when the
 * ignore-replay-errors system property is true. Otherwise the error is passed to the commit error
 * handler; if the handler returns false, replay is aborted with an exception.
 *
 * @param exception the read error reported by the reader
 * @return always {@code false}
 * @throws CommitLogReplayException if the error is neither ignorable nor accepted by the commit error handler
 */
public boolean shouldSkipSegmentOnError(CommitLogReadException exception) throws IOException
{
    if (exception.permissible)
    {
        logger.error("Ignoring commit log replay error likely due to incomplete flush to disk", exception);
        return false;
    }

    if (Boolean.getBoolean(IGNORE_REPLAY_ERRORS_PROPERTY))
    {
        logger.error("Ignoring commit log replay error", exception);
        return false;
    }

    if (CommitLog.handleCommitError("Failed commit log replay", exception))
        return false;

    logger.error("Replay stopped. If you wish to override this error and continue starting the node ignoring " +
                 "commit log replay problems, specify -D" + IGNORE_REPLAY_ERRORS_PROPERTY + "=true " +
                 "on the command line");
    throw new CommitLogReplayException(exception.getMessage(), exception);
}
/**
 * Handles an unrecoverable read error with exactly the same policy as
 * {@link #shouldSkipSegmentOnError}: the call is made only for its logging and its possible exception.
 *
 * @param exception the read error reported by the reader
 * @throws IOException if {@link #shouldSkipSegmentOnError} decides replay must stop
 */
public void handleUnrecoverableError(CommitLogReadException exception) throws IOException
{
// The boolean result is intentionally discarded.
shouldSkipSegmentOnError(exception);
}
/**
 * Thrown when commit log replay has to be aborted. It is an {@link IOException}, so callers that
 * already handle I/O failures during replay also catch it.
 */
@SuppressWarnings("serial")
public static class CommitLogReplayException extends IOException
{
    /**
     * @param message description of the failure
     */
    public CommitLogReplayException(String message)
    {
        super(message);
    }

    /**
     * @param message description of the failure
     * @param cause   the underlying error that stopped replay
     */
    public CommitLogReplayException(String message, Throwable cause)
    {
        super(message, cause);
    }
}
}