/*
 * Copyright 2014 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.handler.traffic;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.Attribute;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.AbstractCollection;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

This implementation of the AbstractTrafficShapingHandler provides both global and per-channel traffic shaping, that is to say a global limitation of the bandwidth, whatever the number of opened channels, combined with a per-channel limitation of the bandwidth.

This version must not be placed in the same pipeline as any other TrafficShapingHandler.

The general use should be as follows:
  • Create your unique GlobalChannelTrafficShapingHandler like:

    GlobalChannelTrafficShapingHandler myHandler = new GlobalChannelTrafficShapingHandler(executor);

    The executor could be the underlying IO worker pool
    pipeline.addLast(myHandler);

    Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created and shared among all channels as the counter must be shared among all channels.

    Other arguments can be passed, such as the write or read limitation (in bytes/s, where 0 means no limitation) or the check interval (in milliseconds), which represents the delay between two computations of the bandwidth and therefore between two callbacks of the doAccounting method (0 means no accounting at all).
    Note that as this is a fusion of both Global and Channel Traffic Shaping, limits are in 2 sets, respectively Global and Channel.

    A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting, it is recommended to set a positive value, even if it is high, since the precision of the Traffic Shaping depends on the period over which the traffic is computed. The higher the interval, the less precise the traffic shaping will be. The suggested upper value is something close to 5 or 10 minutes.

    maxTimeToWait, set to 15s by default, allows specifying an upper bound on the time shaping.

  • In your handler, you should consider using channel.isWritable() and channelWritabilityChanged(ctx) to handle writability, or future.addListener(new GenericFutureListener()) on the future returned by ctx.write().
  • You should also consider keeping the object size in read or write operations reasonably adapted to the bandwidth you require: for instance, having 10 MB objects for 10KB/s will lead to a burst effect, while having 100 KB objects for 1 MB/s should be smoothly handled by this TrafficShaping handler.

  • Some configuration methods will be taken as best effort, meaning that traffic that is already scheduled will not be changed; the new settings are only applied to new traffic.
    So these methods are expected to be used not too often, in accordance with the traffic shaping configuration.

Be sure to call release() once this handler is not needed anymore to release all internal resources. This will not shut down the EventExecutor as it may be shared, so you need to do this on your own.
/**
 * This implementation of the {@link AbstractTrafficShapingHandler} provides both global
 * and per-channel traffic shaping, that is to say a global limitation of the bandwidth, whatever
 * the number of opened channels, combined with a per-channel limitation of the bandwidth.<br><br>
 * This version must not be placed in the same pipeline as any other TrafficShapingHandler.<br><br>
 *
 * The general use should be as follows:<br>
 * <ul>
 * <li>Create your unique GlobalChannelTrafficShapingHandler like:<br><br>
 * <tt>GlobalChannelTrafficShapingHandler myHandler = new GlobalChannelTrafficShapingHandler(executor);</tt><br><br>
 * The executor could be the underlying IO worker pool<br>
 * <tt>pipeline.addLast(myHandler);</tt><br><br>
 *
 * <b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
 * and shared among all channels as the counter must be shared among all channels.</b><br><br>
 *
 * Other arguments can be passed, such as the write or read limitation (in bytes/s, where 0 means no limitation)
 * or the check interval (in milliseconds), which represents the delay between two computations of the
 * bandwidth and therefore between two callbacks of the doAccounting method (0 means no accounting at all).<br>
 * Note that as this is a fusion of both Global and Channel Traffic Shaping, limits are in 2 sets,
 * respectively Global and Channel.<br><br>
 *
 * A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
 * it is recommended to set a positive value, even if it is high, since the precision of the
 * Traffic Shaping depends on the period over which the traffic is computed. The higher the interval,
 * the less precise the traffic shaping will be. The suggested upper value is something close
 * to 5 or 10 minutes.<br><br>
 *
 * maxTimeToWait, set to 15s by default, allows specifying an upper bound on the time shaping.<br><br>
 * </li>
 * <li>In your handler, you should consider using {@code channel.isWritable()} and
 * {@code channelWritabilityChanged(ctx)} to handle writability, or
 * {@code future.addListener(new GenericFutureListener())} on the future returned by
 * {@code ctx.write()}.</li>
 * <li>You should also consider keeping the object size in read or write operations reasonably adapted to
 * the bandwidth you require: for instance, having 10 MB objects for 10KB/s will lead to a burst effect,
 * while having 100 KB objects for 1 MB/s should be smoothly handled by this TrafficShaping handler.<br><br></li>
 * <li>Some configuration methods will be taken as best effort, meaning
 * that traffic that is already scheduled will not be
 * changed; the new settings are only applied to new traffic.<br>
 * So these methods are expected to be used not too often,
 * in accordance with the traffic shaping configuration.</li>
 * </ul><br>
 *
 * Be sure to call {@link #release()} once this handler is not needed anymore to release all internal resources.
 * This will not shut down the {@link EventExecutor} as it may be shared, so you need to do this on your own.
 */
