/*
 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.internal.net.http.websocket;

import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.SequentialScheduler;
import jdk.internal.net.http.common.Utils;
import jdk.internal.net.http.websocket.OpeningHandshake.Result;

import java.io.IOException;
import java.lang.ref.Reference;
import java.net.ProtocolException;
import java.net.URI;
import java.net.http.WebSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;

import static java.util.Objects.requireNonNull;
import static jdk.internal.net.http.common.MinimalFuture.failedFuture;
import static jdk.internal.net.http.websocket.StatusCodes.CLOSED_ABNORMALLY;
import static jdk.internal.net.http.websocket.StatusCodes.NO_STATUS_CODE;
import static jdk.internal.net.http.websocket.StatusCodes.isLegalToSendFromClient;
import static jdk.internal.net.http.websocket.WebSocketImpl.State.BINARY;
import static jdk.internal.net.http.websocket.WebSocketImpl.State.CLOSE;
import static jdk.internal.net.http.websocket.WebSocketImpl.State.ERROR;
import static jdk.internal.net.http.websocket.WebSocketImpl.State.IDLE;
import static jdk.internal.net.http.websocket.WebSocketImpl.State.OPEN;
import static jdk.internal.net.http.websocket.WebSocketImpl.State.PING;
import static jdk.internal.net.http.websocket.WebSocketImpl.State.PONG;
import static jdk.internal.net.http.websocket.WebSocketImpl.State.TEXT;
import static jdk.internal.net.http.websocket.WebSocketImpl.State.WAITING;

/*
 * A WebSocket client.
 */
public final class WebSocketImpl implements WebSocket {

    private static final Logger debug =
            Utils.getWebSocketLogger("[WebSocket]"::toString, Utils.DEBUG_WS);
    private final AtomicLong sendCounter = new AtomicLong();
    private final AtomicLong receiveCounter = new AtomicLong();

    enum State {
        OPEN,
        IDLE,
        WAITING,
        TEXT,
        BINARY,
        PING,
        PONG,
        CLOSE,
        ERROR
    }

    private final AtomicReference<ByteBuffer> lastAutomaticPong = new AtomicReference<>();
    private final MinimalFuture<WebSocket> DONE = MinimalFuture.completedFuture(this);
    private volatile boolean inputClosed;
    private final AtomicBoolean outputClosed = new AtomicBoolean();

    private final AtomicReference<State> state = new AtomicReference<>(OPEN);

    /* Components of calls to Listener's methods */
    private boolean last;
    private ByteBuffer binaryData;
    private CharSequence text;
    private int statusCode;
    private String reason;
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    private final URI uri;
    private final String subprotocol;
    private final Listener listener;

    private final AtomicBoolean pendingTextOrBinary = new AtomicBoolean();
    private final AtomicBoolean pendingPingOrPong = new AtomicBoolean();
    private final Transport transport;
    private final SequentialScheduler receiveScheduler
            = new SequentialScheduler(new ReceiveTask());
    private final Demand demand = new Demand();

