/*
 * Copyright (c) 1995, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package java.io;

A piped input stream should be connected to a piped output stream; the piped input stream then provides whatever data bytes are written to the piped output stream. Typically, data is read from a PipedInputStream object by one thread and data is written to the corresponding PipedOutputStream by some other thread. Attempting to use both objects from a single thread is not recommended, as it may deadlock the thread. The piped input stream contains a buffer, decoupling read operations from write operations, within limits. A pipe is said to be broken if a thread that was providing data bytes to the connected piped output stream is no longer alive.
Author: James Gosling
See Also:
Since: 1.0
/** * A piped input stream should be connected * to a piped output stream; the piped input * stream then provides whatever data bytes * are written to the piped output stream. * Typically, data is read from a <code>PipedInputStream</code> * object by one thread and data is written * to the corresponding <code>PipedOutputStream</code> * by some other thread. Attempting to use * both objects from a single thread is not * recommended, as it may deadlock the thread. * The piped input stream contains a buffer, * decoupling read operations from write operations, * within limits. * A pipe is said to be <a id="BROKEN"> <i>broken</i> </a> if a * thread that was providing data bytes to the connected * piped output stream is no longer alive. * * @author James Gosling * @see java.io.PipedOutputStream * @since 1.0 */
public class PipedInputStream extends InputStream { boolean closedByWriter; volatile boolean closedByReader; boolean connected; /* REMIND: identification of the read and write sides needs to be more sophisticated. Either using thread groups (but what about pipes within a thread?) or using finalization (but it may be a long time until the next GC). */ Thread readSide; Thread writeSide; private static final int DEFAULT_PIPE_SIZE = 1024;
The default size of the pipe's circular input buffer.
Since: 1.1
/** * The default size of the pipe's circular input buffer. * @since 1.1 */
// This used to be a constant before the pipe size was allowed // to change. This field will continue to be maintained // for backward compatibility. protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;
The circular buffer into which incoming data is placed.
Since: 1.1
/** * The circular buffer into which incoming data is placed. * @since 1.1 */
protected byte buffer[];
The index of the position in the circular buffer at which the next byte of data will be stored when received from the connected piped output stream. in<0 implies the buffer is empty, in==out implies the buffer is full
Since: 1.1
/** * The index of the position in the circular buffer at which the * next byte of data will be stored when received from the connected * piped output stream. <code>in&lt;0</code> implies the buffer is empty, * <code>in==out</code> implies the buffer is full * @since 1.1 */
protected int in = -1;
The index of the position in the circular buffer at which the next byte of data will be read by this piped input stream.
Since: 1.1
/** * The index of the position in the circular buffer at which the next * byte of data will be read by this piped input stream. * @since 1.1 */
protected int out = 0;
Creates a PipedInputStream so that it is connected to the piped output stream src. Data bytes written to src will then be available as input from this stream.
Params:
  • src – the stream to connect to.
Throws:
/** * Creates a <code>PipedInputStream</code> so * that it is connected to the piped output * stream <code>src</code>. Data bytes written * to <code>src</code> will then be available * as input from this stream. * * @param src the stream to connect to. * @exception IOException if an I/O error occurs. */
public PipedInputStream(PipedOutputStream src) throws IOException { this(src, DEFAULT_PIPE_SIZE); }
Creates a PipedInputStream so that it is connected to the piped output stream src and uses the specified pipe size for the pipe's buffer. Data bytes written to src will then be available as input from this stream.
Params:
  • src – the stream to connect to.
  • pipeSize – the size of the pipe's buffer.
Throws:
Since: 1.6
/** * Creates a <code>PipedInputStream</code> so that it is * connected to the piped output stream * <code>src</code> and uses the specified pipe size for * the pipe's buffer. * Data bytes written to <code>src</code> will then * be available as input from this stream. * * @param src the stream to connect to. * @param pipeSize the size of the pipe's buffer. * @exception IOException if an I/O error occurs. * @exception IllegalArgumentException if {@code pipeSize <= 0}. * @since 1.6 */
public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException { initPipe(pipeSize); connect(src); }
Creates a PipedInputStream so that it is not yet connected. It must be connected to a PipedOutputStream before being used.
/** * Creates a <code>PipedInputStream</code> so * that it is not yet {@linkplain #connect(java.io.PipedOutputStream) * connected}. * It must be {@linkplain java.io.PipedOutputStream#connect( * java.io.PipedInputStream) connected} to a * <code>PipedOutputStream</code> before being used. */
public PipedInputStream() { initPipe(DEFAULT_PIPE_SIZE); }
Creates a PipedInputStream so that it is not yet connected and uses the specified pipe size for the pipe's buffer. It must be connected to a PipedOutputStream before being used.
Params:
  • pipeSize – the size of the pipe's buffer.
Throws:
Since: 1.6
/** * Creates a <code>PipedInputStream</code> so that it is not yet * {@linkplain #connect(java.io.PipedOutputStream) connected} and * uses the specified pipe size for the pipe's buffer. * It must be {@linkplain java.io.PipedOutputStream#connect( * java.io.PipedInputStream) * connected} to a <code>PipedOutputStream</code> before being used. * * @param pipeSize the size of the pipe's buffer. * @exception IllegalArgumentException if {@code pipeSize <= 0}. * @since 1.6 */
public PipedInputStream(int pipeSize) { initPipe(pipeSize); } private void initPipe(int pipeSize) { if (pipeSize <= 0) { throw new IllegalArgumentException("Pipe Size <= 0"); } buffer = new byte[pipeSize]; }
Causes this piped input stream to be connected to the piped output stream src. If this object is already connected to some other piped output stream, an IOException is thrown.

If src is an unconnected piped output stream and snk is an unconnected piped input stream, they may be connected by either the call:

snk.connect(src) 

or the call:

src.connect(snk) 

The two calls have the same effect.

Params:
  • src – The piped output stream to connect to.
Throws:
/** * Causes this piped input stream to be connected * to the piped output stream <code>src</code>. * If this object is already connected to some * other piped output stream, an <code>IOException</code> * is thrown. * <p> * If <code>src</code> is an * unconnected piped output stream and <code>snk</code> * is an unconnected piped input stream, they * may be connected by either the call: * * <pre><code>snk.connect(src)</code> </pre> * <p> * or the call: * * <pre><code>src.connect(snk)</code> </pre> * <p> * The two calls have the same effect. * * @param src The piped output stream to connect to. * @exception IOException if an I/O error occurs. */
public void connect(PipedOutputStream src) throws IOException { src.connect(this); }
Receives a byte of data. This method will block if no input is available.
Params:
  • b – the byte being received
Throws:
Since: 1.1
/** * Receives a byte of data. This method will block if no input is * available. * @param b the byte being received * @exception IOException If the pipe is <a href="#BROKEN"> <code>broken</code></a>, * {@link #connect(java.io.PipedOutputStream) unconnected}, * closed, or if an I/O error occurs. * @since 1.1 */
protected synchronized void receive(int b) throws IOException { checkStateForReceive(); writeSide = Thread.currentThread(); if (in == out) awaitSpace(); if (in < 0) { in = 0; out = 0; } buffer[in++] = (byte)(b & 0xFF); if (in >= buffer.length) { in = 0; } }
Receives data into an array of bytes. This method will block until some input is available.
Params:
  • b – the buffer into which the data is received
  • off – the start offset of the data
  • len – the maximum number of bytes received
Throws:
/** * Receives data into an array of bytes. This method will * block until some input is available. * @param b the buffer into which the data is received * @param off the start offset of the data * @param len the maximum number of bytes received * @exception IOException If the pipe is <a href="#BROKEN"> broken</a>, * {@link #connect(java.io.PipedOutputStream) unconnected}, * closed,or if an I/O error occurs. */
synchronized void receive(byte b[], int off, int len) throws IOException { checkStateForReceive(); writeSide = Thread.currentThread(); int bytesToTransfer = len; while (bytesToTransfer > 0) { if (in == out) awaitSpace(); int nextTransferAmount = 0; if (out < in) { nextTransferAmount = buffer.length - in; } else if (in < out) { if (in == -1) { in = out = 0; nextTransferAmount = buffer.length - in; } else { nextTransferAmount = out - in; } } if (nextTransferAmount > bytesToTransfer) nextTransferAmount = bytesToTransfer; assert(nextTransferAmount > 0); System.arraycopy(b, off, buffer, in, nextTransferAmount); bytesToTransfer -= nextTransferAmount; off += nextTransferAmount; in += nextTransferAmount; if (in >= buffer.length) { in = 0; } } } private void checkStateForReceive() throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByWriter || closedByReader) { throw new IOException("Pipe closed"); } else if (readSide != null && !readSide.isAlive()) { throw new IOException("Read end dead"); } } private void awaitSpace() throws IOException { while (in == out) { checkStateForReceive(); /* full: kick any waiting readers */ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } }
Notifies all waiting threads that the last byte of data has been received.
/** * Notifies all waiting threads that the last byte of data has been * received. */
synchronized void receivedLast() { closedByWriter = true; notifyAll(); }
Reads the next byte of data from this piped input stream. The value byte is returned as an int in the range 0 to 255. This method blocks until input data is available, the end of the stream is detected, or an exception is thrown.
Throws:
Returns: the next byte of data, or -1 if the end of the stream is reached.
/** * Reads the next byte of data from this piped input stream. The * value byte is returned as an <code>int</code> in the range * <code>0</code> to <code>255</code>. * This method blocks until input data is available, the end of the * stream is detected, or an exception is thrown. * * @return the next byte of data, or <code>-1</code> if the end of the * stream is reached. * @exception IOException if the pipe is * {@link #connect(java.io.PipedOutputStream) unconnected}, * <a href="#BROKEN"> <code>broken</code></a>, closed, * or if an I/O error occurs. */
public synchronized int read() throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } readSide = Thread.currentThread(); int trials = 2; while (in < 0) { if (closedByWriter) { /* closed by writer, return EOF */ return -1; } if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { throw new IOException("Pipe broken"); } /* might be a writer waiting */ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } int ret = buffer[out++] & 0xFF; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } return ret; }
Reads up to len bytes of data from this piped input stream into an array of bytes. Less than len bytes will be read if the end of the data stream is reached or if len exceeds the pipe's buffer size. If len is zero, then no bytes are read and 0 is returned; otherwise, the method blocks until at least 1 byte of input is available, end of the stream has been detected, or an exception is thrown.
Params:
  • b – the buffer into which the data is read.
  • off – the start offset in the destination array b
  • len – the maximum number of bytes read.
