package reactor.core.scheduler;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Stream;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.Scannable;
final class BoundedElasticScheduler implements Scheduler, Scannable {
/**
 * Default time-to-live, in seconds.
 * NOTE(review): not referenced in the visible part of this file; presumably used by the code
 * that instantiates this scheduler - confirm.
 */
static final int DEFAULT_TTL_SECONDS = 60;
/** Source of the numeric suffix appended to each evictor thread name. */
static final AtomicLong EVICTOR_COUNTER = new AtomicLong();
/**
 * Factory for the threads that run the periodic eviction task. Threads are daemons and are
 * named {@code Schedulers.BOUNDED_ELASTIC + "-evictor-" + n}.
 */
static final ThreadFactory EVICTOR_FACTORY = r -> {
Thread t = new Thread(r, Schedulers.BOUNDED_ELASTIC + "-evictor-" + EVICTOR_COUNTER.incrementAndGet());
// daemon, so an evictor thread does not keep the JVM alive on its own
t.setDaemon(true);
return t;
};
/**
 * Sentinel: a {@link BoundedServices} that is already disposed. A scheduler whose
 * {@code boundedServices} field references this instance reports itself as disposed.
 */
static final BoundedServices SHUTDOWN;
/**
 * Sentinel {@link BoundedState} backed by an executor that has already been shut down.
 * It is what {@link BoundedServices#pick()} returns once the services are disposed.
 */
static final BoundedState CREATING;
static {
SHUTDOWN = new BoundedServices();
SHUTDOWN.dispose();
// the executor is created only to be shut down right away: it backs the CREATING sentinel
ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor();
s.shutdownNow();
CREATING = new BoundedState(SHUTDOWN, s) {
@Override
public String toString() {
return "CREATING BoundedState";
}
};
// -1 equals BoundedState.EVICTED, the value for which markPicked() returns false
CREATING.markCount = -1;
// a negative idle timestamp makes tryEvict() return false immediately
CREATING.idleSinceTimestamp = -1;
}
/** Upper bound on the number of backing executors: {@code pick()} only creates a new one while the count is below this value. */
final int maxThreads;
/** Queue capacity given to each {@link BoundedScheduledExecutorService}; {@code Integer.MAX_VALUE} is treated as unbounded. */
final int maxTaskQueuedPerThread;
/** Time source used for idle timestamps and for eviction checks. */
final Clock clock;
/** Thread factory handed to every backing executor. */
final ThreadFactory factory;
/** Idle duration, in milliseconds, after which an executor may be evicted; also the period of the eviction task. */
final long ttlMillis;
/** Current services, or the {@code SHUTDOWN} sentinel while this scheduler is disposed. Swapped atomically through {@code BOUNDED_SERVICES}. */
volatile BoundedServices boundedServices;
static final AtomicReferenceFieldUpdater<BoundedElasticScheduler, BoundedServices> BOUNDED_SERVICES =
AtomicReferenceFieldUpdater.newUpdater(BoundedElasticScheduler.class, BoundedServices.class, "boundedServices");
/** Executor running the periodic eviction task, or {@code null} while none is registered. Swapped atomically through {@code EVICTOR}. */
volatile ScheduledExecutorService evictor;
static final AtomicReferenceFieldUpdater<BoundedElasticScheduler, ScheduledExecutorService> EVICTOR =
AtomicReferenceFieldUpdater.newUpdater(BoundedElasticScheduler.class, ScheduledExecutorService.class, "evictor");
BoundedElasticScheduler(int maxThreads, int maxTaskQueuedPerThread,
ThreadFactory threadFactory, long ttlMillis, Clock clock) {
if (ttlMillis <= 0) {
throw new IllegalArgumentException("TTL must be strictly positive, was " + ttlMillis + "ms");
}
if (maxThreads <= 0) {
throw new IllegalArgumentException("maxThreads must be strictly positive, was " + maxThreads);
}
if (maxTaskQueuedPerThread <= 0) {
throw new IllegalArgumentException("maxTaskQueuedPerThread must be strictly positive, was " + maxTaskQueuedPerThread);
}
this.maxThreads = maxThreads;
this.maxTaskQueuedPerThread = maxTaskQueuedPerThread;
this.factory = threadFactory;
this.clock = Objects.requireNonNull(clock, "A Clock must be provided");
this.ttlMillis = ttlMillis;
this.boundedServices = SHUTDOWN;
}
/**
 * Convenience constructor taking the TTL in seconds and using a UTC clock that ticks once
 * per second.
 *
 * @param maxThreads maximum number of backing executors
 * @param maxTaskQueuedPerThread task queue capacity per executor
 * @param factory factory used for the backing executors' threads
 * @param ttlSeconds idle time-to-live in seconds, converted to milliseconds
 */
BoundedElasticScheduler(int maxThreads, int maxTaskQueuedPerThread, ThreadFactory factory, int ttlSeconds) {
    this(maxThreads,
            maxTaskQueuedPerThread,
            factory,
            TimeUnit.SECONDS.toMillis(ttlSeconds),
            Clock.tickSeconds(BoundedServices.ZONE_UTC));
}
/**
 * Instantiate a new backing executor configured with this scheduler's per-thread queue
 * capacity and thread factory.
 *
 * @return a fresh {@link BoundedScheduledExecutorService}
 */
BoundedScheduledExecutorService createBoundedExecutorService() {
    final int capacity = this.maxTaskQueuedPerThread;
    final ThreadFactory threads = this.factory;
    return new BoundedScheduledExecutorService(capacity, threads);
}
/**
 * @return {@code true} while the current services are the {@code SHUTDOWN} sentinel
 */
@Override
public boolean isDisposed() {
    final BoundedServices current = BOUNDED_SERVICES.get(this);
    return SHUTDOWN == current;
}
/**
 * Start this scheduler if it is currently disposed; do nothing otherwise.
 * <p>
 * A fresh {@link BoundedServices} replaces the {@code SHUTDOWN} sentinel through a
 * compare-and-set that is retried in a loop. The caller that wins the swap also creates an
 * evictor executor and, if no evictor is registered yet, schedules
 * {@link BoundedServices#eviction()} on it at a fixed rate of {@code ttlMillis}.
 *
 * @throws RejectedExecutionException if the eviction task is rejected while the scheduler is
 * not disposed
 */
@Override
public void start() {
for (;;) {
BoundedServices services = BOUNDED_SERVICES.get(this);
if (services != SHUTDOWN) {
// already started: nothing to do
return;
}
BoundedServices newServices = new BoundedServices(this);
if (BOUNDED_SERVICES.compareAndSet(this, services, newServices)) {
ScheduledExecutorService e = Executors.newScheduledThreadPool(1, EVICTOR_FACTORY);
// register the new evictor only if the field is currently null
if (EVICTOR.compareAndSet(this, null, e)) {
try {
// first pass after one TTL, then one pass per TTL
e.scheduleAtFixedRate(newServices::eviction, ttlMillis, ttlMillis, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException ree) {
// tolerated only if the scheduler has been disposed in the meantime
// (dispose() shuts the evictor down); otherwise the rejection is propagated
if (!isDisposed()) {
throw ree;
}
}
}
else {
// another evictor is still registered: discard the one just created
e.shutdownNow();
}
return;
}
// the compare-and-set on the services failed: loop and re-read the field
}
}
/**
 * Dispose this scheduler: swap the current services for the {@code SHUTDOWN} sentinel and,
 * if that swap succeeds, shut down the registered evictor (if any) and dispose the services
 * that were just replaced. A scheduler that is already disposed, or whose services were
 * concurrently replaced, is left untouched.
 */
@Override
public void dispose() {
    final BoundedServices current = BOUNDED_SERVICES.get(this);
    if (current == SHUTDOWN) {
        return;
    }
    if (!BOUNDED_SERVICES.compareAndSet(this, current, SHUTDOWN)) {
        return;
    }

    final ScheduledExecutorService previousEvictor = EVICTOR.getAndSet(this, null);
    if (previousEvictor != null) {
        previousEvictor.shutdownNow();
    }
    current.dispose();
}
/**
 * Schedule {@code task} for immediate execution on a picked {@link BoundedState}.
 * <p>
 * {@code pick()} increments the usage mark of the state it returns, and the state is handed
 * to {@code Schedulers.directSchedule} as the parent to dispose with the task. If the
 * executor rejects the task, that disposal never happens, so the mark is released here
 * before rethrowing. Without this, the state would stay marked as busy and could never
 * become idle or be evicted.
 * NOTE(review): assumes {@code directSchedule} does not itself dispose {@code picked} when
 * the submission is rejected - confirm against {@code Schedulers}.
 *
 * @param task the task to run
 * @return a {@link Disposable} representing the scheduled task
 * @throws RejectedExecutionException if the backing executor refuses the task
 */
@Override
public Disposable schedule(Runnable task) {
    BoundedState picked = BOUNDED_SERVICES.get(this).pick();
    try {
        return Schedulers.directSchedule(picked.executor, task, picked, 0L, TimeUnit.MILLISECONDS);
    }
    catch (RejectedExecutionException ree) {
        // give back the mark taken by pick() so the state can go idle again
        picked.dispose();
        throw ree;
    }
}
/**
 * Schedule {@code task} for execution after {@code delay} on a picked {@link BoundedState}.
 * <p>
 * {@code pick()} increments the usage mark of the state it returns, and the state is handed
 * to {@code Schedulers.directSchedule} as the parent to dispose with the task. If the
 * executor rejects the task, that disposal never happens, so the mark is released here
 * before rethrowing. Without this, the state would stay marked as busy and could never
 * become idle or be evicted.
 * NOTE(review): assumes {@code directSchedule} does not itself dispose {@code picked} when
 * the submission is rejected - confirm against {@code Schedulers}.
 *
 * @param task the task to run
 * @param delay the delay before execution
 * @param unit the unit of {@code delay}
 * @return a {@link Disposable} representing the scheduled task
 * @throws RejectedExecutionException if the backing executor refuses the task
 */
@Override
public Disposable schedule(Runnable task, long delay, TimeUnit unit) {
    final BoundedState picked = BOUNDED_SERVICES.get(this).pick();
    try {
        return Schedulers.directSchedule(picked.executor, task, picked, delay, unit);
    }
    catch (RejectedExecutionException ree) {
        // give back the mark taken by pick() so the state can go idle again
        picked.dispose();
        throw ree;
    }
}
/**
 * Schedule {@code task} for periodic execution on a picked {@link BoundedState}.
 * <p>
 * The returned disposable is a composite of the periodic task and the picked state, so
 * disposing it also releases the usage mark taken by {@code pick()}. If the executor rejects
 * the task, no composite is ever returned, so the mark is released here before rethrowing.
 * Without this, the state would stay marked as busy and could never become idle or be
 * evicted.
 *
 * @param task the task to run
 * @param initialDelay the delay before the first execution
 * @param period the period between executions
 * @param unit the unit of {@code initialDelay} and {@code period}
 * @return a {@link Disposable} that cancels the task and releases the picked state
 * @throws RejectedExecutionException if the backing executor refuses the task
 */
@Override
public Disposable schedulePeriodically(Runnable task,
        long initialDelay,
        long period,
        TimeUnit unit) {
    final BoundedState picked = BOUNDED_SERVICES.get(this).pick();
    try {
        Disposable scheduledTask = Schedulers.directSchedulePeriodically(picked.executor,
                task,
                initialDelay,
                period,
                unit);
        return Disposables.composite(scheduledTask, picked);
    }
    catch (RejectedExecutionException ree) {
        // give back the mark taken by pick() so the state can go idle again
        picked.dispose();
        throw ree;
    }
}
/**
 * Describe this scheduler as {@code <name>(["<factory>",]maxThreads=..,maxTaskQueuedPerThread=..,ttl=..)}.
 * The factory part is only present for a {@code ReactorThreadFactory}; a queue cap of
 * {@code Integer.MAX_VALUE} is printed as {@code unbounded}; the TTL is printed in
 * milliseconds below one second and in whole seconds otherwise.
 */
@Override
public String toString() {
    final StringBuilder description = new StringBuilder(Schedulers.BOUNDED_ELASTIC);
    description.append('(');

    if (factory instanceof ReactorThreadFactory) {
        description.append('\"');
        description.append(((ReactorThreadFactory) factory).get());
        description.append("\",");
    }

    description.append("maxThreads=").append(maxThreads);

    description.append(",maxTaskQueuedPerThread=");
    if (maxTaskQueuedPerThread == Integer.MAX_VALUE) {
        description.append("unbounded");
    }
    else {
        description.append(maxTaskQueuedPerThread);
    }

    description.append(",ttl=");
    if (ttlMillis >= 1000) {
        description.append(ttlMillis / 1000).append("s)");
    }
    else {
        description.append(ttlMillis).append("ms)");
    }
    return description.toString();
}
/**
 * @return the current counter value of the active {@link BoundedServices} (the number of
 * executors it accounts for, or {@code BoundedServices.DISPOSED} once disposed)
 */
int estimateSize() {
    final BoundedServices current = BOUNDED_SERVICES.get(this);
    return current.get();
}
/**
 * @return the current size of the busy queue of the active {@link BoundedServices}
 */
int estimateBusy() {
    final BoundedServices current = BOUNDED_SERVICES.get(this);
    return current.busyQueue.size();
}
/**
 * @return the current size of the idle queue of the active {@link BoundedServices}
 */
int estimateIdle() {
    final BoundedServices current = BOUNDED_SERVICES.get(this);
    return current.idleQueue.size();
}
/**
 * Best-effort estimate of how many more tasks can be queued across all backing executors.
 * <p>
 * The theoretical maximum is {@code maxTaskQueuedPerThread * maxThreads}, from which the
 * current queue size of every busy {@link BoundedState} is subtracted. The arithmetic is
 * carried out in {@code long} and the result clamped to the {@code int} range: with an
 * unbounded per-thread queue ({@code Integer.MAX_VALUE}) the product overflows {@code int}
 * as soon as {@code maxThreads} exceeds 1, which would otherwise yield a meaningless
 * (possibly negative) value.
 *
 * @return the estimated remaining capacity, saturated at {@code Integer.MAX_VALUE}, or
 * {@code -1} if a busy state cannot report its queue size
 */
int estimateRemainingTaskCapacity() {
    Queue<BoundedState> busyQueue = BOUNDED_SERVICES.get(this).busyQueue;
    // widen before multiplying so that the product cannot wrap around
    long remaining = (long) maxTaskQueuedPerThread * maxThreads;
    for (BoundedState state : busyQueue) {
        int stateQueueSize = state.estimateQueueSize();
        if (stateQueueSize < 0) {
            // this state's executor cannot be inspected: no estimate possible
            return -1;
        }
        remaining -= stateQueueSize;
    }
    return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, remaining));
}
/**
 * Expose introspection attributes: {@code TERMINATED}/{@code CANCELLED} map to
 * {@link #isDisposed()}, {@code BUFFERED} to {@link #estimateSize()}, {@code CAPACITY} to
 * {@code maxThreads} and {@code NAME} to {@link #toString()}. Any other key yields
 * {@code null}.
 */
