package jdk.incubator.http;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import jdk.incubator.http.internal.common.*;
import jdk.incubator.http.internal.frame.*;
import jdk.incubator.http.internal.hpack.DecodingCallback;
class Stream<T> extends ExchangeImpl<T> {
/** Inbound frames for this stream, queued until the body reader consumes them. */
final AsyncDataReadQueue inputQ = new AsyncDataReadQueue();
/** Stream identifier; stays 0 until {@link #registerStream(int)} assigns one. */
protected volatile int streamid;
/** Value of the response {@code content-length} header, or -1 when absent. */
long responseContentLen = -1;
// NOTE(review): not read or updated anywhere in the visible code.
long responseBytesProcessed = 0;
/** Request body length as reported by the request processor. */
long requestContentLen;
final Http2Connection connection;
HttpClientImpl client;
final HttpRequestImpl request;
/** Callback that records each decoded response header into {@code responseHeaders}. */
final DecodingCallback rspHeadersConsumer;
/** Headers of the response, filled in by {@code rspHeadersConsumer}. */
HttpHeadersImpl responseHeaders;
final HttpHeadersImpl requestHeaders;
/** HTTP/2 pseudo headers ({@code :method}, {@code :scheme}, ...) of the request. */
final HttpHeadersImpl requestPseudoHeaders;
/** Body processor obtained from the user's body handler when the body is read. */
HttpResponse.BodyProcessor<T> responseProcessor;
final HttpRequest.BodyProcessor requestProcessor;
volatile int responseCode;
volatile Response response;
volatile CompletableFuture<Response> responseCF;
/** Publishes response body buffers to {@code responseProcessor}. */
final AbstractPushPublisher<ByteBuffer> publisher;
/** Completed once the request body has been fully sent, or has failed. */
final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
/** Set once END_STREAM has been received from the peer. */
private volatile boolean remotelyClosed;
private volatile boolean closed;
/** Set once a frame carrying END_STREAM has been prepared for sending. */
private volatile boolean endStreamSent;
// These three flags are read and written while holding this stream's monitor.
boolean requestSent, responseReceived, responseHeadersReceived;
private final WindowController windowController;
private final WindowUpdateSender windowUpdater;
/** Returns the underlying transport connection of this stream's HTTP/2 connection. */
@Override
HttpConnection connection() {
    final Http2Connection h2 = connection;
    return h2.connection;
}
/**
 * Starts reading the response body asynchronously.
 *
 * Obtains the body processor from {@code handler}, subscribes it to the
 * stream's publisher and begins draining the input queue. When the exchange
 * belongs to a push group, any failure is reported to that group.
 *
 * @param handler supplies the body processor for the received status/headers
 * @param returnConnectionToPool not used by this implementation
 * @param executor executor for asynchronous reading, or null to read
 *                 on the calling thread
 * @return a future completed with the response body
 */
@Override
CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
                                   boolean returnConnectionToPool,
                                   Executor executor)
{
    Log.logTrace("Reading body on stream {0}", streamid);
    responseProcessor = handler.apply(responseCode, responseHeaders);
    setClientForResponse(responseProcessor);
    publisher.subscribe(responseProcessor);
    final CompletableFuture<T> bodyCF = receiveData(executor);
    final PushGroup<?,?> group = exchange.getPushGroup();
    if (group == null) {
        return bodyCF;
    }
    return bodyCF.whenComplete((body, error) -> group.pushError(error));
}
/**
 * Reads the response body, waiting on the calling thread until it completes.
 *
 * @throws IOException derived from the failure of the body future
 */