Throws:
Returns: the total number of bytes read into the buffer, or -1 if there is no more data because the end of the stream has been reached.
/** * Reads up to <code>len</code> bytes of data from this piped input * stream into an array of bytes. Less than <code>len</code> bytes * will be read if the end of the data stream is reached or if * <code>len</code> exceeds the pipe's buffer size. * If <code>len </code> is zero, then no bytes are read and 0 is returned; * otherwise, the method blocks until at least 1 byte of input is * available, end of the stream has been detected, or an exception is * thrown. * * @param b the buffer into which the data is read. * @param off the start offset in the destination array <code>b</code> * @param len the maximum number of bytes read. * @return the total number of bytes read into the buffer, or * <code>-1</code> if there is no more data because the end of * the stream has been reached. * @exception NullPointerException If <code>b</code> is <code>null</code>. * @exception IndexOutOfBoundsException If <code>off</code> is negative, * <code>len</code> is negative, or <code>len</code> is greater than * <code>b.length - off</code> * @exception IOException if the pipe is <a href="#BROKEN"> <code>broken</code></a>, * {@link #connect(java.io.PipedOutputStream) unconnected}, * closed, or if an I/O error occurs. */
public synchronized int read(byte b[], int off, int len) throws IOException { if (b == null) { throw new NullPointerException(); } else if (off < 0 || len < 0 || len > b.length - off) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } /* possibly wait on the first character */ int c = read(); if (c < 0) { return -1; } b[off] = (byte) c; int rlen = 1; while ((in >= 0) && (len > 1)) { int available; if (in > out) { available = Math.min((buffer.length - out), (in - out)); } else { available = buffer.length - out; } // A byte is read beforehand outside the loop if (available > (len - 1)) { available = len - 1; } System.arraycopy(buffer, out, b, off + rlen, available); out += available; rlen += available; len -= available; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } } return rlen; }
Returns the number of bytes that can be read from this input stream without blocking.
Throws:
Returns:the number of bytes that can be read from this input stream without blocking, or 0 if this input stream has been closed by invoking its close() method, or if the pipe is unconnected, or broken.
Since: 1.0.2
/** * Returns the number of bytes that can be read from this input * stream without blocking. * * @return the number of bytes that can be read from this input stream * without blocking, or {@code 0} if this input stream has been * closed by invoking its {@link #close()} method, or if the pipe * is {@link #connect(java.io.PipedOutputStream) unconnected}, or * <a href="#BROKEN"> <code>broken</code></a>. * * @exception IOException if an I/O error occurs. * @since 1.0.2 */
public synchronized int available() throws IOException { if(in < 0) return 0; else if(in == out) return buffer.length; else if (in > out) return in - out; else return in + buffer.length - out; }
Closes this piped input stream and releases any system resources associated with the stream.
Throws:
  • IOException – if an I/O error occurs.
/** * Closes this piped input stream and releases any system resources * associated with the stream. * * @exception IOException if an I/O error occurs. */
public void close() throws IOException { closedByReader = true; synchronized (this) { in = -1; } } }