/*
 * Copyright (C) 2015 Square, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package okhttp3.internal.connection;

import java.io.IOException;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.net.Socket;
import java.util.List;
import okhttp3.Address;
import okhttp3.Call;
import okhttp3.Connection;
import okhttp3.ConnectionPool;
import okhttp3.EventListener;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.Route;
import okhttp3.internal.Internal;
import okhttp3.internal.Util;
import okhttp3.internal.http.HttpCodec;
import okhttp3.internal.http2.ConnectionShutdownException;
import okhttp3.internal.http2.ErrorCode;
import okhttp3.internal.http2.StreamResetException;

import static okhttp3.internal.Util.closeQuietly;

This class coordinates the relationship between three entities:
  • Connections: physical socket connections to remote servers. These are potentially slow to establish so it is necessary to be able to cancel a connection currently being connected.
  • Streams: logical HTTP request/response pairs that are layered on connections. Each connection has its own allocation limit, which defines how many concurrent streams that connection can carry. HTTP/1.x connections can carry one stream at a time; HTTP/2 connections typically carry multiple.
  • Calls: a logical sequence of streams, typically an initial request and its follow up requests. We prefer to keep all streams of a single call on the same connection for better behavior and locality.

Instances of this class act on behalf of the call, using one or more streams over one or more connections. This class has APIs to release each of the above resources:

  • noNewStreams() prevents the connection from being used for new streams in the future. Use this after a Connection: close header, or when the connection may be inconsistent.
  • streamFinished() releases the active stream from this allocation. Note that only one stream may be active at a given time, so it is necessary to call streamFinished() before creating a subsequent stream with newStream().
  • release() removes the call's hold on the connection. Note that this won't immediately free the connection if there is a stream still lingering. That happens when a call is complete but its response body has yet to be fully consumed.

This class supports asynchronous canceling. This is intended to have the smallest blast radius possible. If an HTTP/2 stream is active, canceling will cancel that stream but not the other streams sharing its connection. But if the TLS handshake is still in progress then canceling may break the entire connection.

/**
 * Ties together the three resources a call needs in order to exchange HTTP messages:
 *
 * <ul>
 *   <li><strong>Connections:</strong> physical socket connections to remote servers. Establishing
 *       one can be slow, so a connection that is still being connected must be cancelable.
 *   <li><strong>Streams:</strong> logical HTTP request/response pairs layered on connections. Every
 *       connection has its own allocation limit that defines how many concurrent streams it can
 *       carry. HTTP/1.x connections carry one stream at a time; HTTP/2 connections typically
 *       carry several.
 *   <li><strong>Calls:</strong> a logical sequence of streams, typically an initial request and
 *       its follow-up requests. Keeping all streams of one call on the same connection is
 *       preferred for better behavior and locality.
 * </ul>
 *
 * <p>An instance acts on behalf of one call, using one or more streams over one or more
 * connections. Each of the resources above has a matching release API:
 *
 * <ul>
 *   <li>{@link #noNewStreams()} stops the connection from being used for new streams in the
 *       future. Use it after a {@code Connection: close} header, or when the connection may be
 *       inconsistent.
 *   <li>{@link #streamFinished streamFinished()} releases the active stream from this allocation.
 *       Only one stream may be active at a time, so {@link #streamFinished streamFinished()} must
 *       be called before a subsequent stream is created with {@link #newStream newStream()}.
 *   <li>{@link #release()} removes the call's hold on the connection. This does not immediately
 *       free the connection if a stream is still lingering, which happens when a call is complete
 *       but its response body has yet to be fully consumed.
 * </ul>
 *
 * <p>This class supports {@linkplain #cancel asynchronous canceling}, which is intended to have
 * the smallest blast radius possible. If an HTTP/2 stream is active, canceling cancels that stream
 * but not the other streams sharing its connection. If the TLS handshake is still in progress,
 * canceling may break the entire connection.
 */
public final class StreamAllocation {
  public final Address address;
  private RouteSelector.Selection routeSelection;
  private Route route;
  private final ConnectionPool connectionPool;
  public final Call call;
  public final EventListener eventListener;
  private final Object callStackTrace;

