/*
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.core.scheduler;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Stream;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;
Dynamically creates ScheduledExecutorService-based Workers and caches the thread pools, reusing
them once the Workers have been shut down. This scheduler is time-capable (can schedule
with delay / periodically).
The maximum number of created thread pools is unbounded.
The default time-to-live for unused thread pools is 60 seconds, use the
appropriate constructor to set a different value.
This scheduler is not restartable (may be later).
Author: Stephane Maldini, Simon Baslé
/**
* Dynamically creates ScheduledExecutorService-based Workers and caches the thread pools, reusing
* them once the Workers have been shut down. This scheduler is time-capable (can schedule
* with delay / periodically).
* <p>
* The maximum number of created thread pools is unbounded.
* <p>
* The default time-to-live for unused thread pools is 60 seconds, use the
* appropriate constructor to set a different value.
* <p>
* This scheduler is not restartable (may be later).
*
* @author Stephane Maldini
* @author Simon Baslé
*/
final class ElasticScheduler implements Scheduler, Supplier<ScheduledExecutorService>, Scannable {
static final AtomicLong COUNTER = new AtomicLong();
static final ThreadFactory EVICTOR_FACTORY = r -> {
Thread t = new Thread(r, "elastic-evictor-" + COUNTER.incrementAndGet());
t.setDaemon(true);
return t;
};
static final CachedService SHUTDOWN = new CachedService(null);
static final int DEFAULT_TTL_SECONDS = 60;
final ThreadFactory factory;
final int ttlSeconds;
final Queue<ScheduledExecutorServiceExpiry> cache;
final Queue<CachedService> all;
final ScheduledExecutorService evictor;
volatile boolean shutdown;
ElasticScheduler(ThreadFactory factory, int ttlSeconds) {
if (ttlSeconds < 0) {
throw new IllegalArgumentException("ttlSeconds must be positive, was: " + ttlSeconds);
}
this.ttlSeconds = ttlSeconds;
this.factory = factory;
this.cache = new ConcurrentLinkedQueue<>();
this.all = new ConcurrentLinkedQueue<>();
this.evictor = Executors.newScheduledThreadPool(1, EVICTOR_FACTORY);
this.evictor.scheduleAtFixedRate(this::eviction,
ttlSeconds,
ttlSeconds,
TimeUnit.SECONDS);
}
Instantiates the default ScheduledExecutorService
for the ElasticScheduler (Executors.newScheduledThreadPoolExecutor
with core and max pool size of 1). /**
* Instantiates the default {@link ScheduledExecutorService} for the ElasticScheduler
* ({@code Executors.newScheduledThreadPoolExecutor} with core and max pool size of 1).
*/
@Override
public ScheduledExecutorService get() {
ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(1, factory);
poolExecutor.setMaximumPoolSize(1);
return poolExecutor;
}
@Override
public void start() {
throw new UnsupportedOperationException("Restarting not supported yet");
}
@Override
public boolean isDisposed() {
return shutdown;
}
@Override
public void dispose() {
if (shutdown) {
return;
}
shutdown = true;
evictor.shutdownNow();
cache.clear();
CachedService cached;
while ((cached = all.poll()) != null) {
cached.exec.shutdownNow();
}
}
CachedService pick() {
if (shutdown) {
return SHUTDOWN;
}
CachedService result;
ScheduledExecutorServiceExpiry e = cache.poll();
if (e != null) {
return e.cached;
}
result = new CachedService(this);
all.offer(result);
if (shutdown) {
all.remove(result);
return SHUTDOWN;
}
return result;
}
@Override
public Disposable schedule(Runnable task) {
CachedService cached = pick();
return Schedulers.directSchedule(cached.exec,
new DirectScheduleTask(task, cached),
0L,
TimeUnit.MILLISECONDS);
}
@Override
public Disposable schedule(Runnable task, long delay, TimeUnit unit) {
CachedService cached = pick();
return Schedulers.directSchedule(cached.exec,
new DirectScheduleTask(task, cached),
delay,
unit);
}
@Override
public Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) {
CachedService cached = pick();
return Disposables.composite(Schedulers.directSchedulePeriodically(cached.exec,
task,
initialDelay,
period,
unit), cached);
}
@Override
public String toString() {
StringBuilder ts = new StringBuilder(Schedulers.ELASTIC)
.append('(');
if (factory instanceof ReactorThreadFactory) {
ts.append('\"').append(((ReactorThreadFactory) factory).get()).append('\"');
}
ts.append(')');
return ts.toString();
}
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.TERMINATED || key == Attr.CANCELLED) return isDisposed();
if (key == Attr.CAPACITY) return Integer.MAX_VALUE;
if (key == Attr.BUFFERED) return cache.size(); //BUFFERED: number of workers alive
if (key == Attr.NAME) return this.toString();
return null;
}
@Override
public Stream<? extends Scannable> inners() {
return cache.stream()
.map(cached -> cached.cached);
}
@Override
public Worker createWorker() {
return new ElasticWorker(pick());
}
void eviction() {
long now = System.currentTimeMillis();
List<ScheduledExecutorServiceExpiry> list = new ArrayList<>(cache);
for (ScheduledExecutorServiceExpiry e : list) {
if (e.expireMillis < now) {
if (cache.remove(e)) {
e.cached.exec.shutdownNow();
all.remove(e.cached);
}
}
}
}
static final class CachedService implements Disposable, Scannable {
final ElasticScheduler parent;
final ScheduledExecutorService exec;
CachedService(@Nullable ElasticScheduler parent) {
this.parent = parent;
if (parent != null) {
this.exec =
Schedulers.decorateExecutorService(Schedulers.ELASTIC, parent);
}
else {
this.exec = Executors.newSingleThreadScheduledExecutor();
this.exec.shutdownNow();
}
}
@Override
public void dispose() {
if (exec != null) {
if (this != SHUTDOWN && !parent.shutdown) {
ScheduledExecutorServiceExpiry e = new
ScheduledExecutorServiceExpiry(this,
System.currentTimeMillis() + parent.ttlSeconds * 1000L);
parent.cache.offer(e);
if (parent.shutdown) {
if (parent.cache.remove(e)) {
exec.shutdownNow();
}
}
}
}
}
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.NAME) return parent.scanUnsafe(key);
if (key == Attr.PARENT) return parent;
if (key == Attr.TERMINATED || key == Attr.CANCELLED) return isDisposed();
if (key == Attr.CAPACITY) {
//assume 1 if unknown, otherwise use the one from underlying executor
Integer capacity = (Integer) Schedulers.scanExecutor(exec, key);
if (capacity == null || capacity == -1) return 1;
}
return Schedulers.scanExecutor(exec, key);
}
}
static final class DirectScheduleTask implements Runnable {
final Runnable delegate;
final CachedService cached;
DirectScheduleTask(Runnable delegate, CachedService cached) {
this.delegate = delegate;
this.cached = cached;
}
@Override
public void run() {
try {
delegate.run();
}
catch (Throwable ex) {
Schedulers.handleError(ex);
}
finally {
cached.dispose();
}
}
}
static final class ScheduledExecutorServiceExpiry {
final CachedService cached;
final long expireMillis;
ScheduledExecutorServiceExpiry(CachedService cached, long expireMillis) {
this.cached = cached;
this.expireMillis = expireMillis;
}
}
static final class ElasticWorker extends AtomicBoolean implements Worker, Scannable {
final CachedService cached;
final Disposable.Composite tasks;
ElasticWorker(CachedService cached) {
this.cached = cached;
this.tasks = Disposables.composite();
}
@Override
public Disposable schedule(Runnable task) {
return Schedulers.workerSchedule(cached.exec,
tasks,
task,
0L,
TimeUnit.MILLISECONDS);
}
@Override
public Disposable schedule(Runnable task, long delay, TimeUnit unit) {
return Schedulers.workerSchedule(cached.exec, tasks, task, delay, unit);
}
@Override
public Disposable schedulePeriodically(Runnable task,
long initialDelay,
long period,
TimeUnit unit) {
return Schedulers.workerSchedulePeriodically(cached.exec,
tasks,
task,
initialDelay,
period,
unit);
}
@Override
public void dispose() {
if (compareAndSet(false, true)) {
tasks.dispose();
cached.dispose();
}
}
@Override
public boolean isDisposed() {
return tasks.isDisposed();
}
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.TERMINATED || key == Attr.CANCELLED) return isDisposed();
if (key == Attr.NAME) return cached.scanUnsafe(key) + ".worker";
if (key == Attr.PARENT) return cached.parent;
return cached.scanUnsafe(key);
}
}
}