@Sharable
public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
    /** Logger used for the debug traces emitted when a read or a write is suspended. */
    private static final InternalLogger logger =
            InternalLoggerFactory.getInstance(GlobalChannelTrafficShapingHandler.class);
All queues per channel
/**
 * All queues per channel. Entries are keyed by the hash code of the owning {@link Channel}
 * and are added and removed as this handler is added to or removed from a channel.
 */
final ConcurrentMap<Integer, PerChannel> channelQueues = PlatformDependent.newConcurrentHashMap();
Global queues size
/**
 * Global queues size: the sum of the sizes of all messages currently waiting in the
 * per-channel queues. It grows when a write is queued and shrinks when a queued write is
 * finally passed on or dropped.
 */
private final AtomicLong queuesSize = new AtomicLong();
Maximum cumulative writing bytes for one channel among all (as long as channels stay the same)
/**
 * Maximum cumulative written bytes for one channel among all (as long as channels stay the same).
 * The value is refreshed on each accounting pass from the per-channel traffic counters.
 */
private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
Maximum cumulative read bytes for one channel among all (as long as channels stay the same)
/**
 * Maximum cumulative read bytes for one channel among all (as long as channels stay the same).
 * The value is refreshed on each accounting pass from the per-channel traffic counters.
 */
private final AtomicLong cumulativeReadBytes = new AtomicLong();
Max size in the list before proposing to stop writing new objects from next handlers for all channel (global)
/**
 * Max size in the list before proposing to stop writing new objects from next handlers
 * for all channels (global). Once the global queues size exceeds this value, the
 * user-defined writability of the writing channel is switched off.
 */
volatile long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100; // default 400MB
Limit in B/s to apply to write
/**
 * Limit in B/s to apply to write, for each channel individually
 * (the global write limit is held by the parent class).
 */
private volatile long writeChannelLimit;
Limit in B/s to apply to read
/**
 * Limit in B/s to apply to read, for each channel individually
 * (the global read limit is held by the parent class).
 */
private volatile long readChannelLimit;

/** Default maximum deviation applied when the global counter is created (0.1, so +/-10%). */
private static final float DEFAULT_DEVIATION = 0.1F;
/** Highest maximum deviation accepted by {@link #setMaxDeviation(float, float, float)}. */
private static final float MAX_DEVIATION = 0.4F;
/** Default slow down factor (+40%) applied when the global counter is created. */
private static final float DEFAULT_SLOWDOWN = 0.4F;
/** Default acceleration factor (-10%) applied when the global counter is created. */
private static final float DEFAULT_ACCELERATION = -0.1F;

/** Current maximum deviation, used as the boundary when balancing the wait time between channels. */
private volatile float maxDeviation;
/** Multiplier applied to the wait time of a too slow channel; stored as 1 + the configured factor. */
private volatile float accelerationFactor;
/** Multiplier applied to the wait time of a too fast channel; stored as 1 + the configured factor. */
private volatile float slowDownFactor;
/** Whether read balancing between channels is active; recomputed on each accounting pass. */
private volatile boolean readDeviationActive;
/** Whether write balancing between channels is active; recomputed on each accounting pass. */
private volatile boolean writeDeviationActive;

