/*
* Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.internal.net.http;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.net.Authenticator;
import java.net.ConnectException;
import java.net.CookieHandler;
import java.net.ProxySelector;
import java.net.http.HttpConnectTimeoutException;
import java.net.http.HttpTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.stream.Stream;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.PushPromiseHandler;
import java.net.http.WebSocket;
import jdk.internal.net.http.common.BufferSupplier;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.Pair;
import jdk.internal.net.http.common.Utils;
import jdk.internal.net.http.common.OperationTrackers.Trackable;
import jdk.internal.net.http.common.OperationTrackers.Tracker;
import jdk.internal.net.http.websocket.BuilderImpl;
import jdk.internal.misc.InnocuousThread;
/**
 * Client implementation. Contains all configuration information as well as
 * the selector manager thread, which allows asynchronous events to be
 * registered and delivered when they occur. See AsyncEvent.
 */
final class HttpClientImpl extends HttpClient implements Trackable {
// Developer flag: turns on elapsed-time logging for requests. It is on
// whenever either Utils.TESTING or Utils.DEBUG is set.
static final boolean DEBUGELAPSED = Utils.TESTING || Utils.DEBUG; // dev flag
// Developer flag: turns on the timeout/deadline logging performed by the
// selector manager loop. Hard-wired to false.
static final boolean DEBUGTIMEOUT = false; // dev flag
// Per-instance debug loggers. Each one is handed this::dbgString and is
// gated by its own flag (Utils.DEBUG, DEBUGELAPSED, DEBUGTIMEOUT).
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
final Logger debugelapsed = Utils.getDebugLogger(this::dbgString, DEBUGELAPSED);
final Logger debugtimeout = Utils.getDebugLogger(this::dbgString, DEBUGTIMEOUT);
// Source of the numeric id assigned to each HttpClientImpl in its constructor.
static final AtomicLong CLIENT_IDS = new AtomicLong();
// The default thread factory is deliberately a static nested class that
// carries all the logic it needs. A lambda could keep a reference on the
// HttpClient instance from which it was created; avoiding that also helps
// with heap dump analysis.
private static final class DefaultThreadFactory implements ThreadFactory {
    // Common prefix of every worker thread name: "HttpClient-<id>-Worker-".
    private final String namePrefix;
    // Sequence number appended to the prefix; starts at 0.
    private final AtomicInteger nextId = new AtomicInteger();

    DefaultThreadFactory(long clientID) {
        this.namePrefix = "HttpClient-" + clientID + "-Worker-";
    }

    // Creates a daemon worker thread named <prefix><n>. Without a security
    // manager a plain Thread is created; otherwise the thread is obtained
    // from InnocuousThread.
    @Override
    public Thread newThread(Runnable r) {
        final String threadName = namePrefix + nextId.getAndIncrement();
        final Thread worker = (System.getSecurityManager() == null)
                ? new Thread(null, r, threadName, 0, false)
                : InnocuousThread.newThread(threadName, r);
        worker.setDaemon(true);
        return worker;
    }
}
/**
 * A DelegatingExecutor is an executor that delegates tasks to
 * a wrapped executor when it detects that the current thread
 * is the SelectorManager thread. If the current thread is not
 * the selector manager thread, the given task is executed inline.
 */
final static class DelegatingExecutor implements Executor {
    // Tells whether the calling thread is the selector manager thread.
    private final BooleanSupplier isInSelectorThread;
    // Executor that receives the tasks submitted from the selector thread.
    private final Executor delegate;

    DelegatingExecutor(BooleanSupplier isInSelectorThread, Executor delegate) {
        this.delegate = delegate;
        this.isInSelectorThread = isInSelectorThread;
    }

    // Returns the wrapped executor.
    Executor delegate() {
        return this.delegate;
    }

    @Override
    public void execute(Runnable command) {
        // Anywhere but on the selector thread the task simply runs inline.
        if (!isInSelectorThread.getAsBoolean()) {
            command.run();
            return;
        }
        // On the selector thread the task is handed over to the delegate.
        delegate.execute(command);
    }
}
// Configuration captured from the builder by the constructor.
private final CookieHandler cookieHandler;
private final Duration connectTimeout;
// Defaults to Redirect.NEVER when the builder does not specify a policy.
private final Redirect followRedirects;
// The proxy selector supplied through the builder, if any.
private final Optional<ProxySelector> userProxySelector;
// The effective selector: the user-supplied one, or else the default
// ProxySelector.
private final ProxySelector proxySelector;
private final Authenticator authenticator;
// Defaults to HTTP_2 when the builder does not specify a version.
private final Version version;
// Connection pool whose stop() clears the HTTP/1.1 cache (see stop()).
private final ConnectionPool connections;
// Wraps either the builder's executor or, if none was given, a cached
// thread pool created by this client.
private final DelegatingExecutor delegatingExecutor;
// True when the executor was created by this client rather than supplied
// through the builder.
private final boolean isDefaultExecutor;
// Security parameters
private final SSLContext sslContext;
private final SSLParameters sslParams;
// The selector manager thread; created in the constructor, started by start().
private final SelectorManager selmgr;
private final FilterFactory filters;
// HTTP/2 side of the client; its stop() clears the HTTP/2 cache (see stop()).
private final Http2ClientImpl client2;
// Unique id taken from CLIENT_IDS, and the "HttpClientImpl(<id>)" debug tag.
private final long id;
private final String dbgTag;
// The SSL DirectBuffer Supplier provides the ability to recycle
// buffers used between the socket reader and the SSLEngine, or
// more precisely between the SocketTube publisher and the
// SSLFlowDelegate reader.
private final SSLDirectBufferSupplier sslBufferSupplier
= new SSLDirectBufferSupplier(this);
// This reference is used to keep track of the facade HttpClient
// that was returned to the application code.
// It makes it possible to know when the application no longer
// holds any reference to the HttpClient.
// Unfortunately, this information is not enough to know when
// to exit the SelectorManager thread. Because of the asynchronous
// nature of the API, we also need to wait until all pending operations
// have completed.
private final WeakReference<HttpClientFacade> facadeRef;
// This counter keeps track of the number of operations pending
// on the HttpClient. The SelectorManager thread will wait
// until there are no longer any pending operations and the
// facadeRef is cleared before exiting.
//
// The pendingOperationCount is incremented every time a send/sendAsync
// operation is invoked on the HttpClient, and is decremented when
// the HttpResponse<T> object is returned to the user.
// However, at this point, the body may not have been fully read yet.
// This is the case when the response T is implemented as a streaming
// subscriber (such as an InputStream).
//
// To take care of this issue the pendingOperationCount will additionally
// be incremented/decremented in the following cases:
//
// 1. For HTTP/2 it is incremented when a stream is added to the
// Http2Connection streams map, and decreased when the stream is removed
// from the map. This should also take care of push promises.
// 2. For WebSocket the count is increased when creating a
// DetachedConnectionChannel for the socket, and decreased
// when the channel is closed.
// In addition, the HttpClient facade is passed to the WebSocket builder,
// (instead of the client implementation delegate).
// 3. For HTTP/1.1 the count is incremented before starting to parse the body
// response, and decremented when the parser has reached the end of the
// response body flow.
//
// This should ensure that the selector manager thread remains alive until
// the response has been fully received or the web socket is closed.
private final AtomicLong pendingOperationCount = new AtomicLong();
// Per-category counters, updated together with pendingOperationCount by
// webSocketOpen/webSocketClose, reference/unreference and
// streamReference/streamUnreference respectively. They are exposed through
// the Tracker returned by getOperationsTracker().
private final AtomicLong pendingWebSocketCount = new AtomicLong();
private final AtomicLong pendingHttpRequestCount = new AtomicLong();
private final AtomicLong pendingHttp2StreamCount = new AtomicLong();
/** The set of pending timeout events, ordered by deadline (earliest first). */
// Instantiated in the constructor with the no-arg TreeSet constructor, so
// the ordering relies on the natural ordering of TimeoutEvent.
private final TreeSet<TimeoutEvent> timeouts;
/**
 * This is a bit tricky:
 * 1. an HttpClientFacade has a final HttpClientImpl field.
 * 2. an HttpClientImpl has a final WeakReference<HttpClientFacade> field,
 *    where the referent is the facade created for that instance.
 * 3. We cannot just create the HttpClientFacade in the HttpClientImpl
 *    constructor, because it would be only weakly referenced and could
 *    be GC'ed before we can return it.
 * The solution is to use an instance of SingleFacadeFactory, which
 * allows the caller of new HttpClientImpl(...) to retrieve the facade
 * after the HttpClientImpl has been created.
 */
private static final class SingleFacadeFactory {
    // The one facade produced by this factory; null until createFacade runs.
    HttpClientFacade facade;

