/*
* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.internal.net.http;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.function.Consumer;
import java.util.function.Supplier;
import jdk.internal.net.http.common.BufferSupplier;
import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.SequentialScheduler;
import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
import jdk.internal.net.http.common.SequentialScheduler.RestartableTask;
import jdk.internal.net.http.common.Utils;
/**
* A SocketTube is a terminal tube plugged directly into the socket.
* The read subscriber should call {@code subscribe} on the SocketTube before
* the SocketTube is subscribed to the write publisher.
*/
final class SocketTube implements FlowTube {
// Debug logger for this tube; dbgString is handed to Utils.getDebugLogger
// together with the Utils.DEBUG flag.
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
// Counter from which each tube draws its id.
static final AtomicLong IDS = new AtomicLong();
private final HttpClientImpl client;
private final SocketChannel channel;
// Buffer source wrapping the buffersFactory given to the constructor.
private final SliceBufferSource sliceBuffersSource;
private final Object lock = new Object();
// Holds the first error signalled on the tube; the signalError methods set
// it with compareAndSet(null, error).
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
// Reading side of the tube.
private final InternalReadPublisher readPublisher;
// Writing side of the tube.
private final InternalWriteSubscriber writeSubscriber;
// Id of this tube, drawn from IDS when the instance is created.
private final long id = IDS.incrementAndGet();
/**
* Creates a tube over the given socket channel.
*
* @param client the client used to register events and to obtain the
*        executor
* @param channel the socket channel the read and write events are bound to
* @param buffersFactory supplier wrapped in the {@code SliceBufferSource}
*        of this tube
*/
public SocketTube(HttpClientImpl client, SocketChannel channel,
Supplier<ByteBuffer> buffersFactory) {
this.client = client;
// channel must be assigned before the publisher and the subscriber are
// created: both build their events from it.
this.channel = channel;
this.sliceBuffersSource = new SliceBufferSource(buffersFactory);
this.readPublisher = new InternalReadPublisher();
this.writeSubscriber = new InternalWriteSubscriber();
}
/**
* Returns {@code true} if this flow is finished.
* This happens when this flow internal read subscription is completed,
* either normally (EOF reading) or exceptionally (EOF writing, or
* underlying socket closed, or some exception occurred while reading or
* writing to the socket).
*
* @return {@code true} if this flow is finished.
*/
public boolean isFinished() {
    final InternalReadPublisher.InternalReadSubscription readSub =
            readPublisher.subscriptionImpl;
    if (readSub != null) {
        // A read subscription exists: finished once it has completed.
        return readSub.completed;
    }
    // No read subscription: finished only if an error has been recorded.
    return errorRef.get() != null;
}
// ===================================================================== //
// Flow.Publisher //
// ======================================================================//
/**
* {@inheritDoc }
* @apiNote This method should be called first. In particular, the caller
* must ensure that this method must be called by the read
* subscriber before the write publisher can call {@code onSubscribe}.
* Failure to adhere to this contract may result in assertion errors.
*/
@Override
public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
// Reject null up front, before the assertion below concatenates s.
Objects.requireNonNull(s);
// Only checked when assertions are enabled.
assert s instanceof TubeSubscriber : "Expected TubeSubscriber, got:" + s;
// Reading side: the internal read publisher takes over the subscriber.
readPublisher.subscribe(s);
}
// ===================================================================== //
// Flow.Subscriber //
// ======================================================================//
/**
* {@inheritDoc }
* @apiNote The caller must ensure that {@code subscribe} is called by
* the read subscriber before {@code onSubscribe} is called by
* the write publisher.
* Failure to adhere to this contract may result in assertion errors.
*/
@Override
public void onSubscribe(Flow.Subscription subscription) {
// Writing side: forward the upstream subscription to the internal
// write subscriber.
writeSubscriber.onSubscribe(subscription);
}
@Override
public void onNext(List<ByteBuffer> item) {
// Writing side: forward the buffers to be written to the internal
// write subscriber.
writeSubscriber.onNext(item);
}
@Override
public void onError(Throwable throwable) {
// Writing side: forward the upstream failure to the internal
// write subscriber.
writeSubscriber.onError(throwable);
}
@Override
public void onComplete() {
// Writing side: forward the upstream completion to the internal
// write subscriber.
writeSubscriber.onComplete();
}
// ===================================================================== //
// Events //
// ======================================================================//
void signalClosed() {
    // Terminates the current read subscriber, and makes sure subscribers
    // arriving later are also told that the connection has been closed.
    if (Log.channel()) {
        Log.logChannel("Connection close signalled: connection closed locally ({0})",
                channelDescr());
    }
    final IOException closedLocally = new IOException("connection closed locally");
    readPublisher.subscriptionImpl.signalError(closedLocally);
}
/**
* A restartable task used to process tasks in sequence.
*/
private static class SocketFlowTask implements RestartableTask {
    final Runnable task;
    private final Object monitor = new Object();

    SocketFlowTask(Runnable task) {
        this.task = task;
    }

    /**
     * Runs the wrapped task once, then signals the completer, whether the
     * task returned normally or threw.
     */
    @Override
    public final void run(DeferredCompleter completer) {
        try {
            runUnderMonitor();
        } finally {
            completer.complete();
        }
    }

