/*
* Copyright (c) 2002-2018, the original author or authors.
*
* This software is distributable under the BSD license. See the terms of the
* BSD license in the documentation provided with this software.
*
* http://www.opensource.org/licenses/bsd-license.php
*/
package jdk.internal.org.jline.utils;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
This class wraps a regular input stream and allows it to appear as if it
is non-blocking; that is, reads can be performed against it that timeout
if no data is seen for a period of time. This effect is achieved by having
a separate thread perform all non-blocking read requests and then
waiting on the thread to complete.
VERY IMPORTANT NOTES
- This class is not thread safe. It expects at most one reader.
- The
shutdown()
method must be called in order to shut down the thread that handles blocking I/O.
/**
* This class wraps a regular input stream and allows it to appear as if it
* is non-blocking; that is, reads can be performed against it that timeout
* if no data is seen for a period of time. This effect is achieved by having
* a separate thread perform all non-blocking read requests and then
* waiting on the thread to complete.
*
* <p>VERY IMPORTANT NOTES
* <ul>
* <li> This class is not thread safe. It expects at most one reader.
* <li> The {@link #shutdown()} method must be called in order to shut down
* the thread that handles blocking I/O.
* </ul>
*/
public class NonBlockingInputStreamImpl
extends NonBlockingInputStream
{
private InputStream in; // The actual input stream
private int b = READ_EXPIRED; // Recently read byte
private String name;
private boolean threadIsReading = false;
private IOException exception = null;
private long threadDelay = 60 * 1000;
private Thread thread;
Creates a NonBlockingReader
out of a normal blocking reader. Note that this call also spawn a separate thread to perform the blocking I/O on behalf of the thread that is using this class. The shutdown()
method must be called in order to shut this thread down. Params: - name – The stream name
- in – The reader to wrap
/**
* Creates a <code>NonBlockingReader</code> out of a normal blocking
* reader. Note that this call also spawn a separate thread to perform the
* blocking I/O on behalf of the thread that is using this class. The
* {@link #shutdown()} method must be called in order to shut this thread down.
* @param name The stream name
* @param in The reader to wrap
*/
public NonBlockingInputStreamImpl(String name, InputStream in) {
this.in = in;
this.name = name;
}
private synchronized void startReadingThreadIfNeeded() {
if (thread == null) {
thread = new Thread(this::run);
thread.setName(name + " non blocking reader thread");
thread.setDaemon(true);
thread.start();
}
}
Shuts down the thread that is handling blocking I/O. Note that if the
thread is currently blocked waiting for I/O it will not actually
shut down until the I/O is received.
/**
* Shuts down the thread that is handling blocking I/O. Note that if the
* thread is currently blocked waiting for I/O it will not actually
* shut down until the I/O is received.
*/
public synchronized void shutdown() {
if (thread != null) {
notify();
}
}
@Override
public void close() throws IOException {
/*
* The underlying input stream is closed first. This means that if the
* I/O thread was blocked waiting on input, it will be woken for us.
*/
in.close();
shutdown();
}
Attempts to read a byte from the input stream for a specific
period of time.
Params: - timeout – The amount of time to wait for the character
- isPeek –
true
if the byte read must not be consumed
Throws: - IOException – if anything wrong happens
Returns: The byte read, -1 if EOF is reached, or -2 if the
read timed out.
/**
* Attempts to read a byte from the input stream for a specific
* period of time.
* @param timeout The amount of time to wait for the character
* @param isPeek <code>true</code>if the byte read must not be consumed
* @return The byte read, -1 if EOF is reached, or -2 if the
* read timed out.
* @throws IOException if anything wrong happens
*/
public synchronized int read(long timeout, boolean isPeek) throws IOException {
/*
* If the thread hit an IOException, we report it.
*/
if (exception != null) {
assert b == READ_EXPIRED;
IOException toBeThrown = exception;
if (!isPeek)
exception = null;
throw toBeThrown;
}
/*
* If there was a pending character from the thread, then
* we send it. If the timeout is 0L or the thread was shut down
* then do a local read.
*/
if (b >= -1) {
assert exception == null;
}
else if (!isPeek && timeout <= 0L && !threadIsReading) {
b = in.read();
}
else {
/*
* If the thread isn't reading already, then ask it to do so.
*/
if (!threadIsReading) {
threadIsReading = true;
startReadingThreadIfNeeded();
notifyAll();
}
boolean isInfinite = (timeout <= 0L);
/*
* So the thread is currently doing the reading for us. So
* now we play the waiting game.
*/
while (isInfinite || timeout > 0L) {
long start = System.currentTimeMillis ();
try {
if (Thread.interrupted()) {
throw new InterruptedException();
}
wait(timeout);
}
catch (InterruptedException e) {
exception = (IOException) new InterruptedIOException().initCause(e);
}
if (exception != null) {
assert b == READ_EXPIRED;
IOException toBeThrown = exception;
if (!isPeek)
exception = null;
throw toBeThrown;
}
if (b >= -1) {
assert exception == null;
break;
}
if (!isInfinite) {
timeout -= System.currentTimeMillis() - start;
}
}
}
/*
* b is the character that was just read. Either we set it because
* a local read was performed or the read thread set it (or failed to
* change it). We will return it's value, but if this was a peek
* operation, then we leave it in place.
*/
int ret = b;
if (!isPeek) {
b = READ_EXPIRED;
}
return ret;
}
private void run () {
Log.debug("NonBlockingInputStream start");
boolean needToRead;
try {
while (true) {
/*
* Synchronize to grab variables accessed by both this thread
* and the accessing thread.
*/
synchronized (this) {
needToRead = this.threadIsReading;
try {
/*
* Nothing to do? Then wait.
*/
if (!needToRead) {
wait(threadDelay);
}
} catch (InterruptedException e) {
/* IGNORED */
}
needToRead = this.threadIsReading;
if (!needToRead) {
return;
}
}
/*
* We're not shutting down, but we need to read. This cannot
* happen while we are holding the lock (which we aren't now).
*/
int byteRead = READ_EXPIRED;
IOException failure = null;
try {
byteRead = in.read();
} catch (IOException e) {
failure = e;
}
/*
* Re-grab the lock to update the state.
*/
synchronized (this) {
exception = failure;
b = byteRead;
threadIsReading = false;
notify();
}
// If end of stream, exit the loop thread
if (byteRead < 0) {
return;
}
}
} catch (Throwable t) {
Log.warn("Error in NonBlockingInputStream thread", t);
} finally {
Log.debug("NonBlockingInputStream shutdown");
synchronized (this) {
thread = null;
threadIsReading = false;
}
}
}
}