@Override
T readBody(HttpResponse.BodyHandler<T> handler, boolean returnConnectionToPool)
    throws IOException
{
    // A null executor makes readBodyAsync drain the queue on this thread.
    final CompletableFuture<T> bodyCF =
            readBodyAsync(handler, returnConnectionToPool, null);
    try {
        return bodyCF.join();
    } catch (CompletionException failure) {
        throw Utils.getIOException(failure);
    }
}
/** Returns a short description containing the stream id. */
@Override
public String toString() {
    return "streamid: " + streamid;
}
/**
 * Processes one frame taken from the input queue.
 *
 * A RST_STREAM frame is passed to {@link #handleReset}. A DATA frame has
 * its non-empty buffers published to the body subscriber, after which the
 * connection window is credited. The stream window is credited only when
 * more data is expected.
 *
 * @return false once the frame carrying END_STREAM has been processed,
 *         true otherwise
 */
private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException {
    if (frame instanceof ResetFrame) {
        handleReset((ResetFrame) frame);
        return true;
    }
    if (!(frame instanceof DataFrame)) {
        // Only DATA and RST_STREAM frames are expected in the queue.
        assert false;
        return true;
    }
    final DataFrame data = (DataFrame) frame;
    final int payloadLen = data.payloadLength();
    for (ByteBufferReference ref : data.getData()) {
        final ByteBuffer chunk = ref.get();
        if (chunk.hasRemaining()) {
            publisher.acceptData(Optional.of(chunk));
        }
    }
    connection.windowUpdater.update(payloadLen);
    final boolean lastFrame = data.getFlag(DataFrame.END_STREAM);
    if (lastFrame) {
        setEndStreamReceived();
        // An empty Optional signals end of body to the publisher.
        publisher.acceptData(Optional.empty());
        return false;
    }
    // The stream stays open: credit the stream-level window as well.
    windowUpdater.update(payloadLen);
    return true;
}
/**
 * Starts draining the input queue into the response body processor.
 *
 * @param executor executor on which frames are processed, or null to
 *                 process them on the calling thread
 * @return the body processor's future for the response body
 */
CompletableFuture<T> receiveData(Executor executor) {
    CompletableFuture<T> cf = responseProcessor
            .getBody()
            .toCompletableFuture();
    Consumer<Throwable> onError = e -> {
        Log.logTrace("receiveData: {0}", e.toString());
        // Report through the client's logging facility instead of
        // dumping a stack trace to stderr.
        Log.logError(e);
        cf.completeExceptionally(e);
        publisher.acceptError(e);
    };
    if (executor == null) {
        inputQ.blockingReceive(this::receiveDataFrame, onError);
    } else {
        inputQ.asyncReceive(executor, this::receiveDataFrame, onError);
    }
    return cf;
}
/**
 * Sends the request body and waits until sending has completed.
 *
 * @throws IOException derived from the failure of the send future
 */
@Override
void sendBody() throws IOException {
    final CompletableFuture<Void> bodySent = sendBodyImpl();
    try {
        bodySent.join();
    } catch (CompletionException failure) {
        throw Utils.getIOException(failure);
    }
}
/** Sends the request body; the returned future yields this stream when done. */
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
    final CompletableFuture<Void> bodySent = sendBodyImpl();
    return bodySent.thenApply(ignored -> this);
}
/**
 * Creates a stream for the given exchange on an HTTP/2 connection.
 *
 * @param client the owning client
 * @param connection the HTTP/2 connection carrying this stream
 * @param e the exchange whose request this stream sends
 * @param windowController controller consulted for send-window accounting
 */
