/*
 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.concurrent.Executor;
import java.util.function.UnaryOperator;

import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Pair;
import jdk.incubator.http.internal.common.Utils;
import static jdk.incubator.http.internal.common.Pair.pair;

Encapsulates multiple Exchanges belonging to one HttpRequestImpl. - manages filters - retries due to filters. - I/O errors and most other exceptions get returned directly to user Creates a new Exchange for each request/response interaction
/** * Encapsulates multiple Exchanges belonging to one HttpRequestImpl. * - manages filters * - retries due to filters. * - I/O errors and most other exceptions get returned directly to user * * Creates a new Exchange for each request/response interaction */
class MultiExchange<U,T> { private final HttpRequest userRequest; // the user request private final HttpRequestImpl request; // a copy of the user request final AccessControlContext acc; final HttpClientImpl client; final HttpResponse.BodyHandler<T> responseHandler; final ExecutorWrapper execWrapper; final Executor executor; final HttpResponse.MultiProcessor<U,T> multiResponseHandler; HttpRequestImpl currentreq; // used for async only Exchange<T> exchange; // the current exchange Exchange<T> previous; int attempts; // Maximum number of times a request will be retried/redirected // for any reason static final int DEFAULT_MAX_ATTEMPTS = 5; static final int max_attempts = Utils.getIntegerNetProperty( "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS ); private final List<HeaderFilter> filters; TimedEvent timedEvent; volatile boolean cancelled; final PushGroup<U,T> pushGroup;
Filter fields. These are attached as required by filters and only used by the filter implementations. This could be generalised into Objects that are passed explicitly to the filters (one per MultiExchange object, and one per Exchange object possibly)
/** * Filter fields. These are attached as required by filters * and only used by the filter implementations. This could be * generalised into Objects that are passed explicitly to the filters * (one per MultiExchange object, and one per Exchange object possibly) */
volatile AuthenticationFilter.AuthInfo serverauth, proxyauth; // RedirectHandler volatile int numberOfRedirects = 0;
MultiExchange with one final response.
/** * MultiExchange with one final response. */
MultiExchange(HttpRequest req, HttpClientImpl client, HttpResponse.BodyHandler<T> responseHandler) { this.previous = null; this.userRequest = req; this.request = new HttpRequestImpl(req); this.currentreq = request; this.attempts = 0; this.client = client; this.filters = client.filterChain(); if (System.getSecurityManager() != null) { this.acc = AccessController.getContext(); } else { this.acc = null; } this.execWrapper = new ExecutorWrapper(client.executor(), acc); this.executor = execWrapper.executor(); this.responseHandler = responseHandler; this.exchange = new Exchange<>(request, this); this.multiResponseHandler = null; this.pushGroup = null; }
MultiExchange with multiple responses (HTTP/2 server pushes).
/** * MultiExchange with multiple responses (HTTP/2 server pushes). */
MultiExchange(HttpRequest req, HttpClientImpl client, HttpResponse.MultiProcessor<U, T> multiResponseHandler) { this.previous = null; this.userRequest = req; this.request = new HttpRequestImpl(req); this.currentreq = request; this.attempts = 0; this.client = client; this.filters = client.filterChain(); if (System.getSecurityManager() != null) { this.acc = AccessController.getContext(); } else { this.acc = null; } this.execWrapper = new ExecutorWrapper(client.executor(), acc); this.executor = execWrapper.executor(); this.multiResponseHandler = multiResponseHandler; this.pushGroup = new PushGroup<>(multiResponseHandler, request); this.exchange = new Exchange<>(request, this); this.responseHandler = pushGroup.mainResponseHandler(); } public HttpResponseImpl<T> response() throws IOException, InterruptedException { HttpRequestImpl r = request; if (r.duration() != null) { timedEvent = new TimedEvent(r.duration()); client.registerTimer(timedEvent); } while (attempts < max_attempts) { try { attempts++; Exchange<T> currExchange = getExchange(); requestFilters(r); Response response = currExchange.response(); HttpRequestImpl newreq = responseFilters(response); if (newreq == null) { if (attempts > 1) { Log.logError("Succeeded on attempt: " + attempts); } T body = currExchange.readBody(responseHandler); cancelTimer(); return new HttpResponseImpl<>(userRequest, response, body, currExchange); } //response.body(HttpResponse.ignoreBody()); setExchange(new Exchange<>(newreq, this, acc)); r = newreq; } catch (IOException e) { if (cancelled) { throw new HttpTimeoutException("Request timed out"); } throw e; } } cancelTimer(); throw new IOException("Retry limit exceeded"); } CompletableFuture<Void> multiCompletionCF() { return pushGroup.groupResult(); } private synchronized Exchange<T> getExchange() { return exchange; } HttpClientImpl client() { return client; } HttpClient.Redirect followRedirects() { return client.followRedirects(); } HttpClient.Version version() { return request.version().orElse(client.version()); } private synchronized void setExchange(Exchange<T> exchange) { this.exchange = exchange; } private void cancelTimer() { if (timedEvent != null) { client.cancelTimer(timedEvent); } } private void requestFilters(HttpRequestImpl r) throws IOException { Log.logTrace("Applying request filters"); for (HeaderFilter filter : filters) { Log.logTrace("Applying {0}", filter); filter.request(r, this); } Log.logTrace("All filters applied"); } private HttpRequestImpl responseFilters(Response response) throws IOException { Log.logTrace("Applying response filters"); for (HeaderFilter filter : filters) { Log.logTrace("Applying {0}", filter); HttpRequestImpl newreq = filter.response(response); if (newreq != null) { Log.logTrace("New request: stopping filters"); return newreq; } } Log.logTrace("All filters applied"); return null; } public void cancel() { cancelled = true; getExchange().cancel(); } public void cancel(IOException cause) { cancelled = true; getExchange().cancel(cause); } public CompletableFuture<HttpResponseImpl<T>> responseAsync() { CompletableFuture<Void> start = new MinimalFuture<>(); CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start); start.completeAsync( () -> null, executor); // trigger execution return cf; } private CompletableFuture<HttpResponseImpl<T>> responseAsync0(CompletableFuture<Void> start) { return start.thenCompose( v -> responseAsyncImpl()) .thenCompose((Response r) -> { Exchange<T> exch = getExchange(); return exch.readBodyAsync(responseHandler) .thenApply((T body) -> new HttpResponseImpl<>(userRequest, r, body, exch)); }); } CompletableFuture<U> multiResponseAsync() { CompletableFuture<Void> start = new MinimalFuture<>(); CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start); CompletableFuture<HttpResponse<T>> mainResponse = cf.thenApply((HttpResponseImpl<T> b) -> { multiResponseHandler.onResponse(b); return (HttpResponse<T>)b; }); pushGroup.setMainResponse(mainResponse); // set up house-keeping related to multi-response mainResponse.thenAccept((r) -> { // All push promises received by now. pushGroup.noMorePushes(true); }); CompletableFuture<U> res = multiResponseHandler.completion(pushGroup.groupResult(), pushGroup.pushesCF()); start.completeAsync( () -> null, executor); // trigger execution return res; } private CompletableFuture<Response> responseAsyncImpl() { CompletableFuture<Response> cf; if (++attempts > max_attempts) { cf = MinimalFuture.failedFuture(new IOException("Too many retries")); } else { if (currentreq.duration() != null) { timedEvent = new TimedEvent(currentreq.duration()); client.registerTimer(timedEvent); } try { // 1. Apply request filters requestFilters(currentreq); } catch (IOException e) { return MinimalFuture.failedFuture(e); } Exchange<T> exch = getExchange(); // 2. get response cf = exch.responseAsync() .thenCompose((Response response) -> { HttpRequestImpl newrequest = null; try { // 3. Apply response filters newrequest = responseFilters(response); } catch (IOException e) { return MinimalFuture.failedFuture(e); } // 4. Check filter result and repeat or continue if (newrequest == null) { if (attempts > 1) { Log.logError("Succeeded on attempt: " + attempts); } return MinimalFuture.completedFuture(response); } else { currentreq = newrequest; setExchange(new Exchange<>(currentreq, this, acc)); //reads body off previous, and then waits for next response return responseAsyncImpl(); } }) // 5. Handle errors and cancel any timer set .handle((response, ex) -> { cancelTimer(); if (ex == null) { assert response != null; return MinimalFuture.completedFuture(response); } // all exceptions thrown are handled here CompletableFuture<Response> error = getExceptionalCF(ex); if (error == null) { return responseAsyncImpl(); } else { return error; } }) .thenCompose(UnaryOperator.identity()); } return cf; }
Take a Throwable and return a suitable CompletableFuture that is completed exceptionally.
/** * Take a Throwable and return a suitable CompletableFuture that is * completed exceptionally. */
private CompletableFuture<Response> getExceptionalCF(Throwable t) { if ((t instanceof CompletionException) || (t instanceof ExecutionException)) { if (t.getCause() != null) { t = t.getCause(); } } if (cancelled && t instanceof IOException) { t = new HttpTimeoutException("request timed out"); } return MinimalFuture.failedFuture(t); } class TimedEvent extends TimeoutEvent { TimedEvent(Duration duration) { super(duration); } @Override public void handle() { cancel(new HttpTimeoutException("request timed out")); } } }