package jdk.internal.net.http;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.net.http.HttpClient;
import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodySubscriber;
import jdk.internal.net.http.common.*;
import jdk.internal.net.http.frame.*;
import jdk.internal.net.http.hpack.DecodingCallback;
class Stream<T> extends ExchangeImpl<T> {
/** Debug logger; messages are tagged with {@link #dbgString()}. */
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);

/** Incoming DATA and RST_STREAM frames waiting to be processed by {@link #schedule()}. */
final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();

/** Scheduler that runs {@link #schedule()} to deliver queued frames to the response subscriber. */
final SequentialScheduler sched =
        SequentialScheduler.synchronizedScheduler(this::schedule);

/** Subscription handed to the response body subscriber in {@link #schedule()}. */
final SubscriptionBase userSubscription =
        new SubscriptionBase(sched, this::cancel, this::onSubscriptionError);

/** Stream identifier; 0 until assigned by {@link #registerStream(int, boolean)}. */
protected volatile int streamid;

/** Request body length as reported by the request publisher, or 0 when there is no publisher. */
long requestContentLen;

final Http2Connection connection;
final HttpRequestImpl request;

// The three names below were missing from the declarations; they are
// restored from the assignments performed in the constructor.
/** Receives decoded response header fields and stores them in {@link #responseHeadersBuilder}. */
final HeadersConsumer rspHeadersConsumer;
/** Accumulates response headers until {@code handleResponse()} builds them. */
final HttpHeadersBuilder responseHeadersBuilder;
/** Pseudo headers (:method, :scheme, :authority, :path) computed from the request. */
final HttpHeaders requestPseudoHeaders;

/** The subscriber currently receiving the response body; set by {@link #schedule()}. */
volatile HttpResponse.BodySubscriber<T> responseSubscriber;
/** Request body publisher taken from the request; may be {@code null}. */
final HttpRequest.BodyPublisher requestPublisher;
volatile RequestSubscriber requestSubscriber;
volatile int responseCode;
volatile Response response;

/** First error recorded for this stream; non-null means the stream is cancelled. */
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
/** Completed when the request body has been sent, or has failed. */
final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
volatile CompletableFuture<T> responseBodyCF;
/** Subscriber waiting to be subscribed by {@link #schedule()}. */
volatile HttpResponse.BodySubscriber<T> pendingResponseSubscriber;
/** Set by {@code onSubscriptionError}; makes {@link #schedule()} fall through to error handling. */
volatile boolean stopRequested;

/** Set once END_STREAM has been delivered to the response subscriber. */
private volatile boolean remotelyClosed;
private volatile boolean closed;
private volatile boolean endStreamSent;
/** 0 while the stream is unmarked; otherwise the code passed to {@code markStream}. */
private volatile int streamState;
/** Flipped at most once through the {@code DEREGISTERED} VarHandle. */
private volatile boolean deRegistered;
// Both flags are accessed under synchronized(this) in requestSent()/responseReceived().
private boolean requestSent, responseReceived;

/** Guards {@code markStream} and {@code sendDataFrame}. */
private final Object sendLock = new Object();
private final WindowController windowController;
private final WindowUpdateSender windowUpdater;
/**
 * Returns the {@code connection} field of the HTTP/2 connection this
 * stream belongs to.
 */
@Override
HttpConnection connection() {
    final Http2Connection owner = this.connection;
    return owner.connection;
}
/**
 * Scheduler task: subscribes the pending response subscriber if needed and
 * then hands queued frames to it, one {@code onNext} per unit of demand.
 *
 * <p>A queued RST_STREAM is passed to {@code handleReset}. An END_STREAM
 * frame stops the scheduler and completes the subscriber. Any error
 * recorded in {@code errorRef} (including one thrown while delivering) is
 * reported through {@code onError}, unless {@code onComplete} was already
 * called, and the stream is then cancelled.
 */
private void schedule() {
    boolean onCompleteCalled = false;
    HttpResponse.BodySubscriber<T> subscriber = responseSubscriber;
    try {
        if (subscriber == null) {
            subscriber = responseSubscriber = pendingResponseSubscriber;
            if (subscriber == null) {
                // No subscriber yet: nothing can be delivered.
                return;
            } else {
                if (debug.on()) debug.log("subscribing user subscriber");
                subscriber.onSubscribe(userSubscription);
            }
        }
        while (!inputQ.isEmpty()) {
            Http2Frame frame = inputQ.peek();
            if (frame instanceof ResetFrame) {
                inputQ.remove();
                handleReset((ResetFrame)frame, subscriber);
                return;
            }
            DataFrame df = (DataFrame)frame;
            boolean finished = df.getFlag(DataFrame.END_STREAM);
            List<ByteBuffer> buffers = df.getData();
            List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
            int size = Utils.remaining(dsts, Integer.MAX_VALUE);
            if (size == 0 && finished) {
                // Empty END_STREAM frame: complete without consuming demand.
                inputQ.remove();
                connection.ensureWindowUpdated(df);
                Log.logTrace("responseSubscriber.onComplete");
                if (debug.on()) debug.log("incoming: onComplete");
                sched.stop();
                connection.decrementStreamsCount(streamid);
                subscriber.onComplete();
                onCompleteCalled = true;
                setEndStreamReceived();
                return;
            } else if (userSubscription.tryDecrement()) {
                inputQ.remove();
                Log.logTrace("responseSubscriber.onNext {0}", size);
                if (debug.on()) debug.log("incoming: onNext(%d)", size);
                try {
                    subscriber.onNext(dsts);
                } catch (Throwable t) {
                    // The frame was not consumed by the subscriber: drop it
                    // through the connection before propagating.
                    connection.dropDataFrame(df);
                    throw t;
                }
                if (consumed(df)) {
                    Log.logTrace("responseSubscriber.onComplete");
                    if (debug.on()) debug.log("incoming: onComplete");
                    sched.stop();
                    connection.decrementStreamsCount(streamid);
                    subscriber.onComplete();
                    onCompleteCalled = true;
                    setEndStreamReceived();
                    return;
                }
            } else {
                // No demand: leave the frame queued. If a subscription error
                // requested a stop, fall through to the error handling below.
                if (stopRequested) break;
                return;
            }
        }
    } catch (Throwable throwable) {
        errorRef.compareAndSet(null, throwable);
    } finally {
        if (sched.isStopped()) drainInputQueue();
    }
    Throwable t = errorRef.get();
    if (t != null) {
        sched.stop();
        try {
            if (!onCompleteCalled) {
                if (debug.on())
                    debug.log("calling subscriber.onError: %s", (Object) t);
                subscriber.onError(t);
            } else {
                if (debug.on())
                    debug.log("already completed: dropping error %s", (Object) t);
            }
        } catch (Throwable x) {
            // Log the exception thrown by onError itself, not the error
            // that was being reported to the subscriber.
            Log.logError("Subscriber::onError threw exception: {0}", (Object) x);
        } finally {
            cancelImpl(t);
            drainInputQueue();
        }
    }
}
/**
 * Empties the input queue. Every DATA frame removed is passed to
 * {@code connection.dropDataFrame}; other frames are simply discarded.
 */
