package org.xnio.conduits;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.TimeUnit;
import org.xnio.Buffers;
import org.xnio.Pooled;
import org.xnio.channels.StreamSinkChannel;
/**
 * A stream source conduit that lets previously read data be "pushed back" so that it is
 * returned again by subsequent read and transfer operations before any data from the
 * underlying conduit.
 * <p>
 * Each pushed-back buffer is wrapped in a {@link BufferConduit} that is placed at the head of a
 * delegate chain ({@link #current}).  A buffer conduit unlinks itself and frees its pooled buffer
 * once it has been drained.  Terminating reads frees every buffer that is still pending.
 */
public final class PushBackStreamSourceConduit extends AbstractStreamSourceConduit<StreamSourceConduit> implements StreamSourceConduit {

    /** Head of the delegate chain: either a {@link BufferConduit} or the underlying {@code next} conduit. */
    private StreamSourceConduit current = next;

    /** Set once {@link #terminateReads()} has been called; later push-backs are freed immediately. */
    private boolean shutdown;

    /**
     * Construct a new instance.
     *
     * @param next the delegate conduit to read from once no pushed-back data remains
     */
    public PushBackStreamSourceConduit(final StreamSourceConduit next) {
        super(next);
    }

    /** Resume reads on the current head of the delegate chain. */
    public void resumeReads() {
        current.resumeReads();
    }

    /**
     * Read into {@code dst} from the current head of the delegate chain.
     *
     * @param dst the destination buffer
     * @return the value returned by the current delegate
     * @throws IOException if the delegate fails
     */
    public int read(final ByteBuffer dst) throws IOException {
        return current.read(dst);
    }

    /**
     * Scattering read from the current head of the delegate chain.
     *
     * @param dsts the destination buffers
     * @param offs the offset of the first buffer to use
     * @param len the number of buffers to use
     * @return the value returned by the current delegate
     * @throws IOException if the delegate fails
     */
    public long read(final ByteBuffer[] dsts, final int offs, final int len) throws IOException {
        return current.read(dsts, offs, len);
    }

    /**
     * Transfer bytes to a file channel via the current head of the delegate chain.
     *
     * @param position the file position to write at
     * @param count the maximum number of bytes to transfer
     * @param target the file channel to write to
     * @return the value returned by the current delegate
     * @throws IOException if the delegate fails
     */
    public long transferTo(final long position, final long count, final FileChannel target) throws IOException {
        return current.transferTo(position, count, target);
    }

    /**
     * Transfer bytes to a stream sink channel via the current head of the delegate chain.
     *
     * @param count the maximum number of bytes to transfer
     * @param throughBuffer the buffer used to carry data that could not be written to {@code target}
     * @param target the channel to write to
     * @return the value returned by the current delegate
     * @throws IOException if the delegate fails
     */
    public long transferTo(final long count, final ByteBuffer throughBuffer, final StreamSinkChannel target) throws IOException {
        return current.transferTo(count, throughBuffer, target);
    }

    /**
     * Wait for readability on the current head of the delegate chain.
     *
     * @throws IOException if the delegate fails
     */
    public void awaitReadable() throws IOException {
        current.awaitReadable();
    }

    /**
     * Wait for readability on the current head of the delegate chain, with a timeout.
     *
     * @param time the amount of time to wait
     * @param timeUnit the unit of {@code time}
     * @throws IOException if the delegate fails
     */
    public void awaitReadable(final long time, final TimeUnit timeUnit) throws IOException {
        current.awaitReadable(time, timeUnit);
    }

    /**
     * Terminate reads.  Every pushed-back buffer still pending in the delegate chain is freed
     * (each {@link BufferConduit} frees its own buffer before passing the call on), the
     * underlying conduit is terminated, and the chain is reset to the underlying conduit.
     *
     * @throws IOException if terminating the delegate chain fails
     */
    public void terminateReads() throws IOException {
        shutdown = true;
        try {
            current.terminateReads();
        } finally {
            // All pending buffers have been released; never route reads through them again.
            current = next;
        }
    }

    /**
     * Set the read-ready handler on the current head of the delegate chain.
     *
     * @param handler the handler to set
     */
    public void setReadReadyHandler(final ReadReadyHandler handler) {
        current.setReadReadyHandler(handler);
    }

