/*
 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.internal.net.http;

import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.Utils;
import jdk.internal.net.http.websocket.RawChannel;

import java.io.EOFException;
import java.io.IOException;
import java.lang.ref.Cleaner;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.lang.System.Logger.Level;

/*
 * I/O abstraction used to implement WebSocket.
 *
 */
public class RawChannelTube implements RawChannel {

    final HttpConnection connection;
    final FlowTube tube;
    final WritePublisher writePublisher;
    final ReadSubscriber readSubscriber;
    final Supplier<ByteBuffer> initial;
    final AtomicBoolean inited = new AtomicBoolean();
    final AtomicBoolean outputClosed = new AtomicBoolean();
    final AtomicBoolean inputClosed = new AtomicBoolean();
    final AtomicBoolean closed = new AtomicBoolean();
    final String dbgTag;
    final Logger debug;
    private static final Cleaner cleaner =
            Utils.ASSERTIONSENABLED  && Utils.DEBUG_WS ? Cleaner.create() : null;

    RawChannelTube(HttpConnection connection,
                   Supplier<ByteBuffer> initial) {
        this.connection = connection;
        this.tube = connection.getConnectionFlow();
        this.initial = initial;
        this.writePublisher = new WritePublisher();
        this.readSubscriber = new ReadSubscriber();
        dbgTag = "[WebSocket] RawChannelTube(" + tube.toString() +")";
        debug = Utils.getWebSocketLogger(dbgTag::toString, Utils.DEBUG_WS);
        connection.client().webSocketOpen();
        connectFlows();
        if (Utils.ASSERTIONSENABLED && Utils.DEBUG_WS) {
            // this is just for debug...
            cleaner.register(this, new CleanupChecker(closed, debug));
        }
    }

    // Make sure no back reference to RawChannelTube can exist
    // from this class. In particular it would be dangerous
    // to reference connection, since connection has a reference
    // to SocketTube with which a RawChannelTube is registered.
    // Ditto for HttpClientImpl, which might have a back reference
    // to the connection.
    static final class CleanupChecker implements Runnable {
        final AtomicBoolean closed;
        final System.Logger debug;
        CleanupChecker(AtomicBoolean closed, System.Logger debug) {
            this.closed = closed;
            this.debug = debug;
        }

        @Override
        public void run() {
            if (!closed.get()) {
                debug.log(Level.DEBUG,
                         "RawChannelTube was not closed before being released");
            }
        }
    }

    private void connectFlows() {
        if (debug.on()) debug.log("connectFlows");
        tube.connectFlows(writePublisher, readSubscriber);
    }

    class WriteSubscription implements Flow.Subscription {
        final Flow.Subscriber<? super List<ByteBuffer>> subscriber;
        final Demand demand = new Demand();
        volatile boolean cancelled;
        WriteSubscription(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
            this.subscriber = subscriber;
        }
        @Override
        public void request(long n) {
            if (debug.on()) debug.log("WriteSubscription::request %d", n);
            demand.increase(n);
            RawEvent event;
            while ((event = writePublisher.events.poll()) != null) {
                if (debug.on()) debug.log("WriteSubscriber: handling event");
                event.handle();
                if (demand.isFulfilled()) break;
            }
        }
        @Override
        public void cancel() {
            cancelled = true;
            if (debug.on()) debug.log("WriteSubscription::cancel");
            shutdownOutput();
            RawEvent event;
            while ((event = writePublisher.events.poll()) != null) {
                if (debug.on()) debug.log("WriteSubscriber: handling event");
                event.handle();
            }
        }
    }

