/*
 * Copyright (c) 2002-2018, the original author or authors.
 *
 * This software is distributable under the BSD license. See the terms of the
 * BSD license in the documentation provided with this software.
 *
 * https://opensource.org/licenses/BSD-3-Clause
 */
package jdk.internal.org.jline.utils;

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;

This class wraps a regular input stream and allows it to appear as if it is non-blocking; that is, reads can be performed against it that timeout if no data is seen for a period of time. This effect is achieved by having a separate thread perform all non-blocking read requests and then waiting on the thread to complete.

VERY IMPORTANT NOTES

  • This class is not thread safe. It expects at most one reader.
  • The shutdown() method must be called in order to shut down the thread that handles blocking I/O.
/** * This class wraps a regular input stream and allows it to appear as if it * is non-blocking; that is, reads can be performed against it that timeout * if no data is seen for a period of time. This effect is achieved by having * a separate thread perform all non-blocking read requests and then * waiting on the thread to complete. * * <p>VERY IMPORTANT NOTES * <ul> * <li> This class is not thread safe. It expects at most one reader. * <li> The {@link #shutdown()} method must be called in order to shut down * the thread that handles blocking I/O. * </ul> */
public class NonBlockingInputStreamImpl extends NonBlockingInputStream { private InputStream in; // The actual input stream private int b = READ_EXPIRED; // Recently read byte private String name; private boolean threadIsReading = false; private IOException exception = null; private long threadDelay = 60 * 1000; private Thread thread;
Creates a NonBlockingReader out of a normal blocking reader. Note that this call also spawn a separate thread to perform the blocking I/O on behalf of the thread that is using this class. The shutdown() method must be called in order to shut this thread down.
Params:
  • name – The stream name
  • in – The reader to wrap
/** * Creates a <code>NonBlockingReader</code> out of a normal blocking * reader. Note that this call also spawn a separate thread to perform the * blocking I/O on behalf of the thread that is using this class. The * {@link #shutdown()} method must be called in order to shut this thread down. * @param name The stream name * @param in The reader to wrap */
public NonBlockingInputStreamImpl(String name, InputStream in) { this.in = in; this.name = name; } private synchronized void startReadingThreadIfNeeded() { if (thread == null) { thread = new Thread(this::run); thread.setName(name + " non blocking reader thread"); thread.setDaemon(true); thread.start(); } }
Shuts down the thread that is handling blocking I/O. Note that if the thread is currently blocked waiting for I/O it will not actually shut down until the I/O is received.
/** * Shuts down the thread that is handling blocking I/O. Note that if the * thread is currently blocked waiting for I/O it will not actually * shut down until the I/O is received. */
public synchronized void shutdown() { if (thread != null) { notify(); } } @Override public void close() throws IOException { /* * The underlying input stream is closed first. This means that if the * I/O thread was blocked waiting on input, it will be woken for us. */ in.close(); shutdown(); }
Attempts to read a byte from the input stream for a specific period of time.
Params:
  • timeout – The amount of time to wait for the character
  • isPeek – trueif the byte read must not be consumed
Throws:
Returns:The byte read, -1 if EOF is reached, or -2 if the read timed out.
/** * Attempts to read a byte from the input stream for a specific * period of time. * @param timeout The amount of time to wait for the character * @param isPeek <code>true</code>if the byte read must not be consumed * @return The byte read, -1 if EOF is reached, or -2 if the * read timed out. * @throws IOException if anything wrong happens */
public synchronized int read(long timeout, boolean isPeek) throws IOException { /* * If the thread hit an IOException, we report it. */ if (exception != null) { assert b == READ_EXPIRED; IOException toBeThrown = exception; if (!isPeek) exception = null; throw toBeThrown; } /* * If there was a pending character from the thread, then * we send it. If the timeout is 0L or the thread was shut down * then do a local read. */ if (b >= -1) { assert exception == null; } else if (!isPeek && timeout <= 0L && !threadIsReading) { b = in.read(); } else { /* * If the thread isn't reading already, then ask it to do so. */ if (!threadIsReading) { threadIsReading = true; startReadingThreadIfNeeded(); notifyAll(); } boolean isInfinite = (timeout <= 0L); /* * So the thread is currently doing the reading for us. So * now we play the waiting game. */ while (isInfinite || timeout > 0L) { long start = System.currentTimeMillis (); try { if (Thread.interrupted()) { throw new InterruptedException(); } wait(timeout); } catch (InterruptedException e) { exception = (IOException) new InterruptedIOException().initCause(e); } if (exception != null) { assert b == READ_EXPIRED; IOException toBeThrown = exception; if (!isPeek) exception = null; throw toBeThrown; } if (b >= -1) { assert exception == null; break; } if (!isInfinite) { timeout -= System.currentTimeMillis() - start; } } } /* * b is the character that was just read. Either we set it because * a local read was performed or the read thread set it (or failed to * change it). We will return it's value, but if this was a peek * operation, then we leave it in place. */ int ret = b; if (!isPeek) { b = READ_EXPIRED; } return ret; } private void run () { Log.debug("NonBlockingInputStream start"); boolean needToRead; try { while (true) { /* * Synchronize to grab variables accessed by both this thread * and the accessing thread. */ synchronized (this) { needToRead = this.threadIsReading; try { /* * Nothing to do? Then wait. */ if (!needToRead) { wait(threadDelay); } } catch (InterruptedException e) { /* IGNORED */ } needToRead = this.threadIsReading; if (!needToRead) { return; } } /* * We're not shutting down, but we need to read. This cannot * happen while we are holding the lock (which we aren't now). */ int byteRead = READ_EXPIRED; IOException failure = null; try { byteRead = in.read(); } catch (IOException e) { failure = e; } /* * Re-grab the lock to update the state. */ synchronized (this) { exception = failure; b = byteRead; threadIsReading = false; notify(); } // If end of stream, exit the loop thread if (byteRead < 0) { return; } } } catch (Throwable t) { Log.warn("Error in NonBlockingInputStream thread", t); } finally { Log.debug("NonBlockingInputStream shutdown"); synchronized (this) { thread = null; threadIsReading = false; } } } }