package org.xnio.channels;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.TimeUnit;
import org.xnio.Buffers;
import org.xnio.ChannelListener;
import org.xnio.ChannelListeners;
import org.xnio.Option;
import org.xnio.Pooled;
import org.xnio.XnioExecutor;
import org.xnio.XnioIoThread;
import org.xnio.XnioWorker;
/**
 * A stream source channel wrapper which allows data to be "pushed back" so that it is returned again by
 * subsequent read and transfer operations, ahead of any data from the wrapped channel.
 * <p>
 * Pushed-back buffers are kept as a chain of {@link BufferHolder} links which ends at the wrapped channel.
 * The current head of the chain is held in {@code channel}; the most recently pushed buffer is consumed first.
 * Once reads are shut down or the channel is closed, {@code channel} becomes {@code null}.
 * <p>
 * No synchronization is performed in this class and the {@code channel} field is not volatile.
 */
public final class PushBackStreamChannel implements StreamSourceChannel, WrappedChannel<StreamSourceChannel> {

    /** The wrapped channel; always the tail of the push-back chain. */
    private final StreamSourceChannel firstChannel;
    /** The head of the push-back chain, or {@code null} once reads were shut down or the channel was closed. */
    private StreamSourceChannel channel;

    private ChannelListener<? super PushBackStreamChannel> readListener;
    private ChannelListener<? super PushBackStreamChannel> closeListener;

    /**
     * Construct a new instance.  The read and close listeners of the given channel are replaced with listeners
     * which forward to the listeners registered on this wrapper.
     *
     * @param channel the channel to wrap
     */
    public PushBackStreamChannel(final StreamSourceChannel channel) {
        this.channel = firstChannel = channel;
        firstChannel.getReadSetter().set(new ChannelListener<StreamSourceChannel>() {
            public void handleEvent(final StreamSourceChannel channel) {
                ChannelListeners.invokeChannelListener(PushBackStreamChannel.this, readListener);
            }
        });
        firstChannel.getCloseSetter().set(new ChannelListener<StreamSourceChannel>() {
            public void handleEvent(final StreamSourceChannel channel) {
                ChannelListeners.invokeChannelListener(PushBackStreamChannel.this, closeListener);
            }
        });
    }

    /**
     * Set the listener which is invoked (with this wrapper as its argument) when the wrapped channel's read
     * listener fires.
     *
     * @param readListener the read listener
     */
    public void setReadListener(final ChannelListener<? super PushBackStreamChannel> readListener) {
        this.readListener = readListener;
    }

    /**
     * Set the listener which is invoked (with this wrapper as its argument) when the wrapped channel's close
     * listener fires.
     *
     * @param closeListener the close listener
     */
    public void setCloseListener(final ChannelListener<? super PushBackStreamChannel> closeListener) {
        this.closeListener = closeListener;
    }

    /**
     * Get a setter which stores its argument via {@link #setReadListener(ChannelListener)}.
     *
     * @return a new setter instance
     */
    public ChannelListener.Setter<? extends PushBackStreamChannel> getReadSetter() {
        return new ChannelListener.Setter<PushBackStreamChannel>() {
            public void set(final ChannelListener<? super PushBackStreamChannel> listener) {
                setReadListener(listener);
            }
        };
    }

    /**
     * Get a setter which stores its argument via {@link #setCloseListener(ChannelListener)}.
     *
     * @return a new setter instance
     */
    public ChannelListener.Setter<? extends PushBackStreamChannel> getCloseSetter() {
        return new ChannelListener.Setter<PushBackStreamChannel>() {
            public void set(final ChannelListener<? super PushBackStreamChannel> listener) {
                setCloseListener(listener);
            }
        };
    }

    /**
     * Transfer bytes to a file, draining pushed-back data first.
     *
     * @param position the position in the target file to write to
     * @param count the maximum number of bytes to transfer
     * @param target the target file channel
     * @return the result of the delegate transfer, or 0 if reads were shut down or the channel was closed
     * @throws IOException if the delegate transfer fails
     */
    public long transferTo(final long position, final long count, final FileChannel target) throws IOException {
        final StreamSourceChannel channel = this.channel;
        if (channel == null) {
            return 0;
        }
        return channel.transferTo(position, count, target);
    }

    /**
     * Transfer bytes to a stream sink channel, draining pushed-back data first.
     *
     * @param count the maximum number of bytes to transfer
     * @param throughBuffer the buffer to copy through
     * @param target the target channel
     * @return the result of the delegate transfer, or -1 if reads were shut down or the channel was closed
     * @throws IOException if the delegate transfer fails
     */
    public long transferTo(final long count, final ByteBuffer throughBuffer, final StreamSinkChannel target) throws IOException {
        final StreamSourceChannel channel = this.channel;
        if (channel == null) {
            return -1L;
        }
        return channel.transferTo(count, throughBuffer, target);
    }

