/*
 * Copyright (C) 2008 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.IDLE;
import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.QUEUED;
import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.QUEUING;
import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.RUNNING;

import com.google.common.annotations.GwtIncompatible;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.j2objc.annotations.WeakOuter;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

Executor ensuring that all Runnables submitted are executed in order, using the provided Executor, and sequentially such that no two will ever be running at the same time.

Tasks submitted to execute(Runnable) are executed in FIFO order.

The execution of tasks is done by one thread as long as there are tasks left in the queue. When a task is interrupted, execution of subsequent tasks continues. See QueueWorker.workOnQueue for details.

RuntimeExceptions thrown by tasks are simply logged and the executor keeps trucking. If an Error is thrown, the error will propagate and execution will stop until it is restarted by a call to execute.

/** * Executor ensuring that all Runnables submitted are executed in order, using the provided * Executor, and sequentially such that no two will ever be running at the same time. * * <p>Tasks submitted to {@link #execute(Runnable)} are executed in FIFO order. * * <p>The execution of tasks is done by one thread as long as there are tasks left in the queue. * When a task is {@linkplain Thread#interrupt interrupted}, execution of subsequent tasks * continues. See {@link QueueWorker#workOnQueue} for details. * * <p>{@code RuntimeException}s thrown by tasks are simply logged and the executor keeps trucking. * If an {@code Error} is thrown, the error will propagate and execution will stop until it is * restarted by a call to {@link #execute}. */
@GwtIncompatible final class SequentialExecutor implements Executor { private static final Logger log = Logger.getLogger(SequentialExecutor.class.getName()); enum WorkerRunningState {
Runnable is not running and not queued for execution
/** Runnable is not running and not queued for execution */
IDLE,
Runnable is not running, but is being queued for execution
/** Runnable is not running, but is being queued for execution */
QUEUING,
runnable has been submitted but has not yet begun execution
/** runnable has been submitted but has not yet begun execution */
QUEUED, RUNNING, }
Underlying executor that all submitted Runnable objects are run on.
/** Underlying executor that all submitted Runnable objects are run on. */
private final Executor executor; @GuardedBy("queue") private final Deque<Runnable> queue = new ArrayDeque<>(); /** see {@link WorkerRunningState} */ @GuardedBy("queue") private WorkerRunningState workerRunningState = IDLE;
This counter prevents an ABA issue where a thread may successfully schedule the worker, the worker runs and exhausts the queue, another thread enqueues a task and fails to schedule the worker, and then the first thread's call to delegate.execute() returns. Without this counter, it would observe the QUEUING state and set it to QUEUED, and the worker would never be scheduled again for future submissions.
/** * This counter prevents an ABA issue where a thread may successfully schedule the worker, the * worker runs and exhausts the queue, another thread enqueues a task and fails to schedule the * worker, and then the first thread's call to delegate.execute() returns. Without this counter, * it would observe the QUEUING state and set it to QUEUED, and the worker would never be * scheduled again for future submissions. */
@GuardedBy("queue") private long workerRunCount = 0; private final QueueWorker worker = new QueueWorker(); /** Use {@link MoreExecutors#newSequentialExecutor} */ SequentialExecutor(Executor executor) { this.executor = Preconditions.checkNotNull(executor); }
Adds a task to the queue and makes sure a worker thread is running.

If this method throws, e.g. a RejectedExecutionException from the delegate executor, execution of tasks will stop until a call to this method or to resume() is made.

/** * Adds a task to the queue and makes sure a worker thread is running. * * <p>If this method throws, e.g. a {@code RejectedExecutionException} from the delegate executor, * execution of tasks will stop until a call to this method or to {@link #resume()} is made. */
@Override public void execute(final Runnable task) { checkNotNull(task); final Runnable submittedTask; final long oldRunCount; synchronized (queue) { // If the worker is already running (or execute() on the delegate returned successfully, and // the worker has yet to start) then we don't need to start the worker. if (workerRunningState == RUNNING || workerRunningState == QUEUED) { queue.add(task); return; } oldRunCount = workerRunCount; // If the worker is not yet running, the delegate Executor might reject our attempt to start // it. To preserve FIFO order and failure atomicity of rejected execution when the same // Runnable is executed more than once, allocate a wrapper that we know is safe to remove by // object identity. // A data structure that returned a removal handle from add() would allow eliminating this // allocation. submittedTask = new Runnable() { @Override public void run() { task.run(); } }; queue.add(submittedTask); workerRunningState = QUEUING; } try { executor.execute(worker); } catch (RuntimeException | Error t) { synchronized (queue) { boolean removed = (workerRunningState == IDLE || workerRunningState == QUEUING) && queue.removeLastOccurrence(submittedTask); // If the delegate is directExecutor(), the submitted runnable could have thrown a REE. But // that's handled by the log check that catches RuntimeExceptions in the queue worker. if (!(t instanceof RejectedExecutionException) || removed) { throw t; } } return; } /* * This is an unsynchronized read! After the read, the function returns immediately or acquires * the lock to check again. Since an IDLE state was observed inside the preceding synchronized * block, and reference field assignment is atomic, this may save reacquiring the lock when * another thread or the worker task has cleared the count and set the state. * * <p>When {@link #executor} is a directExecutor(), the value written to * {@code workerRunningState} will be available synchronously, and behaviour will be * deterministic. */ @SuppressWarnings("GuardedBy") boolean alreadyMarkedQueued = workerRunningState != QUEUING; if (alreadyMarkedQueued) { return; } synchronized (queue) { if (workerRunCount == oldRunCount && workerRunningState == QUEUING) { workerRunningState = QUEUED; } } }
Worker that runs tasks from SequentialExecutor.queue until it is empty.
/** Worker that runs tasks from {@link #queue} until it is empty. */
@WeakOuter private final class QueueWorker implements Runnable { @Override public void run() { try { workOnQueue(); } catch (Error e) { synchronized (queue) { workerRunningState = IDLE; } throw e; // The execution of a task has ended abnormally. // We could have tasks left in the queue, so should perhaps try to restart a worker, // but then the Error will get delayed if we are using a direct (same thread) executor. } }
Continues executing tasks from SequentialExecutor.queue until it is empty.

The thread's interrupt bit is cleared before execution of each task.

If the Thread in use is interrupted before or during execution of the tasks in SequentialExecutor.queue, the Executor will complete its tasks, and then restore the interruption. This means that once the Thread returns to the Executor that this Executor composes, the interruption will still be present. If the composed Executor is an ExecutorService, it can respond to shutdown() by returning tasks queued on that Thread after SequentialExecutor.worker drains the queue.

/** * Continues executing tasks from {@link #queue} until it is empty. * * <p>The thread's interrupt bit is cleared before execution of each task. * * <p>If the Thread in use is interrupted before or during execution of the tasks in {@link * #queue}, the Executor will complete its tasks, and then restore the interruption. This means * that once the Thread returns to the Executor that this Executor composes, the interruption * will still be present. If the composed Executor is an ExecutorService, it can respond to * shutdown() by returning tasks queued on that Thread after {@link #worker} drains the queue. */
private void workOnQueue() { boolean interruptedDuringTask = false; boolean hasSetRunning = false; try { while (true) { Runnable task; synchronized (queue) { // Choose whether this thread will run or not after acquiring the lock on the first // iteration if (!hasSetRunning) { if (workerRunningState == RUNNING) { // Don't want to have two workers pulling from the queue. return; } else { // Increment the run counter to avoid the ABA problem of a submitter marking the // thread as QUEUED after it already ran and exhausted the queue before returning // from execute(). workerRunCount++; workerRunningState = RUNNING; hasSetRunning = true; } } task = queue.poll(); if (task == null) { workerRunningState = IDLE; return; } } // Remove the interrupt bit before each task. The interrupt is for the "current task" when // it is sent, so subsequent tasks in the queue should not be caused to be interrupted // by a previous one in the queue being interrupted. interruptedDuringTask |= Thread.interrupted(); try { task.run(); } catch (RuntimeException e) { log.log(Level.SEVERE, "Exception while executing runnable " + task, e); } } } finally { // Ensure that if the thread was interrupted at all while processing the task queue, it // is returned to the delegate Executor interrupted so that it may handle the // interruption if it likes. if (interruptedDuringTask) { Thread.currentThread().interrupt(); } } } } }