private void drainInputQueue() {
    for (Http2Frame pending = inputQ.poll(); pending != null; pending = inputQ.poll()) {
        if (!(pending instanceof DataFrame)) {
            continue;
        }
        connection.dropDataFrame((DataFrame) pending);
    }
}
/**
 * Installs a subscriber that replaces the body with {@code null} as the
 * pending response subscriber, then runs the scheduler so queued frames
 * get processed.
 */
@Override
void nullBody(HttpResponse<T> resp, Throwable t) {
    if (debug.on()) {
        debug.log("nullBody: streamid=%d", streamid);
    }
    final HttpResponse.BodySubscriber<T> discarding =
            HttpResponse.BodySubscribers.replacing(null);
    pendingResponseSubscriber = discarding;
    sched.runOrSchedule();
}
/**
 * Accounts for a DATA frame that has been delivered to the subscriber.
 * For a non-empty payload the connection-level window updater is always
 * credited; the stream-level updater is credited only when the frame does
 * not carry END_STREAM.
 *
 * @param df the delivered frame
 * @return {@code true} if the frame carried END_STREAM
 */
private boolean consumed(DataFrame df) {
    final int payload = df.payloadLength();
    final boolean lastFrame = df.getFlag(DataFrame.END_STREAM);
    if (payload != 0) {
        connection.windowUpdater.update(payload);
        if (!lastFrame) {
            windowUpdater.update(payload);
        }
    }
    return lastFrame;
}
/**
 * Atomically flips {@code deRegistered} from {@code false} to {@code true}.
 *
 * @return {@code true} only for the call that performed the flip
 */
boolean deRegister() {
    final boolean flipped = DEREGISTERED.compareAndSet(this, false, true);
    return flipped;
}
/**
 * Obtains a body subscriber from {@code handler} and starts receiving the
 * response body.
 *
 * @param handler produces the body subscriber from the response info
 * @param returnConnectionToPool not used by this implementation
 * @param executor passed on to {@code receiveData}
 * @return a future for the body; a failed future if the handler (or set-up) throws,
 *         in which case the stream is also cancelled
 */
@Override
CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
                                   boolean returnConnectionToPool,
                                   Executor executor)
{
    try {
        Log.logTrace("Reading body on stream {0}", streamid);
        // Guarded like every other debug call in this class, so the message
        // is not built when debug logging is off.
        if (debug.on()) debug.log("Getting BodySubscriber for: " + response);
        BodySubscriber<T> bodySubscriber = handler.apply(new ResponseInfoImpl(response));
        CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
        PushGroup<?> pg = exchange.getPushGroup();
        if (pg != null) {
            // Report the completion error (possibly null) to the push group.
            cf = cf.whenComplete((t, e) -> pg.pushError(e));
        }
        return cf;
    } catch (Throwable t) {
        cancelImpl(t);
        return MinimalFuture.failedFuture(t);
    }
}
/** Returns {@code "streamid: "} followed by the current stream id. */
@Override
public String toString() {
    return "streamid: " + streamid;
}
/** Queues a DATA frame and runs the scheduler that delivers queued frames. */
private void receiveDataFrame(DataFrame df) {
    // offer() is what add() delegates to on ConcurrentLinkedQueue.
    inputQ.offer(df);
    sched.runOrSchedule();
}
/** Queues an RST_STREAM frame behind any pending frames and runs the scheduler. */
private void receiveResetFrame(ResetFrame frame) {
    // offer() is what add() delegates to on ConcurrentLinkedQueue.
    inputQ.offer(frame);
    sched.runOrSchedule();
}
/**
 * Marks the stream with {@code code} if it is still unmarked (state 0).
 *
 * @param code the state to install; 0 only queries the current state
 * @return the state observed before the call (0 if this call installed {@code code})
 */
int markStream(int code) {
    if (code != 0) {
        synchronized (sendLock) {
            return (int) STREAM_STATE.compareAndExchange(this, 0, code);
        }
    }
    return streamState;
}
/**
 * Sends a DATA frame on the connection, unless the stream has been marked
 * (non-zero {@code streamState}). Runs under {@code sendLock}.
 */
private void sendDataFrame(DataFrame frame) {
    synchronized (sendLock) {
        if (streamState != 0) {
            return;
        }
        connection.sendDataFrame(frame);
    }
}
/**
 * Creates the response body future for {@code bodySubscriber}. If the
 * stream is already cancelled the future is failed with the cancel cause;
 * otherwise the subscriber is made pending and the scheduler is run.
 *
 * @return the response body future
 */
CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {
    responseBodyCF = ResponseSubscribers.getBodyAsync(
            executor, bodySubscriber, new MinimalFuture<>(), this::cancelImpl);
    if (!isCanceled()) {
        pendingResponseSubscriber = bodySubscriber;
        sched.runOrSchedule();
    } else {
        final Throwable cause = getCancelCause();
        responseBodyCF.completeExceptionally(cause);
    }
    return responseBodyCF;
}
/** Sends the request body and completes with this exchange once it is sent. */
@Override
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
    final CompletableFuture<Void> bodySent = sendBodyImpl();
    return bodySent.thenApply(ignored -> this);
}
/**
 * Creates a stream on the given HTTP/2 connection for the given exchange.
 * No stream id is assigned here.
 *
 * @param connection the HTTP/2 connection the stream belongs to
 * @param e the exchange; supplies the request
 * @param windowController the controller used for send-window accounting
 */