    // The monitor is not expected to be contended; it is taken for
    // visibility only.
    private void runUnderMonitor() {
        synchronized (monitor) {
            task.run();
        }
    }
}
// This is best effort - there's no guarantee that the printed set of values
// is consistent. It should only be considered as weakly accurate - in
// particular in what concerns the events states, especially when displaying
// a read event state from a write event callback and conversely.
void debugState(String when) {
    if (!debug.on()) {
        return;
    }
    // Snapshot the reading side; every value is read independently, so the
    // resulting picture is not guaranteed to be consistent.
    final InternalReadPublisher.InternalReadSubscription readSub =
            readPublisher.subscriptionImpl;
    InternalReadPublisher.ReadEvent rEvent = null;
    Demand readDemand = null;
    if (readSub != null) {
        rEvent = readSub.readEvent;
        readDemand = readSub.demand;
    }
    // Snapshot the writing side.
    final InternalWriteSubscriber.WriteEvent wEvent = writeSubscriber.writeEvent;
    final Demand pendingWrites = writeSubscriber.writeDemand;

    final int readOps = (rEvent != null) ? rEvent.interestOps() : 0;
    final long readCount = (readDemand != null) ? readDemand.get() : 0;
    final int writeOps = (wEvent != null) ? wEvent.interestOps() : 0;
    final long writeCount = (pendingWrites != null) ? pendingWrites.get() : 0;
    final boolean readStopped = readSub != null && readSub.readScheduler.isStopped();

    debug.log(when + " Reading: [ops=" + readOps
            + ", demand=" + readCount
            + ", stopped=" + readStopped
            + "], Writing: [ops=" + writeOps
            + ", demand=" + writeCount
            + "]");
}
/**
* A repeatable event that can be paused or resumed by changing its
* interestOps. When the event is fired, it is first paused before being
* signaled. It is the responsibility of the code triggered by
* {@code signalEvent} to resume the event if required.
*/
private static abstract class SocketFlowEvent extends AsyncEvent {
    final SocketChannel channel;
    final int defaultInterest;
    // 0 while paused, defaultInterest while resumed.
    volatile int interestOps;
    // Becomes true the first time the event is resumed; never reset here.
    volatile boolean registered;

    SocketFlowEvent(int defaultInterest, SocketChannel channel) {
        super(AsyncEvent.REPEATING);
        this.channel = channel;
        this.defaultInterest = defaultInterest;
    }

    /** Tells whether {@code resume()} has been called at least once. */
    final boolean registered() {
        return registered;
    }

    /** Restores the default interest ops and marks the event registered. */
    final void resume() {
        interestOps = defaultInterest;
        registered = true;
    }

    /** Clears the interest ops. */
    final void pause() {
        interestOps = 0;
    }

    @Override
    public final SelectableChannel channel() {
        return channel;
    }

    @Override
    public final int interestOps() {
        return interestOps;
    }

    /** Fired event: pause first, then signal; stays paused until resumed. */
    @Override
    public final void handle() {
        pause();
        signalEvent();
    }

    /** Aborted event: pause first, then report the error. */
    @Override
    public final void abort(IOException error) {
        debug().log(() -> "abort: " + error);
        pause();
        // the event should not be resumed after an abort (not checked)
        signalError(error);
    }

    protected abstract void signalEvent();

    protected abstract void signalError(Throwable error);