@SuppressWarnings("unchecked")
Stream(HttpClientImpl client,
       Http2Connection connection,
       Exchange<T> e,
       WindowController windowController)
{
    super(e);
    this.client = client;
    this.connection = connection;
    this.windowController = windowController;
    this.request = e.request();
    this.requestProcessor = this.request.requestProcessor;
    this.responseHeaders = new HttpHeadersImpl();
    this.requestHeaders = new HttpHeadersImpl();
    // Each decoded response header is stored, and traced when both
    // header logging and trace logging are enabled.
    this.rspHeadersConsumer = (name, value) -> {
        responseHeaders.addHeader(name.toString(), value.toString());
        if (Log.headers() && Log.trace()) {
            Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
                         streamid, name, value);
        }
    };
    this.requestPseudoHeaders = new HttpHeadersImpl();
    this.publisher = new BlockingPushPublisher<>();
    this.windowUpdater = new StreamWindowUpdateSender(connection);
}
/**
 * Entry point for frames received for this stream.
 *
 * A header frame that ends the header block triggers response handling;
 * if it also carries END_STREAM, an empty END_STREAM data frame is queued
 * so the body reader sees the end of the stream. DATA frames are queued.
 * Everything else goes to {@link #otherFrame}.
 */
void incoming(Http2Frame frame) throws IOException {
    if (frame instanceof HeaderFrame) {
        final HeaderFrame headers = (HeaderFrame) frame;
        if (!headers.endHeaders()) {
            return;
        }
        Log.logTrace("handling response (streamid={0})", streamid);
        handleResponse();
        if (headers.getFlag(HeaderFrame.END_STREAM)) {
            inputQ.put(new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]));
        }
        return;
    }
    if (frame instanceof DataFrame) {
        inputQ.put(frame);
        return;
    }
    otherFrame(frame);
}
/**
 * Dispatches WINDOW_UPDATE, RST_STREAM and PRIORITY frames to their handlers.
 *
 * @throws IOException if the frame is of any other type
 */
void otherFrame(Http2Frame frame) throws IOException {
    switch (frame.type()) {
        case WindowUpdateFrame.TYPE:
            incoming_windowUpdate((WindowUpdateFrame) frame);
            return;
        case ResetFrame.TYPE:
            incoming_reset((ResetFrame) frame);
            return;
        case PriorityFrame.TYPE:
            incoming_priority((PriorityFrame) frame);
            return;
        default:
            throw new IOException("Unexpected frame: " + frame.toString());
    }
}
DecodingCallback () {
return rspHeadersConsumer;
}
/**
 * Called once the complete response header block has been received.
 *
 * Marks the headers as received, extracts the status code and content
 * length, builds the {@code Response} and completes a response future.
 *
 * @throws IOException if the response carries no {@code :status} header
 */
protected void handleResponse() throws IOException {
    synchronized(this) {
        responseHeadersReceived = true;
    }
    responseCode = (int)responseHeaders
            .firstValueAsLong(":status")
            .orElseThrow(() -> new IOException("no statuscode in response"));
    response = new Response(
            request, exchange, responseHeaders,
            responseCode, HttpClient.Version.HTTP_2);
    // -1 records that the peer sent no content-length header.
    this.responseContentLen = responseHeaders
            .firstValueAsLong("content-length")
            .orElse(-1L);
    if (Log.headers()) {
        StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
        Log.dumpHeaders(sb, " ", responseHeaders);
        Log.logHeaders(sb.toString());
    }
    completeResponse(response);
}
/**
 * Handles a RST_STREAM frame received from the peer.
 *
 * The frame is ignored when END_STREAM has already been received or the
 * stream is closed. Otherwise, if the response headers have arrived and the
 * frame can be queued, it is left for the body reader. If not, it is handled
 * immediately and the resulting failure completes a pending response future.
 */
void incoming_reset(ResetFrame frame) throws IOException {
    Log.logTrace("Received RST_STREAM on stream {0}", streamid);
    if (endStreamReceived()) {
        Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
        return;
    }
    if (closed) {
        Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
        return;
    }
    final boolean queued;
    synchronized (this) {
        queued = !closed && responseHeadersReceived && inputQ.tryPut(frame);
    }
    if (queued) {
        Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
        return;
    }
    try {
        handleReset(frame);
    } catch (IOException io) {
        completeResponseExceptionally(io);
    }
}
/**
 * Closes the stream in response to a RST_STREAM frame.
 *
 * @throws IOException describing the frame's error code, when the stream
 *         was not already closed
 */