@SuppressWarnings("unchecked")
Stream(Http2Connection connection,
Exchange<T> e,
WindowController windowController)
{
super(e);
this.connection = connection;
this.windowController = windowController;
this.request = e.request();
// must follow the assignment of 'request'
this.requestPublisher = request.requestPublisher;
this.responseHeadersBuilder = new HttpHeadersBuilder();
this.rspHeadersConsumer = new HeadersConsumer();
this.requestPseudoHeaders = createPseudoHeaders(request);
this.windowUpdater = new StreamWindowUpdateSender(connection);
}
/**
 * Checks whether the request has been cancelled. If so, cancels this
 * stream when no error is recorded yet, or otherwise sends the cancel
 * frame for the already-recorded error.
 *
 * @return {@code true} if the request was cancelled
 */
private boolean checkRequestCancelled() {
    if (!exchange.multi.requestCancelled()) {
        return false;
    }
    if (errorRef.get() == null) {
        cancel();
    } else {
        sendCancelStreamFrame();
    }
    return true;
}
/**
 * Entry point for frames received on this stream.
 *
 * <p>Header frames are always handled; an END_STREAM flag on a header
 * frame is turned into an empty END_STREAM DATA frame. When the stream is
 * cancelled or closed, DATA frames are dropped through the connection and
 * other frames are ignored.
 *
 * @throws IOException if handling the response or an unexpected frame fails
 */
void incoming(Http2Frame frame) throws IOException {
    if (debug.on()) {
        debug.log("incoming: %s", frame);
    }
    // Evaluated up front: checkRequestCancelled() may cancel the stream.
    final boolean cancelled = checkRequestCancelled() || closed;

    if (frame instanceof HeaderFrame) {
        final HeaderFrame headers = (HeaderFrame) frame;
        if (headers.endHeaders()) {
            Log.logTrace("handling response (streamid={0})", streamid);
            handleResponse();
        }
        if (headers.getFlag(HeaderFrame.END_STREAM)) {
            if (debug.on()) {
                debug.log("handling END_STREAM: %d", streamid);
            }
            receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
        }
        return;
    }

    if (frame instanceof DataFrame) {
        final DataFrame data = (DataFrame) frame;
        if (cancelled) {
            connection.dropDataFrame(data);
        } else {
            receiveDataFrame(data);
        }
        return;
    }

    if (!cancelled) {
        otherFrame(frame);
    }
}
/**
 * Dispatches WINDOW_UPDATE, RST_STREAM and PRIORITY frames to their handlers.
 *
 * @throws IOException for any other frame type
 */
void otherFrame(Http2Frame frame) throws IOException {
    switch (frame.type()) {
        case WindowUpdateFrame.TYPE:
            incoming_windowUpdate((WindowUpdateFrame) frame);
            break;
        case ResetFrame.TYPE:
            incoming_reset((ResetFrame) frame);
            break;
        case PriorityFrame.TYPE:
            incoming_priority((PriorityFrame) frame);
            break;
        default:
            throw new IOException("Unexpected frame: " + frame.toString());
    }
}
DecodingCallback () {
return rspHeadersConsumer;
}
/**
 * Builds the response from the accumulated headers, resets the headers
 * consumer, and completes the response future.
 *
 * @throws IOException if the headers contain no ":status" value
 */
protected void handleResponse() throws IOException {
HttpHeaders responseHeaders = responseHeadersBuilder.build();
responseCode = (int)responseHeaders
.firstValueAsLong(":status")
.orElseThrow(() -> new IOException("no statuscode in response"));
response = new Response(
request, exchange, responseHeaders, connection(),
responseCode, HttpClient.Version.HTTP_2);
// The returned value is discarded.
// NOTE(review): presumably kept so that a malformed content-length is
// rejected here by the parsing call — confirm before removing.
responseHeaders.firstValueAsLong("content-length");
if (Log.headers()) {
StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
Log.dumpHeaders(sb, " ", responseHeaders);
Log.logHeaders(sb.toString());
}
// clear the consumer (and the headers builder) for any later header block
rspHeadersConsumer.reset();
completeResponse(response);
}
/**
 * Handles a received RST_STREAM frame. The frame is ignored if END_STREAM
 * was already received or the stream is closed. It is handled immediately
 * when neither a response nor a subscriber exists yet; otherwise it is
 * queued so frames already in the input queue are processed first.
 */
void incoming_reset(ResetFrame frame) {
    Log.logTrace("Received RST_STREAM on stream {0}", streamid);
    if (endStreamReceived()) {
        Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
        return;
    }
    if (closed) {
        Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
        return;
    }
    final Flow.Subscriber<?> target =
            responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
    final boolean nothingStarted = response == null && target == null;
    if (nothingStarted) {
        handleReset(frame, target);
        return;
    }
    receiveResetFrame(frame);
    Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
}
/**
 * Processes an RST_STREAM frame: marks the stream closed, reports an
 * IOException to the subscriber (if this is the first recorded error) and
 * fails the response and body futures. The stream is always removed from
 * the connection afterwards. Does nothing if the stream is already closed.
 *
 * @param frame the reset frame
 * @param subscriber the response subscriber to notify; may be null
 */
void handleReset(ResetFrame frame, Flow.Subscriber<?> subscriber) {
Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
if (!closed) {
// re-check under the lock so only one caller performs the close
synchronized (this) {
if (closed) {
if (debug.on()) debug.log("Stream already closed: ignoring RESET");
return;
}
closed = true;
}
try {
int error = frame.getErrorCode();
IOException e = new IOException("Received RST_STREAM: "
+ ErrorFrame.stringForCode(error));
// notify the subscriber only if no earlier error was recorded
if (errorRef.compareAndSet(null, e)) {
if (subscriber != null) {
subscriber.onError(e);
}
}
completeResponseExceptionally(e);
// the futures below are failed with the first recorded error,
// which may differ from 'e'
if (!requestBodyCF.isDone()) {
requestBodyCF.completeExceptionally(errorRef.get());
}
if (responseBodyCF != null) {
responseBodyCF.completeExceptionally(errorRef.get());
}
} finally {
connection.decrementStreamsCount(streamid);
connection.closeStream(streamid);
}
} else {
Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
}
}
/**
 * PRIORITY frames are not supported.
 *
 * @throws UnsupportedOperationException always
 */
