/*
 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.Authenticator;
import java.net.CookieManager;
import java.net.ProxySelector;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Stream;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.Utils;
import jdk.incubator.http.internal.websocket.BuilderImpl;

Client implementation. Contains all configuration information and also the selector manager thread which allows async events to be registered and delivered when they occur. See AsyncEvent.
/** * Client implementation. Contains all configuration information and also * the selector manager thread which allows async events to be registered * and delivered when they occur. See AsyncEvent. */
class HttpClientImpl extends HttpClient { // Define the default factory as a static inner class // that embeds all the necessary logic to avoid // the risk of using a lambda that might keep a reference on the // HttpClient instance from which it was created (helps with // heapdump analysis). private static final class DefaultThreadFactory implements ThreadFactory { private DefaultThreadFactory() {} @Override public Thread newThread(Runnable r) { Thread t = new Thread(null, r, "HttpClient_worker", 0, true); t.setDaemon(true); return t; } static final ThreadFactory INSTANCE = new DefaultThreadFactory(); } private final CookieManager cookieManager; private final Redirect followRedirects; private final ProxySelector proxySelector; private final Authenticator authenticator; private final Version version; private final ConnectionPool connections; private final Executor executor; // Security parameters private final SSLContext sslContext; private final SSLParameters sslParams; private final SelectorManager selmgr; private final FilterFactory filters; private final Http2ClientImpl client2;
A Set of, deadline first, ordered timeout events.
/** A Set of, deadline first, ordered timeout events. */
private final TreeSet<TimeoutEvent> timeouts; public static HttpClientImpl create(HttpClientBuilderImpl builder) { HttpClientImpl impl = new HttpClientImpl(builder); impl.start(); return impl; } private HttpClientImpl(HttpClientBuilderImpl builder) { if (builder.sslContext == null) { try { sslContext = SSLContext.getDefault(); } catch (NoSuchAlgorithmException ex) { throw new InternalError(ex); } } else { sslContext = builder.sslContext; } Executor ex = builder.executor; if (ex == null) { ex = Executors.newCachedThreadPool(DefaultThreadFactory.INSTANCE); } else { ex = builder.executor; } client2 = new Http2ClientImpl(this); executor = ex; cookieManager = builder.cookieManager; followRedirects = builder.followRedirects == null ? Redirect.NEVER : builder.followRedirects; this.proxySelector = builder.proxy; authenticator = builder.authenticator; if (builder.version == null) { version = HttpClient.Version.HTTP_2; } else { version = builder.version; } if (builder.sslParams == null) { sslParams = getDefaultParams(sslContext); } else { sslParams = builder.sslParams; } connections = new ConnectionPool(); connections.start(); timeouts = new TreeSet<>(); try { selmgr = new SelectorManager(this); } catch (IOException e) { // unlikely throw new InternalError(e); } selmgr.setDaemon(true); filters = new FilterFactory(); initFilters(); } private void start() { selmgr.start(); } private static SSLParameters getDefaultParams(SSLContext ctx) { SSLParameters params = ctx.getSupportedSSLParameters(); params.setProtocols(new String[]{"TLSv1.2"}); return params; }
Wait for activity on given exchange (assuming blocking = false). It's a no-op if blocking = true. In particular, the following occurs in the SelectorManager thread. 1) mark the connection non-blocking 2) add to selector 3) If selector fires for this exchange then 4) - mark connection as blocking 5) - call AsyncEvent.handle() If exchange needs to block again, then call registerEvent() again
/** * Wait for activity on given exchange (assuming blocking = false). * It's a no-op if blocking = true. In particular, the following occurs * in the SelectorManager thread. * * 1) mark the connection non-blocking * 2) add to selector * 3) If selector fires for this exchange then * 4) - mark connection as blocking * 5) - call AsyncEvent.handle() * * If exchange needs to block again, then call registerEvent() again */
void registerEvent(AsyncEvent exchange) throws IOException { selmgr.register(exchange); }
Only used from RawChannel to disconnect the channel from the selector
/** * Only used from RawChannel to disconnect the channel from * the selector */
void cancelRegistration(SocketChannel s) { selmgr.cancel(s); } Http2ClientImpl client2() { return client2; } /* @Override public ByteBuffer getBuffer() { return pool.getBuffer(); } // SSL buffers are larger. Manage separately int size = 16 * 1024; ByteBuffer getSSLBuffer() { return ByteBuffer.allocate(size); } /** * Return a new buffer that's a bit bigger than the given one * * @param buf * @return * ByteBuffer reallocSSLBuffer(ByteBuffer buf) { size = buf.capacity() * 12 / 10; // 20% bigger return ByteBuffer.allocate(size); } synchronized void returnSSLBuffer(ByteBuffer buf) { if (buf.capacity() >= size) sslBuffers.add(0, buf); } @Override public void returnBuffer(ByteBuffer buffer) { pool.returnBuffer(buffer); } */ @Override public <T> HttpResponse<T> send(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler) throws IOException, InterruptedException { MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler); return mex.response(); } @Override public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler) { MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler); return mex.responseAsync() .thenApply((HttpResponseImpl<T> b) -> (HttpResponse<T>) b); } @Override public <U, T> CompletableFuture<U> sendAsync(HttpRequest req, HttpResponse.MultiProcessor<U, T> responseHandler) { MultiExchange<U,T> mex = new MultiExchange<>(req, this, responseHandler); return mex.multiResponseAsync(); } // new impl. Should get rid of above /* static class BufferPool implements BufferHandler { final LinkedList<ByteBuffer> freelist = new LinkedList<>(); @Override public synchronized ByteBuffer getBuffer() { ByteBuffer buf; while (!freelist.isEmpty()) { buf = freelist.removeFirst(); buf.clear(); return buf; } return ByteBuffer.allocate(BUFSIZE); } @Override public synchronized void returnBuffer(ByteBuffer buffer) { assert buffer.capacity() > 0; freelist.add(buffer); } } static BufferPool pool = new BufferPool(); static BufferHandler pool() { return pool; } */ // Main loop for this client's selector private final static class SelectorManager extends Thread { private static final long NODEADLINE = 3000L; private final Selector selector; private volatile boolean closed; private final List<AsyncEvent> readyList; private final List<AsyncEvent> registrations; // Uses a weak reference to the HttpClient owning this // selector: a strong reference prevents its garbage // collection while the thread is running. // We want the thread to exit gracefully when the // HttpClient that owns it gets GC'ed. WeakReference<HttpClientImpl> ownerRef; SelectorManager(HttpClientImpl ref) throws IOException { super(null, null, "SelectorManager", 0, false); ownerRef = new WeakReference<>(ref); readyList = new ArrayList<>(); registrations = new ArrayList<>(); selector = Selector.open(); } // This returns immediately. So caller not allowed to send/receive // on connection. synchronized void register(AsyncEvent e) throws IOException { registrations.add(e); selector.wakeup(); } synchronized void cancel(SocketChannel e) { SelectionKey key = e.keyFor(selector); if (key != null) { key.cancel(); } selector.wakeup(); } void wakeupSelector() { selector.wakeup(); } synchronized void shutdown() { closed = true; try { selector.close(); } catch (IOException ignored) { } } @Override public void run() { try { while (!Thread.currentThread().isInterrupted()) { HttpClientImpl client; synchronized (this) { for (AsyncEvent exchange : registrations) { SelectableChannel c = exchange.channel(); try { c.configureBlocking(false); SelectionKey key = c.keyFor(selector); SelectorAttachment sa; if (key == null || !key.isValid()) { if (key != null) { // key is canceled. // invoke selectNow() to purge it // before registering the new event. selector.selectNow(); } sa = new SelectorAttachment(c, selector); } else { sa = (SelectorAttachment) key.attachment(); } sa.register(exchange); } catch (IOException e) { Log.logError("HttpClientImpl: " + e); c.close(); // let the exchange deal with it handleEvent(exchange); } } registrations.clear(); } // Check whether client is still alive, and if not, // gracefully stop this thread if ((client = ownerRef.get()) == null) { Log.logTrace("HttpClient no longer referenced. Exiting..."); return; } long millis = client.purgeTimeoutsAndReturnNextDeadline(); client = null; // don't hold onto the client ref //debugPrint(selector); // Don't wait for ever as it might prevent the thread to // stop gracefully. millis will be 0 if no deadline was found. int n = selector.select(millis == 0 ? NODEADLINE : millis); if (n == 0) { // Check whether client is still alive, and if not, // gracefully stop this thread if ((client = ownerRef.get()) == null) { Log.logTrace("HttpClient no longer referenced. Exiting..."); return; } client.purgeTimeoutsAndReturnNextDeadline(); client = null; // don't hold onto the client ref continue; } Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { SelectorAttachment sa = (SelectorAttachment) key.attachment(); int eventsOccurred = key.readyOps(); sa.events(eventsOccurred).forEach(readyList::add); sa.resetInterestOps(eventsOccurred); } selector.selectNow(); // complete cancellation selector.selectedKeys().clear(); for (AsyncEvent exchange : readyList) { if (exchange.blocking()) { exchange.channel().configureBlocking(true); } handleEvent(exchange); // will be delegated to executor } readyList.clear(); } } catch (Throwable e) { if (!closed) { // This terminates thread. So, better just print stack trace String err = Utils.stackTrace(e); Log.logError("HttpClientImpl: fatal error: " + err); } } finally { shutdown(); } } void debugPrint(Selector selector) { System.err.println("Selector: debugprint start"); Set<SelectionKey> keys = selector.keys(); for (SelectionKey key : keys) { SelectableChannel c = key.channel(); int ops = key.interestOps(); System.err.printf("selector chan:%s ops:%d\n", c, ops); } System.err.println("Selector: debugprint end"); } void handleEvent(AsyncEvent e) { if (closed) { e.abort(); } else { e.handle(); } } }
Tracks multiple user level registrations associated with one NIO registration (SelectionKey). In this implementation, registrations are one-off and when an event is posted the registration is cancelled until explicitly registered again.

