/*
 * Copyright 2016 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at:
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package io.netty.handler.flow;

import java.util.ArrayDeque;
import java.util.Queue;

import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

The FlowControlHandler ensures that only one message per read() is sent downstream. Classes such as ByteToMessageDecoder or MessageToByteEncoder are free to emit as many events as they like for any given input. A channel's auto reading configuration doesn't usually apply in these scenarios. This causes problems in downstream ChannelHandlers that would like to hold subsequent events while they're processing one event. It's a common problem with the HttpObjectDecoder, which will very often fire an HttpRequest that is immediately followed by a LastHttpContent event.

ChannelPipeline pipeline = ...;
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new FlowControlHandler());
pipeline.addLast(new MyExampleHandler());
class MyExampleHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof HttpRequest) {
      ctx.channel().config().setAutoRead(false);
      // The FlowControlHandler will hold any subsequent events that
      // were emitted by HttpObjectDecoder until auto reading is turned
      // back on or Channel#read() is being called.
    }
  }
 }
See Also: ChannelConfig.setAutoRead(boolean)
/**
 * The {@link FlowControlHandler} ensures that only one message per {@code read()} is sent downstream.
 *
 * <p>Classes such as {@link ByteToMessageDecoder} or {@link MessageToByteEncoder} are free to emit as
 * many events as they like for any given input. A channel's auto reading configuration doesn't usually
 * apply in these scenarios. This causes problems in downstream {@link ChannelHandler}s that would
 * like to hold subsequent events while they're processing one event. It's a common problem with the
 * {@code HttpObjectDecoder}, which will very often fire an {@code HttpRequest} that is immediately
 * followed by a {@code LastHttpContent} event.
 *
 * <pre>{@code
 * ChannelPipeline pipeline = ...;
 *
 * pipeline.addLast(new HttpServerCodec());
 * pipeline.addLast(new FlowControlHandler());
 *
 * pipeline.addLast(new MyExampleHandler());
 *
 * class MyExampleHandler extends ChannelInboundHandlerAdapter {
 *   @Override
 *   public void channelRead(ChannelHandlerContext ctx, Object msg) {
 *     if (msg instanceof HttpRequest) {
 *       ctx.channel().config().setAutoRead(false);
 *
 *       // The FlowControlHandler will hold any subsequent events that
 *       // were emitted by HttpObjectDecoder until auto reading is turned
 *       // back on or Channel#read() is being called.
 *     }
 *   }
 * }
 * }</pre>
 *
 * @see ChannelConfig#setAutoRead(boolean)
 */
public class FlowControlHandler extends ChannelDuplexHandler {

    private static final InternalLogger logger =
            InternalLoggerFactory.getInstance(FlowControlHandler.class);

    /**
     * Whether messages still sitting in the queue are passed to
     * {@link ReferenceCountUtil#safeRelease(Object)} when the queue is destroyed.
     */
    private final boolean releaseMessages;

    /**
     * Buffer for messages received from upstream. It is allocated on the first
     * {@code channelRead()} and set back to {@code null} by {@code destroy()}.
     */
    private RecyclableArrayDeque queue;

    /** The channel's configuration, captured in {@code handlerAdded()}. */
    private ChannelConfig config;

    /**
     * Set by {@code read()} when nothing could be dequeued; tells the next
     * {@code channelRead()} to relay one message regardless of the auto reading state.
     */
    private boolean shouldConsume;

    /**
     * Creates a handler.
     *
     * @param releaseMessages {@code true} to release messages that are still queued
     *                        when the queue is destroyed
     */
    public FlowControlHandler(boolean releaseMessages) {
        this.releaseMessages = releaseMessages;
    }