void incoming_priority(PriorityFrame frame) {
    final String reason = "Not implemented";
    throw new UnsupportedOperationException(reason);
}
/**
 * Applies a WINDOW_UPDATE frame to this stream's send window. The stream
 * is reset with FLOW_CONTROL_ERROR if the increment is not positive or the
 * window controller rejects it.
 */
private void incoming_windowUpdate(WindowUpdateFrame frame)
    throws IOException
{
    final int increment = frame.getUpdate();
    if (increment <= 0) {
        Log.logTrace("Resetting stream: {0}, Window Update amount: {1}",
                     streamid, increment);
        connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
        return;
    }
    assert streamid != 0;
    final boolean accepted = windowController.increaseStreamWindow(increment, streamid);
    if (!accepted) {
        connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
    }
}
/**
 * Handles a push promise received on this stream.
 *
 * <p>The pushed stream is refused when there is no push group or the
 * request was cancelled, and cancelled when the push group's acceptor
 * does not accept it (or accepting throws). Otherwise the accepted body
 * handler is installed on the pushed stream and the acceptor's future is
 * completed from the pushed stream's response future.
 *
 * @param pushRequest the promised request
 * @param pushStream the stream on which the pushed response arrives
 */
void incoming_pushPromise(HttpRequestImpl pushRequest,
PushedStream<T> pushStream)
throws IOException
{
if (Log.requests()) {
Log.logRequest("PUSH_PROMISE: " + pushRequest.toString());
}
PushGroup<T> pushGroup = exchange.getPushGroup();
if (pushGroup == null || exchange.multi.requestCancelled()) {
Log.logTrace("Rejecting push promise stream " + streamid);
connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM);
pushStream.close();
return;
}
PushGroup.Acceptor<T> acceptor = null;
boolean accepted = false;
try {
acceptor = pushGroup.acceptPushRequest(pushRequest);
accepted = acceptor.accepted();
} catch (Throwable t) {
// an exception while accepting leaves 'accepted' false: the push is rejected below
if (debug.on())
debug.log("PushPromiseHandler::applyPushPromise threw exception %s",
(Object)t);
}
if (!accepted) {
IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
if (Log.trace()) {
Log.logTrace("No body subscriber for {0}: {1}", pushRequest,
ex.getMessage());
}
pushStream.cancelImpl(ex);
return;
}
assert accepted && acceptor != null;
CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf();
HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler();
assert pushHandler != null;
// a pushed stream has no request to send: mark it as sent right away
pushStream.requestSent();
pushStream.setPushHandler(pushHandler);
CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
// relay the pushed response (or its failure) to the acceptor's future
// and tell the push group that this push is done
cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
t = Utils.getCompletionCause(t);
if (Log.trace()) {
Log.logTrace("Push completed on stream {0} for {1}{2}",
pushStream.streamid, resp,
((t==null) ? "": " with exception " + t));
}
if (t != null) {
pushGroup.pushError(t);
pushResponseCF.completeExceptionally(t);
} else {
pushResponseCF.complete(resp);
}
pushGroup.pushCompleted();
});
}
private OutgoingHeaders<Stream<T>> (long contentLength) {
HttpHeadersBuilder h = request.getSystemHeadersBuilder();
if (contentLength > 0) {
h.setHeader("content-length", Long.toString(contentLength));
}
HttpHeaders sysh = filterHeaders(h.build());
HttpHeaders userh = filterHeaders(request.getUserHeaders());
userh = HttpHeaders.of(userh.map(), Utils.CONTEXT_RESTRICTED(client()));
final HttpHeaders uh = userh;
sysh = HttpHeaders.of(sysh.map(), (k,v) -> uh.firstValue(k).isEmpty());
OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(sysh, userh, this);
if (contentLength == 0) {
f.setFlag(HeadersFrame.END_STREAM);
endStreamSent = true;
}
return f;
}
private boolean (HttpHeaders headers) {
return headers.firstValue("proxy-authorization")
.isPresent();
}
private boolean (HttpHeaders headers,
BiPredicate<String, String> filter) {
if (filter == Utils.PROXY_TUNNEL_FILTER || filter == Utils.PROXY_FILTER) {
return Utils.proxyHasDisabledSchemes(filter == Utils.PROXY_TUNNEL_FILTER)
&& hasProxyAuthorization(headers);
} else {
return hasProxyAuthorization(headers);
}
}
private HttpHeaders (HttpHeaders headers) {
HttpConnection conn = connection();
BiPredicate<String, String> filter = conn.headerFilter(request);
if (needsFiltering(headers, filter)) {
return HttpHeaders.of(headers.map(), filter);
}
return headers;
}
private static HttpHeaders (HttpRequest request) {
HttpHeadersBuilder hdrs = new HttpHeadersBuilder();
String method = request.method();
hdrs.setHeader(":method", method);
URI uri = request.uri();
hdrs.setHeader(":scheme", uri.getScheme());
hdrs.setHeader(":authority", uri.getAuthority());
String query = uri.getRawQuery();
String path = uri.getRawPath();
if (path == null || path.isEmpty()) {
if (method.equalsIgnoreCase("OPTIONS")) {
path = "*";
} else {
path = "/";
}
}
if (query != null) {
path += "?" + query;
}
hdrs.setHeader(":path", Utils.encode(path));
return hdrs.build();
}
HttpHeaders () {
return requestPseudoHeaders;
}
/**
 * Records that END_STREAM was received and notifies
 * {@code responseReceived()}. Must be called at most once (asserted).
 */
