/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.channel.nio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ConnectionPendingException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

Abstract base class for Channel implementations which use a Selector based approach.
/** * Abstract base class for {@link Channel} implementations which use a Selector based approach. */
public abstract class AbstractNioChannel extends AbstractChannel { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractNioChannel.class); private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace( new ClosedChannelException(), AbstractNioChannel.class, "doClose()"); private final SelectableChannel ch; protected final int readInterestOp; volatile SelectionKey selectionKey; boolean readPending; private final Runnable clearReadPendingRunnable = new Runnable() { @Override public void run() { clearReadPending0(); } };
The future of the current connection attempt. If not null, subsequent connection attempts will fail.
/** * The future of the current connection attempt. If not null, subsequent * connection attempts will fail. */
private ChannelPromise connectPromise; private ScheduledFuture<?> connectTimeoutFuture; private SocketAddress requestedRemoteAddress;
Create a new instance
Params:
  • parent – the parent Channel by which this instance was created. May be null
  • ch – the underlying SelectableChannel on which it operates
  • readInterestOp – the ops to set to receive data from the SelectableChannel
/** * Create a new instance * * @param parent the parent {@link Channel} by which this instance was created. May be {@code null} * @param ch the underlying {@link SelectableChannel} on which it operates * @param readInterestOp the ops to set to receive data from the {@link SelectableChannel} */
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } } @Override public boolean isOpen() { return ch.isOpen(); } @Override public NioUnsafe unsafe() { return (NioUnsafe) super.unsafe(); } protected SelectableChannel javaChannel() { return ch; } @Override public NioEventLoop eventLoop() { return (NioEventLoop) super.eventLoop(); }
Return the current SelectionKey
/** * Return the current {@link SelectionKey} */
protected SelectionKey selectionKey() { assert selectionKey != null; return selectionKey; }
Deprecated:No longer supported. No longer supported.
/** * @deprecated No longer supported. * No longer supported. */
@Deprecated protected boolean isReadPending() { return readPending; }
Deprecated:Use clearReadPending() if appropriate instead. No longer supported.
/** * @deprecated Use {@link #clearReadPending()} if appropriate instead. * No longer supported. */
@Deprecated protected void setReadPending(final boolean readPending) { if (isRegistered()) { EventLoop eventLoop = eventLoop(); if (eventLoop.inEventLoop()) { setReadPending0(readPending); } else { eventLoop.execute(new Runnable() { @Override public void run() { setReadPending0(readPending); } }); } } else { // Best effort if we are not registered yet clear readPending. // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is // not set yet so it would produce an assertion failure. this.readPending = readPending; } }
Set read pending to false.
/** * Set read pending to {@code false}. */
protected final void clearReadPending() { if (isRegistered()) { EventLoop eventLoop = eventLoop(); if (eventLoop.inEventLoop()) { clearReadPending0(); } else { eventLoop.execute(clearReadPendingRunnable); } } else { // Best effort if we are not registered yet clear readPending. This happens during channel initialization. // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is // not set yet so it would produce an assertion failure. readPending = false; } } private void setReadPending0(boolean readPending) { this.readPending = readPending; if (!readPending) { ((AbstractNioUnsafe) unsafe()).removeReadOp(); } } private void clearReadPending0() { readPending = false; ((AbstractNioUnsafe) unsafe()).removeReadOp(); }
Special Unsafe sub-type which allows to access the underlying SelectableChannel
/** * Special {@link Unsafe} sub-type which allows to access the underlying {@link SelectableChannel} */
public interface NioUnsafe extends Unsafe {
Return underlying SelectableChannel
/** * Return underlying {@link SelectableChannel} */
SelectableChannel ch();
Finish connect
/** * Finish connect */
void finishConnect();
Read from underlying SelectableChannel
/** * Read from underlying {@link SelectableChannel} */
void read(); void forceFlush(); } protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe { protected final void removeReadOp() { SelectionKey key = selectionKey(); // Check first if the key is still valid as it may be canceled as part of the deregistration // from the EventLoop // See https://github.com/netty/netty/issues/2104 if (!key.isValid()) { return; } int interestOps = key.interestOps(); if ((interestOps & readInterestOp) != 0) { // only remove readInterestOp if needed key.interestOps(interestOps & ~readInterestOp); } } @Override public final SelectableChannel ch() { return javaChannel(); } @Override public final void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } try { if (connectPromise != null) { // Already a connect in process. throw new ConnectionPendingException(); } boolean wasActive = isActive(); if (doConnect(remoteAddress, localAddress)) { fulfillConnectPromise(promise, wasActive); } else { connectPromise = promise; requestedRemoteAddress = remoteAddress; // Schedule connect timeout. int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0) { connectTimeoutFuture = eventLoop().schedule(new Runnable() { @Override public void run() { ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress); if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isCancelled()) { if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; close(voidPromise()); } } }); } } catch (Throwable t) { promise.tryFailure(annotateConnectException(t, remoteAddress)); closeIfClosed(); } } private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { if (promise == null) { // Closed via cancellation and the promise has been notified already. return; } // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel. // We still need to ensure we call fireChannelActive() in this case. boolean active = isActive(); // trySuccess() will return false if a user cancelled the connection attempt. boolean promiseSet = promise.trySuccess(); // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, // because what happened is what happened. if (!wasActive && active) { pipeline().fireChannelActive(); } // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). if (!promiseSet) { close(voidPromise()); } } private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) { if (promise == null) { // Closed via cancellation and the promise has been notified already. return; } // Use tryFailure() instead of setFailure() to avoid the race against cancel(). promise.tryFailure(cause); closeIfClosed(); } @Override public final void finishConnect() { // Note this method is invoked by the event loop only if the connection attempt was // neither cancelled nor timed out. assert eventLoop().inEventLoop(); try { boolean wasActive = isActive(); doFinishConnect(); fulfillConnectPromise(connectPromise, wasActive); } catch (Throwable t) { fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); } finally { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; } } @Override protected final void flush0() { // Flush immediately only when there's no pending flush. // If there's a pending flush operation, event loop will call forceFlush() later, // and thus there's no need to call it now. if (!isFlushPending()) { super.flush0(); } } @Override public final void forceFlush() { // directly call super.flush0() to force a flush now super.flush0(); } private boolean isFlushPending() { SelectionKey selectionKey = selectionKey(); return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0; } } @Override protected boolean isCompatible(EventLoop loop) { return loop instanceof NioEventLoop; } @Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } } @Override protected void doDeregister() throws Exception { eventLoop().cancel(selectionKey()); } @Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }
Connect to the remote peer
/** * Connect to the remote peer */
protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
Finish the connect
/** * Finish the connect */
protected abstract void doFinishConnect() throws Exception;
Returns an off-heap copy of the specified ByteBuf, and releases the original one. Note that this method does not create an off-heap copy if the allocation / deallocation cost is too high, but just returns the original ByteBuf..
/** * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one. * Note that this method does not create an off-heap copy if the allocation / deallocation cost is too high, * but just returns the original {@link ByteBuf}.. */
protected final ByteBuf newDirectBuffer(ByteBuf buf) { final int readableBytes = buf.readableBytes(); if (readableBytes == 0) { ReferenceCountUtil.safeRelease(buf); return Unpooled.EMPTY_BUFFER; } final ByteBufAllocator alloc = alloc(); if (alloc.isDirectBufferPooled()) { ByteBuf directBuf = alloc.directBuffer(readableBytes); directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); ReferenceCountUtil.safeRelease(buf); return directBuf; } final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer(); if (directBuf != null) { directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); ReferenceCountUtil.safeRelease(buf); return directBuf; } // Allocating and deallocating an unpooled direct buffer is very expensive; give up. return buf; }
Returns an off-heap copy of the specified ByteBuf, and releases the specified holder. The caller must ensure that the holder releases the original ByteBuf when the holder is released by this method. Note that this method does not create an off-heap copy if the allocation / deallocation cost is too high, but just returns the original ByteBuf..
/** * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder. * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by * this method. Note that this method does not create an off-heap copy if the allocation / deallocation cost is * too high, but just returns the original {@link ByteBuf}.. */
protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) { final int readableBytes = buf.readableBytes(); if (readableBytes == 0) { ReferenceCountUtil.safeRelease(holder); return Unpooled.EMPTY_BUFFER; } final ByteBufAllocator alloc = alloc(); if (alloc.isDirectBufferPooled()) { ByteBuf directBuf = alloc.directBuffer(readableBytes); directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); ReferenceCountUtil.safeRelease(holder); return directBuf; } final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer(); if (directBuf != null) { directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); ReferenceCountUtil.safeRelease(holder); return directBuf; } // Allocating and deallocating an unpooled direct buffer is very expensive; give up. if (holder != buf) { // Ensure to call holder.release() to give the holder a chance to release other resources than its content. buf.retain(); ReferenceCountUtil.safeRelease(holder); } return buf; } @Override protected void doClose() throws Exception { ChannelPromise promise = connectPromise; if (promise != null) { // Use tryFailure() instead of setFailure() to avoid the race against cancel(). promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION); connectPromise = null; } ScheduledFuture<?> future = connectTimeoutFuture; if (future != null) { future.cancel(false); connectTimeoutFuture = null; } } }