package org.ehcache.impl.internal.executor;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.ehcache.impl.internal.executor.OutOfBandScheduledExecutor.OutOfBandRsf;
import static java.util.Collections.emptyList;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.ehcache.impl.internal.executor.ExecutorUtil.waitFor;
/**
 * A {@link ScheduledExecutorService} facade that delegates the timing of tasks to a shared
 * {@link OutOfBandScheduledExecutor} and hands that scheduler a dedicated {@code worker}
 * executor with every task it submits.
 * <p>
 * Tasks belonging to this instance are recognised inside the shared scheduler queue by comparing
 * {@link OutOfBandRsf#getExecutor()} against this instance's {@code worker} (identity comparison).
 * <p>
 * Shutdown state is tracked with two volatile fields: the {@code shutdown} flag, which is set
 * first so that new submissions are rejected, and the {@code termination} future, which is only
 * published once the shutdown action has been handed to the scheduler. Readers must therefore
 * tolerate {@code termination} still being {@code null} while {@code shutdown} is already
 * {@code true}.
 */
class PartitionedScheduledExecutor extends AbstractExecutorService implements ScheduledExecutorService {

  /** Shared scheduler that holds the delayed/periodic tasks of this executor. */
  private final OutOfBandScheduledExecutor scheduler;
  /** Executor passed to the scheduler alongside every task submitted through this instance. */
  private final ExecutorService worker;

  /** Set to {@code true} by {@link #shutdown()} and {@link #shutdownNow()}; never reset. */
  private volatile boolean shutdown;
  /**
   * Future of the scheduled shutdown action. Remains {@code null} until a shutdown method has
   * handed that action to the scheduler, which happens after {@link #shutdown} is set.
   */
  private volatile Future<List<Runnable>> termination;

  /**
   * Creates a partition of the given scheduler that is backed by the given worker.
   *
   * @param scheduler shared scheduler used for timing
   * @param worker executor associated with every task scheduled through this instance
   */
  PartitionedScheduledExecutor(OutOfBandScheduledExecutor scheduler, ExecutorService worker) {
    this.scheduler = scheduler;
    this.worker = worker;
  }

  /**
   * {@inheritDoc}
   *
   * @throws RejectedExecutionException if this executor has been shut down
   */
  @Override
  public ScheduledFuture<?> schedule(final Runnable command, long delay, TimeUnit unit) {
    rejectIfShutdown();
    ScheduledFuture<?> scheduled = scheduler.schedule(worker, command, delay, unit);
    rejectIfShutdownRaced(scheduled);
    return scheduled;
  }