void handleReset(ResetFrame frame) throws IOException {
    Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
    if (closed) {
        Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
        return;
    }
    close();
    final int errorCode = frame.getErrorCode();
    throw new IOException(ErrorFrame.stringForCode(errorCode));
}
/**
 * PRIORITY frames are not supported.
 *
 * @throws UnsupportedOperationException always
 */
void incoming_priority(PriorityFrame frame) {
    final String reason = "Not implemented";
    throw new UnsupportedOperationException(reason);
}
/**
 * Applies a WINDOW_UPDATE frame to this stream's send window.
 *
 * The stream is reset with FLOW_CONTROL_ERROR when the increment is not
 * positive or when the window controller rejects it.
 */
private void incoming_windowUpdate(WindowUpdateFrame frame)
    throws IOException
{
    int amount = frame.getUpdate();
    if (amount <= 0) {
        // The log facility uses {n} placeholders; printf-style %d would be
        // printed literally.
        Log.logTrace("Resetting stream: {0}, Window Update amount: {1}",
                     streamid, amount);
        connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
    } else {
        assert streamid != 0;
        boolean success = windowController.increaseStreamWindow(amount, streamid);
        if (!success) {
            connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
        }
    }
}
/**
 * Handles a PUSH_PROMISE received on this stream.
 *
 * If the exchange has no push group, or the group accepts no more pushes,
 * this stream is cancelled and the promise is not processed further.
 * Otherwise the push group's processor is asked for a body handler. With no
 * handler the pushed stream is cancelled; with one, the push is registered
 * and its outcome is reported to the processor and the group.
 *
 * @param pushReq the promised request
 * @param pushStream the stream that will carry the pushed response
 */
void incoming_pushPromise(HttpRequestImpl pushReq,
                          PushedStream<?,T> pushStream)
    throws IOException
{
    if (Log.requests()) {
        Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
    }
    PushGroup<?,T> pushGroup = exchange.getPushGroup();
    if (pushGroup == null || pushGroup.noMorePushes()) {
        cancelImpl(new IllegalStateException("unexpected push promise"
            + " on stream " + streamid));
        // Stop here: pushGroup may be null, and an unexpected promise
        // must not be registered with the group.
        return;
    }
    HttpResponse.MultiProcessor<?,T> proc = pushGroup.processor();
    CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
    Optional<HttpResponse.BodyHandler<T>> bpOpt = proc.onRequest(
            pushReq);
    if (!bpOpt.isPresent()) {
        IOException ex = new IOException("Stream "
             + streamid + " cancelled by user");
        if (Log.trace()) {
            Log.logTrace("No body processor for {0}: {1}", pushReq,
                        ex.getMessage());
        }
        pushStream.cancelImpl(ex);
        cf.completeExceptionally(ex);
        return;
    }
    pushGroup.addPush();
    pushStream.requestSent();
    pushStream.setPushHandler(bpOpt.get());
    cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
        if (Log.trace()) {
            Log.logTrace("Push completed on stream {0} for {1}{2}",
                         pushStream.streamid, resp,
                         ((t==null) ? "": " with exception " + t));
        }
        if (t != null) {
            pushGroup.pushError(t);
            proc.onError(pushReq, t);
        } else {
            proc.onResponse(resp);
        }
        pushGroup.pushCompleted();
    });
}
private OutgoingHeaders<Stream<T>> (long contentLength) {
HttpHeadersImpl h = request.getSystemHeaders();
if (contentLength > 0) {
h.setHeader("content-length", Long.toString(contentLength));
}
setPseudoHeaderFields();
OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(h, request.getUserHeaders(), this);
if (contentLength == 0) {
f.setFlag(HeadersFrame.END_STREAM);
endStreamSent = true;
}
return f;
}
private void () {
HttpHeadersImpl hdrs = requestPseudoHeaders;
String method = request.method();
hdrs.setHeader(":method", method);
URI uri = request.uri();
hdrs.setHeader(":scheme", uri.getScheme());
hdrs.setHeader(":authority", uri.getAuthority());
String query = uri.getQuery();
String path = uri.getPath();
if (path == null || path.isEmpty()) {
if (method.equalsIgnoreCase("OPTIONS")) {
path = "*";
} else {
path = "/";
}
}
if (query != null) {
path += "?" + query;
}
hdrs.setHeader(":path", path);
}
HttpHeadersImpl () {
return requestPseudoHeaders;
}
/**
 * Waits for the response headers.
 *
 * When the request has a duration configured, waiting is bounded by it.
 *
 * @return the response
 * @throws HttpTimeoutException if the configured duration elapses first
 * @throws IOException if the response fails or the wait is interrupted;
 *         on interruption the thread's interrupt status is restored
 */
