/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.util;
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.utils.SyncUtil;
import org.apache.cassandra.utils.concurrent.Transactional;
import static org.apache.cassandra.utils.Throwables.merge;
/**
* Adds buffering, mark, and fsyncing to OutputStream. We always fsync on close; we may also
* fsync incrementally if Config.trickle_fsync is enabled.
*/
public class SequentialWriter extends BufferedDataOutputStreamPlus implements Transactional
{
// absolute path of the file being written; returned by getPath()
private final String filePath;
// Offset for start of buffer relative to underlying file; current() adds the buffer position to it
protected long bufferOffset;
// the superclass channel, cast to FileChannel in the constructor
protected final FileChannel fchannel;
// writer options; among other things they control whether to do trickling fsync() to avoid sudden bursts
// of dirty buffer flushing by kernel causing read latency spikes
private final SequentialWriterOption option;
// bytes flushed since the last trickle fsync; doFlush() resets it to 0 after each such fsync
private int bytesSinceTrickleFsync = 0;
// advanced by flushData() after each channel write; set to the new size by truncate()
protected long lastFlushOffset;
// optional callback run at the end of flushData(); null until setPostFlushListener() is called
protected Runnable runPostFlush;
// created through the txnProxy() factory method
private final TransactionalProxy txnProxy = txnProxy();
// Java offers no multiple inheritance, so the transactional behaviour lives in this inner proxy
protected class TransactionalProxy extends AbstractTransactional
{
    /**
     * Closes the channel and releases the buffer. Failures are merged into
     * {@code accumulate} instead of being thrown.
     *
     * @param accumulate failures collected so far
     * @return {@code accumulate}, merged with anything that failed here
     */
    @Override
    protected Throwable doPreCleanup(Throwable accumulate)
    {
        // close is idempotent
        try
        {
            channel.close();
        }
        catch (Throwable closeFailure)
        {
            accumulate = merge(accumulate, closeFailure);
        }

        // nothing left to release when the buffer is already gone
        if (buffer == null)
            return accumulate;

        try
        {
            FileUtils.clean(buffer);
        }
        catch (Throwable cleanFailure)
        {
            accumulate = merge(accumulate, cleanFailure);
        }
        buffer = null;
        return accumulate;
    }

    /**
     * Flushes buffered data and syncs the file via the enclosing writer's syncInternal().
     */
    protected void doPrepare()
    {
        syncInternal();
    }

    /**
     * No commit work is done here; the accumulated failures are passed through untouched.
     */
    protected Throwable doCommit(Throwable accumulate)
    {
        return accumulate;
    }

    /**
     * No abort work is done here; the accumulated failures are passed through untouched.
     */
    protected Throwable doAbort(Throwable accumulate)
    {
        return accumulate;
    }
}
// TODO: we should specify as a parameter if we permit an existing file or not
/**
 * Opens a read/write channel on the given file. An existing file is opened as-is; otherwise
 * the file is created (CREATE_NEW) and its parent directory is synced.
 *
 * If syncing the parent directory fails, the freshly opened channel is closed and the failure
 * is rethrown, with any close failure attached as suppressed. Previously the failure was
 * swallowed and an already-closed channel was returned to the caller.
 *
 * @param file file to open or create
 * @return an open channel on the file
 * @throws RuntimeException wrapping any IOException raised while opening the channel
 */
private static FileChannel openChannel(File file)
{
    try
    {
        if (file.exists())
            return FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE);

        FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
        try
        {
            SyncUtil.trySyncDir(file.getParentFile());
        }
        catch (Throwable t)
        {
            try { channel.close(); }
            catch (Throwable t2) { t.addSuppressed(t2); }
            // never hand back a closed channel: propagate the original failure.
            // NOTE(review): relies on precise rethrow; assumes trySyncDir declares no checked
            // exception other than IOException (which the outer catch wraps) - confirm.
            throw t;
        }
        return channel;
    }
    catch (IOException e)
    {
        throw new RuntimeException(e);
    }
}
/**
* Create heap-based, non-compressed SequentialWriter with default buffer size (64k).
*
* @param file File to write
*/
public SequentialWriter(File file)
{
// delegate to the two-argument constructor with the default writer option
this(file, SequentialWriterOption.DEFAULT);
}
/**
* Create SequentialWriter for given file with specific writer option.
*
* @param file File to write
* @param option Writer option
*/
public SequentialWriter(File file, SequentialWriterOption option)
{
    // the channel is opened (creating the file if needed) and the buffer comes from the option
    super(openChannel(file), option.allocateBuffer());

    this.option = option;
    this.filePath = file.getAbsolutePath();
    // keep a FileChannel-typed view of the channel passed to the superclass
    this.fchannel = (FileChannel) channel;
    strictFlushing = true;
}
/**
 * Flushes buffered data, then moves the channel position forward by {@code numBytes}
 * and re-bases the buffer offset on the resulting channel position.
 *
 * @param numBytes number of bytes to add to the channel position
 * @throws IOException if the channel position cannot be read or changed
 */
