package org.apache.cassandra.hints;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.FileUtils;
/**
 * Runs every hints write operation (buffer flushes, writer fsyncs and writer closes) on one
 * dedicated executor created with a fixed pool size of 1.
 *
 * All tasks share a single direct {@link ByteBuffer} of {@link #WRITE_BUFFER_SIZE} bytes that is
 * handed to each {@code HintsWriter.Session}.
 */
final class HintsWriteExecutor
{
    private static final Logger logger = LoggerFactory.getLogger(HintsWriteExecutor.class);

    /** Size of the shared write buffer, in bytes (256 KiB). */
    static final int WRITE_BUFFER_SIZE = 256 << 10;

    /** How long {@link #shutdownBlocking()} waits for already submitted tasks, in minutes. */
    private static final long SHUTDOWN_TIMEOUT_MINUTES = 1;

    private final HintsCatalog catalog;
    private final ByteBuffer writeBuffer;
    private final ExecutorService executor;

    /**
     * @param catalog catalog used to look up the {@link HintsStore} of each host id and to fsync the hints directory
     */
    HintsWriteExecutor(HintsCatalog catalog)
    {
        this.catalog = catalog;

        writeBuffer = ByteBuffer.allocateDirect(WRITE_BUFFER_SIZE);
        executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("HintsWriteExecutor", 1);
    }

    /**
     * Shuts the executor down and waits up to {@link #SHUTDOWN_TIMEOUT_MINUTES} minute(s) for the
     * tasks that were already submitted.
     *
     * A timeout is reported with a warning rather than an exception, so shutdown stays best-effort.
     *
     * @throws AssertionError if the calling thread is interrupted while waiting; the interrupt
     *                        flag is restored before the error is thrown
     */
    void shutdownBlocking()
    {
        executor.shutdown();
        try
        {
            if (!executor.awaitTermination(SHUTDOWN_TIMEOUT_MINUTES, TimeUnit.MINUTES))
                logger.warn("HintsWriteExecutor did not terminate within {} minute(s) of shutdown", SHUTDOWN_TIMEOUT_MINUTES);
        }
        catch (InterruptedException e)
        {
            // keep the interruption visible to code further up the stack
            Thread.currentThread().interrupt();
            throw new AssertionError(e);
        }
    }

    /**
     * Submits a task that flushes the provided buffer and then recycles it and offers it back to the pool.
     *
     * @param buffer     the buffer to flush
     * @param bufferPool the pool that receives the recycled buffer
     * @return a future for the submitted task
     */
    Future<?> flushBuffer(HintsBuffer buffer, HintsBufferPool bufferPool)
    {
        return executor.submit(new FlushBufferTask(buffer, bufferPool));
    }

    /**
     * Submits a task that flushes the pool's current buffer for every host id it contains.
     *
     * @param bufferPool the pool whose current buffer is flushed
     * @return a future for the submitted task
     */
    Future<?> flushBufferPool(HintsBufferPool bufferPool)
    {
        return executor.submit(new FlushBufferPoolTask(bufferPool));
    }

    /**
     * Submits a task that flushes the pool's current buffer, restricted to the provided stores.
     *
     * @param bufferPool the pool whose current buffer is flushed
     * @param stores     the stores whose hints should be flushed
     * @return a future for the submitted task
     */
    Future<?> flushBufferPool(HintsBufferPool bufferPool, Iterable<HintsStore> stores)
    {
        return executor.submit(new PartiallyFlushBufferPoolTask(bufferPool, stores));
    }

    /**
     * Fsyncs the writers of the provided stores and the hints directory, blocking until done.
     *
     * @param stores the stores whose writers should be fsynced
     * @throws RuntimeException wrapping the {@link ExecutionException} if the task failed, or the
     *                          {@link InterruptedException} if the calling thread was interrupted
     *                          (in which case the interrupt flag is restored first)
     */
    void fsyncWritersBlockingly(Iterable<HintsStore> stores)
    {
        try
        {
            executor.submit(new FsyncWritersTask(stores)).get();
        }
        catch (InterruptedException e)
        {
            // keep the interruption visible to code further up the stack
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e)
        {
            throw new RuntimeException(e);
        }
    }

    /**
     * Submits a task that closes the writer of the given store.
     *
     * @param store the store whose writer should be closed
     * @return a future for the submitted task
     */
    Future<?> closeWriter(HintsStore store)
    {
        return executor.submit(store::closeWriter);
    }

    /**
     * Submits a task that closes the writer of every store in the catalog.
     *
     * @return a future for the submitted task
     */
    Future<?> closeAllWriters()
    {
        return executor.submit(() -> catalog.stores().forEach(HintsStore::closeWriter));
    }