@Override
Response getResponse() throws IOException {
    try {
        if (request.duration() != null) {
            Log.logTrace("Waiting for response (streamid={0}, timeout={1}ms)",
                         streamid,
                         request.duration().toMillis());
            return getResponseAsync(null).get(
                    request.duration().toMillis(), TimeUnit.MILLISECONDS);
        } else {
            Log.logTrace("Waiting for response (streamid={0})", streamid);
            return getResponseAsync(null).join();
        }
    } catch (TimeoutException e) {
        Log.logTrace("Response timeout (streamid={0})", streamid);
        throw new HttpTimeoutException("Response timed out");
    } catch (InterruptedException e) {
        // Preserve the interruption for callers further up the stack.
        Thread.currentThread().interrupt();
        Log.logTrace("Response failed (streamid={0}): {1}", streamid, e);
        throw new IOException(e);
    } catch (ExecutionException | CompletionException e) {
        Throwable t = e.getCause();
        Log.logTrace("Response failed (streamid={0}): {1}", streamid, t);
        if (t instanceof IOException) {
            throw (IOException)t;
        }
        throw new IOException(e);
    } finally {
        Log.logTrace("Got response or failed (streamid={0})", streamid);
    }
}
/** Records that END_STREAM was received and marks the response as received. */
void setEndStreamReceived() {
    assert !remotelyClosed : "Unexpected endStream already set";
    remotelyClosed = true;
    responseReceived();
}
/** Tells whether END_STREAM has been received from the peer. */
private boolean endStreamReceived() {
    final boolean ended = remotelyClosed;
    return ended;
}
@Override
void () throws IOException, InterruptedException {
if (Log.requests() && request != null) {
Log.logRequest(request.toString());
}
requestContentLen = requestProcessor.contentLength();
OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
connection.sendFrame(f);
}
/** Assigns the stream id and registers this stream with the connection. */
void registerStream(int id) {
    streamid = id;
    connection.putStream(this, id);
}
/**
 * Subscriber that turns request body buffers into DATA frames.
 *
 * Buffers are requested one at a time. The outcome of sending the body is
 * reported through the enclosing stream's {@code requestBodyCF}.
 */
