/*
* Copyright (C) 2008 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.common.util.concurrent;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.IDLE;
import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.QUEUED;
import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.QUEUING;
import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.RUNNING;
import static java.lang.System.identityHashCode;
import com.google.common.annotations.GwtIncompatible;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.j2objc.annotations.RetainedWith;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
Executor ensuring that all Runnables submitted are executed in order, using the provided
Executor, and sequentially such that no two will ever be running at the same time.
Tasks submitted to execute(Runnable)
are executed in FIFO order.
The execution of tasks is done by one thread as long as there are tasks left in the queue. When a task is interrupted, execution of subsequent tasks continues. See QueueWorker.workOnQueue
for details.
RuntimeException
s thrown by tasks are simply logged and the executor keeps trucking. If an Error
is thrown, the error will propagate and execution will stop until it is restarted by a call to execute
.
/**
* Executor ensuring that all Runnables submitted are executed in order, using the provided
* Executor, and sequentially such that no two will ever be running at the same time.
*
* <p>Tasks submitted to {@link #execute(Runnable)} are executed in FIFO order.
*
* <p>The execution of tasks is done by one thread as long as there are tasks left in the queue.
* When a task is {@linkplain Thread#interrupt interrupted}, execution of subsequent tasks
* continues. See {@link QueueWorker#workOnQueue} for details.
*
* <p>{@code RuntimeException}s thrown by tasks are simply logged and the executor keeps trucking.
* If an {@code Error} is thrown, the error will propagate and execution will stop until it is
* restarted by a call to {@link #execute}.
*/
@GwtIncompatible
final class SequentialExecutor implements Executor {
private static final Logger log = Logger.getLogger(SequentialExecutor.class.getName());
enum WorkerRunningState {
Runnable is not running and not queued for execution /** Runnable is not running and not queued for execution */
IDLE,
Runnable is not running, but is being queued for execution /** Runnable is not running, but is being queued for execution */
QUEUING,
runnable has been submitted but has not yet begun execution /** runnable has been submitted but has not yet begun execution */
QUEUED,
RUNNING,
}
Underlying executor that all submitted Runnable objects are run on. /** Underlying executor that all submitted Runnable objects are run on. */
private final Executor executor;
@GuardedBy("queue")
private final Deque<Runnable> queue = new ArrayDeque<>();
/** see {@link WorkerRunningState} */
@GuardedBy("queue")
private WorkerRunningState workerRunningState = IDLE;
This counter prevents an ABA issue where a thread may successfully schedule the worker, the
worker runs and exhausts the queue, another thread enqueues a task and fails to schedule the
worker, and then the first thread's call to delegate.execute() returns. Without this counter,
it would observe the QUEUING state and set it to QUEUED, and the worker would never be
scheduled again for future submissions.
/**
* This counter prevents an ABA issue where a thread may successfully schedule the worker, the
* worker runs and exhausts the queue, another thread enqueues a task and fails to schedule the
* worker, and then the first thread's call to delegate.execute() returns. Without this counter,
* it would observe the QUEUING state and set it to QUEUED, and the worker would never be
* scheduled again for future submissions.
*/
@GuardedBy("queue")
private long workerRunCount = 0;
@RetainedWith private final QueueWorker worker = new QueueWorker();
/** Use {@link MoreExecutors#newSequentialExecutor} */
SequentialExecutor(Executor executor) {
this.executor = Preconditions.checkNotNull(executor);
}
Adds a task to the queue and makes sure a worker thread is running.
If this method throws, e.g. a RejectedExecutionException
from the delegate executor, execution of tasks will stop until a call to this method or to resume()
is made.
/**
* Adds a task to the queue and makes sure a worker thread is running.
*
* <p>If this method throws, e.g. a {@code RejectedExecutionException} from the delegate executor,
* execution of tasks will stop until a call to this method or to {@link #resume()} is made.
*/
@Override
public void execute(final Runnable task) {
checkNotNull(task);
final Runnable submittedTask;
final long oldRunCount;
synchronized (queue) {
// If the worker is already running (or execute() on the delegate returned successfully, and
// the worker has yet to start) then we don't need to start the worker.
if (workerRunningState == RUNNING || workerRunningState == QUEUED) {
queue.add(task);
return;
}
oldRunCount = workerRunCount;
// If the worker is not yet running, the delegate Executor might reject our attempt to start
// it. To preserve FIFO order and failure atomicity of rejected execution when the same
// Runnable is executed more than once, allocate a wrapper that we know is safe to remove by
// object identity.
// A data structure that returned a removal handle from add() would allow eliminating this
// allocation.
submittedTask =
new Runnable() {
@Override
public void run() {
task.run();
}
@Override
public String toString() {
return task.toString();
}
};
queue.add(submittedTask);
workerRunningState = QUEUING;
}
try {
executor.execute(worker);
} catch (RuntimeException | Error t) {
synchronized (queue) {
boolean removed =
(workerRunningState == IDLE || workerRunningState == QUEUING)
&& queue.removeLastOccurrence(submittedTask);
// If the delegate is directExecutor(), the submitted runnable could have thrown a REE. But
// that's handled by the log check that catches RuntimeExceptions in the queue worker.
if (!(t instanceof RejectedExecutionException) || removed) {
throw t;
}
}
return;
}
/*
* This is an unsynchronized read! After the read, the function returns immediately or acquires
* the lock to check again. Since an IDLE state was observed inside the preceding synchronized
* block, and reference field assignment is atomic, this may save reacquiring the lock when
* another thread or the worker task has cleared the count and set the state.
*
* <p>When {@link #executor} is a directExecutor(), the value written to
* {@code workerRunningState} will be available synchronously, and behaviour will be
* deterministic.
*/
@SuppressWarnings("GuardedBy")
boolean alreadyMarkedQueued = workerRunningState != QUEUING;
if (alreadyMarkedQueued) {
return;
}
synchronized (queue) {
if (workerRunCount == oldRunCount && workerRunningState == QUEUING) {
workerRunningState = QUEUED;
}
}
}
Worker that runs tasks from SequentialExecutor.queue
until it is empty. /** Worker that runs tasks from {@link #queue} until it is empty. */
private final class QueueWorker implements Runnable {
Runnable task;
@Override
public void run() {
try {
workOnQueue();
} catch (Error e) {
synchronized (queue) {
workerRunningState = IDLE;
}
throw e;
// The execution of a task has ended abnormally.
// We could have tasks left in the queue, so should perhaps try to restart a worker,
// but then the Error will get delayed if we are using a direct (same thread) executor.
}
}
Continues executing tasks from SequentialExecutor.queue
until it is empty. The thread's interrupt bit is cleared before execution of each task.
If the Thread in use is interrupted before or during execution of the tasks in SequentialExecutor.queue
, the Executor will complete its tasks, and then restore the interruption. This means that once the Thread returns to the Executor that this Executor composes, the interruption will still be present. If the composed Executor is an ExecutorService, it can respond to shutdown() by returning tasks queued on that Thread after SequentialExecutor.worker
drains the queue.
/**
* Continues executing tasks from {@link #queue} until it is empty.
*
* <p>The thread's interrupt bit is cleared before execution of each task.
*
* <p>If the Thread in use is interrupted before or during execution of the tasks in {@link
* #queue}, the Executor will complete its tasks, and then restore the interruption. This means
* that once the Thread returns to the Executor that this Executor composes, the interruption
* will still be present. If the composed Executor is an ExecutorService, it can respond to
* shutdown() by returning tasks queued on that Thread after {@link #worker} drains the queue.
*/
private void workOnQueue() {
boolean interruptedDuringTask = false;
boolean hasSetRunning = false;
try {
while (true) {
synchronized (queue) {
// Choose whether this thread will run or not after acquiring the lock on the first
// iteration
if (!hasSetRunning) {
if (workerRunningState == RUNNING) {
// Don't want to have two workers pulling from the queue.
return;
} else {
// Increment the run counter to avoid the ABA problem of a submitter marking the
// thread as QUEUED after it already ran and exhausted the queue before returning
// from execute().
workerRunCount++;
workerRunningState = RUNNING;
hasSetRunning = true;
}
}
task = queue.poll();
if (task == null) {
workerRunningState = IDLE;
return;
}
}
// Remove the interrupt bit before each task. The interrupt is for the "current task" when
// it is sent, so subsequent tasks in the queue should not be caused to be interrupted
// by a previous one in the queue being interrupted.
interruptedDuringTask |= Thread.interrupted();
try {
task.run();
} catch (RuntimeException e) {
log.log(Level.SEVERE, "Exception while executing runnable " + task, e);
} finally {
task = null;
}
}
} finally {
// Ensure that if the thread was interrupted at all while processing the task queue, it
// is returned to the delegate Executor interrupted so that it may handle the
// interruption if it likes.
if (interruptedDuringTask) {
Thread.currentThread().interrupt();
}
}
}
@SuppressWarnings("GuardedBy")
@Override
public String toString() {
Runnable currentlyRunning = task;
if (currentlyRunning != null) {
return "SequentialExecutorWorker{running=" + currentlyRunning + "}";
}
return "SequentialExecutorWorker{state=" + workerRunningState + "}";
}
}
@Override
public String toString() {
return "SequentialExecutor@" + identityHashCode(this) + "{" + executor + "}";
}
}