    /**
     * Push a buffer back to the head of the stream.  A {@code null} argument is ignored.  If
     * reads have been terminated, or the buffer has nothing remaining, the buffer is freed
     * immediately; otherwise it becomes the new head of the delegate chain and is freed once
     * it is drained or reads are terminated.
     *
     * @param pooledBuffer the buffer to push back, or {@code null}
     */
    public void pushBack(Pooled<ByteBuffer> pooledBuffer) {
        if (pooledBuffer == null) {
            return;
        }
        if (shutdown || ! pooledBuffer.getResource().hasRemaining()) {
            pooledBuffer.free();
        } else {
            current = new BufferConduit(current, pooledBuffer);
        }
    }

    /**
     * A link in the delegate chain that serves data from one pushed-back buffer and then hands
     * over to {@code next}.  It is an inner (non-static) class because it rewrites the enclosing
     * instance's {@link #current} field when it unlinks itself.
     */
    class BufferConduit extends AbstractStreamSourceConduit<StreamSourceConduit> implements StreamSourceConduit {

        /** The pushed-back data served by this link. */
        private final Pooled<ByteBuffer> pooledBuffer;

        /**
         * @param next the conduit to hand over to once {@code pooledBuffer} is drained
         * @param pooledBuffer the pushed-back buffer to serve
         */
        BufferConduit(final StreamSourceConduit next, final Pooled<ByteBuffer> pooledBuffer) {
            super(next);
            this.pooledBuffer = pooledBuffer;
        }

        /**
         * Calls {@code wakeupReads()} on the next conduit instead of {@code resumeReads()};
         * presumably so the read handler runs even though the pending data lives in this
         * buffer rather than in the underlying channel.
         */
        public void resumeReads() {
            next.wakeupReads();
        }

        /**
         * Free the pushed-back buffer held by this link, then terminate reads on the next
         * conduit.  Without this, buffers still pending at termination would never be
         * returned to their pool.
         *
         * @throws IOException if terminating the next conduit fails
         */
        public void terminateReads() throws IOException {
            try {
                pooledBuffer.free();
            } finally {
                // Always propagate, even if freeing the buffer failed.
                next.terminateReads();
            }
        }

        /** Returns immediately without waiting; this link only exists while it holds pushed-back data. */
        public void awaitReadable(final long time, final TimeUnit timeUnit) throws IOException {
            // readable
        }

        /** Returns immediately without waiting; this link only exists while it holds pushed-back data. */
        public void awaitReadable() throws IOException {
            // readable
        }

        /**
         * Copy pushed-back data into {@code dst}; once the buffer is drained, unlink this
         * conduit and free the buffer.
         *
         * @param dst the destination buffer
         * @return the number of bytes read, or the next conduit's result if no pushed-back
         *         data was copied
         * @throws IOException if reading from the next conduit fails
         */
        public int read(final ByteBuffer dst) throws IOException {
            int cnt;
            if (! dst.hasRemaining()) {
                return 0;
            }
            final StreamSourceConduit next = this.next;
            try {
                final ByteBuffer src = pooledBuffer.getResource();
                cnt = Buffers.copy(dst, src);
                if (src.hasRemaining()) {
                    // Destination filled before our buffer emptied; stay at the head of the chain.
                    return cnt;
                }
                // Drained: unlink ourselves and release the buffer.
                current = next;
                pooledBuffer.free();
                if (cnt > 0 && next == PushBackStreamSourceConduit.this.next) {
                    // Don't hit the underlying conduit until the caller asks again.
                    return cnt;
                }
            } catch (IllegalStateException ignored) {
                // Presumably the pooled buffer was already freed; skip straight to the next conduit.
                current = next;
                cnt = 0;
            }
            final int res = next.read(dst);
            // Combine counts; only surface a non-positive result from next if we copied nothing ourselves.
            return res > 0 ? res + cnt : cnt > 0 ? cnt : res;
        }

        /**
         * Scattering variant of {@link #read(ByteBuffer)}.
         *
         * @param dsts the destination buffers
         * @param offs the offset of the first buffer to use
         * @param len the number of buffers to use
         * @return the number of bytes read, or the next conduit's result if no pushed-back
         *         data was copied
         * @throws IOException if reading from the next conduit fails
         */
        public long read(final ByteBuffer[] dsts, final int offs, final int len) throws IOException {
            long cnt;
            final StreamSourceConduit next = this.next;
            try {
                final ByteBuffer src = pooledBuffer.getResource();
                cnt = Buffers.copy(dsts, offs, len, src);
                if (src.hasRemaining()) {
                    return cnt;
                }
                // Drained: unlink ourselves and release the buffer.
                current = next;
                pooledBuffer.free();
                if (cnt > 0L && next == PushBackStreamSourceConduit.this.next) {
                    // Don't hit the underlying conduit until the caller asks again.
                    return cnt;
                }
            } catch (IllegalStateException ignored) {
                // Presumably the pooled buffer was already freed; skip straight to the next conduit.
                current = next;
                cnt = 0;
            }
            final long res = next.read(dsts, offs, len);
            return res > 0 ? res + cnt : cnt > 0 ? cnt : res;
        }

