/*
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.core.scheduler;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import io.micrometer.core.instrument.MeterRegistry;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.Metrics;
import reactor.util.annotation.Nullable;
import static reactor.core.Exceptions.unwrap;
Schedulers
provides various Scheduler
flavors usable by publishOn
or
subscribeOn
:
parallel()
: Optimized for fast Runnable
non-blocking executions
single
: Optimized for low-latency Runnable
one-off executions
elastic()
: Optimized for longer executions, an alternative for blocking tasks where the number of active tasks (and threads) can grow indefinitely
boundedElastic()
: Optimized for longer executions, an alternative for blocking tasks where the number of active tasks (and threads) is capped
immediate
: to immediately run submitted Runnable
instead of scheduling them (somewhat of a no-op or "null object" Scheduler
)
fromExecutorService(ExecutorService)
to create new instances around Executors
Factories prefixed with new
(eg. newBoundedElastic(int, int, String)
return a new instance of their flavor of Scheduler
, while other factories like boundedElastic()
return a shared instance - which is the one used by operators requiring that flavor as their default Scheduler. All instances are returned in a started
state.
Author: Stephane Maldini
/**
* {@link Schedulers} provides various {@link Scheduler} flavors usable by {@link
* reactor.core.publisher.Flux#publishOn(Scheduler) publishOn} or {@link reactor.core.publisher.Mono#subscribeOn
* subscribeOn} :
* <p>
* <ul>
* <li>{@link #parallel()}: Optimized for fast {@link Runnable} non-blocking executions </li>
* <li>{@link #single}: Optimized for low-latency {@link Runnable} one-off executions </li>
* <li>{@link #elastic()}: Optimized for longer executions, an alternative for blocking tasks where the number of active tasks (and threads) can grow indefinitely</li>
* <li>{@link #boundedElastic()}: Optimized for longer executions, an alternative for blocking tasks where the number of active tasks (and threads) is capped</li>
* <li>{@link #immediate}: to immediately run submitted {@link Runnable} instead of scheduling them (somewhat of a no-op or "null object" {@link Scheduler})</li>
* <li>{@link #fromExecutorService(ExecutorService)} to create new instances around {@link java.util.concurrent.Executors} </li>
* </ul>
* <p>
* Factories prefixed with {@code new} (eg. {@link #newBoundedElastic(int, int, String)} return a new instance of their flavor of {@link Scheduler},
* while other factories like {@link #boundedElastic()} return a shared instance - which is the one used by operators requiring that flavor as their default Scheduler.
* All instances are returned in a {@link Scheduler#start() started} state.
*
* @author Stephane Maldini
*/
public abstract class Schedulers {
Default pool size, initialized by system property reactor.schedulers.defaultPoolSize
and falls back to the number of processors available to the runtime on init. See Also:
/**
* Default pool size, initialized by system property {@code reactor.schedulers.defaultPoolSize}
* and falls back to the number of processors available to the runtime on init.
*
* @see Runtime#availableProcessors()
*/
public static final int DEFAULT_POOL_SIZE =
Optional.ofNullable(System.getProperty("reactor.schedulers.defaultPoolSize"))
.map(Integer::parseInt)
.orElseGet(() -> Runtime.getRuntime().availableProcessors());
Default maximum size for the global boundedElastic()
Scheduler
, initialized by system property reactor.schedulers.defaultBoundedElasticSize
and falls back to 10 x number of processors available to the runtime on init. See Also:
/**
* Default maximum size for the global {@link #boundedElastic()} {@link Scheduler}, initialized
* by system property {@code reactor.schedulers.defaultBoundedElasticSize} and falls back to 10 x number
* of processors available to the runtime on init.
*
* @see Runtime#availableProcessors()
* @see #boundedElastic()
*/
public static final int DEFAULT_BOUNDED_ELASTIC_SIZE =
Optional.ofNullable(System.getProperty("reactor.schedulers.defaultBoundedElasticSize"))
.map(Integer::parseInt)
.orElseGet(() -> 10 * Runtime.getRuntime().availableProcessors());
Default maximum number of enqueued tasks PER THREAD for the global boundedElastic()
Scheduler
, initialized by system property reactor.schedulers.defaultBoundedElasticQueueSize
and falls back to a bound of 100 000 tasks per backing thread. See Also:
/**
* Default maximum number of enqueued tasks PER THREAD for the global {@link #boundedElastic()} {@link Scheduler},
* initialized by system property {@code reactor.schedulers.defaultBoundedElasticQueueSize} and falls back to
* a bound of 100 000 tasks per backing thread.
*
* @see #boundedElastic()
*/
public static final int DEFAULT_BOUNDED_ELASTIC_QUEUESIZE =
Optional.ofNullable(System.getProperty("reactor.schedulers.defaultBoundedElasticQueueSize"))
.map(Integer::parseInt)
.orElse(100000);
@Nullable
static volatile BiConsumer<Thread, ? super Throwable> onHandleErrorHook;
Create a Scheduler
which uses a backing Executor
to schedule Runnables for async operators. Tasks scheduled with workers of this Scheduler are not guaranteed to run in FIFO order and strictly non-concurrently. If FIFO order is desired, use trampoline parameter of fromExecutor(Executor, boolean)
Params: - executor – an
Executor
Returns: a new Scheduler
/**
* Create a {@link Scheduler} which uses a backing {@link Executor} to schedule
* Runnables for async operators.
*
* <p>Tasks scheduled with workers of this Scheduler are not guaranteed to run in FIFO
* order and strictly non-concurrently.
* If FIFO order is desired, use trampoline parameter of {@link Schedulers#fromExecutor(Executor, boolean)}
*
* @param executor an {@link Executor}
*
* @return a new {@link Scheduler}
*/
public static Scheduler fromExecutor(Executor executor) {
return fromExecutor(executor, false);
}
Create a Scheduler
which uses a backing Executor
to schedule Runnables for async operators. Trampolining here means tasks submitted in a burst are queued by the Worker itself, which acts as a sole task from the perspective of the ExecutorService
, so no reordering (but also no threading). Params: Returns: a new Scheduler
/**
* Create a {@link Scheduler} which uses a backing {@link Executor} to schedule
* Runnables for async operators.
*
* Trampolining here means tasks submitted in a burst are queued by the Worker itself,
* which acts as a sole task from the perspective of the {@link ExecutorService},
* so no reordering (but also no threading).
*
* @param executor an {@link Executor}
* @param trampoline set to false if this {@link Scheduler} is used by "operators"
* that already conflate {@link Runnable} executions (publishOn, subscribeOn...)
*
* @return a new {@link Scheduler}
*/
public static Scheduler fromExecutor(Executor executor, boolean trampoline) {
if(!trampoline && executor instanceof ExecutorService){
return fromExecutorService((ExecutorService) executor);
}
final ExecutorScheduler scheduler = new ExecutorScheduler(executor, trampoline);
scheduler.start();
return scheduler;
}
Create a Scheduler
which uses a backing ExecutorService
to schedule Runnables for async operators. Prefer using fromExecutorService(ExecutorService, String)
, especially if you plan on using metrics as this gives the executor a meaningful identifier.
Params: - executorService – an
ExecutorService
Returns: a new Scheduler
/**
* Create a {@link Scheduler} which uses a backing {@link ExecutorService} to schedule
* Runnables for async operators.
* <p>
* Prefer using {@link #fromExecutorService(ExecutorService, String)},
* especially if you plan on using metrics as this gives the executor a meaningful identifier.
*
* @param executorService an {@link ExecutorService}
*
* @return a new {@link Scheduler}
*/
public static Scheduler fromExecutorService(ExecutorService executorService) {
String executorServiceHashcode = Integer.toHexString(System.identityHashCode(executorService));
return fromExecutorService(executorService, "anonymousExecutor@" + executorServiceHashcode);
}
Create a Scheduler
which uses a backing ExecutorService
to schedule Runnables for async operators. Params: - executorService – an
ExecutorService
Returns: a new Scheduler
/**
* Create a {@link Scheduler} which uses a backing {@link ExecutorService} to schedule
* Runnables for async operators.
*
* @param executorService an {@link ExecutorService}
*
* @return a new {@link Scheduler}
*/
public static Scheduler fromExecutorService(ExecutorService executorService, String executorName) {
final DelegateServiceScheduler scheduler = new DelegateServiceScheduler(executorName, executorService);
scheduler.start();
return scheduler;
}
Scheduler
that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down.
The maximum number of created thread pools is unbounded.
The default time-to-live for unused thread pools is 60 seconds, use the appropriate
factory to set a different value.
This scheduler is not restartable.
Returns: default instance of a Scheduler
that dynamically creates ExecutorService-based Workers and caches the threads, reusing them once the Workers have been shut down Deprecated: use boundedElastic()
, to be removed in 3.5.0
/**
* {@link Scheduler} that dynamically creates ExecutorService-based Workers and caches
* the thread pools, reusing them once the Workers have been shut down.
* <p>
* The maximum number of created thread pools is unbounded.
* <p>
* The default time-to-live for unused thread pools is 60 seconds, use the appropriate
* factory to set a different value.
* <p>
* This scheduler is not restartable.
*
* @return default instance of a {@link Scheduler} that dynamically creates ExecutorService-based
* Workers and caches the threads, reusing them once the Workers have been shut
* down
* @deprecated use {@link #boundedElastic()}, to be removed in 3.5.0
*/
@Deprecated
public static Scheduler elastic() {
return cache(CACHED_ELASTIC, ELASTIC, ELASTIC_SUPPLIER);
}
Scheduler
that dynamically creates a bounded number of ExecutorService-based Workers, reusing them once the Workers have been shut down. The underlying daemon threads can be evicted if idle for more than 60
seconds. The maximum number of created threads is bounded by a cap
(by default ten times the number of available CPU cores, see DEFAULT_BOUNDED_ELASTIC_SIZE
). The maximum number of task submissions that can be enqueued and deferred on each of these backing threads is bounded (by default 100K additional tasks, see DEFAULT_BOUNDED_ELASTIC_QUEUESIZE
). Past that point, a RejectedExecutionException
is thrown.
By order of preference, threads backing a new Worker
are picked from the idle pool, created anew or reused from the busy pool. In the later case, a best effort attempt at picking the thread backing the least amount of workers is made.
Note that if a thread is backing a low amount of workers, but these workers submit a lot of pending tasks,
a second worker could end up being backed by the same thread and see tasks rejected.
The picking of the backing thread is also done once and for all at worker creation, so
tasks could be delayed due to two workers sharing the same backing thread and submitting long-running tasks,
despite another backing thread becoming idle in the meantime.
Returns: a new Scheduler
that dynamically create workers with an upper bound to the number of backing threads and after that on the number of enqueued tasks, that reuses threads and evict idle ones
/**
* {@link Scheduler} that dynamically creates a bounded number of ExecutorService-based
* Workers, reusing them once the Workers have been shut down. The underlying daemon
* threads can be evicted if idle for more than {@link BoundedElasticScheduler#DEFAULT_TTL_SECONDS 60} seconds.
* <p>
* The maximum number of created threads is bounded by a {@code cap} (by default
* ten times the number of available CPU cores, see {@link #DEFAULT_BOUNDED_ELASTIC_SIZE}).
* The maximum number of task submissions that can be enqueued and deferred on each of these
* backing threads is bounded (by default 100K additional tasks, see
* {@link #DEFAULT_BOUNDED_ELASTIC_QUEUESIZE}). Past that point, a {@link RejectedExecutionException}
* is thrown.
* <p>
* By order of preference, threads backing a new {@link reactor.core.scheduler.Scheduler.Worker} are
* picked from the idle pool, created anew or reused from the busy pool. In the later case, a best effort
* attempt at picking the thread backing the least amount of workers is made.
* <p>
* Note that if a thread is backing a low amount of workers, but these workers submit a lot of pending tasks,
* a second worker could end up being backed by the same thread and see tasks rejected.
* The picking of the backing thread is also done once and for all at worker creation, so
* tasks could be delayed due to two workers sharing the same backing thread and submitting long-running tasks,
* despite another backing thread becoming idle in the meantime.
*
* @return a new {@link Scheduler} that dynamically create workers with an upper bound to
* the number of backing threads and after that on the number of enqueued tasks,
* that reuses threads and evict idle ones
*/
public static Scheduler boundedElastic() {
return cache(CACHED_BOUNDED_ELASTIC, BOUNDED_ELASTIC, BOUNDED_ELASTIC_SUPPLIER);
}
Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work. Returns: default instance of a Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
/**
* {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based
* workers and is suited for parallel work.
*
* @return default instance of a {@link Scheduler} that hosts a fixed pool of single-threaded
* ExecutorService-based workers and is suited for parallel work
*/
public static Scheduler parallel() {
return cache(CACHED_PARALLEL, PARALLEL, PARALLEL_SUPPLIER);
}
Executes tasks immediately instead of scheduling them.
As a consequence tasks run on the thread that submitted them (eg. the thread on which an operator is currently processing its onNext/onComplete/onError signals). This Scheduler
is typically used as a "null object" for APIs that require a Scheduler but one doesn't want to change threads.
Returns: a reusable Scheduler
that executes tasks immediately instead of scheduling them
/**
* Executes tasks immediately instead of scheduling them.
* <p>
* As a consequence tasks run on the thread that submitted them (eg. the
* thread on which an operator is currently processing its onNext/onComplete/onError signals).
* This {@link Scheduler} is typically used as a "null object" for APIs that require a
* Scheduler but one doesn't want to change threads.
*
* @return a reusable {@link Scheduler} that executes tasks immediately instead of scheduling them
*/
public static Scheduler immediate() {
return ImmediateScheduler.instance();
}
Scheduler
that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down.
The maximum number of created thread pools is unbounded.
The default time-to-live for unused thread pools is 60 seconds, use the appropriate
factory to set a different value.
This scheduler is not restartable.
Params: - name – Thread prefix
Returns: a new Scheduler
that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down Deprecated: use newBoundedElastic(int, int, String)
, to be removed in 3.5.0
/**
* {@link Scheduler} that dynamically creates ExecutorService-based Workers and caches
* the thread pools, reusing them once the Workers have been shut down.
* <p>
* The maximum number of created thread pools is unbounded.
* <p>
* The default time-to-live for unused thread pools is 60 seconds, use the appropriate
* factory to set a different value.
* <p>
* This scheduler is not restartable.
*
* @param name Thread prefix
*
* @return a new {@link Scheduler} that dynamically creates ExecutorService-based
* Workers and caches the thread pools, reusing them once the Workers have been shut
* down
* @deprecated use {@link #newBoundedElastic(int, int, String)}, to be removed in 3.5.0
*/
@Deprecated
public static Scheduler newElastic(String name) {
return newElastic(name, ElasticScheduler.DEFAULT_TTL_SECONDS);
}
Scheduler
that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down.
The maximum number of created thread pools is unbounded.
This scheduler is not restartable.
Params: - name – Thread prefix
- ttlSeconds – Time-to-live for an idle
Worker
Returns: a new Scheduler
that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down Deprecated: use newBoundedElastic(int, int, String, int)
, to be removed in 3.5.0
/**
* {@link Scheduler} that dynamically creates ExecutorService-based Workers and caches
* the thread pools, reusing them once the Workers have been shut down.
* <p>
* The maximum number of created thread pools is unbounded.
* <p>
* This scheduler is not restartable.
*
* @param name Thread prefix
* @param ttlSeconds Time-to-live for an idle {@link reactor.core.scheduler.Scheduler.Worker}
*
* @return a new {@link Scheduler} that dynamically creates ExecutorService-based
* Workers and caches the thread pools, reusing them once the Workers have been shut
* down
* @deprecated use {@link #newBoundedElastic(int, int, String, int)}, to be removed in 3.5.0
*/
@Deprecated
public static Scheduler newElastic(String name, int ttlSeconds) {
return newElastic(name, ttlSeconds, false);
}
Scheduler
that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down.
The maximum number of created thread pools is unbounded.
This scheduler is not restartable.
Params: - name – Thread prefix
- ttlSeconds – Time-to-live for an idle
Worker
- daemon – false if the
Scheduler
requires an explicit Scheduler.dispose()
to exit the VM.
Returns: a new Scheduler
that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down Deprecated: use newBoundedElastic(int, int, String, int, boolean)
, to be removed in 3.5.0
/**
* {@link Scheduler} that dynamically creates ExecutorService-based Workers and caches
* the thread pools, reusing them once the Workers have been shut down.
* <p>
* The maximum number of created thread pools is unbounded.
* <p>
* This scheduler is not restartable.
*
* @param name Thread prefix
* @param ttlSeconds Time-to-live for an idle {@link reactor.core.scheduler.Scheduler.Worker}
* @param daemon false if the {@link Scheduler} requires an explicit {@link
* Scheduler#dispose()} to exit the VM.
*
* @return a new {@link Scheduler} that dynamically creates ExecutorService-based
* Workers and caches the thread pools, reusing them once the Workers have been shut
* down
* @deprecated use {@link #newBoundedElastic(int, int, String, int, boolean)}, to be removed in 3.5.0
*/
@Deprecated
public static Scheduler newElastic(String name, int ttlSeconds, boolean daemon) {
return newElastic(ttlSeconds,
new ReactorThreadFactory(name, ElasticScheduler.COUNTER, daemon, false,
Schedulers::defaultUncaughtException));
}
Scheduler
that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down.
The maximum number of created thread pools is unbounded.
This scheduler is not restartable.
Params: - ttlSeconds – Time-to-live for an idle
Worker
- threadFactory – a
ThreadFactory
to use each thread initialization
Returns: a new Scheduler
that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down Deprecated: use newBoundedElastic(int, int, ThreadFactory, int)
, to be removed in 3.5.0
/**
* {@link Scheduler} that dynamically creates ExecutorService-based Workers and caches
* the thread pools, reusing them once the Workers have been shut down.
* <p>
* The maximum number of created thread pools is unbounded.
* <p>
* This scheduler is not restartable.
*
* @param ttlSeconds Time-to-live for an idle {@link reactor.core.scheduler.Scheduler.Worker}
* @param threadFactory a {@link ThreadFactory} to use each thread initialization
*
* @return a new {@link Scheduler} that dynamically creates ExecutorService-based
* Workers and caches the thread pools, reusing them once the Workers have been shut
* down
* @deprecated use {@link #newBoundedElastic(int, int, ThreadFactory, int)}, to be removed in 3.5.0
*/
@Deprecated
public static Scheduler newElastic(int ttlSeconds, ThreadFactory threadFactory) {
final Scheduler fromFactory = factory.newElastic(ttlSeconds, threadFactory);
fromFactory.start();
return fromFactory;
}
Scheduler
that dynamically creates a bounded number of ExecutorService-based Workers, reusing them once the Workers have been shut down. The underlying (user) threads can be evicted if idle for more than 60
seconds. The maximum number of created threads is bounded by the provided threadCap
. The maximum number of task submissions that can be enqueued and deferred on each of these backing threads is bounded by the provided queuedTaskCap
. Past that point, a RejectedExecutionException
is thrown.
By order of preference, threads backing a new Worker
are picked from the idle pool, created anew or reused from the busy pool. In the later case, a best effort attempt at picking the thread backing the least amount of workers is made.
Note that if a thread is backing a low amount of workers, but these workers submit a lot of pending tasks,
a second worker could end up being backed by the same thread and see tasks rejected.
The picking of the backing thread is also done once and for all at worker creation, so
tasks could be delayed due to two workers sharing the same backing thread and submitting long-running tasks,
despite another backing thread becoming idle in the meantime.
This scheduler is restartable. Backing threads are user threads, so they will prevent the JVM from exiting until their worker has been disposed AND they've been evicted by TTL, or the whole scheduler has been disposed
.
Params: - threadCap – maximum number of underlying threads to create
- queuedTaskCap – maximum number of tasks to enqueue when no more threads can be created. Can be
Integer.MAX_VALUE
for unbounded enqueueing. - name – Thread prefix
Returns: a new Scheduler
that dynamically create workers with an upper bound to the number of backing threads and after that on the number of enqueued tasks, that reuses threads and evict idle ones
/**
* {@link Scheduler} that dynamically creates a bounded number of ExecutorService-based
* Workers, reusing them once the Workers have been shut down. The underlying (user)
* threads can be evicted if idle for more than {@link BoundedElasticScheduler#DEFAULT_TTL_SECONDS 60} seconds.
* <p>
* The maximum number of created threads is bounded by the provided {@code threadCap}.
* The maximum number of task submissions that can be enqueued and deferred on each of these
* backing threads is bounded by the provided {@code queuedTaskCap}. Past that point,
* a {@link RejectedExecutionException} is thrown.
* <p>
* By order of preference, threads backing a new {@link reactor.core.scheduler.Scheduler.Worker} are
* picked from the idle pool, created anew or reused from the busy pool. In the later case, a best effort
* attempt at picking the thread backing the least amount of workers is made.
* <p>
* Note that if a thread is backing a low amount of workers, but these workers submit a lot of pending tasks,
* a second worker could end up being backed by the same thread and see tasks rejected.
* The picking of the backing thread is also done once and for all at worker creation, so
* tasks could be delayed due to two workers sharing the same backing thread and submitting long-running tasks,
* despite another backing thread becoming idle in the meantime.
* <p>
* This scheduler is restartable. Backing threads are user threads, so they will prevent the JVM
* from exiting until their worker has been disposed AND they've been evicted by TTL, or the whole
* scheduler has been {@link Scheduler#dispose() disposed}.
*
* @param threadCap maximum number of underlying threads to create
* @param queuedTaskCap maximum number of tasks to enqueue when no more threads can be created. Can be {@link Integer#MAX_VALUE} for unbounded enqueueing.
* @param name Thread prefix
* @return a new {@link Scheduler} that dynamically create workers with an upper bound to
* the number of backing threads and after that on the number of enqueued tasks,
* that reuses threads and evict idle ones
*/
public static Scheduler newBoundedElastic(int threadCap, int queuedTaskCap, String name) {
return newBoundedElastic(threadCap, queuedTaskCap, name, BoundedElasticScheduler.DEFAULT_TTL_SECONDS, false);
}
Scheduler
that dynamically creates a bounded number of ExecutorService-based Workers, reusing them once the Workers have been shut down. The underlying (user) threads can be evicted if idle for more than the provided ttlSeconds
. The maximum number of created threads is bounded by the provided threadCap
. The maximum number of task submissions that can be enqueued and deferred on each of these backing threads is bounded by the provided queuedTaskCap
. Past that point, a RejectedExecutionException
is thrown.
By order of preference, threads backing a new Worker
are picked from the idle pool, created anew or reused from the busy pool. In the later case, a best effort attempt at picking the thread backing the least amount of workers is made.
Note that if a thread is backing a low amount of workers, but these workers submit a lot of pending tasks,
a second worker could end up being backed by the same thread and see tasks rejected.
The picking of the backing thread is also done once and for all at worker creation, so
tasks could be delayed due to two workers sharing the same backing thread and submitting long-running tasks,
despite another backing thread becoming idle in the meantime.
This scheduler is restartable. Backing threads are user threads, so they will prevent the JVM from exiting until their worker has been disposed AND they've been evicted by TTL, or the whole scheduler has been disposed
.
Params: - threadCap – maximum number of underlying threads to create
- queuedTaskCap – maximum number of tasks to enqueue when no more threads can be created. Can be
Integer.MAX_VALUE
for unbounded enqueueing. - name – Thread prefix
- ttlSeconds – Time-to-live for an idle
Worker
Returns: a new Scheduler
that dynamically create workers with an upper bound to the number of backing threads and after that on the number of enqueued tasks, that reuses threads and evict idle ones
/**
* {@link Scheduler} that dynamically creates a bounded number of ExecutorService-based
* Workers, reusing them once the Workers have been shut down. The underlying (user)
* threads can be evicted if idle for more than the provided {@code ttlSeconds}.
* <p>
* The maximum number of created threads is bounded by the provided {@code threadCap}.
* The maximum number of task submissions that can be enqueued and deferred on each of these
* backing threads is bounded by the provided {@code queuedTaskCap}. Past that point,
* a {@link RejectedExecutionException} is thrown.
* <p>
* By order of preference, threads backing a new {@link reactor.core.scheduler.Scheduler.Worker} are
* picked from the idle pool, created anew or reused from the busy pool. In the later case, a best effort
* attempt at picking the thread backing the least amount of workers is made.
* <p>
* Note that if a thread is backing a low amount of workers, but these workers submit a lot of pending tasks,
* a second worker could end up being backed by the same thread and see tasks rejected.
* The picking of the backing thread is also done once and for all at worker creation, so
* tasks could be delayed due to two workers sharing the same backing thread and submitting long-running tasks,
* despite another backing thread becoming idle in the meantime.
* <p>
* This scheduler is restartable. Backing threads are user threads, so they will prevent the JVM
* from exiting until their worker has been disposed AND they've been evicted by TTL, or the whole
* scheduler has been {@link Scheduler#dispose() disposed}.
*
* @param threadCap maximum number of underlying threads to create
* @param queuedTaskCap maximum number of tasks to enqueue when no more threads can be created. Can be {@link Integer#MAX_VALUE} for unbounded enqueueing.
* @param name Thread prefix
* @param ttlSeconds Time-to-live for an idle {@link reactor.core.scheduler.Scheduler.Worker}
* @return a new {@link Scheduler} that dynamically create workers with an upper bound to
* the number of backing threads and after that on the number of enqueued tasks,
* that reuses threads and evict idle ones
*/
public static Scheduler newBoundedElastic(int threadCap, int queuedTaskCap, String name, int ttlSeconds) {
return newBoundedElastic(threadCap, queuedTaskCap, name, ttlSeconds, false);
}
Scheduler
that dynamically creates a bounded number of ExecutorService-based Workers, reusing them once the Workers have been shut down. The underlying (user or daemon) threads can be evicted if idle for more than the provided ttlSeconds
. The maximum number of created threads is bounded by the provided threadCap
. The maximum number of task submissions that can be enqueued and deferred on each of these backing threads is bounded by the provided queuedTaskCap
. Past that point, a RejectedExecutionException
is thrown.
By order of preference, threads backing a new Worker
are picked from the idle pool, created anew or reused from the busy pool. In the later case, a best effort attempt at picking the thread backing the least amount of workers is made.
Note that if a thread is backing a low amount of workers, but these workers submit a lot of pending tasks,
a second worker could end up being backed by the same thread and see tasks rejected.
The picking of the backing thread is also done once and for all at worker creation, so
tasks could be delayed due to two workers sharing the same backing thread and submitting long-running tasks,
despite another backing thread becoming idle in the meantime.
This scheduler is restartable. Depending on the daemon
parameter, backing threads can be user threads or daemon threads. Note that user threads will prevent the JVM from exiting until their worker has been disposed AND they've been evicted by TTL, or the whole scheduler has been disposed
.
Params: - threadCap – maximum number of underlying threads to create
- queuedTaskCap – maximum number of tasks to enqueue when no more threads can be created. Can be
Integer.MAX_VALUE
for unbounded enqueueing. - name – Thread prefix
- ttlSeconds – Time-to-live for an idle
Worker
- daemon – are backing threads
daemon threads
Returns: a new Scheduler
that dynamically create workers with an upper bound to the number of backing threads and after that on the number of enqueued tasks, that reuses threads and evict idle ones
/**
* {@link Scheduler} that dynamically creates a bounded number of ExecutorService-based
* Workers, reusing them once the Workers have been shut down. The underlying (user or daemon)
* threads can be evicted if idle for more than the provided {@code ttlSeconds}.
* <p>
* The maximum number of created threads is bounded by the provided {@code threadCap}.
* The maximum number of task submissions that can be enqueued and deferred on each of these
* backing threads is bounded by the provided {@code queuedTaskCap}. Past that point,
* a {@link RejectedExecutionException} is thrown.
* <p>
* By order of preference, threads backing a new {@link reactor.core.scheduler.Scheduler.Worker} are
* picked from the idle pool, created anew or reused from the busy pool. In the later case, a best effort
* attempt at picking the thread backing the least amount of workers is made.
* <p>
* Note that if a thread is backing a low amount of workers, but these workers submit a lot of pending tasks,
* a second worker could end up being backed by the same thread and see tasks rejected.
* The picking of the backing thread is also done once and for all at worker creation, so
* tasks could be delayed due to two workers sharing the same backing thread and submitting long-running tasks,
* despite another backing thread becoming idle in the meantime.
* <p>
* This scheduler is restartable. Depending on the {@code daemon} parameter, backing threads can be
* user threads or daemon threads. Note that user threads will prevent the JVM from exiting until their
* worker has been disposed AND they've been evicted by TTL, or the whole scheduler has been
* {@link Scheduler#dispose() disposed}.
*
* @param threadCap maximum number of underlying threads to create
* @param queuedTaskCap maximum number of tasks to enqueue when no more threads can be created. Can be {@link Integer#MAX_VALUE} for unbounded enqueueing.
* @param name Thread prefix
* @param ttlSeconds Time-to-live for an idle {@link reactor.core.scheduler.Scheduler.Worker}
* @param daemon are backing threads {@link Thread#setDaemon(boolean) daemon threads}
* @return a new {@link Scheduler} that dynamically create workers with an upper bound to
* the number of backing threads and after that on the number of enqueued tasks,
* that reuses threads and evict idle ones
*/
public static Scheduler newBoundedElastic(int threadCap, int queuedTaskCap, String name, int ttlSeconds, boolean daemon) {
return newBoundedElastic(threadCap, queuedTaskCap,
new ReactorThreadFactory(name, ElasticScheduler.COUNTER, daemon, false,
Schedulers::defaultUncaughtException),
ttlSeconds);
}
Scheduler
that dynamically creates a bounded number of ExecutorService-based Workers, reusing them once the Workers have been shut down. The underlying (user) threads can be evicted if idle for more than the provided ttlSeconds
. The maximum number of created threads is bounded by the provided threadCap
. The maximum number of task submissions that can be enqueued and deferred on each of these backing threads is bounded by the provided queuedTaskCap
. Past that point, a RejectedExecutionException
is thrown.
By order of preference, threads backing a new Worker
are picked from the idle pool, created anew or reused from the busy pool. In the later case, a best effort attempt at picking the thread backing the least amount of workers is made.
Note that if a thread is backing a low amount of workers, but these workers submit a lot of pending tasks,
a second worker could end up being backed by the same thread and see tasks rejected.
The picking of the backing thread is also done once and for all at worker creation, so
tasks could be delayed due to two workers sharing the same backing thread and submitting long-running tasks,
despite another backing thread becoming idle in the meantime.
This scheduler is restartable. Backing threads are created by the provided ThreadFactory
, which can decide whether to create user threads or daemon threads. Note that user threads will prevent the JVM from exiting until their worker has been disposed AND they've been evicted by TTL, or the whole scheduler has been disposed
.
Params: - threadCap – maximum number of underlying threads to create
- queuedTaskCap – maximum number of tasks to enqueue when no more threads can be created. Can be
Integer.MAX_VALUE
for unbounded enqueueing. - threadFactory – a
ThreadFactory
to use each thread initialization - ttlSeconds – Time-to-live for an idle
Worker
Returns: a new Scheduler
that dynamically create workers with an upper bound to the number of backing threads and after that on the number of enqueued tasks, that reuses threads and evict idle ones
/**
* {@link Scheduler} that dynamically creates a bounded number of ExecutorService-based
* Workers, reusing them once the Workers have been shut down. The underlying (user)
* threads can be evicted if idle for more than the provided {@code ttlSeconds}.
* <p>
* The maximum number of created threads is bounded by the provided {@code threadCap}.
* The maximum number of task submissions that can be enqueued and deferred on each of these
* backing threads is bounded by the provided {@code queuedTaskCap}. Past that point,
* a {@link RejectedExecutionException} is thrown.
* <p>
* By order of preference, threads backing a new {@link reactor.core.scheduler.Scheduler.Worker} are
* picked from the idle pool, created anew or reused from the busy pool. In the later case, a best effort
* attempt at picking the thread backing the least amount of workers is made.
* <p>
* Note that if a thread is backing a low amount of workers, but these workers submit a lot of pending tasks,
* a second worker could end up being backed by the same thread and see tasks rejected.
* The picking of the backing thread is also done once and for all at worker creation, so
* tasks could be delayed due to two workers sharing the same backing thread and submitting long-running tasks,
* despite another backing thread becoming idle in the meantime.
* <p>
* This scheduler is restartable. Backing threads are created by the provided {@link ThreadFactory},
* which can decide whether to create user threads or daemon threads. Note that user threads
* will prevent the JVM from exiting until their worker has been disposed AND they've been evicted by TTL,
* or the whole scheduler has been {@link Scheduler#dispose() disposed}.
*
* @param threadCap maximum number of underlying threads to create
* @param queuedTaskCap maximum number of tasks to enqueue when no more threads can be created. Can be {@link Integer#MAX_VALUE} for unbounded enqueueing.
* @param threadFactory a {@link ThreadFactory} to use each thread initialization
* @param ttlSeconds Time-to-live for an idle {@link reactor.core.scheduler.Scheduler.Worker}
* @return a new {@link Scheduler} that dynamically create workers with an upper bound to
* the number of backing threads and after that on the number of enqueued tasks,
* that reuses threads and evict idle ones
*/
public static Scheduler newBoundedElastic(int threadCap, int queuedTaskCap, ThreadFactory threadFactory, int ttlSeconds) {
Scheduler fromFactory = factory.newBoundedElastic(threadCap,
queuedTaskCap,
threadFactory,
ttlSeconds);
fromFactory.start();
return fromFactory;
}
Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work. This type of Scheduler
detects and rejects usage of blocking Reactor APIs. Params: - name – Thread prefix
Returns: a new Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
/**
* {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based
* workers and is suited for parallel work. This type of {@link Scheduler} detects and
* rejects usage of blocking Reactor APIs.
*
* @param name Thread prefix
*
* @return a new {@link Scheduler} that hosts a fixed pool of single-threaded
* ExecutorService-based workers and is suited for parallel work
*/
public static Scheduler newParallel(String name) {
return newParallel(name, DEFAULT_POOL_SIZE);
}
Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work. This type of Scheduler
detects and rejects usage of blocking Reactor APIs. Params: - name – Thread prefix
- parallelism – Number of pooled workers.
Returns: a new Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
/**
* {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based
* workers and is suited for parallel work. This type of {@link Scheduler} detects and
* rejects usage of blocking Reactor APIs.
*
* @param name Thread prefix
* @param parallelism Number of pooled workers.
*
* @return a new {@link Scheduler} that hosts a fixed pool of single-threaded
* ExecutorService-based workers and is suited for parallel work
*/
public static Scheduler newParallel(String name, int parallelism) {
return newParallel(name, parallelism, false);
}
Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work. This type of Scheduler
detects and rejects usage of blocking Reactor APIs. Params: - name – Thread prefix
- parallelism – Number of pooled workers.
- daemon – false if the
Scheduler
requires an explicit Scheduler.dispose()
to exit the VM.
Returns: a new Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
/**
* {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based
* workers and is suited for parallel work. This type of {@link Scheduler} detects and
* rejects usage of blocking Reactor APIs.
*
* @param name Thread prefix
* @param parallelism Number of pooled workers.
* @param daemon false if the {@link Scheduler} requires an explicit {@link
* Scheduler#dispose()} to exit the VM.
*
* @return a new {@link Scheduler} that hosts a fixed pool of single-threaded
* ExecutorService-based workers and is suited for parallel work
*/
public static Scheduler newParallel(String name, int parallelism, boolean daemon) {
return newParallel(parallelism,
new ReactorThreadFactory(name, ParallelScheduler.COUNTER, daemon,
true, Schedulers::defaultUncaughtException));
}
Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work. Params: - parallelism – Number of pooled workers.
- threadFactory – a
ThreadFactory
to use for the fixed initialized number of Thread
Returns: a new Scheduler
that hosts a fixed pool of single-threaded ExecutorService-based workers and is suited for parallel work
/**
* {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based
* workers and is suited for parallel work.
*
* @param parallelism Number of pooled workers.
* @param threadFactory a {@link ThreadFactory} to use for the fixed initialized
* number of {@link Thread}
*
* @return a new {@link Scheduler} that hosts a fixed pool of single-threaded
* ExecutorService-based workers and is suited for parallel work
*/
public static Scheduler newParallel(int parallelism, ThreadFactory threadFactory) {
final Scheduler fromFactory = factory.newParallel(parallelism, threadFactory);
fromFactory.start();
return fromFactory;
}
Scheduler
that hosts a single-threaded ExecutorService-based worker and is suited for parallel work. This type of Scheduler
detects and rejects usage * of blocking Reactor APIs. Params: - name – Component and thread name prefix
Returns: a new Scheduler
that hosts a single-threaded ExecutorService-based worker
/**
* {@link Scheduler} that hosts a single-threaded ExecutorService-based worker and is
* suited for parallel work. This type of {@link Scheduler} detects and rejects usage
* * of blocking Reactor APIs.
*
* @param name Component and thread name prefix
*
* @return a new {@link Scheduler} that hosts a single-threaded ExecutorService-based
* worker
*/
public static Scheduler newSingle(String name) {
return newSingle(name, false);
}
Scheduler
that hosts a single-threaded ExecutorService-based worker and is suited for parallel work. This type of Scheduler
detects and rejects usage of blocking Reactor APIs. Params: - name – Component and thread name prefix
- daemon – false if the
Scheduler
requires an explicit Scheduler.dispose()
to exit the VM.
Returns: a new Scheduler
that hosts a single-threaded ExecutorService-based worker
/**
* {@link Scheduler} that hosts a single-threaded ExecutorService-based worker and is
* suited for parallel work. This type of {@link Scheduler} detects and rejects usage
* of blocking Reactor APIs.
*
* @param name Component and thread name prefix
* @param daemon false if the {@link Scheduler} requires an explicit {@link
* Scheduler#dispose()} to exit the VM.
*
* @return a new {@link Scheduler} that hosts a single-threaded ExecutorService-based
* worker
*/
public static Scheduler newSingle(String name, boolean daemon) {
return newSingle(new ReactorThreadFactory(name, SingleScheduler.COUNTER, daemon,
true, Schedulers::defaultUncaughtException));
}
Scheduler
that hosts a single-threaded ExecutorService-based worker and is suited for parallel work. Params: - threadFactory – a
ThreadFactory
to use for the unique thread of the Scheduler
Returns: a new Scheduler
that hosts a single-threaded ExecutorService-based worker
/**
* {@link Scheduler} that hosts a single-threaded ExecutorService-based worker and is
* suited for parallel work.
*
* @param threadFactory a {@link ThreadFactory} to use for the unique thread of the
* {@link Scheduler}
*
* @return a new {@link Scheduler} that hosts a single-threaded ExecutorService-based
* worker
*/
public static Scheduler newSingle(ThreadFactory threadFactory) {
final Scheduler fromFactory = factory.newSingle(threadFactory);
fromFactory.start();
return fromFactory;
}
Define a hook that is executed when a Scheduler
has handled an error
. Note that it is executed after the error has been passed to the thread uncaughtErrorHandler, which is not the case when a fatal error occurs (see Exceptions.throwIfJvmFatal(Throwable)
). Params: - c – the new hook to set.
/**
* Define a hook that is executed when a {@link Scheduler} has
* {@link #handleError(Throwable) handled an error}. Note that it is executed after
* the error has been passed to the thread uncaughtErrorHandler, which is not the
* case when a fatal error occurs (see {@link Exceptions#throwIfJvmFatal(Throwable)}).
*
* @param c the new hook to set.
*/
public static void onHandleError(BiConsumer<Thread, ? super Throwable> c) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Hooking new default: onHandleError");
}
onHandleErrorHook = Objects.requireNonNull(c, "onHandleError");
}
Check if calling a Reactor blocking API in the current Thread
is forbidden or not, by checking if the thread implements NonBlocking
(in which case it is forbidden and this method returns true
). Returns: true
if blocking is forbidden in this thread, false
otherwise
/**
* Check if calling a Reactor blocking API in the current {@link Thread} is forbidden
* or not, by checking if the thread implements {@link NonBlocking} (in which case it is
* forbidden and this method returns {@code true}).
*
* @return {@code true} if blocking is forbidden in this thread, {@code false} otherwise
*/
public static boolean isInNonBlockingThread() {
return Thread.currentThread() instanceof NonBlocking;
}
Check if calling a Reactor blocking API in the given Thread
is forbidden or not, by checking if the thread implements NonBlocking
(in which case it is forbidden and this method returns true
). Returns: true
if blocking is forbidden in that thread, false
otherwise
/**
* Check if calling a Reactor blocking API in the given {@link Thread} is forbidden
* or not, by checking if the thread implements {@link NonBlocking} (in which case it is
* forbidden and this method returns {@code true}).
*
* @return {@code true} if blocking is forbidden in that thread, {@code false} otherwise
*/
public static boolean isNonBlockingThread(Thread t) {
return t instanceof NonBlocking;
}
If Micrometer is available, set-up a decorator that will instrument any ExecutorService
that backs a Scheduler
. No-op if Micrometer isn't available. The MeterRegistry
used by reactor can be configured via MicrometerConfiguration.useRegistry(MeterRegistry)
prior to using this method, the default being globalRegistry.globalRegistry
.
Implementation Note: Note that this is added as a decorator via Schedulers when enabling metrics for schedulers, which doesn't change the Factory.
/**
* If Micrometer is available, set-up a decorator that will instrument any
* {@link ExecutorService} that backs a {@link Scheduler}.
* No-op if Micrometer isn't available.
*
* <p>
* The {@link MeterRegistry} used by reactor can be configured via
* {@link reactor.util.Metrics.MicrometerConfiguration#useRegistry(MeterRegistry)}
* prior to using this method, the default being
* {@link io.micrometer.core.instrument.Metrics#globalRegistry}.
* </p>
*
* @implNote Note that this is added as a decorator via Schedulers when enabling metrics for schedulers, which doesn't change the Factory.
*/
public static void enableMetrics() {
if (Metrics.isInstrumentationAvailable()) {
addExecutorServiceDecorator(SchedulerMetricDecorator.METRICS_DECORATOR_KEY, new SchedulerMetricDecorator());
}
}
If enableMetrics()
has been previously called, removes the decorator. No-op if enableMetrics()
hasn't been called. /**
* If {@link #enableMetrics()} has been previously called, removes the decorator.
* No-op if {@link #enableMetrics()} hasn't been called.
*/
public static void disableMetrics() {
removeExecutorServiceDecorator(SchedulerMetricDecorator.METRICS_DECORATOR_KEY);
}
Re-apply default factory to Schedulers
/**
* Re-apply default factory to {@link Schedulers}
*/
public static void resetFactory() {
setFactory(DEFAULT);
}
Replace Schedulers
factories (newParallel
, newSingle
and newBoundedElastic
). Unlike setFactory(Factory)
, doesn't shutdown previous Schedulers but capture them in a Snapshot
that can be later restored via resetFrom(Snapshot)
.
This method should be called safely and with caution, typically on app startup.
Params: - newFactory – an arbitrary
Factory
instance
Returns: a Snapshot
representing a restorable snapshot of Schedulers
/**
* Replace {@link Schedulers} factories ({@link #newParallel(String) newParallel},
* {@link #newSingle(String) newSingle} and {@link #newBoundedElastic(int, int, String) newBoundedElastic}).
* Unlike {@link #setFactory(Factory)}, doesn't shutdown previous Schedulers but
* capture them in a {@link Snapshot} that can be later restored via {@link #resetFrom(Snapshot)}.
* <p>
* This method should be called safely and with caution, typically on app startup.
*
* @param newFactory an arbitrary {@link Factory} instance
* @return a {@link Snapshot} representing a restorable snapshot of {@link Schedulers}
*/
public static Snapshot setFactoryWithSnapshot(Factory newFactory) {
//nulling out CACHED references ensures that the schedulers won't be disposed
//when setting the newFactory via setFactory
Snapshot snapshot = new Snapshot(
CACHED_ELASTIC.getAndSet(null),
CACHED_BOUNDED_ELASTIC.getAndSet(null),
CACHED_PARALLEL.getAndSet(null),
CACHED_SINGLE.getAndSet(null),
factory);
setFactory(newFactory);
return snapshot;
}
Replace the current Factory and shared Schedulers with the ones saved in a previously captured
snapshot. Passing null
re-applies the default factory.
/**
* Replace the current Factory and shared Schedulers with the ones saved in a
* previously {@link #setFactoryWithSnapshot(Factory) captured} snapshot.
* <p>
* Passing {@code null} re-applies the default factory.
*/
public static void resetFrom(@Nullable Snapshot snapshot) {
if (snapshot == null) {
resetFactory();
return;
}
//Restore the atomic references first, so that concurrent calls to Schedulers either
//get a soon-to-be-shutdown instance or the restored instance
CachedScheduler oldElastic = CACHED_ELASTIC.getAndSet(snapshot.oldElasticScheduler);
CachedScheduler oldBoundedElastic = CACHED_BOUNDED_ELASTIC.getAndSet(snapshot.oldBoundedElasticScheduler);
CachedScheduler oldParallel = CACHED_PARALLEL.getAndSet(snapshot.oldParallelScheduler);
CachedScheduler oldSingle = CACHED_SINGLE.getAndSet(snapshot.oldSingleScheduler);
//From there on, we've restored all the snapshoted instances, the factory can be
//restored too and will start backing Schedulers.newXxx().
//We thus never create a CachedScheduler by accident.
factory = snapshot.oldFactory;
//Shutdown the old CachedSchedulers, if any
if (oldElastic != null) oldElastic._dispose();
if (oldBoundedElastic != null) oldBoundedElastic._dispose();
if (oldParallel != null) oldParallel._dispose();
if (oldSingle != null) oldSingle._dispose();
}
Reset the onHandleError(BiConsumer<Thread,? super Throwable>)
hook to the default no-op behavior. /**
* Reset the {@link #onHandleError(BiConsumer)} hook to the default no-op behavior.
*/
public static void resetOnHandleError() {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Reset to factory defaults: onHandleError");
}
onHandleErrorHook = null;
}
Replace Schedulers
factories (newParallel
, newSingle
and newBoundedElastic
). Also shutdown Schedulers from the cached factories (like single()
) in order to also use these replacements, re-creating the shared schedulers from the new factory upon next use.
This method should be called safely and with caution, typically on app startup.
Params: - factoryInstance – an arbitrary
Factory
instance.
/**
* Replace {@link Schedulers} factories ({@link #newParallel(String) newParallel},
* {@link #newSingle(String) newSingle} and {@link #newBoundedElastic(int, int, String) newBoundedElastic}).
* Also shutdown Schedulers from the cached factories (like {@link #single()}) in order to
* also use these replacements, re-creating the shared schedulers from the new factory
* upon next use.
* <p>
* This method should be called safely and with caution, typically on app startup.
*
* @param factoryInstance an arbitrary {@link Factory} instance.
*/
public static void setFactory(Factory factoryInstance) {
Objects.requireNonNull(factoryInstance, "factoryInstance");
shutdownNow();
factory = factoryInstance;
}
Set up an additional ScheduledExecutorService
decorator for a given key only if that key is not already present. The decorator is a BiFunction
taking the Scheduler and the backing ScheduledExecutorService
as second argument. It returns the decorated ScheduledExecutorService
.
Params: - key – the key under which to set up the decorator
- decorator – the executor service decorator to add, if key not already present.
See Also: Returns: true if the decorator was added, false if a decorator was already present
for this key.
/**
* Set up an additional {@link ScheduledExecutorService} decorator for a given key
* only if that key is not already present.
* <p>
* The decorator is a {@link BiFunction} taking the Scheduler and the backing
* {@link ScheduledExecutorService} as second argument. It returns the
* decorated {@link ScheduledExecutorService}.
*
* @param key the key under which to set up the decorator
* @param decorator the executor service decorator to add, if key not already present.
* @return true if the decorator was added, false if a decorator was already present
* for this key.
* @see #setExecutorServiceDecorator(String, BiFunction)
* @see #removeExecutorServiceDecorator(String)
* @see Schedulers#onScheduleHook(String, Function)
*/
public static boolean addExecutorServiceDecorator(String key, BiFunction<Scheduler, ScheduledExecutorService, ScheduledExecutorService> decorator) {
synchronized (DECORATORS) {
return DECORATORS.putIfAbsent(key, decorator) == null;
}
}
Set up an additional ScheduledExecutorService
decorator for a given key, even if that key is already present. The decorator is a BiFunction
taking the Scheduler and the backing ScheduledExecutorService
as second argument. It returns the decorated ScheduledExecutorService
.
Params: - key – the key under which to set up the decorator
- decorator – the executor service decorator to add, if key not already present.
See Also:
/**
* Set up an additional {@link ScheduledExecutorService} decorator for a given key,
* even if that key is already present.
* <p>
* The decorator is a {@link BiFunction} taking the Scheduler and the backing
* {@link ScheduledExecutorService} as second argument. It returns the
* decorated {@link ScheduledExecutorService}.
*
* @param key the key under which to set up the decorator
* @param decorator the executor service decorator to add, if key not already present.
* @see #addExecutorServiceDecorator(String, BiFunction)
* @see #removeExecutorServiceDecorator(String)
* @see Schedulers#onScheduleHook(String, Function)
*/
public static void setExecutorServiceDecorator(String key, BiFunction<Scheduler, ScheduledExecutorService, ScheduledExecutorService> decorator) {
synchronized (DECORATORS) {
DECORATORS.put(key, decorator);
}
}
Remove an existing ScheduledExecutorService
decorator if it has been set up via addExecutorServiceDecorator(String, BiFunction<Scheduler,ScheduledExecutorService,ScheduledExecutorService>)
. In case the decorator implements Disposable
, it is also disposed
.
Params: - key – the key for the executor service decorator to remove
See Also: Returns: the removed decorator, or null if none was set for that key
/**
* Remove an existing {@link ScheduledExecutorService} decorator if it has been set up
* via {@link #addExecutorServiceDecorator(String, BiFunction)}.
* <p>
* In case the decorator implements {@link Disposable}, it is also
* {@link Disposable#dispose() disposed}.
*
* @param key the key for the executor service decorator to remove
* @return the removed decorator, or null if none was set for that key
* @see #addExecutorServiceDecorator(String, BiFunction)
* @see #setExecutorServiceDecorator(String, BiFunction)
*/
public static BiFunction<Scheduler, ScheduledExecutorService, ScheduledExecutorService> removeExecutorServiceDecorator(String key) {
BiFunction<Scheduler, ScheduledExecutorService, ScheduledExecutorService> removed;
synchronized (DECORATORS) {
removed = DECORATORS.remove(key);
}
if (removed instanceof Disposable) {
((Disposable) removed).dispose();
}
return removed;
}
This method is aimed at Scheduler
implementors, enabling custom implementations that are backed by a ScheduledExecutorService
to also have said executors decorated (ie. for instrumentation purposes).
It applies the decorators added via addExecutorServiceDecorator(String, BiFunction<Scheduler,ScheduledExecutorService,ScheduledExecutorService>)
, so it shouldn't be added as a decorator. Note also that decorators are not guaranteed to be idempotent, so this method should be called only once per executor.
Params: - owner – a
Scheduler
that owns the ScheduledExecutorService
- original – the
ScheduledExecutorService
that the Scheduler
wants to use originally
See Also: Returns: the decorated ScheduledExecutorService
, or the original if no decorator is set up
/**
* This method is aimed at {@link Scheduler} implementors, enabling custom implementations
* that are backed by a {@link ScheduledExecutorService} to also have said executors
* decorated (ie. for instrumentation purposes).
* <p>
* It <strong>applies</strong> the decorators added via
* {@link #addExecutorServiceDecorator(String, BiFunction)}, so it shouldn't be added
* as a decorator. Note also that decorators are not guaranteed to be idempotent, so
* this method should be called only once per executor.
*
* @param owner a {@link Scheduler} that owns the {@link ScheduledExecutorService}
* @param original the {@link ScheduledExecutorService} that the {@link Scheduler}
* wants to use originally
* @return the decorated {@link ScheduledExecutorService}, or the original if no decorator is set up
* @see #addExecutorServiceDecorator(String, BiFunction)
* @see #removeExecutorServiceDecorator(String)
*/
public static ScheduledExecutorService decorateExecutorService(Scheduler owner, ScheduledExecutorService original) {
synchronized (DECORATORS) {
for (BiFunction<Scheduler, ScheduledExecutorService, ScheduledExecutorService> decorator : DECORATORS.values()) {
original = decorator.apply(owner, original);
}
}
return original;
}
Add or replace a named scheduling decorator
. With subsequent calls to this method, the onScheduleHook hook can be a composite of several sub-hooks, each with a different key. The sub-hook is a Function
taking the scheduled Runnable
. It returns the decorated Runnable
.
Params: - key – the key under which to set up the onScheduleHook sub-hook
- decorator – the
Runnable
decorator to add (or replace, if key is already present)
See Also:
/**
* Add or replace a named scheduling {@link Function decorator}. With subsequent calls
* to this method, the onScheduleHook hook can be a composite of several sub-hooks, each
* with a different key.
* <p>
* The sub-hook is a {@link Function} taking the scheduled {@link Runnable}.
* It returns the decorated {@link Runnable}.
*
* @param key the key under which to set up the onScheduleHook sub-hook
* @param decorator the {@link Runnable} decorator to add (or replace, if key is already present)
* @see #resetOnScheduleHook(String)
* @see #resetOnScheduleHooks()
*/
public static void onScheduleHook(String key, Function<Runnable, Runnable> decorator) {
synchronized (onScheduleHooks) {
onScheduleHooks.put(key, decorator);
Function<Runnable, Runnable> newHook = null;
for (Function<Runnable, Runnable> function : onScheduleHooks.values()) {
if (newHook == null) {
newHook = function;
}
else {
newHook = newHook.andThen(function);
}
}
onScheduleHook = newHook;
}
}
Reset a specific onScheduleHook sub-hook
if it has been set up via onScheduleHook(String, Function<Runnable,Runnable>)
. Params: - key – the key for onScheduleHook sub-hook to remove
See Also:
/**
* Reset a specific onScheduleHook {@link Function sub-hook} if it has been set up
* via {@link #onScheduleHook(String, Function)}.
*
* @param key the key for onScheduleHook sub-hook to remove
* @see #onScheduleHook(String, Function)
* @see #resetOnScheduleHooks()
*/
public static void resetOnScheduleHook(String key) {
synchronized (onScheduleHooks) {
onScheduleHooks.remove(key);
if (onScheduleHooks.isEmpty()) {
onScheduleHook = Function.identity();
}
else {
Function<Runnable, Runnable> newHook = null;
for (Function<Runnable, Runnable> function : onScheduleHooks.values()) {
if (newHook == null) {
newHook = function;
}
else {
newHook = newHook.andThen(function);
}
}
onScheduleHook = newHook;
}
}
}
Remove all onScheduleHook sub-hooks
. See Also:
/**
* Remove all onScheduleHook {@link Function sub-hooks}.
*
* @see #onScheduleHook(String, Function)
* @see #resetOnScheduleHook(String)
*/
public static void resetOnScheduleHooks() {
synchronized (onScheduleHooks) {
onScheduleHooks.clear();
onScheduleHook = null;
}
}
Applies the hooks registered with onScheduleHook(String, Function<Runnable,Runnable>)
. Params: Returns: decorated Runnable
if any hook is registered, the original otherwise.
/**
* Applies the hooks registered with {@link Schedulers#onScheduleHook(String, Function)}.
*
* @param runnable a {@link Runnable} submitted to a {@link Scheduler}
* @return decorated {@link Runnable} if any hook is registered, the original otherwise.
*/
public static Runnable onSchedule(Runnable runnable) {
Function<Runnable, Runnable> hook = onScheduleHook;
if (hook != null) {
return hook.apply(runnable);
}
else {
return runnable;
}
}
Clear any cached Scheduler
and call dispose on them. /**
* Clear any cached {@link Scheduler} and call dispose on them.
*/
public static void shutdownNow() {
CachedScheduler oldElastic = CACHED_ELASTIC.getAndSet(null);
CachedScheduler oldBoundedElastic = CACHED_BOUNDED_ELASTIC.getAndSet(null);
CachedScheduler oldParallel = CACHED_PARALLEL.getAndSet(null);
CachedScheduler oldSingle = CACHED_SINGLE.getAndSet(null);
if (oldElastic != null) oldElastic._dispose();
if (oldBoundedElastic != null) oldBoundedElastic._dispose();
if (oldParallel != null) oldParallel._dispose();
if (oldSingle != null) oldSingle._dispose();
}
Scheduler
that hosts a single-threaded ExecutorService-based worker and is suited for parallel work. Will cache the returned schedulers for subsequent calls until dispose. Returns: default instance of a Scheduler
that hosts a single-threaded ExecutorService-based worker
/**
* {@link Scheduler} that hosts a single-threaded ExecutorService-based worker and is
* suited for parallel work. Will cache the returned schedulers for subsequent calls until dispose.
*
* @return default instance of a {@link Scheduler} that hosts a single-threaded
* ExecutorService-based worker
*/
public static Scheduler single() {
return cache(CACHED_SINGLE, SINGLE, SINGLE_SUPPLIER);
}
Wraps a single Worker
from some other Scheduler
and provides Worker
services on top of it. Unlike with other factory methods in this class, the delegate is assumed to be started
and won't be implicitly started by this method. Use the Scheduler.dispose()
to release the wrapped worker.
Params: Returns: a wrapping Scheduler
consistently returning a same worker from a source Scheduler
/**
* Wraps a single {@link reactor.core.scheduler.Scheduler.Worker} from some other
* {@link Scheduler} and provides {@link reactor.core.scheduler.Scheduler.Worker}
* services on top of it. Unlike with other factory methods in this class, the delegate
* is assumed to be {@link Scheduler#start() started} and won't be implicitly started
* by this method.
* <p>
* Use the {@link Scheduler#dispose()} to release the wrapped worker.
*
* @param original a {@link Scheduler} to call upon to get the single {@link
* reactor.core.scheduler.Scheduler.Worker}
*
* @return a wrapping {@link Scheduler} consistently returning a same worker from a
* source {@link Scheduler}
*/
public static Scheduler single(Scheduler original) {
return new SingleWorkerScheduler(original);
}
Public factory hook to override Schedulers behavior globally
/**
* Public factory hook to override Schedulers behavior globally
*/
public interface Factory {
Scheduler
that dynamically creates Workers resources and caches eventually, reusing them once the Workers have been shut down.
The maximum number of created workers is unbounded.
Params: - ttlSeconds – Time-to-live for an idle
Worker
- threadFactory – a
ThreadFactory
to use
Returns: a new Scheduler
that dynamically creates Workers resources and caches eventually, reusing them once the Workers have been shut down Deprecated: use newBoundedElastic(int, int, ThreadFactory, int)
, to be removed in 3.5.0
/**
* {@link Scheduler} that dynamically creates Workers resources and caches
* eventually, reusing them once the Workers have been shut down.
* <p>
* The maximum number of created workers is unbounded.
*
* @param ttlSeconds Time-to-live for an idle {@link reactor.core.scheduler.Scheduler.Worker}
* @param threadFactory a {@link ThreadFactory} to use
*
* @return a new {@link Scheduler} that dynamically creates Workers resources and
* caches eventually, reusing them once the Workers have been shut down
* @deprecated use {@link Factory#newBoundedElastic(int, int, ThreadFactory, int)}, to be removed in 3.5.0
*/
@Deprecated
default Scheduler newElastic(int ttlSeconds, ThreadFactory threadFactory) {
return new ElasticScheduler(threadFactory, ttlSeconds);
}
Scheduler
that dynamically creates a bounded number of ExecutorService-based Workers, reusing them once the Workers have been shut down. The underlying (user or daemon) threads can be evicted if idle for more than ttlSeconds
. The maximum number of created thread pools is bounded by the provided cap
.
Params: - threadCap – maximum number of underlying threads to create
- queuedTaskCap – maximum number of tasks to enqueue when no more threads can be created. Can be
Integer.MAX_VALUE
for unbounded enqueueing. - threadFactory – a
ThreadFactory
to use each thread initialization - ttlSeconds – Time-to-live for an idle
Worker
Returns: a new Scheduler
that dynamically create workers with an upper bound to the number of backing threads, reuses threads and evict idle ones
/**
* {@link Scheduler} that dynamically creates a bounded number of ExecutorService-based
* Workers, reusing them once the Workers have been shut down. The underlying (user or daemon)
* threads can be evicted if idle for more than {@code ttlSeconds}.
* <p>
* The maximum number of created thread pools is bounded by the provided {@code cap}.
*
* @param threadCap maximum number of underlying threads to create
* @param queuedTaskCap maximum number of tasks to enqueue when no more threads can be created. Can be {@link Integer#MAX_VALUE} for unbounded enqueueing.
* @param threadFactory a {@link ThreadFactory} to use each thread initialization
* @param ttlSeconds Time-to-live for an idle {@link reactor.core.scheduler.Scheduler.Worker}
*
* @return a new {@link Scheduler} that dynamically create workers with an upper bound to
* the number of backing threads, reuses threads and evict idle ones
*/
default Scheduler newBoundedElastic(int threadCap, int queuedTaskCap, ThreadFactory threadFactory, int ttlSeconds) {
return new BoundedElasticScheduler(threadCap, queuedTaskCap, threadFactory, ttlSeconds);
}
Scheduler
that hosts a fixed pool of workers and is suited for parallel work. Params: - parallelism – Number of pooled workers.
- threadFactory – a
ThreadFactory
to use for the fixed initialized number of Thread
Returns: a new Scheduler
that hosts a fixed pool of workers and is suited for parallel work
/**
* {@link Scheduler} that hosts a fixed pool of workers and is suited for parallel
* work.
*
* @param parallelism Number of pooled workers.
* @param threadFactory a {@link ThreadFactory} to use for the fixed initialized
* number of {@link Thread}
*
* @return a new {@link Scheduler} that hosts a fixed pool of workers and is
* suited for parallel work
*/
default Scheduler newParallel(int parallelism, ThreadFactory threadFactory) {
return new ParallelScheduler(parallelism, threadFactory);
}
Scheduler
that hosts a single worker and is suited for non-blocking work. Params: - threadFactory – a
ThreadFactory
to use for the unique resource of the Scheduler
Returns: a new Scheduler
that hosts a single worker
/**
* {@link Scheduler} that hosts a single worker and is suited for non-blocking
* work.
*
* @param threadFactory a {@link ThreadFactory} to use for the unique resource of
* the {@link Scheduler}
*
* @return a new {@link Scheduler} that hosts a single worker
*/
default Scheduler newSingle(ThreadFactory threadFactory) {
return new SingleScheduler(threadFactory);
}
}
It is also Disposable
in case you don't want to restore the live Schedulers
/**
* It is also {@link Disposable} in case you don't want to restore the live {@link Schedulers}
*/
public static final class Snapshot implements Disposable {
@Nullable
final CachedScheduler oldElasticScheduler;
@Nullable
final CachedScheduler oldBoundedElasticScheduler;
@Nullable
final CachedScheduler oldParallelScheduler;
@Nullable
final CachedScheduler oldSingleScheduler;
final Factory oldFactory;
private Snapshot(@Nullable CachedScheduler oldElasticScheduler,
@Nullable CachedScheduler oldBoundedElasticScheduler,
@Nullable CachedScheduler oldParallelScheduler,
@Nullable CachedScheduler oldSingleScheduler,
Factory factory) {
this.oldElasticScheduler = oldElasticScheduler;
this.oldBoundedElasticScheduler = oldBoundedElasticScheduler;
this.oldParallelScheduler = oldParallelScheduler;
this.oldSingleScheduler = oldSingleScheduler;
oldFactory = factory;
}
@Override
public boolean isDisposed() {
return
(oldElasticScheduler == null || oldElasticScheduler.isDisposed()) &&
(oldBoundedElasticScheduler == null || oldBoundedElasticScheduler.isDisposed()) &&
(oldParallelScheduler == null || oldParallelScheduler.isDisposed()) &&
(oldSingleScheduler == null || oldSingleScheduler.isDisposed());
}
@Override
public void dispose() {
if (oldElasticScheduler != null) oldElasticScheduler._dispose();
if (oldBoundedElasticScheduler != null) oldBoundedElasticScheduler._dispose();
if (oldParallelScheduler != null) oldParallelScheduler._dispose();
if (oldSingleScheduler != null) oldSingleScheduler._dispose();
}
}
// Internals
static final String ELASTIC = "elastic"; // IO stuff
static final String BOUNDED_ELASTIC = "boundedElastic"; // Blocking stuff with scale to zero
static final String PARALLEL = "parallel"; //scale up common tasks
static final String SINGLE = "single"; //non blocking tasks
static final String IMMEDIATE = "immediate";
static final String FROM_EXECUTOR = "fromExecutor";
static final String FROM_EXECUTOR_SERVICE = "fromExecutorService";
// Cached schedulers in atomic references:
static AtomicReference<CachedScheduler> CACHED_ELASTIC = new AtomicReference<>();
static AtomicReference<CachedScheduler> CACHED_BOUNDED_ELASTIC = new AtomicReference<>();
static AtomicReference<CachedScheduler> CACHED_PARALLEL = new AtomicReference<>();
static AtomicReference<CachedScheduler> CACHED_SINGLE = new AtomicReference<>();
static final Supplier<Scheduler> ELASTIC_SUPPLIER =
() -> newElastic(ELASTIC, ElasticScheduler.DEFAULT_TTL_SECONDS, true);
static final Supplier<Scheduler> BOUNDED_ELASTIC_SUPPLIER =
() -> newBoundedElastic(DEFAULT_BOUNDED_ELASTIC_SIZE, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
BOUNDED_ELASTIC, BoundedElasticScheduler.DEFAULT_TTL_SECONDS, true);
static final Supplier<Scheduler> PARALLEL_SUPPLIER =
() -> newParallel(PARALLEL, DEFAULT_POOL_SIZE, true);
static final Supplier<Scheduler> SINGLE_SUPPLIER = () -> newSingle(SINGLE, true);
static final Factory DEFAULT = new Factory() { };
static final Map<String, BiFunction<Scheduler, ScheduledExecutorService, ScheduledExecutorService>>
DECORATORS = new LinkedHashMap<>();
static volatile Factory factory = DEFAULT;
private static final LinkedHashMap<String, Function<Runnable, Runnable>> onScheduleHooks = new LinkedHashMap<>(1);
@Nullable
private static Function<Runnable, Runnable> onScheduleHook;
Get a CachedScheduler
out of the reference
or create one using the Supplier
if the reference is empty, effectively creating a single instance to be reused as a default scheduler for the given key
category. Params: - reference – the cache reference that holds the scheduler
- key – the "name" for the Scheduler's category/type
- supplier – the
Scheduler
generator to use and wrap into a CachedScheduler
. Note that in case of a race, an extraneous Scheduler can be created, but it'll get immediately disposed
.
Returns: a CachedScheduler
to be reused, either pre-existing or created
/**
* Get a {@link CachedScheduler} out of the {@code reference} or create one using the
* {@link Supplier} if the reference is empty, effectively creating a single instance
* to be reused as a default scheduler for the given {@code key} category.
*
* @param reference the cache reference that holds the scheduler
* @param key the "name" for the Scheduler's category/type
* @param supplier the {@link Scheduler} generator to use and wrap into a {@link CachedScheduler}.
* Note that in case of a race, an extraneous Scheduler can be created, but it'll get
* immediately {@link Scheduler#dispose() disposed}.
* @return a {@link CachedScheduler} to be reused, either pre-existing or created
*/
static CachedScheduler cache(AtomicReference<CachedScheduler> reference, String key, Supplier<Scheduler> supplier) {
CachedScheduler s = reference.get();
if (s != null) {
return s;
}
s = new CachedScheduler(key, supplier.get());
if (reference.compareAndSet(null, s)) {
return s;
}
//the reference was updated in the meantime with a cached scheduler
//fallback to it and dispose the extraneous one
s._dispose();
return reference.get();
}
static final Logger LOGGER = Loggers.getLogger(Schedulers.class);
static final void defaultUncaughtException(Thread t, Throwable e) {
Schedulers.LOGGER.error("Scheduler worker in group " + t.getThreadGroup().getName()
+ " failed with an uncaught exception", e);
}
static void handleError(Throwable ex) {
Thread thread = Thread.currentThread();
Throwable t = unwrap(ex);
Thread.UncaughtExceptionHandler x = thread.getUncaughtExceptionHandler();
if (x != null) {
x.uncaughtException(thread, t);
}
else {
LOGGER.error("Scheduler worker failed with an uncaught exception", t);
}
BiConsumer<Thread, ? super Throwable> hook = onHandleErrorHook;
if (hook != null) {
hook.accept(thread, t);
}
}
static class CachedScheduler implements Scheduler, Supplier<Scheduler>, Scannable {
final Scheduler cached;
final String stringRepresentation;
CachedScheduler(String key, Scheduler cached) {
this.cached = cached;
this.stringRepresentation = "Schedulers." + key + "()";
}
@Override
public Disposable schedule(Runnable task) {
return cached.schedule(task);
}
@Override
public Disposable schedule(Runnable task, long delay, TimeUnit unit) {
return cached.schedule(task, delay, unit);
}
@Override
public Disposable schedulePeriodically(Runnable task,
long initialDelay,
long period,
TimeUnit unit) {
return cached.schedulePeriodically(task, initialDelay, period, unit);
}
@Override
public Worker createWorker() {
return cached.createWorker();
}
@Override
public long now(TimeUnit unit) {
return cached.now(unit);
}
@Override
public void start() {
cached.start();
}
@Override
public void dispose() {
}
@Override
public boolean isDisposed() {
return cached.isDisposed();
}
@Override
public String toString() {
return stringRepresentation;
}
@Override
public Object scanUnsafe(Attr key) {
if (Attr.NAME == key) return stringRepresentation;
return Scannable.from(cached).scanUnsafe(key);
}
Get the Scheduler
that is cached and wrapped inside this CachedScheduler
. Returns: the cached Scheduler
/**
* Get the {@link Scheduler} that is cached and wrapped inside this
* {@link CachedScheduler}.
*
* @return the cached Scheduler
*/
@Override
public Scheduler get() {
return cached;
}
void _dispose() {
cached.dispose();
}
}
static Disposable directSchedule(ScheduledExecutorService exec,
Runnable task,
@Nullable Disposable parent,
long delay,
TimeUnit unit) {
task = onSchedule(task);
SchedulerTask sr = new SchedulerTask(task, parent);
Future<?> f;
if (delay <= 0L) {
f = exec.submit((Callable<?>) sr);
}
else {
f = exec.schedule((Callable<?>) sr, delay, unit);
}
sr.setFuture(f);
return sr;
}
static Disposable directSchedulePeriodically(ScheduledExecutorService exec,
Runnable task,
long initialDelay,
long period,
TimeUnit unit) {
task = onSchedule(task);
if (period <= 0L) {
InstantPeriodicWorkerTask isr =
new InstantPeriodicWorkerTask(task, exec);
Future<?> f;
if (initialDelay <= 0L) {
f = exec.submit(isr);
}
else {
f = exec.schedule(isr, initialDelay, unit);
}
isr.setFirst(f);
return isr;
}
else {
PeriodicSchedulerTask sr = new PeriodicSchedulerTask(task);
Future<?> f = exec.scheduleAtFixedRate(sr, initialDelay, period, unit);
sr.setFuture(f);
return sr;
}
}
static Disposable workerSchedule(ScheduledExecutorService exec,
Disposable.Composite tasks,
Runnable task,
long delay,
TimeUnit unit) {
task = onSchedule(task);
WorkerTask sr = new WorkerTask(task, tasks);
if (!tasks.add(sr)) {
throw Exceptions.failWithRejected();
}
try {
Future<?> f;
if (delay <= 0L) {
f = exec.submit((Callable<?>) sr);
}
else {
f = exec.schedule((Callable<?>) sr, delay, unit);
}
sr.setFuture(f);
}
catch (RejectedExecutionException ex) {
sr.dispose();
//RejectedExecutionException are propagated up
throw ex;
}
return sr;
}
static Disposable workerSchedulePeriodically(ScheduledExecutorService exec,
Disposable.Composite tasks,
Runnable task,
long initialDelay,
long period,
TimeUnit unit) {
task = onSchedule(task);
if (period <= 0L) {
InstantPeriodicWorkerTask isr =
new InstantPeriodicWorkerTask(task, exec, tasks);
if (!tasks.add(isr)) {
throw Exceptions.failWithRejected();
}
try {
Future<?> f;
if (initialDelay <= 0L) {
f = exec.submit(isr);
}
else {
f = exec.schedule(isr, initialDelay, unit);
}
isr.setFirst(f);
}
catch (RejectedExecutionException ex) {
isr.dispose();
//RejectedExecutionException are propagated up
throw ex;
}
catch (IllegalArgumentException | NullPointerException ex) {
isr.dispose();
//IllegalArgumentException are wrapped into RejectedExecutionException and propagated up
throw new RejectedExecutionException(ex);
}
return isr;
}
PeriodicWorkerTask sr = new PeriodicWorkerTask(task, tasks);
if (!tasks.add(sr)) {
throw Exceptions.failWithRejected();
}
try {
Future<?> f = exec.scheduleAtFixedRate(sr, initialDelay, period, unit);
sr.setFuture(f);
}
catch (RejectedExecutionException ex) {
sr.dispose();
//RejectedExecutionException are propagated up
throw ex;
}
catch (IllegalArgumentException | NullPointerException ex) {
sr.dispose();
//IllegalArgumentException are wrapped into RejectedExecutionException and propagated up
throw new RejectedExecutionException(ex);
}
return sr;
}
Scan an Executor
or ExecutorService
, recognizing several special implementations. Unwraps some decorating schedulers, recognizes Scannable
schedulers and delegates to their Scannable.scanUnsafe(Attr)
method, introspects ThreadPoolExecutor
instances. If no data can be extracted, defaults to the provided orElse
scanUnsafe
.
Params: - executor – the executor to introspect in a best effort manner.
- key – the key to scan for. CAPACITY and BUFFERED mainly.
Returns: an equivalent of Scannable.scanUnsafe(Attr)
but that can also work on some implementations of Executor
/**
* Scan an {@link Executor} or {@link ExecutorService}, recognizing several special
* implementations. Unwraps some decorating schedulers, recognizes {@link Scannable}
* schedulers and delegates to their {@link Scannable#scanUnsafe(Scannable.Attr)}
* method, introspects {@link ThreadPoolExecutor} instances.
* <p>
* If no data can be extracted, defaults to the provided {@code orElse}
* {@link Scannable#scanUnsafe(Scannable.Attr) scanUnsafe}.
*
* @param executor the executor to introspect in a best effort manner.
* @param key the key to scan for. CAPACITY and BUFFERED mainly.
* @return an equivalent of {@link Scannable#scanUnsafe(Scannable.Attr)} but that can
* also work on some implementations of {@link Executor}
*/
@Nullable
static final Object scanExecutor(Executor executor, Scannable.Attr key) {
if (executor instanceof DelegateServiceScheduler.UnsupportedScheduledExecutorService) {
executor = ((DelegateServiceScheduler.UnsupportedScheduledExecutorService) executor).get();
}
if (executor instanceof Scannable) {
return ((Scannable) executor).scanUnsafe(key);
}
if (executor instanceof ExecutorService) {
ExecutorService service = (ExecutorService) executor;
if (key == Scannable.Attr.TERMINATED) return service.isTerminated();
if (key == Scannable.Attr.CANCELLED) return service.isShutdown();
}
if (executor instanceof ThreadPoolExecutor) {
final ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) executor;
if (key == Scannable.Attr.CAPACITY) return poolExecutor.getMaximumPoolSize();
if (key == Scannable.Attr.BUFFERED) return ((Long) (poolExecutor.getTaskCount() - poolExecutor.getCompletedTaskCount())).intValue();
if (key == Scannable.Attr.LARGE_BUFFERED) return poolExecutor.getTaskCount() - poolExecutor.getCompletedTaskCount();
}
return null;
}
}