/*
 * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package reactor.core.scheduler;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Stream;

import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;

Dynamically creates ScheduledExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down. This scheduler is time-capable (can schedule with delay / periodically).

The maximum number of created thread pools is unbounded.

The default time-to-live for unused thread pools is 60 seconds, use the appropriate constructor to set a different value.

This scheduler is not restartable (may be later).

Author:Stephane Maldini, Simon Baslé
/** * Dynamically creates ScheduledExecutorService-based Workers and caches the thread pools, reusing * them once the Workers have been shut down. This scheduler is time-capable (can schedule * with delay / periodically). * <p> * The maximum number of created thread pools is unbounded. * <p> * The default time-to-live for unused thread pools is 60 seconds, use the * appropriate constructor to set a different value. * <p> * This scheduler is not restartable (may be later). * * @author Stephane Maldini * @author Simon Baslé */
final class ElasticScheduler implements Scheduler, Supplier<ScheduledExecutorService>, Scannable { static final AtomicLong COUNTER = new AtomicLong(); static final ThreadFactory EVICTOR_FACTORY = r -> { Thread t = new Thread(r, "elastic-evictor-" + COUNTER.incrementAndGet()); t.setDaemon(true); return t; }; static final CachedService SHUTDOWN = new CachedService(null); static final int DEFAULT_TTL_SECONDS = 60; final ThreadFactory factory; final int ttlSeconds; final Queue<ScheduledExecutorServiceExpiry> cache; final Queue<CachedService> all; final ScheduledExecutorService evictor; volatile boolean shutdown; ElasticScheduler(ThreadFactory factory, int ttlSeconds) { if (ttlSeconds < 0) { throw new IllegalArgumentException("ttlSeconds must be positive, was: " + ttlSeconds); } this.ttlSeconds = ttlSeconds; this.factory = factory; this.cache = new ConcurrentLinkedQueue<>(); this.all = new ConcurrentLinkedQueue<>(); this.evictor = Executors.newScheduledThreadPool(1, EVICTOR_FACTORY); this.evictor.scheduleAtFixedRate(this::eviction, ttlSeconds, ttlSeconds, TimeUnit.SECONDS); }
Instantiates the default ScheduledExecutorService for the ElasticScheduler (Executors.newScheduledThreadPoolExecutor with core and max pool size of 1).
/** * Instantiates the default {@link ScheduledExecutorService} for the ElasticScheduler * ({@code Executors.newScheduledThreadPoolExecutor} with core and max pool size of 1). */
@Override public ScheduledExecutorService get() { ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(1, factory); poolExecutor.setMaximumPoolSize(1); return poolExecutor; } @Override public void start() { throw new UnsupportedOperationException("Restarting not supported yet"); } @Override public boolean isDisposed() { return shutdown; } @Override public void dispose() { if (shutdown) { return; } shutdown = true; evictor.shutdownNow(); cache.clear(); CachedService cached; while ((cached = all.poll()) != null) { cached.exec.shutdownNow(); } } CachedService pick() { if (shutdown) { return SHUTDOWN; } CachedService result; ScheduledExecutorServiceExpiry e = cache.poll(); if (e != null) { return e.cached; } result = new CachedService(this); all.offer(result); if (shutdown) { all.remove(result); return SHUTDOWN; } return result; } @Override public Disposable schedule(Runnable task) { CachedService cached = pick(); return Schedulers.directSchedule(cached.exec, new DirectScheduleTask(task, cached), 0L, TimeUnit.MILLISECONDS); } @Override public Disposable schedule(Runnable task, long delay, TimeUnit unit) { CachedService cached = pick(); return Schedulers.directSchedule(cached.exec, new DirectScheduleTask(task, cached), delay, unit); } @Override public Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) { CachedService cached = pick(); return Disposables.composite(Schedulers.directSchedulePeriodically(cached.exec, task, initialDelay, period, unit), cached); } @Override public String toString() { StringBuilder ts = new StringBuilder(Schedulers.ELASTIC) .append('('); if (factory instanceof ReactorThreadFactory) { ts.append('\"').append(((ReactorThreadFactory) factory).get()).append('\"'); } ts.append(')'); return ts.toString(); } @Override public Object scanUnsafe(Attr key) { if (key == Attr.TERMINATED || key == Attr.CANCELLED) return isDisposed(); if (key == Attr.CAPACITY) return Integer.MAX_VALUE; if (key == Attr.BUFFERED) return cache.size(); //BUFFERED: number of workers alive if (key == Attr.NAME) return this.toString(); return null; } @Override public Stream<? extends Scannable> inners() { return cache.stream() .map(cached -> cached.cached); } @Override public Worker createWorker() { return new ElasticWorker(pick()); } void eviction() { long now = System.currentTimeMillis(); List<ScheduledExecutorServiceExpiry> list = new ArrayList<>(cache); for (ScheduledExecutorServiceExpiry e : list) { if (e.expireMillis < now) { if (cache.remove(e)) { e.cached.exec.shutdownNow(); all.remove(e.cached); } } } } static final class CachedService implements Disposable, Scannable { final ElasticScheduler parent; final ScheduledExecutorService exec; CachedService(@Nullable ElasticScheduler parent) { this.parent = parent; if (parent != null) { this.exec = Schedulers.decorateExecutorService(Schedulers.ELASTIC, parent); } else { this.exec = Executors.newSingleThreadScheduledExecutor(); this.exec.shutdownNow(); } } @Override public void dispose() { if (exec != null) { if (this != SHUTDOWN && !parent.shutdown) { ScheduledExecutorServiceExpiry e = new ScheduledExecutorServiceExpiry(this, System.currentTimeMillis() + parent.ttlSeconds * 1000L); parent.cache.offer(e); if (parent.shutdown) { if (parent.cache.remove(e)) { exec.shutdownNow(); } } } } } @Override public Object scanUnsafe(Attr key) { if (key == Attr.NAME) return parent.scanUnsafe(key); if (key == Attr.PARENT) return parent; if (key == Attr.TERMINATED || key == Attr.CANCELLED) return isDisposed(); if (key == Attr.CAPACITY) { //assume 1 if unknown, otherwise use the one from underlying executor Integer capacity = (Integer) Schedulers.scanExecutor(exec, key); if (capacity == null || capacity == -1) return 1; } return Schedulers.scanExecutor(exec, key); } } static final class DirectScheduleTask implements Runnable { final Runnable delegate; final CachedService cached; DirectScheduleTask(Runnable delegate, CachedService cached) { this.delegate = delegate; this.cached = cached; } @Override public void run() { try { delegate.run(); } catch (Throwable ex) { Schedulers.handleError(ex); } finally { cached.dispose(); } } } static final class ScheduledExecutorServiceExpiry { final CachedService cached; final long expireMillis; ScheduledExecutorServiceExpiry(CachedService cached, long expireMillis) { this.cached = cached; this.expireMillis = expireMillis; } } static final class ElasticWorker extends AtomicBoolean implements Worker, Scannable { final CachedService cached; final Disposable.Composite tasks; ElasticWorker(CachedService cached) { this.cached = cached; this.tasks = Disposables.composite(); } @Override public Disposable schedule(Runnable task) { return Schedulers.workerSchedule(cached.exec, tasks, task, 0L, TimeUnit.MILLISECONDS); } @Override public Disposable schedule(Runnable task, long delay, TimeUnit unit) { return Schedulers.workerSchedule(cached.exec, tasks, task, delay, unit); } @Override public Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) { return Schedulers.workerSchedulePeriodically(cached.exec, tasks, task, initialDelay, period, unit); } @Override public void dispose() { if (compareAndSet(false, true)) { tasks.dispose(); cached.dispose(); } } @Override public boolean isDisposed() { return tasks.isDisposed(); } @Override public Object scanUnsafe(Attr key) { if (key == Attr.TERMINATED || key == Attr.CANCELLED) return isDisposed(); if (key == Attr.NAME) return cached.scanUnsafe(key) + ".worker"; if (key == Attr.PARENT) return cached.parent; return cached.scanUnsafe(key); } } }