@Override
public Object scanUnsafe(Attr key) {
    final Object scanned;
    if (key == Attr.TERMINATED || key == Attr.CANCELLED) {
        scanned = isDisposed();
    }
    else if (key == Attr.BUFFERED) {
        scanned = estimateSize();
    }
    else if (key == Attr.CAPACITY) {
        scanned = maxThreads;
    }
    else if (key == Attr.NAME) {
        scanned = this.toString();
    }
    else {
        scanned = null;
    }
    return scanned;
}
/**
 * @return a stream of the busy states followed by the idle states of the active
 * {@link BoundedServices}, skipping {@code null} entries and the {@code CREATING} sentinel
 */
@Override
public Stream<? extends Scannable> inners() {
    final BoundedServices current = BOUNDED_SERVICES.get(this);
    final Stream<BoundedState> busy = current.busyQueue.stream();
    final Stream<BoundedState> idle = current.idleQueue.stream();
    return Stream.concat(busy, idle)
            .filter(state -> !(state == null || state == CREATING));
}
/**
 * Create a worker bound to the executor of a picked {@link BoundedState}. The picked state
 * is registered among the worker's disposables.
 *
 * @return the new {@link Worker}
 */
@Override
public Worker createWorker() {
    final BoundedServices current = BOUNDED_SERVICES.get(this);
    final BoundedState picked = current.pick();
    final ExecutorServiceWorker created = new ExecutorServiceWorker(picked.executor);
    created.disposables.add(picked);
    return created;
}
/**
 * Pool of {@link BoundedState} instances, split between an idle queue and a busy queue.
 * <p>
 * The inherited {@link AtomicInteger} value is incremented each time {@link #pick()} creates a
 * new state and decremented each time {@link #eviction()} evicts one; it is set to
 * {@link #DISPOSED} by {@link #dispose()}.
 */