void setEndStreamReceived() {
    if (debug.on()) {
        debug.log("setEndStreamReceived: streamid=%d", streamid);
    }
    assert !remotelyClosed : "Unexpected endStream already set";
    remotelyClosed = true;
    responseReceived();
}
/** Returns {@code true} once {@code setEndStreamReceived()} has been called. */
private boolean endStreamReceived() {
return remotelyClosed;
}
@Override
CompletableFuture<ExchangeImpl<T>> () {
if (debug.on()) debug.log("sendHeadersOnly()");
if (Log.requests() && request != null) {
Log.logRequest(request.toString());
}
if (requestPublisher != null) {
requestContentLen = requestPublisher.contentLength();
} else {
requestContentLen = 0;
}
Throwable t = errorRef.get();
if (t != null) {
if (debug.on()) debug.log("stream already cancelled, headers not sent: %s", (Object)t);
return MinimalFuture.failedFuture(t);
}
OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
connection.sendFrame(f);
CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>();
cf.complete(this);
return cf;
}
/**
 * Releases the stream: when a positive stream id has been assigned, the
 * connection's stream count is decremented and the stream is closed on
 * the connection.
 */
@Override
void released() {
    if (streamid <= 0) {
        if (debug.on()) {
            debug.log("Can't release stream %d", streamid);
        }
        return;
    }
    if (debug.on()) {
        debug.log("Released stream %d", streamid);
    }
    connection.decrementStreamsCount(streamid);
    connection.closeStream(streamid);
}
/** Intentionally empty: this implementation takes no action here. */
@Override
void completed() {
}
/**
 * Assigns {@code id} to this stream and registers it with the connection,
 * unless the stream is closed or its request cancelled and
 * {@code registerIfCancelled} is {@code false}.
 *
 * @param id the stream id to assign
 * @param registerIfCancelled whether to register even when cancelled
 * @return {@code true} if the stream was not cancelled
 */
boolean registerStream(int id, boolean registerIfCancelled) {
    final boolean cancelled = closed || exchange.multi.requestCancelled();
    if (cancelled && !registerIfCancelled) {
        return false;
    }
    this.streamid = id;
    connection.putStream(this, streamid);
    if (debug.on()) {
        debug.log("Stream %d registered (cancelled: %b, registerIfCancelled: %b)",
                  streamid, cancelled, registerIfCancelled);
    }
    return !cancelled;
}
/**
 * Re-runs the request subscriber's send scheduler. A request subscriber
 * must already exist (asserted).
 */
void signalWindowUpdate() {
    final RequestSubscriber sender = requestSubscriber;
    assert sender != null;
    if (debug.on()) {
        debug.log("Signalling window update");
    }
    sender.sendScheduler.runOrSchedule();
}
// Sentinel buffer: queued by RequestSubscriber.onComplete() and recognized
// by identity (==) in RequestSubscriber.trySend() as the end of the request body.
static final ByteBuffer COMPLETED = ByteBuffer.allocate(0);
/**
 * Subscriber to the request body publisher. Buffers received from the
 * publisher are queued in {@code outgoing} and sent as DATA frames by
 * {@code trySend()}, which runs on {@code sendScheduler}. Items are
 * requested from the publisher one at a time.
 */
class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
// content length given to the constructor
private final long contentLength;
// bytes still expected from the publisher; decremented in trySend() when contentLength > 0
private volatile long remainingContentLength;
private volatile Subscription subscription;
// buffers waiting to be sent; the COMPLETED sentinel marks the end of the body
final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();
// error signalled by the publisher (distinct from the enclosing stream's errorRef)
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
final SequentialScheduler sendScheduler;
RequestSubscriber(long contentLen) {
this.contentLength = contentLen;
this.remainingContentLength = contentLen;
this.sendScheduler =
SequentialScheduler.synchronizedScheduler(this::trySend);
}
/** Stores the subscription and requests the first item. */
@Override
public void onSubscribe(Flow.Subscription subscription) {
if (this.subscription != null) {
throw new IllegalStateException("already subscribed");
}
this.subscription = subscription;
if (debug.on())
debug.log("RequestSubscriber: onSubscribe, request 1");
subscription.request(1);
}
/** Queues the buffer for sending; the queue is expected to be empty at this point (asserted). */
@Override
public void onNext(ByteBuffer item) {
if (debug.on())
debug.log("RequestSubscriber: onNext(%d)", item.remaining());
int size = outgoing.size();
assert size == 0 : "non-zero size: " + size;
onNextImpl(item);
}
// Queues an item and runs the send scheduler; if the request body future is
// already done, stops sending and cancels the subscription instead.
private void onNextImpl(ByteBuffer item) {
if (requestBodyCF.isDone()) {
sendScheduler.stop();
subscription.cancel();
return;
}
outgoing.add(item);
sendScheduler.runOrSchedule();
}
/** Records the first publisher error and lets trySend() handle it. */
@Override
public void onError(Throwable throwable) {
if (debug.on())
debug.log(() -> "RequestSubscriber: onError: " + throwable);
if (errorRef.compareAndSet(null, throwable)) {
sendScheduler.runOrSchedule();
}
}
/** Queues the COMPLETED sentinel behind at most one pending buffer (asserted). */
@Override
public void onComplete() {
if (debug.on()) debug.log("RequestSubscriber: onComplete");
int size = outgoing.size();
assert size == 0 || size == 1 : "non-zero or one size: " + size;
onNextImpl(COMPLETED);
}
/**
 * Send loop. Sends queued buffers as DATA frames while the stream is
 * unmarked (streamState == 0), then requests the next item from the
 * publisher. Any failure stops the scheduler, cancels the subscription,
 * fails the request body future and cancels the stream.
 */