    // Creates the facade for the given implementation and remembers it, so
    // that the factory's owner holds a strong reference to it.
    // Expected to be called at most once per factory.
    HttpClientFacade createFacade(HttpClientImpl impl) {
        assert facade == null;
        facade = new HttpClientFacade(impl);
        return facade;
    }
}
// Builds a new client implementation from the builder, starts its selector
// manager thread, and returns the facade that wraps it. The facade is kept
// strongly reachable through the local factory until it is returned.
static HttpClientFacade create(HttpClientBuilderImpl builder) {
    final SingleFacadeFactory factory = new SingleFacadeFactory();
    final HttpClientImpl client = new HttpClientImpl(builder, factory);
    client.start();
    final HttpClientFacade created = factory.facade;
    assert created != null;
    assert client.facadeRef.get() == created;
    return created;
}
private HttpClientImpl(HttpClientBuilderImpl builder,
SingleFacadeFactory facadeFactory) {
id = CLIENT_IDS.incrementAndGet();
dbgTag = "HttpClientImpl(" + id +")";
if (builder.sslContext == null) {
try {
sslContext = SSLContext.getDefault();
} catch (NoSuchAlgorithmException ex) {
throw new InternalError(ex);
}
} else {
sslContext = builder.sslContext;
}
Executor ex = builder.executor;
if (ex == null) {
ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
isDefaultExecutor = true;
} else {
isDefaultExecutor = false;
}
delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex);
facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
client2 = new Http2ClientImpl(this);
cookieHandler = builder.cookieHandler;
connectTimeout = builder.connectTimeout;
followRedirects = builder.followRedirects == null ?
Redirect.NEVER : builder.followRedirects;
this.userProxySelector = Optional.ofNullable(builder.proxy);
this.proxySelector = userProxySelector
.orElseGet(HttpClientImpl::getDefaultProxySelector);
if (debug.on())
debug.log("proxySelector is %s (user-supplied=%s)",
this.proxySelector, userProxySelector.isPresent());
authenticator = builder.authenticator;
if (builder.version == null) {
version = HttpClient.Version.HTTP_2;
} else {
version = builder.version;
}
if (builder.sslParams == null) {
sslParams = getDefaultParams(sslContext);
} else {
sslParams = builder.sslParams;
}
connections = new ConnectionPool(id);
connections.start();
timeouts = new TreeSet<>();
try {
selmgr = new SelectorManager(this);
} catch (IOException e) {
// unlikely
throw new InternalError(e);
}
selmgr.setDaemon(true);
filters = new FilterFactory();
initFilters();
assert facadeRef.get() != null;
}
// Starts the selector manager thread that was created by the constructor.
private void start() {
    this.selmgr.start();
}
// Invoked by the SelectorManager thread just before it exits.
// Empties both the HTTP/1.1 and the HTTP/2 caches so that any connection
// still lingering there is properly closed (and its possibly still opened
// SocketChannel released).
private void stop() {
    // HTTP/1.1: clear the cache and close its connections
    this.connections.stop();
    // HTTP/2: clear the cache and close its connections
    this.client2.stop();
}
// Computes the default SSL parameters for the given context: starts from the
// context's supported parameters and restricts the protocols to TLSv1.3 and
// TLSv1.2 when TLSv1.3 is supported, or to TLSv1.2 alone otherwise.
private static SSLParameters getDefaultParams(SSLContext ctx) {
    final SSLParameters defaults = ctx.getSupportedSSLParameters();
    final boolean supportsTls13 = Stream.of(defaults.getProtocols())
            .anyMatch(name -> name.equals("TLSv1.3"));
    final String[] enabled = supportsTls13
            ? new String[] {"TLSv1.3", "TLSv1.2"}
            : new String[] {"TLSv1.2"};
    defaults.setProtocols(enabled);
    return defaults;
}
// Looks up the system-wide default ProxySelector inside a privileged block.
private static ProxySelector getDefaultProxySelector() {
    return AccessController.doPrivileged(
            (PrivilegedAction<ProxySelector>) ProxySelector::getDefault);
}
// Gives back the facade that was handed to the application code, or null
// once that facade is no longer referenced.
final HttpClientFacade facade() {
    return this.facadeRef.get();
}
// Registers one more pending HTTP request: bumps the HTTP request counter,
// then the overall pendingOperationCount, whose new value is returned.
final long reference() {
    this.pendingHttpRequestCount.incrementAndGet();
    return this.pendingOperationCount.incrementAndGet();
}
// Releases one operation previously registered with reference(): decrements
// the overall pendingOperationCount and the HTTP request counter. If nothing
// is pending any more and the facade is gone, the selector is woken up.
// Returns the new pendingOperationCount.
final long unreference() {
    final long pending = pendingOperationCount.decrementAndGet();
    final long http = pendingHttpRequestCount.decrementAndGet();
    final long http2 = pendingHttp2StreamCount.get();
    final long webSockets = pendingWebSocketCount.get();
    final boolean idleAndUnreferenced = pending == 0 && facade() == null;
    if (idleAndUnreferenced) {
        selmgr.wakeupSelector();
    }
    assert http >= 0 : "count of HTTP/1.1 operations < 0";
    assert http2 >= 0 : "count of HTTP/2 operations < 0";
    assert webSockets >= 0 : "count of WS operations < 0";
    assert pending >= 0 : "count of pending operations < 0";
    return pending;
}
// Registers one more HTTP/2 stream: bumps the HTTP/2 stream counter, then
// the overall pendingOperationCount, whose new value is returned.
final long streamReference() {
    this.pendingHttp2StreamCount.incrementAndGet();
    return this.pendingOperationCount.incrementAndGet();
}
// Releases one stream previously registered with streamReference():
// decrements the overall pendingOperationCount and the HTTP/2 stream counter.
// If nothing is pending any more and the facade is gone, the selector is
// woken up. Returns the new pendingOperationCount.
final long streamUnreference() {
    final long pending = pendingOperationCount.decrementAndGet();
    final long http2 = pendingHttp2StreamCount.decrementAndGet();
    final long http = pendingHttpRequestCount.get();
    final long webSockets = pendingWebSocketCount.get();
    final boolean idleAndUnreferenced = pending == 0 && facade() == null;
    if (idleAndUnreferenced) {
        selmgr.wakeupSelector();
    }
    assert http >= 0 : "count of HTTP/1.1 operations < 0";
    assert http2 >= 0 : "count of HTTP/2 operations < 0";
    assert webSockets >= 0 : "count of WS operations < 0";
    assert pending >= 0 : "count of pending operations < 0";
    return pending;
}
// Registers one more open WebSocket: bumps the WebSocket counter, then the
// overall pendingOperationCount, whose new value is returned.
final long webSocketOpen() {
    this.pendingWebSocketCount.incrementAndGet();
    return this.pendingOperationCount.incrementAndGet();
}
// Releases one WebSocket previously registered with webSocketOpen():
// decrements the overall pendingOperationCount and the WebSocket counter.
// If nothing is pending any more and the facade is gone, the selector is
// woken up. Returns the new pendingOperationCount.
final long webSocketClose() {
    final long pending = pendingOperationCount.decrementAndGet();
    final long webSockets = pendingWebSocketCount.decrementAndGet();
    final long http = pendingHttpRequestCount.get();
    final long http2 = pendingHttp2StreamCount.get();
    final boolean idleAndUnreferenced = pending == 0 && facade() == null;
    if (idleAndUnreferenced) {
        selmgr.wakeupSelector();
    }
    assert http >= 0 : "count of HTTP/1.1 operations < 0";
    assert http2 >= 0 : "count of HTTP/2 operations < 0";
    assert webSockets >= 0 : "count of WS operations < 0";
    assert pending >= 0 : "count of pending operations < 0";
    return pending;
}
// Current value of the overall pendingOperationCount.
final long referenceCount() {
    return this.pendingOperationCount.get();
}
// Tracker implementation that reads the counters and the reference it was
// given at construction. It keeps no counts of its own, so every getter
// reports the current value of the underlying counter.
final static class HttpClientTracker implements Tracker {
    final AtomicLong httpCount;
    final AtomicLong http2Count;
    final AtomicLong websocketCount;
    final AtomicLong operationsCount;
    // Reference whose referent being cleared means "facade not referenced".
    final Reference<?> reference;
    final String name;

