package org.apache.cassandra.net;
import java.net.InetAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.SystemTimeSource;
import org.apache.cassandra.utils.TimeSource;
import org.apache.cassandra.utils.concurrent.IntervalLock;
/**
 * Back-pressure strategy driven by the incoming and outgoing rates recorded on each
 * {@link RateBasedBackPressureState}.
 * <p>
 * For every state handed to {@link #apply(Set, long, TimeUnit)} the strategy compares the ratio
 * {@code incomingRate / outgoingRate} with the configured {@code high_ratio}:
 * <ul>
 *   <li>when the ratio is at or above the high ratio, and the state's current limit does not exceed the
 *       outgoing rate, the limit is raised by {@code factor} percent;</li>
 *   <li>when the ratio is below the high ratio, the limit is set to the incoming rate lowered by
 *       {@code factor} percent, provided that this actually lowers the current limit;</li>
 *   <li>when the outgoing rate is not positive, the limit is reset to unlimited.</li>
 * </ul>
 * One limiter per group of states is then selected according to the configured {@link Flow} and the
 * calling thread is throttled against it.
 */
public class RateBasedBackPressure implements BackPressureStrategy<RateBasedBackPressureState>
{
    // Keys looked up in the constructor's argument map.
    static final String HIGH_RATIO = "high_ratio";
    static final String FACTOR = "factor";
    static final String FLOW = "flow";

    // Values supplied by withDefaultParams().
    private static final String BACK_PRESSURE_HIGH_RATIO = "0.90";
    private static final String BACK_PRESSURE_FACTOR = "5";
    private static final String BACK_PRESSURE_FLOW = "FAST";

