/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.net;

import java.net.InetAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.RateLimiter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.SystemTimeSource;
import org.apache.cassandra.utils.TimeSource;
import org.apache.cassandra.utils.concurrent.IntervalLock;

/**
 * Back-pressure algorithm based on rate limiting according to the ratio between incoming and outgoing rates, computed
 * over a sliding time window with size equal to write RPC timeout.
 */
public class RateBasedBackPressure implements BackPressureStrategy<RateBasedBackPressureState> { static final String HIGH_RATIO = "high_ratio"; static final String FACTOR = "factor"; static final String FLOW = "flow"; private static final String BACK_PRESSURE_HIGH_RATIO = "0.90"; private static final String BACK_PRESSURE_FACTOR = "5"; private static final String BACK_PRESSURE_FLOW = "FAST"; private static final Logger logger = LoggerFactory.getLogger(RateBasedBackPressure.class); private static final NoSpamLogger tenSecsNoSpamLogger = NoSpamLogger.getLogger(logger, 10, TimeUnit.SECONDS); private static final NoSpamLogger oneMinNoSpamLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES); protected final TimeSource timeSource; protected final double highRatio; protected final int factor; protected final Flow flow; protected final long windowSize; private final Cache<Set<RateBasedBackPressureState>, IntervalRateLimiter> rateLimiters = CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).build(); enum Flow { FAST, SLOW } public static ParameterizedClass withDefaultParams() { return new ParameterizedClass(RateBasedBackPressure.class.getName(), ImmutableMap.of(HIGH_RATIO, BACK_PRESSURE_HIGH_RATIO, FACTOR, BACK_PRESSURE_FACTOR, FLOW, BACK_PRESSURE_FLOW)); } public RateBasedBackPressure(Map<String, Object> args) { this(args, new SystemTimeSource(), DatabaseDescriptor.getWriteRpcTimeout()); } @VisibleForTesting public RateBasedBackPressure(Map<String, Object> args, TimeSource timeSource, long windowSize) { if (args.size() != 3) { throw new IllegalArgumentException(RateBasedBackPressure.class.getCanonicalName() + " requires 3 arguments: high ratio, back-pressure factor and flow type."); } try { highRatio = Double.parseDouble(args.getOrDefault(HIGH_RATIO, "").toString().trim()); factor = Integer.parseInt(args.getOrDefault(FACTOR, "").toString().trim()); flow = Flow.valueOf(args.getOrDefault(FLOW, "").toString().trim().toUpperCase()); } catch (Exception ex) { 
throw new IllegalArgumentException(ex.getMessage(), ex); } if (highRatio <= 0 || highRatio > 1) { throw new IllegalArgumentException("Back-pressure high ratio must be > 0 and <= 1"); } if (factor < 1) { throw new IllegalArgumentException("Back-pressure factor must be >= 1"); } if (windowSize < 10) { throw new IllegalArgumentException("Back-pressure window size must be >= 10"); } this.timeSource = timeSource; this.windowSize = windowSize; logger.info("Initialized back-pressure with high ratio: {}, factor: {}, flow: {}, window size: {}.", highRatio, factor, flow, windowSize); } @Override public void apply(Set<RateBasedBackPressureState> states, long timeout, TimeUnit unit) { // Go through the back-pressure states, try updating each of them and collect min/max rates: boolean isUpdated = false; double minRateLimit = Double.POSITIVE_INFINITY; double maxRateLimit = Double.NEGATIVE_INFINITY; double minIncomingRate = Double.POSITIVE_INFINITY; RateLimiter currentMin = null; RateLimiter currentMax = null; for (RateBasedBackPressureState backPressure : states) { // Get the incoming/outgoing rates: double incomingRate = backPressure.incomingRate.get(TimeUnit.SECONDS); double outgoingRate = backPressure.outgoingRate.get(TimeUnit.SECONDS); // Compute the min incoming rate: if (incomingRate < minIncomingRate) minIncomingRate = incomingRate; // Try acquiring the interval lock: if (backPressure.tryIntervalLock(windowSize)) { // If acquired, proceed updating thi back-pressure state rate limit: isUpdated = true; try { RateLimiter limiter = backPressure.rateLimiter; // If we have sent any outgoing requests during this time window, go ahead with rate limiting // (this is safe against concurrent back-pressure state updates thanks to the rw-locking in // RateBasedBackPressureState): if (outgoingRate > 0) { // Compute the incoming/outgoing ratio: double actualRatio = incomingRate / outgoingRate; // If the ratio is above the high mark, try growing by the back-pressure factor: double 
limiterRate = limiter.getRate(); if (actualRatio >= highRatio) { // Only if the outgoing rate is able to keep up with the rate increase: if (limiterRate <= outgoingRate) { double newRate = limiterRate + ((limiterRate * factor) / 100); if (newRate > 0 && newRate != Double.POSITIVE_INFINITY) { limiter.setRate(newRate); } } } // If below, set the rate limiter at the incoming rate, decreased by factor: else { // Only if the new rate is actually less than the actual rate: double newRate = incomingRate - ((incomingRate * factor) / 100); if (newRate > 0 && newRate < limiterRate) { limiter.setRate(newRate); } } if (logger.isTraceEnabled()) { logger.trace("Back-pressure state for {}: incoming rate {}, outgoing rate {}, ratio {}, rate limiting {}", backPressure.getHost(), incomingRate, outgoingRate, actualRatio, limiter.getRate()); } } // Otherwise reset the rate limiter: else { limiter.setRate(Double.POSITIVE_INFINITY); } // Housekeeping: pruning windows and resetting the last check timestamp! backPressure.incomingRate.prune(); backPressure.outgoingRate.prune(); } finally { backPressure.releaseIntervalLock(); } } if (backPressure.rateLimiter.getRate() <= minRateLimit) { minRateLimit = backPressure.rateLimiter.getRate(); currentMin = backPressure.rateLimiter; } if (backPressure.rateLimiter.getRate() >= maxRateLimit) { maxRateLimit = backPressure.rateLimiter.getRate(); currentMax = backPressure.rateLimiter; } } // Now find the rate limiter corresponding to the replica group represented by these back-pressure states: if (!states.isEmpty()) { try { // Get the rate limiter: IntervalRateLimiter rateLimiter = rateLimiters.get(states, () -> new IntervalRateLimiter(timeSource)); // If the back-pressure was updated and we acquire the interval lock for the rate limiter of this group: if (isUpdated && rateLimiter.tryIntervalLock(windowSize)) { try { // Update the rate limiter value based on the configured flow: if (flow.equals(Flow.FAST)) rateLimiter.limiter = currentMax; else 
rateLimiter.limiter = currentMin; tenSecsNoSpamLogger.info("{} currently applied for remote replicas: {}", rateLimiter.limiter, states); } finally { rateLimiter.releaseIntervalLock(); } } // Assigning a single rate limiter per replica group once per window size allows the back-pressure rate // limiting to be stable within the group itself. // Finally apply the rate limit with a max pause time equal to the provided timeout minus the // response time computed from the incoming rate, to reduce the number of client timeouts by taking into // account how long it could take to process responses after back-pressure: long responseTimeInNanos = (long) (TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS) / minIncomingRate); doRateLimit(rateLimiter.limiter, Math.max(0, TimeUnit.NANOSECONDS.convert(timeout, unit) - responseTimeInNanos)); } catch (ExecutionException ex) { throw new IllegalStateException(ex); } } } @Override public RateBasedBackPressureState newState(InetAddress host) { return new RateBasedBackPressureState(host, timeSource, windowSize); } @VisibleForTesting RateLimiter getRateLimiterForReplicaGroup(Set<RateBasedBackPressureState> states) { IntervalRateLimiter rateLimiter = rateLimiters.getIfPresent(states); return rateLimiter != null ? rateLimiter.limiter : RateLimiter.create(Double.POSITIVE_INFINITY); } @VisibleForTesting boolean doRateLimit(RateLimiter rateLimiter, long timeoutInNanos) { if (!rateLimiter.tryAcquire(1, timeoutInNanos, TimeUnit.NANOSECONDS)) { timeSource.sleepUninterruptibly(timeoutInNanos, TimeUnit.NANOSECONDS); oneMinNoSpamLogger.info("Cannot apply {} due to exceeding write timeout, pausing {} nanoseconds instead.", rateLimiter, timeoutInNanos); return false; } return true; } private static class IntervalRateLimiter extends IntervalLock { public volatile RateLimiter limiter = RateLimiter.create(Double.POSITIVE_INFINITY); IntervalRateLimiter(TimeSource timeSource) { super(timeSource); } } }