    /**
     * Read into a single buffer, draining pushed-back data first.
     *
     * @param dst the destination buffer
     * @return the result of the delegate read, or -1 if reads were shut down or the channel was closed
     * @throws IOException if the delegate read fails
     */
    public int read(final ByteBuffer dst) throws IOException {
        final StreamSourceChannel channel = this.channel;
        if (channel == null) {
            return -1;
        }
        return channel.read(dst);
    }

    /**
     * Read into an array of buffers, draining pushed-back data first.
     *
     * @param dsts the destination buffers
     * @return the result of the delegate read, or -1 if reads were shut down or the channel was closed
     * @throws IOException if the delegate read fails
     */
    public long read(final ByteBuffer[] dsts) throws IOException {
        final StreamSourceChannel channel = this.channel;
        if (channel == null) {
            return -1L;
        }
        return channel.read(dsts);
    }

    /**
     * Read into a range of an array of buffers, draining pushed-back data first.
     *
     * @param dsts the destination buffers
     * @param offset the index of the first buffer to use
     * @param length the number of buffers to use
     * @return the result of the delegate read, or -1 if reads were shut down or the channel was closed
     * @throws IOException if the delegate read fails
     */
    public long read(final ByteBuffer[] dsts, final int offset, final int length) throws IOException {
        final StreamSourceChannel channel = this.channel;
        if (channel == null) {
            return -1L;
        }
        return channel.read(dsts, offset, length);
    }

    /**
     * Push a buffer back onto the head of this channel so that its content is consumed before anything that is
     * currently queued.  The buffer is freed by this channel once it has been emptied, or when the channel is
     * closed or reads are shut down.  If reads were already shut down or the channel was closed, the buffer is
     * freed immediately.
     *
     * @param buffer the buffer to push back
     */
    public void unget(Pooled<ByteBuffer> buffer) {
        final StreamSourceChannel old = channel;
        if (old == null) {
            buffer.free();
            return;
        }
        channel = new BufferHolder(old, buffer);
    }

    /** Suspend reads on the wrapped channel. */
    public void suspendReads() {
        firstChannel.suspendReads();
    }

    /**
     * Resume reads on the current head of the chain.  When pushed-back data is queued this results in a
     * {@code wakeupReads()} on the wrapped channel (see {@link BufferHolder#resumeReads()}).  Does nothing if
     * reads were shut down or the channel was closed.
     */
    public void resumeReads() {
        final StreamSourceChannel channel = this.channel;
        if (channel != null) {
            channel.resumeReads();
        }
    }

    /**
     * Determine whether reads are resumed on the wrapped channel.
     *
     * @return the wrapped channel's answer
     */
    public boolean isReadResumed() {
        return firstChannel.isReadResumed();
    }

    /** Wake up reads on the wrapped channel. */
    public void wakeupReads() {
        firstChannel.wakeupReads();
    }

    /**
     * Shut down reads.  Any pushed-back buffers are freed and the wrapped channel's reads are shut down.
     * Subsequent calls have no effect.
     *
     * @throws IOException if shutting down the wrapped channel's reads fails
     */
    public void shutdownReads() throws IOException {
        final StreamSourceChannel old = channel;
        if (old != null) {
            // clear first so that a failure below cannot leave a freed holder reachable
            channel = null;
            old.shutdownReads();
        }
    }

    /**
     * Block until the head of the chain reports readability.  Returns immediately if pushed-back data is queued
     * or if reads were shut down or the channel was closed.
     *
     * @throws IOException if the delegate wait fails
     */
    public void awaitReadable() throws IOException {
        final StreamSourceChannel channel = this.channel;
        if (channel != null) {
            channel.awaitReadable();
        }
    }

    /**
     * Block until the head of the chain reports readability or the timeout passes.  Returns immediately if
     * pushed-back data is queued or if reads were shut down or the channel was closed.
     *
     * @param time the time to wait
     * @param timeUnit the unit of {@code time}
     * @throws IOException if the delegate wait fails
     */
    public void awaitReadable(final long time, final TimeUnit timeUnit) throws IOException {
        final StreamSourceChannel channel = this.channel;
        if (channel != null) {
            channel.awaitReadable(time, timeUnit);
        }
    }

    /**
     * Get the read thread of the wrapped channel.
     *
     * @return the wrapped channel's read thread
     */
    @Deprecated
    public XnioExecutor getReadThread() {
        return firstChannel.getReadThread();
    }

    /**
     * Get the I/O thread of the wrapped channel.
     *
     * @return the wrapped channel's I/O thread
     */
    public XnioIoThread getIoThread() {
        return firstChannel.getIoThread();
    }

