package org.apache.cassandra.hints;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import javax.annotation.Nullable;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.UnknownColumnFamilyException;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.NativeLibrary;
class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
{
private static final Logger logger = LoggerFactory.getLogger(HintsReader.class);
private static final int PAGE_SIZE = 512 << 10;
private final HintsDescriptor descriptor;
private final File file;
private final ChecksummedDataInput input;
@Nullable
private final RateLimiter rateLimiter;
protected HintsReader(HintsDescriptor descriptor, File file, ChecksummedDataInput reader, RateLimiter rateLimiter)
{
this.descriptor = descriptor;
this.file = file;
this.input = reader;
this.rateLimiter = rateLimiter;
}
@SuppressWarnings("resource")
static HintsReader open(File file, RateLimiter rateLimiter)
{
ChecksummedDataInput reader = ChecksummedDataInput.open(file);
try
{
HintsDescriptor descriptor = HintsDescriptor.deserialize(reader);
if (descriptor.isCompressed())
{
reader = CompressedChecksummedDataInput.upgradeInput(reader, descriptor.createCompressor());
}
else if (descriptor.isEncrypted())
reader = EncryptedChecksummedDataInput.upgradeInput(reader, descriptor.getCipher(), descriptor.createCompressor());
return new HintsReader(descriptor, file, reader, rateLimiter);
}
catch (IOException e)
{
reader.close();
throw new FSReadError(e, file);
}
}
static HintsReader open(File file)
{
return open(file, null);
}
public void close()
{
input.close();
}
public HintsDescriptor descriptor()
{
return descriptor;
}
void seek(InputPosition newPosition)
{
input.seek(newPosition);
}
public Iterator<Page> iterator()
{
return new PagesIterator();
}
public ChecksummedDataInput getInput()
{
return input;
}
final class Page
{
public final InputPosition position;
private Page(InputPosition inputPosition)
{
this.position = inputPosition;
}
Iterator<Hint> hintsIterator()
{
return new HintsIterator(position);
}
Iterator<ByteBuffer> buffersIterator()
{
return new BuffersIterator(position);
}
}
final class PagesIterator extends AbstractIterator<Page>
{
@SuppressWarnings("resource")
protected Page computeNext()
{
input.tryUncacheRead();
if (input.isEOF())
return endOfData();
return new Page(input.getSeekPosition());
}
}
final class HintsIterator extends AbstractIterator<Hint>
{
private final InputPosition offset;
HintsIterator(InputPosition offset)
{
super();
this.offset = offset;
}
protected Hint computeNext()
{
Hint hint;
do
{
InputPosition position = input.getSeekPosition();
if (input.isEOF())
return endOfData();
if (position.subtract(offset) >= PAGE_SIZE)
return endOfData();
try
{
hint = computeNextInternal();
}
catch (EOFException e)
{
logger.warn("Unexpected EOF replaying hints ({}), likely due to unflushed hint file on shutdown; continuing", descriptor.fileName(), e);
return endOfData();
}
catch (IOException e)
{
throw new FSReadError(e, file);
}
}
while (hint == null);
return hint;
}
private Hint computeNextInternal() throws IOException
{
input.resetCrc();
input.resetLimit();
int size = input.readInt();
if (!input.checkCrc())
throw new IOException("Digest mismatch exception");
return readHint(size);
}
private Hint readHint(int size) throws IOException
{
if (rateLimiter != null)
rateLimiter.acquire(size);
input.limit(size);
Hint hint;
try
{
hint = Hint.serializer.deserialize(input, descriptor.messagingVersion());
input.checkLimit(0);
}
catch (UnknownColumnFamilyException e)
{
logger.warn("Failed to read a hint for {}: {} - table with id {} is unknown in file {}",
StorageService.instance.getEndpointForHostId(descriptor.hostId),
descriptor.hostId,
e.cfId,
descriptor.fileName());
input.skipBytes(Ints.checkedCast(size - input.bytesPastLimit()));
hint = null;
}
if (input.checkCrc())
return hint;
logger.warn("Failed to read a hint for {}: {} - digest mismatch for hint at position {} in file {}",
StorageService.instance.getEndpointForHostId(descriptor.hostId),
descriptor.hostId,
input.getPosition() - size - 4,
descriptor.fileName());
return null;
}
}
final class BuffersIterator extends AbstractIterator<ByteBuffer>
{
private final InputPosition offset;
BuffersIterator(InputPosition offset)
{
super();
this.offset = offset;
}
protected ByteBuffer computeNext()
{
ByteBuffer buffer;
do
{
InputPosition position = input.getSeekPosition();
if (input.isEOF())
return endOfData();
if (position.subtract(offset) >= PAGE_SIZE)
return endOfData();
try
{
buffer = computeNextInternal();
}
catch (EOFException e)
{
logger.warn("Unexpected EOF replaying hints ({}), likely due to unflushed hint file on shutdown; continuing", descriptor.fileName(), e);
return endOfData();
}
catch (IOException e)
{
throw new FSReadError(e, file);
}
}
while (buffer == null);
return buffer;
}
private ByteBuffer computeNextInternal() throws IOException
{
input.resetCrc();
input.resetLimit();
int size = input.readInt();
if (!input.checkCrc())
throw new IOException("Digest mismatch exception");
return readBuffer(size);
}
private ByteBuffer readBuffer(int size) throws IOException
{
if (rateLimiter != null)
rateLimiter.acquire(size);
input.limit(size);
ByteBuffer buffer = ByteBufferUtil.read(input, size);
if (input.checkCrc())
return buffer;
logger.warn("Failed to read a hint for {} - digest mismatch for hint at position {} in file {}",
descriptor.hostId,
input.getPosition() - size - 4,
descriptor.fileName());
return null;
}
}
}