class RequestSubscriber
    extends RequestProcessors.ProcessorBase
    implements Flow.Subscriber<ByteBuffer>
{
    /** Bytes still to be sent; only counted down while positive. */
    private volatile long remainingContentLength;
    private volatile Subscription subscription;

    RequestSubscriber(long contentLen) {
        this.remainingContentLength = contentLen;
    }

    /** Stores the subscription and asks for the first buffer. */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        final boolean alreadySubscribed = this.subscription != null;
        if (alreadySubscribed) {
            throw new IllegalStateException();
        }
        this.subscription = subscription;
        subscription.request(1);
    }

    /**
     * Sends the buffer as one or more DATA frames, then asks for the next
     * buffer. The frame that brings the remaining length to zero carries
     * END_STREAM.
     */
    @Override
    public void onNext(ByteBuffer item) {
        if (requestBodyCF.isDone()) {
            throw new IllegalStateException();
        }
        try {
            while (item.hasRemaining()) {
                assert !endStreamSent : "internal error, send data after END_STREAM flag";
                final DataFrame frame = getDataFrame(item);
                if (remainingContentLength > 0) {
                    remainingContentLength -= frame.getDataLength();
                    assert remainingContentLength >= 0;
                    if (remainingContentLength == 0) {
                        frame.setFlag(DataFrame.END_STREAM);
                        endStreamSent = true;
                    }
                }
                connection.sendDataFrame(frame);
            }
            subscription.request(1);
        } catch (InterruptedException interrupted) {
            subscription.cancel();
            requestBodyCF.completeExceptionally(interrupted);
        }
    }

    /** Cancels the subscription and fails the body future, unless already done. */
    @Override
    public void onError(Throwable throwable) {
        if (!requestBodyCF.isDone()) {
            subscription.cancel();
            requestBodyCF.completeExceptionally(throwable);
        }
    }

    /**
     * Completes the body future, first sending an empty END_STREAM frame
     * when no earlier frame carried the flag.
     */
    @Override
    public void onComplete() {
        assert endStreamSent || remainingContentLength < 0;
        try {
            if (!endStreamSent) {
                endStreamSent = true;
                connection.sendDataFrame(getEmptyEndStreamDataFrame());
            }
            requestBodyCF.complete(null);
        } catch (InterruptedException interrupted) {
            requestBodyCF.completeExceptionally(interrupted);
        }
    }
}
/**
 * Creates a DATA frame from the front of {@code buffer}.
 *
 * The amount asked for is capped by the connection's maximum send frame
 * size; the amount actually used is whatever the window controller returns.
 */
DataFrame getDataFrame(ByteBuffer buffer) throws InterruptedException {
    final int wanted = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
    final int granted = windowController.tryAcquire(wanted, streamid);
    final ByteBuffer payload = Utils.slice(buffer, granted);
    return new DataFrame(streamid, 0, ByteBufferReference.of(payload));
}
/** Creates a DATA frame with no payload and the END_STREAM flag set. */
private DataFrame getEmptyEndStreamDataFrame() throws InterruptedException {
    final ByteBufferReference[] noPayload = new ByteBufferReference[0];
    return new DataFrame(streamid, DataFrame.END_STREAM, noPayload);
}
/**
 * Response futures shared between getResponseAsync and the completeResponse
 * methods. It holds either futures handed out and still awaiting a response,
 * or completed futures awaiting a caller. All access is synchronized on the
 * list itself.
 */
final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
/**
 * Returns a future for the response headers.
 *
 * If a completed future is already waiting in {@code response_cfs} it is
 * taken from there. Otherwise a new, uncompleted future is created and
 * registered for later completion.
 *
 * @param executor if non-null and the future is not yet done, dependent
 *                 actions are moved onto this executor
 */
@Override
CompletableFuture<Response> getResponseAsync(Executor executor) {
    CompletableFuture<Response> result;
    synchronized (response_cfs) {
        if (response_cfs.isEmpty()) {
            result = new MinimalFuture<>();
            response_cfs.add(result);
        } else {
            result = response_cfs.remove(0);
            assert result.isDone() : "Removing uncompleted response: could cause code to hang!";
        }
    }
    final boolean hopToExecutor = executor != null && !result.isDone();
    if (hopToExecutor) {
        result = result.thenApplyAsync(r -> r, executor);
    }
    Log.logTrace("Response future (stream={0}) is: {1}", streamid, result);
    final PushGroup<?,?> group = exchange.getPushGroup();
    if (group != null) {
        result = result.whenComplete((resp, error) -> group.pushError(error));
    }
    return result;
}
/**
 * Delivers a response to the first waiting future, or, when none is
 * waiting, stores an already-completed future for a later caller.
 */
