/*
 * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package reactor.core.scheduler;

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;

Wraps a java.util.concurrent.Executor and provides the Scheduler API over it.

It supports both non-trampolined worker (for cases where the trampolining happens externally) and trampolined worker. This scheduler is NOT time-capable (can't schedule with delay / periodically).

Author:Stephane Maldini
/** * Wraps a java.util.concurrent.Executor and provides the Scheduler API over it. * <p> * It supports both non-trampolined worker (for cases where the trampolining happens * externally) and trampolined worker. This scheduler is NOT time-capable (can't schedule * with delay / periodically). * * @author Stephane Maldini */
final class ExecutorScheduler implements Scheduler, Scannable { final Executor executor; final boolean trampoline; volatile boolean terminated; ExecutorScheduler(Executor executor, boolean trampoline) { this.executor = executor; this.trampoline = trampoline; } @Override public Disposable schedule(Runnable task) { if(terminated){ throw Exceptions.failWithRejected(); } Objects.requireNonNull(task, "task"); ExecutorPlainRunnable r = new ExecutorPlainRunnable(task); //RejectedExecutionException are propagated up, but since Executor doesn't from //failing tasks we'll also wrap the execute call in a try catch: try { executor.execute(r); } catch (Throwable ex) { if (executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) { terminated = true; } Schedulers.handleError(ex); throw Exceptions.failWithRejected(ex); } return r; } @Override public void dispose() { terminated = true; } @Override public boolean isDisposed() { return terminated; } @Override public Worker createWorker() { return trampoline ? new ExecutorSchedulerTrampolineWorker(executor) : new ExecutorSchedulerWorker(executor); } @Override public String toString() { StringBuilder ts = new StringBuilder(Schedulers.FROM_EXECUTOR) .append('(').append(executor); if (trampoline) ts.append(",trampolining"); ts.append(')'); return ts.toString(); } @Override public Object scanUnsafe(Attr key) { if (key == Attr.TERMINATED || key == Attr.CANCELLED) return isDisposed(); if (key == Attr.NAME) return toString(); return null; }
A non-tracked runnable that wraps a task and offers cancel support in the form of not executing the task.

Since Executor doesn't have cancellation support of its own, the ExecutorRunnable will stay in the Executor's queue and be always executed.

/** * A non-tracked runnable that wraps a task and offers cancel support in the form * of not executing the task. * <p>Since Executor doesn't have cancellation support of its own, the * ExecutorRunnable will stay in the Executor's queue and be always executed. */
static final class ExecutorPlainRunnable extends AtomicBoolean implements Runnable, Disposable { /** */ private static final long serialVersionUID = 5116223460201378097L; final Runnable task; ExecutorPlainRunnable(Runnable task) { this.task = task; } @Override public void run() { if (!get()) { try { task.run(); } catch (Throwable ex) { Schedulers.handleError(ex); } finally { lazySet(true); } } } @Override public boolean isDisposed() { return get(); } @Override public void dispose() { set(true); } }
Common interface between the tracking workers to signal the need for removal.
/** * Common interface between the tracking workers to signal the need for removal. */
interface WorkerDelete { void delete(ExecutorTrackedRunnable r); }
A Runnable that wraps a task and has reference back to its parent worker to remove itself once completed or cancelled
/** * A Runnable that wraps a task and has reference back to its parent worker to * remove itself once completed or cancelled */
static final class ExecutorTrackedRunnable extends AtomicBoolean implements Runnable, Disposable { /** */ private static final long serialVersionUID = 3503344795919906192L; final Runnable task; final WorkerDelete parent; final boolean callRemoveOnFinish; ExecutorTrackedRunnable(Runnable task, WorkerDelete parent, boolean callRemoveOnFinish) { this.task = task; this.parent = parent; this.callRemoveOnFinish = callRemoveOnFinish; } @Override public void run() { if (!get()) { try { task.run(); } catch (Throwable ex) { Schedulers.handleError(ex); } finally { if (callRemoveOnFinish) { dispose(); } else { lazySet(true); } } } } @Override public void dispose() { if (compareAndSet(false, true)) { parent.delete(this); } } @Override public boolean isDisposed() { return get(); } }
A non-trampolining worker that tracks tasks.
/** * A non-trampolining worker that tracks tasks. */
static final class ExecutorSchedulerWorker implements Scheduler.Worker, WorkerDelete, Scannable { final Executor executor; final Disposable.Composite tasks; ExecutorSchedulerWorker(Executor executor) { this.executor = executor; this.tasks = Disposables.composite(); } @Override public Disposable schedule(Runnable task) { Objects.requireNonNull(task, "task"); ExecutorTrackedRunnable r = new ExecutorTrackedRunnable(task, this, true); if (!tasks.add(r)) { throw Exceptions.failWithRejected(); } try { executor.execute(r); } catch (Throwable ex) { tasks.remove(r); Schedulers.handleError(ex); throw Exceptions.failWithRejected(ex); } return r; } @Override public void dispose() { tasks.dispose(); } @Override public boolean isDisposed() { return tasks.isDisposed(); } @Override public void delete(ExecutorTrackedRunnable r) { tasks.remove(r); } @Override public Object scanUnsafe(Attr key) { if (key == Attr.TERMINATED || key == Attr.CANCELLED) return isDisposed(); if (key == Attr.BUFFERED) return tasks.size(); if (key == Attr.PARENT) return (executor instanceof Scannable) ? executor : null; if (key == Attr.NAME) { //hack to recognize the SingleWorker if (executor instanceof SingleWorkerScheduler) return executor + ".worker"; return Schedulers.FROM_EXECUTOR + "(" + executor + ").worker"; } return Schedulers.scanExecutor(executor, key); } }
A trampolining worker that tracks tasks.
/** * A trampolining worker that tracks tasks. */
static final class ExecutorSchedulerTrampolineWorker implements Scheduler.Worker, WorkerDelete, Runnable, Scannable { final Executor executor; final Queue<ExecutorTrackedRunnable> queue; volatile boolean terminated; volatile int wip; static final AtomicIntegerFieldUpdater<ExecutorSchedulerTrampolineWorker> WIP = AtomicIntegerFieldUpdater.newUpdater(ExecutorSchedulerTrampolineWorker.class, "wip"); ExecutorSchedulerTrampolineWorker(Executor executor) { this.executor = executor; this.queue = new ConcurrentLinkedQueue<>(); } @Override public Disposable schedule(Runnable task) { Objects.requireNonNull(task, "task"); if (terminated) { throw Exceptions.failWithRejected(); } ExecutorTrackedRunnable r = new ExecutorTrackedRunnable(task, this, false); synchronized (this) { if (terminated) { throw Exceptions.failWithRejected(); } queue.offer(r); } if (WIP.getAndIncrement(this) == 0) { try { executor.execute(this); } catch (Throwable ex) { r.dispose(); Schedulers.handleError(ex); throw Exceptions.failWithRejected(ex); } } return r; } @Override public void dispose() { if (terminated) { return; } terminated = true; final Queue<ExecutorTrackedRunnable> q = queue; ExecutorTrackedRunnable r; while ((r = q.poll()) != null && !q.isEmpty()) { r.dispose(); } } @Override public boolean isDisposed() { return terminated; } @Override public void delete(ExecutorTrackedRunnable r) { synchronized (this) { if (!terminated) { queue.remove(r); } } } @Override public void run() { final Queue<ExecutorTrackedRunnable> q = queue; for (; ; ) { int e = 0; int r = wip; while (e != r) { if (terminated) { return; } ExecutorTrackedRunnable task = q.poll(); if (task == null) { break; } task.run(); e++; } if (e == r && terminated) { return; } if (WIP.addAndGet(this, -e) == 0) { break; } } } @Override public Object scanUnsafe(Attr key) { if (key == Attr.TERMINATED || key == Attr.CANCELLED) return isDisposed(); if (key == Attr.PARENT) return (executor instanceof Scannable) ? executor : null; if (key == Attr.NAME) return Schedulers.FROM_EXECUTOR + "(" + executor + ",trampolining).worker"; if (key == Attr.BUFFERED || key == Attr.LARGE_BUFFERED) return queue.size(); return Schedulers.scanExecutor(executor, key); } } }