/*
 * Copyright 2015 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.channel.pool;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoop;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThrowableUtil;

import java.util.Deque;

import static io.netty.util.internal.ObjectUtil.*;

Simple ChannelPool implementation which will create new Channels if someone tries to acquire a Channel but none is in the pool atm. No limit on the maximal concurrent Channels is enforced. This implementation uses LIFO order for Channels in the ChannelPool.
/** * Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire * a {@link Channel} but none is in the pool atm. No limit on the maximal concurrent {@link Channel}s is enforced. * * This implementation uses LIFO order for {@link Channel}s in the {@link ChannelPool}. * */
public class SimpleChannelPool implements ChannelPool { private static final AttributeKey<SimpleChannelPool> POOL_KEY = AttributeKey.newInstance("channelPool"); private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace( new IllegalStateException("ChannelPool full"), SimpleChannelPool.class, "releaseAndOffer(...)"); private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque(); private final ChannelPoolHandler handler; private final ChannelHealthChecker healthCheck; private final Bootstrap bootstrap; private final boolean releaseHealthCheck; private final boolean lastRecentUsed;
Creates a new instance using the ChannelHealthChecker.ACTIVE.
Params:
  • bootstrap – the Bootstrap that is used for connections
  • handler – the ChannelPoolHandler that will be notified for the different pool actions
/** * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}. * * @param bootstrap the {@link Bootstrap} that is used for connections * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions */
public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler) { this(bootstrap, handler, ChannelHealthChecker.ACTIVE); }
Creates a new instance.
Params:
/** * Creates a new instance. * * @param bootstrap the {@link Bootstrap} that is used for connections * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is * still healthy when obtain from the {@link ChannelPool} */
public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck) { this(bootstrap, handler, healthCheck, true); }
Creates a new instance.
Params:
  • bootstrap – the Bootstrap that is used for connections
  • handler – the ChannelPoolHandler that will be notified for the different pool actions
  • healthCheck – the ChannelHealthChecker that will be used to check if a Channel is still healthy when obtain from the ChannelPool
  • releaseHealthCheck – will check channel health before offering back if this parameter set to true; otherwise, channel health is only checked at acquisition time
/** * Creates a new instance. * * @param bootstrap the {@link Bootstrap} that is used for connections * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is * still healthy when obtain from the {@link ChannelPool} * @param releaseHealthCheck will check channel health before offering back if this parameter set to {@code true}; * otherwise, channel health is only checked at acquisition time */
public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck, boolean releaseHealthCheck) { this(bootstrap, handler, healthCheck, releaseHealthCheck, true); }
Creates a new instance.
Params:
  • bootstrap – the Bootstrap that is used for connections
  • handler – the ChannelPoolHandler that will be notified for the different pool actions
  • healthCheck – the ChannelHealthChecker that will be used to check if a Channel is still healthy when obtain from the ChannelPool
  • releaseHealthCheck – will check channel health before offering back if this parameter set to true; otherwise, channel health is only checked at acquisition time
  • lastRecentUsed – true Channel selection will be LIFO, if false FIFO.
