/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.concurrent.AbstractEventExecutorGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ThreadPerTaskExecutor;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ReadOnlyIterator;
import io.netty.util.internal.ThrowableUtil;
import java.util.Collections;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* An {@link EventLoopGroup} that creates one {@link EventLoop} per {@link Channel}.
*/
public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
private final Object[] childArgs;
private final int maxChannels;
final Executor executor;
final Set<EventLoop> activeChildren =
Collections.newSetFromMap(PlatformDependent.<EventLoop, Boolean>newConcurrentHashMap());
final Queue<EventLoop> idleChildren = new ConcurrentLinkedQueue<EventLoop>();
private final ChannelException tooManyChannels;
private volatile boolean shuttingDown;
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
private final FutureListener<Object> childTerminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
// Inefficient, but works.
if (isTerminated()) {
terminationFuture.trySuccess(null);
}
}
};
Create a new ThreadPerChannelEventLoopGroup
with no limit in place. /**
* Create a new {@link ThreadPerChannelEventLoopGroup} with no limit in place.
*/
protected ThreadPerChannelEventLoopGroup() {
this(0);
}
Create a new ThreadPerChannelEventLoopGroup
. Params: - maxChannels – the maximum number of channels to handle with this instance. Once you try to register a new
Channel
and the maximum is exceed it will throw an ChannelException
. on the register(Channel)
and register(ChannelPromise)
method. Use 0
to use no limit
/**
* Create a new {@link ThreadPerChannelEventLoopGroup}.
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException}. on the {@link #register(Channel)} and
* {@link #register(ChannelPromise)} method.
* Use {@code 0} to use no limit
*/
protected ThreadPerChannelEventLoopGroup(int maxChannels) {
this(maxChannels, Executors.defaultThreadFactory());
}
Create a new ThreadPerChannelEventLoopGroup
. Params: - maxChannels – the maximum number of channels to handle with this instance. Once you try to register a new
Channel
and the maximum is exceed it will throw an ChannelException
on the register(Channel)
and register(ChannelPromise)
method. Use 0
to use no limit - threadFactory – the
ThreadFactory
used to create new Thread
instances that handle the registered Channel
s - args – arguments which will passed to each
newChild(Object...)
call.
/**
* Create a new {@link ThreadPerChannelEventLoopGroup}.
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(ChannelPromise)} method.
* Use {@code 0} to use no limit
* @param threadFactory the {@link ThreadFactory} used to create new {@link Thread} instances that handle the
* registered {@link Channel}s
* @param args arguments which will passed to each {@link #newChild(Object...)} call.
*/
protected ThreadPerChannelEventLoopGroup(int maxChannels, ThreadFactory threadFactory, Object... args) {
this(maxChannels, new ThreadPerTaskExecutor(threadFactory), args);
}
Create a new ThreadPerChannelEventLoopGroup
. Params: - maxChannels – the maximum number of channels to handle with this instance. Once you try to register a new
Channel
and the maximum is exceed it will throw an ChannelException
on the register(Channel)
and register(ChannelPromise)
method. Use 0
to use no limit - executor – the
Executor
used to create new Thread
instances that handle the registered Channel
s - args – arguments which will passed to each
newChild(Object...)
call.
/**
* Create a new {@link ThreadPerChannelEventLoopGroup}.
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(ChannelPromise)} method.
* Use {@code 0} to use no limit
* @param executor the {@link Executor} used to create new {@link Thread} instances that handle the
* registered {@link Channel}s
* @param args arguments which will passed to each {@link #newChild(Object...)} call.
*/
protected ThreadPerChannelEventLoopGroup(int maxChannels, Executor executor, Object... args) {
if (maxChannels < 0) {
throw new IllegalArgumentException(String.format(
"maxChannels: %d (expected: >= 0)", maxChannels));
}
if (executor == null) {
throw new NullPointerException("executor");
}
if (args == null) {
childArgs = EmptyArrays.EMPTY_OBJECTS;
} else {
childArgs = args.clone();
}
this.maxChannels = maxChannels;
this.executor = executor;
tooManyChannels = ThrowableUtil.unknownStackTrace(
new ChannelException("too many channels (max: " + maxChannels + ')'),
ThreadPerChannelEventLoopGroup.class, "nextChild()");
}
Creates a new EventLoop
. The default implementation creates a new ThreadPerChannelEventLoop
. /**
* Creates a new {@link EventLoop}. The default implementation creates a new {@link ThreadPerChannelEventLoop}.
*/
protected EventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) throws Exception {
return new ThreadPerChannelEventLoop(this);
}
@Override
public Iterator<EventExecutor> iterator() {
return new ReadOnlyIterator<EventExecutor>(activeChildren.iterator());
}
@Override
public EventLoop next() {
throw new UnsupportedOperationException();
}
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
shuttingDown = true;
for (EventLoop l: activeChildren) {
l.shutdownGracefully(quietPeriod, timeout, unit);
}
for (EventLoop l: idleChildren) {
l.shutdownGracefully(quietPeriod, timeout, unit);
}
// Notify the future if there was no children.
if (isTerminated()) {
terminationFuture.trySuccess(null);
}
return terminationFuture();
}
@Override
public Future<?> terminationFuture() {
return terminationFuture;
}
@Override
@Deprecated
public void shutdown() {
shuttingDown = true;
for (EventLoop l: activeChildren) {
l.shutdown();
}
for (EventLoop l: idleChildren) {
l.shutdown();
}
// Notify the future if there was no children.
if (isTerminated()) {
terminationFuture.trySuccess(null);
}
}
@Override
public boolean isShuttingDown() {
for (EventLoop l: activeChildren) {
if (!l.isShuttingDown()) {
return false;
}
}
for (EventLoop l: idleChildren) {
if (!l.isShuttingDown()) {
return false;
}
}
return true;
}
@Override
public boolean isShutdown() {
for (EventLoop l: activeChildren) {
if (!l.isShutdown()) {
return false;
}
}
for (EventLoop l: idleChildren) {
if (!l.isShutdown()) {
return false;
}
}
return true;
}
@Override
public boolean isTerminated() {
for (EventLoop l: activeChildren) {
if (!l.isTerminated()) {
return false;
}
}
for (EventLoop l: idleChildren) {
if (!l.isTerminated()) {
return false;
}
}
return true;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long deadline = System.nanoTime() + unit.toNanos(timeout);
for (EventLoop l: activeChildren) {
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
return isTerminated();
}
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
}
for (EventLoop l: idleChildren) {
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
return isTerminated();
}
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
}
return isTerminated();
}
@Override
public ChannelFuture register(Channel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
try {
EventLoop l = nextChild();
return l.register(new DefaultChannelPromise(channel, l));
} catch (Throwable t) {
return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);
}
}
@Override
public ChannelFuture register(ChannelPromise promise) {
try {
return nextChild().register(promise);
} catch (Throwable t) {
promise.setFailure(t);
return promise;
}
}
@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
try {
return nextChild().register(channel, promise);
} catch (Throwable t) {
promise.setFailure(t);
return promise;
}
}
private EventLoop nextChild() throws Exception {
if (shuttingDown) {
throw new RejectedExecutionException("shutting down");
}
EventLoop loop = idleChildren.poll();
if (loop == null) {
if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
throw tooManyChannels;
}
loop = newChild(childArgs);
loop.terminationFuture().addListener(childTerminationListener);
}
activeChildren.add(loop);
return loop;
}
}