/**
 * Per-channel state: the queue of delayed writes and the traffic counter of one channel.
 */
static final class PerChannel {
    /** Writes waiting to be passed on, in submission order. */
    ArrayDeque<ToSend> messagesQueue;
    /** Traffic counter of this channel; it is not started on its own. */
    TrafficCounter channelTrafficCounter;
    /** Sum of the sizes of the messages currently held in {@link #messagesQueue}. */
    long queueSize;
    /** Timestamp of the last write passed on for this channel. */
    long lastWriteTimestamp;
    /** Timestamp of the last read operation observed for this channel. */
    long lastReadTimestamp;
}
Create the global TrafficCounter
/** * Create the global TrafficCounter */
void createGlobalTrafficCounter(ScheduledExecutorService executor) { // Default setMaxDeviation(DEFAULT_DEVIATION, DEFAULT_SLOWDOWN, DEFAULT_ACCELERATION); if (executor == null) { throw new IllegalArgumentException("Executor must not be null"); } TrafficCounter tc = new GlobalChannelTrafficCounter(this, executor, "GlobalChannelTC", checkInterval); setTrafficCounter(tc); tc.start(); } @Override protected int userDefinedWritabilityIndex() { return AbstractTrafficShapingHandler.GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX; }
Create a new instance.
Params:
  • executor – the ScheduledExecutorService to use for the TrafficCounter.
  • writeGlobalLimit – 0 or a limit in bytes/s
  • readGlobalLimit – 0 or a limit in bytes/s
  • writeChannelLimit – 0 or a limit in bytes/s
  • readChannelLimit – 0 or a limit in bytes/s
  • checkInterval – The delay between two computations of performances for channels or 0 if no stats are to be computed.
  • maxTime – The maximum delay to wait in case of traffic excess.
/**
 * Create a new instance.
 *
 * @param executor
 *            the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
 * @param writeGlobalLimit
 *            0 or a limit in bytes/s
 * @param readGlobalLimit
 *            0 or a limit in bytes/s
 * @param writeChannelLimit
 *            0 or a limit in bytes/s
 * @param readChannelLimit
 *            0 or a limit in bytes/s
 * @param checkInterval
 *            The delay between two computations of performances for
 *            channels or 0 if no stats are to be computed.
 * @param maxTime
 *            The maximum delay to wait in case of traffic excess.
 * @throws IllegalArgumentException
 *             if {@code executor} is {@code null}
 */
public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor,
        long writeGlobalLimit, long readGlobalLimit,
        long writeChannelLimit, long readChannelLimit,
        long checkInterval, long maxTime) {
    super(writeGlobalLimit, readGlobalLimit, checkInterval, maxTime);
    // Assign the per-channel limits before the global counter is created and started, as the
    // other constructors do, so the started counter never runs against unset limits.
    this.writeChannelLimit = writeChannelLimit;
    this.readChannelLimit = readChannelLimit;
    createGlobalTrafficCounter(executor);
}
Create a new instance.
Params:
  • executor – the ScheduledExecutorService to use for the TrafficCounter.
  • writeGlobalLimit – 0 or a limit in bytes/s
  • readGlobalLimit – 0 or a limit in bytes/s
  • writeChannelLimit – 0 or a limit in bytes/s
  • readChannelLimit – 0 or a limit in bytes/s
  • checkInterval – The delay between two computations of performances for channels or 0 if no stats are to be computed.
/**
 * Create a new instance with explicit global limits, per-channel limits and check interval.
 *
 * @param executor
 *            the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
 * @param writeGlobalLimit
 *            0 or a limit in bytes/s
 * @param readGlobalLimit
 *            0 or a limit in bytes/s
 * @param writeChannelLimit
 *            0 or a limit in bytes/s
 * @param readChannelLimit
 *            0 or a limit in bytes/s
 * @param checkInterval
 *            The delay between two computations of performances for
 *            channels or 0 if no stats are to be computed.
 */