  /**
   * {@inheritDoc}
   *
   * @throws RejectedExecutionException if this executor has been shut down
   */
  @Override
  public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
    rejectIfShutdown();
    ScheduledFuture<V> scheduled = scheduler.schedule(worker, callable, delay, unit);
    rejectIfShutdownRaced(scheduled);
    return scheduled;
  }

  /**
   * {@inheritDoc}
   *
   * @throws RejectedExecutionException if this executor has been shut down
   */
  @Override
  public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
    rejectIfShutdown();
    ScheduledFuture<?> scheduled = scheduler.scheduleAtFixedRate(worker, command, initialDelay, period, unit);
    rejectIfShutdownRaced(scheduled);
    return scheduled;
  }

  /**
   * {@inheritDoc}
   *
   * @throws RejectedExecutionException if this executor has been shut down
   */
  @Override
  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
    rejectIfShutdown();
    ScheduledFuture<?> scheduled = scheduler.scheduleWithFixedDelay(worker, command, initialDelay, delay, unit);
    rejectIfShutdownRaced(scheduled);
    return scheduled;
  }

  /** Rejects a submission outright when the shutdown flag is already set. */
  private void rejectIfShutdown() {
    if (shutdown) {
      throw new RejectedExecutionException();
    }
  }

  /**
   * Handles a shutdown that started while a task was being handed to the scheduler: if the flag
   * is now set and the task can still be cancelled, the submission is reported as rejected. If
   * the cancellation fails the task is left alone and the caller returns it normally.
   */
  private void rejectIfShutdownRaced(ScheduledFuture<?> scheduled) {
    if (shutdown && scheduled.cancel(false)) {
      throw new RejectedExecutionException();
    }
  }

  /**
   * Rejects new submissions, cancels this executor's periodic tasks and schedules the shutdown
   * of the worker for one nanosecond after the longest remaining delay among this executor's
   * queued one-shot tasks.
   *
   * @throws RuntimeException wrapping the cause of any failure of the queue scan
   */
  @Override
  public void shutdown() {
    shutdown = true;
    try {
      // The queue scan is submitted with a null executor and awaited here.
      // NOTE(review): presumably a null executor makes the task run on the scheduler's own
      // thread, which is what would make iterating its queue safe - confirm against
      // OutOfBandScheduledExecutor.
      final Long longestDelay = waitFor(scheduler.schedule(null, this::getMaxDelay, 0, NANOSECONDS));
      termination = scheduler.schedule(worker, () -> {
        worker.shutdown();
        return emptyList();
      }, longestDelay + 1, NANOSECONDS);
    } catch (ExecutionException e) {
      // Unwrap so callers see the real failure, consistent with shutdownNow()/awaitTermination().
      throw new RuntimeException(e.getCause());
    }
  }

  /**
   * Scans the scheduler queue for tasks whose executor is this instance's worker. Periodic
   * tasks are cancelled and removed; for the others the largest remaining delay is tracked.
   *
   * @return the largest remaining delay in nanoseconds among the retained tasks, or 0 if none
   */
  private long getMaxDelay() {
    long maxDelay = 0;
    for (Iterator<Runnable> it = scheduler.getQueue().iterator(); it.hasNext(); ) {
      Runnable job = it.next();
      if (!(job instanceof OutOfBandRsf)) {
        continue;
      }
      OutOfBandRsf<?> oobJob = (OutOfBandRsf<?>) job;
      if (oobJob.getExecutor() != worker) {
        continue; // belongs to another partition of the shared scheduler
      }
      if (oobJob.isPeriodic()) {
        oobJob.cancel(false);
        it.remove();
      } else {
        maxDelay = Math.max(maxDelay, oobJob.getDelay(NANOSECONDS));
      }
    }
    return maxDelay;
  }

  /**
   * Rejects new submissions, removes all of this executor's tasks from the scheduler queue and
   * calls {@code shutdownNow()} on the worker, waiting for that abort action to complete.
   *
   * @return the tasks removed from the scheduler queue followed by those returned by the worker
   * @throws RuntimeException wrapping the cause of any failure of the abort action
   */
  @Override
  public List<Runnable> shutdownNow() {
    shutdown = true;
    try {
      termination = scheduler.schedule(null, this::abortTasks, 0L, NANOSECONDS);
      return waitFor(termination);
    } catch (ExecutionException e) {
      throw new RuntimeException(e.getCause());
    }
  }

  /**
   * Removes every queued task whose executor is this instance's worker from the scheduler
   * queue, then shuts the worker down immediately.
   *
   * @return the removed scheduler tasks plus whatever {@code worker.shutdownNow()} returned
   */
  private List<Runnable> abortTasks() {
    List<Runnable> abortedTasks = new ArrayList<>();
    for (Iterator<Runnable> it = scheduler.getQueue().iterator(); it.hasNext(); ) {
      Runnable job = it.next();
      if (job instanceof OutOfBandRsf && ((OutOfBandRsf<?>) job).getExecutor() == worker) {
        abortedTasks.add(job);
        it.remove();
      }
    }
    abortedTasks.addAll(worker.shutdownNow());
    return abortedTasks;
  }

  @Override
  public boolean isShutdown() {
    return shutdown;
  }

  /**
   * {@inheritDoc}
   * <p>
   * Returns {@code false} while a shutdown is in progress but its termination future has not
   * been published yet.
   */
  @Override
  public boolean isTerminated() {
    if (!isShutdown()) {
      return false;
    }
    // Single volatile read: the flag is published before the future, so null is possible here.
    Future<List<Runnable>> pending = termination;
    return pending != null && pending.isDone() && worker.isTerminated();
  }

  /**
   * {@inheritDoc}
   * <p>
   * Returns {@code false} immediately if no shutdown has been requested. Otherwise waits, within
   * the given timeout, first for the scheduled shutdown action and then for the worker.
   *
   * @throws RuntimeException wrapping the cause of a failed shutdown action
   */
  @Override
  public boolean awaitTermination(long time, TimeUnit unit) throws InterruptedException {
    if (!isShutdown()) {
      return false;
    }
    final long end = System.nanoTime() + unit.toNanos(time);

    // The shutdown flag is set before the termination future is assigned; wait (bounded by the
    // caller's deadline) for the shutting-down thread to publish it instead of dereferencing null.
    Future<List<Runnable>> pending = termination;
    while (pending == null) {
      if (end - System.nanoTime() <= 0) {
        return false;
      }
      Thread.sleep(1);
      pending = termination;
    }

    if (!pending.isDone()) {
      try {
        pending.get(end - System.nanoTime(), NANOSECONDS);
      } catch (ExecutionException e) {
        throw new RuntimeException(e.getCause());
      } catch (TimeoutException e) {
        return false;
      }
    }
    // Whatever is left of the caller's budget goes to waiting for the worker itself.
    return worker.awaitTermination(end - System.nanoTime(), NANOSECONDS);
  }

  /** Executes the task by scheduling it with a zero delay. */
  @Override
  public void execute(Runnable runnable) {
    schedule(runnable, 0, NANOSECONDS);
  }
}