  // State guarded by connectionPool.
  private final RouteSelector routeSelector;
  private int refusedStreamCount;
  private RealConnection connection;
  private boolean reportedAcquired;
  private boolean released;
  private boolean canceled;
  private HttpCodec codec;

  /**
   * Creates an allocation for {@code call} that draws connections to {@code address} from
   * {@code connectionPool}.
   *
   * @param callStackTrace opaque value stored on every {@link StreamAllocationReference} this
   *     allocation registers with a connection
   */
  public StreamAllocation(ConnectionPool connectionPool, Address address, Call call,
      EventListener eventListener, Object callStackTrace) {
    this.connectionPool = connectionPool;
    this.address = address;
    this.call = call;
    this.eventListener = eventListener;
    // routeDatabase() reads connectionPool, so the pool must already be assigned at this point.
    this.routeSelector = new RouteSelector(address, routeDatabase(), call, eventListener);
    this.callStackTrace = callStackTrace;
  }

  /**
   * Finds a healthy connection, creates a codec on it and records that codec as this allocation's
   * active stream.
   *
   * @param client supplies the ping interval and the retry-on-connection-failure setting
   * @param chain supplies the connect, read and write timeouts
   * @param doExtensiveHealthChecks forwarded to the health check of reused connections
   * @return the codec for the new stream
   * @throws RouteException wrapping any {@link IOException} raised while finding the connection
   *     or creating the codec
   */
  public HttpCodec newStream(
      OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectMillis = chain.connectTimeoutMillis();
    int readMillis = chain.readTimeoutMillis();
    int writeMillis = chain.writeTimeoutMillis();
    int pingMillis = client.pingIntervalMillis();
    boolean retryEnabled = client.retryOnConnectionFailure();

    try {
      RealConnection healthy = findHealthyConnection(connectMillis, readMillis, writeMillis,
          pingMillis, retryEnabled, doExtensiveHealthChecks);
      HttpCodec created = healthy.newCodec(client, chain, this);

      synchronized (connectionPool) {
        codec = created;
      }
      return created;
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }
Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated until a healthy connection is found.
/** * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated * until a healthy connection is found. */
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks) throws IOException { while (true) { RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled); // If this is a brand new connection, we can skip the extensive health checks. synchronized (connectionPool) { if (candidate.successCount == 0) { return candidate; } } // Do a (potentially slow) check to confirm that the pooled connection is still good. If it // isn't, take it out of the pool and start again. if (!candidate.isHealthy(doExtensiveHealthChecks)) { noNewStreams(); continue; } return candidate; } }
Returns a connection to host a new stream. This prefers the existing connection if it exists, then the pool, finally building a new connection.
/** * Returns a connection to host a new stream. This prefers the existing connection if it exists, * then the pool, finally building a new connection. */
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException { boolean foundPooledConnection = false; RealConnection result = null; Route selectedRoute = null; Connection releasedConnection; Socket toClose; synchronized (connectionPool) { if (released) throw new IllegalStateException("released"); if (codec != null) throw new IllegalStateException("codec != null"); if (canceled) throw new IOException("Canceled"); // Attempt to use an already-allocated connection. We need to be careful here because our // already-allocated connection may have been restricted from creating new streams. releasedConnection = this.connection; toClose = releaseIfNoNewStreams(); if (this.connection != null) { // We had an already-allocated connection and it's good. result = this.connection; releasedConnection = null; } if (!reportedAcquired) { // If the connection was never reported acquired, don't report it as released! releasedConnection = null; } if (result == null) { // Attempt to get a connection from the pool. Internal.instance.get(connectionPool, address, this, null); if (connection != null) { foundPooledConnection = true; result = connection; } else { selectedRoute = route; } } } closeQuietly(toClose); if (releasedConnection != null) { eventListener.connectionReleased(call, releasedConnection); } if (foundPooledConnection) { eventListener.connectionAcquired(call, result); } if (result != null) { // If we found an already-allocated or pooled connection, we're done. return result; } // If we need a route selection, make one. This is a blocking operation. 
boolean newRouteSelection = false; if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) { newRouteSelection = true; routeSelection = routeSelector.next(); } synchronized (connectionPool) { if (canceled) throw new IOException("Canceled"); if (newRouteSelection) { // Now that we have a set of IP addresses, make another attempt at getting a connection from // the pool. This could match due to connection coalescing. List<Route> routes = routeSelection.getAll(); for (int i = 0, size = routes.size(); i < size; i++) { Route route = routes.get(i); Internal.instance.get(connectionPool, address, this, route); if (connection != null) { foundPooledConnection = true; result = connection; this.route = route; break; } } } if (!foundPooledConnection) { if (selectedRoute == null) { selectedRoute = routeSelection.next(); } // Create a connection and assign it to this allocation immediately. This makes it possible // for an asynchronous cancel() to interrupt the handshake we're about to do. route = selectedRoute; refusedStreamCount = 0; result = new RealConnection(connectionPool, selectedRoute); acquire(result, false); } } // If we found a pooled connection on the 2nd time around, we're done. if (foundPooledConnection) { eventListener.connectionAcquired(call, result); return result; } // Do TCP + TLS handshakes. This is a blocking operation. result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener); routeDatabase().connected(result.route()); Socket socket = null; synchronized (connectionPool) { reportedAcquired = true; // Pool the connection. Internal.instance.put(connectionPool, result); // If another multiplexed connection to the same address was created concurrently, then // release this connection and acquire that one. 
if (result.isMultiplexed()) { socket = Internal.instance.deduplicate(connectionPool, address, this); result = connection; } } closeQuietly(socket); eventListener.connectionAcquired(call, result); return result; }
Releases the currently held connection and returns a socket to close if the held connection restricts new streams from being created. With HTTP/2 multiple requests share the same connection so it's possible that our connection is restricted from creating new streams during a follow-up request.
/**
 * Releases the currently held connection and returns a socket to close if the held connection
 * restricts new streams from being created. With HTTP/2 multiple requests share the same
 * connection so it's possible that our connection is restricted from creating new streams during
 * a follow-up request.
 *
 * <p>Callers must hold the connection pool lock.
 *
 * @return the socket the caller should close after leaving the lock, or null
 */
private Socket releaseIfNoNewStreams() {
  assert (Thread.holdsLock(connectionPool));
  RealConnection allocatedConnection = this.connection;
  if (allocatedConnection != null && allocatedConnection.noNewStreams) {
    return deallocate(false, false, true);
  }
  return null;
}

/**
 * Signals that the stream served by {@code codec} is done and releases it from this allocation.
 *
 * @param noNewStreams true to also forbid new streams on the connection; when false the
 *     connection's success count is incremented
 * @param codec the codec being finished; must be this allocation's active codec
 * @param bytesRead passed to {@link EventListener#responseBodyEnd}
 * @param e the failure that ended the stream, or null; when non-null the call is reported as
 *     failed, otherwise the call is reported as ended if this allocation was already released
 * @throws IllegalStateException if {@code codec} is null or is not the active codec
 */
public void streamFinished(boolean noNewStreams, HttpCodec codec, long bytesRead, IOException e) {
  eventListener.responseBodyEnd(call, bytesRead);

  Socket socket;
  Connection releasedConnection;
  boolean callEnd;
  synchronized (connectionPool) {
    if (codec == null || codec != this.codec) {
      throw new IllegalStateException("expected " + this.codec + " but was " + codec);
    }
    if (!noNewStreams) {
      connection.successCount++;
    }
    releasedConnection = connection;
    socket = deallocate(noNewStreams, false, true);
    // Still attached: nothing was released, so there is nothing to report.
    if (connection != null) {
      releasedConnection = null;
    }
    callEnd = this.released;
  }
  // No I/O while holding the pool lock.
  closeQuietly(socket);
  if (releasedConnection != null) {
    eventListener.connectionReleased(call, releasedConnection);
  }

  if (e != null) {
    e = Internal.instance.timeoutExit(call, e);
    eventListener.callFailed(call, e);
  } else if (callEnd) {
    Internal.instance.timeoutExit(call, null);
    eventListener.callEnd(call);
  }
}

/** Returns the active codec, or null if no stream is active. */
public HttpCodec codec() {
  synchronized (connectionPool) {
    return codec;
  }
}

/** Returns the route database of the connection pool this allocation uses. */
private RouteDatabase routeDatabase() {
  return Internal.instance.routeDatabase(connectionPool);
}

/** Returns the route currently recorded on this allocation, which may be null. */
public Route route() {
  return route;
}

/**
 * Returns the connection this allocation currently holds, or null.
 *
 * <p>The {@code connection} field is guarded by the connection pool, so it is read under that
 * lock (as {@link #codec()} does) rather than under this object's own monitor, which no writer
 * of the field holds.
 */
public RealConnection connection() {
  synchronized (connectionPool) {
    return connection;
  }
}

/**
 * Removes the call's hold on the connection. If that detaches the connection from this
 * allocation, the release and the end of the call are reported to the event listener.
 */
public void release() {
  Socket socket;
  Connection releasedConnection;
  synchronized (connectionPool) {
    releasedConnection = connection;
    socket = deallocate(false, true, false);
    // Still attached (a stream is lingering): nothing was released yet.
    if (connection != null) {
      releasedConnection = null;
    }
  }
  // No I/O while holding the pool lock.
  closeQuietly(socket);
  if (releasedConnection != null) {
    Internal.instance.timeoutExit(call, null);
    eventListener.connectionReleased(call, releasedConnection);
    eventListener.callEnd(call);
  }
}
Forbid new streams from being created on the connection that hosts this allocation.
/**
 * Marks the connection hosting this allocation as closed to new streams. If that detaches the
 * connection from this allocation, the release is reported to the event listener.
 */
public void noNewStreams() {
  Socket socketToClose;
  Connection detached;
  synchronized (connectionPool) {
    Connection heldBefore = connection;
    socketToClose = deallocate(true, false, false);
    // Report a release only when the allocation no longer holds a connection afterwards.
    detached = (connection == null) ? heldBefore : null;
  }

  // Sockets are closed outside the pool lock.
  closeQuietly(socketToClose);
  if (detached != null) {
    eventListener.connectionReleased(call, detached);
  }
}
Releases resources held by this allocation. If sufficient resources are allocated, the connection will be detached or closed. Callers must be synchronized on the connection pool.

Returns a closeable that the caller should pass to Util.closeQuietly upon completion of the synchronized block. (We don't do I/O while synchronized on the connection pool.)

/** * Releases resources held by this allocation. If sufficient resources are allocated, the * connection will be detached or closed. Callers must be synchronized on the connection pool. * * <p>Returns a closeable that the caller should pass to {@link Util#closeQuietly} upon completion * of the synchronized block. (We don't do I/O while synchronized on the connection pool.) */
private Socket deallocate(boolean noNewStreams, boolean released, boolean streamFinished) { assert (Thread.holdsLock(connectionPool)); if (streamFinished) { this.codec = null; } if (released) { this.released = true; } Socket socket = null; if (connection != null) { if (noNewStreams) { connection.noNewStreams = true; } if (this.codec == null && (this.released || connection.noNewStreams)) { release(connection); if (connection.allocations.isEmpty()) { connection.idleAtNanos = System.nanoTime(); if (Internal.instance.connectionBecameIdle(connectionPool, connection)) { socket = connection.socket(); } } connection = null; } } return socket; } public void cancel() { HttpCodec codecToCancel; RealConnection connectionToCancel; synchronized (connectionPool) { canceled = true; codecToCancel = codec; connectionToCancel = connection; } if (codecToCancel != null) { codecToCancel.cancel(); } else if (connectionToCancel != null) { connectionToCancel.cancel(); } } public void streamFailed(IOException e) { Socket socket; Connection releasedConnection; boolean noNewStreams = false; synchronized (connectionPool) { if (e instanceof StreamResetException) { ErrorCode errorCode = ((StreamResetException) e).errorCode; if (errorCode == ErrorCode.REFUSED_STREAM) { // Retry REFUSED_STREAM errors once on the same connection. refusedStreamCount++; if (refusedStreamCount > 1) { noNewStreams = true; route = null; } } else if (errorCode != ErrorCode.CANCEL) { // Keep the connection for CANCEL errors. Everything else wants a fresh connection. noNewStreams = true; route = null; } } else if (connection != null && (!connection.isMultiplexed() || e instanceof ConnectionShutdownException)) { noNewStreams = true; // If this route hasn't completed a call, avoid it for new connections. 
if (connection.successCount == 0) { if (route != null && e != null) { routeSelector.connectFailed(route, e); } route = null; } } releasedConnection = connection; socket = deallocate(noNewStreams, false, true); if (connection != null || !reportedAcquired) releasedConnection = null; } closeQuietly(socket); if (releasedConnection != null) { eventListener.connectionReleased(call, releasedConnection); } }
Use this allocation to hold connection. Each call to this must be paired with a call to release on the same connection.
/**
 * Makes this allocation hold {@code connection} and registers the allocation on that
 * connection's allocation list. Each call to this must be paired with a call to
 * {@link #release} on the same connection. Callers must hold the connection pool lock.
 *
 * @param reportedAcquired the value to record for whether the acquisition has been reported
 * @throws IllegalStateException if this allocation already holds a connection
 */
public void acquire(RealConnection connection, boolean reportedAcquired) {
  assert (Thread.holdsLock(connectionPool));
  if (this.connection != null) {
    throw new IllegalStateException();
  }

  this.connection = connection;
  this.reportedAcquired = reportedAcquired;

  StreamAllocationReference selfReference = new StreamAllocationReference(this, callStackTrace);
  connection.allocations.add(selfReference);
}
Remove this allocation from the connection's list of allocations.
/**
 * Removes the reference to this allocation from {@code connection}'s list of allocations.
 *
 * @throws IllegalStateException if the list holds no reference to this allocation
 */
private void release(RealConnection connection) {
  int index = 0;
  while (index < connection.allocations.size()) {
    Reference<StreamAllocation> candidate = connection.allocations.get(index);
    if (candidate.get() == this) {
      connection.allocations.remove(index);
      return;
    }
    index++;
  }
  throw new IllegalStateException();
}
Release the connection held by this allocation and acquire newConnection instead. It is only safe to call this if the held connection is newly connected but duplicated by newConnection. Typically this occurs when concurrently connecting to an HTTP/2 webserver.

Returns a closeable that the caller should pass to Util.closeQuietly upon completion of the synchronized block. (We don't do I/O while synchronized on the connection pool.)

/**
 * Swaps the connection held by this allocation for {@code newConnection}. It is only safe to
 * call this if the held connection is newly connected but duplicated by {@code newConnection}.
 * Typically this occurs when concurrently connecting to an HTTP/2 webserver.
 *
 * <p>Returns a closeable that the caller should pass to {@link Util#closeQuietly} upon completion
 * of the synchronized block. (We don't do I/O while synchronized on the connection pool.)
 *
 * @throws IllegalStateException if a codec is active or the held connection does not have
 *     exactly one allocation
 */
public Socket releaseAndAcquire(RealConnection newConnection) {
  assert (Thread.holdsLock(connectionPool));
  if (codec != null || connection.allocations.size() != 1) {
    throw new IllegalStateException();
  }

  // Release the old connection, keeping its single allocation reference for reuse.
  Reference<StreamAllocation> soleReference = connection.allocations.get(0);
  Socket toClose = deallocate(true, false, false);

  // Acquire the new connection with that same reference.
  this.connection = newConnection;
  newConnection.allocations.add(soleReference);

  return toClose;
}

/**
 * Returns true if a route is recorded on this allocation, the current route selection has
 * another route, or the route selector has more to offer.
 */
public boolean hasMoreRoutes() {
  if (route != null) {
    return true;
  }
  if (routeSelection != null && routeSelection.hasNext()) {
    return true;
  }
  return routeSelector.hasNext();
}

/** Describes the held connection if there is one, otherwise the address. */
@Override public String toString() {
  RealConnection held = connection();
  if (held != null) {
    return held.toString();
  }
  return address.toString();
}

/** Weak reference to a {@link StreamAllocation} that also carries the call's stack trace. */
public static final class StreamAllocationReference extends WeakReference<StreamAllocation> {
Captures the stack trace at the time the Call is executed or enqueued. This is helpful for identifying the origin of connection leaks.
/**
 * The stack trace captured when the Call was executed or enqueued. It helps identify the
 * origin of connection leaks.
 */
  public final Object callStackTrace;

  /** Creates a weak reference to {@code referent} that carries {@code callStackTrace}. */
  StreamAllocationReference(StreamAllocation referent, Object callStackTrace) {
    super(referent);
    this.callStackTrace = callStackTrace;
  }
}
}