package org.glassfish.grizzly.nio.transport;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.memory.BufferArray;
import org.glassfish.grizzly.memory.Buffers;
import org.glassfish.grizzly.memory.ByteBufferArray;
import org.glassfish.grizzly.memory.CompositeBuffer;
import org.glassfish.grizzly.memory.MemoryManager;
import org.glassfish.grizzly.nio.DirectByteBufferRecord;
import org.glassfish.grizzly.utils.Exceptions;
public class TCPNIOUtils {
static final Logger LOGGER = TCPNIOTransport.LOGGER;
public static int writeCompositeBuffer(final TCPNIOConnection connection,
final CompositeBuffer buffer) throws IOException {
final int bufferSize = calcWriteBufferSize(connection, buffer.remaining());
final int oldPos = buffer.position();
final int oldLim = buffer.limit();
buffer.limit(oldPos + bufferSize);
final SocketChannel socketChannel = (SocketChannel) connection.getChannel();
int written = 0;
final BufferArray bufferArray = buffer.toBufferArray();
final DirectByteBufferRecord ioRecord = DirectByteBufferRecord.get();
try {
fill(bufferArray, bufferSize, ioRecord);
ioRecord.finishBufferSlice();
final int arraySize = ioRecord.getArraySize();
written = arraySize != 1
? flushByteBuffers(socketChannel, ioRecord.getArray(), 0, arraySize)
: flushByteBuffer(socketChannel, ioRecord.getArray()[0]);
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.log(Level.FINE, "TCPNIOConnection ({0}) (composite) write {1} bytes", new Object[]{
connection, written
});
}
} finally {
ioRecord.release();
bufferArray.restore();
bufferArray.recycle();
}
Buffers.setPositionLimit(buffer, oldPos + written, oldLim);
return written;
}
public static int writeSimpleBuffer(final TCPNIOConnection connection,
final Buffer buffer) throws IOException {
final SocketChannel socketChannel = (SocketChannel) connection.getChannel();
final int oldPos = buffer.position();
final int oldLim = buffer.limit();
final int written;
if (buffer.isDirect()) {
final ByteBuffer directByteBuffer = buffer.toByteBuffer();
final int pos = directByteBuffer.position();
try {
written = flushByteBuffer(socketChannel, directByteBuffer);
} finally {
directByteBuffer.position(pos);
}
} else {
final int bufferSize = calcWriteBufferSize(connection, buffer.remaining());
buffer.limit(oldPos + bufferSize);
final DirectByteBufferRecord ioRecord =
DirectByteBufferRecord.get();
final ByteBuffer directByteBuffer = ioRecord.allocate(bufferSize);
fill(buffer, bufferSize, directByteBuffer);
try {
written = flushByteBuffer(socketChannel, directByteBuffer);
} finally {
ioRecord.release();
}
}
Buffers.setPositionLimit(buffer, oldPos + written, oldLim);
if(LOGGER.isLoggable(Level.FINE))
LOGGER.log(Level.FINE, "TCPNIOConnection ({0}) (plain) write {1} bytes", new Object[] {
connection, written
});
return written;
}
public static int flushByteBuffer(final SocketChannel channel,
final ByteBuffer byteBuffer) throws IOException {
return channel.write(byteBuffer);
}
public static int flushByteBuffers(final SocketChannel channel,
final ByteBuffer byteBuffer[], final int firstBufferOffest,
final int numberOfBuffers) throws IOException {
return (int) channel.write(byteBuffer, firstBufferOffest, numberOfBuffers);
}
private static void fill(Buffer src, int size, ByteBuffer dstByteBuffer)
{
dstByteBuffer.limit(size);
int oldPos = src.position();
src.get(dstByteBuffer);
dstByteBuffer.position(0);
src.position(oldPos);
}
static void fill(final BufferArray bufferArray,
final int totalBufferSize, final DirectByteBufferRecord ioRecord) {
final Buffer buffers[] = bufferArray.getArray();
final int size = bufferArray.size();
int totalRemaining = totalBufferSize;
for (int i = 0; i < size; i++) {
final Buffer buffer = buffers[i];
assert !buffer.isComposite();
final int bufferSize = buffer.remaining();
if (bufferSize == 0) {
continue;
}
if (buffer.isDirect()) {
ioRecord.finishBufferSlice();
ioRecord.putToArray(buffer.toByteBuffer());
} else {
ByteBuffer currentDirectBufferSlice = ioRecord.getDirectBufferSlice();
if (currentDirectBufferSlice == null) {
final ByteBuffer directByteBuffer = ioRecord.getDirectBuffer();
if (directByteBuffer == null) {
ioRecord.allocate(totalRemaining);
}
currentDirectBufferSlice = ioRecord.sliceBuffer();
}
final int oldLim = currentDirectBufferSlice.limit();
currentDirectBufferSlice.limit(currentDirectBufferSlice.position() + bufferSize);
buffer.get(currentDirectBufferSlice);
currentDirectBufferSlice.limit(oldLim);
}
totalRemaining -= bufferSize;
}
}
private static int calcWriteBufferSize(final TCPNIOConnection connection,
final int bufferSize) {
return Math.min(TCPNIOTransport.MAX_SEND_BUFFER_SIZE,
Math.min(bufferSize, (connection.getWriteBufferSize() * 3) / 2));
}
public static Buffer allocateAndReadBuffer(final TCPNIOConnection connection)
throws IOException {
final MemoryManager memoryManager = connection.getMemoryManager();
int read;
Throwable error = null;
Buffer buffer = null;
try {
final int receiveBufferSize =
Math.min(TCPNIOTransport.MAX_RECEIVE_BUFFER_SIZE,
connection.getReadBufferSize());
if (!memoryManager.willAllocateDirect(receiveBufferSize)) {
final DirectByteBufferRecord ioRecord =
DirectByteBufferRecord.get();
final ByteBuffer directByteBuffer =
ioRecord.allocate(receiveBufferSize);
try {
read = readSimpleByteBuffer(connection, directByteBuffer);
if (read > 0) {
directByteBuffer.flip();
buffer = memoryManager.allocate(read);
buffer.put(directByteBuffer);
}
} finally {
ioRecord.release();
}
} else {
buffer = memoryManager.allocateAtLeast(receiveBufferSize);
read = readBuffer(connection, buffer);
}
} catch (Throwable e) {
error = e;
read = -1;
}
if (read > 0) {
buffer.position(read);
buffer.allowBufferDispose(true);
} else {
if (buffer != null) {
buffer.dispose();
}
if (read < 0) {
throw error != null
? Exceptions.makeIOException(error)
: new EOFException();
}
buffer = Buffers.EMPTY_BUFFER;
}
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.log(Level.FINE, "TCPNIOConnection ({0}) (allocated) read {1} bytes", new Object[]{
connection, read
});
}
return buffer;
}
public static int readBuffer(final TCPNIOConnection connection,
final Buffer buffer) throws IOException {
return buffer.isComposite()
? readCompositeBuffer(connection, (CompositeBuffer) buffer)
: readSimpleBuffer(connection, buffer);
}
public static int readCompositeBuffer(final TCPNIOConnection connection,
final CompositeBuffer buffer) throws IOException {
final SocketChannel socketChannel = (SocketChannel) connection.getChannel();
final int oldPos = buffer.position();
final ByteBufferArray array = buffer.toByteBufferArray();
final ByteBuffer byteBuffers[] = array.getArray();
final int size = array.size();
final int read = (int) socketChannel.read(byteBuffers, 0, size);
array.restore();
array.recycle();
if (read > 0) {
buffer.position(oldPos + read);
}
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.log(Level.FINE, "TCPNIOConnection ({0}) (nonallocated, composite) read {1} bytes", new Object[]{
connection, read
});
}
return read;
}
public static int readSimpleBuffer(final TCPNIOConnection connection,
final Buffer buffer) throws IOException {
final SocketChannel socketChannel = (SocketChannel) connection.getChannel();
final int oldPos = buffer.position();
final ByteBuffer byteBuffer = buffer.toByteBuffer();
final int bbOldPos = byteBuffer.position();
final int read = socketChannel.read(byteBuffer);
if (read > 0) {
byteBuffer.position(bbOldPos);
buffer.position(oldPos + read);
}
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.log(Level.FINE, "TCPNIOConnection ({0}) (nonallocated, simple) read {1} bytes", new Object[]{
connection, read
});
}
return read;
}
private static int readSimpleByteBuffer(final TCPNIOConnection tcpConnection,
final ByteBuffer byteBuffer) throws IOException {
final SocketChannel socketChannel = (SocketChannel) tcpConnection.getChannel();
return socketChannel.read(byteBuffer);
}
}