    abstract Logger debug();
}
// ===================================================================== //
// Writing //
// ======================================================================//
// This class makes the assumption that the publisher will call onNext
// sequentially, and that onNext won't be called if the demand has not been
// incremented by request(1).
// It has a 'queue of 1' meaning that it will call request(1) in
// onSubscribe, and then only after its 'current' buffer list has been
// fully written and current set to null;
private final class InternalWriteSubscriber
implements Flow.Subscriber<List<ByteBuffer>> {
// Wrapper around the current upstream subscription; replaced by onSubscribe.
volatile WriteSubscription subscription;
// The buffer list currently being written, or null when nothing is pending
// (this subscriber is a queue of 1).
volatile List<ByteBuffer> current;
// Set by onComplete and by signalError.
volatile boolean completed;
// Event whose callback is startSubscription; registered with the client
// from onSubscribe.
final AsyncTriggerEvent startSubscription =
new AsyncTriggerEvent(this::signalError, this::startSubscription);
// OP_WRITE event resumed when the buffers could not be fully written.
final WriteEvent writeEvent = new WriteEvent(channel, this);
// Demand bookkeeping for requests made to the upstream publisher.
final Demand writeDemand = new Demand();
@Override
public void onSubscribe(Flow.Subscription subscription) {
    final WriteSubscription old = this.subscription;
    if (debug.on()) {
        debug.log("subscribed for writing");
    }
    try {
        // Idle means no buffer list is waiting to be written.
        final boolean idle = (current == null);
        if (idle && old != null && old.upstreamSubscription != subscription) {
            // A different upstream is taking over: drop the old subscription.
            old.dropSubscription();
        }
        this.subscription = new WriteSubscription(subscription);
        if (!idle) {
            return;
        }
        if (debug.on()) {
            debug.log("write: registering startSubscription event");
        }
        client.registerEvent(startSubscription);
    } catch (Throwable t) {
        signalError(t);
    }
}
// Accepts the next buffer list from upstream, stores it as 'current' and
// immediately attempts to write it to the channel.
@Override
public void onNext(List<ByteBuffer> bufs) {
assert current == null : dbgString() // this is a queue of 1.
+ "w.onNext current: " + current;
assert subscription != null : dbgString()
+ "w.onNext: subscription is null";
current = bufs;
tryFlushCurrent(client.isSelectorThread()); // may be in selector thread
// For instance in HTTP/2, a received SETTINGS frame might trigger
// the sending of a SETTINGS frame in turn which might cause
// onNext to be called from within the same selector thread that the
// original SETTINGS frames arrived on. If rs is the read-subscriber
// and ws is the write-subscriber then the following can occur:
// ReadEvent -> rs.onNext(bytes) -> process server SETTINGS -> write
// client SETTINGS -> ws.onNext(bytes) -> tryFlushCurrent
debugState("leaving w.onNext");
}
// Don't use a SequentialScheduler here: rely on onNext() being invoked
// sequentially, and not being invoked if there is no demand, request(1).
// onNext is usually called from within a user / executor thread.
// Initial writing will be performed in that thread. If for some reason,
// not all the data can be written, a writeEvent will be registered, and
// writing will resume in the selector manager thread when the
// writeEvent is fired.
//
// If this method is invoked in the selector manager thread (because of
// a writeEvent), then the executor will be used to invoke request(1),
// ensuring that onNext() won't be invoked from within the selector
// thread. If not in the selector manager thread, then request(1) is
// invoked directly.
void tryFlushCurrent(boolean inSelectorThread) {
    final List<ByteBuffer> pending = current;
    if (pending == null) {
        // nothing is waiting to be written
        return;
    }
    try {
        assert inSelectorThread == client.isSelectorThread() :
                "should " + (inSelectorThread ? "" : "not ")
                        + " be in the selector thread";
        final long toWrite = Utils.remaining(pending);
        if (debug.on()) {
            debug.log("trying to write: %d", toWrite);
        }
        final long written = writeAvailable(pending);
        if (debug.on()) {
            debug.log("wrote: %d", written);
        }
        assert written >= 0 : "negative number of bytes written:" + written;
        assert written <= toWrite;

        if (written != toWrite) {
            // Partial write: wait for the channel to become writable again.
            resumeWriteEvent(inSelectorThread);
            return;
        }

        // Everything was written: free the slot and, if demand allows,
        // ask upstream for the next buffer list.
        current = null;
        if (!writeDemand.tryDecrement()) {
            return;
        }
        if (inSelectorThread) {
            // Offload to the executor so that request(1), and the onNext it
            // may trigger, do not run in the selector thread.
            assert client.isSelectorThread();
            client.theExecutor().execute(this::requestMore);
        } else {
            assert !client.isSelectorThread();
            requestMore();
        }
    } catch (Throwable t) {
        signalError(t);
    }
}
// Kick off the initial request:1 that will start the writing side.
// Invoked in the selector manager thread.
// Callback of the startSubscription event. Any failure is routed to
// signalError.
void startSubscription() {
try {
if (debug.on()) debug.log("write: starting subscription");
if (Log.channel()) {
Log.logChannel("Start requesting bytes for writing to channel: {0}",
channelDescr());
}
assert client.isSelectorThread();
// make sure read registrations are handled before;
readPublisher.subscriptionImpl.handlePending();
if (debug.on()) debug.log("write: offloading requestMore");
// start writing;
// requestMore is handed to the executor rather than run in this thread.
client.theExecutor().execute(this::requestMore);
} catch(Throwable t) {
signalError(t);
}
}
void requestMore() {
    // One read of the volatile field, then delegate to the subscription
    // wrapper, which decides whether upstream must be asked for more.
    this.subscription.requestMore();
}
@Override
public void onError(Throwable throwable) {
// An upstream failure is handled exactly like a write failure.
signalError(throwable);
}
@Override
public void onComplete() {
    completed = true;
    // The write event is deliberately left alone: it gets paused on its
    // own once there is nothing more to write.
    final List<ByteBuffer> unsent = current;
    long unsentBytes = 0;
    if (unsent != null) {
        unsentBytes = Utils.remaining(unsent);
    }
    if (debug.on()) {
        debug.log( "write completed, %d yet to send", unsentBytes);
    }
    debugState("InternalWriteSubscriber::onComplete");
}
// Resumes the write event; errors raised while doing so go to signalError.
// Note: the inSelectorThread parameter is not used by this method.
void resumeWriteEvent(boolean inSelectorThread) {
if (debug.on()) debug.log("scheduling write event");
resumeEvent(writeEvent, this::signalError);
}
// Called by WriteEvent.signalEvent when the write event fires: retries
// writing the current buffers, telling tryFlushCurrent that it runs in the
// selector thread.
void signalWritable() {
if (debug.on()) debug.log("channel is writable");
tryFlushCurrent(true);
}
// Handles a failure on the writing side: marks this subscriber completed,
// propagates the error to the read publisher and cancels the current
// write subscription, if there is one.
void signalError(Throwable error) {
debug.log(() -> "write error: " + error);
if (Log.channel()) {
Log.logChannel("Failed to write to channel ({0}: {1})",
channelDescr(), error);
}
completed = true;
readPublisher.signalError(error);
// read the volatile field once
Flow.Subscription subscription = this.subscription;
if (subscription != null) subscription.cancel();
}
// A repeatable WriteEvent which is paused after firing and can
// be resumed if required - see SocketFlowEvent;
final class WriteEvent extends SocketFlowEvent {
// The write subscriber notified when the event fires or fails.
final InternalWriteSubscriber sub;
WriteEvent(SocketChannel channel, InternalWriteSubscriber sub) {
super(SelectionKey.OP_WRITE, channel);
this.sub = sub;
}
// Invoked from SocketFlowEvent.handle() after the event has been paused.
@Override
protected final void signalEvent() {
try {
// tell the client that this event has been updated (it was just paused)
client.eventUpdated(this);
sub.signalWritable();
} catch(Throwable t) {
sub.signalError(t);
}
}
// Invoked from SocketFlowEvent.abort() after the event has been paused.
@Override
protected void signalError(Throwable error) {
sub.signalError(error);
}
@Override
Logger debug() { return debug; }
}
// Wraps the upstream subscription so that it can be cancelled or dropped,
// and so that the write demand is never touched after cancellation.
final class WriteSubscription implements Flow.Subscription {
final Flow.Subscription upstreamSubscription;
// Set by dropSubscription(), under the InternalWriteSubscriber monitor.
volatile boolean cancelled;
WriteSubscription(Flow.Subscription subscription) {
this.upstreamSubscription = subscription;
}
// Forwards the request upstream unless this subscription was cancelled.
@Override
public void request(long n) {
if (cancelled) return;
upstreamSubscription.request(n);
}
// Drops this subscription, then cancels the upstream one. Does nothing
// if already cancelled.
@Override
public void cancel() {
if (cancelled) return;
if (debug.on()) debug.log("write: cancel");
if (Log.channel()) {
Log.logChannel("Cancelling write subscription");
}
dropSubscription();
upstreamSubscription.cancel();
}
// Marks this subscription cancelled and resets the write demand, without
// cancelling the upstream subscription.
void dropSubscription() {
synchronized (InternalWriteSubscriber.this) {
cancelled = true;
if (debug.on()) debug.log("write: resetting demand to 0");
writeDemand.reset();
}
}
// Requests one more item from upstream when increaseIfFulfilled() reports
// that the demand was increased. Errors are routed to signalError.
void requestMore() {
try {
if (completed || cancelled) return;
boolean requestMore;
long d;
// don't fiddle with demand after cancel.
// see dropSubscription.
synchronized (InternalWriteSubscriber.this) {
// re-check under the monitor: dropSubscription may have run meanwhile
if (cancelled) return;
d = writeDemand.get();
requestMore = writeDemand.increaseIfFulfilled();
}
if (requestMore) {
if (debug.on()) debug.log("write: requesting more...");
upstreamSubscription.request(1);
} else {
if (debug.on())
debug.log("write: no need to request more: %d", d);
}
} catch (Throwable t) {
if (debug.on())
debug.log("write: error while requesting more: " + t);
signalError(t);
} finally {
debugState("leaving requestMore: ");
}
}
}
}
// ===================================================================== //
// Reading //
// ===================================================================== //
// The InternalReadPublisher uses a SequentialScheduler to ensure that
// onNext/onError/onComplete are called sequentially on the caller's
// subscriber.
// However, it relies on the fact that the only time where
// runOrSchedule() is called from a user/executor thread is in signalError,
// right after the errorRef has been set.
// Because the sequential scheduler's task always checks for errors first,
// and always terminates the scheduler on error, it is safe to assume
// that if it reaches the point where it reads from the channel, then
// it is running in the SelectorManager thread. This is because all
// other invocations of runOrSchedule() are triggered from within a
// ReadEvent.
//
// When pausing/resuming the event, some shortcuts can then be taken
// when we know we're running in the selector manager thread
// (in that case there's no need to call client.eventUpdated(readEvent)).
//
private final class InternalReadPublisher
implements Flow.Publisher<List<ByteBuffer>> {
// The single internal subscription that drives reading from the channel.
private final InternalReadSubscription subscriptionImpl
= new InternalReadSubscription();
// Subscriber waiting to be installed; taken (and cleared) by handlePending().
AtomicReference<ReadSubscription> pendingSubscription = new AtomicReference<>();
// The currently installed subscriber; assigned by handlePending().
private volatile ReadSubscription subscription;
// Registers a new read subscriber. The subscriber is wrapped in a
// ReadSubscription and parked in pendingSubscription; a subscriber that was
// still pending is notified and then completed or dropped. Finally the
// internal subscription is asked to process the new pending subscriber.
@Override
public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
Objects.requireNonNull(s);
TubeSubscriber sub = FlowTube.asTubeSubscriber(s);
ReadSubscription target = new ReadSubscription(subscriptionImpl, sub);
ReadSubscription previous = pendingSubscription.getAndSet(target);
if (previous != null && previous != target) {
if (debug.on())
debug.log("read publisher: dropping pending subscriber: "
+ previous.subscriber);
// hand over any error already recorded on the tube
previous.errorRef.compareAndSet(null, errorRef.get());
// onSubscribe must be signalled before completion or drop
previous.signalOnSubscribe();
if (subscriptionImpl.completed) {
previous.signalCompletion();
} else {
previous.subscriber.dropSubscription();
}
}
if (debug.on()) debug.log("read publisher got subscriber");
subscriptionImpl.signalSubscribe();
debugState("leaving read.subscribe: ");
}
void signalError(Throwable error) {
    if (debug.on()) {
        debug.log("error signalled " + error);
    }
    // Only the first error is recorded and acted upon.
    final boolean firstError = errorRef.compareAndSet(null, error);
    if (firstError) {
        if (Log.channel()) {
            Log.logChannel("Error signalled on channel {0}: {1}",
                    channelDescr(), error);
        }
        subscriptionImpl.handleError();
    }
}
// The subscription handed to a read subscriber. It forwards demand to the
// shared InternalReadSubscription and signals onSubscribe and
// onComplete/onError to its subscriber at most once each.
final class ReadSubscription implements Flow.Subscription {
final InternalReadSubscription impl;
final TubeSubscriber subscriber;
// Error to report to this subscriber on completion, if any.
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
final BufferSource bufferSource;
volatile boolean subscribed;
volatile boolean cancelled;
volatile boolean completed;
public ReadSubscription(InternalReadSubscription impl,
TubeSubscriber subscriber) {
this.impl = impl;
// The buffer source depends on whether the subscriber supports recycling.
this.bufferSource = subscriber.supportsRecycling()
? new SSLDirectBufferSource(client)
: SocketTube.this.sliceBuffersSource;
this.subscriber = subscriber;
}
// Only records the cancellation; nothing is forwarded to impl.
@Override
public void cancel() {
cancelled = true;
}
// Forwards the demand to impl unless this subscription was cancelled.
@Override
public void request(long n) {
if (!cancelled) {
impl.request(n);
} else {
if (debug.on())
debug.log("subscription cancelled, ignoring request %d", n);
}
}
// Completes the subscriber: onError if an error was recorded in errorRef,
// onComplete otherwise. Does nothing if already completed or cancelled.
void signalCompletion() {
assert subscribed || cancelled;
if (completed || cancelled) return;
synchronized (this) {
// re-check under the lock so that only one caller proceeds
if (completed) return;
completed = true;
}
Throwable error = errorRef.get();
if (error != null) {
if (debug.on())
debug.log("forwarding error to subscriber: " + error);
subscriber.onError(error);
} else {
if (debug.on()) debug.log("completing subscriber");
subscriber.onComplete();
}
}
// Calls subscriber.onSubscribe once, then completes the subscriber right
// away if an error is already recorded. Does nothing if already
// subscribed or cancelled.
void signalOnSubscribe() {
if (subscribed || cancelled) return;
synchronized (this) {
if (subscribed || cancelled) return;
subscribed = true;
}
subscriber.onSubscribe(this);
if (debug.on()) debug.log("onSubscribe called");
if (errorRef.get() != null) {
signalCompletion();
}
}
}
final class InternalReadSubscription implements Flow.Subscription {
// Demand signalled by the read subscriber through request(n).
private final Demand demand = new Demand();
// Scheduler whose task is read().
final SequentialScheduler readScheduler;
// Set by read() once an error or EOF has been delivered.
private volatile boolean completed;
private final ReadEvent readEvent;
// Event whose callback is handleSubscribeEvent.
private final AsyncEvent subscribeEvent;
InternalReadSubscription() {
readScheduler = new SequentialScheduler(new SocketFlowTask(this::read));
subscribeEvent = new AsyncTriggerEvent(this::signalError,
this::handleSubscribeEvent);
readEvent = new ReadEvent(channel, this);
}
/*
* This method must be invoked before any other method of this class.
*
* It processes the pending subscriber directly when reading is already
* stopped or completed; otherwise it registers the subscribe event with
* the client.
*/
final void signalSubscribe() {
if (readScheduler.isStopped() || completed) {
// if already completed or stopped we can handle any
// pending connection directly from here.
if (debug.on())
debug.log("handling pending subscription while completed");
handlePending();
} else {
try {
if (debug.on()) debug.log("registering subscribe event");
client.registerEvent(subscribeEvent);
} catch (Throwable t) {
// registration failed: record the error, then let the pending
// subscriber see it.
signalError(t);
handlePending();
}
}
}
// Callback of subscribeEvent: runs (or schedules) the read task, then
// processes the pending subscriber here if reading is already stopped or
// completed.
final void handleSubscribeEvent() {
assert client.isSelectorThread();
debug.log("subscribe event raised");
if (Log.channel()) Log.logChannel("Start reading from {0}", channelDescr());
readScheduler.runOrSchedule();
if (readScheduler.isStopped() || completed) {
// if already completed or stopped we can handle any
// pending connection directly from here.
if (debug.on())
debug.log("handling pending subscription when completed");
handlePending();
}
}
/*
* Although this method is thread-safe, the Reactive-Streams spec seems
* to not require it to be as such. It's a responsibility of the
* subscriber to signal demand in a thread-safe manner.
*
* See Reactive Streams specification, rules 2.7 and 3.4.
*/
@Override
public final void request(long n) {
    if (n <= 0L) {
        signalError(new IllegalArgumentException("non-positive request"));
    } else if (demand.increase(n)) {
        // The demand went from fulfilled to unfulfilled:
        // the read event has to be registered again.
        if (debug.on()) {
            debug.log("got some demand for reading");
        }
        resumeReadEvent();
    }
    debugState("leaving request("+n+"): ");
}
// Cancels reading: pauses the read event, then stops the read scheduler.
@Override
public final void cancel() {
pauseReadEvent();
if (Log.channel()) {
Log.logChannel("Read subscription cancelled for channel {0}",
channelDescr());
}
readScheduler.stop();
}
// Resumes the read event; errors raised while doing so go to signalError.
private void resumeReadEvent() {
if (debug.on()) debug.log("resuming read event");
resumeEvent(readEvent, this::signalError);
}
// Pauses the read event; errors raised while doing so go to signalError.
private void pauseReadEvent() {
if (debug.on()) debug.log("pausing read event");
pauseEvent(readEvent, this::signalError);
}
// Called by InternalReadPublisher.signalError once errorRef has been set:
// runs the read task, which checks errorRef before reading.
final void handleError() {
assert errorRef.get() != null;
readScheduler.runOrSchedule();
}
// Records the error if it is the first one, then runs the read task so
// that the error is delivered. Later errors are ignored.
final void signalError(Throwable error) {
if (!errorRef.compareAndSet(null, error)) {
return;
}
if (debug.on()) debug.log("got read error: " + error);
if (Log.channel()) {
Log.logChannel("Read error signalled on channel {0}: {1}",
channelDescr(), error);
}
readScheduler.runOrSchedule();
}
// Called by ReadEvent.signalEvent when the read event fires.
final void signalReadable() {
readScheduler.runOrSchedule();
}
/** The body of the task that runs in SequentialScheduler. */
final void read() {
// Outline of one pass: bail out if already completed; install a pending
// subscriber if there is one; deliver a recorded error; otherwise, if
// there is demand, read what is available and deliver EOF, bytes, or
// nothing. The finally block always gives a pending subscriber a chance
// to be installed.
//
// It is important to only call pauseReadEvent() when stopping
// the scheduler. The event is automatically paused before
// firing, and trying to pause it again could cause a race
// condition between this loop, which calls tryDecrementDemand(),
// and the thread that calls request(n), which will try to resume
// reading.
try {
while(!readScheduler.isStopped()) {
if (completed) return;
// make sure we have a subscriber
if (handlePending()) {
if (debug.on())
debug.log("pending subscriber subscribed");
return;
}
// If an error was signaled, we might not be in the
// selector thread, and that is OK, because we
// will just call onError and return.
ReadSubscription current = subscription;
Throwable error = errorRef.get();
if (current == null) {
assert error != null;
if (debug.on())
debug.log("error raised before subscriber subscribed: %s",
(Object)error);
return;
}
TubeSubscriber subscriber = current.subscriber;
if (error != null) {
completed = true;
// safe to pause here because we're finished anyway.
pauseReadEvent();
if (debug.on())
debug.log("Sending error " + error
+ " to subscriber " + subscriber);
if (Log.channel()) {
Log.logChannel("Raising error with subscriber for {0}: {1}",
channelDescr(), error);
}
current.errorRef.compareAndSet(null, error);
current.signalCompletion();
readScheduler.stop();
debugState("leaving read() loop with error: ");
return;
}
// If we reach here then we must be in the selector thread.
assert client.isSelectorThread();
if (demand.tryDecrement()) {
// we have demand.
try {
List<ByteBuffer> bytes = readAvailable(current.bufferSource);
// identity comparison against the EOF marker
if (bytes == EOF) {
if (!completed) {
if (debug.on()) debug.log("got read EOF");
if (Log.channel()) {
Log.logChannel("EOF read from channel: {0}",
channelDescr());
}
completed = true;
// safe to pause here because we're finished
// anyway.
pauseReadEvent();
current.signalCompletion();
readScheduler.stop();
}
debugState("leaving read() loop after EOF: ");
return;
} else if (Utils.remaining(bytes) > 0) {
// the subscriber is responsible for offloading
// to another thread if needed.
if (debug.on())
debug.log("read bytes: " + Utils.remaining(bytes));
assert !current.completed;
subscriber.onNext(bytes);
// we could continue looping until the demand
// reaches 0. However, that would risk starving
// other connections (bound to other socket
// channels) - as other selected keys activated
// by the selector manager thread might be
// waiting for this event to terminate.
// So resume the read event and return now...
resumeReadEvent();
debugState("leaving read() loop after onNext: ");
return;
} else {
// nothing available!
if (debug.on()) debug.log("no more bytes available");
// re-increment the demand and resume the read
// event. This ensures that this loop is
// executed again when the socket becomes
// readable again.
demand.increase(1);
resumeReadEvent();
debugState("leaving read() loop with no bytes");
return;
}
} catch (Throwable x) {
// record the error and loop again: the next iteration
// delivers it through the error branch above.
signalError(x);
continue;
}
} else {
if (debug.on()) debug.log("no more demand for reading");
// the event is paused just after firing, so it should
// still be paused here, unless the demand was just
// incremented from 0 to n, in which case, the
// event will be resumed, causing this loop to be
// invoked again when the socket becomes readable:
// This is what we want.
// Trying to pause the event here would actually
// introduce a race condition between this loop and
// request(n).
debugState("leaving read() loop with no demand");
break;
}
}
} catch (Throwable t) {
if (debug.on()) debug.log("Unexpected exception in read loop", t);
signalError(t);
} finally {
if (readScheduler.isStopped()) {
if (debug.on()) debug.log("Read scheduler stopped");
if (Log.channel()) {
Log.logChannel("Stopped reading from channel {0}", channelDescr());
}
}
// runs on every exit path, including the early returns above
handlePending();
}
}
// Installs the pending subscriber, if there is one: drops the current
// subscriber when it differs and reading has not completed, resets the read
// demand, signals onSubscribe to the new subscriber, and completes it at
// once if reading already completed.
// Returns true if a pending subscription was found and processed,
// false otherwise.
boolean handlePending() {
// atomically take the pending subscription so it is handled only once
ReadSubscription pending = pendingSubscription.getAndSet(null);
if (pending == null) return false;
if (debug.on())
debug.log("handling pending subscription for %s",
pending.subscriber);
ReadSubscription current = subscription;
if (current != null && current != pending && !completed) {
current.subscriber.dropSubscription();
}
if (debug.on()) debug.log("read demand reset to 0");
subscriptionImpl.demand.reset(); // subscriber will increase demand if it needs to.
pending.errorRef.compareAndSet(null, errorRef.get());
// the new subscriber only becomes current while the scheduler still runs
if (!readScheduler.isStopped()) {
subscription = pending;
} else {
if (debug.on()) debug.log("socket tube is already stopped");
}
if (debug.on()) debug.log("calling onSubscribe");
pending.signalOnSubscribe();
if (completed) {
pending.errorRef.compareAndSet(null, errorRef.get());
pending.signalCompletion();
}
return true;
}
}
// A repeatable ReadEvent which is paused after firing and can
// be resumed if required - see SocketFlowEvent;
final class ReadEvent extends SocketFlowEvent {
// The read subscription notified when the event fires or fails.
final InternalReadSubscription sub;
ReadEvent(SocketChannel channel, InternalReadSubscription sub) {
super(SelectionKey.OP_READ, channel);
this.sub = sub;
}
// Invoked from SocketFlowEvent.handle() after the event has been paused.
@Override
protected final void signalEvent() {
try {
// tell the client that this event has been updated (it was just paused)
client.eventUpdated(this);
sub.signalReadable();
} catch(Throwable t) {
sub.signalError(t);
}
}
// Invoked from SocketFlowEvent.abort() after the event has been paused.
@Override
protected final void signalError(Throwable error) {
sub.signalError(error);
}
@Override
Logger debug() { return debug; }
}
}
// ===================================================================== //
// Buffer Management //
// ===================================================================== //
// This interface is used by readAvailable(BufferSource);
public interface BufferSource {

