package com.barchart.udt.net;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.IllegalBlockingModeException;
import com.barchart.udt.ErrorUDT;
import com.barchart.udt.SocketUDT;
/**
 * {@link InputStream} adapter that reads from a {@link SocketUDT}.
 *
 * <p>The stream can only be created on a socket that reports itself as
 * blocking. Mark/reset is not supported and {@link #available()} always
 * reports {@code 0}.
 */
public class NetInputStreamUDT extends InputStream {

    /** Socket that all reads and {@link #close()} are delegated to. */
    protected final SocketUDT socketUDT;

    /**
     * Creates a stream on top of the given socket.
     *
     * @param socketUDT socket to read from; must be in blocking mode
     * @throws IllegalBlockingModeException if {@code socketUDT.isBlocking()}
     *         returns {@code false}
     */
    public NetInputStreamUDT(final SocketUDT socketUDT) {
        if (!socketUDT.isBlocking()) {
            throw new IllegalBlockingModeException();
        }
        this.socketUDT = socketUDT;
    }

    /**
     * Reads a single byte.
     *
     * @return the byte as an unsigned value in the range {@code 0..255}, or
     *         {@code -1} if the bulk read reports a negative count
     * @throws IOException if the underlying bulk read fails
     */
    @Override
    public int read() throws IOException {
        final byte[] data = new byte[1];
        final int count = read(data);
        if (count < 0) {
            // Only reachable when an overriding bulk read signals end of stream.
            return -1;
        }
        assert count == 1;
        // Mask to 0..255: a raw (signed) byte would turn 0xFF into -1, which
        // callers interpret as end of stream, and 0x80..0xFE into negatives.
        return data[0] & 0xFF;
    }

    /**
     * Reads into the whole array; equivalent to
     * {@code read(bytes, 0, bytes.length)}.
     *
     * @param bytes destination buffer
     * @return number of bytes stored in {@code bytes}
     * @throws IOException if the read fails
     */
    @Override
    public int read(final byte[] bytes) throws IOException {
        return read(bytes, 0, bytes.length);
    }

    /**
     * Reads up to {@code len} bytes into {@code bytes} starting at
     * {@code off}.
     *
     * @param bytes destination buffer
     * @param off   index of the first byte to write
     * @param len   maximum number of bytes to read
     * @return number of bytes read; {@code 0} only when {@code len} is
     *         {@code 0}
     * @throws NullPointerException      if {@code bytes} is {@code null}
     * @throws IndexOutOfBoundsException if {@code off} or {@code len} is
     *         negative, or {@code len} exceeds {@code bytes.length - off}
     * @throws IOException               if the socket receive returns
     *         {@code 0}, which this stream reports as a receive time out
     * @throws IllegalStateException     if the socket receive returns a
     *         negative count
     */
    @SuppressWarnings("serial")
    @Override
    public int read(final byte[] bytes, final int off, final int len)
            throws IOException {
        if (bytes == null) {
            throw new NullPointerException("bytes");
        }
        if (off < 0 || len < 0 || len > bytes.length - off) {
            throw new IndexOutOfBoundsException(
                    "off=" + off + " len=" + len + " length=" + bytes.length);
        }
        if (len == 0) {
            // InputStream contract: an empty request returns 0 without
            // touching the socket (otherwise it would be reported as a
            // time out below).
            return 0;
        }

        // NOTE(review): the third argument is passed as off + len, i.e. it is
        // presumably an end limit rather than a length - confirm against
        // SocketUDT.receive.
        final int count = socketUDT.receive(bytes, off, off + len);
        if (count > 0) {
            assert count <= len;
            return count;
        }
        if (count == 0) {
            // NOTE(review): anonymous subclass kept from the original code;
            // presumably the constructor is not directly accessible from
            // here - confirm before simplifying.
            throw new ExceptionReceiveUDT(socketUDT.id(),
                    ErrorUDT.USER_DEFINED_MESSAGE, "UDT receive time out") {
            };
        }
        throw new IllegalStateException("should not happen");
    }

    /**
     * Closes the underlying socket.
     *
     * @throws IOException if closing the socket fails
     */
    @Override
    public void close() throws IOException {
        socketUDT.close();
    }

    /**
     * Always returns {@code 0}; this stream makes no estimate of buffered
     * data.
     */
    @Override
    public int available() throws IOException {
        return 0;
    }

    /**
     * Skips bytes by reading them into a scratch buffer of at most 1024
     * bytes and discarding them.
     *
     * @param numbytes number of bytes to skip; non-positive values skip
     *                 nothing
     * @return number of bytes actually skipped
     * @throws IOException if a read fails while skipping
     */
    @Override
    public long skip(final long numbytes) throws IOException {
        if (numbytes <= 0) {
            return 0;
        }
        long n = numbytes;
        final int buflen = (int) Math.min(1024, n);
        final byte[] data = new byte[buflen];
        while (n > 0) {
            final int r = read(data, 0, (int) Math.min(buflen, n));
            if (r < 0) {
                break;
            }
            n -= r;
        }
        return numbytes - n;
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void mark(final int readlimit) {
        throw new UnsupportedOperationException("mark not supported");
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void reset() throws IOException {
        throw new UnsupportedOperationException("reset not supported");
    }

    /**
     * @return {@code false}; mark/reset is not supported
     */
    @Override
    public boolean markSupported() {
        return false;
    }
}