/*
* Copyright (C) 2013 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import okhttp3.RealCall.AsyncCall;
import okhttp3.internal.Util;
Policy on when async requests are executed.
Each dispatcher uses an ExecutorService
to run calls internally. If you supply your own executor, it should be able to run the configured maximum number of calls concurrently.
/**
* Policy on when async requests are executed.
*
* <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you supply your
* own executor, it should be able to run {@linkplain #getMaxRequests the configured maximum} number
* of calls concurrently.
*/
public final class Dispatcher {
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
private @Nullable Runnable idleCallback;
Executes calls. Created lazily. /** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;
Ready async calls in the order they'll be run. /** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
Running asynchronous calls. Includes canceled calls that haven't finished yet. /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
Running synchronous calls. Includes canceled calls that haven't finished yet. /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
Set the maximum number of requests to execute concurrently. Above this requests queue in
memory, waiting for the running calls to complete.
If more than maxRequests
requests are in flight when this is invoked, those requests will remain in flight.
/**
* Set the maximum number of requests to execute concurrently. Above this requests queue in
* memory, waiting for the running calls to complete.
*
* <p>If more than {@code maxRequests} requests are in flight when this is invoked, those requests
* will remain in flight.
*/
public void setMaxRequests(int maxRequests) {
if (maxRequests < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequests);
}
synchronized (this) {
this.maxRequests = maxRequests;
}
promoteAndExecute();
}
public synchronized int getMaxRequests() {
return maxRequests;
}
Set the maximum number of requests for each host to execute concurrently. This limits requests
by the URL's host name. Note that concurrent requests to a single IP address may still exceed
this limit: multiple hostnames may share an IP address or be routed through the same HTTP
proxy.
If more than maxRequestsPerHost
requests are in flight when this is invoked, those requests will remain in flight.
WebSocket connections to hosts do not count against this limit.
/**
* Set the maximum number of requests for each host to execute concurrently. This limits requests
* by the URL's host name. Note that concurrent requests to a single IP address may still exceed
* this limit: multiple hostnames may share an IP address or be routed through the same HTTP
* proxy.
*
* <p>If more than {@code maxRequestsPerHost} requests are in flight when this is invoked, those
* requests will remain in flight.
*
* <p>WebSocket connections to hosts <b>do not</b> count against this limit.
*/
public void setMaxRequestsPerHost(int maxRequestsPerHost) {
if (maxRequestsPerHost < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
}
synchronized (this) {
this.maxRequestsPerHost = maxRequestsPerHost;
}
promoteAndExecute();
}
public synchronized int getMaxRequestsPerHost() {
return maxRequestsPerHost;
}
Set a callback to be invoked each time the dispatcher becomes idle (when the number of running
calls returns to zero).
Note: The time at which a call is considered idle is different depending on whether it was run asynchronously or synchronously. Asynchronous calls become idle after the onResponse
or onFailure
callback has returned. Synchronous calls become idle once execute()
returns. This means that if you are doing synchronous calls the network layer will not truly be idle until every returned Response
has been closed.
/**
* Set a callback to be invoked each time the dispatcher becomes idle (when the number of running
* calls returns to zero).
*
* <p>Note: The time at which a {@linkplain Call call} is considered idle is different depending
* on whether it was run {@linkplain Call#enqueue(Callback) asynchronously} or
* {@linkplain Call#execute() synchronously}. Asynchronous calls become idle after the
* {@link Callback#onResponse onResponse} or {@link Callback#onFailure onFailure} callback has
* returned. Synchronous calls become idle once {@link Call#execute() execute()} returns. This
* means that if you are doing synchronous calls the network layer will not truly be idle until
* every returned {@link Response} has been closed.
*/
public synchronized void setIdleCallback(@Nullable Runnable idleCallback) {
this.idleCallback = idleCallback;
}
void enqueue(AsyncCall call) {
synchronized (this) {
readyAsyncCalls.add(call);
}
promoteAndExecute();
}
Cancel all calls currently enqueued or executing. Includes calls executed both synchronously and asynchronously. /**
* Cancel all calls currently enqueued or executing. Includes calls executed both {@linkplain
* Call#execute() synchronously} and {@linkplain Call#enqueue asynchronously}.
*/
public synchronized void cancelAll() {
for (AsyncCall call : readyAsyncCalls) {
call.get().cancel();
}
for (AsyncCall call : runningAsyncCalls) {
call.get().cancel();
}
for (RealCall call : runningSyncCalls) {
call.cancel();
}
}
Promotes eligible calls from readyAsyncCalls
to runningAsyncCalls
and runs them on the executor service. Must not be called with synchronization because executing calls can call into user code. Returns: true if the dispatcher is currently running calls.
/**
* Promotes eligible calls from {@link #readyAsyncCalls} to {@link #runningAsyncCalls} and runs
* them on the executor service. Must not be called with synchronization because executing calls
* can call into user code.
*
* @return true if the dispatcher is currently running calls.
*/
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
return isRunning;
}
Returns the number of running calls that share a host with call
. /** Returns the number of running calls that share a host with {@code call}. */
private int runningCallsForHost(AsyncCall call) {
int result = 0;
for (AsyncCall c : runningAsyncCalls) {
if (c.get().forWebSocket) continue;
if (c.host().equals(call.host())) result++;
}
return result;
}
Used by Call#execute
to signal it is in-flight. /** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
Used by AsyncCall#run
to signal completion. /** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call);
}
Used by Call#execute
to signal completion. /** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
finished(runningSyncCalls, call);
}
private <T> void finished(Deque<T> calls, T call) {
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
idleCallback = this.idleCallback;
}
boolean isRunning = promoteAndExecute();
if (!isRunning && idleCallback != null) {
idleCallback.run();
}
}
Returns a snapshot of the calls currently awaiting execution. /** Returns a snapshot of the calls currently awaiting execution. */
public synchronized List<Call> queuedCalls() {
List<Call> result = new ArrayList<>();
for (AsyncCall asyncCall : readyAsyncCalls) {
result.add(asyncCall.get());
}
return Collections.unmodifiableList(result);
}
Returns a snapshot of the calls currently being executed. /** Returns a snapshot of the calls currently being executed. */
public synchronized List<Call> runningCalls() {
List<Call> result = new ArrayList<>();
result.addAll(runningSyncCalls);
for (AsyncCall asyncCall : runningAsyncCalls) {
result.add(asyncCall.get());
}
return Collections.unmodifiableList(result);
}
public synchronized int queuedCallsCount() {
return readyAsyncCalls.size();
}
public synchronized int runningCallsCount() {
return runningAsyncCalls.size() + runningSyncCalls.size();
}
}