static final class BoundedServices extends AtomicInteger implements Disposable {
/** Counter value marking these services as disposed. */
static final int DISPOSED = -1;
static final ZoneId ZONE_UTC = ZoneId.of("UTC");
final BoundedElasticScheduler parent;
final Clock clock;
/** States that were released down to zero marks; {@link #setIdle} appends here and {@link #pick()} polls from the tail. */
final Deque<BoundedState> idleQueue;
/** States currently picked at least once. */
final PriorityBlockingQueue<BoundedState> busyQueue;
/** Parent-less constructor with a fixed clock and empty queues; this is how the {@code SHUTDOWN} sentinel is built. */
private BoundedServices() {
this.parent = null;
this.clock = Clock.fixed(Instant.EPOCH, ZONE_UTC);
this.busyQueue = new PriorityBlockingQueue<>();
this.idleQueue = new ConcurrentLinkedDeque<>();
}
/**
 * Services attached to a scheduler: shares its clock and orders the busy queue by
 * ascending {@code markCount}.
 */
BoundedServices(BoundedElasticScheduler parent) {
this.parent = parent;
this.clock = parent.clock;
this.busyQueue = new PriorityBlockingQueue<>(parent.maxThreads,
Comparator.comparingInt(bs -> bs.markCount));
this.idleQueue = new ConcurrentLinkedDeque<>();
}
/**
 * Eviction pass: reads the parent's clock once, then lets each state of a snapshot of the
 * idle queue decide whether it has been idle for at least the parent's TTL. Evicted states
 * are removed from the idle queue and the counter is decremented.
 */
void eviction() {
final long evictionTimestamp = parent.clock.millis();
// iterate over a copy, since the idle queue is modified inside the loop
List<BoundedState> idleCandidates = new ArrayList<>(idleQueue);
for (BoundedState candidate : idleCandidates) {
if (candidate.tryEvict(evictionTimestamp, parent.ttlMillis)) {
idleQueue.remove(candidate);
decrementAndGet();
}
}
}
/**
 * Pick a {@link BoundedState} and mark it as picked. In order of preference: an idle state,
 * a newly created state (while fewer than {@code parent.maxThreads} exist), or the head of
 * the busy queue. Every failed attempt falls through to the next loop iteration.
 *
 * @return the picked state, or the {@code CREATING} sentinel if these services are disposed
 */
BoundedState pick() {
for (;;) {
int a = get();
if (a == DISPOSED) {
return CREATING;
}
if (!idleQueue.isEmpty()) {
// reuse an idle state, taking the one at the tail of the deque
BoundedState bs = idleQueue.pollLast();
if (bs != null && bs.markPicked()) {
busyQueue.add(bs);
return bs;
}
}
else if (a < parent.maxThreads) {
// reserve a slot first; the executor is only created if the reservation succeeds
if (compareAndSet(a, a + 1)) {
ScheduledExecutorService s = Schedulers.decorateExecutorService(parent, parent.createBoundedExecutorService());
BoundedState newState = new BoundedState(this, s);
if (newState.markPicked()) {
busyQueue.add(newState);
return newState;
}
}
}
else {
// at capacity with nothing idle: share the state at the head of the busy queue
BoundedState s = busyQueue.poll();
if (s != null && s.markPicked()) {
// re-inserted after its markCount was incremented
busyQueue.add(s);
return s;
}
}
}
}
/**
 * Move {@code boundedState} from the busy queue to the idle queue. Nothing happens if the
 * state is not found in the busy queue.
 */
void setIdle(BoundedState boundedState) {
if (this.busyQueue.remove(boundedState)) {
this.idleQueue.add(boundedState);
}
}
@Override
public boolean isDisposed() {
return get() == DISPOSED;
}
/**
 * Mark these services as disposed, then shut down every state found in the idle and busy
 * queues. The queues themselves are not cleared.
 */
@Override
public void dispose() {
set(DISPOSED);
idleQueue.forEach(BoundedState::shutdown);
busyQueue.forEach(BoundedState::shutdown);
}
}
/**
 * One backing executor together with a usage counter ({@code markCount}).
 * <p>
 * The counter is incremented by {@link #markPicked()} and decremented by {@link #release()}
 * (which is what {@link #dispose()} calls). It is set to {@link #EVICTED} when the state is
 * evicted or shut down.
 */