    public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
        Function<Result, WebSocket> newWebSocket = r -> {
            WebSocket ws = newInstance(b.getUri(),
                                       r.subprotocol,
                                       b.getListener(),
                                       r.transport);
            // Make sure we don't release the builder until this lambda
            // has been executed. The builder has a strong reference to
            // the HttpClientFacade, and we want to keep that live until
            // after the raw channel is created and passed to WebSocketImpl.
            Reference.reachabilityFence(b);
            return ws;
        };
        OpeningHandshake h;
        try {
            h = new OpeningHandshake(b);
        } catch (Throwable e) {
            return failedFuture(e);
        }
        return h.send().thenApply(newWebSocket);
    }

    /* Exposed for testing purposes */
    static WebSocketImpl newInstance(URI uri,
                                     String subprotocol,
                                     Listener listener,
                                     TransportFactory transport) {
        WebSocketImpl ws = new WebSocketImpl(uri, subprotocol, listener, transport);
        // This initialisation is outside of the constructor for the sake of
        // safe publication of WebSocketImpl.this
        ws.signalOpen();
        return ws;
    }

    private WebSocketImpl(URI uri,
                          String subprotocol,
                          Listener listener,
                          TransportFactory transportFactory) {
        this.uri = requireNonNull(uri);
        this.subprotocol = requireNonNull(subprotocol);
        this.listener = requireNonNull(listener);
        // Why 6? 1 sendPing/sendPong + 1 sendText/sendBinary + 1 Close +
        // 2 automatic Ping replies + 1 automatic Close = 6 messages
        // Why 2 automatic Pong replies? One is being sent, but the byte buffer
        // has been set to null, another just has been added.
        this.transport = transportFactory.createTransport(new MessageQueue(6),
                new SignallingMessageConsumer());
    }

    // FIXME: add to action handling of errors -> signalError()

    @Override
    public CompletableFuture<WebSocket> sendText(CharSequence message,
                                                 boolean last) {
        Objects.requireNonNull(message);
        long id = 0;
        if (debug.on()) {
            id = sendCounter.incrementAndGet();
            debug.log("enter send text %s payload length=%s last=%s",
                      id, message.length(), last);
        }
        CompletableFuture<WebSocket> result;
        if (!setPendingTextOrBinary()) {
            result = failedFuture(new IllegalStateException("Send pending"));
        } else {
            result = transport.sendText(message, last, this,
                                        (r, e) -> clearPendingTextOrBinary());
        }
        if (debug.on()) {
            debug.log("exit send text %s returned %s", id, result);
        }

        return replaceNull(result);
    }

    @Override
    public CompletableFuture<WebSocket> sendBinary(ByteBuffer message,
                                                   boolean last) {
        Objects.requireNonNull(message);
        long id = 0;
        if (debug.on()) {
            id = sendCounter.incrementAndGet();
            debug.log("enter send binary %s payload=%s last=%s",
                      id, message, last);
        }
        CompletableFuture<WebSocket> result;
        if (!setPendingTextOrBinary()) {
            result = failedFuture(new IllegalStateException("Send pending"));
        } else {
            result = transport.sendBinary(message, last, this,
                                          (r, e) -> clearPendingTextOrBinary());
        }
        if (debug.on()) {
            debug.log("exit send binary %s returned %s", id, result);
        }
        return replaceNull(result);
    }

    private void clearPendingTextOrBinary() {
        pendingTextOrBinary.set(false);
    }

    private boolean setPendingTextOrBinary() {
        return pendingTextOrBinary.compareAndSet(false, true);
    }

    private CompletableFuture<WebSocket> replaceNull(
            CompletableFuture<WebSocket> cf)
    {
        if (cf == null) {
            return DONE;
        } else {
            return cf;
        }
    }

    @Override
    public CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
        Objects.requireNonNull(message);
        long id = 0;
        if (debug.on()) {
            id = sendCounter.incrementAndGet();
            debug.log("enter send ping %s payload=%s", id, message);
        }
        CompletableFuture<WebSocket> result;
        if (!setPendingPingOrPong()) {
            result = failedFuture(new IllegalStateException("Send pending"));
        } else {
            result = transport.sendPing(message, this,
                                        (r, e) -> clearPendingPingOrPong());
        }
        if (debug.on()) {
            debug.log("exit send ping %s returned %s", id, result);
        }
        return replaceNull(result);
    }

    @Override
    public CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
        Objects.requireNonNull(message);
        long id = 0;
        if (debug.on()) {
            id = sendCounter.incrementAndGet();
            debug.log("enter send pong %s payload=%s", id, message);
        }
        CompletableFuture<WebSocket> result;
        if (!setPendingPingOrPong()) {
            result = failedFuture(new IllegalStateException("Send pending"));
        } else {
            result =  transport.sendPong(message, this,
                                         (r, e) -> clearPendingPingOrPong());
        }
        if (debug.on()) {
            debug.log("exit send pong %s returned %s", id, result);
        }
        return replaceNull(result);
    }

    private boolean setPendingPingOrPong() {
        return pendingPingOrPong.compareAndSet(false, true);
    }

    private void clearPendingPingOrPong() {
        pendingPingOrPong.set(false);
    }

    @Override
    public CompletableFuture<WebSocket> sendClose(int statusCode,
                                                  String reason) {
        Objects.requireNonNull(reason);
        long id = 0;
        if (debug.on()) {
            id = sendCounter.incrementAndGet();
            debug.log("enter send close %s statusCode=%s reason.length=%s",
                      id, statusCode, reason.length());
        }
        CompletableFuture<WebSocket> result;
        // Close message is the only type of message whose validity is checked
        // in the corresponding send method. This is made in order to close the
        // output in place. Otherwise the number of Close messages in queue
        // would not be bounded.
        if (!isLegalToSendFromClient(statusCode)) {
            result = failedFuture(new IllegalArgumentException("statusCode"));
        } else if (!isLegalReason(reason)) {
            result = failedFuture(new IllegalArgumentException("reason"));
        } else if (!outputClosed.compareAndSet(false, true)){
            result = failedFuture(new IOException("Output closed"));
        } else {
            result = sendClose0(statusCode, reason);
        }
        if (debug.on()) {
            debug.log("exit send close %s returned %s", id, result);
        }
        return replaceNull(result);
    }

    private static boolean isLegalReason(String reason) {
        if (reason.length() > 123) { // quick check
            return false;
        }
        CharsetEncoder encoder = StandardCharsets.UTF_8.newEncoder()
                        .onMalformedInput(CodingErrorAction.REPORT)
                        .onUnmappableCharacter(CodingErrorAction.REPORT);
        ByteBuffer bytes;
        try {
            bytes = encoder.encode(CharBuffer.wrap(reason));
        } catch (CharacterCodingException ignored) {
            return false;
        }
        return bytes.remaining() <= 123;
    }

    /*
     * The implementation uses this method internally to send Close messages
     * with codes that are not allowed to be sent through the API.
     */
    private CompletableFuture<WebSocket> sendClose0(int statusCode,
                                                    String reason) {
        return transport.sendClose(statusCode, reason, this,
                                   (r, e) -> processCloseError(e));
    }

    private void processCloseError(Throwable e) {
        if (e == null) {
            debug.log("send close completed successfully");
        } else {
            debug.log("send close completed with error", e);
        }
        outputClosed.set(true);
        try {
            transport.closeOutput();
        } catch (IOException ignored) { }
    }

    @Override
    public void request(long n) {
        if (debug.on()) {
            debug.log("request %s", n);
        }
        if (demand.increase(n)) {
            receiveScheduler.runOrSchedule();
        }
    }

    @Override
    public String getSubprotocol() {
        return subprotocol;
    }

    @Override
    public boolean isOutputClosed() {
        return outputClosed.get();
    }

    @Override
    public boolean isInputClosed() {
        return inputClosed;
    }

    @Override
    public void abort() {
        if (debug.on()) {
            debug.log("abort");
        }
        inputClosed = true;
        outputClosed.set(true);
        receiveScheduler.stop();
        close();
    }

    @Override
    public String toString() {
        return super.toString()
                + "[uri=" + uri
                + (!subprotocol.isEmpty() ? ", subprotocol=" + subprotocol : "")
                + "]";
    }

    /*
     * The assumptions about order is as follows:
     *
     *     - state is never changed more than twice inside the `run` method:
     *       x --(1)--> IDLE --(2)--> y (otherwise we're loosing events, or
     *       overwriting parts of messages creating a mess since there's no
     *       queueing)
     *     - OPEN is always the first state
     *     - no messages are requested/delivered before onOpen is called (this
     *       is implemented by making WebSocket instance accessible first in
     *       onOpen)
     *     - after the state has been observed as CLOSE/ERROR, the scheduler
     *       is stopped
     */
    private class ReceiveTask extends SequentialScheduler.CompleteRestartableTask {

        // Transport only asked here and nowhere else because we must make sure
        // onOpen is invoked first and no messages become pending before onOpen
        // finishes

        @Override
        public void run() {
            if (debug.on()) {
                debug.log("enter receive task");
            }
            loop:
            while (!receiveScheduler.isStopped()) {
                State s = state.get();
                if (debug.on()) {
                    debug.log("receive state: %s", s);
                }
                try {
                    switch (s) {
                        case OPEN:
                            processOpen();
                            tryChangeState(OPEN, IDLE);
                            break;
                        case TEXT:
                            processText();
                            tryChangeState(TEXT, IDLE);
                            break;
                        case BINARY:
                            processBinary();
                            tryChangeState(BINARY, IDLE);
                            break;
                        case PING:
                            processPing();
                            tryChangeState(PING, IDLE);
                            break;
                        case PONG:
                            processPong();
                            tryChangeState(PONG, IDLE);
                            break;
                        case CLOSE:
                            processClose();
                            break loop;
                        case ERROR:
                            processError();
                            break loop;
                        case IDLE:
                            if (demand.tryDecrement()
                                    && tryChangeState(IDLE, WAITING)) {
                                transport.request(1);
                            }
                            break loop;
                        case WAITING:
                            // For debugging spurious signalling: when there was
                            // a signal, but apparently nothing has changed
                            break loop;
                        default:
                            throw new InternalError(String.valueOf(s));
                    }
                } catch (Throwable t) {
                    signalError(t);
                }
            }
            if (debug.on()) {
                debug.log("exit receive task");
            }
        }

        private void processError() throws IOException {
            if (debug.on()) {
                debug.log("processError");
            }
            transport.closeInput();
            receiveScheduler.stop();
            Throwable err = error.get();
            if (err instanceof FailWebSocketException) {
                int code1 = ((FailWebSocketException) err).getStatusCode();
                err = new ProtocolException().initCause(err);
                if (debug.on()) {
                    debug.log("failing %s with error=%s statusCode=%s",
                              WebSocketImpl.this, err, code1);
                }
                sendCloseSilently(code1);
            }
            long id = 0;
            if (debug.on()) {
                id = receiveCounter.incrementAndGet();
                debug.log("enter onError %s error=%s", id, err);
            }
            try {
                listener.onError(WebSocketImpl.this, err);
            } finally {
                if (debug.on()) {
                    debug.log("exit onError %s", id);
                }
            }
        }

        private void processClose() throws IOException {
            debug.log("processClose");
            transport.closeInput();
            receiveScheduler.stop();
            CompletionStage<?> cs = null; // when the listener is ready to close
            long id = 0;
            if (debug.on()) {
                id = receiveCounter.incrementAndGet();
                debug.log("enter onClose %s statusCode=%s reason.length=%s",
                          id, statusCode, reason.length());
            }
            try {
                cs = listener.onClose(WebSocketImpl.this, statusCode, reason);
            } finally {
                debug.log("exit onClose %s returned %s", id, cs);
            }
            if (cs == null) {
                cs = DONE;
            }
            int code;
            if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) {
                code = NORMAL_CLOSURE;
                debug.log("using statusCode %s instead of %s",
                          statusCode, code);

            } else {
                code = statusCode;
            }
            cs.whenComplete((r, e) -> {
                if (debug.on()) {
                    debug.log("CompletionStage returned by onClose completed result=%s error=%s",
                              r, e);
                }
                sendCloseSilently(code);
            });
        }

        private void processPong() {
            long id = 0;
            if (debug.on()) {
                id = receiveCounter.incrementAndGet();
                debug.log("enter onPong %s payload=%s",
                          id, binaryData);
            }
            CompletionStage<?> cs = null;
            try {
                cs = listener.onPong(WebSocketImpl.this, binaryData);
            } finally {
                if (debug.on()) {
                    debug.log("exit onPong %s returned %s", id, cs);
                }
            }
        }

        private void processPing() {
            if (debug.on()) {
                debug.log("processPing");
            }
            // A full copy of this (small) data is made. This way sending a
            // replying Pong could be done in parallel with the listener
            // handling this Ping.
            ByteBuffer slice = binaryData.slice();
            if (!outputClosed.get()) {
                ByteBuffer copy = ByteBuffer.allocate(binaryData.remaining())
                        .put(binaryData)
                        .flip();
                if (!trySwapAutomaticPong(copy)) {
                    // Non-exclusive send;
                    BiConsumer<WebSocketImpl, Throwable> reporter = (r, e) -> {
                        if (e != null) { // TODO: better error handing. What if already closed?
                            signalError(Utils.getCompletionCause(e));
                        }
                    };
                    transport.sendPong(WebSocketImpl.this::clearAutomaticPong,
                                       WebSocketImpl.this,
                                       reporter);
                }
            }
            long id = 0;
            if (debug.on()) {
                id = receiveCounter.incrementAndGet();
                debug.log("enter onPing %s payload=%s", id, slice);
            }
            CompletionStage<?> cs = null;
            try {
                cs = listener.onPing(WebSocketImpl.this, slice);
            } finally {
                if (debug.on()) {
                    debug.log("exit onPing %s returned %s", id, cs);
                }
            }
        }

        private void processBinary() {
            long id = 0;
            if (debug.on()) {
                id = receiveCounter.incrementAndGet();
                debug.log("enter onBinary %s payload=%s last=%s",
                          id, binaryData, last);
            }
            CompletionStage<?> cs = null;
            try {
                cs = listener.onBinary(WebSocketImpl.this, binaryData, last);
            } finally {
                if (debug.on()) {
                    debug.log("exit onBinary %s returned %s", id, cs);
                }
            }
        }

        private void processText() {
            long id = 0;
            if (debug.on()) {
                id = receiveCounter.incrementAndGet();
                debug.log("enter onText %s payload.length=%s last=%s",
                          id, text.length(), last);
            }
            CompletionStage<?> cs = null;
            try {
                cs = listener.onText(WebSocketImpl.this, text, last);
            } finally {
                if (debug.on()) {
                    debug.log("exit onText %s returned %s", id, cs);
                }
            }
        }

        private void processOpen() {
            long id = 0;
            if (debug.on()) {
                id = receiveCounter.incrementAndGet();
                debug.log("enter onOpen %s", id);
            }
            try {
                listener.onOpen(WebSocketImpl.this);
            } finally {
                if (debug.on()) {
                    debug.log("exit onOpen %s", id);
                }
            }
        }
    }

    private void sendCloseSilently(int statusCode) {
        sendClose0(statusCode, "").whenComplete((r, e) -> {
            if (e != null) {
                if (debug.on()) {
                    debug.log("automatic closure completed with error",
                              (Object) e);
                }
            }
        });
    }

    private ByteBuffer clearAutomaticPong() {
        ByteBuffer data;
        do {
            data = lastAutomaticPong.get();
            if (data == null) {
                // This method must never be called unless a message that is
                // using it has been added previously
                throw new InternalError();
            }
        } while (!lastAutomaticPong.compareAndSet(data, null));
        return data;
    }

    // bound pings
    private boolean trySwapAutomaticPong(ByteBuffer copy) {
        ByteBuffer message;
        boolean swapped;
        while (true) {
            message = lastAutomaticPong.get();
            if (message == null) {
                if (!lastAutomaticPong.compareAndSet(null, copy)) {
                    // It's only this method that can change null to ByteBuffer,
                    // and this method is invoked at most by one thread at a
                    // time. Thus no failure in the atomic operation above is
                    // expected.
                    throw new InternalError();
                }
                swapped = false;
                break;
            } else if (lastAutomaticPong.compareAndSet(message, copy)) {
                swapped = true;
                break;
            }
        }
        if (debug.on()) {
            debug.log("swapped automatic pong from %s to %s",
                      message, copy);
        }
        return swapped;
    }

    private void signalOpen() {
        debug.log("signalOpen");
        receiveScheduler.runOrSchedule();
    }

    private void signalError(Throwable error) {
        if (debug.on()) {
            debug.log("signalError %s", (Object) error);
        }
        inputClosed = true;
        outputClosed.set(true);
        if (!this.error.compareAndSet(null, error) || !trySetState(ERROR)) {
            if (debug.on()) {
                debug.log("signalError", error);
            }
            Log.logError(error);
        } else {
            close();
        }
    }

    private void close() {
        if (debug.on()) {
            debug.log("close");
        }
        Throwable first = null;
        try {
            transport.closeInput();
        } catch (Throwable t1) {
            first = t1;
        } finally {
            Throwable second = null;
            try {
                transport.closeOutput();
            } catch (Throwable t2) {
                second = t2;
            } finally {
                Throwable e = null;
                if (first != null && second != null) {
                    first.addSuppressed(second);
                    e = first;
                } else if (first != null) {
                    e = first;
                } else if (second != null) {
                    e = second;
                }
                if (e != null) {
                    if (debug.on()) {
                        debug.log("exception in close", e);
                    }
                }
            }
        }
    }

    private void signalClose(int statusCode, String reason) {
        // FIXME: make sure no race reason & close are not intermixed
        inputClosed = true;
        this.statusCode = statusCode;
        this.reason = reason;
        boolean managed = trySetState(CLOSE);
        if (debug.on()) {
            debug.log("signalClose statusCode=%s reason.length=%s: %s",
                      statusCode, reason.length(), managed);
        }
        if (managed) {
            try {
                transport.closeInput();
            } catch (Throwable t) {
                if (debug.on()) {
                    debug.log("exception closing input", (Object) t);
                }
            }
        }
    }

    private class SignallingMessageConsumer implements MessageStreamConsumer {

        @Override
        public void onText(CharSequence data, boolean last) {
            transport.acknowledgeReception();
            text = data;
            WebSocketImpl.this.last = last;
            tryChangeState(WAITING, TEXT);
        }

        @Override
        public void onBinary(ByteBuffer data, boolean last) {
            transport.acknowledgeReception();
            binaryData = data;
            WebSocketImpl.this.last = last;
            tryChangeState(WAITING, BINARY);
        }

        @Override
        public void onPing(ByteBuffer data) {
            transport.acknowledgeReception();
            binaryData = data;
            tryChangeState(WAITING, PING);
        }

        @Override
        public void onPong(ByteBuffer data) {
            transport.acknowledgeReception();
            binaryData = data;
            tryChangeState(WAITING, PONG);
        }

        @Override
        public void onClose(int statusCode, CharSequence reason) {
            transport.acknowledgeReception();
            signalClose(statusCode, reason.toString());
        }

        @Override
        public void onComplete() {
            transport.acknowledgeReception();
            signalClose(CLOSED_ABNORMALLY, "");
        }

        @Override
        public void onError(Throwable error) {
            signalError(error);
        }
    }

    private boolean trySetState(State newState) {
        State currentState;
        boolean success = false;
        while (true) {
            currentState = state.get();
            if (currentState == ERROR || currentState == CLOSE) {
                break;
            } else if (state.compareAndSet(currentState, newState)) {
                receiveScheduler.runOrSchedule();
                success = true;
                break;
            }
        }
        if (debug.on()) {
            debug.log("set state %s (previous %s) %s",
                      newState, currentState, success);
        }
        return success;
    }

    private boolean tryChangeState(State expectedState, State newState) {
        State witness = state.compareAndExchange(expectedState, newState);
        boolean success = false;
        if (witness == expectedState) {
            receiveScheduler.runOrSchedule();
            success = true;
        } else if (witness != ERROR && witness != CLOSE) {
            // This should be the only reason for inability to change the state
            // from IDLE to WAITING: the state has changed to terminal
            throw new InternalError();
        }
        if (debug.on()) {
            debug.log("change state from %s to %s %s",
                      expectedState, newState, success);
        }
        return success;
    }

    /* Exposed for testing purposes */
    protected Transport transport() {
        return transport;
    }
}