package org.glassfish.grizzly.streams;
import java.io.EOFException;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.Grizzly;
import org.glassfish.grizzly.GrizzlyFuture;
import org.glassfish.grizzly.Transformer;
import org.glassfish.grizzly.impl.FutureImpl;
import org.glassfish.grizzly.impl.ReadyFutureImpl;
import org.glassfish.grizzly.impl.SafeFutureImpl;
import org.glassfish.grizzly.utils.CompletionHandlerAdapter;
import org.glassfish.grizzly.utils.ResultAware;
import org.glassfish.grizzly.utils.conditions.Condition;
import java.util.concurrent.atomic.AtomicBoolean;
public abstract class AbstractStreamReader implements StreamReader {
private static final boolean DEBUG = false;
private static final Logger LOGGER = Grizzly.logger(AbstractStreamReader.class);
protected final Connection connection;
protected final Input input;
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
private static void msg(final String msg) {
LOGGER.log(Level.INFO, "READERSTREAM:DEBUG:{0}", msg);
}
private static void displayBuffer(final String str,
final Buffer wrapper) {
msg(str);
msg("\tposition() = " + wrapper.position());
msg("\tlimit() = " + wrapper.limit());
msg("\tcapacity() = " + wrapper.capacity());
}
protected AbstractStreamReader(Connection connection, Input streamInput) {
this.input = streamInput;
this.connection = connection;
}
@Override
public boolean readBoolean() throws IOException {
return readByte() == 1;
}
@Override
public byte readByte() throws IOException {
return input.read();
}
@Override
public char readChar() throws IOException {
if (input.isBuffered()) {
final Buffer buffer = input.getBuffer();
if (buffer != null && buffer.remaining() >= 2) {
final char result = buffer.getChar();
buffer.shrink();
return result;
}
}
return (char) ((readByte() & 0xff) << 8 | readByte() & 0xff);
}
@Override
public short readShort() throws IOException {
if (input.isBuffered()) {
final Buffer buffer = input.getBuffer();
if (buffer != null && buffer.remaining() >= 2) {
final short result = buffer.getShort();
buffer.shrink();
return result;
}
}
return (short) ((readByte() & 0xff) << 8 | readByte() & 0xff);
}
@Override
public int readInt() throws IOException {
if (input.isBuffered()) {
final Buffer buffer = input.getBuffer();
if (buffer != null && buffer.remaining() >= 4) {
final int result = buffer.getInt();
buffer.shrink();
return result;
}
}
return (readShort() & 0xffff) << 16 | readShort() & 0xffff;
}
@Override
public long readLong() throws IOException {
if (input.isBuffered()) {
final Buffer buffer = input.getBuffer();
if (buffer != null && buffer.remaining() >= 8) {
final long result = buffer.getLong();
buffer.shrink();
return result;
}
}
return (readInt() & 0xffffffffL) << 32 | readInt() & 0xffffffffL;
}
@Override
final public float readFloat() throws IOException {
if (input.isBuffered()) {
final Buffer buffer = input.getBuffer();
if (buffer != null && buffer.remaining() >= 4) {
final float result = buffer.getFloat();
buffer.shrink();
return result;
}
}
return Float.intBitsToFloat(readInt());
}
@Override
final public double readDouble() throws IOException {
if (input.isBuffered()) {
final Buffer buffer = input.getBuffer();
if (buffer != null && buffer.remaining() >= 8) {
final double result = buffer.getDouble();
buffer.shrink();
return result;
}
}
return Double.longBitsToDouble(readLong());
}
private void arraySizeCheck(final int sizeInBytes) {
if (sizeInBytes > available()) {
throw new BufferUnderflowException();
}
}
@Override
public void readBooleanArray(boolean[] data) throws IOException {
arraySizeCheck(data.length);
for (int ctr = 0; ctr < data.length; ctr++) {
data[ctr] = readBoolean();
}
}
@Override
public void readByteArray(final byte[] data) throws IOException {
readByteArray(data, 0, data.length);
}
@Override
public void readByteArray(byte[] data, int offset, int length) throws IOException {
arraySizeCheck(length);
if (input.isBuffered()) {
final Buffer buffer = input.getBuffer();
buffer.get(data, offset, length);
buffer.shrink();
} else {
for(int i = offset; i < length; i++) {
data[i] = input.read();
}
}
}
@Override
public void readBytes(final Buffer buffer) throws IOException {
if (!buffer.hasRemaining()) {
return;
}
arraySizeCheck(buffer.remaining());
if (input.isBuffered()) {
final Buffer inputBuffer = input.getBuffer();
final int diff = buffer.remaining() - inputBuffer.remaining();
if (diff >= 0) {
buffer.put(inputBuffer);
} else {
final int save = inputBuffer.limit();
inputBuffer.limit(save + diff);
buffer.put(inputBuffer);
inputBuffer.limit(save);
}
inputBuffer.shrink();
} else {
while(buffer.hasRemaining()) {
buffer.put(input.read());
}
}
}
@Override
public void readCharArray(final char[] data) throws IOException {
arraySizeCheck(2 * data.length);
for (int i = 0; i < data.length; i++) {
data[i] = readChar();
}
}
@Override
public void readShortArray(final short[] data) throws IOException {
arraySizeCheck(2 * data.length);
for (int i = 0; i < data.length; i++) {
data[i] = readShort();
}
}
@Override
public void readIntArray(final int[] data) throws IOException {
arraySizeCheck(4 * data.length);
for (int i = 0; i < data.length; i++) {
data[i] = readInt();
}
}
@Override
public void readLongArray(final long[] data) throws IOException {
arraySizeCheck(8 * data.length);
for (int i = 0; i < data.length; i++) {
data[i] = readLong();
}
}
@Override
public void readFloatArray(final float[] data) throws IOException {
arraySizeCheck(4 * data.length);
for (int i = 0; i < data.length; i++) {
data[i] = readFloat();
}
}
@Override
public void readDoubleArray(final double[] data) throws IOException {
arraySizeCheck(8 * data.length);
for (int i = 0; i < data.length; i++) {
data[i] = readDouble();
}
}
@Override
public void skip(int length) {
input.skip(length);
}
@Override
public <E> GrizzlyFuture<E> decode(Transformer<Stream, E> decoder) {
return decode(decoder, null);
}
@Override
public <E> GrizzlyFuture<E> decode(Transformer<Stream, E> decoder, CompletionHandler<E> completionHandler) {
final FutureImpl<E> future = SafeFutureImpl.create();
final DecodeCompletionHandler<E, Integer> completionHandlerWrapper =
new DecodeCompletionHandler<E, Integer>(future, completionHandler);
notifyCondition(
new StreamDecodeCondition<E>(this, decoder, completionHandlerWrapper),
completionHandlerWrapper);
return future;
}
@Override
public GrizzlyFuture<Integer> notifyAvailable(int size) {
return notifyAvailable(size, null);
}
@Override
public GrizzlyFuture<Integer> notifyAvailable(final int size,
CompletionHandler<Integer> completionHandler) {
return notifyCondition(new Condition() {
@Override
public boolean check() {
return available() >= size;
}
}, completionHandler);
}
@Override
public GrizzlyFuture<Integer> notifyCondition(Condition condition) {
return notifyCondition(condition, null);
}
@Override
public synchronized GrizzlyFuture<Integer> notifyCondition(
final Condition condition,
final CompletionHandler<Integer> completionHandler) {
if (isClosed()) {
EOFException exception = new EOFException();
if (completionHandler != null) {
completionHandler.failed(exception);
}
return ReadyFutureImpl.create(exception);
}
return input.notifyCondition(condition, completionHandler);
}
@Override
public void close() {
if (isClosed.compareAndSet(false, true)) {
if (input != null) {
try {
input.close();
} catch (IOException ignored) {
}
}
}
}
@Override
public boolean isClosed() {
return isClosed.get();
}
@Override
public final boolean hasAvailable() {
return available() > 0;
}
@Override
public int available() {
return input.size();
}
@Override
public boolean isSupportBufferWindow() {
return input.isBuffered();
}
@Override
public Buffer getBufferWindow() {
return input.getBuffer();
}
@Override
public Buffer takeBufferWindow() {
return input.takeBuffer();
}
@Override
public Connection getConnection() {
return connection;
}
private static class DecodeCompletionHandler<A, B> extends CompletionHandlerAdapter<A, B>
implements ResultAware<A> {
private volatile A result;
public DecodeCompletionHandler(FutureImpl<A> future,
CompletionHandler<A> completionHandler) {
super(future, completionHandler);
}
@Override
public void setResult(A result) {
this.result = result;
}
@Override
protected A adapt(B result) {
return this.result;
}
}
}