/*
 * Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.internal.net.http;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.FlowTube.TubeSubscriber;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.SequentialScheduler;
import jdk.internal.net.http.common.ConnectionExpiredException;
import jdk.internal.net.http.common.Utils;


A helper class that will queue up incoming data until the receiving side is ready to handle it.
/** * A helper class that will queue up incoming data until the receiving * side is ready to handle it. */
class Http1AsyncReceiver { final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
A delegate that can asynchronously receive data from an upstream flow, parse, it, then possibly transform it and either store it (response headers) or possibly pass it to a downstream subscriber (response body). Usually, there will be one Http1AsyncDelegate in charge of receiving and parsing headers, and another one in charge of receiving, parsing, and forwarding body. Each will sequentially subscribe with the Http1AsyncReceiver in turn. There may be additional delegates which subscribe to the Http1AsyncReceiver, mainly for the purpose of handling errors while the connection is busy transmitting the request body and the Http1Exchange::readBody method hasn't been called yet, and response delegates haven't subscribed yet.
/** * A delegate that can asynchronously receive data from an upstream flow, * parse, it, then possibly transform it and either store it (response * headers) or possibly pass it to a downstream subscriber (response body). * Usually, there will be one Http1AsyncDelegate in charge of receiving * and parsing headers, and another one in charge of receiving, parsing, * and forwarding body. Each will sequentially subscribe with the * Http1AsyncReceiver in turn. There may be additional delegates which * subscribe to the Http1AsyncReceiver, mainly for the purpose of handling * errors while the connection is busy transmitting the request body and the * Http1Exchange::readBody method hasn't been called yet, and response * delegates haven't subscribed yet. */
static interface Http1AsyncDelegate {
Receives and handles a byte buffer reference.
Params:
  • ref – A byte buffer reference coming from upstream.
Returns:false, if the byte buffer reference should be kept in the queue. Usually, this means that either the byte buffer reference was handled and parsing is finished, or that the receiver didn't handle the byte reference at all. There may or may not be any remaining data in the byte buffer, and the byte buffer reference must not have been cleared. true, if the byte buffer reference was fully read and more data can be received.
/** * Receives and handles a byte buffer reference. * @param ref A byte buffer reference coming from upstream. * @return false, if the byte buffer reference should be kept in the queue. * Usually, this means that either the byte buffer reference * was handled and parsing is finished, or that the receiver * didn't handle the byte reference at all. * There may or may not be any remaining data in the * byte buffer, and the byte buffer reference must not have * been cleared. * true, if the byte buffer reference was fully read and * more data can be received. */
public boolean tryAsyncReceive(ByteBuffer ref);
Called when an exception is raised.
Params:
  • ex – The raised Throwable.
/** * Called when an exception is raised. * @param ex The raised Throwable. */
public void onReadError(Throwable ex);
Must be called before any other method on the delegate. The subscription can be either used directly by the delegate to request more data (e.g. if the delegate is a header parser), or can be forwarded to a downstream subscriber (if the delegate is a body parser that wraps a response BodySubscriber). In all cases, it is the responsibility of the delegate to ensure that request(n) and demand.tryDecrement() are called appropriately. No data will be sent to tryAsyncReceive unless the subscription has some demand.
Params:
  • s – A subscription that allows the delegate to control the data flow.
/** * Must be called before any other method on the delegate. * The subscription can be either used directly by the delegate * to request more data (e.g. if the delegate is a header parser), * or can be forwarded to a downstream subscriber (if the delegate * is a body parser that wraps a response BodySubscriber). * In all cases, it is the responsibility of the delegate to ensure * that request(n) and demand.tryDecrement() are called appropriately. * No data will be sent to {@code tryAsyncReceive} unless * the subscription has some demand. * * @param s A subscription that allows the delegate to control the * data flow. */
public void onSubscribe(AbstractSubscription s);
Returns the subscription that was passed to onSubscribe
Returns:the subscription that was passed to onSubscribe..
/** * Returns the subscription that was passed to {@code onSubscribe} * @return the subscription that was passed to {@code onSubscribe}.. */
public AbstractSubscription subscription();
Called to make sure resources are released when the when the Http1AsyncReceiver is stopped.
Params:
  • error – The Http1AsyncReceiver pending error ref, if any.
/** * Called to make sure resources are released when the * when the Http1AsyncReceiver is stopped. * @param error The Http1AsyncReceiver pending error ref, * if any. */
public void close(Throwable error); }
A simple subclass of AbstractSubscription that ensures the SequentialScheduler will be run when request() is called and demand becomes positive again.
/** * A simple subclass of AbstractSubscription that ensures the * SequentialScheduler will be run when request() is called and demand * becomes positive again. */
private static final class Http1AsyncDelegateSubscription extends AbstractSubscription { private final Runnable onCancel; private final Consumer<Throwable> onError; private final SequentialScheduler scheduler; private volatile boolean cancelled; Http1AsyncDelegateSubscription(SequentialScheduler scheduler, Runnable onCancel, Consumer<Throwable> onError) { this.scheduler = scheduler; this.onCancel = onCancel; this.onError = onError; } @Override public void request(long n) { if (cancelled) return; try { final Demand demand = demand(); if (demand.increase(n)) { scheduler.runOrSchedule(); } } catch (IllegalArgumentException x) { cancelled = true; onError.accept(x); } } @Override public void cancel() { cancelled = true; onCancel.run(); } } private final ConcurrentLinkedDeque<ByteBuffer> queue = new ConcurrentLinkedDeque<>(); private final SequentialScheduler scheduler = SequentialScheduler.synchronizedScheduler(this::flush); final MinimalFuture<Void> whenFinished; private final Executor executor; private final Http1TubeSubscriber subscriber = new Http1TubeSubscriber(); private final AtomicReference<Http1AsyncDelegate> pendingDelegateRef; private final AtomicLong received = new AtomicLong(); final AtomicBoolean canRequestMore = new AtomicBoolean(); private volatile Throwable error; private volatile Http1AsyncDelegate delegate; // This reference is only used to prevent early GC of the exchange. private volatile Http1Exchange<?> owner; // Only used for checking whether we run on the selector manager thread. private final HttpClientImpl client; private boolean retry; private volatile boolean stopRequested; public Http1AsyncReceiver(Executor executor, Http1Exchange<?> owner) { this.pendingDelegateRef = new AtomicReference<>(); this.executor = executor; this.whenFinished = new MinimalFuture<>(); this.owner = owner; this.client = owner.client; } // This is the main loop called by the SequentialScheduler. // It attempts to empty the queue until the scheduler is stopped, // or the delegate is unregistered, or the delegate is unable to // process the data (because it's not ready or already done), which // it signals by returning 'true'; private void flush() { ByteBuffer buf; try { // we should not be running in the selector here, // except if the custom Executor supplied to the client is // something like (r) -> r.run(); assert !client.isSelectorThread() || !(client.theExecutor().delegate() instanceof ExecutorService) : "Http1AsyncReceiver::flush should not run in the selector: " + Thread.currentThread().getName(); // First check whether we have a pending delegate that has // just subscribed, and if so, create a Subscription for it // and call onSubscribe. handlePendingDelegate(); // Then start emptying the queue, if possible. while ((buf = queue.peek()) != null && !stopRequested) { Http1AsyncDelegate delegate = this.delegate; if (debug.on()) debug.log("Got %s bytes for delegate %s", buf.remaining(), delegate); if (!hasDemand(delegate)) { // The scheduler will be invoked again later when the demand // becomes positive. return; } assert delegate != null; if (debug.on()) debug.log("Forwarding %s bytes to delegate %s", buf.remaining(), delegate); // The delegate has demand: feed it the next buffer. if (!delegate.tryAsyncReceive(buf)) { final long remaining = buf.remaining(); if (debug.on()) debug.log(() -> { // If the scheduler is stopped, the queue may already // be empty and the reference may already be released. String remstr = scheduler.isStopped() ? "" : " remaining in ref: " + remaining; remstr += remstr + " total remaining: " + remaining(); return "Delegate done: " + remaining; }); canRequestMore.set(false); // The last buffer parsed may have remaining unparsed bytes. // Don't take it out of the queue. return; // done. } // removed parsed buffer from queue, and continue with next // if available ByteBuffer parsed = queue.remove(); canRequestMore.set(queue.isEmpty() && !stopRequested); assert parsed == buf; } // queue is empty: let's see if we should request more checkRequestMore(); } catch (Throwable t) { Throwable x = error; if (x == null) error = t; // will be handled in the finally block if (debug.on()) debug.log("Unexpected error caught in flush()", t); } finally { // Handles any pending error. // The most recently subscribed delegate will get the error. checkForErrors(); } } private String describe() { Http1Exchange<?> exchange = owner; if (exchange != null) { return String.valueOf(exchange.request()); } return "<uri unavailable>"; }
Must be called from within the scheduler main loop. Handles any pending errors by calling delegate.onReadError(). If the error can be forwarded to the delegate, stops the scheduler.
/** * Must be called from within the scheduler main loop. * Handles any pending errors by calling delegate.onReadError(). * If the error can be forwarded to the delegate, stops the scheduler. */
private void checkForErrors() { // Handles any pending error. // The most recently subscribed delegate will get the error. // If the delegate is null, the error will be handled by the next // delegate that subscribes. // If the queue is not empty, wait until it is empty before // handling the error. Http1AsyncDelegate delegate = pendingDelegateRef.get(); if (delegate == null) delegate = this.delegate; Throwable x = error; if (delegate != null && x != null && (stopRequested || queue.isEmpty())) { // forward error only after emptying the queue. final Object captured = delegate; if (debug.on()) debug.log(() -> "flushing " + x + "\n\t delegate: " + captured + "\t\t queue.isEmpty: " + queue.isEmpty()); scheduler.stop(); delegate.onReadError(x); whenFinished.completeExceptionally(x); if (Log.channel()) { Log.logChannel("HTTP/1 read subscriber stopped for: {0}", describe()); } if (stopRequested) { // This is the special case where the subscriber // has requested an illegal number of items. // In this case, the error doesn't come from // upstream, but from downstream, and we need to // close the upstream connection. Http1Exchange<?> exchg = owner; stop(); if (exchg != null) exchg.connection().close(); } } }
Must be called from within the scheduler main loop. Figure out whether more data should be requested from the Http1TubeSubscriber.
/** * Must be called from within the scheduler main loop. * Figure out whether more data should be requested from the * Http1TubeSubscriber. */
private void checkRequestMore() { Http1AsyncDelegate delegate = this.delegate; boolean more = this.canRequestMore.get(); boolean hasDemand = hasDemand(delegate); if (debug.on()) debug.log("checkRequestMore: " + "canRequestMore=" + more + ", hasDemand=" + hasDemand + (delegate == null ? ", delegate=null" : "")); if (hasDemand) { subscriber.requestMore(); } }
Must be called from within the scheduler main loop. Return true if the delegate is not null and has some demand.
Params:
  • delegate – The Http1AsyncDelegate delegate
Returns:true if the delegate is not null and has some demand
/** * Must be called from within the scheduler main loop. * Return true if the delegate is not null and has some demand. * @param delegate The Http1AsyncDelegate delegate * @return true if the delegate is not null and has some demand */
private boolean hasDemand(Http1AsyncDelegate delegate) { if (delegate == null) return false; AbstractSubscription subscription = delegate.subscription(); long demand = subscription.demand().get(); if (debug.on()) debug.log("downstream subscription demand is %s", demand); return demand > 0; }
Must be called from within the scheduler main loop. Handles pending delegate subscription. Return true if there was some pending delegate subscription and a new delegate was subscribed, false otherwise.
Returns:true if there was some pending delegate subscription and a new delegate was subscribed, false otherwise.
/** * Must be called from within the scheduler main loop. * Handles pending delegate subscription. * Return true if there was some pending delegate subscription and a new * delegate was subscribed, false otherwise. * * @return true if there was some pending delegate subscription and a new * delegate was subscribed, false otherwise. */
private boolean handlePendingDelegate() { Http1AsyncDelegate pending = pendingDelegateRef.get(); if (pending != null && pendingDelegateRef.compareAndSet(pending, null)) { Http1AsyncDelegate delegate = this.delegate; if (delegate != null) unsubscribe(delegate); Consumer<Throwable> onSubscriptionError = (x) -> { setRetryOnError(false); stopRequested = true; onReadError(x); }; Runnable cancel = () -> { if (debug.on()) debug.log("Downstream subscription cancelled by %s", pending); // The connection should be closed, as some data may // be left over in the stream. try { setRetryOnError(false); pending.close(null); onReadError(new IOException("subscription cancelled")); unsubscribe(pending); } finally { Http1Exchange<?> exchg = owner; stop(); if (exchg != null) exchg.connection().close(); } }; // The subscription created by a delegate is only loosely // coupled with the upstream subscription. This is partly because // the header/body parser work with a flow of ByteBuffer, whereas // we have a flow List<ByteBuffer> upstream. Http1AsyncDelegateSubscription subscription = new Http1AsyncDelegateSubscription(scheduler, cancel, onSubscriptionError); try { pending.onSubscribe(subscription); } finally { this.delegate = delegate = pending; } final Object captured = delegate; if (debug.on()) debug.log("delegate is now " + captured + ", demand=" + subscription.demand().get() + ", canRequestMore=" + canRequestMore.get() + ", queue.isEmpty=" + queue.isEmpty()); return true; } return false; } synchronized void setRetryOnError(boolean retry) { this.retry = retry; } void clear() { if (debug.on()) debug.log("cleared"); this.pendingDelegateRef.set(null); this.delegate = null; this.owner = null; } void subscribe(Http1AsyncDelegate delegate) { synchronized(this) { pendingDelegateRef.set(delegate); } if (queue.isEmpty()) { canRequestMore.set(true); } if (debug.on()) debug.log("Subscribed pending " + delegate + " queue.isEmpty: " + queue.isEmpty()); // Everything may have been received already. Make sure // we parse it. if (client.isSelectorThread()) { scheduler.runOrSchedule(executor); } else { scheduler.runOrSchedule(); } } // Used for debugging only! long remaining() { return Utils.remaining(queue.toArray(Utils.EMPTY_BB_ARRAY)); } void unsubscribe(Http1AsyncDelegate delegate) { synchronized(this) { if (this.delegate == delegate) { if (debug.on()) debug.log("Unsubscribed %s", delegate); this.delegate = null; } } } // Callback: Consumer of ByteBuffer private void asyncReceive(ByteBuffer buf) { if (debug.on()) debug.log("Putting %s bytes into the queue", buf.remaining()); received.addAndGet(buf.remaining()); queue.offer(buf); // This callback is called from within the selector thread. // Use an executor here to avoid doing the heavy lifting in the // selector. scheduler.runOrSchedule(executor); } // Callback: Consumer of Throwable void onReadError(Throwable ex) { Http1AsyncDelegate delegate; Throwable recorded; if (debug.on()) debug.log("onError: %s", (Object) ex); synchronized (this) { delegate = this.delegate; recorded = error; if (recorded == null) { // retry is set to true by HttpExchange when the connection is // already connected, which means it's been retrieved from // the pool. if (retry && (ex instanceof IOException)) { // could be either EOFException, or // IOException("connection reset by peer), or // SSLHandshakeException resulting from the server having // closed the SSL session. if (received.get() == 0) { // If we receive such an exception before having // received any byte, then in this case, we will // throw ConnectionExpiredException // to try & force a retry of the request. retry = false; ex = new ConnectionExpiredException(ex); } } error = ex; } } final Throwable t = (recorded == null ? ex : recorded); if (debug.on()) debug.log("recorded " + t + "\n\t delegate: " + delegate + "\t\t queue.isEmpty: " + queue.isEmpty(), ex); if (Log.errors()) { Log.logError("HTTP/1 read subscriber recorded error: {0} - {1}", describe(), t); } if (queue.isEmpty() || pendingDelegateRef.get() != null || stopRequested) { // This callback is called from within the selector thread. // Use an executor here to avoid doing the heavy lifting in the // selector. if (Log.errors()) { Log.logError("HTTP/1 propagating recorded error: {0} - {1}", describe(), t); } scheduler.runOrSchedule(executor); } } void stop() { if (debug.on()) debug.log("stopping"); if (Log.channel() && !scheduler.isStopped()) { Log.logChannel("HTTP/1 read subscriber stopped for {0}", describe()); } scheduler.stop(); // make sure ref count is handled properly by // closing the delegate. Http1AsyncDelegate previous = delegate; if (previous != null) previous.close(error); delegate = null; owner = null; whenFinished.complete(null); }
Returns the TubeSubscriber for reading from the connection flow.
Returns:the TubeSubscriber for reading from the connection flow.
/** * Returns the TubeSubscriber for reading from the connection flow. * @return the TubeSubscriber for reading from the connection flow. */
TubeSubscriber subscriber() { return subscriber; }
A simple tube subscriber for reading from the connection flow.
/** * A simple tube subscriber for reading from the connection flow. */
final class Http1TubeSubscriber implements TubeSubscriber { volatile Flow.Subscription subscription; volatile boolean completed; volatile boolean dropped; public void onSubscribe(Flow.Subscription subscription) { // supports being called multiple time. // doesn't cancel the previous subscription, since that is // most probably the same as the new subscription. if (debug.on()) debug.log("Received onSubscribed from upstream"); if (Log.channel()) { Log.logChannel("HTTP/1 read subscriber got subscription from {0}", describe()); } assert this.subscription == null || dropped == false; this.subscription = subscription; dropped = false; canRequestMore.set(true); if (delegate != null) { scheduler.runOrSchedule(executor); } else { if (debug.on()) debug.log("onSubscribe: read delegate not present yet"); } } void requestMore() { Flow.Subscription s = subscription; if (s == null) return; if (canRequestMore.compareAndSet(true, false)) { if (!completed && !dropped) { if (debug.on()) debug.log("Http1TubeSubscriber: requesting one more from upstream"); s.request(1); return; } } if (debug.on()) debug.log("Http1TubeSubscriber: no need to request more"); } @Override public void onNext(List<ByteBuffer> item) { canRequestMore.set(item.isEmpty()); for (ByteBuffer buffer : item) { asyncReceive(buffer); } } @Override public void onError(Throwable throwable) { onReadError(throwable); completed = true; } @Override public void onComplete() { onReadError(new EOFException("EOF reached while reading")); completed = true; } public void dropSubscription() { if (debug.on()) debug.log("Http1TubeSubscriber: dropSubscription"); // we could probably set subscription to null here... // then we might not need the 'dropped' boolean? dropped = true; } } // Drains the content of the queue into a single ByteBuffer. // The scheduler must be permanently stopped before calling drain(). ByteBuffer drain(ByteBuffer initial) { // Revisit: need to clean that up. // ByteBuffer b = initial = (initial == null ? Utils.EMPTY_BYTEBUFFER : initial); assert scheduler.isStopped(); if (queue.isEmpty()) return b; // sanity check: we shouldn't have queued the same // buffer twice. ByteBuffer[] qbb = queue.toArray(new ByteBuffer[queue.size()]); // the assertion looks suspicious, more investigation needed // // assert java.util.stream.Stream.of(qbb) // .collect(Collectors.toSet()) // .size() == qbb.length : debugQBB(qbb); // compute the number of bytes in the queue, the number of bytes // in the initial buffer // TODO: will need revisiting - as it is not guaranteed that all // data will fit in single BB! int size = Utils.remaining(qbb, Integer.MAX_VALUE); int remaining = b.remaining(); int free = b.capacity() - b.position() - remaining; if (debug.on()) debug.log("Flushing %s bytes from queue into initial buffer " + "(remaining=%s, free=%s)", size, remaining, free); // check whether the initial buffer has enough space if (size > free) { if (debug.on()) debug.log("Allocating new buffer for initial: %s", (size + remaining)); // allocates a new buffer and copy initial to it b = ByteBuffer.allocate(size + remaining); Utils.copy(initial, b); assert b.position() == remaining; b.flip(); assert b.position() == 0; assert b.limit() == remaining; assert b.remaining() == remaining; } // store position and limit int pos = b.position(); int limit = b.limit(); assert limit - pos == remaining; assert b.capacity() >= remaining + size : "capacity: " + b.capacity() + ", remaining: " + b.remaining() + ", size: " + size; // prepare to copy the content of the queue b.position(limit); b.limit(pos + remaining + size); assert b.remaining() >= size : "remaining: " + b.remaining() + ", size: " + size; // copy the content of the queue int count = 0; for (int i=0; i<qbb.length; i++) { ByteBuffer b2 = qbb[i]; int r = b2.remaining(); assert b.remaining() >= r : "need at least " + r + " only " + b.remaining() + " available"; int copied = Utils.copy(b2, b); assert copied == r : "copied="+copied+" available="+r; assert b2.remaining() == 0; count += copied; } assert count == size; assert b.position() == pos + remaining + size : "b.position="+b.position()+" != "+pos+"+"+remaining+"+"+size; // reset limit and position b.limit(limit+size); b.position(pos); // we can clear the refs queue.clear(); final ByteBuffer bb = b; if (debug.on()) debug.log("Initial buffer now has " + bb.remaining() + " pos=" + bb.position() + " limit=" + bb.limit()); return b; } private String debugQBB(ByteBuffer[] qbb) { StringBuilder msg = new StringBuilder(); List<ByteBuffer> lbb = Arrays.asList(qbb); Set<ByteBuffer> sbb = new HashSet<>(Arrays.asList(qbb)); int uniquebb = sbb.size(); msg.append("qbb: ").append(lbb.size()) .append(" (unique: ").append(uniquebb).append("), ") .append("duplicates: "); String sep = ""; for (ByteBuffer b : lbb) { if (!sbb.remove(b)) { msg.append(sep) .append(String.valueOf(b)) .append("[remaining=") .append(b.remaining()) .append(", position=") .append(b.position()) .append(", capacity=") .append(b.capacity()) .append("]"); sep = ", "; } } return msg.toString(); } volatile String dbgTag; String dbgString() { String tag = dbgTag; if (tag == null) { String flowTag = null; Http1Exchange<?> exchg = owner; Object flow = (exchg != null) ? exchg.connection().getConnectionFlow() : null; flowTag = tag = flow == null ? null: (String.valueOf(flow)); if (flowTag != null) { dbgTag = tag = "Http1AsyncReceiver("+ flowTag + ")"; } else { tag = "Http1AsyncReceiver(?)"; } } return tag; } }