/*
 * Copyright 2015 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.util.concurrent;

import io.netty.util.internal.DefaultPriorityQueue;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PriorityQueue;

import java.util.Comparator;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

Abstract base class for EventExecutors that want to support scheduling.
/** * Abstract base class for {@link EventExecutor}s that want to support scheduling. */
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor { private static final Comparator<ScheduledFutureTask<?>> SCHEDULED_FUTURE_TASK_COMPARATOR = new Comparator<ScheduledFutureTask<?>>() { @Override public int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) { return o1.compareTo(o2); } }; PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue; protected AbstractScheduledEventExecutor() { } protected AbstractScheduledEventExecutor(EventExecutorGroup parent) { super(parent); } protected static long nanoTime() { return ScheduledFutureTask.nanoTime(); } PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() { if (scheduledTaskQueue == null) { scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>( SCHEDULED_FUTURE_TASK_COMPARATOR, // Use same initial capacity as java.util.PriorityQueue 11); } return scheduledTaskQueue; } private static boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) { return queue == null || queue.isEmpty(); }
Cancel all scheduled tasks. This method MUST be called only when AbstractEventExecutor.inEventLoop() is true.
/** * Cancel all scheduled tasks. * * This method MUST be called only when {@link #inEventLoop()} is {@code true}. */
protected void cancelScheduledTasks() { assert inEventLoop(); PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; if (isNullOrEmpty(scheduledTaskQueue)) { return; } final ScheduledFutureTask<?>[] scheduledTasks = scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[scheduledTaskQueue.size()]); for (ScheduledFutureTask<?> task: scheduledTasks) { task.cancelWithoutRemove(false); } scheduledTaskQueue.clearIgnoringIndexes(); }
See Also:
  • pollScheduledTask(long)
/** * @see #pollScheduledTask(long) */
protected final Runnable pollScheduledTask() { return pollScheduledTask(nanoTime()); }
Return the Runnable which is ready to be executed with the given nanoTime. You should use nanoTime() to retrieve the correct nanoTime.
/** * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}. * You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}. */
protected final Runnable pollScheduledTask(long nanoTime) { assert inEventLoop(); Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return null; } if (scheduledTask.deadlineNanos() <= nanoTime) { scheduledTaskQueue.remove(); return scheduledTask; } return null; }
Return the nanoseconds when the next scheduled task is ready to be run or -1 if no task is scheduled.
/** * Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled. */
protected final long nextScheduledTaskNano() { Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return -1; } return Math.max(0, scheduledTask.deadlineNanos() - nanoTime()); } final ScheduledFutureTask<?> peekScheduledTask() { Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; if (scheduledTaskQueue == null) { return null; } return scheduledTaskQueue.peek(); }
Returns true if a scheduled task is ready for processing.
/** * Returns {@code true} if a scheduled task is ready for processing. */
protected final boolean hasScheduledTasks() { Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime(); } @Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { ObjectUtil.checkNotNull(command, "command"); ObjectUtil.checkNotNull(unit, "unit"); if (delay < 0) { delay = 0; } validateScheduled(delay, unit); return schedule(new ScheduledFutureTask<Void>( this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); } @Override public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { ObjectUtil.checkNotNull(callable, "callable"); ObjectUtil.checkNotNull(unit, "unit"); if (delay < 0) { delay = 0; } validateScheduled(delay, unit); return schedule(new ScheduledFutureTask<V>( this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); } @Override public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { ObjectUtil.checkNotNull(command, "command"); ObjectUtil.checkNotNull(unit, "unit"); if (initialDelay < 0) { throw new IllegalArgumentException( String.format("initialDelay: %d (expected: >= 0)", initialDelay)); } if (period <= 0) { throw new IllegalArgumentException( String.format("period: %d (expected: > 0)", period)); } validateScheduled(initialDelay, unit); validateScheduled(period, unit); return schedule(new ScheduledFutureTask<Void>( this, Executors.<Void>callable(command, null), ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); } @Override public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { ObjectUtil.checkNotNull(command, "command"); ObjectUtil.checkNotNull(unit, "unit"); if (initialDelay < 0) { throw new IllegalArgumentException( String.format("initialDelay: %d (expected: >= 0)", initialDelay)); } if (delay <= 0) { throw new IllegalArgumentException( String.format("delay: %d (expected: > 0)", delay)); } validateScheduled(initialDelay, unit); validateScheduled(delay, unit); return schedule(new ScheduledFutureTask<Void>( this, Executors.<Void>callable(command, null), ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); }
Sub-classes may override this to restrict the maximal amount of time someone can use to schedule a task.
/** * Sub-classes may override this to restrict the maximal amount of time someone can use to schedule a task. */
protected void validateScheduled(long amount, TimeUnit unit) { // NOOP } <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if (inEventLoop()) { scheduledTaskQueue().add(task); } else { execute(new Runnable() { @Override public void run() { scheduledTaskQueue().add(task); } }); } return task; } final void removeScheduled(final ScheduledFutureTask<?> task) { if (inEventLoop()) { scheduledTaskQueue().removeTyped(task); } else { execute(new Runnable() { @Override public void run() { removeScheduled(task); } }); } } }