void trySend() {
try {
// a publisher error takes precedence over any queued data
Throwable t = errorRef.get();
if (t != null) {
sendScheduler.stop();
if (requestBodyCF.isDone()) return;
subscription.cancel();
requestBodyCF.completeExceptionally(t);
cancelImpl(t);
return;
}
int state = streamState;
do {
ByteBuffer item = outgoing.peekFirst();
if (item == null) return;
else if (item == COMPLETED) {
sendScheduler.stop();
complete();
return;
}
while (item.hasRemaining() && state == 0) {
if (debug.on()) debug.log("trySend: %d", item.remaining());
assert !endStreamSent : "internal error, send data after END_STREAM flag";
DataFrame df = getDataFrame(item);
if (df == null) {
// getDataFrame could not acquire any window; signalWindowUpdate()
// re-runs this scheduler
if (debug.on())
debug.log("trySend: can't send yet: %d", item.remaining());
return;
}
if (contentLength > 0) {
remainingContentLength -= df.getDataLength();
if (remainingContentLength < 0) {
// the publisher produced more bytes than the declared length
String msg = connection().getConnectionFlow()
+ " stream=" + streamid + " "
+ "[" + Thread.currentThread().getName() + "] "
+ "Too many bytes in request body. Expected: "
+ contentLength + ", got: "
+ (contentLength - remainingContentLength);
assert streamid > 0;
connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
throw new IOException(msg);
} else if (remainingContentLength == 0) {
// last byte of the declared length: flag END_STREAM on this frame
df.setFlag(DataFrame.END_STREAM);
endStreamSent = true;
}
}
// re-read the state: the stream may have been marked in the meantime
if ((state = streamState) != 0) {
if (debug.on()) debug.log("trySend: cancelled: %s", String.valueOf(t));
break;
}
if (debug.on())
debug.log("trySend: sending: %d", df.getDataLength());
sendDataFrame(df);
}
if (state != 0) break;
assert !item.hasRemaining();
ByteBuffer b = outgoing.removeFirst();
assert b == item;
} while (outgoing.peekFirst() != null);
if (state != 0) {
// the stream was marked while sending: fail with the publisher
// error if any, else with an IOException naming the stream state
t = errorRef.get();
if (t == null) t = new IOException(ResetFrame.stringForCode(streamState));
throw t;
}
if (debug.on()) debug.log("trySend: request 1");
subscription.request(1);
} catch (Throwable ex) {
if (debug.on()) debug.log("trySend: ", ex);
sendScheduler.stop();
subscription.cancel();
requestBodyCF.completeExceptionally(ex);
cancelImpl(ex);
}
}
/**
 * Finishes the request body: resets the stream and throws if fewer bytes
 * than the declared length were sent; otherwise sends an empty END_STREAM
 * frame if none was sent yet and completes the request body future.
 */
private void complete() throws IOException {
long remaining = remainingContentLength;
long written = contentLength - remaining;
if (remaining > 0) {
connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
// the exception is caught and handled by trySend()
throw new IOException(connection().getConnectionFlow()
+ " stream=" + streamid + " "
+ "[" + Thread.currentThread().getName() +"] "
+ "Too few bytes returned by the publisher ("
+ written + "/"
+ contentLength + ")");
}
if (!endStreamSent) {
endStreamSent = true;
connection.sendDataFrame(getEmptyEndStreamDataFrame());
}
requestBodyCF.complete(null);
}
}
/**
 * Discards the response body by resetting the stream with STREAM_CLOSED.
 *
 * @return a completed future, or a failed future if the reset throws
 */
@Override
public CompletableFuture<Void> ignoreBody() {
    final CompletableFuture<Void> outcome;
    try {
        connection.resetStream(streamid, ResetFrame.STREAM_CLOSED);
        outcome = MinimalFuture.completedFuture(null);
    } catch (Throwable failure) {
        Log.logTrace("Error resetting stream {0}", failure.toString());
        return MinimalFuture.failedFuture(failure);
    }
    return outcome;
}
/**
 * Slices as much of {@code buffer} as the maximum frame size and the
 * window controller allow into a new DATA frame without flags.
 *
 * @return the frame, or {@code null} if no window could be acquired
 */
DataFrame getDataFrame(ByteBuffer buffer) {
    final int wanted = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
    final int granted = windowController.tryAcquire(wanted, streamid, this);
    if (granted <= 0) {
        return null;
    }
    final ByteBuffer payload = Utils.sliceWithLimitedCapacity(buffer, granted);
    return new DataFrame(streamid, 0, payload);
}
/** Creates a DATA frame with no payload and the END_STREAM flag set. */
private DataFrame getEmptyEndStreamDataFrame() {
    final List<ByteBuffer> noPayload = List.of();
    return new DataFrame(streamid, DataFrame.END_STREAM, noPayload);
}
// Response futures exchanged between getResponseAsync() and completeResponse();
// every access is synchronized on the list itself.
final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
/**
 * Returns a future for the response headers. If a response future has
 * already been stored, it is removed from the list and returned;
 * otherwise a new future is stored for {@code completeResponse} to complete.
 *
 * @param executor if non-null and the future is not yet done, dependent
 *        actions are moved onto this executor
 */
@Override
CompletableFuture<Response> getResponseAsync(Executor executor) {
CompletableFuture<Response> cf;
synchronized (response_cfs) {
if (!response_cfs.isEmpty()) {
// a future is already stored: it is expected to be completed (asserted)
cf = response_cfs.remove(0);
assert cf.isDone() : "Removing uncompleted response: could cause code to hang!";
} else {
// nothing stored yet: register a future to be completed later
cf = new MinimalFuture<>();
response_cfs.add(cf);
}
}
if (executor != null && !cf.isDone()) {
cf = cf.thenApplyAsync(r -> r, executor);
}
Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
PushGroup<?> pg = exchange.getPushGroup();
if (pg != null) {
// report the completion cause (possibly null) to the push group
cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));
}
return cf;
}
/**
 * Delivers the response: completes the first not-yet-done future stored
 * in {@code response_cfs} (removing it from the list), or, if there is
 * none, stores an already-completed future for a later
 * {@code getResponseAsync} call.
 */
void completeResponse(Response resp) {
synchronized (response_cfs) {
CompletableFuture<Response> cf;
int cfs_len = response_cfs.size();
for (int i=0; i<cfs_len; i++) {
cf = response_cfs.get(i);
if (!cf.isDone()) {
Log.logTrace("Completing response (streamid={0}): {1}",
streamid, cf);
if (debug.on())
debug.log("Completing responseCF(%d) with response headers", i);
// safe to remove inside the loop: the method returns right after
response_cfs.remove(cf);
cf.complete(resp);
return;
}
}
// no waiting future: leave a completed one for getResponseAsync()
cf = MinimalFuture.completedFuture(resp);
Log.logTrace("Created completed future (streamid={0}): {1}",
streamid, cf);
if (debug.on())
debug.log("Adding completed responseCF(0) with response headers");
response_cfs.add(cf);
}
}
/**
 * Records that the request has been sent; closes the stream if the
 * response has been received as well.
 */
