package reactor.core.scheduler;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Supplier;
import reactor.core.Disposable;
import reactor.core.Scannable;
final class SingleScheduler implements Scheduler, Supplier<ScheduledExecutorService>,
Scannable {
static final AtomicLong COUNTER = new AtomicLong();
final ThreadFactory factory;
volatile ScheduledExecutorService executor;
static final AtomicReferenceFieldUpdater<SingleScheduler, ScheduledExecutorService> EXECUTORS =
AtomicReferenceFieldUpdater.newUpdater(SingleScheduler.class,
ScheduledExecutorService.class,
"executor");
static final ScheduledExecutorService TERMINATED;
static {
TERMINATED = Executors.newSingleThreadScheduledExecutor();
TERMINATED.shutdownNow();
}
SingleScheduler(ThreadFactory factory) {
this.factory = factory;
}
@Override
public ScheduledExecutorService get() {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, this.factory);
e.setRemoveOnCancelPolicy(true);
e.setMaximumPoolSize(1);
return e;
}
@Override
public boolean isDisposed() {
return executor == TERMINATED;
}
@Override
public void start() {
ScheduledExecutorService b = null;
for (; ; ) {
ScheduledExecutorService a = executor;
if (a != TERMINATED && a != null) {
if (b != null) {
b.shutdownNow();
}
return;
}
if (b == null) {
b = Schedulers.decorateExecutorService(this, this.get());
}
if (EXECUTORS.compareAndSet(this, a, b)) {
return;
}
}
}
@Override
public void dispose() {
ScheduledExecutorService a = executor;
if (a != TERMINATED) {
a = EXECUTORS.getAndSet(this, TERMINATED);
if (a != TERMINATED && a != null) {
a.shutdownNow();
}
}
}
@Override
public Disposable schedule(Runnable task) {
return Schedulers.directSchedule(executor, task, null, 0L, TimeUnit.MILLISECONDS);
}
@Override
public Disposable schedule(Runnable task, long delay, TimeUnit unit) {
return Schedulers.directSchedule(executor, task, null, delay, unit);
}
@Override
public Disposable schedulePeriodically(Runnable task,
long initialDelay,
long period,
TimeUnit unit) {
return Schedulers.directSchedulePeriodically(executor,
task,
initialDelay,
period,
unit);
}
@Override
public String toString() {
StringBuilder ts = new StringBuilder(Schedulers.SINGLE)
.append('(');
if (factory instanceof ReactorThreadFactory) {
ts.append('\"').append(((ReactorThreadFactory) factory).get()).append('\"');
}
return ts.append(')').toString();
}
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.TERMINATED || key == Attr.CANCELLED) return isDisposed();
if (key == Attr.NAME) return this.toString();
if (key == Attr.CAPACITY || key == Attr.BUFFERED) return 1;
return Schedulers.scanExecutor(executor, key);
}
@Override
public Worker createWorker() {
return new ExecutorServiceWorker(executor);
}
}