/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired. The head of the queue is that Delayed element whose delay expired furthest in the past. If no delay has expired there is no head and poll will return null. Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero. Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements. For example, the size method returns the count of both expired and unexpired elements. This queue does not permit null elements.

This class and its iterator implement all of the optional methods of the Collection and Iterator interfaces. The Iterator provided in method iterator() is not guaranteed to traverse the elements of the DelayQueue in any particular order.

This class is a member of the Java Collections Framework.

Author:Doug Lea
Type parameters:
  • <E> – the type of elements held in this queue
Since:1.5
/** * An unbounded {@linkplain BlockingQueue blocking queue} of * {@code Delayed} elements, in which an element can only be taken * when its delay has expired. The <em>head</em> of the queue is that * {@code Delayed} element whose delay expired furthest in the * past. If no delay has expired there is no head and {@code poll} * will return {@code null}. Expiration occurs when an element's * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less * than or equal to zero. Even though unexpired elements cannot be * removed using {@code take} or {@code poll}, they are otherwise * treated as normal elements. For example, the {@code size} method * returns the count of both expired and unexpired elements. * This queue does not permit null elements. * * <p>This class and its iterator implement all of the <em>optional</em> * methods of the {@link Collection} and {@link Iterator} interfaces. * The Iterator provided in method {@link #iterator()} is <em>not</em> * guaranteed to traverse the elements of the DelayQueue in any * particular order. * * <p>This class is a member of the * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework"> * Java Collections Framework</a>. * * @since 1.5 * @author Doug Lea * @param <E> the type of elements held in this queue */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>();
Thread designated to wait for the element at the head of the queue. This variant of the Leader-Follower pattern (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to minimize unnecessary timed waiting. When a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely. The leader thread must signal some other thread before returning from take() or poll(...), unless some other thread becomes leader in the interim. Whenever the head of the queue is replaced with an element with an earlier expiration time, the leader field is invalidated by being reset to null, and some waiting thread, but not necessarily the current leader, is signalled. So waiting threads must be prepared to acquire and lose leadership while waiting.
/** * Thread designated to wait for the element at the head of * the queue. This variant of the Leader-Follower pattern * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to * minimize unnecessary timed waiting. When a thread becomes * the leader, it waits only for the next delay to elapse, but * other threads await indefinitely. The leader thread must * signal some other thread before returning from take() or * poll(...), unless some other thread becomes leader in the * interim. Whenever the head of the queue is replaced with * an element with an earlier expiration time, the leader * field is invalidated by being reset to null, and some * waiting thread, but not necessarily the current leader, is * signalled. So waiting threads must be prepared to acquire * and lose leadership while waiting. */
private Thread leader;
Condition signalled when a newer element becomes available at the head of the queue or a new thread may need to become leader.
/** * Condition signalled when a newer element becomes available * at the head of the queue or a new thread may need to * become leader. */
private final Condition available = lock.newCondition();
Creates a new DelayQueue that is initially empty.
/** * Creates a new {@code DelayQueue} that is initially empty. */
public DelayQueue() {}
Creates a DelayQueue initially containing the elements of the given collection of Delayed instances.
Params:
  • c – the collection of elements to initially contain
Throws:
/** * Creates a {@code DelayQueue} initially containing the elements of the * given collection of {@link Delayed} instances. * * @param c the collection of elements to initially contain * @throws NullPointerException if the specified collection or any * of its elements are null */
public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
Inserts the specified element into this delay queue.
Params:
  • e – the element to add
Throws:
Returns:true (as specified by Collection.add)
/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return {@code true} (as specified by {@link Collection#add}) * @throws NullPointerException if the specified element is null */
public boolean add(E e) { return offer(e); }
Inserts the specified element into this delay queue.
Params:
  • e – the element to add
Throws:
Returns:true
/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return {@code true} * @throws NullPointerException if the specified element is null */
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
Inserts the specified element into this delay queue. As the queue is unbounded this method will never block.
Params:
  • e – the element to add
Throws:
/** * Inserts the specified element into this delay queue. As the queue is * unbounded this method will never block. * * @param e the element to add * @throws NullPointerException {@inheritDoc} */
public void put(E e) { offer(e); }
Inserts the specified element into this delay queue. As the queue is unbounded this method will never block.
Params:
  • e – the element to add
  • timeout – This parameter is ignored as the method never blocks
  • unit – This parameter is ignored as the method never blocks
Throws:
Returns:true
/** * Inserts the specified element into this delay queue. As the queue is * unbounded this method will never block. * * @param e the element to add * @param timeout This parameter is ignored as the method never blocks * @param unit This parameter is ignored as the method never blocks * @return {@code true} * @throws NullPointerException {@inheritDoc} */
public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); }
Retrieves and removes the head of this queue, or returns null if this queue has no elements with an expired delay.
Returns:the head of this queue, or null if this queue has no elements with an expired delay
/** * Retrieves and removes the head of this queue, or returns {@code null} * if this queue has no elements with an expired delay. * * @return the head of this queue, or {@code null} if this * queue has no elements with an expired delay */
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); return (first == null || first.getDelay(NANOSECONDS) > 0) ? null : q.poll(); } finally { lock.unlock(); } }
Retrieves and removes the head of this queue, waiting if necessary until an element with an expired delay is available on this queue.
Throws:
Returns:the head of this queue
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. * * @return the head of this queue * @throws InterruptedException {@inheritDoc} */
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0L) return q.poll(); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
Retrieves and removes the head of this queue, waiting if necessary until an element with an expired delay is available on this queue, or the specified wait time expires.
Throws:
Returns:the head of this queue, or null if the specified waiting time elapses before an element with an expired delay becomes available
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue, * or the specified wait time expires. * * @return the head of this queue, or {@code null} if the * specified waiting time elapses before an element with * an expired delay becomes available * @throws InterruptedException {@inheritDoc} */
public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { if (nanos <= 0L) return null; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0L) return q.poll(); if (nanos <= 0L) return null; first = null; // don't retain ref while waiting if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
Retrieves, but does not remove, the head of this queue, or returns null if this queue is empty. Unlike poll, if no expired elements are available in the queue, this method returns the element that will expire next, if one exists.
Returns:the head of this queue, or null if this queue is empty
/** * Retrieves, but does not remove, the head of this queue, or * returns {@code null} if this queue is empty. Unlike * {@code poll}, if no expired elements are available in the queue, * this method returns the element that will expire next, * if one exists. * * @return the head of this queue, or {@code null} if this * queue is empty */
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.peek(); } finally { lock.unlock(); } } public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.size(); } finally { lock.unlock(); } }
Throws:
/** * @throws UnsupportedOperationException {@inheritDoc} * @throws ClassCastException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */
public int drainTo(Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE); }
Throws:
/** * @throws UnsupportedOperationException {@inheritDoc} * @throws ClassCastException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */
public int drainTo(Collection<? super E> c, int maxElements) { Objects.requireNonNull(c); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; for (E first; n < maxElements && (first = q.peek()) != null && first.getDelay(NANOSECONDS) <= 0;) { c.add(first); // In this order, in case add() throws. q.poll(); ++n; } return n; } finally { lock.unlock(); } }
Atomically removes all of the elements from this delay queue. The queue will be empty after this call returns. Elements with an unexpired delay are not waited for; they are simply discarded from the queue.
/** * Atomically removes all of the elements from this delay queue. * The queue will be empty after this call returns. * Elements with an unexpired delay are not waited for; they are * simply discarded from the queue. */
public void clear() { final ReentrantLock lock = this.lock; lock.lock(); try { q.clear(); } finally { lock.unlock(); } }
Always returns Integer.MAX_VALUE because a DelayQueue is not capacity constrained.
Returns:Integer.MAX_VALUE
/** * Always returns {@code Integer.MAX_VALUE} because * a {@code DelayQueue} is not capacity constrained. * * @return {@code Integer.MAX_VALUE} */
public int remainingCapacity() { return Integer.MAX_VALUE; }
Returns an array containing all of the elements in this queue. The returned array elements are in no particular order.

The returned array will be "safe" in that no references to it are maintained by this queue. (In other words, this method must allocate a new array). The caller is thus free to modify the returned array.

This method acts as bridge between array-based and collection-based APIs.

Returns:an array containing all of the elements in this queue
/** * Returns an array containing all of the elements in this queue. * The returned array elements are in no particular order. * * <p>The returned array will be "safe" in that no references to it are * maintained by this queue. (In other words, this method must allocate * a new array). The caller is thus free to modify the returned array. * * <p>This method acts as bridge between array-based and collection-based * APIs. * * @return an array containing all of the elements in this queue */
public Object[] toArray() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.toArray(); } finally { lock.unlock(); } }
Returns an array containing all of the elements in this queue; the runtime type of the returned array is that of the specified array. The returned array elements are in no particular order. If the queue fits in the specified array, it is returned therein. Otherwise, a new array is allocated with the runtime type of the specified array and the size of this queue.