    class WritePublisher implements FlowTube.TubePublisher {
        final ConcurrentLinkedQueue<RawEvent> events = new ConcurrentLinkedQueue<>();
        volatile WriteSubscription writeSubscription;
        @Override
        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
            if (debug.on()) debug.log("WritePublisher::subscribe");
            WriteSubscription subscription = new WriteSubscription(subscriber);
            subscriber.onSubscribe(subscription);
            writeSubscription = subscription;
        }
    }

    class ReadSubscriber implements  FlowTube.TubeSubscriber {

        volatile Flow.Subscription readSubscription;
        volatile boolean completed;
        long initialRequest;
        final ConcurrentLinkedQueue<RawEvent> events = new ConcurrentLinkedQueue<>();
        final ConcurrentLinkedQueue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();

        void checkEvents() {
            Flow.Subscription subscription = readSubscription;
            if (subscription != null) {
                Throwable error = errorRef.get();
                while (!buffers.isEmpty() || error != null || closed.get() || completed) {
                    RawEvent event = events.poll();
                    if (event == null) break;
                    if (debug.on()) debug.log("ReadSubscriber: handling event");
                    event.handle();
                }
            }
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            //buffers.add(initial.get());
            long n;
            synchronized (this) {
                readSubscription = subscription;
                n = initialRequest;
                initialRequest = 0;
            }
            if (debug.on()) debug.log("ReadSubscriber::onSubscribe");
            if (n > 0) {
                Throwable error = errorRef.get();
                if (error == null && !closed.get() && !completed) {
                    if (debug.on()) debug.log("readSubscription: requesting " + n);
                    subscription.request(n);
                }
            }
            checkEvents();
        }

        @Override
        public void onNext(List<ByteBuffer> item) {
            if (debug.on()) debug.log(() -> "ReadSubscriber::onNext "
                    + Utils.remaining(item) + " bytes");
            buffers.addAll(item);
            checkEvents();
        }

        @Override
        public void onError(Throwable throwable) {
            if (closed.get() || errorRef.compareAndSet(null, throwable)) {
                if (debug.on()) debug.log("ReadSubscriber::onError", throwable);
                if (buffers.isEmpty()) {
                    checkEvents();
                    shutdownInput();
                }
            }
        }

        @Override
        public void onComplete() {
            if (debug.on()) debug.log("ReadSubscriber::onComplete");
            completed = true;
            if (buffers.isEmpty()) {
                checkEvents();
                shutdownInput();
            }
        }
    }


    /*
     * Registers given event whose callback will be called once only (i.e.
     * register new event for each callback).
     *
     * Memory consistency effects: actions in a thread calling registerEvent
     * happen-before any subsequent actions in the thread calling event.handle
     */
    public void registerEvent(RawEvent event) throws IOException {
        int interestOps = event.interestOps();
        if ((interestOps & SelectionKey.OP_WRITE) != 0) {
            if (debug.on()) debug.log("register write event");
            if (outputClosed.get()) throw new IOException("closed output");
            writePublisher.events.add(event);
            WriteSubscription writeSubscription = writePublisher.writeSubscription;
            if (writeSubscription != null) {
                while (!writeSubscription.demand.isFulfilled()) {
                    event = writePublisher.events.poll();
                    if (event == null) break;
                    event.handle();
                }
            }
        }
        if ((interestOps & SelectionKey.OP_READ) != 0) {
            if (debug.on()) debug.log("register read event");
            if (inputClosed.get()) throw new IOException("closed input");
            readSubscriber.events.add(event);
            readSubscriber.checkEvents();
            if (readSubscriber.buffers.isEmpty()
                    && !readSubscriber.events.isEmpty()) {
                Flow.Subscription readSubscription =
                        readSubscriber.readSubscription;
                if (readSubscription == null) {
                    synchronized (readSubscriber) {
                        readSubscription = readSubscriber.readSubscription;
                        if (readSubscription == null) {
                            readSubscriber.initialRequest = 1;
                            return;
                        }
                    }
                }
                assert  readSubscription != null;
                if (debug.on()) debug.log("readSubscription: requesting 1");
                readSubscription.request(1);
            }
        }
    }

    
