/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.channel.oio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.FileRegion;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.util.internal.StringUtil;

import java.io.IOException;

Abstract base class for OIO which reads and writes bytes from/to a Socket
/** * Abstract base class for OIO which reads and writes bytes from/to a Socket */
public abstract class AbstractOioByteChannel extends AbstractOioChannel { private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static final String EXPECTED_TYPES = " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " + StringUtil.simpleClassName(FileRegion.class) + ')';
See Also:
  • AbstractOioByteChannel.AbstractOioByteChannel(Channel)
/** * @see AbstractOioByteChannel#AbstractOioByteChannel(Channel) */
protected AbstractOioByteChannel(Channel parent) { super(parent); } @Override public ChannelMetadata metadata() { return METADATA; }
Determine if the input side of this channel is shutdown.
Returns:true if the input side of this channel is shutdown.
/** * Determine if the input side of this channel is shutdown. * @return {@code true} if the input side of this channel is shutdown. */
protected abstract boolean isInputShutdown();
Shutdown the input side of this channel.
Returns:A channel future that will complete when the shutdown is complete.
/** * Shutdown the input side of this channel. * @return A channel future that will complete when the shutdown is complete. */
protected abstract ChannelFuture shutdownInput(); private void closeOnRead(ChannelPipeline pipeline) { if (isOpen()) { if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { shutdownInput(); pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); } else { unsafe().close(unsafe().voidPromise()); } pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE); } } private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close, RecvByteBufAllocator.Handle allocHandle) { if (byteBuf != null) { if (byteBuf.isReadable()) { readPending = false; pipeline.fireChannelRead(byteBuf); } else { byteBuf.release(); } } allocHandle.readComplete(); pipeline.fireChannelReadComplete(); pipeline.fireExceptionCaught(cause); if (close || cause instanceof IOException) { closeOnRead(pipeline); } } @Override protected void doRead() { final ChannelConfig config = config(); if (isInputShutdown() || !readPending) { // We have to check readPending here because the Runnable to read could have been scheduled and later // during the same read loop readPending was set to false. return; } // In OIO we should set readPending to false even if the read was not successful so we can schedule // another read on the event loop if no reads are done. readPending = false; final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; boolean readData = false; try { byteBuf = allocHandle.allocate(allocator); do { allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { if (!byteBuf.isReadable()) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { // There is nothing left to read as we received an EOF. readPending = false; } } break; } else { readData = true; } final int available = available(); if (available <= 0) { break; } // Oio collects consecutive read operations into 1 ByteBuf before propagating up the pipeline. if (!byteBuf.isWritable()) { final int capacity = byteBuf.capacity(); final int maxCapacity = byteBuf.maxCapacity(); if (capacity == maxCapacity) { allocHandle.incMessagesRead(1); readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = allocHandle.allocate(allocator); } else { final int writerIndex = byteBuf.writerIndex(); if (writerIndex + available > maxCapacity) { byteBuf.capacity(maxCapacity); } else { byteBuf.ensureWritable(available); } } } } while (allocHandle.continueReading()); if (byteBuf != null) { // It is possible we allocated a buffer because the previous one was not writable, but then didn't use // it because allocHandle.continueReading() returned false. if (byteBuf.isReadable()) { readPending = false; pipeline.fireChannelRead(byteBuf); } else { byteBuf.release(); } byteBuf = null; } if (readData) { allocHandle.readComplete(); pipeline.fireChannelReadComplete(); } if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (readPending || config.isAutoRead() || !readData && isActive()) { // Reading 0 bytes could mean there is a SocketTimeout and no data was actually read, so we // should execute read() again because no data may have been read. read(); } } } @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { Object msg = in.current(); if (msg == null) { // nothing left to write break; } if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; int readableBytes = buf.readableBytes(); while (readableBytes > 0) { doWriteBytes(buf); int newReadableBytes = buf.readableBytes(); in.progress(readableBytes - newReadableBytes); readableBytes = newReadableBytes; } in.remove(); } else if (msg instanceof FileRegion) { FileRegion region = (FileRegion) msg; long transferred = region.transferred(); doWriteFileRegion(region); in.progress(region.transferred() - transferred); in.remove(); } else { in.remove(new UnsupportedOperationException( "unsupported message type: " + StringUtil.simpleClassName(msg))); } } } @Override protected final Object filterOutboundMessage(Object msg) throws Exception { if (msg instanceof ByteBuf || msg instanceof FileRegion) { return msg; } throw new UnsupportedOperationException( "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); }
Return the number of bytes ready to read from the underlying Socket.
/** * Return the number of bytes ready to read from the underlying Socket. */
protected abstract int available();
Read bytes from the underlying Socket.
Params:
  • buf – the ByteBuf into which the read bytes will be written
Throws:
Returns:amount the number of bytes read. This may return a negative amount if the underlying Socket was closed
/** * Read bytes from the underlying Socket. * * @param buf the {@link ByteBuf} into which the read bytes will be written * @return amount the number of bytes read. This may return a negative amount if the underlying * Socket was closed * @throws Exception is thrown if an error occurred */
protected abstract int doReadBytes(ByteBuf buf) throws Exception;
Write the data which is hold by the ByteBuf to the underlying Socket.
Params:
  • buf – the ByteBuf which holds the data to transfer
Throws:
/** * Write the data which is hold by the {@link ByteBuf} to the underlying Socket. * * @param buf the {@link ByteBuf} which holds the data to transfer * @throws Exception is thrown if an error occurred */
protected abstract void doWriteBytes(ByteBuf buf) throws Exception;
Write the data which is hold by the FileRegion to the underlying Socket.
Params:
  • region – the FileRegion which holds the data to transfer
Throws:
/** * Write the data which is hold by the {@link FileRegion} to the underlying Socket. * * @param region the {@link FileRegion} which holds the data to transfer * @throws Exception is thrown if an error occurred */
protected abstract void doWriteFileRegion(FileRegion region) throws Exception; }