        /**
         * Write pushed-back data to {@code target} at {@code position}; if the buffer is fully
         * written, unlink and free it and continue the transfer from the next conduit.
         *
         * @param position the file position to write at
         * @param count the maximum number of bytes to transfer
         * @param target the file channel to write to
         * @return the number of bytes transferred
         * @throws IOException if writing to the file or transferring from the next conduit fails
         */
        public long transferTo(long position, long count, final FileChannel target) throws IOException {
            long cnt;
            final ByteBuffer src;
            try {
                src = pooledBuffer.getResource();
                final int pos = src.position();
                final int rem = src.remaining();
                if (rem > count) {
                    // Partial drain: temporarily cap the limit so at most 'count' bytes are written.
                    try {
                        src.limit(pos + (int) count);
                        return target.write(src, position);
                    } finally {
                        src.limit(pos + rem);
                    }
                } else {
                    // Attempt a full drain of our buffer.
                    cnt = target.write(src, position);
                    if (cnt == rem) {
                        // Emptied: unlink ourselves and release the buffer.
                        current = next;
                        pooledBuffer.free();
                    } else {
                        return cnt;
                    }
                    position += cnt;
                    count -= cnt;
                }
            } catch (IllegalStateException ignored) {
                // Presumably the pooled buffer was already freed; skip straight to the next conduit.
                current = next;
                cnt = 0L;
            }
            return cnt + next.transferTo(position, count, target);
        }

        /**
         * Write pushed-back data to {@code target}.  When the write makes no progress, the
         * pending data is copied into {@code throughBuffer} (left flipped for reading);
         * otherwise {@code throughBuffer} is left empty.  If the buffer is fully written, it is
         * unlinked and freed and the transfer continues from the next conduit.
         *
         * @param count the maximum number of bytes to transfer
         * @param throughBuffer the buffer that receives data the target did not accept
         * @param target the channel to write to
         * @return the number of bytes transferred, or the next conduit's result if nothing
         *         was transferred from the pushed-back buffer
         * @throws IOException if writing to the target or transferring from the next conduit fails
         */
        public long transferTo(final long count, final ByteBuffer throughBuffer, final StreamSinkChannel target) throws IOException {
            long cnt;
            final ByteBuffer src;
            try {
                src = pooledBuffer.getResource();
                final int pos = src.position();
                final int rem = src.remaining();
                if (rem > count) {
                    // Partial drain: temporarily cap the limit so at most 'count' bytes are written.
                    try {
                        src.limit(pos + (int) count);
                        int res = target.write(src);
                        if (res == 0) {
                            // Target accepted nothing: hand the data over through throughBuffer instead.
                            throughBuffer.clear();
                            Buffers.copy(throughBuffer, src);
                            throughBuffer.flip();
                        } else {
                            // Make sure throughBuffer reads as empty.
                            throughBuffer.clear();
                            throughBuffer.flip();
                        }
                        return res;
                    } finally {
                        src.limit(pos + rem);
                    }
                } else {
                    // Attempt a full drain of our buffer.
                    cnt = target.write(src);
                    if (cnt == rem) {
                        // Emptied: unlink ourselves and release the buffer.
                        current = next;
                        pooledBuffer.free();
                    } else {
                        if (cnt == 0) {
                            // Target accepted nothing: hand the data over through throughBuffer instead.
                            throughBuffer.clear();
                            Buffers.copy(throughBuffer, src);
                            throughBuffer.flip();
                        } else {
                            // Make sure throughBuffer reads as empty.
                            throughBuffer.clear();
                            throughBuffer.flip();
                        }
                        return cnt;
                    }
                }
            } catch (IllegalStateException ignored) {
                // Presumably the pooled buffer was already freed; skip straight to the next conduit.
                current = next;
                cnt = 0L;
            }
            final long res = next.transferTo(count - cnt, throughBuffer, target);
            // Combine counts; only surface a non-positive result from next if we transferred nothing ourselves.
            return res > 0L ? cnt + res : cnt > 0L ? cnt : res;
        }
    }
}