    HttpClientTracker(AtomicLong http,
                      AtomicLong http2,
                      AtomicLong ws,
                      AtomicLong ops,
                      Reference<?> ref,
                      String name) {
        this.name = name;
        this.reference = ref;
        this.operationsCount = ops;
        this.websocketCount = ws;
        this.http2Count = http2;
        this.httpCount = http;
    }

    @Override
    public long getOutstandingOperations() { return operationsCount.get(); }

    @Override
    public long getOutstandingHttpOperations() { return httpCount.get(); }

    @Override
    public long getOutstandingHttp2Streams() { return http2Count.get(); }

    @Override
    public long getOutstandingWebSocketOperations() { return websocketCount.get(); }

    @Override
    public boolean isFacadeReferenced() {
        final Object referent = reference.get();
        return referent != null;
    }

    @Override
    public String getName() { return name; }
}
// Creates a tracker backed by this client's live counters and facade
// reference, named after the client's debug tag.
public Tracker getOperationsTracker() {
    final Tracker tracker = new HttpClientTracker(
            pendingHttpRequestCount,
            pendingHttp2StreamCount,
            pendingWebSocketCount,
            pendingOperationCount,
            facadeRef,
            dbgTag);
    return tracker;
}
// Used by the SelectorManager thread to decide whether it is time to
// terminate: the client counts as referenced while its facade is still
// reachable or while at least one operation is pending.
final boolean isReferenced() {
    if (facade() != null) {
        return true;
    }
    return referenceCount() > 0;
}
/**
 * Waits for activity on the given exchange.
 * The following occurs in the SelectorManager thread:
 *
 *  1) the event is added to the selector;
 *  2) if the selector fires for this exchange, then
 *     AsyncEvent.handle() is called.
 *
 * If the exchange needs to change its interest ops, call registerEvent() again.
 */
void registerEvent(AsyncEvent exchange) throws IOException {
    // Queue the event with the selector manager.
    this.selmgr.register(exchange);
}
/**
 * Allows an AsyncEvent to modify its interestOps.
 * @param event The modified event.
 */