public void skipBytes(int numBytes) throws IOException
{
    flush();

    long target = fchannel.position() + numBytes;
    fchannel.position(target);

    // the buffer now starts wherever the channel ended up
    bufferOffset = fchannel.position();
}
/**
* Synchronize file contents with disk.
*/
public void sync()
{
// syncInternal() flushes the buffer and then forces the channel
syncInternal();
}
/**
 * Forces the channel through SyncUtil.force with the second argument set to false.
 * Buffered data is not flushed here.
 *
 * @throws FSWriteError if forcing the channel fails with an IOException
 */
protected void syncDataOnlyInternal()
{
    try
    {
        SyncUtil.force(fchannel, false);
    }
    catch (IOException forceFailure)
    {
        throw new FSWriteError(forceFailure, getPath());
    }
}
/*
* Flushes any buffered data to the channel and then forces the channel.
*
* This is only safe to call before truncation or close for CompressedSequentialWriter
* Otherwise it will leave a non-uniform size compressed block in the middle of the file
* and the compressed format can't handle that.
*/
protected void syncInternal()
{
// the count argument is not read by this class's doFlush implementation
doFlush(0);
syncDataOnlyInternal();
}
/**
 * Writes the buffer to the channel through flushData(), performs a trickle fsync once
 * enough bytes have accumulated (when the option enables it), and finally resets the buffer.
 *
 * @param count not used by this implementation
 */
@Override
protected void doFlush(int count)
{
    flushData();

    if (option.trickleFsync())
    {
        // count what was just flushed towards the trickle-fsync interval
        bytesSinceTrickleFsync += buffer.position();

        boolean intervalReached = bytesSinceTrickleFsync >= option.trickleFsyncByteInterval();
        if (intervalReached)
        {
            syncDataOnlyInternal();
            bytesSinceTrickleFsync = 0;
        }
    }

    // Remember that we wrote, so we don't write it again on next flush().
    resetBuffer();
}
/**
* Registers a callback that flushData() runs after it has written the buffer to the channel.
* Only one listener may be registered: with assertions enabled, a second call fails.
*
* @param runPostFlush callback to run after each flush
*/
public void setPostFlushListener(Runnable runPostFlush)
{
assert this.runPostFlush == null;
this.runPostFlush = runPostFlush;
}
/**
* Override this method instead of overriding flush()
* @throws FSWriteError on any I/O error.
*/
protected void flushData()
{
    try
    {
        // switch the buffer from filling to draining, then push it to the channel
        buffer.flip();
        channel.write(buffer);
        // after the write the buffer position reflects how far the channel consumed it
        lastFlushOffset += buffer.position();
    }
    catch (IOException writeFailure)
    {
        throw new FSWriteError(writeFailure, getPath());
    }

    // notify the registered listener, if there is one
    if (runPostFlush == null)
        return;
    runPostFlush.run();
}
/**
* Always returns true: this writer can report a position (see position()).
*/
public boolean hasPosition()
{
return true;
}
/**
* Returns current(), i.e. the buffer offset plus the bytes currently held in the buffer.
*/
public long position()
{
return current();
}
/**
* Returns the current file pointer of the underlying on-disk file.
* Note that since write works by buffering data, the value of this will increase by buffer
* size and not every write to the writer will modify this value.
* Furthermore, for compressed files, this value refers to compressed data, while the
* writer's getFilePointer() refers to the uncompressed file.
*
* @return the current file pointer
*/
public long getOnDiskFilePointer()
{
// in this class the on-disk pointer is simply position()
return position();
}
/**
* Returns the value of getOnDiskFilePointer() as the estimate of bytes written on disk.
*/
public long getEstimatedOnDiskBytesWritten()
{
return getOnDiskFilePointer();
}
/**
 * Returns the larger of the writer's current position and the channel's size.
 *
 * @throws FSReadError if the channel size cannot be read
 */