    /**
     * Returns a buffer to read data from the socket.
     *
     * @implNote
     * Different implementations can have different strategies, as to
     * which kind of buffer to return, or whether to return the same
     * buffer. The only constraints are that:
     *   a. the buffer returned must not be null
     *   b. the buffer position indicates where to start reading
     *   c. the buffer limit indicates where to stop reading.
     *   d. the buffer is 'free' - that is - it is not used
     *      or retained by anybody else
     *
     * @return A buffer to read data from the socket.
     */
    ByteBuffer getBuffer();

    /**
     * Appends the read-data in {@code buffer} to the list of buffers to
     * be sent downstream to the subscriber. May return a new
     * list, or append to the given list.
     *
     * @implNote
     * Different implementations can have different strategies, but
     * must obviously be consistent with the implementation of the
     * getBuffer() method. For instance, an implementation could
     * decide to add the buffer to the list and return a new buffer
     * next time getBuffer() is called, or could decide to add a buffer
     * slice to the list and return the same buffer (if remaining
     * space is available) next time getBuffer() is called.
     *
     * @param list    The list before adding the data. Can be null.
     * @param buffer  The buffer containing the data to add to the list.
     * @param start   The start position at which data were read.
     *                The current buffer position indicates the end.
     * @return A possibly new list where a buffer containing the
     *         data read from the socket has been added.
     */
    List<ByteBuffer> append(List<ByteBuffer> list, ByteBuffer buffer, int start);