public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor,
        long writeGlobalLimit, long readGlobalLimit,
        long writeChannelLimit, long readChannelLimit,
        long checkInterval) {
    super(writeGlobalLimit, readGlobalLimit, checkInterval);
    // Per-channel limits first, then the global counter.
    this.readChannelLimit = readChannelLimit;
    this.writeChannelLimit = writeChannelLimit;
    createGlobalTrafficCounter(executor);
}
Create a new instance.
Params:
  • executor – the ScheduledExecutorService to use for the TrafficCounter.
  • writeGlobalLimit – 0 or a limit in bytes/s
  • readGlobalLimit – 0 or a limit in bytes/s
  • writeChannelLimit – 0 or a limit in bytes/s
  • readChannelLimit – 0 or a limit in bytes/s
/**
 * Create a new instance with explicit global and per-channel limits.
 *
 * @param executor
 *            the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
 * @param writeGlobalLimit
 *            0 or a limit in bytes/s
 * @param readGlobalLimit
 *            0 or a limit in bytes/s
 * @param writeChannelLimit
 *            0 or a limit in bytes/s
 * @param readChannelLimit
 *            0 or a limit in bytes/s
 */
public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor,
        long writeGlobalLimit, long readGlobalLimit,
        long writeChannelLimit, long readChannelLimit) {
    super(writeGlobalLimit, readGlobalLimit);
    // Per-channel limits first, then the global counter.
    this.readChannelLimit = readChannelLimit;
    this.writeChannelLimit = writeChannelLimit;
    createGlobalTrafficCounter(executor);
}
Create a new instance.
Params:
  • executor – the ScheduledExecutorService to use for the TrafficCounter.
  • checkInterval – The delay between two computations of performances for channels or 0 if no stats are to be computed.
/**
 * Create a new instance with only a check interval; the per-channel limits are left unset.
 *
 * @param executor
 *            the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
 * @param checkInterval
 *            The delay between two computations of performances for
 *            channels or 0 if no stats are to be computed.
 */
public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor,
        long checkInterval) {
    super(checkInterval);
    createGlobalTrafficCounter(executor);
}
Create a new instance.
Params:
  • executor – the ScheduledExecutorService to use for the TrafficCounter.
/**
 * Create a new instance relying on the parent class defaults; the per-channel limits are
 * left unset.
 *
 * @param executor
 *            the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
 */
public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor) {
    createGlobalTrafficCounter(executor);
}
Returns: the current max deviation
/**
 * Gives the deviation boundary currently used when balancing the wait time between channels.
 *
 * @return the current max deviation
 */
public float maxDeviation() {
    return maxDeviation;
}
Returns: the current acceleration factor
/**
 * Gives the acceleration factor as stored by this handler, that is 1 plus the value
 * given to {@link #setMaxDeviation(float, float, float)}.
 *
 * @return the current acceleration factor
 */
public float accelerationFactor() {
    return accelerationFactor;
}
Returns: the current slow down factor
/**
 * Gives the slow down factor as stored by this handler, that is 1 plus the value
 * given to {@link #setMaxDeviation(float, float, float)}.
 *
 * @return the current slow down factor
 */
public float slowDownFactor() {
    return slowDownFactor;
}
Params:
  • maxDeviation – the maximum deviation to allow during computation of average, default deviation being 0.1, so +/-10% of the desired bandwidth. Maximum being 0.4.
  • slowDownFactor – the factor set as +x% to the too fast client (minimal value being 0, meaning no slow down factor), default being 40% (0.4).
  • accelerationFactor – the factor set as -x% to the too slow client (maximal value being 0, meaning no acceleration factor), default being -10% (-0.1).