    /**
     * Get the worker of the wrapped channel.
     *
     * @return the wrapped channel's worker
     */
    public XnioWorker getWorker() {
        return firstChannel.getWorker();
    }

    /**
     * Determine whether the wrapped channel is open.
     *
     * @return the wrapped channel's answer
     */
    public boolean isOpen() {
        return firstChannel.isOpen();
    }

    /**
     * Close this channel.  Any pushed-back buffers are freed and the wrapped channel is closed.  Subsequent
     * calls have no effect.
     *
     * @throws IOException if closing the wrapped channel fails
     */
    public void close() throws IOException {
        final StreamSourceChannel old = channel;
        if (old != null) {
            channel = null;
            old.close();
        }
    }

    /** Delegates to the wrapped channel. */
    public boolean supportsOption(final Option<?> option) {
        return firstChannel.supportsOption(option);
    }

    /** Delegates to the wrapped channel. */
    public <T> T getOption(final Option<T> option) throws IOException {
        return firstChannel.getOption(option);
    }

    /** Delegates to the wrapped channel. */
    public <T> T setOption(final Option<T> option, final T value) throws IllegalArgumentException, IOException {
        return firstChannel.setOption(option, value);
    }

    /**
     * Get the wrapped channel.
     *
     * @return the wrapped channel
     */
    public StreamSourceChannel getChannel() {
        return firstChannel;
    }

    /**
     * One link of the push-back chain: a pushed-back buffer plus the channel to continue with once that buffer
     * is exhausted.  Only the data-moving and lifecycle operations are implemented; everything the enclosing
     * channel answers directly from the wrapped channel throws {@link UnsupportedOperationException} here.
     * <p>
     * An {@link IllegalStateException} raised while using the buffer is treated as "this buffer is no longer
     * usable" (presumably because it was already freed — confirm against the {@code Pooled} implementation in
     * use): the link is dropped from the chain and the operation continues with {@code next}.
     */
    class BufferHolder implements StreamSourceChannel {
        private final StreamSourceChannel next;
        private final Pooled<ByteBuffer> buffer;

        BufferHolder(final StreamSourceChannel next, final Pooled<ByteBuffer> buffer) {
            this.next = next;
            this.buffer = buffer;
        }

        /**
         * Write the pushed-back data to the file; if the buffer is completely drained, unlink it and continue
         * the transfer from {@code next}.
         *
         * @return the number of bytes written from this buffer, plus the result of the {@code next} transfer
         *         when the buffer was drained
         */
        public long transferTo(long position, long count, FileChannel target) throws IOException {
            long cnt;
            try {
                final ByteBuffer src = buffer.getResource();
                final int pos = src.position();
                final int rem = src.remaining();
                if (rem > count) {
                    // only part of our buffer is wanted: cap the limit for the write, then restore it
                    try {
                        src.limit(pos + (int) count);
                        return target.write(src, position);
                    } finally {
                        src.limit(pos + rem);
                    }
                }
                // the whole buffer is wanted
                cnt = target.write(src, position);
                if (cnt != rem) {
                    // target did not take everything; keep this link at the head
                    return cnt;
                }
                channel = next;
                buffer.free();
                position += cnt;
                count -= cnt;
            } catch (IllegalStateException ignored) {
                // buffer unusable: drop this link and fall through to the next channel
                channel = next;
                cnt = 0L;
            }
            return cnt + next.transferTo(position, count, target);
        }

        /**
         * Write the pushed-back data to the sink; if the buffer is completely drained, unlink it and continue
         * the transfer from {@code next}.  Whenever this method returns without delegating to {@code next},
         * {@code throughBuffer} is left empty (limit 0) because nothing was copied into it.
         *
         * @return the number of bytes transferred; if nothing was transferred from this buffer, the result of
         *         the {@code next} transfer is returned as-is
         */
        public long transferTo(final long count, final ByteBuffer throughBuffer, final StreamSinkChannel target) throws IOException {
            long cnt;
            throughBuffer.clear();
            try {
                final ByteBuffer src = buffer.getResource();
                final int pos = src.position();
                final int rem = src.remaining();
                if (rem > count) {
                    // only part of our buffer is wanted: cap the limit for the write, then restore it
                    try {
                        src.limit(pos + (int) count);
                        throughBuffer.limit(0);
                        return target.write(src);
                    } finally {
                        src.limit(pos + rem);
                    }
                }
                // the whole buffer is wanted
                cnt = target.write(src);
                if (cnt != rem) {
                    // Partial write: throughBuffer was cleared above but never filled, so mark it empty;
                    // otherwise the caller would see capacity() bytes of stale content as pending data.
                    throughBuffer.limit(0);
                    return cnt;
                }
                channel = next;
                buffer.free();
            } catch (IllegalStateException ignored) {
                // buffer unusable: drop this link and fall through to the next channel
                channel = next;
                cnt = 0L;
            }
            final long res = next.transferTo(count - cnt, throughBuffer, target);
            // prefer the combined count; otherwise report our own bytes before passing on 0 or -1 from next
            return res > 0L ? cnt + res : cnt > 0L ? cnt : res;
        }

