package org.apache.cassandra.utils;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.RandomAccessFile;
import java.lang.reflect.Constructor;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel.MapMode;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.Locale;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
 * Strategies for coalescing elements taken from a {@link BlockingQueue} into a batch.
 * <p>
 * Every strategy blocks until at least one element is available and then drains whatever else is already
 * queued. Depending on the strategy, the caller may additionally be parked for a short, bounded time so that
 * more elements can accumulate before the batch is returned.
 * <p>
 * When the system property {@code Config.PROPERTY_PREFIX + "coalescing_debug"} is {@code true}, strategies
 * periodically log their estimated average gap and record element timestamps into memory-mapped files under
 * the directory named by {@code Config.PROPERTY_PREFIX + "coalescing_debug_path"}.
 */
public class CoalescingStrategies
{
    protected static final Logger logger = LoggerFactory.getLogger(CoalescingStrategies.class);

    private static final String DEBUG_COALESCING_PROPERTY = Config.PROPERTY_PREFIX + "coalescing_debug";
    private static final boolean DEBUG_COALESCING = Boolean.getBoolean(DEBUG_COALESCING_PROPERTY);

    private static final String DEBUG_COALESCING_PATH_PROPERTY = Config.PROPERTY_PREFIX + "coalescing_debug_path";
    // NOTE(review): the default directory name is misspelled ("coleascing"); it is kept as-is because changing
    // it would change where (and which directory is deleted before) debug output is written.
    private static final String DEBUG_COALESCING_PATH = System.getProperty(DEBUG_COALESCING_PATH_PROPERTY, "/tmp/coleascing_debug");

    static
    {
        if (DEBUG_COALESCING)
        {
            // Start every run with an empty debug directory.
            File directory = new File(DEBUG_COALESCING_PATH);
            if (directory.exists())
                FileUtils.deleteRecursive(directory);
            if (!directory.mkdirs())
                throw new ExceptionInInitializerError("Couldn't create log dir");
        }
    }

    /** Source of monotonic time in nanoseconds; swappable in tests. */
    @VisibleForTesting
    interface Clock
    {
        long nanoTime();
    }

    /** Clock used to seed {@link TimeHorizonMovingAverageCoalescingStrategy}'s epoch; defaults to {@link System#nanoTime()}. */
    @VisibleForTesting
    static Clock CLOCK = System::nanoTime;

    /**
     * An element that can be coalesced. Its timestamp feeds the strategies' gap estimates
     * (presumably on the same time base as {@link System#nanoTime()} — TODO confirm with producers).
     */
    public interface Coalescable
    {
        long timestampNanos();
    }

    /**
     * Parks the current thread for approximately {@code nanos} nanoseconds.
     * <p>
     * {@link LockSupport#parkNanos(long)} may return early, so this re-parks for the remaining time until at
     * least {@code nanos - nanos / 16} nanoseconds have elapsed.
     *
     * @param nanos requested park duration in nanoseconds
     */
    @VisibleForTesting
    static void parkLoop(long nanos)
    {
        long now = System.nanoTime();
        final long timer = now + nanos;
        final long limit = timer - nanos / 16;
        do
        {
            LockSupport.parkNanos(timer - now);
            now = System.nanoTime();
        }
        while (now < limit);
    }

    /**
     * Decides whether to park the caller so that more messages can accumulate, and parks if so.
     *
     * @param messages          number of messages already collected for this batch
     * @param averageGap        estimated average gap between messages, in nanoseconds
     * @param maxCoalesceWindow upper bound on the park duration, in nanoseconds
     * @param parker            performs the actual park
     * @return {@code true} if the caller was parked; {@code false} if the batch already holds enough messages
     *         or the initial sleep estimate is non-positive or exceeds {@code maxCoalesceWindow}
     */
    private static boolean maybeSleep(int messages, long averageGap, long maxCoalesceWindow, Parker parker)
    {
        if (messages >= DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages())
            return false;

        // Initial estimate: one average gap per message already held.
        long sleep = messages * averageGap;
        if (sleep <= 0 || sleep > maxCoalesceWindow)
            return false;

        // Double the sleep only while the doubled value stays below the coalescing window.
        while (sleep * 2 < maxCoalesceWindow)
            sleep *= 2;
        parker.park(sleep);
        return true;
    }

