/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.handler.traffic;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.PlatformDependent;

import java.util.ArrayDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * <p>This implementation of the {@link AbstractTrafficShapingHandler} is for global
 * traffic shaping, that is to say a global limitation of the bandwidth, whatever
 * the number of opened channels.</p>
 * <p>Note the index used in {@code ChannelOutboundBuffer.setUserDefinedWritability(index, boolean)} is <b>2</b>.</p>
 *
 * <p>The general usage should be as follows:</p>
 * <ul>
 * <li><p>Create your unique GlobalTrafficShapingHandler like:</p>
 * <p><tt>GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(executor);</tt></p>
 * <p>The executor could be the underlying IO worker pool.</p>
 * <p><tt>pipeline.addLast(myHandler);</tt></p>
 *
 * <p><b>Note that this handler has a Pipeline Coverage of "all", which means only one such handler must be created
 * and shared among all channels, as the counter must be shared among all channels.</b></p>
 *
 * <p>Other arguments can be passed, such as the write or read limitation (in bytes/s, where 0 means no limitation)
 * or the check interval (in milliseconds) that represents the delay between two computations of the
 * bandwidth and therefore the callback of the doAccounting method (0 means no accounting at all).</p>
 *
 * <p>A checkInterval of 0 means no accounting. If you need traffic shaping but no such accounting,
 * it is recommended to set a positive value, even a high one, since the precision of the
 * traffic shaping depends on the period over which the traffic is computed: the higher the interval,
 * the less precise the traffic shaping will be. As a high value, something close
 * to 5 or 10 minutes is suggested.</p>
 *
 * <p>maxTimeToWait, by default set to 15s, specifies an upper bound on the shaping delay.</p>
 * </li>
 * <li>In your handler, consider using {@code channel.isWritable()} and
 * {@code channelWritabilityChanged(ctx)} to handle writability, or
 * {@code future.addListener(new GenericFutureListener())} on the future returned by
 * {@code ctx.write()}.</li>
 * <li><p>You should also keep the object size of read and write operations reasonably adapted to
 * the bandwidth you require: for instance, 10 MB objects at 10 KB/s will lead to a burst effect,
 * while 100 KB objects at 1 MB/s should be handled smoothly by this traffic shaping handler.</p></li>
 * <li><p>Some configuration methods are applied on a best-effort basis, meaning
 * that traffic already scheduled will not be changed; only new traffic is affected.</p>
 * Such methods are therefore expected to be called infrequently, in accordance with the
 * traffic shaping configuration.</li>
 * </ul>
 *
 * Be sure to call {@link #release()} once this handler is not needed anymore to release all internal resources.
 * This will not shut down the {@link EventExecutor}, as it may be shared; you need to do this yourself.
 */
@Sharable
public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
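
    /*
     * Illustrative usage sketch of the single shared instance described in the class Javadoc.
     * "workerGroup", "bootstrap" and the limit values are placeholders for this example only;
     * any EventLoopGroup works as the ScheduledExecutorService.
     *
     *   // one instance for the whole application: 1 MiB/s write, 512 KiB/s read,
     *   // bandwidth recomputed every 1000 ms
     *   final GlobalTrafficShapingHandler shaper =
     *           new GlobalTrafficShapingHandler(workerGroup, 1024 * 1024, 512 * 1024, 1000);
     *
     *   bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
     *       @Override
     *       protected void initChannel(SocketChannel ch) {
     *           // the same handler instance is added to every pipeline (@Sharable)
     *           ch.pipeline().addLast("global-traffic-shaping", shaper);
     *       }
     *   });
     *
     *   // on shutdown: release the handler's resources; the executor is shared,
     *   // so it must be shut down separately
     *   shaper.release();
     *   workerGroup.shutdownGracefully();
     */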
    /**
     * All queues per channel
     */
    private final ConcurrentMap<Integer, PerChannel> channelQueues = PlatformDependent.newConcurrentHashMap();
    /**
     * Global queues size
     */
    private final AtomicLong queuesSize = new AtomicLong();
    /**
     * Max size in the list before proposing to stop writing new objects from next handlers
     * for all channels (global).
     */
    long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100; // default 400MB

    private static final class PerChannel {
        ArrayDeque<ToSend> messagesQueue;
        long queueSize;
        long lastWriteTimestamp;
        long lastReadTimestamp;
    }
    /**
     * Create the global TrafficCounter.
     */
    void createGlobalTrafficCounter(ScheduledExecutorService executor) {
        if (executor == null) {
            throw new NullPointerException("executor");
        }
        TrafficCounter tc = new TrafficCounter(this, executor, "GlobalTC", checkInterval);
        setTrafficCounter(tc);
        tc.start();
    }

    @Override
    protected int userDefinedWritabilityIndex() {
        return AbstractTrafficShapingHandler.GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
    }
    /**
     * Create a new instance.
     *
     * @param executor
     *            the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
     * @param writeLimit
     *            0 or a limit in bytes/s
     * @param readLimit
     *            0 or a limit in bytes/s
     * @param checkInterval
     *            The delay between two computations of performances for
     *            channels or 0 if no stats are to be computed.
     * @param maxTime
     *            The maximum delay to wait in case of traffic excess.
     */
    public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit,
            long checkInterval, long maxTime) {
        super(writeLimit, readLimit, checkInterval, maxTime);
        createGlobalTrafficCounter(executor);
    }
    /**
     * Create a new instance using the default max time (15000 ms) as the allowed delay.
     *
     * @param executor
     *            the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
     * @param writeLimit
     *            0 or a limit in bytes/s
     * @param readLimit
     *            0 or a limit in bytes/s
     * @param checkInterval
     *            The delay between two computations of performances for
     *            channels or 0 if no stats are to be computed.
     */
    public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit,
            long checkInterval) {
        super(writeLimit, readLimit, checkInterval);
        createGlobalTrafficCounter(executor);
    }
    /**
     * Create a new instance using the default check interval (1000 ms) and
     * the default max time (15000 ms) as the allowed delay.
     *
     * @param executor
     *            the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
     * @param writeLimit
     *            0 or a limit in bytes/s
     * @param readLimit
     *            0 or a limit in bytes/s
     */
    public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit) {
        super(writeLimit, readLimit);
        createGlobalTrafficCounter(executor);
    }
    /**
     * Create a new instance using the default max time (15000 ms) as the allowed delay and no limit.
     *
     * @param executor
     *            the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
     * @param checkInterval
     *            The delay between two computations of performances for
     *            channels or 0 if no stats are to be computed.
     */
    public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long checkInterval) {
        super(checkInterval);
        createGlobalTrafficCounter(executor);
    }
    /**
     * Create a new instance using the default check interval (1000 ms), the default max time
     * (15000 ms) as the allowed delay, and no limit.
     *
     * @param executor
     *            the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
     */
    public GlobalTrafficShapingHandler(EventExecutor executor) {
        createGlobalTrafficCounter(executor);
    }
    /**
     * @return the maxGlobalWriteSize, the default value being 400 MB.
     */
    public long getMaxGlobalWriteSize() {
        return maxGlobalWriteSize;
    }
    /**
     * Note that the change is applied on a best-effort basis, meaning
     * that traffic already scheduled will not be changed; only new traffic is affected.<br>
     * This method is therefore expected to be called infrequently, in accordance with the
     * traffic shaping configuration.
     *
     * @param maxGlobalWriteSize the maximum global write size allowed in the buffer
     *            globally for all channels before write suspension is set,
     *            the default value being 400 MB.
     */
    public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
        this.maxGlobalWriteSize = maxGlobalWriteSize;
    }
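
    /*
     * Illustrative sketch: because the change is best effort, it is typically done once at
     * configuration time rather than per write. The 512 MB figure and "shaper" are example
     * placeholders.
     *
     *   shaper.setMaxGlobalWriteSize(512L * 1024 * 1024); // allow up to 512 MB queued globally
     */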
    /**
     * @return the global size of the buffers for all queues.
     */
    public long queuesSize() {
        return queuesSize.get();
    }
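
    /*
     * Illustrative monitoring sketch: the global queue size and the shared TrafficCounter can be
     * polled periodically, e.g. from a scheduled task. "shaper" is the shared handler instance
     * from the usage sketch above.
     *
     *   long queued = shaper.queuesSize();               // bytes currently buffered for all channels
     *   TrafficCounter counter = shaper.trafficCounter();
     *   System.out.println("queued=" + queued + " bytes, " + counter);
     */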
    /**
     * Release all internal resources of this instance.
     */
    public final void release() {
        trafficCounter.stop();
    }

    private PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) {
        // ensure creation is limited to one thread per channel
        Channel channel = ctx.channel();
        Integer key = channel.hashCode();
        PerChannel perChannel = channelQueues.get(key);
        if (perChannel == null) {
            perChannel = new PerChannel();
            perChannel.messagesQueue = new ArrayDeque<ToSend>();
            perChannel.queueSize = 0L;
            perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano();
            perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp;
            channelQueues.put(key, perChannel);
        }
        return perChannel;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        getOrSetPerChannel(ctx);
        super.handlerAdded(ctx);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        Integer key = channel.hashCode();
        PerChannel perChannel = channelQueues.remove(key);
        if (perChannel != null) {
            // write operations need synchronization
            synchronized (perChannel) {
                if (channel.isActive()) {
                    for (ToSend toSend : perChannel.messagesQueue) {
                        long size = calculateSize(toSend.toSend);
                        trafficCounter.bytesRealWriteFlowControl(size);
                        perChannel.queueSize -= size;
                        queuesSize.addAndGet(-size);
                        ctx.write(toSend.toSend, toSend.promise);
                    }
                } else {
                    queuesSize.addAndGet(-perChannel.queueSize);
                    for (ToSend toSend : perChannel.messagesQueue) {
                        if (toSend.toSend instanceof ByteBuf) {
                            ((ByteBuf) toSend.toSend).release();
                        }
                    }
                }
                perChannel.messagesQueue.clear();
            }
        }
        releaseWriteSuspended(ctx);
        releaseReadSuspended(ctx);
        super.handlerRemoved(ctx);
    }

    @Override
    long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
        Integer key = ctx.channel().hashCode();
        PerChannel perChannel = channelQueues.get(key);
        if (perChannel != null) {
            if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
                wait = maxTime;
            }
        }
        return wait;
    }

    @Override
    void informReadOperation(final ChannelHandlerContext ctx, final long now) {
        Integer key = ctx.channel().hashCode();
        PerChannel perChannel = channelQueues.get(key);
        if (perChannel != null) {
            perChannel.lastReadTimestamp = now;
        }
    }

    private static final class ToSend {
        final long relativeTimeAction;
        final Object toSend;
        final long size;
        final ChannelPromise promise;

        private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) {
            relativeTimeAction = delay;
            this.toSend = toSend;
            this.size = size;
            this.promise = promise;
        }
    }

    @Override
    void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long size,
            final long writedelay, final long now, final ChannelPromise promise) {
        Channel channel = ctx.channel();
        Integer key = channel.hashCode();
        PerChannel perChannel = channelQueues.get(key);
        if (perChannel == null) {
            // in case write occurs before handlerAdded is raised for this handler
            // imply a synchronized only if needed
            perChannel = getOrSetPerChannel(ctx);
        }
        final ToSend newToSend;
        long delay = writedelay;
        boolean globalSizeExceeded = false;
        // write operations need synchronization
        synchronized (perChannel) {
            if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
                trafficCounter.bytesRealWriteFlowControl(size);
                ctx.write(msg, promise);
                perChannel.lastWriteTimestamp = now;
                return;
            }
            if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
                delay = maxTime;
            }
            newToSend = new ToSend(delay + now, msg, size, promise);
            perChannel.messagesQueue.addLast(newToSend);
            perChannel.queueSize += size;
            queuesSize.addAndGet(size);
            checkWriteSuspend(ctx, delay, perChannel.queueSize);
            if (queuesSize.get() > maxGlobalWriteSize) {
                globalSizeExceeded = true;
            }
        }
        if (globalSizeExceeded) {
            setUserDefinedWritability(ctx, false);
        }
        final long futureNow = newToSend.relativeTimeAction;
        final PerChannel forSchedule = perChannel;
        ctx.executor().schedule(new Runnable() {
            @Override
            public void run() {
                sendAllValid(ctx, forSchedule, futureNow);
            }
        }, delay, TimeUnit.MILLISECONDS);
    }

    private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now) {
        // write operations need synchronization
        synchronized (perChannel) {
            ToSend newToSend = perChannel.messagesQueue.pollFirst();
            for (; newToSend != null; newToSend = perChannel.messagesQueue.pollFirst()) {
                if (newToSend.relativeTimeAction <= now) {
                    long size = newToSend.size;
                    trafficCounter.bytesRealWriteFlowControl(size);
                    perChannel.queueSize -= size;
                    queuesSize.addAndGet(-size);
                    ctx.write(newToSend.toSend, newToSend.promise);
                    perChannel.lastWriteTimestamp = now;
                } else {
                    perChannel.messagesQueue.addFirst(newToSend);
                    break;
                }
            }
            if (perChannel.messagesQueue.isEmpty()) {
                releaseWriteSuspended(ctx);
            }
        }
        ctx.flush();
    }
}