    /**
     * Creates a handler that releases messages which are still queued when the
     * queue is destroyed.
     */
    public FlowControlHandler() {
        this(true);
    }
Determine if the underlying Queue is empty. This method exists for testing, debugging and inspection purposes and it is not thread-safe!
/**
 * Determine if the underlying {@link Queue} is empty. This method exists for
 * testing, debugging and inspection purposes and it is not thread-safe!
 *
 * @return {@code true} if no queue is currently allocated or the queue holds no
 *         messages, {@code false} otherwise
 */
boolean isQueueEmpty() {
    // The queue is allocated lazily in channelRead() and reset to null in destroy(),
    // so "no queue" has to be reported as empty instead of being dereferenced.
    return queue == null || queue.isEmpty();
}
Releases all messages and destroys the Queue.
/** * Releases all messages and destroys the {@link Queue}. */
private void destroy() { if (queue != null) { if (!queue.isEmpty()) { logger.trace("Non-empty queue: {}", queue); if (releaseMessages) { Object msg; while ((msg = queue.poll()) != null) { ReferenceCountUtil.safeRelease(msg); } } } queue.recycle(); queue = null; } } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { config = ctx.channel().config(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { destroy(); ctx.fireChannelInactive(); } @Override public void read(ChannelHandlerContext ctx) throws Exception { if (dequeue(ctx, 1) == 0) { // It seems no messages were consumed. We need to read() some // messages from upstream and once one arrives it need to be // relayed to downstream to keep the flow going. shouldConsume = true; ctx.read(); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (queue == null) { queue = RecyclableArrayDeque.newInstance(); } queue.offer(msg); // We just received one message. Do we need to relay it regardless // of the auto reading configuration? The answer is yes if this // method was called as a result of a prior read() call. int minConsume = shouldConsume ? 1 : 0; shouldConsume = false; dequeue(ctx, minConsume); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // Don't relay completion events from upstream as they // make no sense in this context. See dequeue() where // a new set of completion events is being produced. }
Dequeues one or many (or none) messages depending on the channel's auto reading state and returns the number of messages that were consumed from the internal queue. The minConsume argument is used to force dequeue() into consuming that number of messages regardless of the channel's auto reading configuration.
See Also: read(ChannelHandlerContext), channelRead(ChannelHandlerContext, Object)
/** * Dequeues one or many (or none) messages depending on the channel's auto * reading state and returns the number of messages that were consumed from * the internal queue. * * The {@code minConsume} argument is used to force {@code dequeue()} into * consuming that number of messages regardless of the channel's auto * reading configuration. * * @see #read(ChannelHandlerContext) * @see #channelRead(ChannelHandlerContext, Object) */
private int dequeue(ChannelHandlerContext ctx, int minConsume) { if (queue != null) { int consumed = 0; Object msg; while ((consumed < minConsume) || config.isAutoRead()) { msg = queue.poll(); if (msg == null) { break; } ++consumed; ctx.fireChannelRead(msg); } // We're firing a completion event every time one (or more) // messages were consumed and the queue ended up being drained // to an empty state. if (queue.isEmpty() && consumed > 0) { ctx.fireChannelReadComplete(); } return consumed; } return 0; }
A recyclable ArrayDeque.
/** * A recyclable {@link ArrayDeque}. */
private static final class RecyclableArrayDeque extends ArrayDeque<Object> { private static final long serialVersionUID = 0L;
A value of 2 should be a good choice for most scenarios.
/** * A value of {@code 2} should be a good choice for most scenarios. */
private static final int DEFAULT_NUM_ELEMENTS = 2; private static final Recycler<RecyclableArrayDeque> RECYCLER = new Recycler<RecyclableArrayDeque>() { @Override protected RecyclableArrayDeque newObject(Handle<RecyclableArrayDeque> handle) { return new RecyclableArrayDeque(DEFAULT_NUM_ELEMENTS, handle); } }; public static RecyclableArrayDeque newInstance() { return RECYCLER.get(); } private final Handle<RecyclableArrayDeque> handle; private RecyclableArrayDeque(int numElements, Handle<RecyclableArrayDeque> handle) { super(numElements); this.handle = handle; } public void recycle() { clear(); handle.recycle(this); } } }