    /**
     * Returns the given unused {@code buffer}, previously obtained from
     * {@code getBuffer}.
     *
     * @implNote This method can be used, if necessary, to return
     *  the unused buffer to the pool. The default implementation
     *  does nothing.
     *
     * @param buffer The unused buffer.
     */
    default void returnUnused(ByteBuffer buffer) { }
}
/**
 * The {@link BufferSource} used for unencrypted data.
 * It hands out heap buffers and avoids wasting memory by forwarding
 * read-only slices of them downstream. Nothing is recycled: buffers
 * obtained through this source are simply GC'ed once they are no
 * longer referenced.
 */
private static final class SliceBufferSource implements BufferSource {
    private final Supplier<ByteBuffer> factory;
    private volatile ByteBuffer current;

    public SliceBufferSource() {
        this(Utils::getBuffer);
    }

    public SliceBufferSource(Supplier<ByteBuffer> factory) {
        this.factory = Objects.requireNonNull(factory);
    }

    /**
     * Returns the buffer handed out last time if it still has room,
     * otherwise asks the factory for a fresh one and remembers it.
     */
    @Override
    public final ByteBuffer getBuffer() {
        ByteBuffer candidate = current;
        if (candidate == null || !candidate.hasRemaining()) {
            candidate = factory.get();
            current = candidate;
        }
        assert candidate.hasRemaining();
        return candidate;
    }

