/*
 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.io.IOException;
import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import jdk.incubator.http.internal.common.FlowTube;
import jdk.incubator.http.internal.common.Utils;

Http 1.1 connection pool.
/** * Http 1.1 connection pool. */
final class ConnectionPool { static final long KEEP_ALIVE = Utils.getIntegerNetProperty( "jdk.httpclient.keepalive.timeout", 1200); // seconds static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); // Pools of idle connections private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool; private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool; private final ExpiryList expiryList; private final String dbgTag; // used for debug boolean stopped;
Entries in connection pool are keyed by destination address and/or proxy address: case 1: plain TCP not via proxy (destination only) case 2: plain TCP via proxy (proxy only) case 3: SSL not via proxy (destination only) case 4: SSL over tunnel (destination and proxy)
/** * Entries in connection pool are keyed by destination address and/or * proxy address: * case 1: plain TCP not via proxy (destination only) * case 2: plain TCP via proxy (proxy only) * case 3: SSL not via proxy (destination only) * case 4: SSL over tunnel (destination and proxy) */
static class CacheKey { final InetSocketAddress proxy; final InetSocketAddress destination; CacheKey(InetSocketAddress destination, InetSocketAddress proxy) { this.proxy = proxy; this.destination = destination; } @Override public boolean equals(Object obj) { if (obj == null) { return false; } if (getClass() != obj.getClass()) { return false; } final CacheKey other = (CacheKey) obj; if (!Objects.equals(this.proxy, other.proxy)) { return false; } if (!Objects.equals(this.destination, other.destination)) { return false; } return true; } @Override public int hashCode() { return Objects.hash(proxy, destination); } } ConnectionPool(long clientId) { this("ConnectionPool("+clientId+")"); }
There should be one of these per HttpClient.
/** * There should be one of these per HttpClient. */
private ConnectionPool(String tag) { dbgTag = tag; plainPool = new HashMap<>(); sslPool = new HashMap<>(); expiryList = new ExpiryList(); } final String dbgString() { return dbgTag; } void start() { assert !stopped : "Already stopped"; } static CacheKey cacheKey(InetSocketAddress destination, InetSocketAddress proxy) { return new CacheKey(destination, proxy); } synchronized HttpConnection getConnection(boolean secure, InetSocketAddress addr, InetSocketAddress proxy) { if (stopped) return null; CacheKey key = new CacheKey(addr, proxy); HttpConnection c = secure ? findConnection(key, sslPool) : findConnection(key, plainPool); //System.out.println ("getConnection returning: " + c); return c; }
Returns the connection to the pool.
/** * Returns the connection to the pool. */
void returnToPool(HttpConnection conn) { returnToPool(conn, Instant.now(), KEEP_ALIVE); } // Called also by whitebox tests void returnToPool(HttpConnection conn, Instant now, long keepAlive) { // Don't call registerCleanupTrigger while holding a lock, // but register it before the connection is added to the pool, // since we don't want to trigger the cleanup if the connection // is not in the pool. CleanupTrigger cleanup = registerCleanupTrigger(conn); // it's possible that cleanup may have been called. synchronized(this) { if (cleanup.isDone()) { return; } else if (stopped) { conn.close(); return; } if (conn instanceof PlainHttpConnection) { putConnection(conn, plainPool); } else { assert conn.isSecure(); putConnection(conn, sslPool); } expiryList.add(conn, now, keepAlive); } //System.out.println("Return to pool: " + conn); } private CleanupTrigger registerCleanupTrigger(HttpConnection conn) { // Connect the connection flow to a pub/sub pair that will take the // connection out of the pool and close it if anything happens // while the connection is sitting in the pool. CleanupTrigger cleanup = new CleanupTrigger(conn); FlowTube flow = conn.getConnectionFlow(); debug.log(Level.DEBUG, "registering %s", cleanup); flow.connectFlows(cleanup, cleanup); return cleanup; } private HttpConnection findConnection(CacheKey key, HashMap<CacheKey,LinkedList<HttpConnection>> pool) { LinkedList<HttpConnection> l = pool.get(key); if (l == null || l.isEmpty()) { return null; } else { HttpConnection c = l.removeFirst(); expiryList.remove(c); return c; } } /* called from cache cleaner only */ private boolean removeFromPool(HttpConnection c, HashMap<CacheKey,LinkedList<HttpConnection>> pool) { //System.out.println("cacheCleaner removing: " + c); assert Thread.holdsLock(this); CacheKey k = c.cacheKey(); List<HttpConnection> l = pool.get(k); if (l == null || l.isEmpty()) { pool.remove(k); return false; } return l.remove(c); } private void putConnection(HttpConnection c, HashMap<CacheKey,LinkedList<HttpConnection>> pool) { CacheKey key = c.cacheKey(); LinkedList<HttpConnection> l = pool.get(key); if (l == null) { l = new LinkedList<>(); pool.put(key, l); } l.add(c); }
Purge expired connection and return the number of milliseconds in which the next connection is scheduled to expire. If no connections are scheduled to be purged return 0.
Returns:the delay in milliseconds in which the next connection will expire.
/** * Purge expired connection and return the number of milliseconds * in which the next connection is scheduled to expire. * If no connections are scheduled to be purged return 0. * @return the delay in milliseconds in which the next connection will * expire. */
long purgeExpiredConnectionsAndReturnNextDeadline() { if (!expiryList.purgeMaybeRequired()) return 0; return purgeExpiredConnectionsAndReturnNextDeadline(Instant.now()); } // Used for whitebox testing long purgeExpiredConnectionsAndReturnNextDeadline(Instant now) { long nextPurge = 0; // We may be in the process of adding new elements // to the expiry list - but those elements will not // have outlast their keep alive timer yet since we're // just adding them. if (!expiryList.purgeMaybeRequired()) return nextPurge; List<HttpConnection> closelist; synchronized (this) { closelist = expiryList.purgeUntil(now); for (HttpConnection c : closelist) { if (c instanceof PlainHttpConnection) { boolean wasPresent = removeFromPool(c, plainPool); assert wasPresent; } else { boolean wasPresent = removeFromPool(c, sslPool); assert wasPresent; } } nextPurge = now.until( expiryList.nextExpiryDeadline().orElse(now), ChronoUnit.MILLIS); } closelist.forEach(this::close); return nextPurge; } private void close(HttpConnection c) { try { c.close(); } catch (Throwable e) {} // ignore } void stop() { List<HttpConnection> closelist = Collections.emptyList(); try { synchronized (this) { stopped = true; closelist = expiryList.stream() .map(e -> e.connection) .collect(Collectors.toList()); expiryList.clear(); plainPool.clear(); sslPool.clear(); } } finally { closelist.forEach(this::close); } } static final class ExpiryEntry { final HttpConnection connection; final Instant expiry; // absolute time in seconds of expiry time ExpiryEntry(HttpConnection connection, Instant expiry) { this.connection = connection; this.expiry = expiry; } }
Manages a LinkedList of sorted ExpiryEntry. The entry with the closer deadline is at the tail of the list, and the entry with the farther deadline is at the head. In the most common situation, new elements will need to be added at the head (or close to it), and expired elements will need to be purged from the tail.
/** * Manages a LinkedList of sorted ExpiryEntry. The entry with the closer * deadline is at the tail of the list, and the entry with the farther * deadline is at the head. In the most common situation, new elements * will need to be added at the head (or close to it), and expired elements * will need to be purged from the tail. */
private static final class ExpiryList { private final LinkedList<ExpiryEntry> list = new LinkedList<>(); private volatile boolean mayContainEntries; // A loosely accurate boolean whose value is computed // at the end of each operation performed on ExpiryList; // Does not require synchronizing on the ConnectionPool. boolean purgeMaybeRequired() { return mayContainEntries; } // Returns the next expiry deadline // should only be called while holding a synchronization // lock on the ConnectionPool Optional<Instant> nextExpiryDeadline() { if (list.isEmpty()) return Optional.empty(); else return Optional.of(list.getLast().expiry); } // should only be called while holding a synchronization // lock on the ConnectionPool void add(HttpConnection conn) { add(conn, Instant.now(), KEEP_ALIVE); } // Used by whitebox test. void add(HttpConnection conn, Instant now, long keepAlive) { Instant then = now.truncatedTo(ChronoUnit.SECONDS) .plus(keepAlive, ChronoUnit.SECONDS); // Elements with the farther deadline are at the head of // the list. It's more likely that the new element will // have the farthest deadline, and will need to be inserted // at the head of the list, so we're using an ascending // list iterator to find the right insertion point. ListIterator<ExpiryEntry> li = list.listIterator(); while (li.hasNext()) { ExpiryEntry entry = li.next(); if (then.isAfter(entry.expiry)) { li.previous(); // insert here li.add(new ExpiryEntry(conn, then)); mayContainEntries = true; return; } } // last (or first) element of list (the last element is // the first when the list is empty) list.add(new ExpiryEntry(conn, then)); mayContainEntries = true; } // should only be called while holding a synchronization // lock on the ConnectionPool void remove(HttpConnection c) { if (c == null || list.isEmpty()) return; ListIterator<ExpiryEntry> li = list.listIterator(); while (li.hasNext()) { ExpiryEntry e = li.next(); if (e.connection.equals(c)) { li.remove(); mayContainEntries = !list.isEmpty(); return; } } } // should only be called while holding a synchronization // lock on the ConnectionPool. // Purge all elements whose deadline is before now (now included). List<HttpConnection> purgeUntil(Instant now) { if (list.isEmpty()) return Collections.emptyList(); List<HttpConnection> closelist = new ArrayList<>(); // elements with the closest deadlines are at the tail // of the queue, so we're going to use a descending iterator // to remove them, and stop when we find the first element // that has not expired yet. Iterator<ExpiryEntry> li = list.descendingIterator(); while (li.hasNext()) { ExpiryEntry entry = li.next(); // use !isAfter instead of isBefore in order to // remove the entry if its expiry == now if (!entry.expiry.isAfter(now)) { li.remove(); HttpConnection c = entry.connection; closelist.add(c); } else break; // the list is sorted } mayContainEntries = !list.isEmpty(); return closelist; } // should only be called while holding a synchronization // lock on the ConnectionPool java.util.stream.Stream<ExpiryEntry> stream() { return list.stream(); } // should only be called while holding a synchronization // lock on the ConnectionPool void clear() { list.clear(); mayContainEntries = false; } } void cleanup(HttpConnection c, Throwable error) { debug.log(Level.DEBUG, "%s : ConnectionPool.cleanup(%s)", String.valueOf(c.getConnectionFlow()), error); synchronized(this) { if (c instanceof PlainHttpConnection) { removeFromPool(c, plainPool); } else { assert c.isSecure(); removeFromPool(c, sslPool); } expiryList.remove(c); } c.close(); }
An object that subscribes to the flow while the connection is in the pool. Anything that comes in will cause the connection to be closed and removed from the pool.
/** * An object that subscribes to the flow while the connection is in * the pool. Anything that comes in will cause the connection to be closed * and removed from the pool. */
private final class CleanupTrigger implements FlowTube.TubeSubscriber, FlowTube.TubePublisher, Flow.Subscription { private final HttpConnection connection; private volatile boolean done; public CleanupTrigger(HttpConnection connection) { this.connection = connection; } public boolean isDone() { return done;} private void triggerCleanup(Throwable error) { done = true; cleanup(connection, error); } @Override public void request(long n) {} @Override public void cancel() {} @Override public void onSubscribe(Flow.Subscription subscription) { subscription.request(1); } @Override public void onError(Throwable error) { triggerCleanup(error); } @Override public void onComplete() { triggerCleanup(null); } @Override public void onNext(List<ByteBuffer> item) { triggerCleanup(new IOException("Data received while in pool")); } @Override public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { subscriber.onSubscribe(this); } @Override public String toString() { return "CleanupTrigger(" + connection.getConnectionFlow() + ")"; } } }