Hands over the initial bytes. Once the bytes have been returned they are no longer available and the method will throw an IllegalStateException on each subsequent invocation.
Throws:
Returns:the initial bytes
/** * Hands over the initial bytes. Once the bytes have been returned they are * no longer available and the method will throw an {@link * IllegalStateException} on each subsequent invocation. * * @return the initial bytes * @throws IllegalStateException * if the method has been already invoked */
public ByteBuffer initialByteBuffer() throws IllegalStateException { if (inited.compareAndSet(false, true)) { return initial.get(); } else throw new IllegalStateException("initial buffer already drained"); } /* * Returns a ByteBuffer with the data read or null if EOF is reached. Has no * remaining bytes if no data available at the moment. */ public ByteBuffer read() throws IOException { if (debug.on()) debug.log("read"); Flow.Subscription readSubscription = readSubscriber.readSubscription; if (readSubscription == null) return Utils.EMPTY_BYTEBUFFER; ByteBuffer buffer = readSubscriber.buffers.poll(); if (buffer != null) { if (debug.on()) debug.log("read: " + buffer.remaining()); return buffer; } Throwable error = readSubscriber.errorRef.get(); if (error != null) error = Utils.getIOException(error); if (error instanceof EOFException) { if (debug.on()) debug.log("read: EOFException"); shutdownInput(); return null; } if (error != null) { if (debug.on()) debug.log("read: " + error); if (closed.get()) { return null; } shutdownInput(); throw Utils.getIOException(error); } if (readSubscriber.completed) { if (debug.on()) debug.log("read: EOF"); shutdownInput(); return null; } if (inputClosed.get()) { if (debug.on()) debug.log("read: CLOSED"); throw new IOException("closed output"); } if (debug.on()) debug.log("read: nothing to read"); return Utils.EMPTY_BYTEBUFFER; } /* * Writes a sequence of bytes to this channel from a subsequence of the * given buffers. */ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { if (outputClosed.get()) { if (debug.on()) debug.log("write: CLOSED"); throw new IOException("closed output"); } WriteSubscription writeSubscription = writePublisher.writeSubscription; if (writeSubscription == null) { if (debug.on()) debug.log("write: unsubscribed: 0"); return 0; } if (writeSubscription.cancelled) { if (debug.on()) debug.log("write: CANCELLED"); shutdownOutput(); throw new IOException("closed output"); } if (writeSubscription.demand.tryDecrement()) { List<ByteBuffer> buffers = copy(srcs, offset, length); long res = Utils.remaining(buffers); if (debug.on()) debug.log("write: writing %d", res); writeSubscription.subscriber.onNext(buffers); return res; } else { if (debug.on()) debug.log("write: no demand: 0"); return 0; } }
Shutdown the connection for reading without closing the channel.

Once shutdown for reading then further reads on the channel will return null, the end-of-stream indication. If the input side of the connection is already shutdown then invoking this method has no effect.

Throws:
/** * Shutdown the connection for reading without closing the channel. * * <p> Once shutdown for reading then further reads on the channel will * return {@code null}, the end-of-stream indication. If the input side of * the connection is already shutdown then invoking this method has no * effect. * * @throws ClosedChannelException * If this channel is closed * @throws IOException * If some other I/O error occurs */
public void shutdownInput() { if (inputClosed.compareAndSet(false, true)) { if (debug.on()) debug.log("shutdownInput"); // TransportImpl will eventually call RawChannel::close. // We must not call it here as this would close the socket // and can cause an exception to back fire before // TransportImpl and WebSocketImpl have updated their state. } }
Shutdown the connection for writing without closing the channel.

Once shutdown for writing then further attempts to write to the channel will throw ClosedChannelException. If the output side of the connection is already shutdown then invoking this method has no effect.

Throws:
/** * Shutdown the connection for writing without closing the channel. * * <p> Once shutdown for writing then further attempts to write to the * channel will throw {@link ClosedChannelException}. If the output side of * the connection is already shutdown then invoking this method has no * effect. * * @throws ClosedChannelException * If this channel is closed * @throws IOException * If some other I/O error occurs */
public void shutdownOutput() { if (outputClosed.compareAndSet(false, true)) { if (debug.on()) debug.log("shutdownOutput"); // TransportImpl will eventually call RawChannel::close. // We must not call it here as this would close the socket // and can cause an exception to back fire before // TransportImpl and WebSocketImpl have updated their state. } }
Closes this channel.
Throws:
  • IOException – If an I/O error occurs
/** * Closes this channel. * * @throws IOException * If an I/O error occurs */
@Override public void close() { if (closed.compareAndSet(false, true)) { if (debug.on()) debug.log("close"); connection.client().webSocketClose(); connection.close(); } } private static List<ByteBuffer> copy(ByteBuffer[] src, int offset, int len) { int count = Math.min(len, src.length - offset); if (count <= 0) return Utils.EMPTY_BB_LIST; if (count == 1) return List.of(Utils.copy(src[offset])); if (count == 2) return List.of(Utils.copy(src[offset]), Utils.copy(src[offset+1])); List<ByteBuffer> list = new ArrayList<>(count); for (int i = 0; i < count; i++) { list.add(Utils.copy(src[offset + i])); } return list; } }