static class BoundedState implements Disposable, Scannable {
/** {@code markCount} value of a state that has been evicted or shut down. */
static final int EVICTED = -1;
final BoundedServices parent;
final ScheduledExecutorService executor;
/** Clock reading taken when the counter last dropped to zero, or a negative value when not idle. Plain (non-volatile) field. */
long idleSinceTimestamp = -1L;
volatile int markCount;
static final AtomicIntegerFieldUpdater<BoundedState> MARK_COUNT = AtomicIntegerFieldUpdater.newUpdater(BoundedState.class, "markCount");
BoundedState(BoundedServices parent, ScheduledExecutorService executor) {
this.parent = parent;
this.executor = executor;
}
/**
 * @return the size of the executor's queue if the executor is a
 * {@link ScheduledThreadPoolExecutor}, {@code -1} otherwise
 */
int estimateQueueSize() {
if (executor instanceof ScheduledThreadPoolExecutor) {
return ((ScheduledThreadPoolExecutor) executor).getQueue().size();
}
return -1;
}
/**
 * Atomically increment the usage counter, retrying on contention.
 *
 * @return {@code true} once incremented, {@code false} if the counter reads {@link #EVICTED}
 */
boolean markPicked() {
for(;;) {
int i = MARK_COUNT.get(this);
if (i == EVICTED) {
return false;
}
if (MARK_COUNT.compareAndSet(this, i, i + 1)) {
return true;
}
}
}
/**
 * Evict this state if it has an idle timestamp, has been idle for at least
 * {@code ttlMillis} as of {@code evictionTimestamp}, and its counter can be swapped from 0
 * to {@link #EVICTED}. On success the executor is shut down.
 *
 * @param evictionTimestamp the reference "now", in milliseconds
 * @param ttlMillis the minimum idle duration, in milliseconds
 * @return {@code true} if this call evicted the state
 */
boolean tryEvict(long evictionTimestamp, long ttlMillis) {
long idleSince = this.idleSinceTimestamp;
// negative timestamp: not idle (or sentinel / shut down)
if (idleSince < 0) return false;
long elapsed = evictionTimestamp - idleSince;
if (elapsed >= ttlMillis) {
// the swap fails if the state has been picked again (or evicted) in the meantime
if (MARK_COUNT.compareAndSet(this, 0, EVICTED)) {
executor.shutdownNow();
return true;
}
}
return false;
}
/**
 * Decrement the usage counter. When it reaches zero, record the idle timestamp and ask the
 * parent to move this state to the idle queue; when it stays positive, reset the idle
 * timestamp; when it becomes negative, do nothing.
 */
void release() {
int picked = MARK_COUNT.decrementAndGet(this);
if (picked < 0) {
// counter was already at or below zero (e.g. EVICTED) before this release
return;
}
if (picked == 0) {
this.idleSinceTimestamp = parent.clock.millis();
parent.setIdle(this);
}
else {
this.idleSinceTimestamp = -1L;
}
}
/** Unconditionally mark this state as evicted, clear its idle timestamp and shut its executor down. */
void shutdown() {
this.idleSinceTimestamp = -1L;
MARK_COUNT.set(this, EVICTED);
this.executor.shutdownNow();
}
/** Release one usage mark; see {@link #release()}. */
@Override
public void dispose() {
this.release();
}
/** @return {@code true} when the usage counter is zero or negative */
@Override
public boolean isDisposed() {
return MARK_COUNT.get(this) <= 0;
}
@Override
public Object scanUnsafe(Attr key) {
return Schedulers.scanExecutor(executor, key);
}
@Override
public String toString() {
return "BoundedState@" + System.identityHashCode(this) + "{" + " backing=" +MARK_COUNT.get(this) + ", idleSince=" + idleSinceTimestamp + ", executor=" + executor + '}';
}
}
/**
 * A {@link ScheduledThreadPoolExecutor} with a core and maximum pool size of 1 that rejects
 * new tasks once its work queue would exceed {@code queueCapacity} entries.
 * <p>
 * Every task-submitting method first checks the capacity and then delegates to the parent
 * class. These methods are {@code synchronized} so that the check and the enqueue happen
 * under the same lock for all submitters. A capacity of {@code Integer.MAX_VALUE} disables
 * the check.
 */
