package jdk.incubator.http;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.concurrent.Executor;
import java.util.function.UnaryOperator;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Pair;
import jdk.incubator.http.internal.common.Utils;
import static jdk.incubator.http.internal.common.Pair.pair;
class MultiExchange<U,T> {
private final HttpRequest userRequest;
private final HttpRequestImpl request;
final AccessControlContext acc;
final HttpClientImpl client;
final HttpResponse.BodyHandler<T> responseHandler;
final ExecutorWrapper execWrapper;
final Executor executor;
final HttpResponse.MultiProcessor<U,T> multiResponseHandler;
HttpRequestImpl currentreq;
Exchange<T> exchange;
Exchange<T> previous;
int attempts;
static final int DEFAULT_MAX_ATTEMPTS = 5;
static final int max_attempts = Utils.getIntegerNetProperty(
"jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS
);
private final List<HeaderFilter> filters;
TimedEvent timedEvent;
volatile boolean cancelled;
final PushGroup<U,T> pushGroup;
volatile AuthenticationFilter.AuthInfo serverauth, proxyauth;
volatile int numberOfRedirects = 0;
MultiExchange(HttpRequest req,
HttpClientImpl client,
HttpResponse.BodyHandler<T> responseHandler) {
this.previous = null;
this.userRequest = req;
this.request = new HttpRequestImpl(req);
this.currentreq = request;
this.attempts = 0;
this.client = client;
this.filters = client.filterChain();
if (System.getSecurityManager() != null) {
this.acc = AccessController.getContext();
} else {
this.acc = null;
}
this.execWrapper = new ExecutorWrapper(client.executor(), acc);
this.executor = execWrapper.executor();
this.responseHandler = responseHandler;
this.exchange = new Exchange<>(request, this);
this.multiResponseHandler = null;
this.pushGroup = null;
}
MultiExchange(HttpRequest req,
HttpClientImpl client,
HttpResponse.MultiProcessor<U, T> multiResponseHandler) {
this.previous = null;
this.userRequest = req;
this.request = new HttpRequestImpl(req);
this.currentreq = request;
this.attempts = 0;
this.client = client;
this.filters = client.filterChain();
if (System.getSecurityManager() != null) {
this.acc = AccessController.getContext();
} else {
this.acc = null;
}
this.execWrapper = new ExecutorWrapper(client.executor(), acc);
this.executor = execWrapper.executor();
this.multiResponseHandler = multiResponseHandler;
this.pushGroup = new PushGroup<>(multiResponseHandler, request);
this.exchange = new Exchange<>(request, this);
this.responseHandler = pushGroup.mainResponseHandler();
}
public HttpResponseImpl<T> response() throws IOException, InterruptedException {
HttpRequestImpl r = request;
if (r.duration() != null) {
timedEvent = new TimedEvent(r.duration());
client.registerTimer(timedEvent);
}
while (attempts < max_attempts) {
try {
attempts++;
Exchange<T> currExchange = getExchange();
requestFilters(r);
Response response = currExchange.response();
HttpRequestImpl newreq = responseFilters(response);
if (newreq == null) {
if (attempts > 1) {
Log.logError("Succeeded on attempt: " + attempts);
}
T body = currExchange.readBody(responseHandler);
cancelTimer();
return new HttpResponseImpl<>(userRequest, response, body, currExchange);
}
setExchange(new Exchange<>(newreq, this, acc));
r = newreq;
} catch (IOException e) {
if (cancelled) {
throw new HttpTimeoutException("Request timed out");
}
throw e;
}
}
cancelTimer();
throw new IOException("Retry limit exceeded");
}
CompletableFuture<Void> multiCompletionCF() {
return pushGroup.groupResult();
}
private synchronized Exchange<T> getExchange() {
return exchange;
}
HttpClientImpl client() {
return client;
}
HttpClient.Redirect followRedirects() {
return client.followRedirects();
}
HttpClient.Version version() {
return request.version().orElse(client.version());
}
private synchronized void setExchange(Exchange<T> exchange) {
this.exchange = exchange;
}
private void cancelTimer() {
if (timedEvent != null) {
client.cancelTimer(timedEvent);
}
}
private void requestFilters(HttpRequestImpl r) throws IOException {
Log.logTrace("Applying request filters");
for (HeaderFilter filter : filters) {
Log.logTrace("Applying {0}", filter);
filter.request(r, this);
}
Log.logTrace("All filters applied");
}
private HttpRequestImpl responseFilters(Response response) throws IOException
{
Log.logTrace("Applying response filters");
for (HeaderFilter filter : filters) {
Log.logTrace("Applying {0}", filter);
HttpRequestImpl newreq = filter.response(response);
if (newreq != null) {
Log.logTrace("New request: stopping filters");
return newreq;
}
}
Log.logTrace("All filters applied");
return null;
}
public void cancel() {
cancelled = true;
getExchange().cancel();
}
public void cancel(IOException cause) {
cancelled = true;
getExchange().cancel(cause);
}
public CompletableFuture<HttpResponseImpl<T>> responseAsync() {
CompletableFuture<Void> start = new MinimalFuture<>();
CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start);
start.completeAsync( () -> null, executor);
return cf;
}
private CompletableFuture<HttpResponseImpl<T>> responseAsync0(CompletableFuture<Void> start) {
return start.thenCompose( v -> responseAsyncImpl())
.thenCompose((Response r) -> {
Exchange<T> exch = getExchange();
return exch.readBodyAsync(responseHandler)
.thenApply((T body) -> new HttpResponseImpl<>(userRequest, r, body, exch));
});
}
CompletableFuture<U> multiResponseAsync() {
CompletableFuture<Void> start = new MinimalFuture<>();
CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start);
CompletableFuture<HttpResponse<T>> mainResponse =
cf.thenApply((HttpResponseImpl<T> b) -> {
multiResponseHandler.onResponse(b);
return (HttpResponse<T>)b;
});
pushGroup.setMainResponse(mainResponse);
mainResponse.thenAccept((r) -> {
pushGroup.noMorePushes(true);
});
CompletableFuture<U> res = multiResponseHandler.completion(pushGroup.groupResult(), pushGroup.pushesCF());
start.completeAsync( () -> null, executor);
return res;
}
private CompletableFuture<Response> responseAsyncImpl() {
CompletableFuture<Response> cf;
if (++attempts > max_attempts) {
cf = MinimalFuture.failedFuture(new IOException("Too many retries"));
} else {
if (currentreq.duration() != null) {
timedEvent = new TimedEvent(currentreq.duration());
client.registerTimer(timedEvent);
}
try {
requestFilters(currentreq);
} catch (IOException e) {
return MinimalFuture.failedFuture(e);
}
Exchange<T> exch = getExchange();
cf = exch.responseAsync()
.thenCompose((Response response) -> {
HttpRequestImpl newrequest = null;
try {
newrequest = responseFilters(response);
} catch (IOException e) {
return MinimalFuture.failedFuture(e);
}
if (newrequest == null) {
if (attempts > 1) {
Log.logError("Succeeded on attempt: " + attempts);
}
return MinimalFuture.completedFuture(response);
} else {
currentreq = newrequest;
setExchange(new Exchange<>(currentreq, this, acc));
return responseAsyncImpl();
}
})
.handle((response, ex) -> {
cancelTimer();
if (ex == null) {
assert response != null;
return MinimalFuture.completedFuture(response);
}
CompletableFuture<Response> error = getExceptionalCF(ex);
if (error == null) {
return responseAsyncImpl();
} else {
return error;
}
})
.thenCompose(UnaryOperator.identity());
}
return cf;
}
private CompletableFuture<Response> getExceptionalCF(Throwable t) {
if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
if (t.getCause() != null) {
t = t.getCause();
}
}
if (cancelled && t instanceof IOException) {
t = new HttpTimeoutException("request timed out");
}
return MinimalFuture.failedFuture(t);
}
class TimedEvent extends TimeoutEvent {
TimedEvent(Duration duration) {
super(duration);
}
@Override
public void handle() {
cancel(new HttpTimeoutException("request timed out"));
}
}
}