    private static final Logger logger = LoggerFactory.getLogger(RateBasedBackPressure.class);
    private static final NoSpamLogger tenSecsNoSpamLogger = NoSpamLogger.getLogger(logger, 10, TimeUnit.SECONDS);
    private static final NoSpamLogger oneMinNoSpamLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);

    protected final TimeSource timeSource;
    protected final double highRatio;
    protected final int factor;
    protected final Flow flow;
    protected final long windowSize;

    // One limiter holder per distinct set of states; entries expire one hour after their last access.
    private final Cache<Set<RateBasedBackPressureState>, IntervalRateLimiter> rateLimiters =
        CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).build();

    /**
     * Selects which per-state limiter is applied to a whole group of states:
     * {@code FAST} picks the limiter with the highest rate, {@code SLOW} the one with the lowest rate.
     */
    enum Flow
    {
        FAST,
        SLOW
    }

    /**
     * Builds the default configuration for this strategy.
     *
     * @return a {@link ParameterizedClass} naming this class, with high ratio {@code 0.90},
     *         factor {@code 5} and flow {@code FAST}
     */
    public static ParameterizedClass withDefaultParams()
    {
        Map<String, String> defaults = ImmutableMap.of(HIGH_RATIO, BACK_PRESSURE_HIGH_RATIO,
                                                       FACTOR, BACK_PRESSURE_FACTOR,
                                                       FLOW, BACK_PRESSURE_FLOW);
        return new ParameterizedClass(RateBasedBackPressure.class.getName(), defaults);
    }

    /**
     * Creates the strategy with a {@link SystemTimeSource} and a window size taken from
     * {@link DatabaseDescriptor#getWriteRpcTimeout()}.
     *
     * @param args configuration map; see {@link #RateBasedBackPressure(Map, TimeSource, long)}
     */
    public RateBasedBackPressure(Map<String, Object> args)
    {
        this(args, new SystemTimeSource(), DatabaseDescriptor.getWriteRpcTimeout());
    }

    /**
     * Creates the strategy from explicit collaborators.
     *
     * @param args       exactly three entries, read from the keys {@link #HIGH_RATIO}, {@link #FACTOR}
     *                   and {@link #FLOW}; values are converted with {@code toString()} and trimmed,
     *                   and the flow name is upper-cased before being resolved
     * @param timeSource time source handed to new states and limiter holders, and used for pausing
     * @param windowSize interval passed to the interval locks and to new states; must be at least 10
     *                   (NOTE(review): unit is not visible here, presumably milliseconds - confirm)
     * @throws IllegalArgumentException if the map does not hold three entries, a value cannot be parsed,
     *                                  the high ratio is not in {@code (0, 1]}, the factor is below 1
     *                                  or the window size is below 10
     */
    @VisibleForTesting
    public RateBasedBackPressure(Map<String, Object> args, TimeSource timeSource, long windowSize)
    {
        if (args.size() != 3)
        {
            String message = RateBasedBackPressure.class.getCanonicalName()
                             + " requires 3 arguments: high ratio, back-pressure factor and flow type.";
            throw new IllegalArgumentException(message);
        }

        try
        {
            highRatio = Double.parseDouble(rawArgument(args, HIGH_RATIO));
            factor = Integer.parseInt(rawArgument(args, FACTOR));
            flow = Flow.valueOf(rawArgument(args, FLOW).toUpperCase());
        }
        catch (Exception ex)
        {
            // Any parsing problem (missing key, bad number, unknown flow) is reported as a bad argument.
            throw new IllegalArgumentException(ex.getMessage(), ex);
        }

        if (highRatio <= 0 || highRatio > 1)
        {
            throw new IllegalArgumentException("Back-pressure high ratio must be > 0 and <= 1");
        }
        if (factor < 1)
        {
            throw new IllegalArgumentException("Back-pressure factor must be >= 1");
        }
        if (windowSize < 10)
        {
            throw new IllegalArgumentException("Back-pressure window size must be >= 10");
        }

        this.timeSource = timeSource;
        this.windowSize = windowSize;

        logger.info("Initialized back-pressure with high ratio: {}, factor: {}, flow: {}, window size: {}.",
                    highRatio, factor, flow, windowSize);
    }

    /**
     * Reads one configuration value as trimmed text; a missing key yields the empty string.
     */
    private static String rawArgument(Map<String, Object> args, String name)
    {
        return args.getOrDefault(name, "").toString().trim();
    }

    /**
     * Updates the rate limit of every state whose interval lock can be taken, refreshes the limiter
     * chosen for the whole group, and then throttles the caller against that group limiter.
     * Does nothing when {@code states} is empty.
     *
     * @param states  the back-pressure states forming the group
     * @param timeout the timeout available to the caller, expressed in {@code unit}
     * @param unit    unit of {@code timeout}
     * @throws IllegalStateException if the group limiter cannot be loaded from the cache
     */
    @Override
    public void apply(Set<RateBasedBackPressureState> states, long timeout, TimeUnit unit)
    {
        boolean anyStateUpdated = false;
        double lowestLimit = Double.POSITIVE_INFINITY;
        double highestLimit = Double.NEGATIVE_INFINITY;
        double lowestIncomingRate = Double.POSITIVE_INFINITY;
        RateLimiter slowest = null;
        RateLimiter fastest = null;

        for (RateBasedBackPressureState state : states)
        {
            double incoming = state.incomingRate.get(TimeUnit.SECONDS);
            double outgoing = state.outgoingRate.get(TimeUnit.SECONDS);

            if (incoming < lowestIncomingRate)
                lowestIncomingRate = incoming;

            // Only the caller that obtains the interval lock recomputes this state's limit.
            if (state.tryIntervalLock(windowSize))
            {
                anyStateUpdated = true;
                try
                {
                    adjustRateLimit(state, incoming, outgoing);
                    state.incomingRate.prune();
                    state.outgoingRate.prune();
                }
                finally
                {
                    state.releaseIntervalLock();
                }
            }

            // Track the limiters carrying the lowest and the highest rate across the group.
            if (state.rateLimiter.getRate() <= lowestLimit)
            {
                lowestLimit = state.rateLimiter.getRate();
                slowest = state.rateLimiter;
            }
            if (state.rateLimiter.getRate() >= highestLimit)
            {
                highestLimit = state.rateLimiter.getRate();
                fastest = state.rateLimiter;
            }
        }

        if (states.isEmpty())
            return;

        IntervalRateLimiter groupLimiter = groupLimiterFor(states);
        if (anyStateUpdated && groupLimiter.tryIntervalLock(windowSize))
        {
            try
            {
                groupLimiter.limiter = flow == Flow.FAST ? fastest : slowest;
                tenSecsNoSpamLogger.info("{} currently applied for remote replicas: {}", groupLimiter.limiter, states);
            }
            finally
            {
                groupLimiter.releaseIntervalLock();
            }
        }

        // Shorten the time we are willing to wait by one period of the lowest incoming rate;
        // a zero rate makes this huge, which the clamp below turns into a zero wait.
        long responseTimeInNanos = (long) (TimeUnit.SECONDS.toNanos(1) / lowestIncomingRate);
        doRateLimit(groupLimiter.limiter, Math.max(0, unit.toNanos(timeout) - responseTimeInNanos));
    }

    /**
     * Recomputes the rate of a single state's limiter from its observed rates.
     * Called by {@link #apply(Set, long, TimeUnit)} after the state's interval lock has been taken.
     */
    private void adjustRateLimit(RateBasedBackPressureState state, double incomingRate, double outgoingRate)
    {
        RateLimiter limiter = state.rateLimiter;

        // Without a positive outgoing rate there is no ratio to compute: lift the limit entirely.
        if (!(outgoingRate > 0))
        {
            limiter.setRate(Double.POSITIVE_INFINITY);
            return;
        }

        double ratio = incomingRate / outgoingRate;
        double currentLimit = limiter.getRate();
        if (ratio >= highRatio)
        {
            // Grow by `factor` percent, but only while the current limit does not exceed the outgoing rate.
            if (currentLimit <= outgoingRate)
            {
                double raised = currentLimit + ((currentLimit * factor) / 100);
                if (raised > 0 && raised != Double.POSITIVE_INFINITY)
                {
                    limiter.setRate(raised);
                }
            }
        }
        else
        {
            // Shrink to the incoming rate minus `factor` percent, but never raise the limit this way.
            double lowered = incomingRate - ((incomingRate * factor) / 100);
            if (lowered > 0 && lowered < currentLimit)
            {
                limiter.setRate(lowered);
            }
        }

        if (logger.isTraceEnabled())
        {
            logger.trace("Back-pressure state for {}: incoming rate {}, outgoing rate {}, ratio {}, rate limiting {}",
                         state.getHost(), incomingRate, outgoingRate, ratio, limiter.getRate());
        }
    }

    /**
     * Returns the cached limiter holder for the given group of states, creating it on first use.
     *
     * @throws IllegalStateException wrapping the {@link ExecutionException} raised by the cache loader
     */
    private IntervalRateLimiter groupLimiterFor(Set<RateBasedBackPressureState> states)
    {
        try
        {
            return rateLimiters.get(states, () -> new IntervalRateLimiter(timeSource));
        }
        catch (ExecutionException ex)
        {
            throw new IllegalStateException(ex);
        }
    }

    /**
     * Creates a new state for the given host, sharing this strategy's time source and window size.
     */
    @Override
    public RateBasedBackPressureState newState(InetAddress host)
    {
        return new RateBasedBackPressureState(host, timeSource, windowSize);
    }

    /**
     * Returns the limiter currently associated with the given group of states, or a fresh unlimited
     * limiter when the group has no cached entry.
     */
    @VisibleForTesting
    RateLimiter getRateLimiterForReplicaGroup(Set<RateBasedBackPressureState> states)
    {
        IntervalRateLimiter cached = rateLimiters.getIfPresent(states);
        if (cached == null)
            return RateLimiter.create(Double.POSITIVE_INFINITY);
        return cached.limiter;
    }

    /**
     * Tries to acquire one permit from the limiter within the given timeout. When that fails, the
     * caller is paused for the whole timeout through the time source and the event is logged.
     *
     * @param rateLimiter    limiter to acquire from
     * @param timeoutInNanos maximum time to wait for the permit, in nanoseconds
     * @return {@code true} if the permit was acquired, {@code false} if the caller was paused instead
     */
    @VisibleForTesting
    boolean doRateLimit(RateLimiter rateLimiter, long timeoutInNanos)
    {
        if (rateLimiter.tryAcquire(1, timeoutInNanos, TimeUnit.NANOSECONDS))
            return true;

        timeSource.sleepUninterruptibly(timeoutInNanos, TimeUnit.NANOSECONDS);
        oneMinNoSpamLogger.info("Cannot apply {} due to exceeding write timeout, pausing {} nanoseconds instead.",
                                rateLimiter, timeoutInNanos);
        return false;
    }

    /**
     * Holder for the limiter applied to a group of states; it extends {@link IntervalLock} so that
     * {@link #apply(Set, long, TimeUnit)} can guard replacement of the limiter with an interval lock.
     * The limiter starts out unlimited.
     */
    private static class IntervalRateLimiter extends IntervalLock
    {
        public volatile RateLimiter limiter = RateLimiter.create(Double.POSITIVE_INFINITY);

        IntervalRateLimiter(TimeSource timeSource)
        {
            super(timeSource);
        }
    }
}