/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.util;
import java.util.Objects;
import java.util.Optional;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.ChunkCache;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.utils.NativeLibrary;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.concurrent.RefCounted;
import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;
import static org.apache.cassandra.utils.Throwables.maybeFail;
FileHandle
provides access to a file for reading, including the ones written by various SequentialWriter
instances, and it is typically used by SSTableReader
. Use Builder
to create an instance, and call createReader()
(and its variants) to access the readers for the underlying file. You can use Builder.complete()
several times during its lifecycle with different overrideLength
(i.e. early opening file). For that reason, the builder keeps a reference to the file channel and makes a copy for each Builder.complete()
call. Therefore, it is important to close the Builder
when it is no longer needed, as well as any FileHandle
instances. /**
* {@link FileHandle} provides access to a file for reading, including the ones written by various {@link SequentialWriter}
* instances, and it is typically used by {@link org.apache.cassandra.io.sstable.format.SSTableReader}.
*
* Use {@link FileHandle.Builder} to create an instance, and call {@link #createReader()} (and its variants) to
* access the readers for the underlying file.
*
* You can use {@link Builder#complete()} several times during its lifecycle with different {@code overrideLength}(i.e. early opening file).
* For that reason, the builder keeps a reference to the file channel and makes a copy for each {@link Builder#complete()} call.
* Therefore, it is important to close the {@link Builder} when it is no longer needed, as well as any {@link FileHandle}
* instances.
*/
public class FileHandle extends SharedCloseableImpl
{
private static final Logger logger = LoggerFactory.getLogger(FileHandle.class);
public final ChannelProxy channel;
public final long onDiskLength;
/*
* Rebufferer factory to use when constructing RandomAccessReaders
*/
private final RebuffererFactory rebuffererFactory;
/*
* Optional CompressionMetadata when dealing with compressed file
*/
private final Optional<CompressionMetadata> compressionMetadata;
private FileHandle(Cleanup cleanup,
ChannelProxy channel,
RebuffererFactory rebuffererFactory,
CompressionMetadata compressionMetadata,
long onDiskLength)
{
super(cleanup);
this.rebuffererFactory = rebuffererFactory;
this.channel = channel;
this.compressionMetadata = Optional.ofNullable(compressionMetadata);
this.onDiskLength = onDiskLength;
}
private FileHandle(FileHandle copy)
{
super(copy);
channel = copy.channel;
rebuffererFactory = copy.rebuffererFactory;
compressionMetadata = copy.compressionMetadata;
onDiskLength = copy.onDiskLength;
}
Returns: Path to the file this factory is referencing
/**
* @return Path to the file this factory is referencing
*/
public String path()
{
return channel.filePath();
}
public long dataLength()
{
return compressionMetadata.map(c -> c.dataLength).orElseGet(rebuffererFactory::fileLength);
}
public RebuffererFactory rebuffererFactory()
{
return rebuffererFactory;
}
public Optional<CompressionMetadata> compressionMetadata()
{
return compressionMetadata;
}
@Override
public void addTo(Ref.IdentityCollection identities)
{
super.addTo(identities);
compressionMetadata.ifPresent(metadata -> metadata.addTo(identities));
}
@Override
public FileHandle sharedCopy()
{
return new FileHandle(this);
}
Create RandomAccessReader
with configured method of reading content of the file. Returns: RandomAccessReader for the file
/**
* Create {@link RandomAccessReader} with configured method of reading content of the file.
*
* @return RandomAccessReader for the file
*/
public RandomAccessReader createReader()
{
return createReader(null);
}
Create RandomAccessReader
with configured method of reading content of the file. Reading from file will be rate limited by given RateLimiter
. Params: - limiter – RateLimiter to use for rate limiting read
Returns: RandomAccessReader for the file
/**
* Create {@link RandomAccessReader} with configured method of reading content of the file.
* Reading from file will be rate limited by given {@link RateLimiter}.
*
* @param limiter RateLimiter to use for rate limiting read
* @return RandomAccessReader for the file
*/
public RandomAccessReader createReader(RateLimiter limiter)
{
return new RandomAccessReader(instantiateRebufferer(limiter));
}
public FileDataInput createReader(long position)
{
RandomAccessReader reader = createReader();
reader.seek(position);
return reader;
}
Drop page cache from start to given before
. Params: - before – uncompressed position from start of the file to be dropped from cache. if 0, to end of file.
/**
* Drop page cache from start to given {@code before}.
*
* @param before uncompressed position from start of the file to be dropped from cache. if 0, to end of file.
*/
public void dropPageCache(long before)
{
long position = compressionMetadata.map(metadata -> {
if (before >= metadata.dataLength)
return 0L;
else
return metadata.chunkFor(before).offset;
}).orElse(before);
NativeLibrary.trySkipCache(channel.getFileDescriptor(), 0, position, path());
}
private Rebufferer instantiateRebufferer(RateLimiter limiter)
{
Rebufferer rebufferer = rebuffererFactory.instantiateRebufferer();
if (limiter != null)
rebufferer = new LimitingRebufferer(rebufferer, limiter, DiskOptimizationStrategy.MAX_BUFFER_SIZE);
return rebufferer;
}
Perform clean up of all resources held by FileHandle
. /**
* Perform clean up of all resources held by {@link FileHandle}.
*/
private static class Cleanup implements RefCounted.Tidy
{
final ChannelProxy channel;
final RebuffererFactory rebufferer;
final CompressionMetadata compressionMetadata;
final Optional<ChunkCache> chunkCache;
private Cleanup(ChannelProxy channel,
RebuffererFactory rebufferer,
CompressionMetadata compressionMetadata,
ChunkCache chunkCache)
{
this.channel = channel;
this.rebufferer = rebufferer;
this.compressionMetadata = compressionMetadata;
this.chunkCache = Optional.ofNullable(chunkCache);
}
public String name()
{
return channel.filePath();
}
public void tidy()
{
chunkCache.ifPresent(cache -> cache.invalidateFile(name()));
try
{
if (compressionMetadata != null)
{
compressionMetadata.close();
}
}
finally
{
try
{
channel.close();
}
finally
{
rebufferer.close();
}
}
}
}
Configures how the file will be read (compressed, mmapped, use cache etc.)
/**
* Configures how the file will be read (compressed, mmapped, use cache etc.)
*/
public static class Builder implements AutoCloseable
{
private final String path;
private ChannelProxy channel;
private CompressionMetadata compressionMetadata;
private MmappedRegions regions;
private ChunkCache chunkCache;
private int bufferSize = RandomAccessReader.DEFAULT_BUFFER_SIZE;
private BufferType bufferType = BufferType.OFF_HEAP;
private boolean mmapped = false;
private boolean compressed = false;
public Builder(String path)
{
this.path = path;
}
public Builder(ChannelProxy channel)
{
this.channel = channel;
this.path = channel.filePath();
}
public Builder compressed(boolean compressed)
{
this.compressed = compressed;
return this;
}
Set ChunkCache
to use. Params: - chunkCache – ChunkCache object to use for caching
Returns: this object
/**
* Set {@link ChunkCache} to use.
*
* @param chunkCache ChunkCache object to use for caching
* @return this object
*/
public Builder withChunkCache(ChunkCache chunkCache)
{
this.chunkCache = chunkCache;
return this;
}
Provide CompressionMetadata
to use when reading compressed file. Params: - metadata – CompressionMetadata to use
Returns: this object
/**
* Provide {@link CompressionMetadata} to use when reading compressed file.
*
* @param metadata CompressionMetadata to use
* @return this object
*/
public Builder withCompressionMetadata(CompressionMetadata metadata)
{
this.compressed = Objects.nonNull(metadata);
this.compressionMetadata = metadata;
return this;
}
Set whether to use mmap for reading
Params: - mmapped – true if using mmap
Returns: this instance
/**
* Set whether to use mmap for reading
*
* @param mmapped true if using mmap
* @return this instance
*/
public Builder mmapped(boolean mmapped)
{
this.mmapped = mmapped;
return this;
}
Set the buffer size to use (if appropriate).
Params: - bufferSize – Buffer size in bytes
Returns: this instance
/**
* Set the buffer size to use (if appropriate).
*
* @param bufferSize Buffer size in bytes
* @return this instance
*/
public Builder bufferSize(int bufferSize)
{
this.bufferSize = bufferSize;
return this;
}
Set the buffer type (on heap or off heap) to use (if appropriate).
Params: - bufferType – Buffer type to use
Returns: this instance
/**
* Set the buffer type (on heap or off heap) to use (if appropriate).
*
* @param bufferType Buffer type to use
* @return this instance
*/
public Builder bufferType(BufferType bufferType)
{
this.bufferType = bufferType;
return this;
}
Complete building FileHandle
without overriding file length. See Also:
/**
* Complete building {@link FileHandle} without overriding file length.
*
* @see #complete(long)
*/
public FileHandle complete()
{
return complete(-1L);
}
Complete building FileHandle
with the given length, which overrides the file length. Params: - overrideLength – Override file length (in bytes) so that read cannot go further than this value.
If the value is less than or equal to 0, then the value is ignored.
Returns: Built file
/**
* Complete building {@link FileHandle} with the given length, which overrides the file length.
*
* @param overrideLength Override file length (in bytes) so that read cannot go further than this value.
* If the value is less than or equal to 0, then the value is ignored.
* @return Built file
*/
@SuppressWarnings("resource")
public FileHandle complete(long overrideLength)
{
if (channel == null)
{
channel = new ChannelProxy(path);
}
ChannelProxy channelCopy = channel.sharedCopy();
try
{
if (compressed && compressionMetadata == null)
compressionMetadata = CompressionMetadata.create(channelCopy.filePath());
long length = overrideLength > 0 ? overrideLength : compressed ? compressionMetadata.compressedFileLength : channelCopy.size();
RebuffererFactory rebuffererFactory;
if (mmapped)
{
if (compressed)
{
regions = MmappedRegions.map(channelCopy, compressionMetadata);
rebuffererFactory = maybeCached(new CompressedChunkReader.Mmap(channelCopy, compressionMetadata,
regions));
}
else
{
updateRegions(channelCopy, length);
rebuffererFactory = new MmapRebufferer(channelCopy, length, regions.sharedCopy());
}
}
else
{
regions = null;
if (compressed)
{
rebuffererFactory = maybeCached(new CompressedChunkReader.Standard(channelCopy, compressionMetadata));
}
else
{
int chunkSize = DiskOptimizationStrategy.roundForCaching(bufferSize, ChunkCache.roundUp);
rebuffererFactory = maybeCached(new SimpleChunkReader(channelCopy, length, bufferType, chunkSize));
}
}
Cleanup cleanup = new Cleanup(channelCopy, rebuffererFactory, compressionMetadata, chunkCache);
return new FileHandle(cleanup, channelCopy, rebuffererFactory, compressionMetadata, length);
}
catch (Throwable t)
{
channelCopy.close();
throw t;
}
}
public Throwable close(Throwable accumulate)
{
if (!compressed && regions != null)
accumulate = regions.close(accumulate);
if (channel != null)
return channel.close(accumulate);
return accumulate;
}
public void close()
{
maybeFail(close(null));
}
private RebuffererFactory maybeCached(ChunkReader reader)
{
if (chunkCache != null && chunkCache.capacity() > 0)
return chunkCache.wrap(reader);
return reader;
}
private void updateRegions(ChannelProxy channel, long length)
{
if (regions != null && !regions.isValid(channel))
{
Throwable err = regions.close(null);
if (err != null)
logger.error("Failed to close mapped regions", err);
regions = null;
}
if (regions == null)
regions = MmappedRegions.map(channel, length);
else
regions.extend(length);
}
}
@Override
public String toString()
{
return getClass().getSimpleName() + "(path='" + path() + '\'' +
", length=" + rebuffererFactory.fileLength() +
')';
}
}