/*
 * Copyright (c) 2002-2017, the original author or authors.
 *
 * This software is distributable under the BSD license. See the terms of the
 * BSD license in the documentation provided with this software.
 *
 * http://www.opensource.org/licenses/bsd-license.php
 */
package jdk.internal.org.jline.utils;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

public class NonBlockingPumpInputStream extends NonBlockingInputStream {

    private static final int DEFAULT_BUFFER_SIZE = 4096;

    // Read and write buffer are backed by the same array
    private final ByteBuffer readBuffer;
    private final ByteBuffer writeBuffer;

    private final OutputStream output;

    private boolean closed;

    private IOException ioException;

    public NonBlockingPumpInputStream() {
        this(DEFAULT_BUFFER_SIZE);
    }

    public NonBlockingPumpInputStream(int bufferSize) {
        byte[] buf = new byte[bufferSize];
        this.readBuffer = ByteBuffer.wrap(buf);
        this.writeBuffer = ByteBuffer.wrap(buf);
        this.output = new NbpOutputStream();
        // There are no bytes available to read after initialization
        readBuffer.limit(0);
    }

    public OutputStream getOutputStream() {
        return this.output;
    }

    private int wait(ByteBuffer buffer, long timeout) throws IOException {
        boolean isInfinite = (timeout <= 0L);
        long end = 0;
        if (!isInfinite) {
            end = System.currentTimeMillis() + timeout;
        }
        while (!closed && !buffer.hasRemaining() && (isInfinite || timeout > 0L)) {
            // Wake up waiting readers/writers
            notifyAll();
            try {
                wait(timeout);
                checkIoException();
            } catch (InterruptedException e) {
                checkIoException();
                throw new InterruptedIOException();
            }
            if (!isInfinite) {
                timeout = end - System.currentTimeMillis();
            }
        }
        return buffer.hasRemaining()
                ? 0
                : closed
                    ? EOF
                    : READ_EXPIRED;
    }

    private static boolean rewind(ByteBuffer buffer, ByteBuffer other) {
        // Extend limit of other buffer if there is additional input/output available
        if (buffer.position() > other.position()) {
            other.limit(buffer.position());
        }
        // If we have reached the end of the buffer, rewind and set the new limit
        if (buffer.position() == buffer.capacity()) {
            buffer.rewind();
            buffer.limit(other.position());
            return true;
        } else {
            return false;
        }
    }

    public synchronized int available() {
        int count = readBuffer.remaining();
        if (writeBuffer.position() < readBuffer.position()) {
            count += writeBuffer.position();
        }
        return count;
    }

    @Override
    public synchronized int read(long timeout, boolean isPeek) throws IOException {
        checkIoException();
        // Blocks until more input is available or the reader is closed.
        int res = wait(readBuffer, timeout);
        if (res >= 0) {
            res = readBuffer.get() & 0x00FF;
        }
        rewind(readBuffer, writeBuffer);
        return res;
    }

    public synchronized void setIoException(IOException exception) {
        this.ioException = exception;
        notifyAll();
    }

    protected synchronized void checkIoException() throws IOException {
        if (ioException != null) {
            throw ioException;
        }
    }

    synchronized void write(byte[] cbuf, int off, int len) throws IOException {
        while (len > 0) {
            // Blocks until there is new space available for buffering or the
            // reader is closed.
            if (wait(writeBuffer, 0L) == EOF) {
                throw new ClosedException();
            }
            // Copy as much characters as we can
            int count = Math.min(len, writeBuffer.remaining());
            writeBuffer.put(cbuf, off, count);
            off += count;
            len -= count;
            // Update buffer states and rewind if necessary
            rewind(writeBuffer, readBuffer);
        }
    }

    synchronized void flush() {
        // Avoid waking up readers when there is nothing to read
        if (readBuffer.hasRemaining()) {
            // Notify readers
            notifyAll();
        }
    }

    @Override
    public synchronized void close() throws IOException {
        this.closed = true;
        notifyAll();
    }

    private class NbpOutputStream extends OutputStream {

        @Override
        public void write(int b) throws IOException {
            NonBlockingPumpInputStream.this.write(new byte[] { (byte) b }, 0, 1);
        }

        @Override
        public void write(byte[] cbuf, int off, int len) throws IOException {
            NonBlockingPumpInputStream.this.write(cbuf, off, len);
        }

        @Override
        public void flush() throws IOException {
            NonBlockingPumpInputStream.this.flush();
        }

        @Override
        public void close() throws IOException {
            NonBlockingPumpInputStream.this.close();
        }

    }

}