/** * @param maxDeviation * the maximum deviation to allow during computation of average, default deviation * being 0.1, so +/-10% of the desired bandwidth. Maximum being 0.4. * @param slowDownFactor * the factor set as +x% to the too fast client (minimal value being 0, meaning no * slow down factor), default being 40% (0.4). * @param accelerationFactor * the factor set as -x% to the too slow client (maximal value being 0, meaning no * acceleration factor), default being -10% (-0.1). */
public void setMaxDeviation(float maxDeviation, float slowDownFactor, float accelerationFactor) { if (maxDeviation > MAX_DEVIATION) { throw new IllegalArgumentException("maxDeviation must be <= " + MAX_DEVIATION); } if (slowDownFactor < 0) { throw new IllegalArgumentException("slowDownFactor must be >= 0"); } if (accelerationFactor > 0) { throw new IllegalArgumentException("accelerationFactor must be <= 0"); } this.maxDeviation = maxDeviation; this.accelerationFactor = 1 + accelerationFactor; this.slowDownFactor = 1 + slowDownFactor; } private void computeDeviationCumulativeBytes() { // compute the maximum cumulativeXxxxBytes among still connected Channels long maxWrittenBytes = 0; long maxReadBytes = 0; long minWrittenBytes = Long.MAX_VALUE; long minReadBytes = Long.MAX_VALUE; for (PerChannel perChannel : channelQueues.values()) { long value = perChannel.channelTrafficCounter.cumulativeWrittenBytes(); if (maxWrittenBytes < value) { maxWrittenBytes = value; } if (minWrittenBytes > value) { minWrittenBytes = value; } value = perChannel.channelTrafficCounter.cumulativeReadBytes(); if (maxReadBytes < value) { maxReadBytes = value; } if (minReadBytes > value) { minReadBytes = value; } } boolean multiple = channelQueues.size() > 1; readDeviationActive = multiple && minReadBytes < maxReadBytes / 2; writeDeviationActive = multiple && minWrittenBytes < maxWrittenBytes / 2; cumulativeWrittenBytes.set(maxWrittenBytes); cumulativeReadBytes.set(maxReadBytes); } @Override protected void doAccounting(TrafficCounter counter) { computeDeviationCumulativeBytes(); super.doAccounting(counter); } private long computeBalancedWait(float maxLocal, float maxGlobal, long wait) { if (maxGlobal == 0) { // no change return wait; } float ratio = maxLocal / maxGlobal; // if in the boundaries, same value if (ratio > maxDeviation) { if (ratio < 1 - maxDeviation) { return wait; } else { ratio = slowDownFactor; if (wait < MINIMAL_WAIT) { wait = MINIMAL_WAIT; } } } else { ratio = accelerationFactor; 
} return (long) (wait * ratio); }
Returns: the maxGlobalWriteSize
/**
 * Gives the global queue size above which writes are proposed to be suspended.
 *
 * @return the maxGlobalWriteSize
 */
public long getMaxGlobalWriteSize() {
    return maxGlobalWriteSize;
}
Note the change will be taken as best effort, meaning that all already scheduled traffics will not be changed, but only applied to new traffics.
So the expected usage of this method is to be used not too often, accordingly to the traffic shaping configuration.
Params:
  • maxGlobalWriteSize – the maximum Global Write Size allowed in the buffer globally for all channels before write suspended is set.
/**
 * Note the change will be taken as best effort, meaning
 * that traffic that is already scheduled will not be
 * changed; the new value is only applied to new traffic.<br>
 * So this method is expected to be used not too often,
 * in accordance with the traffic shaping configuration.
 *
 * @param maxGlobalWriteSize the maximum Global Write Size allowed in the buffer
 *            globally for all channels before write suspended is set.
 * @throws IllegalArgumentException if {@code maxGlobalWriteSize} is zero or negative
 */
public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
    final boolean positive = maxGlobalWriteSize > 0;
    if (!positive) {
        throw new IllegalArgumentException("maxGlobalWriteSize must be positive");
    }
    this.maxGlobalWriteSize = maxGlobalWriteSize;
}
Returns: the global size of the buffers for all queues.
/**
 * Gives the current value of the global queue size counter.
 *
 * @return the global size of the buffers for all queues.
 */
