/*
* Copyright (c) 2002-2018, the original author or authors.
*
* This software is distributable under the BSD license. See the terms of the
* BSD license in the documentation provided with this software.
*
* http://www.opensource.org/licenses/bsd-license.php
*/
package jdk.internal.org.jline.utils;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.Reader;
This class wraps a regular reader and allows it to appear as if it
is non-blocking; that is, reads can be performed against it that timeout
if no data is seen for a period of time. This effect is achieved by having
a separate thread perform all non-blocking read requests and then
waiting on the thread to complete.
VERY IMPORTANT NOTES
- This class is not thread safe. It expects at most one reader.
- The
shutdown()
method must be called in order to shut down the thread that handles blocking I/O.
Author: Scott C. Gray <scottgray1@gmail.com> Since: 2.7
/**
* This class wraps a regular reader and allows it to appear as if it
* is non-blocking; that is, reads can be performed against it that timeout
* if no data is seen for a period of time. This effect is achieved by having
* a separate thread perform all non-blocking read requests and then
* waiting on the thread to complete.
*
* <p>VERY IMPORTANT NOTES
* <ul>
* <li> This class is not thread safe. It expects at most one reader.
* <li> The {@link #shutdown()} method must be called in order to shut down
* the thread that handles blocking I/O.
* </ul>
* @since 2.7
* @author Scott C. Gray <scottgray1@gmail.com>
*/
public class NonBlockingReaderImpl
extends NonBlockingReader
{
public static final int READ_EXPIRED = -2;
private Reader in; // The actual input stream
private int ch = READ_EXPIRED; // Recently read character
private String name;
private boolean threadIsReading = false;
private IOException exception = null;
private long threadDelay = 60 * 1000;
private Thread thread;
Creates a NonBlockingReader
out of a normal blocking reader. Note that this call also spawn a separate thread to perform the blocking I/O on behalf of the thread that is using this class. The shutdown()
method must be called in order to shut this thread down. Params: - name – The reader name
- in – The reader to wrap
/**
* Creates a <code>NonBlockingReader</code> out of a normal blocking
* reader. Note that this call also spawn a separate thread to perform the
* blocking I/O on behalf of the thread that is using this class. The
* {@link #shutdown()} method must be called in order to shut this thread down.
* @param name The reader name
* @param in The reader to wrap
*/
public NonBlockingReaderImpl(String name, Reader in) {
this.in = in;
this.name = name;
}
private synchronized void startReadingThreadIfNeeded() {
if (thread == null) {
thread = new Thread(this::run);
thread.setName(name + " non blocking reader thread");
thread.setDaemon(true);
thread.start();
}
}
Shuts down the thread that is handling blocking I/O. Note that if the
thread is currently blocked waiting for I/O it will not actually
shut down until the I/O is received.
/**
* Shuts down the thread that is handling blocking I/O. Note that if the
* thread is currently blocked waiting for I/O it will not actually
* shut down until the I/O is received.
*/
public synchronized void shutdown() {
if (thread != null) {
notify();
}
}
@Override
public void close() throws IOException {
/*
* The underlying input stream is closed first. This means that if the
* I/O thread was blocked waiting on input, it will be woken for us.
*/
in.close();
shutdown();
}
@Override
public synchronized boolean ready() throws IOException {
return ch >= 0 || in.ready();
}
Attempts to read a character from the input stream for a specific
period of time.
Params: - timeout – The amount of time to wait for the character
Returns: The character read, -1 if EOF is reached, or -2 if the
read timed out.
/**
* Attempts to read a character from the input stream for a specific
* period of time.
* @param timeout The amount of time to wait for the character
* @return The character read, -1 if EOF is reached, or -2 if the
* read timed out.
*/
protected synchronized int read(long timeout, boolean isPeek) throws IOException {
/*
* If the thread hit an IOException, we report it.
*/
if (exception != null) {
assert ch == READ_EXPIRED;
IOException toBeThrown = exception;
if (!isPeek)
exception = null;
throw toBeThrown;
}
/*
* If there was a pending character from the thread, then
* we send it. If the timeout is 0L or the thread was shut down
* then do a local read.
*/
if (ch >= -1) {
assert exception == null;
}
else if (!isPeek && timeout <= 0L && !threadIsReading) {
ch = in.read();
}
else {
/*
* If the thread isn't reading already, then ask it to do so.
*/
if (!threadIsReading) {
threadIsReading = true;
startReadingThreadIfNeeded();
notifyAll();
}
boolean isInfinite = (timeout <= 0L);
/*
* So the thread is currently doing the reading for us. So
* now we play the waiting game.
*/
while (isInfinite || timeout > 0L) {
long start = System.currentTimeMillis ();
try {
if (Thread.interrupted()) {
throw new InterruptedException();
}
wait(timeout);
}
catch (InterruptedException e) {
exception = (IOException) new InterruptedIOException().initCause(e);
}
if (exception != null) {
assert ch == READ_EXPIRED;
IOException toBeThrown = exception;
if (!isPeek)
exception = null;
throw toBeThrown;
}
if (ch >= -1) {
assert exception == null;
break;
}
if (!isInfinite) {
timeout -= System.currentTimeMillis() - start;
}
}
}
/*
* ch is the character that was just read. Either we set it because
* a local read was performed or the read thread set it (or failed to
* change it). We will return it's value, but if this was a peek
* operation, then we leave it in place.
*/
int ret = ch;
if (!isPeek) {
ch = READ_EXPIRED;
}
return ret;
}
private void run () {
Log.debug("NonBlockingReader start");
boolean needToRead;
try {
while (true) {
/*
* Synchronize to grab variables accessed by both this thread
* and the accessing thread.
*/
synchronized (this) {
needToRead = this.threadIsReading;
try {
/*
* Nothing to do? Then wait.
*/
if (!needToRead) {
wait(threadDelay);
}
} catch (InterruptedException e) {
/* IGNORED */
}
needToRead = this.threadIsReading;
if (!needToRead) {
return;
}
}
/*
* We're not shutting down, but we need to read. This cannot
* happen while we are holding the lock (which we aren't now).
*/
int charRead = READ_EXPIRED;
IOException failure = null;
try {
charRead = in.read();
// if (charRead < 0) {
// continue;
// }
} catch (IOException e) {
failure = e;
// charRead = -1;
}
/*
* Re-grab the lock to update the state.
*/
synchronized (this) {
exception = failure;
ch = charRead;
threadIsReading = false;
notify();
}
}
} catch (Throwable t) {
Log.warn("Error in NonBlockingReader thread", t);
} finally {
Log.debug("NonBlockingReader shutdown");
synchronized (this) {
thread = null;
threadIsReading = false;
}
}
}
public synchronized void clear() throws IOException {
while (ready()) {
read();
}
}
}