synchronized void requestSent() {
    requestSent = true;
    if (!responseReceived) {
        if (debug.on()) {
            debug.log("requestSent: streamid=%d but response not received", streamid);
        }
        return;
    }
    if (debug.on()) {
        debug.log("requestSent: streamid=%d", streamid);
    }
    close();
}
/**
 * Records that the response has been received; closes the stream if the
 * request has been sent as well.
 */
synchronized void responseReceived() {
    responseReceived = true;
    if (!requestSent) {
        if (debug.on()) {
            debug.log("responseReceived: streamid=%d but request not sent", streamid);
        }
        return;
    }
    if (debug.on()) {
        debug.log("responseReceived: streamid=%d", streamid);
    }
    close();
}
/**
 * Fails the first not-yet-done response future with {@code t} (removing
 * it from the list), or, if there is none, stores a failed future for a
 * later {@code getResponseAsync} call.
 */
void completeResponseExceptionally(Throwable t) {
synchronized (response_cfs) {
// index-based loop: the list is modified inside it, right before returning
for (int i = 0; i < response_cfs.size(); i++) {
CompletableFuture<Response> cf = response_cfs.get(i);
if (!cf.isDone()) {
response_cfs.remove(i);
cf.completeExceptionally(t);
return;
}
}
response_cfs.add(MinimalFuture.failedFuture(t));
}
}
/**
 * Starts sending the request body. With a publisher, a new
 * {@code RequestSubscriber} is subscribed to it; without one, the request
 * body future is completed immediately. A failure while subscribing
 * cancels the stream and fails the future.
 *
 * @return the request body future
 */
CompletableFuture<Void> sendBodyImpl() {
// whether the body completes normally or not, record that the request is sent
requestBodyCF.whenComplete((v, t) -> requestSent());
try {
if (requestPublisher != null) {
final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
// the field is assigned before subscribe() is invoked
requestPublisher.subscribe(requestSubscriber = subscriber);
} else {
requestBodyCF.complete(null);
}
} catch (Throwable t) {
cancelImpl(t);
requestBodyCF.completeExceptionally(t);
}
return requestBodyCF;
}
/**
 * Cancels the stream with an IOException whose message states whether a
 * stream id had been assigned.
 */
@Override
void cancel() {
    final String reason = (streamid == 0)
            ? "Stream cancelled before streamid assigned"
            : "Stream " + streamid + " cancelled";
    cancel(new IOException(reason));
}
/**
 * Error callback passed to the user subscription. Records the error (if
 * it is the first one), requests a stop and runs the scheduler, whose
 * error path reports it to the subscriber.
 */
void onSubscriptionError(Throwable t) {
errorRef.compareAndSet(null, t);
if (debug.on()) debug.log("Got subscription error: %s", (Object)t);
// stopRequested must be set before the scheduler runs: schedule() reads it
stopRequested = true;
sched.runOrSchedule();
}
/** Cancels the stream with the given cause; delegates to {@code cancelImpl}. */
@Override
void cancel(IOException cause) {
cancelImpl(cause);
}
/**
 * Called when the connection is closing. Records {@code cause}; if a
 * subscriber exists, the scheduler is still running and frames are
 * queued, the scheduler is run so it can process them. Otherwise the
 * stream is cancelled right away.
 */
void connectionClosing(Throwable cause) {
    final Flow.Subscriber<?> target =
            responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
    errorRef.compareAndSet(null, cause);
    final boolean framesPending =
            target != null && !sched.isStopped() && !inputQ.isEmpty();
    if (!framesPending) {
        cancelImpl(cause);
        return;
    }
    sched.runOrSchedule();
}
/**
 * Cancels the stream: records {@code e} if it is the first error, marks
 * the stream closed, fails the response and body futures, and then either
 * closes the stream on the connection (for an EOFException) or sends a
 * cancel frame. Exceptions from that last step are logged, not propagated.
 */
void cancelImpl(Throwable e) {
errorRef.compareAndSet(null, e);
if (debug.on()) {
if (streamid == 0) debug.log("cancelling stream: %s", (Object)e);
else debug.log("cancelling stream %d: %s", streamid, e);
}
if (Log.trace()) {
if (streamid == 0) Log.logTrace("cancelling stream: {0}\n", e);
else Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
}
// 'closing' ends up true only for the caller that flips 'closed' to true;
// both conditions below are assignments, not comparisons
boolean closing;
if (closing = !closed) {
synchronized (this) {
if (closing = !closed) {
closed=true;
}
}
}
if (closing) {
// run the scheduler if a subscriber exists, so it can report the error
if (responseSubscriber != null || pendingResponseSubscriber != null)
sched.runOrSchedule();
}
completeResponseExceptionally(e);
// the body futures are failed with the first recorded error, which may differ from 'e'
if (!requestBodyCF.isDone()) {
requestBodyCF.completeExceptionally(errorRef.get());
}
if (responseBodyCF != null) {
responseBodyCF.completeExceptionally(errorRef.get());
}
try {
// only for a stream that has an id and has not been marked yet
if (streamid != 0 && streamState == 0) {
e = Utils.getCompletionCause(e);
if (e instanceof EOFException) {
// EOF: close locally without sending a reset
connection.decrementStreamsCount(streamid);
connection.closeStream(streamid);
} else {
sendCancelStreamFrame();
}
}
} catch (Throwable ex) {
Log.logError(ex);
}
}
/**
 * Resets the stream with CANCEL, provided it has a positive stream id and
 * this call is the one that marks it; then calls {@code close()}.
 */
void sendCancelStreamFrame() {
// markStream returns 0 only to the caller that installed the CANCEL mark
if (streamid > 0 && markStream(ResetFrame.CANCEL) == 0) {
connection.resetStream(streamid, ResetFrame.CANCEL);
}
close();
}
/**
 * Marks the stream closed and closes it on the connection. Does nothing
 * if the stream is already marked closed.
 */