public long queuesSize() {
    return queuesSize.get();
}
Params:
  • newWriteLimit – Channel write limit
  • newReadLimit – Channel read limit
/**
 * Change both per-channel limits at once and reset the accounting of every known
 * per-channel traffic counter.
 *
 * @param newWriteLimit Channel write limit
 * @param newReadLimit Channel read limit
 */
public void configureChannel(long newWriteLimit, long newReadLimit) {
    writeChannelLimit = newWriteLimit;
    readChannelLimit = newReadLimit;
    // One shared timestamp so that all counters restart from the same instant.
    final long resetTime = TrafficCounter.milliSecondFromNano();
    for (PerChannel state : channelQueues.values()) {
        state.channelTrafficCounter.resetAccounting(resetTime);
    }
}
Returns: Channel write limit
/**
 * Gives the write limit applied to each channel.
 *
 * @return Channel write limit
 */
public long getWriteChannelLimit() {
    return writeChannelLimit;
}
Params:
  • writeLimit – Channel write limit
/**
 * Change the per-channel write limit and reset the accounting of every known
 * per-channel traffic counter.
 *
 * @param writeLimit Channel write limit
 */
public void setWriteChannelLimit(long writeLimit) {
    writeChannelLimit = writeLimit;
    // One shared timestamp so that all counters restart from the same instant.
    final long resetTime = TrafficCounter.milliSecondFromNano();
    for (PerChannel state : channelQueues.values()) {
        state.channelTrafficCounter.resetAccounting(resetTime);
    }
}
Returns: Channel read limit
/**
 * Gives the read limit applied to each channel.
 *
 * @return Channel read limit
 */
public long getReadChannelLimit() {
    return readChannelLimit;
}
Params:
  • readLimit – Channel read limit
/**
 * Change the per-channel read limit and reset the accounting of every known
 * per-channel traffic counter.
 *
 * @param readLimit Channel read limit
 */
