/*
 * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package reactor.core.scheduler;

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;

import static reactor.core.Exceptions.unwrap;

Schedulers provides various Scheduler factories useable by publishOn or subscribeOn :

Factories prefixed with new return a new instance of their flavor of Scheduler, while other factories like elastic() return a shared instance, that is the one used by operators requiring that flavor as their default Scheduler.

Author:Stephane Maldini
/** * {@link Schedulers} provides various {@link Scheduler} factories useable by {@link * reactor.core.publisher.Flux#publishOn publishOn} or {@link reactor.core.publisher.Mono#subscribeOn * subscribeOn} : * <p> * <ul> <li>{@link #fromExecutorService(ExecutorService)}}. </li> <li>{@link #newParallel} * : Optimized for fast {@link Runnable} executions </li> <li>{@link #single} : Optimized * for low-latency {@link Runnable} executions </li> <li>{@link #immediate}. </li> </ul> * <p> * Factories prefixed with {@code new} return a new instance of their flavor of {@link Scheduler}, * while other factories like {@link #elastic()} return a shared instance, that is the one * used by operators requiring that flavor as their default Scheduler. * * @author Stephane Maldini */
public abstract class Schedulers {
Default pool size, initialized by system property `reactor.schedulers.defaultPoolSize` and falls back to the number of processors available to the runtime on init.
See Also:
  • availableProcessors.availableProcessors()
/** * Default pool size, initialized by system property `reactor.schedulers.defaultPoolSize` * and falls back to the number of processors available to the runtime on init. * * @see Runtime#availableProcessors() */
public static final int DEFAULT_POOL_SIZE = Optional.ofNullable(System.getProperty("reactor.schedulers.defaultPoolSize")) .map(Integer::parseInt) .orElseGet(() -> Runtime.getRuntime().availableProcessors()); static volatile BiConsumer<Thread, ? super Throwable> onHandleErrorHook;
Create a Scheduler which uses a backing Executor to schedule Runnables for async operators.
Params:
Returns:a new Scheduler
/** * Create a {@link Scheduler} which uses a backing {@link Executor} to schedule * Runnables for async operators. * * @param executor an {@link Executor} * * @return a new {@link Scheduler} */
public static Scheduler fromExecutor(Executor executor) { return fromExecutor(executor, false); }
Create a Scheduler which uses a backing Executor to schedule Runnables for async operators.
Params:
  • executor – an Executor
  • trampoline – set to false if this Scheduler is used by "operators" that already conflate Runnable executions (publishOn, subscribeOn...)
Returns:a new Scheduler
/** * Create a {@link Scheduler} which uses a backing {@link Executor} to schedule * Runnables for async operators. * * @param executor an {@link Executor} * @param trampoline set to false if this {@link Scheduler} is used by "operators" * that already conflate {@link Runnable} executions (publishOn, subscribeOn...) * * @return a new {@link Scheduler} */
public static Scheduler fromExecutor(Executor executor, boolean trampoline) { if(!trampoline && executor instanceof ExecutorService){ return fromExecutorService((ExecutorService) executor); } return new ExecutorScheduler(executor, trampoline); }
Create a Scheduler which uses a backing ExecutorService to schedule Runnables for async operators.
Params:
Returns:a new Scheduler
/** * Create a {@link Scheduler} which uses a backing {@link ExecutorService} to schedule * Runnables for async operators. * * @param executorService an {@link ExecutorService} * * @return a new {@link Scheduler} */
public static Scheduler fromExecutorService(ExecutorService executorService) { return new DelegateServiceScheduler(executorService); }
Scheduler that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down.

The maximum number of created thread pools is unbounded.

The default time-to-live for unused thread pools is 60 seconds, use the appropriate factory to set a different value.

This scheduler is not restartable.

Returns:default instance of a Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
/** * {@link Scheduler} that dynamically creates ExecutorService-based Workers and caches * the thread pools, reusing them once the Workers have been shut down. * <p> * The maximum number of created thread pools is unbounded. * <p> * The default time-to-live for unused thread pools is 60 seconds, use the appropriate * factory to set a different value. * <p> * This scheduler is not restartable. * * @return default instance of a {@link Scheduler} that hosts a fixed pool of single-threaded * ExecutorService-based workers and is suited for parallel work */
public static Scheduler elastic() { return cache(CACHED_ELASTIC, ELASTIC, ELASTIC_SUPPLIER); }
Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work.
Returns:default instance of a Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers
/** * {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based * workers and is suited for parallel work. * * @return default instance of a {@link Scheduler} that hosts a fixed pool of single-threaded * ExecutorService-based workers */
public static Scheduler parallel() { return cache(CACHED_PARALLEL, PARALLEL, PARALLEL_SUPPLIER); }
Executes tasks on the caller's thread immediately.
Returns:a reusable Scheduler
/** * Executes tasks on the caller's thread immediately. * * @return a reusable {@link Scheduler} */
public static Scheduler immediate() { return ImmediateScheduler.instance(); }
Scheduler that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down.

The maximum number of created thread pools is unbounded.

The default time-to-live for unused thread pools is 60 seconds, use the appropriate factory to set a different value.

This scheduler is not restartable.

Params:
  • name – Thread prefix
Returns:a new Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
/** * {@link Scheduler} that dynamically creates ExecutorService-based Workers and caches * the thread pools, reusing them once the Workers have been shut down. * <p> * The maximum number of created thread pools is unbounded. * <p> * The default time-to-live for unused thread pools is 60 seconds, use the appropriate * factory to set a different value. * <p> * This scheduler is not restartable. * * @param name Thread prefix * * @return a new {@link Scheduler} that hosts a fixed pool of single-threaded * ExecutorService-based workers and is suited for parallel work */
public static Scheduler newElastic(String name) { return newElastic(name, ElasticScheduler.DEFAULT_TTL_SECONDS); }
Scheduler that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down.

The maximum number of created thread pools is unbounded.

This scheduler is not restartable.

Params:
  • name – Thread prefix
  • ttlSeconds – Time-to-live for an idle Worker
Returns:a new Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
/** * {@link Scheduler} that dynamically creates ExecutorService-based Workers and caches * the thread pools, reusing them once the Workers have been shut down. * <p> * The maximum number of created thread pools is unbounded. * <p> * This scheduler is not restartable. * * @param name Thread prefix * @param ttlSeconds Time-to-live for an idle {@link reactor.core.scheduler.Scheduler.Worker} * * @return a new {@link Scheduler} that hosts a fixed pool of single-threaded * ExecutorService-based workers and is suited for parallel work */
public static Scheduler newElastic(String name, int ttlSeconds) { return newElastic(name, ttlSeconds, false); }
Scheduler that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down.

The maximum number of created thread pools is unbounded.

This scheduler is not restartable.

Params:
Returns:a new Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
/** * {@link Scheduler} that dynamically creates ExecutorService-based Workers and caches * the thread pools, reusing them once the Workers have been shut down. * <p> * The maximum number of created thread pools is unbounded. * <p> * This scheduler is not restartable. * * @param name Thread prefix * @param ttlSeconds Time-to-live for an idle {@link reactor.core.scheduler.Scheduler.Worker} * @param daemon false if the {@link Scheduler} requires an explicit {@link * Scheduler#dispose()} to exit the VM. * * @return a new {@link Scheduler} that hosts a fixed pool of single-threaded * ExecutorService-based workers and is suited for parallel work */
public static Scheduler newElastic(String name, int ttlSeconds, boolean daemon) { return newElastic(ttlSeconds, new ReactorThreadFactory(name, ElasticScheduler.COUNTER, daemon, false, Schedulers::defaultUncaughtException)); }
Scheduler that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down.

The maximum number of created thread pools is unbounded.

This scheduler is not restartable.

Params:
  • ttlSeconds – Time-to-live for an idle Worker
  • threadFactory – a ThreadFactory to use each thread initialization
Returns:a new Scheduler that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down.
/** * {@link Scheduler} that dynamically creates ExecutorService-based Workers and caches * the thread pools, reusing them once the Workers have been shut down. * <p> * The maximum number of created thread pools is unbounded. * <p> * This scheduler is not restartable. * * @param ttlSeconds Time-to-live for an idle {@link reactor.core.scheduler.Scheduler.Worker} * @param threadFactory a {@link ThreadFactory} to use each thread initialization * * @return a new {@link Scheduler} that dynamically creates ExecutorService-based * Workers and caches the thread pools, reusing them once the Workers have been shut * down. */
public static Scheduler newElastic(int ttlSeconds, ThreadFactory threadFactory) { return factory.newElastic(ttlSeconds, threadFactory); }
Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work. This type of Scheduler detects and rejects usage of blocking Reactor APIs.
Params:
  • name – Thread prefix
Returns:a new Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
/** * {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based * workers and is suited for parallel work. This type of {@link Scheduler} detects and * rejects usage of blocking Reactor APIs. * * @param name Thread prefix * * @return a new {@link Scheduler} that hosts a fixed pool of single-threaded * ExecutorService-based workers and is suited for parallel work */
public static Scheduler newParallel(String name) { return newParallel(name, DEFAULT_POOL_SIZE); }
Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work. This type of Scheduler detects and rejects usage of blocking Reactor APIs.
Params:
  • name – Thread prefix
  • parallelism – Number of pooled workers.
Returns:a new Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
/** * {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based * workers and is suited for parallel work. This type of {@link Scheduler} detects and * rejects usage of blocking Reactor APIs. * * @param name Thread prefix * @param parallelism Number of pooled workers. * * @return a new {@link Scheduler} that hosts a fixed pool of single-threaded * ExecutorService-based workers and is suited for parallel work */
public static Scheduler newParallel(String name, int parallelism) { return newParallel(name, parallelism, false); }
Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work. This type of Scheduler detects and rejects usage of blocking Reactor APIs.
Params:
  • name – Thread prefix
  • parallelism – Number of pooled workers.
  • daemon – false if the Scheduler requires an explicit Scheduler.dispose() to exit the VM.
Returns:a new Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
/** * {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based * workers and is suited for parallel work. This type of {@link Scheduler} detects and * rejects usage of blocking Reactor APIs. * * @param name Thread prefix * @param parallelism Number of pooled workers. * @param daemon false if the {@link Scheduler} requires an explicit {@link * Scheduler#dispose()} to exit the VM. * * @return a new {@link Scheduler} that hosts a fixed pool of single-threaded * ExecutorService-based workers and is suited for parallel work */
public static Scheduler newParallel(String name, int parallelism, boolean daemon) { return newParallel(parallelism, new ReactorThreadFactory(name, ParallelScheduler.COUNTER, daemon, true, Schedulers::defaultUncaughtException)); }
Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work.
Params:
  • parallelism – Number of pooled workers.
  • threadFactory – a ThreadFactory to use for the fixed initialized number of Thread
Returns:a new Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
/** * {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based * workers and is suited for parallel work. * * @param parallelism Number of pooled workers. * @param threadFactory a {@link ThreadFactory} to use for the fixed initialized * number of {@link Thread} * * @return a new {@link Scheduler} that hosts a fixed pool of single-threaded * ExecutorService-based workers and is suited for parallel work */
public static Scheduler newParallel(int parallelism, ThreadFactory threadFactory) { return factory.newParallel(parallelism, threadFactory); }
Scheduler that hosts a single-threaded ExecutorService-based worker and is suited for parallel work. This type of Scheduler detects and rejects usage * of blocking Reactor APIs.
Params:
  • name – Component and thread name prefix
Returns:a new Scheduler that hosts a single-threaded ExecutorService-based worker
/** * {@link Scheduler} that hosts a single-threaded ExecutorService-based worker and is * suited for parallel work. This type of {@link Scheduler} detects and rejects usage * * of blocking Reactor APIs. * * @param name Component and thread name prefix * * @return a new {@link Scheduler} that hosts a single-threaded ExecutorService-based * worker */
public static Scheduler newSingle(String name) { return newSingle(name, false); }
Scheduler that hosts a single-threaded ExecutorService-based worker and is suited for parallel work. This type of Scheduler detects and rejects usage of blocking Reactor APIs.
Params:
Returns:a new Scheduler that hosts a single-threaded ExecutorService-based worker
/** * {@link Scheduler} that hosts a single-threaded ExecutorService-based worker and is * suited for parallel work. This type of {@link Scheduler} detects and rejects usage * of blocking Reactor APIs. * * @param name Component and thread name prefix * @param daemon false if the {@link Scheduler} requires an explicit {@link * Scheduler#dispose()} to exit the VM. * * @return a new {@link Scheduler} that hosts a single-threaded ExecutorService-based * worker */
public static Scheduler newSingle(String name, boolean daemon) { return newSingle(new ReactorThreadFactory(name, SingleScheduler.COUNTER, daemon, true, Schedulers::defaultUncaughtException)); }
Scheduler that hosts a single-threaded ExecutorService-based worker and is suited for parallel work.
Params:
Returns:a new Scheduler that hosts a single-threaded ExecutorService-based worker
/** * {@link Scheduler} that hosts a single-threaded ExecutorService-based worker and is * suited for parallel work. * * @param threadFactory a {@link ThreadFactory} to use for the unique thread of the * {@link Scheduler} * * @return a new {@link Scheduler} that hosts a single-threaded ExecutorService-based * worker */
public static Scheduler newSingle(ThreadFactory threadFactory) { return factory.newSingle(threadFactory); }
Define a hook that is executed when a Scheduler has handled an error. Note that it is executed after the error has been passed to the thread uncaughtErrorHandler, which is not the case when a fatal error occurs (see Exceptions.throwIfJvmFatal(Throwable)).
Params:
  • c – the new hook to set.
/** * Define a hook that is executed when a {@link Scheduler} has * {@link #handleError(Throwable) handled an error}. Note that it is executed after * the error has been passed to the thread uncaughtErrorHandler, which is not the * case when a fatal error occurs (see {@link Exceptions#throwIfJvmFatal(Throwable)}). * * @param c the new hook to set. */
public static void onHandleError(BiConsumer<Thread, ? super Throwable> c) { if (log.isDebugEnabled()) { log.debug("Hooking new default: onHandleError"); } onHandleErrorHook = Objects.requireNonNull(c, "onHandleError"); }
Check if calling a Reactor blocking API in the current Thread is forbidden or not, by checking if the thread implements NonBlocking (in which case it is forbidden and this method returns true).
Returns:true if blocking is forbidden in this thread, false otherwise
/** * Check if calling a Reactor blocking API in the current {@link Thread} is forbidden * or not, by checking if the thread implements {@link NonBlocking} (in which case it is * forbidden and this method returns {@code true}). * * @return {@code true} if blocking is forbidden in this thread, {@code false} otherwise */
public static boolean isInNonBlockingThread() { return Thread.currentThread() instanceof NonBlocking; }
Check if calling a Reactor blocking API in the given Thread is forbidden or not, by checking if the thread implements NonBlocking (in which case it is forbidden and this method returns true).
Returns:true if blocking is forbidden in that thread, false otherwise
/** * Check if calling a Reactor blocking API in the given {@link Thread} is forbidden * or not, by checking if the thread implements {@link NonBlocking} (in which case it is * forbidden and this method returns {@code true}). * * @return {@code true} if blocking is forbidden in that thread, {@code false} otherwise */
public static boolean isNonBlockingThread(Thread t) { return t instanceof NonBlocking; }
Re-apply default factory to Schedulers
/** * Re-apply default factory to {@link Schedulers} */
public static void resetFactory(){ setFactory(DEFAULT); }
Reset the onHandleError(BiConsumer<Thread,? super Throwable>) hook to the default no-op behavior.
/** * Reset the {@link #onHandleError(BiConsumer)} hook to the default no-op behavior. */
public static void resetOnHandleError() { if (log.isDebugEnabled()) { log.debug("Reset to factory defaults: onHandleError"); } onHandleErrorHook = null; }
Replace Schedulers factories (newParallel, newSingle and newElastic). Also shutdown Schedulers from the cached factories (like single()) in order to also use these replacements, re-creating the shared schedulers from the new factory upon next use.

This method should be called safely and with caution, typically on app startup.

Params:
  • factoryInstance – an arbitrary Factory instance.
/** * Replace {@link Schedulers} factories ({@link #newParallel(String) newParallel}, * {@link #newSingle(String) newSingle} and {@link #newElastic(String) newElastic}). Also * shutdown Schedulers from the cached factories (like {@link #single()}) in order to * also use these replacements, re-creating the shared schedulers from the new factory * upon next use. * <p> * This method should be called safely and with caution, typically on app startup. * * @param factoryInstance an arbitrary {@link Factory} instance. */
public static void setFactory(Factory factoryInstance) { Objects.requireNonNull(factoryInstance, "factoryInstance"); shutdownNow(); factory = factoryInstance; }
Clear any cached Scheduler and call dispose on them.
/** * Clear any cached {@link Scheduler} and call dispose on them. */
public static void shutdownNow() { CachedScheduler oldElastic = CACHED_ELASTIC.getAndSet(null); CachedScheduler oldParallel = CACHED_PARALLEL.getAndSet(null); CachedScheduler oldSingle = CACHED_SINGLE.getAndSet(null); if (oldElastic != null) oldElastic._dispose(); if (oldParallel != null) oldParallel._dispose(); if (oldSingle != null) oldSingle._dispose(); }
Scheduler that hosts a single-threaded ExecutorService-based worker and is suited for parallel work. Will cache the returned schedulers for subsequent calls until dispose.
Returns:default instance of a Scheduler that hosts a single-threaded ExecutorService-based worker
/** * {@link Scheduler} that hosts a single-threaded ExecutorService-based worker and is * suited for parallel work. Will cache the returned schedulers for subsequent calls until dispose. * * @return default instance of a {@link Scheduler} that hosts a single-threaded * ExecutorService-based worker */
public static Scheduler single() { return cache(CACHED_SINGLE, SINGLE, SINGLE_SUPPLIER); }
Wraps a single Worker from some other Scheduler and provides Worker services on top of it.

Use the Scheduler.dispose() to release the wrapped worker.

Params:
Returns:a wrapping Scheduler consistently returning a same worker from a source Scheduler
/** * Wraps a single {@link reactor.core.scheduler.Scheduler.Worker} from some other * {@link Scheduler} and provides {@link reactor.core.scheduler.Scheduler.Worker} * services on top of it. * <p> * Use the {@link Scheduler#dispose()} to release the wrapped worker. * * @param original a {@link Scheduler} to call upon to get the single {@link * reactor.core.scheduler.Scheduler.Worker} * * @return a wrapping {@link Scheduler} consistently returning a same worker from a * source {@link Scheduler} */
public static Scheduler single(Scheduler original) { return new SingleWorkerScheduler(original); }
Public factory hook to override Schedulers behavior globally
/** * Public factory hook to override Schedulers behavior globally */
public interface Factory {
Override this method to decorate ScheduledExecutorService internally used by Reactor's various Scheduler implementations, allowing to tune the ScheduledExecutorService backing implementation.
Params:
  • schedulerType – a name hinting at the flavor of Scheduler being tuned.
  • actual – the default backing implementation, provided lazily as a Supplier so that you can bypass instantiation completely if you want to replace it.
Returns:the internal ScheduledExecutorService instance to use.
/** * Override this method to decorate {@link ScheduledExecutorService} internally used by * Reactor's various {@link Scheduler} implementations, allowing to tune the * {@link ScheduledExecutorService} backing implementation. * * @param schedulerType a name hinting at the flavor of Scheduler being tuned. * @param actual the default backing implementation, provided lazily as a Supplier * so that you can bypass instantiation completely if you want to replace it. * @return the internal {@link ScheduledExecutorService} instance to use. */
default ScheduledExecutorService decorateExecutorService(String schedulerType, Supplier<? extends ScheduledExecutorService> actual) { return actual.get(); }
Scheduler that dynamically creates Workers resources and caches eventually, reusing them once the Workers have been shut down.

The maximum number of created workers is unbounded.

Params:
Returns:a new Scheduler that dynamically creates Workers resources and caches eventually, reusing them once the Workers have been shut down.
/** * {@link Scheduler} that dynamically creates Workers resources and caches * eventually, reusing them once the Workers have been shut down. * <p> * The maximum number of created workers is unbounded. * * @param ttlSeconds Time-to-live for an idle {@link reactor.core.scheduler.Scheduler.Worker} * @param threadFactory a {@link ThreadFactory} to use * * @return a new {@link Scheduler} that dynamically creates Workers resources and * caches eventually, reusing them once the Workers have been shut down. */
default Scheduler newElastic(int ttlSeconds, ThreadFactory threadFactory) { return new ElasticScheduler(threadFactory, ttlSeconds); }
Scheduler that hosts a fixed pool of workers and is suited for parallel work.
Params:
  • parallelism – Number of pooled workers.
  • threadFactory – a ThreadFactory to use for the fixed initialized number of Thread
Returns:a new Scheduler that hosts a fixed pool of workers and is suited for parallel work
/** * {@link Scheduler} that hosts a fixed pool of workers and is suited for parallel * work. * * @param parallelism Number of pooled workers. * @param threadFactory a {@link ThreadFactory} to use for the fixed initialized * number of {@link Thread} * * @return a new {@link Scheduler} that hosts a fixed pool of workers and is * suited for parallel work */
default Scheduler newParallel(int parallelism, ThreadFactory threadFactory) { return new ParallelScheduler(parallelism, threadFactory); }
Scheduler that hosts a single worker and is suited for non-blocking work.
Params:
Returns:a new Scheduler that hosts a single worker
/** * {@link Scheduler} that hosts a single worker and is suited for non-blocking * work. * * @param threadFactory a {@link ThreadFactory} to use for the unique resource of * the {@link Scheduler} * * @return a new {@link Scheduler} that hosts a single worker */
default Scheduler newSingle(ThreadFactory threadFactory) { return new SingleScheduler(threadFactory); } } // Internals static final String ELASTIC = "elastic"; // IO stuff static final String PARALLEL = "parallel"; //scale up common tasks static final String SINGLE = "single"; //non blocking tasks static final String IMMEDIATE = "immediate"; static final String FROM_EXECUTOR = "fromExecutor"; static final String FROM_EXECUTOR_SERVICE = "fromExecutorService"; // Cached schedulers in atomic references: static AtomicReference<CachedScheduler> CACHED_ELASTIC = new AtomicReference<>(); static AtomicReference<CachedScheduler> CACHED_PARALLEL = new AtomicReference<>(); static AtomicReference<CachedScheduler> CACHED_SINGLE = new AtomicReference<>(); static final Supplier<Scheduler> ELASTIC_SUPPLIER = () -> newElastic(ELASTIC, ElasticScheduler.DEFAULT_TTL_SECONDS, true); static final Supplier<Scheduler> PARALLEL_SUPPLIER = () -> newParallel(PARALLEL, DEFAULT_POOL_SIZE, true); static final Supplier<Scheduler> SINGLE_SUPPLIER = () -> newSingle(SINGLE, true); static final Factory DEFAULT = new Factory() { }; static volatile Factory factory = DEFAULT;
Get a CachedScheduler out of the reference or create one using the Supplier if the reference is empty, effectively creating a single instance to be reused as a default scheduler for the given key category.
Params:
  • reference – the cache reference that holds the scheduler
  • key – the "name" for the Scheduler's category/type
  • supplier – the Scheduler generator to use and wrap into a CachedScheduler. Note that in case of a race, an extraneous Scheduler can be created, but it'll get immediately disposed.
Returns:a CachedScheduler to be reused, either pre-existing or created
/** * Get a {@link CachedScheduler} out of the {@code reference} or create one using the * {@link Supplier} if the reference is empty, effectively creating a single instance * to be reused as a default scheduler for the given {@code key} category. * * @param reference the cache reference that holds the scheduler * @param key the "name" for the Scheduler's category/type * @param supplier the {@link Scheduler} generator to use and wrap into a {@link CachedScheduler}. * Note that in case of a race, an extraneous Scheduler can be created, but it'll get * immediately {@link Scheduler#dispose() disposed}. * @return a {@link CachedScheduler} to be reused, either pre-existing or created */
static CachedScheduler cache(AtomicReference<CachedScheduler> reference, String key, Supplier<Scheduler> supplier) { CachedScheduler s = reference.get(); if (s != null) { return s; } s = new CachedScheduler(key, supplier.get()); if (reference.compareAndSet(null, s)) { return s; } //the reference was updated in the meantime with a cached scheduler //fallback to it and dispose the extraneous one s._dispose(); return reference.get(); } static final Logger log = Loggers.getLogger(Schedulers.class); static final void defaultUncaughtException(Thread t, Throwable e) { Schedulers.log.error("Scheduler worker in group " + t.getThreadGroup().getName() + " failed with an uncaught exception", e); } static void handleError(Throwable ex) { Thread thread = Thread.currentThread(); Throwable t = unwrap(ex); Thread.UncaughtExceptionHandler x = thread.getUncaughtExceptionHandler(); if (x != null) { x.uncaughtException(thread, t); } else { log.error("Scheduler worker failed with an uncaught exception", t); } if (onHandleErrorHook != null) { onHandleErrorHook.accept(thread, t); } } static class CachedScheduler implements Scheduler, Supplier<Scheduler>, Scannable { final Scheduler cached; final String key; CachedScheduler(String key, Scheduler cached) { this.cached = cached; this.key = key; } @Override public Disposable schedule(Runnable task) { return cached.schedule(task); } @Override public Disposable schedule(Runnable task, long delay, TimeUnit unit) { return cached.schedule(task, delay, unit); } @Override public Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) { return cached.schedulePeriodically(task, initialDelay, period, unit); } @Override public Worker createWorker() { return cached.createWorker(); } @Override public long now(TimeUnit unit) { return cached.now(unit); } @Override public void start() { cached.start(); } @Override public void dispose() { } @Override public boolean isDisposed() { return cached.isDisposed(); } @Override public String toString() { return cached.toString(); } @Override public Object scanUnsafe(Attr key) { return Scannable.from(cached).scanUnsafe(key); }
Get the Scheduler that is cached and wrapped inside this CachedScheduler.
Returns:the cached Scheduler
/** * Get the {@link Scheduler} that is cached and wrapped inside this * {@link CachedScheduler}. * * @return the cached Scheduler */
@Override public Scheduler get() { return cached; } void _dispose() { cached.dispose(); } } static Disposable directSchedule(ScheduledExecutorService exec, Runnable task, long delay, TimeUnit unit) { SchedulerTask sr = new SchedulerTask(task); Future<?> f; if (delay <= 0L) { f = exec.submit((Callable<?>) sr); } else { f = exec.schedule((Callable<?>) sr, delay, unit); } sr.setFuture(f); return sr; } static Disposable directSchedulePeriodically(ScheduledExecutorService exec, Runnable task, long initialDelay, long period, TimeUnit unit) { PeriodicSchedulerTask sr = new PeriodicSchedulerTask(task); Future<?> f = exec.scheduleAtFixedRate(sr, initialDelay, period, unit); sr.setFuture(f); return sr; } static Disposable workerSchedule(ScheduledExecutorService exec, Disposable.Composite tasks, Runnable task, long delay, TimeUnit unit) { WorkerTask sr = new WorkerTask(task, tasks); if (!tasks.add(sr)) { throw Exceptions.failWithRejected(); } try { Future<?> f; if (delay <= 0L) { f = exec.submit((Callable<?>) sr); } else { f = exec.schedule((Callable<?>) sr, delay, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { sr.dispose(); //RejectedExecutionException are propagated up throw ex; } return sr; } static Disposable workerSchedulePeriodically(ScheduledExecutorService exec, Disposable.Composite tasks, Runnable task, long initialDelay, long period, TimeUnit unit) { if (period <= 0L) { InstantPeriodicWorkerTask isr = new InstantPeriodicWorkerTask(task, exec, tasks); try { Future<?> f; if (initialDelay <= 0L) { f = exec.submit(isr); } else { f = exec.schedule(isr, initialDelay, unit); } isr.setFirst(f); } catch (RejectedExecutionException ex) { isr.dispose(); //RejectedExecutionException are propagated up throw ex; } catch (IllegalArgumentException | NullPointerException ex) { isr.dispose(); //IllegalArgumentException are wrapped into RejectedExecutionException and propagated up throw new RejectedExecutionException(ex); } return isr; } PeriodicWorkerTask sr = new PeriodicWorkerTask(task, tasks); if (!tasks.add(sr)) { throw Exceptions.failWithRejected(); } try { Future<?> f = exec.scheduleAtFixedRate(sr, initialDelay, period, unit); sr.setFuture(f); } catch (RejectedExecutionException ex) { sr.dispose(); //RejectedExecutionException are propagated up throw ex; } catch (IllegalArgumentException | NullPointerException ex) { sr.dispose(); //IllegalArgumentException are wrapped into RejectedExecutionException and propagated up throw new RejectedExecutionException(ex); } return sr; } static ScheduledExecutorService decorateExecutorService(String schedulerType, Supplier<? extends ScheduledExecutorService> actual) { return factory.decorateExecutorService(schedulerType, actual); }
Scan an Executor or ExecutorService, recognizing several special implementations. Unwraps some decorating schedulers, recognizes Scannable schedulers and delegates to their Scannable.scanUnsafe(Attr) method, introspects ThreadPoolExecutor instances.

If no data can be extracted, defaults to the provided orElse scanUnsafe.

Params:
  • executor – the executor to introspect in a best effort manner.
  • key – the key to scan for. CAPACITY and BUFFERED mainly.
Returns:an equivalent of Scannable.scanUnsafe(Attr) but that can also work on some implementations of Executor
/** * Scan an {@link Executor} or {@link ExecutorService}, recognizing several special * implementations. Unwraps some decorating schedulers, recognizes {@link Scannable} * schedulers and delegates to their {@link Scannable#scanUnsafe(Scannable.Attr)} * method, introspects {@link ThreadPoolExecutor} instances. * <p> * If no data can be extracted, defaults to the provided {@code orElse} * {@link Scannable#scanUnsafe(Scannable.Attr) scanUnsafe}. * * @param executor the executor to introspect in a best effort manner. * @param key the key to scan for. CAPACITY and BUFFERED mainly. * @return an equivalent of {@link Scannable#scanUnsafe(Scannable.Attr)} but that can * also work on some implementations of {@link Executor} */
@Nullable static final Object scanExecutor(Executor executor, Scannable.Attr key) { if (executor instanceof DelegateServiceScheduler.UnsupportedScheduledExecutorService) { executor = ((DelegateServiceScheduler.UnsupportedScheduledExecutorService) executor).get(); } if (executor instanceof Scannable) { return ((Scannable) executor).scanUnsafe(key); } if (executor instanceof ExecutorService) { ExecutorService service = (ExecutorService) executor; if (key == Scannable.Attr.TERMINATED) return service.isTerminated(); if (key == Scannable.Attr.CANCELLED) return service.isShutdown(); } if (executor instanceof ThreadPoolExecutor) { final ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) executor; if (key == Scannable.Attr.CAPACITY) return poolExecutor.getMaximumPoolSize(); if (key == Scannable.Attr.BUFFERED) return ((Long) (poolExecutor.getTaskCount() - poolExecutor.getCompletedTaskCount())).intValue(); if (key == Scannable.Attr.LARGE_BUFFERED) return poolExecutor.getTaskCount() - poolExecutor.getCompletedTaskCount(); } return null; } }