void close() {
// unsynchronized fast path, then re-check under the lock
if (closed) return;
synchronized(this) {
if (closed) return;
closed = true;
}
if (debug.on()) debug.log("close stream %d", streamid);
Log.logTrace("Closing stream {0}", streamid);
connection.closeStream(streamid);
Log.logTrace("Stream {0} closed", streamid);
}
static class PushedStream<T> extends Stream<T> {
final PushGroup<T> pushGroup;
final CompletableFuture<Response> pushCF;
CompletableFuture<HttpResponse<T>> responseCF;
final HttpRequestImpl pushReq;
HttpResponse.BodyHandler<T> pushHandler;
PushedStream(PushGroup<T> pushGroup,
Http2Connection connection,
Exchange<T> pushReq) {
super(connection, pushReq, null);
this.pushGroup = pushGroup;
this.pushReq = pushReq.request();
this.pushCF = new MinimalFuture<>();
this.responseCF = new MinimalFuture<>();
}
CompletableFuture<HttpResponse<T>> responseCF() {
return responseCF;
}
synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
this.pushHandler = pushHandler;
}
synchronized HttpResponse.BodyHandler<T> getPushHandler() {
return this.pushHandler;
}
@Override
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
return super.sendBodyAsync()
.whenComplete((ExchangeImpl<T> v, Throwable t)
-> pushGroup.pushError(Utils.getCompletionCause(t)));
}
@Override
CompletableFuture<ExchangeImpl<T>> () {
return super.sendHeadersAsync()
.whenComplete((ExchangeImpl<T> ex, Throwable t)
-> pushGroup.pushError(Utils.getCompletionCause(t)));
}
@Override
CompletableFuture<Response> getResponseAsync(Executor executor) {
CompletableFuture<Response> cf = pushCF.whenComplete(
(v, t) -> pushGroup.pushError(Utils.getCompletionCause(t)));
if(executor!=null && !cf.isDone()) {
cf = cf.thenApplyAsync( r -> r, executor);
}
return cf;
}
@Override
CompletableFuture<T> readBodyAsync(
HttpResponse.BodyHandler<T> handler,
boolean returnConnectionToPool,
Executor executor)
{
return super.readBodyAsync(handler, returnConnectionToPool, executor)
.whenComplete((v, t) -> pushGroup.pushError(t));
}
@Override
void completeResponse(Response r) {
Log.logResponse(r::toString);
pushCF.complete(r);
CompletableFuture<Void> start = new MinimalFuture<>();
start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
.whenComplete((T body, Throwable t) -> {
if (t != null) {
responseCF.completeExceptionally(t);
} else {
HttpResponseImpl<T> resp =
new HttpResponseImpl<>(r.request, r, null, body, getExchange());
responseCF.complete(resp);
}
});
start.completeAsync(() -> null, getExchange().executor());
}
@Override
void completeResponseExceptionally(Throwable t) {
pushCF.completeExceptionally(t);
}
@Override
protected void handleResponse() {
HttpHeaders responseHeaders = responseHeadersBuilder.build();
responseCode = (int)responseHeaders
.firstValueAsLong(":status")
.orElse(-1);
if (responseCode == -1) {
completeResponseExceptionally(new IOException("No status code"));
}
this.response = new Response(
pushReq, exchange, responseHeaders, connection(),
responseCode, HttpClient.Version.HTTP_2);
responseHeaders.firstValueAsLong("content-length");
if (Log.headers()) {
StringBuilder sb = new StringBuilder("RESPONSE HEADERS");
sb.append(" (streamid=").append(streamid).append("):\n");
Log.dumpHeaders(sb, " ", responseHeaders);
Log.logHeaders(sb.toString());
}
rspHeadersConsumer.reset();
completeResponse(response);
}
}
/** Window update sender bound to the enclosing stream's id. */
final class StreamWindowUpdateSender extends WindowUpdateSender {

    StreamWindowUpdateSender(Http2Connection connection) {
        super(connection);
    }

    @Override
    int getStreamId() {
        return streamid;
    }

    /**
     * Returns the debug tag for this sender. The tag is cached in
     * {@code dbgString} only once a stream id has been assigned.
     */
    @Override
    String dbgString() {
        String cached = dbgString;
        if (cached != null) {
            return cached;
        }
        if (streamid != 0) {
            cached = connection.dbgString() + ":WindowUpdateSender(stream: " + streamid + ")";
            return dbgString = cached;
        }
        return connection.dbgString() + ":WindowUpdateSender(stream: ?)";
    }
}
/** Returns {@code true} if an error has been recorded for this stream. */
synchronized boolean isCanceled() {
    final Throwable recorded = errorRef.get();
    return recorded != null;
}
/** Returns the first error recorded for this stream, or {@code null} if there is none. */
synchronized Throwable getCancelCause() {
return errorRef.get();
}
/** Returns the debug tag: the connection's tag followed by {@code "/Stream(<id>)"}. */
final String dbgString() {
    final String owner = connection.dbgString();
    return owner + "/Stream(" + streamid + ")";
}
private class extends Http2Connection.ValidatingHeadersConsumer {
void () {
super.reset();
responseHeadersBuilder.clear();
debug.log("Response builder cleared, ready to receive new headers.");
}
@Override
public void (CharSequence name, CharSequence value)
throws UncheckedIOException
{
String n = name.toString();
String v = value.toString();
super.onDecoded(n, v);
responseHeadersBuilder.addHeader(n, v);
if (Log.headers() && Log.trace()) {
Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
streamid, n, v);
}
}
}
// VarHandles giving atomic access to the streamState and deRegistered fields.
private static final VarHandle STREAM_STATE;
private static final VarHandle DEREGISTERED;
static {
    try {
        final MethodHandles.Lookup lookup = MethodHandles.lookup();
        STREAM_STATE = lookup.findVarHandle(Stream.class, "streamState", int.class);
        DEREGISTERED = lookup.findVarHandle(Stream.class, "deRegistered", boolean.class);
    } catch (Exception x) {
        throw new ExceptionInInitializerError(x);
    }
}
}