/** * Creates a new instance. * * @param bootstrap the {@link Bootstrap} that is used for connections * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is * still healthy when obtain from the {@link ChannelPool} * @param releaseHealthCheck will check channel health before offering back if this parameter set to {@code true}; * otherwise, channel health is only checked at acquisition time * @param lastRecentUsed {@code true} {@link Channel} selection will be LIFO, if {@code false} FIFO. */
public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck, boolean releaseHealthCheck, boolean lastRecentUsed) { this.handler = checkNotNull(handler, "handler"); this.healthCheck = checkNotNull(healthCheck, "healthCheck"); this.releaseHealthCheck = releaseHealthCheck; // Clone the original Bootstrap as we want to set our own handler this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone(); this.bootstrap.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { assert ch.eventLoop().inEventLoop(); handler.channelCreated(ch); } }); this.lastRecentUsed = lastRecentUsed; }
Returns the Bootstrap this pool will use to open new connections.
Returns:the Bootstrap this pool will use to open new connections
/** * Returns the {@link Bootstrap} this pool will use to open new connections. * * @return the {@link Bootstrap} this pool will use to open new connections */
protected Bootstrap bootstrap() { return bootstrap; }
Returns the ChannelPoolHandler that will be notified for the different pool actions.
Returns:the ChannelPoolHandler that will be notified for the different pool actions
/** * Returns the {@link ChannelPoolHandler} that will be notified for the different pool actions. * * @return the {@link ChannelPoolHandler} that will be notified for the different pool actions */
protected ChannelPoolHandler handler() { return handler; }
Returns the ChannelHealthChecker that will be used to check if a Channel is healthy.
Returns:the ChannelHealthChecker that will be used to check if a Channel is healthy
/** * Returns the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is healthy. * * @return the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is healthy */
protected ChannelHealthChecker healthChecker() { return healthCheck; }
Indicates whether this pool will check the health of channels before offering them back into the pool.
Returns:true if this pool will check the health of channels before offering them back into the pool, or false if channel health is only checked at acquisition time
/** * Indicates whether this pool will check the health of channels before offering them back into the pool. * * @return {@code true} if this pool will check the health of channels before offering them back into the pool, or * {@code false} if channel health is only checked at acquisition time */
protected boolean releaseHealthCheck() { return releaseHealthCheck; } @Override public final Future<Channel> acquire() { return acquire(bootstrap.config().group().next().<Channel>newPromise()); } @Override public Future<Channel> acquire(final Promise<Channel> promise) { checkNotNull(promise, "promise"); return acquireHealthyFromPoolOrNew(promise); }
Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise.
Params:
  • promise – the promise to provide acquire result.
Returns:future for acquiring a channel.
/** * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise. * @param promise the promise to provide acquire result. * @return future for acquiring a channel. */
private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) { try { final Channel ch = pollChannel(); if (ch == null) { // No Channel left in the pool bootstrap a new Channel Bootstrap bs = bootstrap.clone(); bs.attr(POOL_KEY, this); ChannelFuture f = connectChannel(bs); if (f.isDone()) { notifyConnect(f, promise); } else { f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { notifyConnect(future, promise); } }); } return promise; } EventLoop loop = ch.eventLoop(); if (loop.inEventLoop()) { doHealthCheck(ch, promise); } else { loop.execute(new Runnable() { @Override public void run() { doHealthCheck(ch, promise); } }); } } catch (Throwable cause) { promise.tryFailure(cause); } return promise; } private void notifyConnect(ChannelFuture future, Promise<Channel> promise) { if (future.isSuccess()) { Channel channel = future.channel(); if (!promise.trySuccess(channel)) { // Promise was completed in the meantime (like cancelled), just release the channel again release(channel); } } else { promise.tryFailure(future.cause()); } } private void doHealthCheck(final Channel ch, final Promise<Channel> promise) { assert ch.eventLoop().inEventLoop(); Future<Boolean> f = healthCheck.isHealthy(ch); if (f.isDone()) { notifyHealthCheck(f, ch, promise); } else { f.addListener(new FutureListener<Boolean>() { @Override public void operationComplete(Future<Boolean> future) throws Exception { notifyHealthCheck(future, ch, promise); } }); } } private void notifyHealthCheck(Future<Boolean> future, Channel ch, Promise<Channel> promise) { assert ch.eventLoop().inEventLoop(); if (future.isSuccess()) { if (future.getNow()) { try { ch.attr(POOL_KEY).set(this); handler.channelAcquired(ch); promise.setSuccess(ch); } catch (Throwable cause) { closeAndFail(ch, cause, promise); } } else { closeChannel(ch); acquireHealthyFromPoolOrNew(promise); } } else { closeChannel(ch); acquireHealthyFromPoolOrNew(promise); } }
Bootstrap a new Channel. The default implementation uses Bootstrap.connect(), sub-classes may override this.

The Bootstrap that is passed in here is cloned via Bootstrap.clone(), so it is safe to modify.

/** * Bootstrap a new {@link Channel}. The default implementation uses {@link Bootstrap#connect()}, sub-classes may * override this. * <p> * The {@link Bootstrap} that is passed in here is cloned via {@link Bootstrap#clone()}, so it is safe to modify. */
protected ChannelFuture connectChannel(Bootstrap bs) { return bs.connect(); } @Override public final Future<Void> release(Channel channel) { return release(channel, channel.eventLoop().<Void>newPromise()); } @Override public Future<Void> release(final Channel channel, final Promise<Void> promise) { checkNotNull(channel, "channel"); checkNotNull(promise, "promise"); try { EventLoop loop = channel.eventLoop(); if (loop.inEventLoop()) { doReleaseChannel(channel, promise); } else { loop.execute(new Runnable() { @Override public void run() { doReleaseChannel(channel, promise); } }); } } catch (Throwable cause) { closeAndFail(channel, cause, promise); } return promise; } private void doReleaseChannel(Channel channel, Promise<Void> promise) { assert channel.eventLoop().inEventLoop(); // Remove the POOL_KEY attribute from the Channel and check if it was acquired from this pool, if not fail. if (channel.attr(POOL_KEY).getAndSet(null) != this) { closeAndFail(channel, // Better include a stacktrace here as this is an user error. new IllegalArgumentException( "Channel " + channel + " was not acquired from this ChannelPool"), promise); } else { try { if (releaseHealthCheck) { doHealthCheckOnRelease(channel, promise); } else { releaseAndOffer(channel, promise); } } catch (Throwable cause) { closeAndFail(channel, cause, promise); } } } private void doHealthCheckOnRelease(final Channel channel, final Promise<Void> promise) throws Exception { final Future<Boolean> f = healthCheck.isHealthy(channel); if (f.isDone()) { releaseAndOfferIfHealthy(channel, promise, f); } else { f.addListener(new FutureListener<Boolean>() { @Override public void operationComplete(Future<Boolean> future) throws Exception { releaseAndOfferIfHealthy(channel, promise, f); } }); } }
Adds the channel back to the pool only if the channel is healthy.
Params:
  • channel – the channel to put back to the pool
  • promise – offer operation promise.
  • future – the future that contains information fif channel is healthy or not.
Throws:
  • Exception – in case when failed to notify handler about release operation.
/** * Adds the channel back to the pool only if the channel is healthy. * @param channel the channel to put back to the pool * @param promise offer operation promise. * @param future the future that contains information fif channel is healthy or not. * @throws Exception in case when failed to notify handler about release operation. */
private void releaseAndOfferIfHealthy(Channel channel, Promise<Void> promise, Future<Boolean> future) throws Exception { if (future.getNow()) { //channel turns out to be healthy, offering and releasing it. releaseAndOffer(channel, promise); } else { //channel not healthy, just releasing it. handler.channelReleased(channel); promise.setSuccess(null); } } private void releaseAndOffer(Channel channel, Promise<Void> promise) throws Exception { if (offerChannel(channel)) { handler.channelReleased(channel); promise.setSuccess(null); } else { closeAndFail(channel, FULL_EXCEPTION, promise); } } private static void closeChannel(Channel channel) { channel.attr(POOL_KEY).getAndSet(null); channel.close(); } private static void closeAndFail(Channel channel, Throwable cause, Promise<?> promise) { closeChannel(channel); promise.tryFailure(cause); }
Poll a Channel out of the internal storage to reuse it. This will return null if no Channel is ready to be reused. Sub-classes may override pollChannel() and offerChannel(Channel). Be aware that implementations of these methods needs to be thread-safe!
/** * Poll a {@link Channel} out of the internal storage to reuse it. This will return {@code null} if no * {@link Channel} is ready to be reused. * * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that * implementations of these methods needs to be thread-safe! */
protected Channel pollChannel() { return lastRecentUsed ? deque.pollLast() : deque.pollFirst(); }
Offer a Channel back to the internal storage. This will return true if the Channel could be added, false otherwise. Sub-classes may override pollChannel() and offerChannel(Channel). Be aware that implementations of these methods needs to be thread-safe!
/** * Offer a {@link Channel} back to the internal storage. This will return {@code true} if the {@link Channel} * could be added, {@code false} otherwise. * * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that * implementations of these methods needs to be thread-safe! */
protected boolean offerChannel(Channel channel) { return deque.offer(channel); } @Override public void close() { for (;;) { Channel channel = pollChannel(); if (channel == null) { break; } channel.close(); } } }