    /**
     * Adds a read-only slice covering {@code [start, buf.position())} to
     * the list, potentially returning a new list with that slice at the
     * end. The position and limit of {@code buf} are the same on return
     * as they were on entry.
     */
    @Override
    public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {
        final int savedLimit = buf.limit();
        final int end = buf.position();
        // narrow the buffer to the bytes just read, and take a slice of them
        buf.limit(end);
        buf.position(start);
        final ByteBuffer readOnlySlice = buf.slice().asReadOnlyBuffer();
        // put position and limit back where they were
        buf.position(end);
        buf.limit(savedLimit);
        return SocketTube.listOf(list, readOnlySlice);
    }
}
/**
 * The {@link BufferSource} used for encrypted data.
 * It hands out direct byte buffers obtained from the client's SSL buffer
 * supplier; these buffers will be recycled by the SocketTube subscriber.
 */
private static final class SSLDirectBufferSource implements BufferSource {
    private final BufferSupplier factory;
    private final HttpClientImpl client;
    private ByteBuffer current;

    public SSLDirectBufferSource(HttpClientImpl client) {
        this.client = Objects.requireNonNull(client);
        this.factory = Objects.requireNonNull(client.getSSLBufferSupplier());
    }

    /**
     * Obtains a 'free' byte buffer from the pool, or returns the same
     * buffer again if nothing was read at the previous cycle.
     * The subscriber will be responsible for recycling this buffer into
     * the pool (see SSLFlowDelegate.Reader).
     */
    @Override
    public final ByteBuffer getBuffer() {
        assert client.isSelectorThread();
        ByteBuffer pooled = current;
        if (pooled == null) {
            pooled = factory.get();
            current = pooled;
        }
        assert pooled.hasRemaining();
        assert pooled.position() == 0;
        return pooled;
    }

