/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.traffic;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;
This implementation of the AbstractTrafficShapingHandler
is for channel traffic shaping, that is to say a per channel limitation of the bandwidth.
Note the index used in OutboundBuffer.setUserDefinedWritability(index, boolean)
is 1.
The general use should be as follow:
Add in your pipeline a new ChannelTrafficShapingHandler.
ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler();
pipeline.addLast(myHandler);
Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created
for each new channel as the counter cannot be shared among all channels..
Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
or the check interval (in millisecond) that represents the delay between two computations of the
bandwidth and so the call back of the doAccounting method (0 means no accounting at all).
A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
it is recommended to set a positive value, even if it is high since the precision of the
Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
the less precise the traffic shaping will be. It is suggested as higher value something close
to 5 or 10 minutes.
maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.
- In your handler, you should consider to use the
channel.isWritable()
and channelWritabilityChanged(ctx)
to handle writability, or through future.addListener(new GenericFutureListener())
on the future returned by ctx.write()
.
You shall also consider to have object size in read or write operations relatively adapted to
the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.
Some configuration methods will be taken as best effort, meaning
that all already scheduled traffics will not be
changed, but only applied to new traffics.
So the expected usage of those methods are to be used not too often,
accordingly to the traffic shaping configuration.
/**
* <p>This implementation of the {@link AbstractTrafficShapingHandler} is for channel
* traffic shaping, that is to say a per channel limitation of the bandwidth.</p>
* <p>Note the index used in {@code OutboundBuffer.setUserDefinedWritability(index, boolean)} is <b>1</b>.</p>
*
* <p>The general use should be as follow:</p>
* <ul>
* <li><p>Add in your pipeline a new ChannelTrafficShapingHandler.</p>
* <p><tt>ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler();</tt></p>
* <p><tt>pipeline.addLast(myHandler);</tt></p>
*
* <p><b>Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created
* for each new channel as the counter cannot be shared among all channels.</b>.</p>
*
* <p>Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
* or the check interval (in millisecond) that represents the delay between two computations of the
* bandwidth and so the call back of the doAccounting method (0 means no accounting at all).</p>
*
* <p>A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
* it is recommended to set a positive value, even if it is high since the precision of the
* Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
* the less precise the traffic shaping will be. It is suggested as higher value something close
* to 5 or 10 minutes.</p>
*
* <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p>
* </li>
* <li>In your handler, you should consider to use the {@code channel.isWritable()} and
* {@code channelWritabilityChanged(ctx)} to handle writability, or through
* {@code future.addListener(new GenericFutureListener())} on the future returned by
* {@code ctx.write()}.</li>
* <li><p>You shall also consider to have object size in read or write operations relatively adapted to
* the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
* while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.</p></li>
* <li><p>Some configuration methods will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* <p>So the expected usage of those methods are to be used not too often,
* accordingly to the traffic shaping configuration.</p></li>
* </ul>
*/
public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
private final ArrayDeque<ToSend> messagesQueue = new ArrayDeque<ToSend>();
private long queueSize;
Create a new instance.
Params: - writeLimit –
0 or a limit in bytes/s
- readLimit –
0 or a limit in bytes/s
- checkInterval –
The delay between two computations of performances for
channels or 0 if no stats are to be computed.
- maxTime –
The maximum delay to wait in case of traffic excess.
/**
* Create a new instance.
*
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed.
* @param maxTime
* The maximum delay to wait in case of traffic excess.
*/
public ChannelTrafficShapingHandler(long writeLimit, long readLimit,
long checkInterval, long maxTime) {
super(writeLimit, readLimit, checkInterval, maxTime);
}
Create a new instance using default
max time as delay allowed value of 15000 ms.
Params: - writeLimit –
0 or a limit in bytes/s
- readLimit –
0 or a limit in bytes/s
- checkInterval –
The delay between two computations of performances for
channels or 0 if no stats are to be computed.
/**
* Create a new instance using default
* max time as delay allowed value of 15000 ms.
*
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed.
*/
public ChannelTrafficShapingHandler(long writeLimit,
long readLimit, long checkInterval) {
super(writeLimit, readLimit, checkInterval);
}
Create a new instance using default Check Interval value of 1000 ms and
max time as delay allowed value of 15000 ms.
Params: - writeLimit –
0 or a limit in bytes/s
- readLimit –
0 or a limit in bytes/s
/**
* Create a new instance using default Check Interval value of 1000 ms and
* max time as delay allowed value of 15000 ms.
*
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
*/
public ChannelTrafficShapingHandler(long writeLimit,
long readLimit) {
super(writeLimit, readLimit);
}
Create a new instance using
default max time as delay allowed value of 15000 ms and no limit.
Params: - checkInterval –
The delay between two computations of performances for
channels or 0 if no stats are to be computed.
/**
* Create a new instance using
* default max time as delay allowed value of 15000 ms and no limit.
*
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed.
*/
public ChannelTrafficShapingHandler(long checkInterval) {
super(checkInterval);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
TrafficCounter trafficCounter = new TrafficCounter(this, ctx.executor(), "ChannelTC" +
ctx.channel().hashCode(), checkInterval);
setTrafficCounter(trafficCounter);
trafficCounter.start();
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
trafficCounter.stop();
// write order control
synchronized (this) {
if (ctx.channel().isActive()) {
for (ToSend toSend : messagesQueue) {
long size = calculateSize(toSend.toSend);
trafficCounter.bytesRealWriteFlowControl(size);
queueSize -= size;
ctx.write(toSend.toSend, toSend.promise);
}
} else {
for (ToSend toSend : messagesQueue) {
if (toSend.toSend instanceof ByteBuf) {
((ByteBuf) toSend.toSend).release();
}
}
}
messagesQueue.clear();
}
releaseWriteSuspended(ctx);
releaseReadSuspended(ctx);
super.handlerRemoved(ctx);
}
private static final class ToSend {
final long relativeTimeAction;
final Object toSend;
final ChannelPromise promise;
private ToSend(final long delay, final Object toSend, final ChannelPromise promise) {
relativeTimeAction = delay;
this.toSend = toSend;
this.promise = promise;
}
}
@Override
void submitWrite(final ChannelHandlerContext ctx, final Object msg,
final long size, final long delay, final long now,
final ChannelPromise promise) {
final ToSend newToSend;
// write order control
synchronized (this) {
if (delay == 0 && messagesQueue.isEmpty()) {
trafficCounter.bytesRealWriteFlowControl(size);
ctx.write(msg, promise);
return;
}
newToSend = new ToSend(delay + now, msg, promise);
messagesQueue.addLast(newToSend);
queueSize += size;
checkWriteSuspend(ctx, delay, queueSize);
}
final long futureNow = newToSend.relativeTimeAction;
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
sendAllValid(ctx, futureNow);
}
}, delay, TimeUnit.MILLISECONDS);
}
private void sendAllValid(final ChannelHandlerContext ctx, final long now) {
// write order control
synchronized (this) {
ToSend newToSend = messagesQueue.pollFirst();
for (; newToSend != null; newToSend = messagesQueue.pollFirst()) {
if (newToSend.relativeTimeAction <= now) {
long size = calculateSize(newToSend.toSend);
trafficCounter.bytesRealWriteFlowControl(size);
queueSize -= size;
ctx.write(newToSend.toSend, newToSend.promise);
} else {
messagesQueue.addFirst(newToSend);
break;
}
}
if (messagesQueue.isEmpty()) {
releaseWriteSuspended(ctx);
}
}
ctx.flush();
}
Returns: current size in bytes of the write buffer.
/**
* @return current size in bytes of the write buffer.
*/
public long queueSize() {
return queueSize;
}
}