No external synchronization required as this class is only used by the SelectorManager thread. One of these objects required per connection.

/** * Tracks multiple user level registrations associated with one NIO * registration (SelectionKey). In this implementation, registrations * are one-off and when an event is posted the registration is cancelled * until explicitly registered again. * * <p> No external synchronization required as this class is only used * by the SelectorManager thread. One of these objects required per * connection. */
private static class SelectorAttachment { private final SelectableChannel chan; private final Selector selector; private final ArrayList<AsyncEvent> pending; private int interestOps; SelectorAttachment(SelectableChannel chan, Selector selector) { this.pending = new ArrayList<>(); this.chan = chan; this.selector = selector; } void register(AsyncEvent e) throws ClosedChannelException { int newOps = e.interestOps(); boolean reRegister = (interestOps & newOps) != newOps; interestOps |= newOps; pending.add(e); if (reRegister) { // first time registration happens here also chan.register(selector, interestOps, this); } }
Returns a Stream containing only events that are registered with the given interestOps.
/** * Returns a Stream<AsyncEvents> containing only events that are * registered with the given {@code interestOps}. */
Stream<AsyncEvent> events(int interestOps) { return pending.stream() .filter(ev -> (ev.interestOps() & interestOps) != 0); }
Removes any events with the given interestOps, and if no events remaining, cancels the associated SelectionKey.
/** * Removes any events with the given {@code interestOps}, and if no * events remaining, cancels the associated SelectionKey. */
void resetInterestOps(int interestOps) { int newOps = 0; Iterator<AsyncEvent> itr = pending.iterator(); while (itr.hasNext()) { AsyncEvent event = itr.next(); int evops = event.interestOps(); if (event.repeating()) { newOps |= evops; continue; } if ((evops & interestOps) != 0) { itr.remove(); } else { newOps |= evops; } } this.interestOps = newOps; SelectionKey key = chan.keyFor(selector); if (newOps == 0) { key.cancel(); } else { key.interestOps(newOps); } } } @Override public SSLContext sslContext() { Utils.checkNetPermission("getSSLContext"); return sslContext; } @Override public Optional<SSLParameters> sslParameters() { return Optional.ofNullable(sslParams); } @Override public Optional<Authenticator> authenticator() { return Optional.ofNullable(authenticator); } @Override public Executor executor() { return executor; } ConnectionPool connectionPool() { return connections; } @Override public Redirect followRedirects() { return followRedirects; } @Override public Optional<CookieManager> cookieManager() { return Optional.ofNullable(cookieManager); } @Override public Optional<ProxySelector> proxy() { return Optional.ofNullable(this.proxySelector); } @Override public WebSocket.Builder newWebSocketBuilder(URI uri, WebSocket.Listener listener) { return new BuilderImpl(this, uri, listener); } @Override public Version version() { return version; } //private final HashMap<String, Boolean> http2NotSupported = new HashMap<>(); boolean getHttp2Allowed() { return version.equals(Version.HTTP_2); } private void initFilters() { addFilter(AuthenticationFilter.class); addFilter(RedirectFilter.class); if (this.cookieManager != null) { addFilter(CookieFilter.class); } } private void addFilter(Class<? extends HeaderFilter> f) { filters.addFilter(f); } final List<HeaderFilter> filterChain() { return filters.getFilterChain(); } // Timer controls. // Timers are implemented through timed Selector.select() calls. synchronized void registerTimer(TimeoutEvent event) { Log.logTrace("Registering timer {0}", event); timeouts.add(event); selmgr.wakeupSelector(); } synchronized void cancelTimer(TimeoutEvent event) { Log.logTrace("Canceling timer {0}", event); timeouts.remove(event); }
Purges ( handles ) timer events that have passed their deadline, and returns the amount of time, in milliseconds, until the next earliest event. A return value of 0 means that there are no events.
/** * Purges ( handles ) timer events that have passed their deadline, and * returns the amount of time, in milliseconds, until the next earliest * event. A return value of 0 means that there are no events. */
private long purgeTimeoutsAndReturnNextDeadline() { long diff = 0L; List<TimeoutEvent> toHandle = null; int remaining = 0; // enter critical section to retrieve the timeout event to handle synchronized(this) { if (timeouts.isEmpty()) return 0L; Instant now = Instant.now(); Iterator<TimeoutEvent> itr = timeouts.iterator(); while (itr.hasNext()) { TimeoutEvent event = itr.next(); diff = now.until(event.deadline(), ChronoUnit.MILLIS); if (diff <= 0) { itr.remove(); toHandle = (toHandle == null) ? new ArrayList<>() : toHandle; toHandle.add(event); } else { break; } } remaining = timeouts.size(); } // can be useful for debugging if (toHandle != null && Log.trace()) { Log.logTrace("purgeTimeoutsAndReturnNextDeadline: handling " + (toHandle == null ? 0 : toHandle.size()) + " events, " + "remaining " + remaining + ", next deadline: " + (diff < 0 ? 0L : diff)); } // handle timeout events out of critical section if (toHandle != null) { Throwable failed = null; for (TimeoutEvent event : toHandle) { try { Log.logTrace("Firing timer {0}", event); event.handle(); } catch (Error | RuntimeException e) { // Not expected. Handle remaining events then throw... // If e is an OOME or SOE it might simply trigger a new // error from here - but in this case there's not much we // could do anyway. Just let it flow... if (failed == null) failed = e; else failed.addSuppressed(e); Log.logTrace("Failed to handle event {0}: {1}", event, e); } } if (failed instanceof Error) throw (Error) failed; if (failed instanceof RuntimeException) throw (RuntimeException) failed; } // return time to wait until next event. 0L if there's no more events. return diff < 0 ? 0L : diff; } // used for the connection window int getReceiveBufferSize() { return Utils.getIntegerNetProperty( "jdk.httpclient.connectionWindowSize", 256 * 1024 ); } }