    /**
     * Adds the buffer itself to the list. It will later be returned to
     * the pool by the subscriber (see SSLFlowDelegate.Reader), and the
     * next buffer returned by getBuffer() will be obtained from the pool:
     * it might be the same buffer or another one.
     * Because socket tube can read up to MAX_BUFFERS = 3 buffers, and
     * because recycling will happen in the flow before onNext returns,
     * the pool can not grow larger than MAX_BUFFERS = 3 buffers, even
     * though it's shared by all SSL connections opened on that client.
     */
    @Override
    public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {
        assert client.isSelectorThread();
        assert buf.isDirect();
        assert start == 0;
        assert current == buf;
        // the buffer now belongs to the list: forget it
        current = null;
        // expose only the bytes that were read
        buf.limit(buf.position());
        buf.position(start);
        return SocketTube.listOf(list, buf);
    }

    /**
     * Gives the current buffer back to the supplier when it was not
     * consumed by append. If current is null, the buffer will have been
     * added to the list through append and there is nothing to do.
     * Otherwise it needs to be returned, to prevent the buffer supplier
     * pool from growing to more than MAX_BUFFERS.
     */
    @Override
    public void returnUnused(ByteBuffer buffer) {
        assert buffer == current;
        final ByteBuffer unused = current;
        if (unused == null) {
            return;
        }
        assert unused.position() == 0;
        current = null;
        // the supplier asserts if the recycled buffer has remaining bytes
        unused.limit(unused.position());
        factory.recycle(unused);
    }
}
// ===================================================================== //
// Socket Channel Read/Write //
// ===================================================================== //
// readAvailable() stops reading once the list it is building holds
// this many buffers.
static final int MAX_BUFFERS = 3;
// The empty list: returned by readAvailable() when nothing was read
// and the channel reported end of stream (read == -1).
static final List<ByteBuffer> EOF = List.of();
// A list holding only Utils.EMPTY_BYTEBUFFER: returned by readAvailable()
// when nothing was read but the end of stream was not reached.
static final List<ByteBuffer> NOTHING = List.of(Utils.EMPTY_BYTEBUFFER);
// readAvailable() will read bytes into the 'current' ByteBuffer until
// the ByteBuffer is full, or 0 or -1 (EOF) is returned by read().
// When that happens, a slice of the data that has been read so far
// is inserted into the returned buffer list, and if the current buffer
// has remaining space, that space will be used to read more data when
// the channel becomes readable again.
//
// Returns:
//  - EOF if nothing was read and read() returned -1,
//  - NOTHING if nothing was read and read() did not return -1,
//  - otherwise a list of at most MAX_BUFFERS buffers holding the data read.
// An IOException raised by read() is only thrown to the caller if no
// data at all was read during this call; otherwise it is stored in
// errorRef (unless errorRef is already set) and the data is returned.
private List<ByteBuffer> readAvailable(BufferSource buffersSource) throws IOException {
ByteBuffer buf = buffersSource.getBuffer();
assert buf.hasRemaining();
int read;
// position of 'buf' before this round of reads: data read in this
// round lies between 'pos' and the buffer's position
int pos = buf.position();
List<ByteBuffer> list = null;
while (buf.hasRemaining()) {
try {
// keep reading until the channel has nothing more to give
// (0 or -1) or the buffer is full
while ((read = channel.read(buf)) > 0) {
if (!buf.hasRemaining())
break;
}
} catch (IOException x) {
if (buf.position() == pos && list == null) {
// make sure that the buffer source will recycle
// 'buf' if needed
buffersSource.returnUnused(buf);
// no bytes have been read, just throw...
throw x;
} else {
// some bytes have been read, return them and fail next time
errorRef.compareAndSet(null, x);
read = 0; // ensures outer loop will exit
}
}
// nothing read;
if (buf.position() == pos) {
// An empty list signals the end of data, and should only be
// returned if read == -1. If some data has already been read,
// then it must be returned. -1 will be returned next time
// the caller attempts to read something.
buffersSource.returnUnused(buf);
if (list == null) {
// nothing read - list was null - return EOF or NOTHING
list = read == -1 ? EOF : NOTHING;
}
break;
}
// hand the data read in this round over to the buffer source, which
// adds it to the list; whether 'buf' itself is reused for the next
// round is up to the buffer source (see getBuffer() below).
list = buffersSource.append(list, buf, pos);
// stop if the channel has no more data for now (or failed), or if
// the list has reached its maximum size
if (read <= 0 || list.size() == MAX_BUFFERS) {
break;
}
// the last read filled the buffer: get a buffer for the next round
buf = buffersSource.getBuffer();
pos = buf.position();
assert buf.hasRemaining();
}
return list;
}
/**
 * Returns a list made of the elements of {@code list} followed by
 * {@code item}.
 * For a null list, or a list of fewer than three elements, a new
 * immutable list is returned and {@code list} is left untouched.
 * For larger lists (slow path if MAX_BUFFERS > 3) the item is added to
 * {@code list} itself when it is an {@code ArrayList}, or to a new
 * {@code ArrayList} copy of it otherwise.
 *
 * @param list the elements gathered so far, can be null
 * @param item the element to put at the end
 * @return a list ending with {@code item}
 */