void completeResponse(Response resp) {
    synchronized (response_cfs) {
        for (CompletableFuture<Response> pending : response_cfs) {
            if (pending.isDone()) {
                continue;
            }
            Log.logTrace("Completing response (streamid={0}): {1}",
                         streamid, pending);
            pending.complete(resp);
            // Safe despite the enclosing iteration: we return immediately.
            response_cfs.remove(pending);
            return;
        }
        final CompletableFuture<Response> ready = MinimalFuture.completedFuture(resp);
        Log.logTrace("Created completed future (streamid={0}): {1}",
                     streamid, ready);
        response_cfs.add(ready);
    }
}
/** Marks the request as sent; closes the stream if the response was already received. */
synchronized void requestSent() {
    requestSent = true;
    if (!responseReceived) {
        return;
    }
    close();
}
/** Tells whether the response has been marked as received. */
final synchronized boolean isResponseReceived() {
    final boolean received = responseReceived;
    return received;
}
/** Marks the response as received; closes the stream if the request was already sent. */
synchronized void responseReceived() {
    responseReceived = true;
    if (!requestSent) {
        return;
    }
    close();
}
/**
 * Fails the first waiting response future with {@code t}, or, when none
 * is waiting, stores an already-failed future for a later caller.
 */
void completeResponseExceptionally(Throwable t) {
    synchronized (response_cfs) {
        final java.util.Iterator<CompletableFuture<Response>> it = response_cfs.iterator();
        while (it.hasNext()) {
            final CompletableFuture<Response> pending = it.next();
            if (pending.isDone()) {
                continue;
            }
            pending.completeExceptionally(t);
            it.remove();
            return;
        }
        response_cfs.add(MinimalFuture.failedFuture(t));
    }
}
/**
 * Subscribes a new {@code RequestSubscriber} to the request processor.
 *
 * @return the future completed when the body has been sent or has failed;
 *         in either case the request is then marked as sent
 */
CompletableFuture<Void> sendBodyImpl() {
    final RequestSubscriber bodySubscriber = new RequestSubscriber(requestContentLen);
    bodySubscriber.setClient(client);
    requestProcessor.subscribe(bodySubscriber);
    requestBodyCF.whenComplete((ignored, failure) -> requestSent());
    return requestBodyCF;
}
/** Cancels this stream with a generic "cancelled" cause. */
@Override
void cancel() {
    final IOException cause = new IOException("Stream " + streamid + " cancelled");
    cancel(cause);
}
/** Cancels this stream, failing the response with the given cause. */
@Override
void cancel(IOException cause) {
    final Throwable reason = cause;
    cancelImpl(reason);
}
/**
 * Cancels the stream.
 *
 * Marks the stream closed, closing the input queue if this call performed
 * the transition. It then fails the response with {@code e} and, if a
 * stream id was assigned, asks the connection to reset the stream with
 * CANCEL. A failure to send the reset is logged, not propagated.
 */
void cancelImpl(Throwable e) {
    if (Log.trace()) {
        Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
    }
    // Unsynchronized check first, then a re-check under the monitor so
    // only one caller performs the transition to closed.
    boolean transitioned = false;
    if (!closed) {
        synchronized (this) {
            if (!closed) {
                closed = true;
                transitioned = true;
            }
        }
    }
    if (transitioned) {
        inputQ.close();
    }
    completeResponseExceptionally(e);
    try {
        if (streamid != 0) {
            connection.resetStream(streamid, ResetFrame.CANCEL);
        }
    } catch (IOException resetFailure) {
        Log.logError(resetFailure);
    }
}
/**
 * Closes the stream: closes the input queue and asks the connection to
 * close the stream. Does nothing if the stream is already closed.
 */
