package org.glassfish.grizzly.nio;
import java.io.EOFException;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.glassfish.grizzly.AbstractReader;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.Context;
import org.glassfish.grizzly.Grizzly;
import org.glassfish.grizzly.Interceptor;
import org.glassfish.grizzly.ReadResult;
import org.glassfish.grizzly.Reader;
import org.glassfish.grizzly.asyncqueue.AsyncQueueReader;
import org.glassfish.grizzly.asyncqueue.AsyncReadQueueRecord;
import org.glassfish.grizzly.asyncqueue.TaskQueue;
@SuppressWarnings("unchecked")
public abstract class AbstractNIOAsyncQueueReader extends AbstractReader<SocketAddress> implements AsyncQueueReader<SocketAddress> {
private static final Logger LOGGER = Grizzly.logger(AbstractNIOAsyncQueueReader.class);
public static final int DEFAULT_BUFFER_SIZE = 8192;
protected int defaultBufferSize = DEFAULT_BUFFER_SIZE;
protected final NIOTransport transport;
private EOFException cachedEOFException;
public AbstractNIOAsyncQueueReader(NIOTransport transport) {
this.transport = transport;
}
@Override
public void read(final Connection<SocketAddress> connection, Buffer buffer, final CompletionHandler<ReadResult<Buffer, SocketAddress>> completionHandler,
final Interceptor<ReadResult> interceptor) {
if (connection == null) {
failure(new IOException("Connection is null"), completionHandler);
return;
}
if (!connection.isOpen()) {
failure(new IOException("Connection is closed"), completionHandler);
return;
}
final TaskQueue<AsyncReadQueueRecord> connectionQueue = ((NIOConnection) connection).getAsyncReadQueue();
final AsyncReadQueueRecord queueRecord = AsyncReadQueueRecord.create(connection, buffer, completionHandler, interceptor);
final ReadResult<Buffer, SocketAddress> currentResult = queueRecord.getCurrentResult();
final boolean isCurrent = connectionQueue.reserveSpace(1) == 1;
try {
if (isCurrent) {
doRead(connection, queueRecord);
final int interceptInstructions = intercept(Reader.READ_EVENT, queueRecord, currentResult);
if ((interceptInstructions & Interceptor.COMPLETED) != 0 || interceptor == null && queueRecord.isFinished()) {
final boolean isQueueEmpty = connectionQueue.releaseSpaceAndNotify(1) == 0;
queueRecord.notifyComplete();
if (!isQueueEmpty) {
onReadyToRead(connection);
}
intercept(COMPLETE_EVENT, queueRecord, null);
queueRecord.recycle();
} else {
if ((interceptInstructions & Interceptor.RESET) != 0) {
currentResult.setMessage(null);
currentResult.setReadSize(0);
queueRecord.setMessage(null);
}
connectionQueue.setCurrentElement(queueRecord);
queueRecord.notifyIncomplete();
onReadyToRead(connection);
intercept(INCOMPLETE_EVENT, queueRecord, null);
}
} else {
connectionQueue.offer(queueRecord);
if (!connection.isOpen() && connectionQueue.remove(queueRecord)) {
onReadFailure(connection, queueRecord, new EOFException("Connection is closed"));
}
}
} catch (IOException e) {
onReadFailure(connection, queueRecord, e);
}
}
@Override
public final boolean isReady(final Connection connection) {
final TaskQueue connectionQueue = ((NIOConnection) connection).getAsyncReadQueue();
return connectionQueue != null && !connectionQueue.isEmpty();
}
@Override
public AsyncResult processAsync(final Context context) {
final NIOConnection nioConnection = (NIOConnection) context.getConnection();
if (!nioConnection.isOpen()) {
return AsyncResult.COMPLETE;
}
final TaskQueue<AsyncReadQueueRecord> connectionQueue = nioConnection.getAsyncReadQueue();
boolean done = false;
AsyncReadQueueRecord queueRecord = null;
try {
while ((queueRecord = connectionQueue.poll()) != null) {
final ReadResult currentResult = queueRecord.getCurrentResult();
doRead(nioConnection, queueRecord);
final Interceptor<ReadResult> interceptor = queueRecord.getInterceptor();
final int interceptInstructions = intercept(Reader.READ_EVENT, queueRecord, currentResult);
if ((interceptInstructions & Interceptor.COMPLETED) != 0 || interceptor == null && queueRecord.isFinished()) {
if (!context.isManualIOEventControl() && connectionQueue.spaceInBytes() - 1 <= 0) {
context.setManualIOEventControl();
}
done = connectionQueue.releaseSpaceAndNotify(1) == 0;
queueRecord.notifyComplete();
intercept(Reader.COMPLETE_EVENT, queueRecord, null);
queueRecord.recycle();
if (done) {
break;
}
} else {
if ((interceptInstructions & Interceptor.RESET) != 0) {
currentResult.setMessage(null);
currentResult.setReadSize(0);
queueRecord.setMessage(null);
}
connectionQueue.setCurrentElement(queueRecord);
queueRecord.notifyIncomplete();
intercept(Reader.INCOMPLETE_EVENT, queueRecord, null);
return AsyncResult.INCOMPLETE;
}
}
if (!done) {
return AsyncResult.EXPECTING_MORE;
}
} catch (IOException e) {
onReadFailure(nioConnection, queueRecord, e);
} catch (Exception e) {
String message = "Unexpected exception occurred in AsyncQueueReader";
LOGGER.log(Level.SEVERE, message, e);
IOException ioe = new IOException(e.getClass() + ": " + message);
onReadFailure(nioConnection, queueRecord, ioe);
}
return AsyncResult.COMPLETE;
}
@Override
public void onClose(Connection connection) {
final NIOConnection nioConnection = (NIOConnection) connection;
final TaskQueue<AsyncReadQueueRecord> readQueue = nioConnection.getAsyncReadQueue();
if (!readQueue.isEmpty()) {
EOFException error = cachedEOFException;
if (error == null) {
error = new EOFException("Connection closed");
cachedEOFException = error;
}
AsyncReadQueueRecord record;
while ((record = readQueue.poll()) != null) {
record.notifyFailure(error);
}
}
}
@Override
public final void close() {
}
final protected int doRead(final Connection connection, final AsyncReadQueueRecord queueRecord) throws IOException {
final Object message = queueRecord.getMessage();
final Buffer buffer = (Buffer) message;
final ReadResult currentResult = queueRecord.getCurrentResult();
final int readBytes = read0(connection, buffer, currentResult);
if (readBytes == -1) {
throw new EOFException();
}
return readBytes;
}
protected final void onReadFailure(final Connection connection, final AsyncReadQueueRecord failedRecord, final IOException e) {
if (failedRecord != null) {
failedRecord.notifyFailure(e);
}
connection.closeSilently();
}
private static void failure(final Throwable failure, final CompletionHandler<ReadResult<Buffer, SocketAddress>> completionHandler) {
if (completionHandler != null) {
completionHandler.failed(failure);
}
}
private int intercept(final int event, final AsyncReadQueueRecord asyncQueueRecord, final ReadResult currentResult) {
final Interceptor<ReadResult> interceptor = asyncQueueRecord.getInterceptor();
if (interceptor != null) {
return interceptor.intercept(event, asyncQueueRecord, currentResult);
}
return Interceptor.DEFAULT;
}
protected abstract int read0(Connection connection, Buffer buffer, ReadResult<Buffer, SocketAddress> currentResult) throws IOException;
protected abstract void onReadyToRead(Connection connection) throws IOException;
}