public long length()
{
    long logicalEnd = current();
    try
    {
        long channelSize = fchannel.size();
        return Math.max(logicalEnd, channelSize);
    }
    catch (IOException sizeFailure)
    {
        throw new FSReadError(sizeFailure, getPath());
    }
}
/**
* Returns the absolute path of the file, as captured by the constructor.
*/
public String getPath()
{
return filePath;
}
/**
 * Moves the buffer offset up to the current position and empties the buffer,
 * so the bytes it held are treated as already written.
 */
protected void resetBuffer()
{
    // must be captured before clear() resets the buffer position
    long newOffset = current();
    bufferOffset = newOffset;
    buffer.clear();
}
/**
 * Returns the buffer offset plus the buffer position, or just the buffer offset
 * once the buffer has been released (set to null).
 */
protected long current()
{
    if (buffer == null)
        return bufferOffset;

    return bufferOffset + buffer.position();
}
/**
* Captures the current position in a BufferedFileWriterMark, which resetAndTruncate() accepts.
*/
public DataPosition mark()
{
return new BufferedFileWriterMark(current());
}
/**
* Drops all buffered data that's past the limits of our new file mark + buffer capacity, or syncs and truncates
* the underlying file to the marked position
*/
public void resetAndTruncate(DataPosition mark)
{
    assert mark instanceof BufferedFileWriterMark;

    long positionBefore = current();
    long truncateTarget = ((BufferedFileWriterMark) mark).pointer;
    long bytesToDrop = positionBefore - truncateTarget;

    // The mark lies inside the data still held in the buffer: rewinding the buffer position
    // is enough, the file itself does not need to be touched.
    int buffered = buffer.position();
    if (bytesToDrop <= buffered)
    {
        buffer.position(buffered - (int) bytesToDrop);
        return;
    }

    // synchronize current buffer with disk - we don't want any data loss
    syncInternal();

    // cut the file back to the mark and continue writing from there
    truncate(truncateTarget);
    try
    {
        fchannel.position(truncateTarget);
    }
    catch (IOException seekFailure)
    {
        throw new FSReadError(seekFailure, getPath());
    }

    bufferOffset = truncateTarget;
    resetBuffer();
}
/**
* Returns lastFlushOffset, which flushData() advances after each channel write and
* truncate() sets to the truncated size.
*/
public long getLastFlushOffset()
{
return lastFlushOffset;
}
/**
 * Truncates the channel to {@code toSize} and records that size as the last flush offset.
 *
 * @param toSize size to truncate the file to
 * @throws FSWriteError if the truncation fails with an IOException
 */
public void truncate(long toSize)
{
    try
    {
        fchannel.truncate(toSize);
        // only reached when the truncation succeeded
        lastFlushOffset = toSize;
    }
    catch (IOException truncateFailure)
    {
        throw new FSWriteError(truncateFailure, getPath());
    }
}
/**
* Reports whether the underlying channel is still open.
*/
public boolean isOpen()
{
return channel.isOpen();
}
/**
* Delegates to the transactional proxy's prepareToCommit().
*/
public final void prepareToCommit()
{
txnProxy.prepareToCommit();
}
/**
* Delegates to the transactional proxy's commit().
*
* @param accumulate failures collected so far
* @return whatever the proxy returns for the accumulated failures
*/
public final Throwable commit(Throwable accumulate)
{
return txnProxy.commit(accumulate);
}
/**
* Delegates to the transactional proxy's abort().
*
* @param accumulate failures collected so far
* @return whatever the proxy returns for the accumulated failures
*/
public final Throwable abort(Throwable accumulate)
{
return txnProxy.abort(accumulate);
}
/**
 * Closes the writer through the transactional proxy: finish() when the writer option
 * asks for finish-on-close, close() otherwise.
 */
@Override
public final void close()
{
    if (!option.finishOnClose())
    {
        txnProxy.close();
        return;
    }
    txnProxy.finish();
}
/**
* Delegates to the transactional proxy's finish().
*/
public final void finish()
{
txnProxy.finish();
}
/**
* Factory for the transactional proxy; invoked by the txnProxy field initializer.
* NOTE(review): presumably subclasses override this to supply their own proxy - confirm.
*/
protected TransactionalProxy txnProxy()
{
return new TransactionalProxy();
}
/**
* Class to hold a mark to the position of the file
*/
protected static class BufferedFileWriterMark implements DataPosition
{
// the marked position; resetAndTruncate() reads it as the truncation target
final long pointer;
/**
* @param pointer position to remember (mark() passes the writer's current position)
*/
public BufferedFileWriterMark(long pointer)
{
this.pointer = pointer;
}
}
}