void close() {
    if (!closed) {
        synchronized (this) {
            if (closed) {
                return;
            }
            closed = true;
        }
        Log.logTrace("Closing stream {0}", streamid);
        inputQ.close();
        connection.closeStream(streamid);
        Log.logTrace("Stream {0} closed", streamid);
    }
}
static class PushedStream<U,T> extends Stream<T> {
final PushGroup<U,T> pushGroup;
private final Stream<T> parent;
final CompletableFuture<Response> pushCF;
final CompletableFuture<HttpResponse<T>> responseCF;
final HttpRequestImpl pushReq;
HttpResponse.BodyHandler<T> pushHandler;
PushedStream(PushGroup<U,T> pushGroup, HttpClientImpl client,
Http2Connection connection, Stream<T> parent,
Exchange<T> pushReq) {
super(client, connection, pushReq, null);
this.pushGroup = pushGroup;
this.pushReq = pushReq.request();
this.pushCF = new MinimalFuture<>();
this.responseCF = new MinimalFuture<>();
this.parent = parent;
}
CompletableFuture<HttpResponse<T>> responseCF() {
return responseCF;
}
synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
this.pushHandler = pushHandler;
}
synchronized HttpResponse.BodyHandler<T> getPushHandler() {
return this.pushHandler;
}
@Override
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
return super.sendBodyAsync()
.whenComplete((ExchangeImpl<T> v, Throwable t) -> pushGroup.pushError(t));
}
@Override
CompletableFuture<ExchangeImpl<T>> () {
return super.sendHeadersAsync()
.whenComplete((ExchangeImpl<T> ex, Throwable t) -> pushGroup.pushError(t));
}
@Override
CompletableFuture<Response> getResponseAsync(Executor executor) {
CompletableFuture<Response> cf = pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
if(executor!=null && !cf.isDone()) {
cf = cf.thenApplyAsync( r -> r, executor);
}
return cf;
}
@Override
CompletableFuture<T> readBodyAsync(
HttpResponse.BodyHandler<T> handler,
boolean returnConnectionToPool,
Executor executor)
{
return super.readBodyAsync(handler, returnConnectionToPool, executor)
.whenComplete((v, t) -> pushGroup.pushError(t));
}
@Override
void completeResponse(Response r) {
HttpResponseImpl.logResponse(r);
pushCF.complete(r);
CompletableFuture<Void> start = new MinimalFuture<>();
start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
.whenComplete((T body, Throwable t) -> {
if (t != null) {
responseCF.completeExceptionally(t);
} else {
HttpResponseImpl<T> response = new HttpResponseImpl<>(r.request, r, body, getExchange());
responseCF.complete(response);
}
});
start.completeAsync(() -> null, getExchange().executor());
}
@Override
void completeResponseExceptionally(Throwable t) {
pushCF.completeExceptionally(t);
}
@Override
synchronized void responseReceived() {
super.responseReceived();
}
@Override
protected void handleResponse() {
HttpConnection c = connection.connection;
responseCode = (int)responseHeaders
.firstValueAsLong(":status")
.orElse(-1);
if (responseCode == -1) {
completeResponseExceptionally(new IOException("No status code"));
}
this.response = new Response(
pushReq, exchange, responseHeaders,
responseCode, HttpClient.Version.HTTP_2);
this.responseContentLen = responseHeaders
.firstValueAsLong("content-length")
.orElse(-1L);
if (Log.headers()) {
StringBuilder sb = new StringBuilder("RESPONSE HEADERS");
sb.append(" (streamid=").append(streamid).append("): ");
Log.dumpHeaders(sb, " ", responseHeaders);
Log.logHeaders(sb.toString());
}
completeResponse(response);
}
}
/** Window-update sender bound to the enclosing stream's id. */
final class StreamWindowUpdateSender extends WindowUpdateSender {

    StreamWindowUpdateSender(Http2Connection connection) {
        super(connection);
    }

    /** Returns the enclosing stream's current id. */
    @Override
    int getStreamId() {
        final int id = streamid;
        return id;
    }
}
}