    /**
     * Flushes one specific buffer, then recycles it and offers it back to its pool.
     */
    private final class FlushBufferTask implements Runnable
    {
        private final HintsBuffer buffer;
        private final HintsBufferPool bufferPool;

        FlushBufferTask(HintsBuffer buffer, HintsBufferPool bufferPool)
        {
            this.buffer = buffer;
            this.bufferPool = bufferPool;
        }

        @Override
        public void run()
        {
            buffer.waitForModifications();

            try
            {
                flush(buffer);
            }
            catch (FSError e)
            {
                // same handling as FlushBufferPoolTask: log, then let FileUtils deal with the error and propagate it
                logger.error("Unable to flush hint buffer: {}", e.getLocalizedMessage(), e);
                FileUtils.handleFSErrorAndPropagate(e);
            }
            finally
            {
                // the buffer goes back to the pool whether or not the flush succeeded
                HintsBuffer recycledBuffer = buffer.recycle();
                bufferPool.offer(recycledBuffer);
            }
        }
    }

    /**
     * Flushes the pool's current buffer for all host ids it contains; the buffer is not recycled.
     */
    private final class FlushBufferPoolTask implements Runnable
    {
        private final HintsBufferPool bufferPool;

        FlushBufferPoolTask(HintsBufferPool bufferPool)
        {
            this.bufferPool = bufferPool;
        }

        @Override
        public void run()
        {
            HintsBuffer buffer = bufferPool.currentBuffer();
            buffer.waitForModifications();

            try
            {
                flush(buffer);
            }
            catch (FSError e)
            {
                logger.error("Unable to flush hint buffer: {}", e.getLocalizedMessage(), e);
                FileUtils.handleFSErrorAndPropagate(e);
            }
        }
    }

    /**
     * Flushes the pool's current buffer for the given stores only; the buffer is not recycled.
     */
    private final class PartiallyFlushBufferPoolTask implements Runnable
    {
        private final HintsBufferPool bufferPool;
        private final Iterable<HintsStore> stores;

        PartiallyFlushBufferPoolTask(HintsBufferPool bufferPool, Iterable<HintsStore> stores)
        {
            this.bufferPool = bufferPool;
            this.stores = stores;
        }

        @Override
        public void run()
        {
            HintsBuffer buffer = bufferPool.currentBuffer();
            buffer.waitForModifications();
            stores.forEach(store -> flush(buffer.consumingHintsIterator(store.hostId), store));
        }
    }

    /**
     * Fsyncs the writer of every given store, then fsyncs the hints directory through the catalog.
     */
    private final class FsyncWritersTask implements Runnable
    {
        private final Iterable<HintsStore> stores;

        FsyncWritersTask(Iterable<HintsStore> stores)
        {
            this.stores = stores;
        }

        @Override
        public void run()
        {
            stores.forEach(HintsStore::fsyncWriter);
            catalog.fsyncDirectory();
        }
    }

    /**
     * Flushes the hints of every host id present in the buffer to the store the catalog returns for that host id.
     */
    private void flush(HintsBuffer buffer)
    {
        buffer.hostIds().forEach(hostId -> flush(buffer.consumingHintsIterator(hostId), catalog.get(hostId)));
    }

    /**
     * Drains the iterator into the store, one writer session at a time.
     *
     * @param iterator the serialized hints to write
     * @param store    the destination store
     */
    private void flush(Iterator<ByteBuffer> iterator, HintsStore store)
    {
        while (iterator.hasNext())
        {
            flushInternal(iterator, store);

            // flushInternal() stopped at the file size limit with hints still pending: close the
            // current writer so that the next pass asks the store for a writer again
            if (iterator.hasNext())
                store.closeWriter();
        }
    }

    /**
     * Appends hints from the iterator to the store's writer within a single session, stopping when
     * the iterator is exhausted or the session position reaches the configured max hints file size.
     *
     * @throws FSWriteError if appending to or closing the session throws an {@link IOException}
     */
    @SuppressWarnings("resource")   // writer not closed here
    private void flushInternal(Iterator<ByteBuffer> iterator, HintsStore store)
    {
        long maxHintsFileSize = DatabaseDescriptor.getMaxHintsFileSize();

        HintsWriter writer = store.getOrOpenWriter();

        try (HintsWriter.Session session = writer.newSession(writeBuffer))
        {
            while (iterator.hasNext())
            {
                session.append(iterator.next());
                if (session.position() >= maxHintsFileSize)
                    break;
            }
        }
        catch (IOException e)
        {
            throw new FSWriteError(e, writer.descriptor().fileName());
        }
    }
}