public void setReadChannelLimit(long readLimit) {
    readChannelLimit = readLimit;
    // One shared timestamp so that all counters restart from the same instant.
    final long resetTime = TrafficCounter.milliSecondFromNano();
    for (PerChannel state : channelQueues.values()) {
        state.channelTrafficCounter.resetAccounting(resetTime);
    }
}
Release all internal resources of this instance.
/** * Release all internal resources of this instance. */
public final void release() { trafficCounter.stop(); } private PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) { // ensure creation is limited to one thread per channel Channel channel = ctx.channel(); Integer key = channel.hashCode(); PerChannel perChannel = channelQueues.get(key); if (perChannel == null) { perChannel = new PerChannel(); perChannel.messagesQueue = new ArrayDeque<ToSend>(); // Don't start it since managed through the Global one perChannel.channelTrafficCounter = new TrafficCounter(this, null, "ChannelTC" + ctx.channel().hashCode(), checkInterval); perChannel.queueSize = 0L; perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano(); perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp; channelQueues.put(key, perChannel); } return perChannel; } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { getOrSetPerChannel(ctx); trafficCounter.resetCumulativeTime(); super.handlerAdded(ctx); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { trafficCounter.resetCumulativeTime(); Channel channel = ctx.channel(); Integer key = channel.hashCode(); PerChannel perChannel = channelQueues.remove(key); if (perChannel != null) { // write operations need synchronization synchronized (perChannel) { if (channel.isActive()) { for (ToSend toSend : perChannel.messagesQueue) { long size = calculateSize(toSend.toSend); trafficCounter.bytesRealWriteFlowControl(size); perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size); perChannel.queueSize -= size; queuesSize.addAndGet(-size); ctx.write(toSend.toSend, toSend.promise); } } else { queuesSize.addAndGet(-perChannel.queueSize); for (ToSend toSend : perChannel.messagesQueue) { if (toSend.toSend instanceof ByteBuf) { ((ByteBuf) toSend.toSend).release(); } } } perChannel.messagesQueue.clear(); } } releaseWriteSuspended(ctx); releaseReadSuspended(ctx); super.handlerRemoved(ctx); } @Override public void channelRead(final 
ChannelHandlerContext ctx, final Object msg) throws Exception { long size = calculateSize(msg); long now = TrafficCounter.milliSecondFromNano(); if (size > 0) { // compute the number of ms to wait before reopening the channel long waitGlobal = trafficCounter.readTimeToWait(size, getReadLimit(), maxTime, now); Integer key = ctx.channel().hashCode(); PerChannel perChannel = channelQueues.get(key); long wait = 0; if (perChannel != null) { wait = perChannel.channelTrafficCounter.readTimeToWait(size, readChannelLimit, maxTime, now); if (readDeviationActive) { // now try to balance between the channels long maxLocalRead; maxLocalRead = perChannel.channelTrafficCounter.cumulativeReadBytes(); long maxGlobalRead = cumulativeReadBytes.get(); if (maxLocalRead <= 0) { maxLocalRead = 0; } if (maxGlobalRead < maxLocalRead) { maxGlobalRead = maxLocalRead; } wait = computeBalancedWait(maxLocalRead, maxGlobalRead, wait); } } if (wait < waitGlobal) { wait = waitGlobal; } wait = checkWaitReadTime(ctx, wait, now); if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal // time in order to try to limit the traffic // Only AutoRead AND HandlerActive True means Context Active Channel channel = ctx.channel(); ChannelConfig config = channel.config(); if (logger.isDebugEnabled()) { logger.debug("Read Suspend: " + wait + ':' + config.isAutoRead() + ':' + isHandlerActive(ctx)); } if (config.isAutoRead() && isHandlerActive(ctx)) { config.setAutoRead(false); channel.attr(READ_SUSPENDED).set(true); // Create a Runnable to reactive the read if needed. 
If one was create before it will just be // reused to limit object creation Attribute<Runnable> attr = channel.attr(REOPEN_TASK); Runnable reopenTask = attr.get(); if (reopenTask == null) { reopenTask = new ReopenReadTimerTask(ctx); attr.set(reopenTask); } ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS); if (logger.isDebugEnabled()) { logger.debug("Suspend final status => " + config.isAutoRead() + ':' + isHandlerActive(ctx) + " will reopened at: " + wait); } } } } informReadOperation(ctx, now); ctx.fireChannelRead(msg); } @Override protected long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) { Integer key = ctx.channel().hashCode(); PerChannel perChannel = channelQueues.get(key); if (perChannel != null) { if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) { wait = maxTime; } } return wait; } @Override protected void informReadOperation(final ChannelHandlerContext ctx, final long now) { Integer key = ctx.channel().hashCode(); PerChannel perChannel = channelQueues.get(key); if (perChannel != null) { perChannel.lastReadTimestamp = now; } } private static final class ToSend { final long relativeTimeAction; final Object toSend; final ChannelPromise promise; final long size; private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) { relativeTimeAction = delay; this.toSend = toSend; this.size = size; this.promise = promise; } } protected long maximumCumulativeWrittenBytes() { return cumulativeWrittenBytes.get(); } protected long maximumCumulativeReadBytes() { return cumulativeReadBytes.get(); }
To allow for instance doAccounting to use the TrafficCounter per channel.
Returns: the list of TrafficCounters that exist at the time of the call.
/** * To allow for instance doAccounting to use the TrafficCounter per channel. * @return the list of TrafficCounters that exists at the time of the call. */
public Collection<TrafficCounter> channelTrafficCounters() { return new AbstractCollection<TrafficCounter>() { @Override public Iterator<TrafficCounter> iterator() { return new Iterator<TrafficCounter>() { final Iterator<PerChannel> iter = channelQueues.values().iterator(); @Override public boolean hasNext() { return iter.hasNext(); } @Override public TrafficCounter next() { return iter.next().channelTrafficCounter; } @Override public void remove() { throw new UnsupportedOperationException(); } }; } @Override public int size() { return channelQueues.size(); } }; } @Override public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception { long size = calculateSize(msg); long now = TrafficCounter.milliSecondFromNano(); if (size > 0) { // compute the number of ms to wait before continue with the channel long waitGlobal = trafficCounter.writeTimeToWait(size, getWriteLimit(), maxTime, now); Integer key = ctx.channel().hashCode(); PerChannel perChannel = channelQueues.get(key); long wait = 0; if (perChannel != null) { wait = perChannel.channelTrafficCounter.writeTimeToWait(size, writeChannelLimit, maxTime, now); if (writeDeviationActive) { // now try to balance between the channels long maxLocalWrite; maxLocalWrite = perChannel.channelTrafficCounter.cumulativeWrittenBytes(); long maxGlobalWrite = cumulativeWrittenBytes.get(); if (maxLocalWrite <= 0) { maxLocalWrite = 0; } if (maxGlobalWrite < maxLocalWrite) { maxGlobalWrite = maxLocalWrite; } wait = computeBalancedWait(maxLocalWrite, maxGlobalWrite, wait); } } if (wait < waitGlobal) { wait = waitGlobal; } if (wait >= MINIMAL_WAIT) { if (logger.isDebugEnabled()) { logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':' + isHandlerActive(ctx)); } submitWrite(ctx, msg, size, wait, now, promise); return; } } // to maintain order of write submitWrite(ctx, msg, size, 0, now, promise); } @Override protected void submitWrite(final 
ChannelHandlerContext ctx, final Object msg, final long size, final long writedelay, final long now, final ChannelPromise promise) { Channel channel = ctx.channel(); Integer key = channel.hashCode(); PerChannel perChannel = channelQueues.get(key); if (perChannel == null) { // in case write occurs before handlerAdded is raised for this handler // imply a synchronized only if needed perChannel = getOrSetPerChannel(ctx); } final ToSend newToSend; long delay = writedelay; boolean globalSizeExceeded = false; // write operations need synchronization synchronized (perChannel) { if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) { trafficCounter.bytesRealWriteFlowControl(size); perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size); ctx.write(msg, promise); perChannel.lastWriteTimestamp = now; return; } if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) { delay = maxTime; } newToSend = new ToSend(delay + now, msg, size, promise); perChannel.messagesQueue.addLast(newToSend); perChannel.queueSize += size; queuesSize.addAndGet(size); checkWriteSuspend(ctx, delay, perChannel.queueSize); if (queuesSize.get() > maxGlobalWriteSize) { globalSizeExceeded = true; } } if (globalSizeExceeded) { setUserDefinedWritability(ctx, false); } final long futureNow = newToSend.relativeTimeAction; final PerChannel forSchedule = perChannel; ctx.executor().schedule(new Runnable() { @Override public void run() { sendAllValid(ctx, forSchedule, futureNow); } }, delay, TimeUnit.MILLISECONDS); } private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now) { // write operations need synchronization synchronized (perChannel) { ToSend newToSend = perChannel.messagesQueue.pollFirst(); for (; newToSend != null; newToSend = perChannel.messagesQueue.pollFirst()) { if (newToSend.relativeTimeAction <= now) { long size = newToSend.size; trafficCounter.bytesRealWriteFlowControl(size); 
perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size); perChannel.queueSize -= size; queuesSize.addAndGet(-size); ctx.write(newToSend.toSend, newToSend.promise); perChannel.lastWriteTimestamp = now; } else { perChannel.messagesQueue.addFirst(newToSend); break; } } if (perChannel.messagesQueue.isEmpty()) { releaseWriteSuspended(ctx); } } ctx.flush(); } @Override public String toString() { return new StringBuilder(340).append(super.toString()) .append(" Write Channel Limit: ").append(writeChannelLimit) .append(" Read Channel Limit: ").append(readChannelLimit).toString(); } }