    /**
     * Base class of all coalescing strategies. Subclasses implement {@link #coalesceInternal}; this class
     * holds the shared {@link Parker}, the logger, and the debug hooks.
     */
    public abstract static class CoalescingStrategy
    {
        protected final Parker parker;
        protected final Logger logger;
        /** Raised every 5 seconds by the debug thread; cleared by {@link #debugGap(long)} when it logs. */
        protected volatile boolean shouldLogAverage = false;
        /**
         * Memory-mapped timestamp log: the first long is a record counter, followed by one long per timestamp.
         * {@code null} when debugging is disabled or the file could not be set up.
         */
        protected final ByteBuffer logBuffer;
        /** Debug file backing {@link #logBuffer}; retained but not otherwise used here. */
        private final RandomAccessFile ras;
        private final String displayName;

        protected CoalescingStrategy(Parker parker, Logger logger, String displayName)
        {
            this.parker = parker;
            this.logger = logger;
            this.displayName = displayName;

            RandomAccessFile rasTemp = null;
            ByteBuffer logBufferTemp = null;
            if (DEBUG_COALESCING)
            {
                try
                {
                    File outFile = File.createTempFile("coalescing_" + this.displayName + "_", ".log", new File(DEBUG_COALESCING_PATH));
                    rasTemp = new RandomAccessFile(outFile, "rw");
                    // Map through the file opened above; the `ras` field is not assigned yet at this point.
                    logBufferTemp = rasTemp.getChannel().map(MapMode.READ_WRITE, 0, Integer.MAX_VALUE);
                    // Reserve the first long for the record counter maintained by debugTimestamp().
                    logBufferTemp.putLong(0);
                }
                catch (Exception e)
                {
                    // Release the file if it was opened but could not be mapped, so it is not leaked.
                    if (rasTemp != null)
                    {
                        try
                        {
                            rasTemp.close();
                        }
                        catch (Exception closeFailure)
                        {
                            e.addSuppressed(closeFailure);
                        }
                    }
                    rasTemp = null;
                    logBufferTemp = null;
                    logger.error("Unable to create output file for debugging coalescing", e);
                }
            }
            ras = rasTemp;
            logBuffer = logBufferTemp;

            if (DEBUG_COALESCING)
                startDebugThread();
        }

