/*
 * Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.internal.net.http;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Flow;
import java.util.function.BiPredicate;
import java.util.function.Predicate;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Version;
import java.net.http.HttpHeaders;
import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.SequentialScheduler;
import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Utils;
import static java.net.http.HttpClient.Version.HTTP_2;

Wraps socket channel layer and takes care of SSL also. Subtypes are: PlainHttpConnection: regular direct TCP connection to server PlainProxyConnection: plain text proxy connection PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server AsyncSSLConnection: TLS channel direct to server AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
/** * Wraps socket channel layer and takes care of SSL also. * * Subtypes are: * PlainHttpConnection: regular direct TCP connection to server * PlainProxyConnection: plain text proxy connection * PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server * AsyncSSLConnection: TLS channel direct to server * AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel */
abstract class HttpConnection implements Closeable { final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); final static Logger DEBUG_LOGGER = Utils.getDebugLogger( () -> "HttpConnection(SocketTube(?))", Utils.DEBUG);
The address this connection is connected to. Could be a server or a proxy.
/** The address this connection is connected to. Could be a server or a proxy. */
final InetSocketAddress address; private final HttpClientImpl client; private final TrailingOperations trailingOperations; HttpConnection(InetSocketAddress address, HttpClientImpl client) { this.address = address; this.client = client; trailingOperations = new TrailingOperations(); } private static final class TrailingOperations { private final Map<CompletionStage<?>, Boolean> operations = new IdentityHashMap<>(); void add(CompletionStage<?> cf) { synchronized(operations) { operations.put(cf, Boolean.TRUE); cf.whenComplete((r,t)-> remove(cf)); } } boolean remove(CompletionStage<?> cf) { synchronized(operations) { return operations.remove(cf); } } } final void addTrailingOperation(CompletionStage<?> cf) { trailingOperations.add(cf); } // final void removeTrailingOperation(CompletableFuture<?> cf) { // trailingOperations.remove(cf); // } final HttpClientImpl client() { return client; }
Initiates the connect phase. Returns a CompletableFuture that completes when the underlying TCP connection has been established or an error occurs.
/** * Initiates the connect phase. * * Returns a CompletableFuture that completes when the underlying * TCP connection has been established or an error occurs. */
public abstract CompletableFuture<Void> connectAsync(Exchange<?> exchange);
Finishes the connection phase. Returns a CompletableFuture that completes when any additional, type specific, setup has been done. Must be called after connectAsync.
/** * Finishes the connection phase. * * Returns a CompletableFuture that completes when any additional, * type specific, setup has been done. Must be called after connectAsync. */
public abstract CompletableFuture<Void> finishConnect();
Tells whether, or not, this connection is connected to its destination.
/** Tells whether, or not, this connection is connected to its destination. */
abstract boolean connected();
Tells whether, or not, this connection is secure ( over SSL )
/** Tells whether, or not, this connection is secure ( over SSL ) */
abstract boolean isSecure();
Tells whether, or not, this connection is proxied. Returns true for tunnel connections, or clear connection to any host through proxy.
/** * Tells whether, or not, this connection is proxied. * Returns true for tunnel connections, or clear connection to * any host through proxy. */
abstract boolean isProxied();
Tells whether, or not, this connection is open.
/** Tells whether, or not, this connection is open. */
final boolean isOpen() { return channel().isOpen() && (connected() ? !getConnectionFlow().isFinished() : true); } interface HttpPublisher extends FlowTube.TubePublisher { void enqueue(List<ByteBuffer> buffers) throws IOException; void enqueueUnordered(List<ByteBuffer> buffers) throws IOException; void signalEnqueued() throws IOException; }
Returns the HTTP publisher associated with this connection. May be null if invoked before connecting.
/** * Returns the HTTP publisher associated with this connection. May be null * if invoked before connecting. */
abstract HttpPublisher publisher(); // HTTP/2 MUST use TLS version 1.2 or higher for HTTP/2 over TLS private static final Predicate<String> testRequiredHTTP2TLSVersion = proto -> proto.equals("TLSv1.2") || proto.equals("TLSv1.3");
Returns true if the given client's SSL parameter protocols contains at least one TLS version that HTTP/2 requires.
/** * Returns true if the given client's SSL parameter protocols contains at * least one TLS version that HTTP/2 requires. */
private static final boolean hasRequiredHTTP2TLSVersion(HttpClient client) { String[] protos = client.sslParameters().getProtocols(); if (protos != null) { return Arrays.stream(protos).filter(testRequiredHTTP2TLSVersion).findAny().isPresent(); } else { return false; } }
Factory for retrieving HttpConnections. A connection can be retrieved from the connection pool, or a new one created if none available. The given addr is the ultimate destination. Any proxies, etc, are determined from the request. Returns a concrete instance which is one of the following: PlainHttpConnection PlainTunnelingConnection The returned connection, if not from the connection pool, must have its, connect() or connectAsync() method invoked, which ( when it completes successfully ) renders the connection usable for requests.
/** * Factory for retrieving HttpConnections. A connection can be retrieved * from the connection pool, or a new one created if none available. * * The given {@code addr} is the ultimate destination. Any proxies, * etc, are determined from the request. Returns a concrete instance which * is one of the following: * {@link PlainHttpConnection} * {@link PlainTunnelingConnection} * * The returned connection, if not from the connection pool, must have its, * connect() or connectAsync() method invoked, which ( when it completes * successfully ) renders the connection usable for requests. */
public static HttpConnection getConnection(InetSocketAddress addr, HttpClientImpl client, HttpRequestImpl request, Version version) { // The default proxy selector may select a proxy whose address is // unresolved. We must resolve the address before connecting to it. InetSocketAddress proxy = Utils.resolveAddress(request.proxy()); HttpConnection c = null; boolean secure = request.secure(); ConnectionPool pool = client.connectionPool(); if (!secure) { c = pool.getConnection(false, addr, proxy); if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) { final HttpConnection conn = c; if (DEBUG_LOGGER.on()) DEBUG_LOGGER.log(conn.getConnectionFlow() + ": plain connection retrieved from HTTP/1.1 pool"); return c; } else { return getPlainConnection(addr, proxy, request, client); } } else { // secure if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool c = pool.getConnection(true, addr, proxy); } if (c != null && c.isOpen()) { final HttpConnection conn = c; if (DEBUG_LOGGER.on()) DEBUG_LOGGER.log(conn.getConnectionFlow() + ": SSL connection retrieved from HTTP/1.1 pool"); return c; } else { String[] alpn = null; if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) { alpn = new String[] { "h2", "http/1.1" }; } return getSSLConnection(addr, proxy, alpn, request, client); } } } private static HttpConnection getSSLConnection(InetSocketAddress addr, InetSocketAddress proxy, String[] alpn, HttpRequestImpl request, HttpClientImpl client) { if (proxy != null) return new AsyncSSLTunnelConnection(addr, client, alpn, proxy, proxyTunnelHeaders(request)); else return new AsyncSSLConnection(addr, client, alpn); }
This method is used to build a filter that will accept or veto (header-name, value) tuple for transmission on the wire. The filter is applied to the headers when sending the headers to the remote party. Which tuple is accepted/vetoed depends on:
   - whether the connection is a tunnel connection
     [talking to a server through a proxy tunnel]
   - whether the method is CONNECT
     [establishing a CONNECT tunnel through a proxy]
   - whether the request is using a proxy
     (and the connection is not a tunnel)
     [talking to a server through a proxy]
   - whether the request is a direct connection to
     a server (no tunnel, no proxy).
Params:
  • request –
Returns:
/** * This method is used to build a filter that will accept or * veto (header-name, value) tuple for transmission on the * wire. * The filter is applied to the headers when sending the headers * to the remote party. * Which tuple is accepted/vetoed depends on: * <pre> * - whether the connection is a tunnel connection * [talking to a server through a proxy tunnel] * - whether the method is CONNECT * [establishing a CONNECT tunnel through a proxy] * - whether the request is using a proxy * (and the connection is not a tunnel) * [talking to a server through a proxy] * - whether the request is a direct connection to * a server (no tunnel, no proxy). * </pre> * @param request * @return */
BiPredicate<String,String> headerFilter(HttpRequestImpl request) { if (isTunnel()) { // talking to a server through a proxy tunnel // don't send proxy-* headers to a plain server assert !request.isConnect(); return Utils.NO_PROXY_HEADERS_FILTER; } else if (request.isConnect()) { // establishing a proxy tunnel // check for proxy tunnel disabled schemes // assert !this.isTunnel(); assert request.proxy() == null; return Utils.PROXY_TUNNEL_FILTER; } else if (request.proxy() != null) { // talking to a server through a proxy (no tunnel) // check for proxy disabled schemes // assert !isTunnel() && !request.isConnect(); return Utils.PROXY_FILTER; } else { // talking to a server directly (no tunnel, no proxy) // don't send proxy-* headers to a plain server // assert request.proxy() == null && !request.isConnect(); return Utils.NO_PROXY_HEADERS_FILTER; } } // Composes a new immutable HttpHeaders that combines the // user and system header but only keeps those headers that // start with "proxy-" private static HttpHeaders proxyTunnelHeaders(HttpRequestImpl request) { Map<String, List<String>> combined = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); combined.putAll(request.getSystemHeadersBuilder().map()); combined.putAll(request.headers().map()); // let user override system // keep only proxy-* - and also strip authorization headers // for disabled schemes return HttpHeaders.of(combined, Utils.PROXY_TUNNEL_FILTER); } /* Returns either a plain HTTP connection or a plain tunnelling connection * for proxied WebSocket */ private static HttpConnection getPlainConnection(InetSocketAddress addr, InetSocketAddress proxy, HttpRequestImpl request, HttpClientImpl client) { if (request.isWebSocket() && proxy != null) return new PlainTunnelingConnection(addr, proxy, client, proxyTunnelHeaders(request)); if (proxy == null) return new PlainHttpConnection(addr, client); else return new PlainProxyConnection(proxy, client); } void closeOrReturnToCache(HttpHeaders hdrs) { if (hdrs == null) { // the connection was closed by server, eof Log.logTrace("Cannot return connection to pool: closing {0}", this); close(); return; } HttpClientImpl client = client(); if (client == null) { Log.logTrace("Client released: closing {0}", this); close(); return; } ConnectionPool pool = client.connectionPool(); boolean keepAlive = hdrs.firstValue("Connection") .map((s) -> !s.equalsIgnoreCase("close")) .orElse(true); if (keepAlive && isOpen()) { Log.logTrace("Returning connection to the pool: {0}", this); pool.returnToPool(this); } else { Log.logTrace("Closing connection (keepAlive={0}, isOpen={1}): {2}", keepAlive, isOpen(), this); close(); } } /* Tells whether or not this connection is a tunnel through a proxy */ boolean isTunnel() { return false; } abstract SocketChannel channel(); final InetSocketAddress address() { return address; } abstract ConnectionPool.CacheKey cacheKey();
Closes this connection, by returning the socket to its connection pool.
/** * Closes this connection, by returning the socket to its connection pool. */
@Override public abstract void close(); abstract FlowTube getConnectionFlow();
A publisher that makes it possible to publish (write) ordered (normal priority) and unordered (high priority) buffers downstream.
/** * A publisher that makes it possible to publish (write) ordered (normal * priority) and unordered (high priority) buffers downstream. */
final class PlainHttpPublisher implements HttpPublisher { final Object reading; PlainHttpPublisher() { this(new Object()); } PlainHttpPublisher(Object readingLock) { this.reading = readingLock; } final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>(); final ConcurrentLinkedDeque<List<ByteBuffer>> priority = new ConcurrentLinkedDeque<>(); volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber; volatile HttpWriteSubscription subscription; final SequentialScheduler writeScheduler = new SequentialScheduler(this::flushTask); @Override public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { synchronized (reading) { //assert this.subscription == null; //assert this.subscriber == null; if (subscription == null) { subscription = new HttpWriteSubscription(); } this.subscriber = subscriber; } // TODO: should we do this in the flow? subscriber.onSubscribe(subscription); signal(); } void flushTask(DeferredCompleter completer) { try { HttpWriteSubscription sub = subscription; if (sub != null) sub.flush(); } finally { completer.complete(); } } void signal() { writeScheduler.runOrSchedule(); } final class HttpWriteSubscription implements Flow.Subscription { final Demand demand = new Demand(); @Override public void request(long n) { if (n <= 0) throw new IllegalArgumentException("non-positive request"); demand.increase(n); if (debug.on()) debug.log("HttpPublisher: got request of " + n + " from " + getConnectionFlow()); writeScheduler.runOrSchedule(); } @Override public void cancel() { if (debug.on()) debug.log("HttpPublisher: cancelled by " + getConnectionFlow()); } private boolean isEmpty() { return queue.isEmpty() && priority.isEmpty(); } private List<ByteBuffer> poll() { List<ByteBuffer> elem = priority.poll(); return elem == null ? queue.poll() : elem; } void flush() { while (!isEmpty() && demand.tryDecrement()) { List<ByteBuffer> elem = poll(); if (debug.on()) debug.log("HttpPublisher: sending " + Utils.remaining(elem) + " bytes (" + elem.size() + " buffers) to " + getConnectionFlow()); subscriber.onNext(elem); } } } @Override public void enqueue(List<ByteBuffer> buffers) throws IOException { queue.add(buffers); int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum(); debug.log("added %d bytes to the write queue", bytes); } @Override public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException { // Unordered frames are sent before existing frames. int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum(); priority.add(buffers); debug.log("added %d bytes in the priority write queue", bytes); } @Override public void signalEnqueued() throws IOException { debug.log("signalling the publisher of the write queue"); signal(); } } String dbgTag; final String dbgString() { FlowTube flow = getConnectionFlow(); String tag = dbgTag; if (tag == null && flow != null) { dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")"; } else if (tag == null) { tag = this.getClass().getSimpleName() + "(?)"; } return tag; } @Override public String toString() { return "HttpConnection: " + channel().toString(); } }