void eventUpdated(AsyncEvent event) throws ClosedChannelException {
    // Trigger events are not expected here.
    assert !(event instanceof AsyncTriggerEvent);
    this.selmgr.eventUpdated(event);
}
// True when the calling thread is this client's selector manager thread.
boolean isSelectorThread() {
    final Thread caller = Thread.currentThread();
    return caller == selmgr;
}
// Accessor for the Http2ClientImpl created by this client's constructor.
Http2ClientImpl client2() {
    return this.client2;
}
// Logs, on the "elapsed" debug logger, how many milliseconds have passed
// since startNanos for the given request. Does nothing when that logger is
// off.
private void debugCompleted(String tag, long startNanos, HttpRequest req) {
    if (!debugelapsed.on()) {
        return;
    }
    final long elapsedMillis = (System.nanoTime() - startNanos)/1000_000L;
    final String message = tag + " elapsed " + elapsedMillis
            + " millis for " + req.method()
            + " to " + req.uri();
    debugelapsed.log(message);
}
/**
 * Sends the given request and blocks until the response is available.
 *
 * The request is dispatched through the private sendAsync overload with no
 * push promise handler and no exchange executor. The calling thread then
 * waits on the returned future.
 *
 * If the wait is interrupted, the pending future is cancelled and the
 * InterruptedException is rethrown. If the exchange fails, a new exception
 * of the same kind is thrown, carrying the original message and the
 * original throwable as its cause, so that the stack trace includes the
 * caller of send(). Any failure that is not one of the specific types
 * handled below is wrapped in a plain IOException.
 *
 * @param req the request to send
 * @param responseHandler the response body handler
 * @return the response
 * @throws IOException if the exchange fails (including timeouts and
 *         connection failures)
 * @throws InterruptedException if the calling thread is interrupted while
 *         waiting for the response
 * @throws IllegalArgumentException if the exchange failed with an
 *         IllegalArgumentException
 * @throws SecurityException if the exchange failed with a SecurityException
 */
