/*
* JBoss, Home of Professional Open Source.
*
* Copyright 2010 Red Hat, Inc. and/or its affiliates, and individual
* contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.xnio.streams;
import static org.xnio._private.Messages.msg;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
An in-VM pipe between an input stream and an output stream, which does not suffer from the bugs in PipedInputStream
. Author: David M. Lloyd
/**
* An in-VM pipe between an input stream and an output stream, which does not suffer from the
* bugs in {@link PipedInputStream}.
*
* @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
*/
public final class Pipe {
private final Object lock = new Object();
the point at which a read shall occur /** the point at which a read shall occur **/
private int tail;
the size of the buffer content /** the size of the buffer content **/
private int size;
private final byte[] buffer;
private boolean writeClosed;
private boolean readClosed;
Construct a new instance.
Params: - bufferSize – the buffer size to use
/**
* Construct a new instance.
*
* @param bufferSize the buffer size to use
*/
public Pipe(int bufferSize) {
buffer = new byte[bufferSize];
}
Wait for the read side to close. Used when the writer needs to know when
the reader finishes consuming a message.
/**
* Wait for the read side to close. Used when the writer needs to know when
* the reader finishes consuming a message.
*/
public void await() {
boolean intr = false;
final Object lock = this.lock;
try {
synchronized (lock) {
while (! readClosed) {
try {
lock.wait();
} catch (InterruptedException e) {
intr = true;
}
}
}
} finally {
if (intr) {
Thread.currentThread().interrupt();
}
}
}
private final InputStream in = new InputStream() {
public int read() throws IOException {
final Object lock = Pipe.this.lock;
synchronized (lock) {
if (writeClosed && size == 0) {
return -1;
}
while (size == 0) {
try {
lock.wait();
if (writeClosed && size == 0) {
return -1;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw msg.interruptedIO();
}
}
lock.notifyAll();
int tail= Pipe.this.tail;
try {
return buffer[tail++] & 0xff;
} finally {
Pipe.this.tail = tail == buffer.length ? 0 : tail;
size--;
}
}
}
public int read(final byte[] b, final int off, final int len) throws IOException {
final Object lock = Pipe.this.lock;
synchronized (lock) {
if (writeClosed && size == 0) {
return -1;
}
if (len == 0) {
return 0;
}
int size;
while ((size = Pipe.this.size) == 0) {
try {
lock.wait();
if (writeClosed && (size = Pipe.this.size) == 0) {
return -1;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw msg.interruptedIO();
}
}
final byte[] buffer = Pipe.this.buffer;
final int bufLen = buffer.length;
int cnt;
int tail = Pipe.this.tail;
if (size + tail > bufLen) {
// wrapped
final int lastLen = bufLen - tail;
if (lastLen < len) {
final int firstLen = tail + size - bufLen;
System.arraycopy(buffer, tail, b, off, lastLen);
int rem = Math.min(len - lastLen, firstLen);
System.arraycopy(buffer, 0, b, off + lastLen, rem);
cnt = rem + lastLen;
} else {
System.arraycopy(buffer, tail, b, off, len);
cnt = len;
}
} else {
// not wrapped
cnt = Math.min(len, size);
System.arraycopy(buffer, tail, b, off, cnt);
}
tail += cnt;
size -= cnt;
Pipe.this.tail = tail >= bufLen ? tail - bufLen : tail;
Pipe.this.size = size;
lock.notifyAll();
return cnt;
}
}
public void close() throws IOException {
final Object lock = Pipe.this.lock;
synchronized (lock) {
writeClosed = true;
readClosed = true;
// closing the read side drops the remaining bytes
size = 0;
lock.notifyAll();
return;
}
}
public String toString() {
return "Pipe read half";
}
};
private final OutputStream out = new OutputStream() {
public void write(final int b) throws IOException {
final Object lock = Pipe.this.lock;
synchronized (lock) {
if (writeClosed) {
throw msg.streamClosed();
}
final byte[] buffer = Pipe.this.buffer;
final int bufLen = buffer.length;
while (size == bufLen) {
try {
lock.wait();
if (writeClosed) {
throw msg.streamClosed();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw msg.interruptedIO();
}
}
final int tail = Pipe.this.tail;
int startPos = tail + size;
if (startPos >= bufLen) {
buffer[startPos - bufLen] = (byte) b;
} else {
buffer[startPos] = (byte) b;
}
size ++;
lock.notifyAll();
}
}
public void write(final byte[] b, int off, final int len) throws IOException {
int remaining = len;
final Object lock = Pipe.this.lock;
synchronized (lock) {
if (writeClosed) {
throw msg.streamClosed();
}
final byte[] buffer = Pipe.this.buffer;
final int bufLen = buffer.length;
int size;
int tail;
int cnt;
while (remaining > 0) {
while ((size = Pipe.this.size) == bufLen) {
try {
lock.wait();
if (writeClosed) {
throw msg.streamClosed();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw msg.interruptedIO(len - remaining);
}
}
tail = Pipe.this.tail;
int startPos = tail + size;
if (startPos >= bufLen) {
// read wraps, write doesn't
startPos -= bufLen;
cnt = Math.min(remaining, bufLen - size);
System.arraycopy(b, off, buffer, startPos, cnt);
remaining -= cnt;
off += cnt;
} else {
// write wraps, read doesn't
final int firstPart = Math.min(remaining, bufLen - (tail + size));
System.arraycopy(b, off, buffer, startPos, firstPart);
off += firstPart;
remaining -= firstPart;
if (remaining > 0) {
final int latter = Math.min(remaining, tail);
System.arraycopy(b, off, buffer, 0, latter);
cnt = firstPart + latter;
off += latter;
remaining -= latter;
} else {
cnt = firstPart;
}
}
Pipe.this.size += cnt;
lock.notifyAll();
}
}
}
public void close() throws IOException {
final Object lock = Pipe.this.lock;
synchronized (lock) {
writeClosed = true;
lock.notifyAll();
return;
}
}
public String toString() {
return "Pipe write half";
}
};
Get the input (read) side of the pipe.
Returns: the input side
/**
* Get the input (read) side of the pipe.
*
* @return the input side
*/
public InputStream getIn() {
return in;
}
Get the output (write) side of the pipe.
Returns: the output side
/**
* Get the output (write) side of the pipe.
*
* @return the output side
*/
public OutputStream getOut() {
return out;
}
}