/*
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.core.scheduler;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import static reactor.core.Exceptions.unwrap;
Schedulers
provides various Scheduler
factories useable by publishOn
or
subscribeOn
:
fromExecutorService(ExecutorService). newParallel
: Optimized for fast Runnable
executions single
: Optimized for low-latency Runnable
executions immediate
.
Factories prefixed with new
return a new instance of their flavor of Scheduler
, while other factories like elastic()
return a shared instance, that is the one used by operators requiring that flavor as their default Scheduler.
Author: Stephane Maldini
/**
* {@link Schedulers} provides various {@link Scheduler} factories useable by {@link
* reactor.core.publisher.Flux#publishOn publishOn} or {@link reactor.core.publisher.Mono#subscribeOn
* subscribeOn} :
* <p>
 * <ul> <li>{@link #fromExecutorService(ExecutorService)}. </li> <li>{@link #newParallel}
* : Optimized for fast {@link Runnable} executions </li> <li>{@link #single} : Optimized
* for low-latency {@link Runnable} executions </li> <li>{@link #immediate}. </li> </ul>
* <p>
* Factories prefixed with {@code new} return a new instance of their flavor of {@link Scheduler},
* while other factories like {@link #elastic()} return a shared instance, that is the one
* used by operators requiring that flavor as their default Scheduler.
*
* @author Stephane Maldini
*/
public abstract class Schedulers {
Default pool size, initialized by system property `reactor.schedulers.defaultPoolSize`
and falls back to the number of processors available to the runtime on init.
See Also: - Runtime.availableProcessors()
/**
 * Default pool size, initialized by system property {@code reactor.schedulers.defaultPoolSize}
 * and falls back to the number of processors available to the runtime on init.
 * <p>
 * Note: a non-numeric property value will surface as a {@link NumberFormatException}
 * thrown from {@code Integer::parseInt} during class initialization.
 *
 * @see Runtime#availableProcessors()
 */
public static final int DEFAULT_POOL_SIZE =
Optional.ofNullable(System.getProperty("reactor.schedulers.defaultPoolSize"))
.map(Integer::parseInt)
.orElseGet(() -> Runtime.getRuntime().availableProcessors());
static volatile BiConsumer<Thread, ? super Throwable> onHandleErrorHook;
Params: - executor – an
Executor
Returns: a new Scheduler
/**
 * Create a {@link Scheduler} which uses a backing {@link Executor} to schedule
 * Runnables for async operators.
 * <p>
 * Delegates to {@link #fromExecutor(Executor, boolean)} with trampolining disabled.
 *
 * @param executor an {@link Executor}
 *
 * @return a new {@link Scheduler}
 */
public static Scheduler fromExecutor(Executor executor) {
return fromExecutor(executor, false);
}
Params: Returns: a new Scheduler
/**
 * Create a {@link Scheduler} which uses a backing {@link Executor} to schedule
 * Runnables for async operators.
 *
 * @param executor an {@link Executor}
 * @param trampoline set to false if this {@link Scheduler} is used by "operators"
 * that already conflate {@link Runnable} executions (publishOn, subscribeOn...)
 *
 * @return a new {@link Scheduler}
 */
public static Scheduler fromExecutor(Executor executor, boolean trampoline) {
	//an ExecutorService can be wrapped by the more capable delegate scheduler,
	//unless the caller explicitly requested trampolining
	boolean delegateToService = !trampoline && executor instanceof ExecutorService;
	if (delegateToService) {
		return fromExecutorService((ExecutorService) executor);
	}
	return new ExecutorScheduler(executor, trampoline);
}
Create a Scheduler
which uses a backing ExecutorService
to schedule Runnables for async operators. Params: - executorService – an
ExecutorService
Returns: a new Scheduler
/**
 * Create a {@link Scheduler} which uses a backing {@link ExecutorService} to schedule
 * Runnables for async operators.
 *
 * @param executorService an {@link ExecutorService}
 *
 * @return a new {@link Scheduler} wrapping the given {@link ExecutorService}
 */
public static Scheduler fromExecutorService(ExecutorService executorService) {
return new DelegateServiceScheduler(executorService);
}
Scheduler
that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down.
The maximum number of created thread pools is unbounded.
The default time-to-live for unused thread pools is 60 seconds, use the appropriate
factory to set a different value.
This scheduler is not restartable.
Returns: default instance of a Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
/**
 * {@link Scheduler} that dynamically creates ExecutorService-based Workers and caches
 * the thread pools, reusing them once the Workers have been shut down.
 * <p>
 * The maximum number of created thread pools is unbounded.
 * <p>
 * The default time-to-live for unused thread pools is 60 seconds, use the appropriate
 * factory to set a different value.
 * <p>
 * This scheduler is not restartable.
 *
 * @return the shared, cached default elastic {@link Scheduler} instance (created on
 * first use via {@link #cache(AtomicReference, String, Supplier)})
 */
public static Scheduler elastic() {
return cache(CACHED_ELASTIC, ELASTIC, ELASTIC_SUPPLIER);
}
Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work. Returns: default instance of a Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers
/**
 * {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based
 * workers and is suited for parallel work.
 *
 * @return the shared, cached default parallel {@link Scheduler} instance (created on
 * first use via {@link #cache(AtomicReference, String, Supplier)})
 */
public static Scheduler parallel() {
return cache(CACHED_PARALLEL, PARALLEL, PARALLEL_SUPPLIER);
}
Executes tasks on the caller's thread immediately.
Returns: a reusable Scheduler
/**
 * Executes tasks on the caller's thread immediately.
 *
 * @return a reusable {@link Scheduler} (the {@link ImmediateScheduler} singleton)
 */
public static Scheduler immediate() {
return ImmediateScheduler.instance();
}
Scheduler
that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down.
The maximum number of created thread pools is unbounded.
The default time-to-live for unused thread pools is 60 seconds, use the appropriate
factory to set a different value.
This scheduler is not restartable.
Params: - name – Thread prefix
Returns: a new Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
/**
 * {@link Scheduler} that dynamically creates ExecutorService-based Workers and caches
 * the thread pools, reusing them once the Workers have been shut down.
 * <p>
 * The maximum number of created thread pools is unbounded.
 * <p>
 * The default time-to-live for unused thread pools is 60 seconds, use the appropriate
 * factory to set a different value.
 * <p>
 * This scheduler is not restartable.
 *
 * @param name Thread prefix
 *
 * @return a new {@link Scheduler} that dynamically creates ExecutorService-based
 * Workers with the default TTL ({@link ElasticScheduler#DEFAULT_TTL_SECONDS})
 */
public static Scheduler newElastic(String name) {
return newElastic(name, ElasticScheduler.DEFAULT_TTL_SECONDS);
}
Scheduler
that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down.
The maximum number of created thread pools is unbounded.
This scheduler is not restartable.
Params: - name – Thread prefix
- ttlSeconds – Time-to-live for an idle
Worker
Returns: a new Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
/**
 * {@link Scheduler} that dynamically creates ExecutorService-based Workers and caches
 * the thread pools, reusing them once the Workers have been shut down.
 * <p>
 * The maximum number of created thread pools is unbounded.
 * <p>
 * This scheduler is not restartable.
 *
 * @param name Thread prefix
 * @param ttlSeconds Time-to-live for an idle {@link reactor.core.scheduler.Scheduler.Worker}
 *
 * @return a new {@link Scheduler} whose worker threads are non-daemon (the VM will not
 * exit until {@link Scheduler#dispose()} is called)
 */
public static Scheduler newElastic(String name, int ttlSeconds) {
return newElastic(name, ttlSeconds, false);
}
Scheduler
that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down.
The maximum number of created thread pools is unbounded.
This scheduler is not restartable.
Params: - name – Thread prefix
- ttlSeconds – Time-to-live for an idle
Worker
- daemon – false if the
Scheduler
requires an explicit Scheduler.dispose()
to exit the VM.
Returns: a new Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
/**
 * {@link Scheduler} that dynamically creates ExecutorService-based Workers and caches
 * the thread pools, reusing them once the Workers have been shut down.
 * <p>
 * The maximum number of created thread pools is unbounded.
 * <p>
 * This scheduler is not restartable.
 *
 * @param name Thread prefix
 * @param ttlSeconds Time-to-live for an idle {@link reactor.core.scheduler.Scheduler.Worker}
 * @param daemon false if the {@link Scheduler} requires an explicit {@link
 * Scheduler#dispose()} to exit the VM.
 *
 * @return a new {@link Scheduler} that hosts a fixed pool of single-threaded
 * ExecutorService-based workers and is suited for parallel work
 */
public static Scheduler newElastic(String name, int ttlSeconds, boolean daemon) {
	//elastic threads are NOT marked as rejecting blocking calls (4th argument false)
	ThreadFactory threadFactory = new ReactorThreadFactory(name,
			ElasticScheduler.COUNTER, daemon, false,
			Schedulers::defaultUncaughtException);
	return newElastic(ttlSeconds, threadFactory);
}
Scheduler
that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down.
The maximum number of created thread pools is unbounded.
This scheduler is not restartable.
Params: - ttlSeconds – Time-to-live for an idle
Worker
- threadFactory – a
ThreadFactory
to use each thread initialization
Returns: a new Scheduler
that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down.
/**
 * {@link Scheduler} that dynamically creates ExecutorService-based Workers and caches
 * the thread pools, reusing them once the Workers have been shut down.
 * <p>
 * The maximum number of created thread pools is unbounded.
 * <p>
 * This scheduler is not restartable.
 *
 * @param ttlSeconds Time-to-live for an idle {@link reactor.core.scheduler.Scheduler.Worker}
 * @param threadFactory a {@link ThreadFactory} to use each thread initialization
 *
 * @return a new {@link Scheduler} created through the currently installed {@link Factory}
 */
public static Scheduler newElastic(int ttlSeconds, ThreadFactory threadFactory) {
return factory.newElastic(ttlSeconds, threadFactory);
}
Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work. This type of Scheduler
detects and rejects usage of blocking Reactor APIs. Params: - name – Thread prefix
Returns: a new Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
/**
 * {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based
 * workers and is suited for parallel work. This type of {@link Scheduler} detects and
 * rejects usage of blocking Reactor APIs.
 *
 * @param name Thread prefix
 *
 * @return a new {@link Scheduler} sized to {@link #DEFAULT_POOL_SIZE} workers
 */
public static Scheduler newParallel(String name) {
return newParallel(name, DEFAULT_POOL_SIZE);
}
Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work. This type of Scheduler
detects and rejects usage of blocking Reactor APIs. Params: - name – Thread prefix
- parallelism – Number of pooled workers.
Returns: a new Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
/**
 * {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based
 * workers and is suited for parallel work. This type of {@link Scheduler} detects and
 * rejects usage of blocking Reactor APIs.
 *
 * @param name Thread prefix
 * @param parallelism Number of pooled workers.
 *
 * @return a new {@link Scheduler} whose worker threads are non-daemon (the VM will not
 * exit until {@link Scheduler#dispose()} is called)
 */
public static Scheduler newParallel(String name, int parallelism) {
return newParallel(name, parallelism, false);
}
Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work. This type of Scheduler
detects and rejects usage of blocking Reactor APIs. Params: - name – Thread prefix
- parallelism – Number of pooled workers.
- daemon – false if the
Scheduler
requires an explicit Scheduler.dispose()
to exit the VM.
Returns: a new Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
/**
 * {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based
 * workers and is suited for parallel work. This type of {@link Scheduler} detects and
 * rejects usage of blocking Reactor APIs.
 *
 * @param name Thread prefix
 * @param parallelism Number of pooled workers.
 * @param daemon false if the {@link Scheduler} requires an explicit {@link
 * Scheduler#dispose()} to exit the VM.
 *
 * @return a new {@link Scheduler} that hosts a fixed pool of single-threaded
 * ExecutorService-based workers and is suited for parallel work
 */
public static Scheduler newParallel(String name, int parallelism, boolean daemon) {
	//parallel threads ARE marked as rejecting blocking calls (4th argument true)
	ThreadFactory threadFactory = new ReactorThreadFactory(name,
			ParallelScheduler.COUNTER, daemon, true,
			Schedulers::defaultUncaughtException);
	return newParallel(parallelism, threadFactory);
}
Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work. Params: - parallelism – Number of pooled workers.
- threadFactory – a
ThreadFactory
to use for the fixed initialized number of Thread
Returns: a new Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
/**
 * {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based
 * workers and is suited for parallel work.
 *
 * @param parallelism Number of pooled workers.
 * @param threadFactory a {@link ThreadFactory} to use for the fixed initialized
 * number of {@link Thread}
 *
 * @return a new {@link Scheduler} created through the currently installed {@link Factory}
 */
public static Scheduler newParallel(int parallelism, ThreadFactory threadFactory) {
return factory.newParallel(parallelism, threadFactory);
}
Scheduler
that hosts a single-threaded ExecutorService-based worker and is suited for parallel work. This type of Scheduler
detects and rejects usage of blocking Reactor APIs. Params: - name – Component and thread name prefix
Returns: a new Scheduler
that hosts a single-threaded ExecutorService-based worker
/**
 * {@link Scheduler} that hosts a single-threaded ExecutorService-based worker and is
 * suited for parallel work. This type of {@link Scheduler} detects and rejects usage
 * of blocking Reactor APIs.
 *
 * @param name Component and thread name prefix
 *
 * @return a new {@link Scheduler} whose single worker thread is non-daemon (the VM
 * will not exit until {@link Scheduler#dispose()} is called)
 */
public static Scheduler newSingle(String name) {
return newSingle(name, false);
}
Scheduler
that hosts a single-threaded ExecutorService-based worker and is suited for parallel work. This type of Scheduler
detects and rejects usage of blocking Reactor APIs. Params: - name – Component and thread name prefix
- daemon – false if the
Scheduler
requires an explicit Scheduler.dispose()
to exit the VM.
Returns: a new Scheduler
that hosts a single-threaded ExecutorService-based worker
/**
 * {@link Scheduler} that hosts a single-threaded ExecutorService-based worker and is
 * suited for parallel work. This type of {@link Scheduler} detects and rejects usage
 * of blocking Reactor APIs.
 *
 * @param name Component and thread name prefix
 * @param daemon false if the {@link Scheduler} requires an explicit {@link
 * Scheduler#dispose()} to exit the VM.
 *
 * @return a new {@link Scheduler} that hosts a single-threaded ExecutorService-based
 * worker
 */
public static Scheduler newSingle(String name, boolean daemon) {
	//single threads ARE marked as rejecting blocking calls (4th argument true)
	ThreadFactory threadFactory = new ReactorThreadFactory(name,
			SingleScheduler.COUNTER, daemon, true,
			Schedulers::defaultUncaughtException);
	return newSingle(threadFactory);
}
Scheduler
that hosts a single-threaded ExecutorService-based worker and is suited for parallel work. Params: - threadFactory – a
ThreadFactory
to use for the unique thread of the Scheduler
Returns: a new Scheduler
that hosts a single-threaded ExecutorService-based worker
/**
 * {@link Scheduler} that hosts a single-threaded ExecutorService-based worker and is
 * suited for parallel work.
 *
 * @param threadFactory a {@link ThreadFactory} to use for the unique thread of the
 * {@link Scheduler}
 *
 * @return a new {@link Scheduler} created through the currently installed {@link Factory}
 */
public static Scheduler newSingle(ThreadFactory threadFactory) {
return factory.newSingle(threadFactory);
}
Define a hook that is executed when a Scheduler
has handled an error
. Note that it is executed after the error has been passed to the thread uncaughtErrorHandler, which is not the case when a fatal error occurs (see Exceptions.throwIfJvmFatal(Throwable)
). Params: - c – the new hook to set.
/**
 * Define a hook that is executed when a {@link Scheduler} has
 * {@link #handleError(Throwable) handled an error}. Note that it is executed after
 * the error has been passed to the thread uncaughtErrorHandler, which is not the
 * case when a fatal error occurs (see {@link Exceptions#throwIfJvmFatal(Throwable)}).
 *
 * @param c the new hook to set, must not be null.
 * @throws NullPointerException if {@code c} is null
 */
public static void onHandleError(BiConsumer<Thread, ? super Throwable> c) {
	//validate first so a null hook does not emit a misleading
	//"Hooking new default" debug line before the NPE is thrown
	BiConsumer<Thread, ? super Throwable> hook =
			Objects.requireNonNull(c, "onHandleError");
	if (log.isDebugEnabled()) {
		log.debug("Hooking new default: onHandleError");
	}
	onHandleErrorHook = hook;
}
Check if calling a Reactor blocking API in the current Thread
is forbidden or not, by checking if the thread implements NonBlocking
(in which case it is forbidden and this method returns true
). Returns: true
if blocking is forbidden in this thread, false
otherwise
/**
 * Check if calling a Reactor blocking API in the current {@link Thread} is forbidden
 * or not, by checking if the thread implements {@link NonBlocking} (in which case it is
 * forbidden and this method returns {@code true}).
 *
 * @return {@code true} if blocking is forbidden in this thread, {@code false} otherwise
 */
public static boolean isInNonBlockingThread() {
	//same check as the Thread-accepting variant, applied to the caller's thread
	return isNonBlockingThread(Thread.currentThread());
}
Check if calling a Reactor blocking API in the given Thread
is forbidden or not, by checking if the thread implements NonBlocking
(in which case it is forbidden and this method returns true
). Returns: true
if blocking is forbidden in that thread, false
otherwise
/**
 * Check if calling a Reactor blocking API in the given {@link Thread} is forbidden
 * or not, by checking if the thread implements {@link NonBlocking} (in which case it is
 * forbidden and this method returns {@code true}).
 *
 * @param t the {@link Thread} to check
 * @return {@code true} if blocking is forbidden in that thread, {@code false} otherwise
 */
public static boolean isNonBlockingThread(Thread t) {
	//Class#isInstance has the same semantics as instanceof, including false for null
	return NonBlocking.class.isInstance(t);
}
Re-apply default factory to Schedulers
/**
 * Re-apply the default {@link Factory} to {@link Schedulers}, disposing any
 * cached shared scheduler in the process (via {@link #setFactory(Factory)}).
 */
public static void resetFactory(){
setFactory(DEFAULT);
}
Reset the onHandleError(BiConsumer<Thread,? super Throwable>)
hook to the default no-op behavior. /**
* Reset the {@link #onHandleError(BiConsumer)} hook to the default no-op behavior.
*/
public static void resetOnHandleError() {
if (log.isDebugEnabled()) {
log.debug("Reset to factory defaults: onHandleError");
}
//null disables the hook: handleError(Throwable) checks for null before invoking it
onHandleErrorHook = null;
}
Replace Schedulers
factories (newParallel
, newSingle
and newElastic
). Also shutdown Schedulers from the cached factories (like single()
) in order to also use these replacements, re-creating the shared schedulers from the new factory upon next use.
This method should be called safely and with caution, typically on app startup.
Params: - factoryInstance – an arbitrary
Factory
instance.
/**
 * Replace {@link Schedulers} factories ({@link #newParallel(String) newParallel},
 * {@link #newSingle(String) newSingle} and {@link #newElastic(String) newElastic}). Also
 * shutdown Schedulers from the cached factories (like {@link #single()}) in order to
 * also use these replacements, re-creating the shared schedulers from the new factory
 * upon next use.
 * <p>
 * This method should be called safely and with caution, typically on app startup.
 *
 * @param factoryInstance an arbitrary {@link Factory} instance, must not be null.
 * @throws NullPointerException if {@code factoryInstance} is null
 */
public static void setFactory(Factory factoryInstance) {
Objects.requireNonNull(factoryInstance, "factoryInstance");
//dispose cached shared schedulers BEFORE swapping so they are recreated lazily
//through the new factory on next use
shutdownNow();
factory = factoryInstance;
}
Clear any cached Scheduler
and call dispose on them. /**
* Clear any cached {@link Scheduler} and call dispose on them.
*/
public static void shutdownNow() {
	//atomically detach each cached scheduler so a concurrent cache() call
	//cannot hand out an instance that is about to be disposed
	CachedScheduler elastic = CACHED_ELASTIC.getAndSet(null);
	CachedScheduler parallel = CACHED_PARALLEL.getAndSet(null);
	CachedScheduler single = CACHED_SINGLE.getAndSet(null);

	if (elastic != null) {
		elastic._dispose();
	}
	if (parallel != null) {
		parallel._dispose();
	}
	if (single != null) {
		single._dispose();
	}
}
Scheduler
that hosts a single-threaded ExecutorService-based worker and is suited for parallel work. Will cache the returned schedulers for subsequent calls until dispose. Returns: default instance of a Scheduler
that hosts a single-threaded ExecutorService-based worker
/**
 * {@link Scheduler} that hosts a single-threaded ExecutorService-based worker and is
 * suited for parallel work. Will cache the returned schedulers for subsequent calls until dispose.
 *
 * @return the shared, cached default single {@link Scheduler} instance (created on
 * first use via {@link #cache(AtomicReference, String, Supplier)})
 */
public static Scheduler single() {
return cache(CACHED_SINGLE, SINGLE, SINGLE_SUPPLIER);
}
Wraps a single Worker
from some other Scheduler
and provides Worker
services on top of it. Use the Scheduler.dispose()
to release the wrapped worker.
Params: Returns: a wrapping Scheduler
consistently returning a same worker from a source Scheduler
/**
 * Wraps a single {@link reactor.core.scheduler.Scheduler.Worker} from some other
 * {@link Scheduler} and provides {@link reactor.core.scheduler.Scheduler.Worker}
 * services on top of it.
 * <p>
 * Use the {@link Scheduler#dispose()} to release the wrapped worker.
 *
 * @param original a {@link Scheduler} to call upon to get the single {@link
 * reactor.core.scheduler.Scheduler.Worker}
 *
 * @return a wrapping {@link Scheduler} consistently returning a same worker from a
 * source {@link Scheduler}
 */
public static Scheduler single(Scheduler original) {
return new SingleWorkerScheduler(original);
}
Public factory hook to override Schedulers behavior globally
/**
 * Public factory hook to override Schedulers behavior globally.
 * <p>
 * All methods have defaults mirroring the standard behavior, so an implementor
 * only needs to override the flavor(s) it wants to customize. Install via
 * {@link Schedulers#setFactory(Factory)}.
 */
public interface Factory {
/**
 * Override this method to decorate {@link ScheduledExecutorService} internally used by
 * Reactor's various {@link Scheduler} implementations, allowing to tune the
 * {@link ScheduledExecutorService} backing implementation.
 *
 * @param schedulerType a name hinting at the flavor of Scheduler being tuned.
 * @param actual the default backing implementation, provided lazily as a Supplier
 * so that you can bypass instantiation completely if you want to replace it.
 * @return the internal {@link ScheduledExecutorService} instance to use.
 */
default ScheduledExecutorService decorateExecutorService(String schedulerType,
Supplier<? extends ScheduledExecutorService> actual) {
//default: no decoration, just realize the supplier
return actual.get();
}
/**
 * {@link Scheduler} that dynamically creates Workers resources and caches
 * eventually, reusing them once the Workers have been shut down.
 * <p>
 * The maximum number of created workers is unbounded.
 *
 * @param ttlSeconds Time-to-live for an idle {@link reactor.core.scheduler.Scheduler.Worker}
 * @param threadFactory a {@link ThreadFactory} to use
 *
 * @return a new {@link Scheduler} that dynamically creates Workers resources and
 * caches eventually, reusing them once the Workers have been shut down.
 */
default Scheduler newElastic(int ttlSeconds, ThreadFactory threadFactory) {
return new ElasticScheduler(threadFactory, ttlSeconds);
}
/**
 * {@link Scheduler} that hosts a fixed pool of workers and is suited for parallel
 * work.
 *
 * @param parallelism Number of pooled workers.
 * @param threadFactory a {@link ThreadFactory} to use for the fixed initialized
 * number of {@link Thread}
 *
 * @return a new {@link Scheduler} that hosts a fixed pool of workers and is
 * suited for parallel work
 */
default Scheduler newParallel(int parallelism, ThreadFactory threadFactory) {
return new ParallelScheduler(parallelism, threadFactory);
}
/**
 * {@link Scheduler} that hosts a single worker and is suited for non-blocking
 * work.
 *
 * @param threadFactory a {@link ThreadFactory} to use for the unique resource of
 * the {@link Scheduler}
 *
 * @return a new {@link Scheduler} that hosts a single worker
 */
default Scheduler newSingle(ThreadFactory threadFactory) {
return new SingleScheduler(threadFactory);
}
}
// Internals

//keys identifying each cached scheduler flavor, also used as thread name prefixes
static final String ELASTIC = "elastic"; // IO stuff
static final String PARALLEL = "parallel"; //scale up common tasks
static final String SINGLE = "single"; //non blocking tasks
static final String IMMEDIATE = "immediate";
static final String FROM_EXECUTOR = "fromExecutor";
static final String FROM_EXECUTOR_SERVICE = "fromExecutorService";

// Cached schedulers in atomic references:
//null until first use; cleared by shutdownNow() and repopulated lazily by cache()
static AtomicReference<CachedScheduler> CACHED_ELASTIC = new AtomicReference<>();
static AtomicReference<CachedScheduler> CACHED_PARALLEL = new AtomicReference<>();
static AtomicReference<CachedScheduler> CACHED_SINGLE = new AtomicReference<>();

//suppliers used by cache() to lazily build the shared defaults; note these use
//daemon threads (daemon = true) so the shared schedulers do not prevent VM exit
static final Supplier<Scheduler> ELASTIC_SUPPLIER =
() -> newElastic(ELASTIC, ElasticScheduler.DEFAULT_TTL_SECONDS, true);
static final Supplier<Scheduler> PARALLEL_SUPPLIER =
() -> newParallel(PARALLEL, DEFAULT_POOL_SIZE, true);
static final Supplier<Scheduler> SINGLE_SUPPLIER = () -> newSingle(SINGLE, true);

//factory with all default behaviors, used until setFactory() installs a custom one
static final Factory DEFAULT = new Factory() {
};

//currently installed factory; volatile so setFactory() is visible to all threads
static volatile Factory factory = DEFAULT;
Get a CachedScheduler
out of the reference
or create one using the Supplier
if the reference is empty, effectively creating a single instance to be reused as a default scheduler for the given key
category. Params: - reference – the cache reference that holds the scheduler
- key – the "name" for the Scheduler's category/type
- supplier – the
Scheduler
generator to use and wrap into a CachedScheduler
. Note that in case of a race, an extraneous Scheduler can be created, but it'll get immediately disposed
.
Returns: a CachedScheduler
to be reused, either pre-existing or created
/**
 * Get a {@link CachedScheduler} out of the {@code reference} or create one using the
 * {@link Supplier} if the reference is empty, effectively creating a single instance
 * to be reused as a default scheduler for the given {@code key} category.
 *
 * @param reference the cache reference that holds the scheduler
 * @param key the "name" for the Scheduler's category/type
 * @param supplier the {@link Scheduler} generator to use and wrap into a {@link CachedScheduler}.
 * Note that in case of a race, an extraneous Scheduler can be created, but it'll get
 * immediately {@link Scheduler#dispose() disposed}.
 * @return a {@link CachedScheduler} to be reused, either pre-existing or created
 */
static CachedScheduler cache(AtomicReference<CachedScheduler> reference, String key, Supplier<Scheduler> supplier) {
	CachedScheduler existing = reference.get();
	if (existing != null) {
		return existing;
	}
	CachedScheduler fresh = new CachedScheduler(key, supplier.get());
	if (!reference.compareAndSet(null, fresh)) {
		//lost the race: another caller installed a cached scheduler first.
		//dispose the extraneous instance and defer to whatever is now cached.
		fresh._dispose();
		return reference.get();
	}
	return fresh;
}
static final Logger log = Loggers.getLogger(Schedulers.class);
/**
 * Default uncaught exception handler installed on scheduler worker threads via
 * {@link ReactorThreadFactory}: logs the error with the thread's group name.
 *
 * @param t the thread on which the throwable escaped
 * @param e the uncaught throwable
 */
static final void defaultUncaughtException(Thread t, Throwable e) {
	//Thread#getThreadGroup() returns null once the thread has terminated,
	//so guard against an NPE while building the log message
	ThreadGroup group = t.getThreadGroup();
	String groupName = group != null ? group.getName() : "<terminated>";
	Schedulers.log.error("Scheduler worker in group " + groupName
			+ " failed with an uncaught exception", e);
}
/**
 * Route an error that escaped a scheduled task: unwrap it, pass it to the current
 * thread's {@link Thread.UncaughtExceptionHandler} (or log it when none is set),
 * then invoke the {@link #onHandleError(BiConsumer)} hook if one is installed.
 *
 * @param ex the escaped error, possibly a wrapper (see {@link Exceptions})
 */
static void handleError(Throwable ex) {
	Thread thread = Thread.currentThread();
	Throwable t = unwrap(ex);
	Thread.UncaughtExceptionHandler x = thread.getUncaughtExceptionHandler();
	if (x != null) {
		x.uncaughtException(thread, t);
	}
	else {
		log.error("Scheduler worker failed with an uncaught exception", t);
	}
	//read the volatile hook exactly once: checking then re-reading the field
	//could NPE if resetOnHandleError() raced between the check and the call
	BiConsumer<Thread, ? super Throwable> hook = onHandleErrorHook;
	if (hook != null) {
		hook.accept(thread, t);
	}
}
/**
 * Wrapper around a shared default {@link Scheduler} instance (see {@link #cache}).
 * Delegates every operation to the wrapped scheduler, except {@link #dispose()}
 * which is a deliberate no-op: shared instances are only really disposed through
 * {@link #_dispose()}, typically from {@link Schedulers#shutdownNow()}.
 */
static class CachedScheduler implements Scheduler, Supplier<Scheduler>, Scannable {
//the wrapped scheduler every call is delegated to
final Scheduler cached;
//the category key this instance was cached under (e.g. "single", "parallel")
final String key;
CachedScheduler(String key, Scheduler cached) {
this.cached = cached;
this.key = key;
}
@Override
public Disposable schedule(Runnable task) {
return cached.schedule(task);
}
@Override
public Disposable schedule(Runnable task, long delay, TimeUnit unit) {
return cached.schedule(task, delay, unit);
}
@Override
public Disposable schedulePeriodically(Runnable task,
long initialDelay,
long period,
TimeUnit unit) {
return cached.schedulePeriodically(task, initialDelay, period, unit);
}
@Override
public Worker createWorker() {
return cached.createWorker();
}
@Override
public long now(TimeUnit unit) {
return cached.now(unit);
}
@Override
public void start() {
cached.start();
}
@Override
public void dispose() {
//intentional no-op: the shared default must survive user dispose() calls;
//use _dispose() (via Schedulers.shutdownNow()) to actually release it
}
@Override
public boolean isDisposed() {
return cached.isDisposed();
}
@Override
public String toString() {
return cached.toString();
}
@Override
public Object scanUnsafe(Attr key) {
return Scannable.from(cached).scanUnsafe(key);
}
/**
 * Get the {@link Scheduler} that is cached and wrapped inside this
 * {@link CachedScheduler}.
 *
 * @return the cached Scheduler
 */
@Override
public Scheduler get() {
return cached;
}
//actually dispose the wrapped scheduler, bypassing the no-op dispose() above
void _dispose() {
cached.dispose();
}
}
/**
 * Submit a one-shot task on the given executor, either immediately (delay of
 * zero or less) or after the requested delay, wrapping it into a
 * {@link SchedulerTask} that carries the resulting {@link Future} for disposal.
 *
 * @param exec the executor to submit the task to
 * @param task the task to run
 * @param delay the delay before execution; {@code <= 0} means run immediately
 * @param unit the unit of {@code delay}
 * @return a {@link Disposable} cancelling the pending or running task
 */
static Disposable directSchedule(ScheduledExecutorService exec,
		Runnable task,
		long delay,
		TimeUnit unit) {
	SchedulerTask wrapped = new SchedulerTask(task);
	Future<?> future = delay <= 0L
			? exec.submit((Callable<?>) wrapped)
			: exec.schedule((Callable<?>) wrapped, delay, unit);
	wrapped.setFuture(future);
	return wrapped;
}
/**
 * Schedule a task at a fixed rate on the given executor, wrapping it into a
 * {@link PeriodicSchedulerTask} that carries the resulting {@link Future} for
 * disposal.
 *
 * @param exec the executor to schedule the task on
 * @param task the task to run periodically
 * @param initialDelay the delay before the first execution
 * @param period the period between successive executions
 * @param unit the unit of {@code initialDelay} and {@code period}
 * @return a {@link Disposable} cancelling the periodic task
 */
static Disposable directSchedulePeriodically(ScheduledExecutorService exec,
		Runnable task,
		long initialDelay,
		long period,
		TimeUnit unit) {
	PeriodicSchedulerTask wrapped = new PeriodicSchedulerTask(task);
	Future<?> future = exec.scheduleAtFixedRate(wrapped, initialDelay, period, unit);
	wrapped.setFuture(future);
	return wrapped;
}
/**
 * Submit a one-shot task on behalf of a Worker: the task is wrapped into a
 * {@link WorkerTask} registered with the worker's composite so it is cancelled
 * when the worker is disposed. If the composite refuses the task (worker already
 * disposed) a rejection exception is thrown; a {@link RejectedExecutionException}
 * from the executor is propagated up after disposing the wrapper.
 *
 * @param exec the executor backing the worker
 * @param tasks the worker's composite tracking outstanding tasks
 * @param task the task to run
 * @param delay the delay before execution; {@code <= 0} means run immediately
 * @param unit the unit of {@code delay}
 * @return a {@link Disposable} cancelling the pending or running task
 */
static Disposable workerSchedule(ScheduledExecutorService exec,
		Disposable.Composite tasks,
		Runnable task,
		long delay,
		TimeUnit unit) {
	WorkerTask wrapped = new WorkerTask(task, tasks);
	if (!tasks.add(wrapped)) {
		throw Exceptions.failWithRejected();
	}
	try {
		Future<?> future = delay <= 0L
				? exec.submit((Callable<?>) wrapped)
				: exec.schedule((Callable<?>) wrapped, delay, unit);
		wrapped.setFuture(future);
	}
	catch (RejectedExecutionException ree) {
		wrapped.dispose();
		//RejectedExecutionException are propagated up
		throw ree;
	}
	return wrapped;
}
/**
 * Schedule a periodic task on behalf of a Worker. For a non-positive period the
 * task is re-submitted instantly after each run via an
 * {@link InstantPeriodicWorkerTask}; otherwise a fixed-rate schedule backed by a
 * {@link PeriodicWorkerTask} registered with the worker's composite is used.
 * {@link RejectedExecutionException} is propagated up; {@link IllegalArgumentException}
 * and {@link NullPointerException} from the executor are wrapped into a
 * {@link RejectedExecutionException}. In all failure cases the wrapper is disposed.
 *
 * @param exec the executor backing the worker
 * @param tasks the worker's composite tracking outstanding tasks
 * @param task the task to run periodically
 * @param initialDelay the delay before the first execution
 * @param period the period between executions; {@code <= 0} means instant re-submission
 * @param unit the unit of {@code initialDelay} and {@code period}
 * @return a {@link Disposable} cancelling the periodic task
 */
static Disposable workerSchedulePeriodically(ScheduledExecutorService exec,
		Disposable.Composite tasks,
		Runnable task,
		long initialDelay,
		long period,
		TimeUnit unit) {
	if (period <= 0L) {
		//non-positive period: re-submit instantly after each completion
		InstantPeriodicWorkerTask instantTask =
				new InstantPeriodicWorkerTask(task, exec, tasks);
		try {
			Future<?> first = initialDelay <= 0L
					? exec.submit(instantTask)
					: exec.schedule(instantTask, initialDelay, unit);
			instantTask.setFirst(first);
		}
		catch (RejectedExecutionException ree) {
			instantTask.dispose();
			//RejectedExecutionException are propagated up
			throw ree;
		}
		catch (IllegalArgumentException | NullPointerException ex) {
			instantTask.dispose();
			//IllegalArgumentException are wrapped into RejectedExecutionException and propagated up
			throw new RejectedExecutionException(ex);
		}
		return instantTask;
	}

	PeriodicWorkerTask periodicTask = new PeriodicWorkerTask(task, tasks);
	if (!tasks.add(periodicTask)) {
		throw Exceptions.failWithRejected();
	}
	try {
		periodicTask.setFuture(
				exec.scheduleAtFixedRate(periodicTask, initialDelay, period, unit));
	}
	catch (RejectedExecutionException ree) {
		periodicTask.dispose();
		//RejectedExecutionException are propagated up
		throw ree;
	}
	catch (IllegalArgumentException | NullPointerException ex) {
		periodicTask.dispose();
		//IllegalArgumentException are wrapped into RejectedExecutionException and propagated up
		throw new RejectedExecutionException(ex);
	}
	return periodicTask;
}
/**
 * Run the given executor-service supplier through the current {@link Schedulers.Factory}'s
 * decoration hook, returning the (possibly decorated) {@link ScheduledExecutorService}.
 *
 * @param schedulerType the name of the Scheduler flavor requesting the executor
 * @param actual the supplier producing the undecorated {@link ScheduledExecutorService}
 * @return the executor service to actually use, as decided by the factory
 */
static ScheduledExecutorService decorateExecutorService(String schedulerType,
Supplier<? extends ScheduledExecutorService> actual) {
return factory.decorateExecutorService(schedulerType, actual);
}
Scan an Executor
or ExecutorService
, recognizing several special implementations. Unwraps some decorating schedulers, recognizes Scannable
schedulers and delegates to their Scannable.scanUnsafe(Attr)
method, introspects ThreadPoolExecutor
instances. If no data can be extracted, defaults to the provided orElse
scanUnsafe
.
Params: - executor – the executor to introspect in a best effort manner.
- key – the key to scan for. CAPACITY and BUFFERED mainly.
Returns: an equivalent of Scannable.scanUnsafe(Attr)
but that can also work on some implementations of Executor
/**
* Scan an {@link Executor} or {@link ExecutorService}, recognizing several special
* implementations. Unwraps some decorating schedulers, recognizes {@link Scannable}
* schedulers and delegates to their {@link Scannable#scanUnsafe(Scannable.Attr)}
* method, introspects {@link ThreadPoolExecutor} instances.
* <p>
* If no data can be extracted, defaults to the provided {@code orElse}
* {@link Scannable#scanUnsafe(Scannable.Attr) scanUnsafe}.
*
* @param executor the executor to introspect in a best effort manner.
* @param key the key to scan for. CAPACITY and BUFFERED mainly.
* @return an equivalent of {@link Scannable#scanUnsafe(Scannable.Attr)} but that can
* also work on some implementations of {@link Executor}
*/
@Nullable
static final Object scanExecutor(Executor executor, Scannable.Attr key) {
if (executor instanceof DelegateServiceScheduler.UnsupportedScheduledExecutorService) {
executor = ((DelegateServiceScheduler.UnsupportedScheduledExecutorService) executor).get();
}
if (executor instanceof Scannable) {
return ((Scannable) executor).scanUnsafe(key);
}
if (executor instanceof ExecutorService) {
ExecutorService service = (ExecutorService) executor;
if (key == Scannable.Attr.TERMINATED) return service.isTerminated();
if (key == Scannable.Attr.CANCELLED) return service.isShutdown();
}
if (executor instanceof ThreadPoolExecutor) {
final ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) executor;
if (key == Scannable.Attr.CAPACITY) return poolExecutor.getMaximumPoolSize();
if (key == Scannable.Attr.BUFFERED) return ((Long) (poolExecutor.getTaskCount() - poolExecutor.getCompletedTaskCount())).intValue();
if (key == Scannable.Attr.LARGE_BUFFERED) return poolExecutor.getTaskCount() - poolExecutor.getCompletedTaskCount();
}
return null;
}
}