@Override
public <T> HttpResponse<T>
send(HttpRequest req, BodyHandler<T> responseHandler)
    throws IOException, InterruptedException
{
    CompletableFuture<HttpResponse<T>> cf = null;
    try {
        cf = sendAsync(req, responseHandler, null, null);
        return cf.get();
    } catch (InterruptedException ie) {
        if (cf != null) {
            cf.cancel(true);
        }
        throw ie;
    } catch (ExecutionException e) {
        final Throwable throwable = e.getCause();
        final String msg = throwable.getMessage();

        if (throwable instanceof IllegalArgumentException) {
            throw new IllegalArgumentException(msg, throwable);
        } else if (throwable instanceof SecurityException) {
            throw new SecurityException(msg, throwable);
        } else if (throwable instanceof HttpConnectTimeoutException) {
            // Must be tested before HttpTimeoutException, which is also
            // tested below.
            HttpConnectTimeoutException hcte = new HttpConnectTimeoutException(msg);
            hcte.initCause(throwable);
            throw hcte;
        } else if (throwable instanceof HttpTimeoutException) {
            // Keep the original exception as the cause, like every other
            // branch does; it was previously dropped here.
            HttpTimeoutException hte = new HttpTimeoutException(msg);
            hte.initCause(throwable);
            throw hte;
        } else if (throwable instanceof ConnectException) {
            ConnectException ce = new ConnectException(msg);
            ce.initCause(throwable);
            throw ce;
        } else {
            // IOException and anything else: wrap in a new IOException.
            throw new IOException(msg, throwable);
        }
    }
}
// The default asynchronous executor of CompletableFuture, obtained from a
// throwaway future. The private sendAsync overload uses it for its final
// whenCompleteAsync stage.
private static final Executor ASYNC_POOL = new CompletableFuture<Void>().defaultExecutor();
// Asynchronous send without a push promise handler; forwards to the
// three-argument overload.
@Override
public <T> CompletableFuture<HttpResponse<T>>
sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler)
{
    final PushPromiseHandler<T> noPushPromiseHandler = null;
    return sendAsync(userRequest, responseHandler, noPushPromiseHandler);
}
// Asynchronous send with an optional push promise handler. The executor
// wrapped by the delegating executor is passed explicitly as the exchange
// executor.
@Override
public <T> CompletableFuture<HttpResponse<T>>
sendAsync(HttpRequest userRequest,
          BodyHandler<T> responseHandler,
          PushPromiseHandler<T> pushPromiseHandler) {
    final Executor exchangeExecutor = delegatingExecutor.delegate();
    return sendAsync(userRequest, responseHandler, pushPromiseHandler, exchangeExecutor);
}
// Common implementation behind send(...) and the public sendAsync(...)
// overloads.
//
// exchangeExecutor is null when called from send(...). In that case the
// client's delegating executor is used, which only spawns asynchronous
// tasks when the current thread is the selector manager thread.
//
// The pending operation count is incremented before the exchange is
// created and decremented when the response future completes, or
// immediately if setting up the exchange throws.
private <T> CompletableFuture<HttpResponse<T>>
sendAsync(HttpRequest userRequest,
          BodyHandler<T> responseHandler,
          PushPromiseHandler<T> pushPromiseHandler,
          Executor exchangeExecutor) {

    Objects.requireNonNull(userRequest);
    Objects.requireNonNull(responseHandler);

    // Capture the caller's access control context only when a security
    // manager is installed.
    final AccessControlContext acc = (System.getSecurityManager() != null)
            ? AccessController.getContext()
            : null;

    // Clone the, possibly untrusted, HttpRequest
    final HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector);
    if (requestImpl.method().equals("CONNECT")) {
        throw new IllegalArgumentException("Unsupported method CONNECT");
    }

    final long start = DEBUGELAPSED ? System.nanoTime() : 0;
    reference();
    try {
        if (debugelapsed.on()) {
            debugelapsed.log("ClientImpl (async) send %s", userRequest);
        }

        // sendAsync(...) passes the executor's delegate explicitly, which
        // forces asynchronous scheduling of the exchange.
        // send(...) passes no executor: the delegating executor is used
        // instead, so everything executes inline until some event needs
        // to be scheduled with the selector.
        final Executor executor = (exchangeExecutor != null)
                ? exchangeExecutor
                : this.delegatingExecutor;

        final MultiExchange<T> mex = new MultiExchange<>(userRequest,
                                                         requestImpl,
                                                         this,
                                                         responseHandler,
                                                         pushPromiseHandler,
                                                         acc);
        CompletableFuture<HttpResponse<T>> res = mex.responseAsync(executor);
        res = res.whenComplete((b,t) -> unreference());
        if (DEBUGELAPSED) {
            res = res.whenComplete(
                    (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
        }

        // Makes sure that any dependent actions happen in the CF default
        // executor. This is only needed for sendAsync(...), when
        // exchangeExecutor is non-null.
        if (exchangeExecutor != null) {
            res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
        }
        return res;
    } catch(Throwable t) {
        // The exchange could not be set up: undo the reference() above.
        unreference();
        debugCompleted("ClientImpl (async)", start, userRequest);
        throw t;
    }
}
// Main loop for this client's selector
private final static class SelectorManager extends Thread {
// For testing purposes we have an internal System property that
// can control the frequency at which the selector manager will wake
// up when there are no pending operations.
// Increasing the frequency (shorter delays) might allow the selector
// to observe that the facade is no longer referenced and might allow
// the selector thread to terminate more timely - for when nothing is
// ongoing it will only check for that condition every NODEADLINE ms.
// To avoid misuse of the property, the delay that can be specified
// is comprised between [MIN_NODEADLINE, MAX_NODEADLINE], and its default
// value if unspecified (or <= 0) is DEF_NODEADLINE = 3000ms
// The property is -Djdk.internal.httpclient.selectorTimeout=<millis>
private static final int MIN_NODEADLINE = 1000; // ms
private static final int MAX_NODEADLINE = 1000 * 1200; // ms
private static final int DEF_NODEADLINE = 3000; // ms
private static final long NODEADLINE; // default is DEF_NODEADLINE ms
static {
// ensure NODEADLINE is initialized with some valid value.
long deadline = Utils.getIntegerProperty(
"jdk.internal.httpclient.selectorTimeout",
DEF_NODEADLINE); // millis
if (deadline <= 0) deadline = DEF_NODEADLINE;
deadline = Math.max(deadline, MIN_NODEADLINE);
NODEADLINE = Math.min(deadline, MAX_NODEADLINE);
}
private final Selector selector;
private volatile boolean closed;
private final List<AsyncEvent> registrations;
private final List<AsyncTriggerEvent> deregistrations;
private final Logger debug;
private final Logger debugtimeout;
HttpClientImpl owner;
ConnectionPool pool;
SelectorManager(HttpClientImpl ref) throws IOException {
super(null, null,
"HttpClient-" + ref.id + "-SelectorManager",
0, false);
owner = ref;
debug = ref.debug;
debugtimeout = ref.debugtimeout;
pool = ref.connectionPool();
registrations = new ArrayList<>();
deregistrations = new ArrayList<>();
selector = Selector.open();
}
void eventUpdated(AsyncEvent e) throws ClosedChannelException {
if (Thread.currentThread() == this) {
SelectionKey key = e.channel().keyFor(selector);
if (key != null && key.isValid()) {
SelectorAttachment sa = (SelectorAttachment) key.attachment();
sa.register(e);
} else if (e.interestOps() != 0){
// We don't care about paused events.
// These are actually handled by
// SelectorAttachment::resetInterestOps later on.
// But if we reach here when trying to resume an
// event then it's better to fail fast.
if (debug.on()) debug.log("No key for channel");
e.abort(new IOException("No key for channel"));
}
} else {
register(e);
}
}
// This returns immediately. So caller not allowed to send/receive
// on connection.
synchronized void register(AsyncEvent e) {
registrations.add(e);
selector.wakeup();
}
synchronized void cancel(SocketChannel e) {
SelectionKey key = e.keyFor(selector);
if (key != null) {
key.cancel();
}
selector.wakeup();
}
void wakeupSelector() {
selector.wakeup();
}
synchronized void shutdown() {
Log.logTrace("{0}: shutting down", getName());
if (debug.on()) debug.log("SelectorManager shutting down");
closed = true;
try {
selector.close();
} catch (IOException ignored) {
} finally {
owner.stop();
}
}
@Override
public void run() {
List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
List<AsyncEvent> readyList = new ArrayList<>();
List<Runnable> resetList = new ArrayList<>();
try {
if (Log.channel()) Log.logChannel(getName() + ": starting");
while (!Thread.currentThread().isInterrupted()) {
synchronized (this) {
assert errorList.isEmpty();
assert readyList.isEmpty();
assert resetList.isEmpty();
for (AsyncTriggerEvent event : deregistrations) {
event.handle();
}
deregistrations.clear();
for (AsyncEvent event : registrations) {
if (event instanceof AsyncTriggerEvent) {
readyList.add(event);
continue;
}
SelectableChannel chan = event.channel();
SelectionKey key = null;
try {
key = chan.keyFor(selector);
SelectorAttachment sa;
if (key == null || !key.isValid()) {
if (key != null) {
// key is canceled.
// invoke selectNow() to purge it
// before registering the new event.
selector.selectNow();
}
sa = new SelectorAttachment(chan, selector);
} else {
sa = (SelectorAttachment) key.attachment();
}
// may throw IOE if channel closed: that's OK
sa.register(event);
if (!chan.isOpen()) {
throw new IOException("Channel closed");
}
} catch (IOException e) {
Log.logTrace("{0}: {1}", getName(), e);
if (debug.on())
debug.log("Got " + e.getClass().getName()
+ " while handling registration events");
chan.close();
// let the event abort deal with it
errorList.add(new Pair<>(event, e));
if (key != null) {
key.cancel();
selector.selectNow();
}
}
}
registrations.clear();
selector.selectedKeys().clear();
}
for (AsyncEvent event : readyList) {
assert event instanceof AsyncTriggerEvent;
event.handle();
}
readyList.clear();
for (Pair<AsyncEvent,IOException> error : errorList) {
// an IOException was raised and the channel closed.
handleEvent(error.first, error.second);
}
errorList.clear();
// Check whether client is still alive, and if not,
// gracefully stop this thread
if (!owner.isReferenced()) {
Log.logTrace("{0}: {1}",
getName(),
"HttpClient no longer referenced. Exiting...");
return;
}
// Timeouts will have milliseconds granularity. It is important
// to handle them in a timely fashion.
long nextTimeout = owner.purgeTimeoutsAndReturnNextDeadline();
if (debugtimeout.on())
debugtimeout.log("next timeout: %d", nextTimeout);
// Keep-alive have seconds granularity. It's not really an
// issue if we keep connections linger a bit more in the keep
// alive cache.
long nextExpiry = pool.purgeExpiredConnectionsAndReturnNextDeadline();
if (debugtimeout.on())
debugtimeout.log("next expired: %d", nextExpiry);
assert nextTimeout >= 0;
assert nextExpiry >= 0;
// Don't wait for ever as it might prevent the thread to
// stop gracefully. millis will be 0 if no deadline was found.
if (nextTimeout <= 0) nextTimeout = NODEADLINE;
// Clip nextExpiry at NODEADLINE limit. The default
// keep alive is 1200 seconds (half an hour) - we don't
// want to wait that long.
if (nextExpiry <= 0) nextExpiry = NODEADLINE;
else nextExpiry = Math.min(NODEADLINE, nextExpiry);
// takes the least of the two.
long millis = Math.min(nextExpiry, nextTimeout);
if (debugtimeout.on())
debugtimeout.log("Next deadline is %d",
(millis == 0 ? NODEADLINE : millis));
//debugPrint(selector);
int n = selector.select(millis == 0 ? NODEADLINE : millis);
if (n == 0) {
// Check whether client is still alive, and if not,
// gracefully stop this thread
if (!owner.isReferenced()) {
Log.logTrace("{0}: {1}",
getName(),
"HttpClient no longer referenced. Exiting...");
return;
}
owner.purgeTimeoutsAndReturnNextDeadline();
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
assert errorList.isEmpty();
for (SelectionKey key : keys) {
SelectorAttachment sa = (SelectorAttachment) key.attachment();
if (!key.isValid()) {
IOException ex = sa.chan.isOpen()
? new IOException("Invalid key")
: new ClosedChannelException();
sa.pending.forEach(e -> errorList.add(new Pair<>(e,ex)));
sa.pending.clear();
continue;
}
int eventsOccurred;
try {
eventsOccurred = key.readyOps();
} catch (CancelledKeyException ex) {
IOException io = Utils.getIOException(ex);
sa.pending.forEach(e -> errorList.add(new Pair<>(e,io)));
sa.pending.clear();
continue;
}
sa.events(eventsOccurred).forEach(readyList::add);
resetList.add(() -> sa.resetInterestOps(eventsOccurred));
}
selector.selectNow(); // complete cancellation
selector.selectedKeys().clear();
// handle selected events
readyList.forEach((e) -> handleEvent(e, null));
readyList.clear();
// handle errors (closed channels etc...)
errorList.forEach((p) -> handleEvent(p.first, p.second));
errorList.clear();
// reset interest ops for selected channels
resetList.forEach(r -> r.run());
resetList.clear();
}
} catch (Throwable e) {
if (!closed) {
// This terminates thread. So, better just print stack trace
String err = Utils.stackTrace(e);
Log.logError("{0}: {1}: {2}", getName(),
"HttpClientImpl shutting down due to fatal error", err);
}
if (debug.on()) debug.log("shutting down", e);
if (Utils.ASSERTIONSENABLED && !debug.on()) {
e.printStackTrace(System.err); // always print the stack
}
} finally {
if (Log.channel()) Log.logChannel(getName() + ": stopping");
shutdown();
}
}
// void debugPrint(Selector selector) {
// System.err.println("Selector: debugprint start");
// Set<SelectionKey> keys = selector.keys();
// for (SelectionKey key : keys) {
// SelectableChannel c = key.channel();
// int ops = key.interestOps();
// System.err.printf("selector chan:%s ops:%d\n", c, ops);
// }
// System.err.println("Selector: debugprint end");
// }
Handles the given event. The given ioe may be null. /** Handles the given event. The given ioe may be null. */
void handleEvent(AsyncEvent event, IOException ioe) {
    // The event is only handled normally when this manager is still open
    // and no error was supplied; in every other case it is aborted with
    // whatever ioe was passed in (which may be null when closed).
    final boolean deliver = !closed && ioe == null;
    if (deliver) {
        event.handle();
        return;
    }
    event.abort(ioe);
}
}
// Builds a human-readable description of how the given channel is
// registered with the SelectorManager's selector. Never throws: any
// Throwable raised while inspecting the key is returned as its string form.
final String debugInterestOps(SelectableChannel channel) {
    try {
        final SelectionKey registration = channel.keyFor(selmgr.selector);
        if (registration == null) {
            return "channel not registered with selector";
        }
        // interestOps() may only be queried on a valid key
        final String keyInterestOps;
        if (registration.isValid()) {
            keyInterestOps = "key.interestOps=" + registration.interestOps();
        } else {
            keyInterestOps = "invalid key";
        }
        final SelectorAttachment attachment =
                (SelectorAttachment) registration.attachment();
        return String.format(
                "channel registered with selector, %s, sa.interestOps=%s",
                keyInterestOps,
                attachment.interestOps);
    } catch (Throwable t) {
        return String.valueOf(t);
    }
}
/**
 * Tracks multiple user level registrations associated with one NIO
 * registration (SelectionKey). In this implementation, registrations
 * are one-off and when an event is posted the registration is cancelled
 * until explicitly registered again.
 *
 * <p> No external synchronization is required as this class is only used
 * by the SelectorManager thread. One of these objects is required per
 * connection.
 */
private static class SelectorAttachment {
private final SelectableChannel chan;
private final Selector selector;
private final Set<AsyncEvent> pending;
private final static Logger debug =
Utils.getDebugLogger("SelectorAttachment"::toString, Utils.DEBUG);
private int interestOps;
SelectorAttachment(SelectableChannel chan, Selector selector) {
this.pending = new HashSet<>();
this.chan = chan;
this.selector = selector;
}
void register(AsyncEvent e) throws ClosedChannelException {
int newOps = e.interestOps();
// re register interest if we are not already interested
// in the event. If the event is paused, then the pause will
// be taken into account later when resetInterestOps is called.
boolean reRegister = (interestOps & newOps) != newOps;
interestOps |= newOps;
pending.add(e);
if (debug.on())
debug.log("Registering %s for %d (%s)", e, newOps, reRegister);
if (reRegister) {
// first time registration happens here also
try {
chan.register(selector, interestOps, this);
} catch (Throwable x) {
abortPending(x);
}
} else if (!chan.isOpen()) {
abortPending(new ClosedChannelException());
}
}
Returns a Stream containing only events that are registered with the given interestOps
. /**
* Returns a Stream<AsyncEvents> containing only events that are
* registered with the given {@code interestOps}.
*/
Stream<AsyncEvent> events(int interestOps) {
return pending.stream()
.filter(ev -> (ev.interestOps() & interestOps) != 0);
}
Removes any events with the given interestOps
, and if no events remaining, cancels the associated SelectionKey. /**
* Removes any events with the given {@code interestOps}, and if no
* events remaining, cancels the associated SelectionKey.
*/
void resetInterestOps(int interestOps) {
int newOps = 0;
Iterator<AsyncEvent> itr = pending.iterator();
while (itr.hasNext()) {
AsyncEvent event = itr.next();
int evops = event.interestOps();
if (event.repeating()) {
newOps |= evops;
continue;
}
if ((evops & interestOps) != 0) {
itr.remove();
} else {
newOps |= evops;
}
}
this.interestOps = newOps;
SelectionKey key = chan.keyFor(selector);
if (newOps == 0 && key != null && pending.isEmpty()) {
key.cancel();
} else {
try {
if (key == null || !key.isValid()) {
throw new CancelledKeyException();
}
key.interestOps(newOps);
// double check after
if (!chan.isOpen()) {
abortPending(new ClosedChannelException());
return;
}
assert key.interestOps() == newOps;
} catch (CancelledKeyException x) {
// channel may have been closed
if (debug.on()) debug.log("key cancelled for " + chan);
abortPending(x);
}
}
}
void abortPending(Throwable x) {
if (!pending.isEmpty()) {
AsyncEvent[] evts = pending.toArray(new AsyncEvent[0]);
pending.clear();
IOException io = Utils.getIOException(x);
for (AsyncEvent event : evts) {
event.abort(io);
}
}
}
}
// Package-private accessor: returns this client's sslContext field as-is
// (the same value the public sslContext() method returns).
/*package-private*/ SSLContext theSSLContext() {
return sslContext;
}
// Returns the SSLContext held in this client's sslContext field.
@Override
public SSLContext sslContext() {
return sslContext;
}
// Returns the result of Utils.copySSLParameters(sslParams) rather than the
// sslParams field itself -- presumably a defensive copy so that callers
// cannot mutate this client's parameters (confirm against Utils).
@Override
public SSLParameters sslParameters() {
return Utils.copySSLParameters(sslParams);
}
// Wraps the authenticator field in an Optional; the Optional is empty
// when the field is null.
@Override
public Optional<Authenticator> authenticator() {
return Optional.ofNullable(authenticator);
}
// Package-private accessor: returns the delegatingExecutor field itself
// (the wrapper, not the executor it delegates to).
/*package-private*/ final DelegatingExecutor theExecutor() {
return delegatingExecutor;
}
@Override
public final Optional<Executor> executor() {
    // When isDefaultExecutor is set no executor is reported at all;
    // otherwise the delegate wrapped by delegatingExecutor is exposed.
    if (isDefaultExecutor) {
        return Optional.empty();
    }
    return Optional.of(delegatingExecutor.delegate());
}
// Returns the ConnectionPool held in this client's connections field.
ConnectionPool connectionPool() {
return connections;
}
// Returns the redirect policy stored in this client's followRedirects field.
@Override
public Redirect followRedirects() {
return followRedirects;
}
// Wraps the cookieHandler field in an Optional; the Optional is empty
// when the field is null.
@Override
public Optional<CookieHandler> cookieHandler() {
return Optional.ofNullable(cookieHandler);
}
// Wraps the connectTimeout field in an Optional; the Optional is empty
// when the field is null.
@Override
public Optional<Duration> connectTimeout() {
return Optional.ofNullable(connectTimeout);
}
// Returns the userProxySelector field unchanged; that field is already an
// Optional<ProxySelector>. Note that this is a different field from the
// one returned by proxySelector().
@Override
public Optional<ProxySelector> proxy() {
return this.userProxySelector;
}
// Returns the effective proxy selector that this client uses, i.e. the
// proxySelector field (as opposed to proxy(), which returns the
// userProxySelector field).
ProxySelector proxySelector() {
return proxySelector;
}
// Creates a new BuilderImpl from this client's facade and its
// proxySelector field.
@Override
public WebSocket.Builder newWebSocketBuilder() {
// Make sure to pass the HttpClientFacade to the WebSocket builder.
// This will ensure that the facade is not released before the
// WebSocket has been created, at which point the pendingOperationCount
// will have been incremented by the RawChannelTube.
// See RawChannelTube.
return new BuilderImpl(this.facade(), proxySelector);
}
// Returns the Version stored in this client's version field.
@Override
public Version version() {
return version;
}
// Returns the dbgTag field -- presumably the tag used to identify this
// client in debug output (confirm where dbgTag is assigned).
String dbgString() {
return dbgTag;
}
@Override
public String toString() {
    // Used by tests to get the client's id and compute the
    // name of the SelectorManager thread.
    final String idSuffix = "(" + id + ")";
    return super.toString() + idSuffix;
}
// Installs the header filters: authentication and redirect always,
// cookie only when a cookie handler is present.
private void initFilters() {
    addFilter(AuthenticationFilter.class);
    addFilter(RedirectFilter.class);
    if (this.cookieHandler == null) {
        // no cookie handler, so no cookie filter
        return;
    }
    addFilter(CookieFilter.class);
}
// Adds the given HeaderFilter class to this client's filters by
// delegating to filters.addFilter(f).
private void addFilter(Class<? extends HeaderFilter> f) {
filters.addFilter(f);
}
// Returns whatever filters.getFilterChain() returns, as a list of
// HeaderFilter objects. NOTE(review): whether a fresh list is produced on
// each call is decided by getFilterChain(), which is not visible here.
final LinkedList<HeaderFilter> filterChain() {
return filters.getFilterChain();
}
// Timer controls.
// Timers are implemented through timed Selector.select() calls.
// Adds the given event to the timeouts collection while holding this
// client's monitor, then wakes up the selector -- presumably so that the
// SelectorManager recomputes how long its next timed select() should
// wait (confirm against SelectorManager.run()).
synchronized void registerTimer(TimeoutEvent event) {
Log.logTrace("Registering timer {0}", event);
timeouts.add(event);
selmgr.wakeupSelector();
}
// Removes the given event from the timeouts collection while holding this
// client's monitor. Unlike registerTimer, the selector is not woken up.
synchronized void cancelTimer(TimeoutEvent event) {
Log.logTrace("Canceling timer {0}", event);
timeouts.remove(event);
}
/**
 * Purges (handles) timer events that have passed their deadline, and
 * returns the amount of time, in milliseconds, until the next earliest
 * event. A return value of 0 means that there are no events.
 */
private long purgeTimeoutsAndReturnNextDeadline() {
    long millisToNext = 0L;
    List<TimeoutEvent> expired = null;
    int remaining = 0;

    // enter critical section to retrieve the timeout events to handle
    synchronized (this) {
        if (timeouts.isEmpty()) {
            return 0L;
        }
        final Instant now = Instant.now();
        for (Iterator<TimeoutEvent> it = timeouts.iterator(); it.hasNext(); ) {
            final TimeoutEvent candidate = it.next();
            millisToNext = now.until(candidate.deadline(), ChronoUnit.MILLIS);
            if (millisToNext > 0) {
                // iteration stops at the first event still in the future;
                // its distance is what gets returned
                break;
            }
            it.remove();
            if (expired == null) {
                expired = new ArrayList<>();
            }
            expired.add(candidate);
        }
        remaining = timeouts.size();
    }

    if (expired == null) {
        // nothing to fire: just report the wait until the next event
        return Math.max(millisToNext, 0L);
    }

    // can be useful for debugging
    if (Log.trace()) {
        Log.logTrace("purgeTimeoutsAndReturnNextDeadline: handling "
                + expired.size() + " events, "
                + "remaining " + remaining
                + ", next deadline: " + (millisToNext < 0 ? 0L : millisToNext));
    }

    // handle timeout events out of critical section
    Throwable firstFailure = null;
    for (TimeoutEvent due : expired) {
        try {
            Log.logTrace("Firing timer {0}", due);
            due.handle();
        } catch (Error | RuntimeException e) {
            // Not expected. Handle remaining events then throw...
            // If e is an OOME or SOE it might simply trigger a new
            // error from here - but in this case there's not much we
            // could do anyway. Just let it flow...
            if (firstFailure == null) {
                firstFailure = e;
            } else {
                firstFailure.addSuppressed(e);
            }
            Log.logTrace("Failed to handle event {0}: {1}", due, e);
        }
    }
    if (firstFailure instanceof Error) {
        throw (Error) firstFailure;
    }
    if (firstFailure instanceof RuntimeException) {
        throw (RuntimeException) firstFailure;
    }

    // return time to wait until next event. 0L if there's no more events.
    return Math.max(millisToNext, 0L);
}
// Used for the connection window.
// Reads the integer net property "jdk.httpclient.receiveBufferSize" through
// Utils.getIntegerNetProperty, passing 0 as the second argument --
// presumably the default returned when the property is not set (confirm
// against Utils.getIntegerNetProperty).
int getReceiveBufferSize() {
return Utils.getIntegerNetProperty(
"jdk.httpclient.receiveBufferSize",
0 // only set the size if > 0
);
}
// Optimization for reading SSL encrypted data
// --------------------------------------------
// Returns a BufferSupplier (the sslBufferSupplier field) that can be
// used for reading encrypted bytes off the channel. These buffers can
// then be recycled by the SSLFlowDelegate::Reader after their
// content has been copied into the SSLFlowDelegate::Reader
// readBuf.
// Because allocating, reading, copying, and recycling
// all happen in the SelectorManager thread,
// this BufferSupplier can be shared between all
// the SSL connections managed by this client.
BufferSupplier getSSLBufferSupplier() {
return sslBufferSupplier;
}
// An implementation of BufferSupplier that manages a pool of at most
// SocketTube.MAX_BUFFERS direct byte buffers that are used for reading
// encrypted bytes off the channel before copying and subsequent
// unwrapping. The pool is a simple stack: 'tail' is the number of
// buffers currently parked in it.
private static final class SSLDirectBufferSupplier implements BufferSupplier {
    private static final int POOL_SIZE = SocketTube.MAX_BUFFERS;

    private final ByteBuffer[] pool = new ByteBuffer[POOL_SIZE];
    private final HttpClientImpl client;
    private final Logger debug;
    // no need for volatile: only accessed in SM thread.
    private int tail;
    // only ever touched from within an assert statement
    private int count;

    SSLDirectBufferSupplier(HttpClientImpl client) {
        this.client = Objects.requireNonNull(client);
        this.debug = this.client.debug;
    }

    // Gets a buffer from the pool, or allocates a new one if needed.
    @Override
    public ByteBuffer get() {
        assert client.isSelectorThread();
        assert tail <= POOL_SIZE : "allocate tail is " + tail;
        final ByteBuffer buf;
        if (tail != 0) {
            // pop the most recently recycled buffer
            assert tail > 0 : "non positive tail value: " + tail;
            buf = pool[--tail];
            pool[tail] = null;
        } else {
            // pool is empty: allocate a fresh direct buffer
            if (debug.on()) {
                // should not appear more than SocketTube.MAX_BUFFERS
                debug.log("ByteBuffer.allocateDirect(%d)", Utils.BUFSIZE);
            }
            assert count++ < POOL_SIZE : "trying to allocate more than "
                    + POOL_SIZE + " buffers";
            buf = ByteBuffer.allocateDirect(Utils.BUFSIZE);
        }
        assert buf.isDirect();
        assert buf.position() == 0;
        assert buf.hasRemaining();
        assert buf.limit() == Utils.BUFSIZE;
        assert tail < POOL_SIZE;
        assert tail >= 0;
        return buf;
    }

    // Returns the given buffer to the pool.
    @Override
    public void recycle(ByteBuffer buffer) {
        assert client.isSelectorThread();
        assert buffer.isDirect();
        assert !buffer.hasRemaining();
        assert tail < POOL_SIZE : "recycle tail is " + tail;
        assert tail >= 0;
        // make the whole capacity available again before parking the buffer
        buffer.position(0).limit(buffer.capacity());
        // don't fail if assertions are off. we have asserted above.
        if (tail < POOL_SIZE) {
            pool[tail++] = buffer;
        }
        assert tail <= POOL_SIZE;
        assert tail > 0;
    }
}
}