        /** Starts a thread that raises {@link #shouldLogAverage} every 5 seconds until it is interrupted. */
        private void startDebugThread()
        {
            NamedThreadFactory.createThread(() ->
            {
                while (true)
                {
                    try
                    {
                        Thread.sleep(5000);
                    }
                    catch (InterruptedException e)
                    {
                        // Preserve the interrupt status and stop; the thread holds no resources.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    shouldLogAverage = true;
                }
            }, displayName + " debug thread").start();
        }

        /** Logs {@code averageGap} (converted to microseconds) if debugging is on and the debug thread asked for it. */
        protected final void debugGap(long averageGap)
        {
            if (DEBUG_COALESCING && shouldLogAverage)
            {
                shouldLogAverage = false;
                logger.info("{} gap {}μs", this, TimeUnit.NANOSECONDS.toMicros(averageGap));
            }
        }

        /**
         * Appends {@code timestamp} to the debug log and increments the record counter stored in its first long.
         * Recording stops once the mapped region is full instead of throwing.
         */
        protected final void debugTimestamp(long timestamp)
        {
            if (DEBUG_COALESCING && logBuffer != null && logBuffer.remaining() >= Long.BYTES)
            {
                logBuffer.putLong(0, logBuffer.getLong(0) + 1);
                logBuffer.putLong(timestamp);
            }
        }

        /** Records the timestamp of every element in {@code coalescables} via {@link #debugTimestamp(long)}. */
        protected final <C extends Coalescable> void debugTimestamps(Collection<C> coalescables)
        {
            if (DEBUG_COALESCING)
            {
                for (C coalescable : coalescables)
                    debugTimestamp(coalescable.timestampNanos());
            }
        }

        /**
         * Moves elements from {@code input} into {@code out}, blocking until at least one is available.
         * For {@code maxItems >= 1}, at most {@code maxItems} elements are added.
         *
         * @param input    queue to take elements from
         * @param out      destination list; must be empty on entry
         * @param maxItems upper bound on the number of elements added
         * @throws IllegalArgumentException if {@code out} is not empty
         * @throws InterruptedException     if interrupted while waiting for an element
         */
        public <C extends Coalescable> void coalesce(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException
        {
            Preconditions.checkArgument(out.isEmpty(), "out list should be empty");
            coalesceInternal(input, out, maxItems);
        }

        /** Strategy-specific body of {@link #coalesce}; {@code out} is guaranteed empty on entry. */
        protected abstract <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException;
    }

    /** Parks the calling thread for a number of nanoseconds; swappable in tests. */
    @VisibleForTesting
    interface Parker
    {
        void park(long nanos);
    }

    /** Default {@link Parker}, backed by {@link #parkLoop(long)}. */
    private static final Parker PARKER = CoalescingStrategies::parkLoop;

    /**
     * Estimates the average gap between messages by counting message timestamps in a ring of
     * {@code BUCKET_COUNT} buckets of {@code BUCKET_INTERVAL} nanoseconds (2^26 ns, about 67 ms) each.
     * Counts in the newest, still-filling bucket are only added to the running sum once the window advances
     * past it, so the average is taken over {@code MEASURED_INTERVAL} (15 buckets, about 1 s).
     */
    @VisibleForTesting
    static class TimeHorizonMovingAverageCoalescingStrategy extends CoalescingStrategy
    {
        private static final int INDEX_SHIFT = 26;
        private static final long BUCKET_INTERVAL = 1L << INDEX_SHIFT;
        // Must be a power of two: ix() masks with BUCKET_COUNT - 1.
        private static final int BUCKET_COUNT = 16;
        private static final long INTERVAL = BUCKET_INTERVAL * BUCKET_COUNT;
        private static final long MEASURED_INTERVAL = BUCKET_INTERVAL * (BUCKET_COUNT - 1);

        /** Start of the current window (nanoseconds). */
        private long epoch = CLOCK.nanoTime();
        private final int[] samples = new int[BUCKET_COUNT];
        /** Number of samples in the completed buckets of the window. */
        private long sum = 0;
        private final long maxCoalesceWindow;

        /**
         * @param maxCoalesceWindow maximum park duration, in microseconds
         */
        public TimeHorizonMovingAverageCoalescingStrategy(int maxCoalesceWindow, Parker parker, Logger logger, String displayName)
        {
            super(parker, logger, displayName);
            this.maxCoalesceWindow = TimeUnit.MICROSECONDS.toNanos(maxCoalesceWindow);
        }

        /** Counts one message with timestamp {@code nanos}, advancing the window if needed. */
        private void logSample(long nanos)
        {
            debugTimestamp(nanos);
            long currentEpoch = this.epoch;
            long delta = nanos - currentEpoch;
            // Timestamps before the start of the window are ignored.
            if (delta < 0)
                return;
            // The timestamp lies beyond the window: advance the window first.
            if (delta > INTERVAL)
                currentEpoch = rollepoch(delta, currentEpoch, nanos);
            int ix = ix(nanos);
            samples[ix]++;
            // ix(currentEpoch - 1) is the newest bucket in ring order; its count is folded into the sum on roll.
            if (ix != ix(currentEpoch - 1))
                sum++;
        }

        /** @return the average gap in nanoseconds, or {@code Integer.MAX_VALUE} when no samples are counted */
        private long averageGap()
        {
            if (sum == 0)
                return Integer.MAX_VALUE;
            return MEASURED_INTERVAL / sum;
        }

        /**
         * Advances the window so that {@code nanos} falls inside it, updates {@code this.epoch} and returns it.
         * A jump of more than two full windows resets all buckets.
         */
        private long rollepoch(long delta, long epoch, long nanos)
        {
            if (delta > 2 * INTERVAL)
            {
                epoch = epoch(nanos);
                sum = 0;
                Arrays.fill(samples, 0);
            }
            else
            {
                // The previously newest bucket is now complete: count it.
                sum += samples[ix(epoch - 1)];
                // Drop the oldest buckets one at a time until nanos fits in the window.
                while (epoch + INTERVAL < nanos)
                {
                    int index = ix(epoch);
                    sum -= samples[index];
                    samples[index] = 0;
                    epoch += BUCKET_INTERVAL;
                }
            }
            this.epoch = epoch;
            return epoch;
        }

        /** @return the bucket-aligned epoch that places {@code latestNanos} in the window's last bucket */
        private long epoch(long latestNanos)
        {
            return (latestNanos - MEASURED_INTERVAL) & ~(BUCKET_INTERVAL - 1);
        }

        /** @return the ring index of the bucket containing {@code nanos} */
        private int ix(long nanos)
        {
            return (int) ((nanos >>> INDEX_SHIFT) & (BUCKET_COUNT - 1));
        }

        @Override
        protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException
        {
            if (input.drainTo(out, maxItems) == 0)
            {
                out.add(input.take());
                input.drainTo(out, maxItems - out.size());
            }

            for (Coalescable qm : out)
                logSample(qm.timestampNanos());

            long averageGap = averageGap();
            debugGap(averageGap);

            int count = out.size();
            if (maybeSleep(count, averageGap, maxCoalesceWindow, parker))
            {
                input.drainTo(out, maxItems - out.size());
                int prevCount = count;
                count = out.size();
                // Only the elements drained after parking still need to be sampled.
                for (int i = prevCount; i < count; i++)
                    logSample(out.get(i).timestampNanos());
            }
        }

        @Override
        public String toString()
        {
            return "Time horizon moving average";
        }
    }

    /**
     * Estimates the average gap between messages as the mean of the last 16 inter-arrival deltas.
     * The ring starts filled with {@code Integer.MAX_VALUE}, so the estimate starts out large.
     */
    @VisibleForTesting
    static class MovingAverageCoalescingStrategy extends CoalescingStrategy
    {
        // Length must be a power of two: logSample() wraps the index with a mask.
        private final int[] samples = new int[16];
        private long lastSample = 0;
        private int index = 0;
        private long sum = 0;
        private final long maxCoalesceWindow;

        /**
         * @param maxCoalesceWindow maximum park duration, in microseconds
         */
        public MovingAverageCoalescingStrategy(int maxCoalesceWindow, Parker parker, Logger logger, String displayName)
        {
            super(parker, logger, displayName);
            this.maxCoalesceWindow = TimeUnit.MICROSECONDS.toNanos(maxCoalesceWindow);
            Arrays.fill(samples, Integer.MAX_VALUE);
            sum = Integer.MAX_VALUE * (long) samples.length;
        }

        /** Replaces the oldest delta with {@code value} and returns the new mean. */
        private long logSample(int value)
        {
            sum -= samples[index];
            sum += value;
            samples[index] = value;
            index = (index + 1) & (samples.length - 1);
            return sum / samples.length;
        }

        /**
         * Records the delta between {@code sample} and the latest timestamp seen so far (capped at
         * {@code Integer.MAX_VALUE}). A timestamp that is not newer records a delta of 1.
         *
         * @return the updated average gap in nanoseconds
         */
        private long notifyOfSample(long sample)
        {
            debugTimestamp(sample);
            if (sample > lastSample)
            {
                final int delta = (int) Math.min(Integer.MAX_VALUE, sample - lastSample);
                lastSample = sample;
                return logSample(delta);
            }
            return logSample(1);
        }

        @Override
        protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException
        {
            if (input.drainTo(out, maxItems) == 0)
            {
                out.add(input.take());
                input.drainTo(out, maxItems - out.size());
            }

            // Only the first element informs the decision to park; the rest are sampled afterwards.
            long average = notifyOfSample(out.get(0).timestampNanos());
            debugGap(average);

            if (maybeSleep(out.size(), average, maxCoalesceWindow, parker))
            {
                input.drainTo(out, maxItems - out.size());
            }

            for (int ii = 1; ii < out.size(); ii++)
                notifyOfSample(out.get(ii).timestampNanos());
        }

        @Override
        public String toString()
        {
            return "Moving average";
        }
    }

    /**
     * If the queue was empty on entry, waits for one element and then, unless enough messages are already
     * available, parks for a fixed window before draining again.
     */
    @VisibleForTesting
    static class FixedCoalescingStrategy extends CoalescingStrategy
    {
        /** Park duration in nanoseconds. */
        private final long coalesceWindow;

        /**
         * @param coalesceWindowMicros park duration, in microseconds
         */
        public FixedCoalescingStrategy(int coalesceWindowMicros, Parker parker, Logger logger, String displayName)
        {
            super(parker, logger, displayName);
            coalesceWindow = TimeUnit.MICROSECONDS.toNanos(coalesceWindowMicros);
        }

        @Override
        protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException
        {
            int enough = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();

            // Parking only happens when the queue was initially empty.
            if (input.drainTo(out, maxItems) == 0)
            {
                out.add(input.take());
                input.drainTo(out, maxItems - out.size());
                if (out.size() < enough)
                {
                    parker.park(coalesceWindow);
                    input.drainTo(out, maxItems - out.size());
                }
            }
            debugTimestamps(out);
        }

        @Override
        public String toString()
        {
            return "Fixed";
        }
    }

    /** Never parks: returns whatever is queued, waiting only for the first element if the queue is empty. */
    @VisibleForTesting
    static class DisabledCoalescingStrategy extends CoalescingStrategy
    {
        /**
         * @param coalesceWindowMicros ignored; present so the reflective factory can use a uniform constructor
         */
        public DisabledCoalescingStrategy(int coalesceWindowMicros, Parker parker, Logger logger, String displayName)
        {
            super(parker, logger, displayName);
        }

        @Override
        protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException
        {
            if (input.drainTo(out, maxItems) == 0)
            {
                out.add(input.take());
                input.drainTo(out, maxItems - out.size());
            }
            debugTimestamps(out);
        }

        @Override
        public String toString()
        {
            return "Disabled";
        }
    }

    /**
     * Creates a coalescing strategy by name.
     *
     * @param strategy      one of {@code MOVINGAVERAGE}, {@code FIXED}, {@code TIMEHORIZON}, {@code DISABLED}
     *                      (case-insensitive, surrounding whitespace ignored), or the fully qualified name of a
     *                      {@link CoalescingStrategy} subclass with a public
     *                      {@code (int, Parker, Logger, String)} constructor
     * @param coalesceWindow coalescing window in microseconds, passed to the strategy's constructor
     * @param parker        used by the strategy to park
     * @param logger        used by the strategy for debug and error output
     * @param displayName   used in debug file and thread names
     * @throws RuntimeException if the class cannot be found, is not a {@link CoalescingStrategy}, or cannot be
     *                          instantiated
     */
    @VisibleForTesting
    static CoalescingStrategy newCoalescingStrategy(String strategy,
                                                    int coalesceWindow,
                                                    Parker parker,
                                                    Logger logger,
                                                    String displayName)
    {
        String trimmed = strategy.trim();
        String classname;
        switch (trimmed.toUpperCase(Locale.ENGLISH))
        {
            case "MOVINGAVERAGE":
                classname = MovingAverageCoalescingStrategy.class.getName();
                break;
            case "FIXED":
                classname = FixedCoalescingStrategy.class.getName();
                break;
            case "TIMEHORIZON":
                classname = TimeHorizonMovingAverageCoalescingStrategy.class.getName();
                break;
            case "DISABLED":
                classname = DisabledCoalescingStrategy.class.getName();
                break;
            default:
                classname = trimmed;
        }

        try
        {
            Class<?> clazz = Class.forName(classname);
            if (!CoalescingStrategy.class.isAssignableFrom(clazz))
                throw new RuntimeException(classname + " is not an instance of CoalescingStrategy");
            Constructor<?> constructor = clazz.getConstructor(int.class, Parker.class, Logger.class, String.class);
            return (CoalescingStrategy) constructor.newInstance(coalesceWindow, parker, logger, displayName);
        }
        catch (ReflectiveOperationException e)
        {
            // Unknown class, missing public (int, Parker, Logger, String) constructor, or constructor failure.
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates a coalescing strategy by name using the default {@link Parker} (backed by {@link #parkLoop(long)}).
     *
     * @see #newCoalescingStrategy(String, int, Parker, Logger, String)
     */
    public static CoalescingStrategy newCoalescingStrategy(String strategy, int coalesceWindow, Logger logger, String displayName)
    {
        return newCoalescingStrategy(strategy, coalesceWindow, PARKER, logger, displayName);
    }
}