        /**
         * Copy pushed-back data into the given buffers.  If this buffer is drained it is unlinked and freed;
         * the read then continues with {@code next} unless data was copied and {@code next} is the wrapped
         * channel, in which case the copied count is returned without touching the wrapped channel.
         */
        public long read(final ByteBuffer[] dsts, final int offset, final int length) throws IOException {
            long cnt;
            try {
                final ByteBuffer src = buffer.getResource();
                cnt = Buffers.copy(dsts, offset, length, src);
                if (src.hasRemaining()) {
                    return cnt;
                }
                channel = next;
                buffer.free();
                if (cnt > 0L && next == firstChannel) {
                    return cnt;
                }
            } catch (IllegalStateException ignored) {
                // buffer unusable: drop this link and fall through to the next channel
                channel = next;
                cnt = 0;
            }
            final long res = next.read(dsts, offset, length);
            return res > 0 ? res + cnt : cnt > 0 ? cnt : res;
        }

        /** Equivalent to {@code read(dsts, 0, dsts.length)}. */
        public long read(final ByteBuffer[] dsts) throws IOException {
            return read(dsts, 0, dsts.length);
        }

        /**
         * Copy pushed-back data into the given buffer; returns 0 immediately if {@code dst} has no room.
         * Otherwise behaves like {@link #read(ByteBuffer[], int, int)}.
         */
        public int read(final ByteBuffer dst) throws IOException {
            if (! dst.hasRemaining()) {
                return 0;
            }
            int cnt;
            try {
                final ByteBuffer src = buffer.getResource();
                cnt = Buffers.copy(dst, src);
                if (src.hasRemaining()) {
                    return cnt;
                }
                channel = next;
                buffer.free();
                if (cnt > 0 && next == firstChannel) {
                    return cnt;
                }
            } catch (IllegalStateException ignored) {
                // buffer unusable: drop this link and fall through to the next channel
                channel = next;
                cnt = 0;
            }
            final int res = next.read(dst);
            return res > 0 ? res + cnt : cnt > 0 ? cnt : res;
        }

        /** Free the held buffer and close the rest of the chain, even if freeing fails. */
        public void close() throws IOException {
            try {
                buffer.free();
            } finally {
                next.close();
            }
        }

        /** Pushed-back data is available, so wake up reads on the wrapped channel. */
        public void resumeReads() {
            firstChannel.wakeupReads();
        }

        /** Free the held buffer and shut down reads on the rest of the chain, even if freeing fails. */
        public void shutdownReads() throws IOException {
            try {
                buffer.free();
            } finally {
                next.shutdownReads();
            }
        }

        /** Returns immediately without waiting. */
        public void awaitReadable() throws IOException {
        }

        /** Returns immediately without waiting. */
        public void awaitReadable(final long time, final TimeUnit timeUnit) throws IOException {
        }

        // The remaining operations are answered by the enclosing channel from the wrapped channel directly
        // and are therefore never expected to be called on a holder.

        public boolean isOpen() {
            throw new UnsupportedOperationException();
        }

        public ChannelListener.Setter<? extends StreamSourceChannel> getReadSetter() {
            throw new UnsupportedOperationException();
        }

        public ChannelListener.Setter<? extends StreamSourceChannel> getCloseSetter() {
            throw new UnsupportedOperationException();
        }

        public void suspendReads() {
            throw new UnsupportedOperationException();
        }

        public boolean isReadResumed() {
            throw new UnsupportedOperationException();
        }

        public void wakeupReads() {
            throw new UnsupportedOperationException();
        }

        @Deprecated
        public XnioExecutor getReadThread() {
            throw new UnsupportedOperationException();
        }

        public XnioIoThread getIoThread() {
            throw new UnsupportedOperationException();
        }

        public XnioWorker getWorker() {
            throw new UnsupportedOperationException();
        }

        public boolean supportsOption(final Option<?> option) {
            throw new UnsupportedOperationException();
        }

        public <T> T getOption(final Option<T> option) throws IOException {
            throw new UnsupportedOperationException();
        }

        public <T> T setOption(final Option<T> option, final T value) throws IllegalArgumentException, IOException {
            throw new UnsupportedOperationException();
        }
    }
}