private static <T> List<T> listOf(List<T> list, T item) {
    final int count = (list == null) ? 0 : list.size();
    if (count == 0) {
        return List.of(item);
    }
    if (count == 1) {
        return List.of(list.get(0), item);
    }
    if (count == 2) {
        return List.of(list.get(0), list.get(1), item);
    }
    // slow path if MAX_BUFFERS > 3
    final List<T> grown;
    if (list instanceof ArrayList) {
        grown = list;
    } else {
        grown = new ArrayList<>(list);
    }
    grown.add(item);
    return grown;
}
/**
 * Writes the given buffers to the channel, until all their remaining
 * bytes have been written or the channel accepts no more bytes.
 *
 * @param bytes the buffers to write
 * @return the number of bytes written, possibly 0
 * @throws IOException if the channel fails before any byte could be
 *         written by this call. A failure that happens after some bytes
 *         were written is not reported: the count of bytes written so
 *         far is returned instead.
 */
private long writeAvailable(List<ByteBuffer> bytes) throws IOException {
    ByteBuffer[] srcs = bytes.toArray(Utils.EMPTY_BB_ARRAY);
    final long remaining = Utils.remaining(srcs);
    long total = 0;
    while (total < remaining) {
        long count;
        try {
            count = channel.write(srcs);
        } catch (IOException x) {
            if (total == 0) {
                // no bytes were written just throw
                throw x;
            }
            // return how many bytes were written, will fail next time
            return total;
        }
        assert count >= 0 : "negative number of bytes written:" + count;
        if (count == 0) {
            // the channel can't take more for now
            break;
        }
        total += count;
    }
    return total;
}
/**
 * Resumes the given event and makes the client aware of it: the event
 * is registered with the client if it was not registered before being
 * resumed, otherwise the client is only told that the event was updated.
 *
 * @param event the event to resume
 * @param errorSignaler receives anything thrown while registering or
 *        updating the event
 */
private void resumeEvent(SocketFlowEvent event,
                         Consumer<Throwable> errorSignaler) {
    final boolean alreadyRegistered;
    synchronized (lock) {
        // the registration state must be sampled before resuming
        alreadyRegistered = event.registered();
        event.resume();
    }
    try {
        if (alreadyRegistered) {
            client.eventUpdated(event);
        } else {
            client.registerEvent(event);
        }
    } catch (Throwable failure) {
        errorSignaler.accept(failure);
    }
}
/**
 * Pauses the given event, then tells the client that the event was
 * updated.
 *
 * @param event the event to pause
 * @param errorSignaler receives anything thrown while updating the event
 */
private void pauseEvent(SocketFlowEvent event,
                        Consumer<Throwable> errorSignaler) {
    synchronized (lock) {
        event.pause();
    }
    try {
        client.eventUpdated(event);
    } catch (Throwable failure) {
        errorSignaler.accept(failure);
    }
}
/**
 * Connects both directions of this tube: {@code readSubscriber} is
 * subscribed to this tube first, then this tube is subscribed to
 * {@code writePublisher}.
 */
@Override
public void connectFlows(TubePublisher writePublisher,
                         TubeSubscriber readSubscriber) {
    if (debug.on()) {
        debug.log("connecting flows");
    }
    subscribe(readSubscriber);
    writePublisher.subscribe(this);
}
/** Returns the same string as {@link #dbgString()}. */
@Override
public String toString() {
    final String description = dbgString();
    return description;
}
/** Returns a short description of this tube, of the form {@code SocketTube(<id>)}. */
final String dbgString() {
    final String prefix = "SocketTube(";
    return prefix + id + ")";
}
/** Returns the string form of the channel, or {@code "null"} if there is none. */
final String channelDescr() {
    return Objects.toString(channel);
}
}