If this queue fits in the specified array with room to spare (i.e., the array has more elements than this queue), the element in the array immediately following the end of the queue is set to null.

Like the toArray() method, this method acts as bridge between array-based and collection-based APIs. Further, this method allows precise control over the runtime type of the output array, and may, under certain circumstances, be used to save allocation costs.

The following code can be used to dump a delay queue into a newly allocated array of Delayed:

 Delayed[] a = q.toArray(new Delayed[0]);
Note that toArray(new Object[0]) is identical in function to toArray().
Params:
  • a – the array into which the elements of the queue are to be stored, if it is big enough; otherwise, a new array of the same runtime type is allocated for this purpose
Throws:
Returns:an array containing all of the elements in this queue
/** * Returns an array containing all of the elements in this queue; the * runtime type of the returned array is that of the specified array. * The returned array elements are in no particular order. * If the queue fits in the specified array, it is returned therein. * Otherwise, a new array is allocated with the runtime type of the * specified array and the size of this queue. * * <p>If this queue fits in the specified array with room to spare * (i.e., the array has more elements than this queue), the element in * the array immediately following the end of the queue is set to * {@code null}. * * <p>Like the {@link #toArray()} method, this method acts as bridge between * array-based and collection-based APIs. Further, this method allows * precise control over the runtime type of the output array, and may, * under certain circumstances, be used to save allocation costs. * * <p>The following code can be used to dump a delay queue into a newly * allocated array of {@code Delayed}: * * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre> * * Note that {@code toArray(new Object[0])} is identical in function to * {@code toArray()}. * * @param a the array into which the elements of the queue are to * be stored, if it is big enough; otherwise, a new array of the * same runtime type is allocated for this purpose * @return an array containing all of the elements in this queue * @throws ArrayStoreException if the runtime type of the specified array * is not a supertype of the runtime type of every element in * this queue * @throws NullPointerException if the specified array is null */
public <T> T[] toArray(T[] a) { final ReentrantLock lock = this.lock; lock.lock(); try { return q.toArray(a); } finally { lock.unlock(); } }
Removes a single instance of the specified element from this queue, if it is present, whether or not it has expired.
/** * Removes a single instance of the specified element from this * queue, if it is present, whether or not it has expired. */
public boolean remove(Object o) { final ReentrantLock lock = this.lock; lock.lock(); try { return q.remove(o); } finally { lock.unlock(); } }
Identity-based version for use in Itr.remove.
/** * Identity-based version for use in Itr.remove. */
void removeEQ(Object o) { final ReentrantLock lock = this.lock; lock.lock(); try { for (Iterator<E> it = q.iterator(); it.hasNext(); ) { if (o == it.next()) { it.remove(); break; } } } finally { lock.unlock(); } }
Returns an iterator over all the elements (both expired and unexpired) in this queue. The iterator does not return the elements in any particular order.

The returned iterator is weakly consistent.

Returns:an iterator over the elements in this queue
/** * Returns an iterator over all the elements (both expired and * unexpired) in this queue. The iterator does not return the * elements in any particular order. * * <p>The returned iterator is * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. * * @return an iterator over the elements in this queue */
public Iterator<E> iterator() { return new Itr(toArray()); }
Snapshot iterator that works off copy of underlying q array.
/** * Snapshot iterator that works off copy of underlying q array. */
private class Itr implements Iterator<E> { final Object[] array; // Array of all elements int cursor; // index of next element to return int lastRet; // index of last element, or -1 if no such Itr(Object[] array) { lastRet = -1; this.array = array; } public boolean hasNext() { return cursor < array.length; } @SuppressWarnings("unchecked") public E next() { if (cursor >= array.length) throw new NoSuchElementException(); return (E)array[lastRet = cursor++]; } public void remove() { if (lastRet < 0) throw new IllegalStateException(); removeEQ(array[lastRet]); lastRet = -1; } } }