/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.commitlog;
import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CRC32;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.UnknownColumnFamilyException;
import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadErrorReason;
import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadException;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.SerializationHelper;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.io.util.RebufferingInputStream;
import org.apache.cassandra.utils.JVMStabilityInspector;
import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
public class CommitLogReader
{
/** Logger shared by all instances of this reader. */
private static final Logger logger = LoggerFactory.getLogger(CommitLogReader.class);
/** Serialized-size value that readSection interprets as "end of segment". */
private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
/** Sentinel mutation limit meaning "do not limit the number of mutations". */
@VisibleForTesting
public static final int ALL_MUTATIONS = -1;
/** CRC32 instance that readSection resets and reuses for every mutation it verifies. */
private final CRC32 checksum;
/** Per-table-id count of mutations dropped by readMutation because their table was unknown. */
private final Map<UUID, AtomicInteger> invalidMutations;
/** Scratch space for the serialized bytes of the mutation being read; readSection replaces it with a larger array when needed. */
private byte[] buffer;
/**
 * Creates a reader with a fresh CRC32, an empty invalid-mutation tally and a 4096-byte scratch buffer.
 */
public CommitLogReader()
{
    this.buffer = new byte[4096];
    this.invalidMutations = new HashMap<>();
    this.checksum = new CRC32();
}
/**
 * Exposes the tally of mutations that were dropped because their table was unknown.
 *
 * @return the entry set of the internal map (a view of that map, not a copy), keyed by table id
 */
public Set<Map.Entry<UUID, AtomicInteger>> getInvalidMutations()
{
    final Set<Map.Entry<UUID, AtomicInteger>> tally = invalidMutations.entrySet();
    return tally;
}
Reads all passed in files with no minimum, no start, and no mutation limit.
/**
* Reads all passed in files with no minimum, no start, and no mutation limit.
*/
public void readAllFiles(CommitLogReadHandler handler, File[] files) throws IOException
{
readAllFiles(handler, files, CommitLogPosition.NONE);
}
/**
 * Decides whether a segment file can be left out of replay because it holds no data.
 * Files whose name-derived version is older than VERSION_21 are never skipped. For newer files the
 * header is consumed and the file is skipped only when the two ints that follow it are both zero.
 *
 * @param file segment file to inspect
 * @return true when the file should be skipped
 */
private static boolean shouldSkip(File file) throws IOException, ConfigurationException
{
    final CommitLogDescriptor fromName = CommitLogDescriptor.fromFileName(file.getName());
    if (fromName.version >= CommitLogDescriptor.VERSION_21)
    {
        try (RandomAccessReader in = RandomAccessReader.open(file))
        {
            // Consume the header so that the next reads start right after it.
            CommitLogDescriptor.readHeader(in, DatabaseDescriptor.getEncryptionContext());
            final int firstMarker = in.readInt();
            final long firstCrc = in.readInt() & 0xffffffffL;
            return firstMarker == 0 && firstCrc == 0;
        }
    }
    return false;
}
/**
 * Drops the files that shouldSkip reports as empty and keeps everything else, preserving order.
 * A file whose inspection throws is kept as well, so that the actual read gets to report the problem.
 *
 * @param toFilter candidate segment files
 * @return the files that should be read
 */
private static List<File> filterCommitLogFiles(File[] toFilter)
{
    final List<File> kept = new ArrayList<>(toFilter.length);
    for (File candidate : toFilter)
    {
        try
        {
            if (shouldSkip(candidate))
            {
                logger.info("Skipping playback of empty log: {}", candidate.getName());
                continue;
            }
        }
        catch (Exception e)
        {
            // let recover deal with it: fall through and keep the file
        }
        kept.add(candidate);
    }
    return kept;
}
Reads all passed in files with minPosition, no start, and no mutation limit.
/**
* Reads all passed in files with minPosition, no start, and no mutation limit.
*/
public void readAllFiles(CommitLogReadHandler handler, File[] files, CommitLogPosition minPosition) throws IOException
{
List<File> filteredLogs = filterCommitLogFiles(files);
int i = 0;
for (File file: filteredLogs)
{
i++;
readCommitLogSegment(handler, file, minPosition, ALL_MUTATIONS, i == filteredLogs.size());
}
}
Reads passed in file fully
/**
* Reads passed in file fully
*/
public void readCommitLogSegment(CommitLogReadHandler handler, File file, boolean tolerateTruncation) throws IOException
{
readCommitLogSegment(handler, file, CommitLogPosition.NONE, ALL_MUTATIONS, tolerateTruncation);
}
Reads passed in file fully, up to mutationLimit count
/**
* Reads passed in file fully, up to mutationLimit count
*/
@VisibleForTesting
public void readCommitLogSegment(CommitLogReadHandler handler, File file, int mutationLimit, boolean tolerateTruncation) throws IOException
{
readCommitLogSegment(handler, file, CommitLogPosition.NONE, mutationLimit, tolerateTruncation);
}
Reads mutations from file, handing them off to handler
Params: - handler – Handler that will take action based on deserialized Mutations
- file – CommitLogSegment file to read
- minPosition – Optional minimum CommitLogPosition - all segments with id > or matching w/greater position will be read
- mutationLimit – Optional limit on # of mutations to replay. Local ALL_MUTATIONS serves as marker to play all.
- tolerateTruncation – Whether or not we should allow truncation of this file or throw if EOF found
Throws:
/**
* Reads mutations from file, handing them off to handler
* @param handler Handler that will take action based on deserialized Mutations
* @param file CommitLogSegment file to read
* @param minPosition Optional minimum CommitLogPosition - all segments with id > or matching w/greater position will be read
* @param mutationLimit Optional limit on # of mutations to replay. Local ALL_MUTATIONS serves as marker to play all.
* @param tolerateTruncation Whether or not we should allow truncation of this file or throw if EOF found
*
* @throws IOException
*/
public void readCommitLogSegment(CommitLogReadHandler handler,
File file,
CommitLogPosition minPosition,
int mutationLimit,
boolean tolerateTruncation) throws IOException
{
// just transform from the file name (no reading of headers) to determine version
CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
try(RandomAccessReader reader = RandomAccessReader.open(file))
{
if (desc.version < CommitLogDescriptor.VERSION_21)
{
if (!shouldSkipSegmentId(file, desc, minPosition))
{
if (minPosition.segmentId == desc.id)
reader.seek(minPosition.position);
ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation);
statusTracker.errorContext = desc.fileName();
readSection(handler, reader, minPosition, (int) reader.length(), statusTracker, desc);
}
return;
}
final long segmentIdFromFilename = desc.id;
try
{
// The following call can either throw or legitimately return null. For either case, we need to check
// desc outside this block and set it to null in the exception case.
desc = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext());
}
catch (Exception e)
{
desc = null;
}
if (desc == null)
{
// don't care about whether or not the handler thinks we can continue. We can't w/out descriptor.
// whether or not we continue with startup will depend on whether this is the last segment
handler.handleUnrecoverableError(new CommitLogReadException(
String.format("Could not read commit log descriptor in file %s", file),
CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR,
tolerateTruncation));
return;
}
if (segmentIdFromFilename != desc.id)
{
if (handler.shouldSkipSegmentOnError(new CommitLogReadException(String.format(
"Segment id mismatch (filename %d, descriptor %d) in file %s", segmentIdFromFilename, desc.id, file),
CommitLogReadErrorReason.RECOVERABLE_DESCRIPTOR_ERROR,
false)))
{
return;
}
}
if (shouldSkipSegmentId(file, desc, minPosition))
return;
CommitLogSegmentReader segmentReader;
try
{
segmentReader = new CommitLogSegmentReader(handler, desc, reader, tolerateTruncation);
}
catch(Exception e)
{
handler.handleUnrecoverableError(new CommitLogReadException(
String.format("Unable to create segment reader for commit log file: %s", e),
CommitLogReadErrorReason.UNRECOVERABLE_UNKNOWN_ERROR,
tolerateTruncation));
return;
}
try
{
ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation);
for (CommitLogSegmentReader.SyncSegment syncSegment : segmentReader)
{
// Only tolerate truncation if we allow in both global and segment
statusTracker.tolerateErrorsInSection = tolerateTruncation & syncSegment.toleratesErrorsInSection;
// Skip segments that are completely behind the desired minPosition
if (desc.id == minPosition.segmentId && syncSegment.endPosition < minPosition.position)
continue;
statusTracker.errorContext = String.format("Next section at %d in %s", syncSegment.fileStartPosition, desc.fileName());
readSection(handler, syncSegment.input, minPosition, syncSegment.endPosition, statusTracker, desc);
if (!statusTracker.shouldContinue())
break;
}
}
// Unfortunately AbstractIterator cannot throw a checked exception, so we check to see if a RuntimeException
// is wrapping an IOException.
catch (RuntimeException re)
{
if (re.getCause() instanceof IOException)
throw (IOException) re.getCause();
throw re;
}
logger.debug("Finished reading {}", file);
}
}
Any segment with id >= minPosition.segmentId is a candidate for read.
/**
* Any segment with id >= minPosition.segmentId is a candidate for read.
*/
private boolean shouldSkipSegmentId(File file, CommitLogDescriptor desc, CommitLogPosition minPosition)
{
logger.debug("Reading {} (CL version {}, messaging version {}, compression {})",
file.getPath(),
desc.version,
desc.getMessagingVersion(),
desc.compression);
if (minPosition.segmentId > desc.id)
{
logger.trace("Skipping read of fully-flushed {}", file);
return true;
}
return false;
}
Reads a section of a file containing mutations
Params: - handler – Handler that will take action based on deserialized Mutations
- reader – FileDataInput / logical buffer containing commitlog mutations
- minPosition – CommitLogPosition indicating when we should start actively replaying mutations
- end – logical numeric end of the segment being read
- statusTracker – ReadStatusTracker with current state of mutation count, error state, etc
- desc – Descriptor for CommitLog serialization
/**
* Reads a section of a file containing mutations
*
* @param handler Handler that will take action based on deserialized Mutations
* @param reader FileDataInput / logical buffer containing commitlog mutations
* @param minPosition CommitLogPosition indicating when we should start actively replaying mutations
* @param end logical numeric end of the segment being read
* @param statusTracker ReadStatusTracker with current state of mutation count, error state, etc
* @param desc Descriptor for CommitLog serialization
*/
private void readSection(CommitLogReadHandler handler,
FileDataInput reader,
CommitLogPosition minPosition,
int end,
ReadStatusTracker statusTracker,
CommitLogDescriptor desc) throws IOException
{
// seek rather than deserializing mutation-by-mutation to reach the desired minPosition in this SyncSegment
if (desc.id == minPosition.segmentId && reader.getFilePointer() < minPosition.position)
reader.seek(minPosition.position);
while (statusTracker.shouldContinue() && reader.getFilePointer() < end && !reader.isEOF())
{
long mutationStart = reader.getFilePointer();
if (logger.isTraceEnabled())
logger.trace("Reading mutation at {}", mutationStart);
long claimedCRC32;
int serializedSize;
try
{
// We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
// of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
// However, it's possible with 2.1 era commitlogs that the last mutation ended less than 4 bytes
// from the end of the file, which means that we'll be unable to read an a full int and instead
// read an EOF here
if(end - reader.getFilePointer() < 4)
{
logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
statusTracker.requestTermination();
return;
}
// any of the reads may hit EOF
serializedSize = reader.readInt();
if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
{
logger.trace("Encountered end of segment marker at {}", reader.getFilePointer());
statusTracker.requestTermination();
return;
}
// Mutation must be at LEAST 10 bytes:
// 3 for a non-empty Keyspace
// 3 for a Key (including the 2-byte length from writeUTF/writeWithShortLength)
// 4 bytes for column count.
// This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
if (serializedSize < 10)
{
if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
String.format("Invalid mutation size %d at %d in %s", serializedSize, mutationStart, statusTracker.errorContext),
CommitLogReadErrorReason.MUTATION_ERROR,
statusTracker.tolerateErrorsInSection)))
{
statusTracker.requestTermination();
}
return;
}
long claimedSizeChecksum = CommitLogFormat.calculateClaimedChecksum(reader, desc.version);
checksum.reset();
CommitLogFormat.updateChecksum(checksum, serializedSize, desc.version);
if (checksum.getValue() != claimedSizeChecksum)
{
if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
String.format("Mutation size checksum failure at %d in %s", mutationStart, statusTracker.errorContext),
CommitLogReadErrorReason.MUTATION_ERROR,
statusTracker.tolerateErrorsInSection)))
{
statusTracker.requestTermination();
}
return;
}
if (serializedSize > buffer.length)
buffer = new byte[(int) (1.2 * serializedSize)];
reader.readFully(buffer, 0, serializedSize);
claimedCRC32 = CommitLogFormat.calculateClaimedCRC32(reader, desc.version);
}
catch (EOFException eof)
{
if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
String.format("Unexpected end of segment at %d in %s", mutationStart, statusTracker.errorContext),
CommitLogReadErrorReason.EOF,
statusTracker.tolerateErrorsInSection)))
{
statusTracker.requestTermination();
}
return;
}
checksum.update(buffer, 0, serializedSize);
if (claimedCRC32 != checksum.getValue())
{
if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
String.format("Mutation checksum failure at %d in %s", mutationStart, statusTracker.errorContext),
CommitLogReadErrorReason.MUTATION_ERROR,
statusTracker.tolerateErrorsInSection)))
{
statusTracker.requestTermination();
}
continue;
}
long mutationPosition = reader.getFilePointer();
readMutation(handler, buffer, serializedSize, minPosition, (int)mutationPosition, desc);
// Only count this as a processed mutation if it is after our min as we suppress reading of mutations that
// are before this mark.
if (mutationPosition >= minPosition.position)
statusTracker.addProcessedMutation();
}
}
Deserializes and passes a Mutation to the ICommitLogReadHandler requested
Params: - handler – Handler that will take action based on deserialized Mutations
- inputBuffer – raw byte array w/Mutation data
- size – deserialized size of mutation
- minPosition – We need to suppress replay of mutations that are before the required minPosition
- entryLocation – filePointer offset of mutation within CommitLogSegment
- desc – CommitLogDescriptor being worked on
/**
* Deserializes and passes a Mutation to the ICommitLogReadHandler requested
*
* @param handler Handler that will take action based on deserialized Mutations
* @param inputBuffer raw byte array w/Mutation data
* @param size deserialized size of mutation
* @param minPosition We need to suppress replay of mutations that are before the required minPosition
* @param entryLocation filePointer offset of mutation within CommitLogSegment
* @param desc CommitLogDescriptor being worked on
*/
@VisibleForTesting
protected void readMutation(CommitLogReadHandler handler,
byte[] inputBuffer,
int size,
CommitLogPosition minPosition,
final int entryLocation,
final CommitLogDescriptor desc) throws IOException
{
// For now, we need to go through the motions of deserializing the mutation to determine its size and move
// the file pointer forward accordingly, even if we're behind the requested minPosition within this SyncSegment.
boolean shouldReplay = entryLocation > minPosition.position;
final Mutation mutation;
try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
{
mutation = Mutation.serializer.deserialize(bufIn,
desc.getMessagingVersion(),
SerializationHelper.Flag.LOCAL);
// doublecheck that what we read is still] valid for the current schema
for (PartitionUpdate upd : mutation.getPartitionUpdates())
upd.validate();
}
catch (UnknownColumnFamilyException ex)
{
if (ex.cfId == null)
return;
AtomicInteger i = invalidMutations.get(ex.cfId);
if (i == null)
{
i = new AtomicInteger(1);
invalidMutations.put(ex.cfId, i);
}
else
i.incrementAndGet();
return;
}
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
File f = File.createTempFile("mutation", "dat");
try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
{
out.write(inputBuffer, 0, size);
}
// Checksum passed so this error can't be permissible.
handler.handleUnrecoverableError(new CommitLogReadException(
String.format(
"Unexpected error deserializing mutation; saved to %s. " +
"This may be caused by replaying a mutation against a table with the same name but incompatible schema. " +
"Exception follows: %s", f.getAbsolutePath(), t),
CommitLogReadErrorReason.MUTATION_ERROR,
false));
return;
}
if (logger.isTraceEnabled())
logger.trace("Read mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(),
"{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}");
if (shouldReplay)
handler.handleMutation(mutation, size, entryLocation, desc);
}
Helper methods to deal with changing formats of internals of the CommitLog without polluting deserialization code.
/**
* Helper methods to deal with changing formats of internals of the CommitLog without polluting deserialization code.
*/
private static class CommitLogFormat
{
public static long calculateClaimedChecksum(FileDataInput input, int commitLogVersion) throws IOException
{
switch (commitLogVersion)
{
case CommitLogDescriptor.VERSION_12:
case CommitLogDescriptor.VERSION_20:
return input.readLong();
// Changed format in 2.1
default:
return input.readInt() & 0xffffffffL;
}
}
public static void updateChecksum(CRC32 checksum, int serializedSize, int commitLogVersion)
{
switch (commitLogVersion)
{
case CommitLogDescriptor.VERSION_12:
checksum.update(serializedSize);
break;
// Changed format in 2.0
default:
updateChecksumInt(checksum, serializedSize);
break;
}
}
public static long calculateClaimedCRC32(FileDataInput input, int commitLogVersion) throws IOException
{
switch (commitLogVersion)
{
case CommitLogDescriptor.VERSION_12:
case CommitLogDescriptor.VERSION_20:
return input.readLong();
// Changed format in 2.1
default:
return input.readInt() & 0xffffffffL;
}
}
}
/**
 * Mutable bookkeeping for one read of a segment: the remaining mutation budget, a context string for
 * error messages, whether errors are tolerated in the current section, and a termination flag.
 */
private static class ReadStatusTracker
{
    /** Mutations still allowed to be processed; ALL_MUTATIONS means the budget is never decremented. */
    private int mutationsLeft;
    /** Text appended to error messages to say where in the file the problem occurred. */
    public String errorContext = "";
    /** Forwarded to the handler as the "permissible" flag of reported errors. */
    public boolean tolerateErrorsInSection;
    /** Set once requestTermination() has been called. */
    private boolean terminationRequested;

    public ReadStatusTracker(int mutationLimit, boolean tolerateErrorsInSection)
    {
        this.tolerateErrorsInSection = tolerateErrorsInSection;
        this.mutationsLeft = mutationLimit;
    }

    /** Consumes one unit of the mutation budget, unless the budget is unlimited. */
    public void addProcessedMutation()
    {
        if (mutationsLeft != ALL_MUTATIONS)
            mutationsLeft--;
    }

    /** @return false once termination was requested or the mutation budget has reached zero */
    public boolean shouldContinue()
    {
        if (terminationRequested)
            return false;
        return mutationsLeft == ALL_MUTATIONS || mutationsLeft != 0;
    }

    /** Makes every subsequent shouldContinue() call return false. */
    public void requestTermination()
    {
        terminationRequested = true;
    }
}
}