/*
 * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package reactor.core.scheduler;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Supplier;

import reactor.core.Disposable;
import reactor.core.Scannable;

Scheduler that works with a single-threaded ScheduledExecutorService and is suited for same-thread work (like an event dispatch thread). This scheduler is time-capable (can schedule with delay / periodically).
/** * Scheduler that works with a single-threaded ScheduledExecutorService and is suited for * same-thread work (like an event dispatch thread). This scheduler is time-capable (can * schedule with delay / periodically). */
final class SingleScheduler implements Scheduler, Supplier<ScheduledExecutorService>, Scannable { static final AtomicLong COUNTER = new AtomicLong(); final ThreadFactory factory; volatile ScheduledExecutorService executor; static final AtomicReferenceFieldUpdater<SingleScheduler, ScheduledExecutorService> EXECUTORS = AtomicReferenceFieldUpdater.newUpdater(SingleScheduler.class, ScheduledExecutorService.class, "executor"); static final ScheduledExecutorService TERMINATED; static { TERMINATED = Executors.newSingleThreadScheduledExecutor(); TERMINATED.shutdownNow(); } SingleScheduler(ThreadFactory factory) { this.factory = factory; }
Instantiates the default ScheduledExecutorService for the SingleScheduler (Executors.newScheduledThreadPoolExecutor with core and max pool size of 1).
/** * Instantiates the default {@link ScheduledExecutorService} for the SingleScheduler * ({@code Executors.newScheduledThreadPoolExecutor} with core and max pool size of 1). */
@Override public ScheduledExecutorService get() { ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, this.factory); e.setRemoveOnCancelPolicy(true); e.setMaximumPoolSize(1); return e; } @Override public boolean isDisposed() { return executor == TERMINATED; } @Override public void start() { //TODO SingleTimedScheduler didn't implement start, check if any particular reason? ScheduledExecutorService b = null; for (; ; ) { ScheduledExecutorService a = executor; if (a != TERMINATED && a != null) { if (b != null) { b.shutdownNow(); } return; } if (b == null) { b = Schedulers.decorateExecutorService(this, this.get()); } if (EXECUTORS.compareAndSet(this, a, b)) { return; } } } @Override public void dispose() { ScheduledExecutorService a = executor; if (a != TERMINATED) { a = EXECUTORS.getAndSet(this, TERMINATED); if (a != TERMINATED && a != null) { a.shutdownNow(); } } } @Override public Disposable schedule(Runnable task) { return Schedulers.directSchedule(executor, task, null, 0L, TimeUnit.MILLISECONDS); } @Override public Disposable schedule(Runnable task, long delay, TimeUnit unit) { return Schedulers.directSchedule(executor, task, null, delay, unit); } @Override public Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) { return Schedulers.directSchedulePeriodically(executor, task, initialDelay, period, unit); } @Override public String toString() { StringBuilder ts = new StringBuilder(Schedulers.SINGLE) .append('('); if (factory instanceof ReactorThreadFactory) { ts.append('\"').append(((ReactorThreadFactory) factory).get()).append('\"'); } return ts.append(')').toString(); } @Override public Object scanUnsafe(Attr key) { if (key == Attr.TERMINATED || key == Attr.CANCELLED) return isDisposed(); if (key == Attr.NAME) return this.toString(); if (key == Attr.CAPACITY || key == Attr.BUFFERED) return 1; //BUFFERED: number of workers doesn't vary return Schedulers.scanExecutor(executor, key); } @Override public Worker createWorker() { return new ExecutorServiceWorker(executor); } }