static final class BoundedScheduledExecutorService extends ScheduledThreadPoolExecutor
        implements Scannable {

    /** Maximum number of queued tasks, or {@code Integer.MAX_VALUE} for unbounded. */
    final int queueCapacity;

    /**
     * @param queueCapacity maximum number of queued tasks, at least 1
     * @param factory factory for the executor's thread
     * @throws IllegalArgumentException if {@code queueCapacity} is lower than 1
     */
    BoundedScheduledExecutorService(int queueCapacity, ThreadFactory factory) {
        super(1, factory);
        setMaximumPoolSize(1);
        // cancelled tasks leave the queue right away instead of lingering in it
        setRemoveOnCancelPolicy(true);
        if (queueCapacity < 1) {
            throw new IllegalArgumentException(
                    "was expecting a non-zero positive queue capacity");
        }
        this.queueCapacity = queueCapacity;
    }

    @Override
    public Object scanUnsafe(Attr key) {
        if (Attr.TERMINATED == key) return isTerminated();
        if (Attr.BUFFERED == key) return getQueue().size();
        if (Attr.CAPACITY == key) return this.queueCapacity;
        return null;
    }

    @Override
    public String toString() {
        int queued = getQueue().size();
        long completed = getCompletedTaskCount();
        String state = getActiveCount() > 0 ? "ACTIVE" : "IDLE";
        if (this.queueCapacity == Integer.MAX_VALUE) {
            return "BoundedScheduledExecutorService{" + state + ", queued=" + queued + "/unbounded, completed=" + completed + '}';
        }
        return "BoundedScheduledExecutorService{" + state + ", queued=" + queued + "/" + queueCapacity + ", completed=" + completed + '}';
    }

    /**
     * Reject the submission if adding {@code taskCount} tasks to the current queue would
     * exceed the capacity. Callers are expected to hold this instance's monitor so the check
     * stays valid until the tasks are enqueued.
     *
     * @param taskCount number of tasks about to be submitted
     */
    private void ensureQueueCapacity(int taskCount) {
        if (queueCapacity == Integer.MAX_VALUE) {
            return;
        }
        int queueSize = super.getQueue().size();
        if ((queueSize + taskCount) > queueCapacity) {
            throw Exceptions.failWithRejected("Task capacity of bounded elastic scheduler reached while scheduling " + taskCount + " tasks (" + (queueSize + taskCount) + "/" + queueCapacity + ")");
        }
    }

    @Override
    public synchronized ScheduledFuture<?> schedule(
            Runnable command,
            long delay,
            TimeUnit unit) {
        ensureQueueCapacity(1);
        return super.schedule(command, delay, unit);
    }

    @Override
    public synchronized <V> ScheduledFuture<V> schedule(
            Callable<V> callable,
            long delay,
            TimeUnit unit) {
        ensureQueueCapacity(1);
        return super.schedule(callable, delay, unit);
    }

    @Override
    public synchronized ScheduledFuture<?> scheduleAtFixedRate(
            Runnable command,
            long initialDelay,
            long period,
            TimeUnit unit) {
        ensureQueueCapacity(1);
        return super.scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    // synchronized like every other submission method: without the lock, the capacity check
    // and the enqueue could interleave with a concurrent submission and exceed the bound
    @Override
    public synchronized ScheduledFuture<?> scheduleWithFixedDelay(
            Runnable command,
            long initialDelay,
            long delay,
            TimeUnit unit) {
        ensureQueueCapacity(1);
        return super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }

    @Override
    public void shutdown() {
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return super.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return super.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return super.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        return super.awaitTermination(timeout, unit);
    }

    @Override
    public synchronized <T> Future<T> submit(Callable<T> task) {
        ensureQueueCapacity(1);
        return super.submit(task);
    }

    @Override
    public synchronized <T> Future<T> submit(Runnable task, T result) {
        ensureQueueCapacity(1);
        return super.submit(task, result);
    }

    @Override
    public synchronized Future<?> submit(Runnable task) {
        ensureQueueCapacity(1);
        return super.submit(task);
    }

    @Override
    public synchronized <T> List<Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        ensureQueueCapacity(tasks.size());
        return super.invokeAll(tasks);
    }

    @Override
    public synchronized <T> List<Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks,
            long timeout,
            TimeUnit unit)
            throws InterruptedException {
        ensureQueueCapacity(tasks.size());
        return super.invokeAll(tasks, timeout, unit);
    }

    @Override
    public synchronized <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        ensureQueueCapacity(tasks.size());
        return super.invokeAny(tasks);
    }

    @Override
    public synchronized <T> T invokeAny(
            Collection<? extends Callable<T>> tasks,
            long timeout,
            TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        ensureQueueCapacity(tasks.size());
        return super.invokeAny(tasks, timeout, unit);
    }

    // routed through submit(...) of the parent class, after the same capacity check
    @Override
    public synchronized void execute(Runnable command) {
        ensureQueueCapacity(1);
        super.submit(command);
    }
}
}