/*
* Copyright (C) 2011 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.common.collect;
import com.google.common.annotations.Beta;
import com.google.common.annotations.GwtCompatible;
import com.google.common.annotations.GwtIncompatible;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
Static utility methods pertaining to Queue
and Deque
instances. Also see this class's counterparts Lists
, Sets
, and Maps
. Author: Kurt Alfred Kluever Since: 11.0
/**
* Static utility methods pertaining to {@link Queue} and {@link Deque} instances. Also see this
* class's counterparts {@link Lists}, {@link Sets}, and {@link Maps}.
*
* @author Kurt Alfred Kluever
* @since 11.0
*/
@GwtCompatible(emulated = true)
public final class Queues {
private Queues() {}
// ArrayBlockingQueue
Creates an empty ArrayBlockingQueue
with the given (fixed) capacity and nonfair access policy. /**
* Creates an empty {@code ArrayBlockingQueue} with the given (fixed) capacity and nonfair access
* policy.
*/
@GwtIncompatible // ArrayBlockingQueue
public static <E> ArrayBlockingQueue<E> newArrayBlockingQueue(int capacity) {
return new ArrayBlockingQueue<E>(capacity);
}
// ArrayDeque
Creates an empty ArrayDeque
. Since: 12.0
/**
* Creates an empty {@code ArrayDeque}.
*
* @since 12.0
*/
public static <E> ArrayDeque<E> newArrayDeque() {
return new ArrayDeque<E>();
}
Creates an ArrayDeque
containing the elements of the specified iterable, in the order they are returned by the iterable's iterator. Since: 12.0
/**
* Creates an {@code ArrayDeque} containing the elements of the specified iterable, in the order
* they are returned by the iterable's iterator.
*
* @since 12.0
*/
public static <E> ArrayDeque<E> newArrayDeque(Iterable<? extends E> elements) {
if (elements instanceof Collection) {
return new ArrayDeque<E>(Collections2.cast(elements));
}
ArrayDeque<E> deque = new ArrayDeque<E>();
Iterables.addAll(deque, elements);
return deque;
}
// ConcurrentLinkedQueue
Creates an empty ConcurrentLinkedQueue
. /** Creates an empty {@code ConcurrentLinkedQueue}. */
@GwtIncompatible // ConcurrentLinkedQueue
public static <E> ConcurrentLinkedQueue<E> newConcurrentLinkedQueue() {
return new ConcurrentLinkedQueue<E>();
}
Creates a ConcurrentLinkedQueue
containing the elements of the specified iterable, in the order they are returned by the iterable's iterator. /**
* Creates a {@code ConcurrentLinkedQueue} containing the elements of the specified iterable, in
* the order they are returned by the iterable's iterator.
*/
@GwtIncompatible // ConcurrentLinkedQueue
public static <E> ConcurrentLinkedQueue<E> newConcurrentLinkedQueue(
Iterable<? extends E> elements) {
if (elements instanceof Collection) {
return new ConcurrentLinkedQueue<E>(Collections2.cast(elements));
}
ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<E>();
Iterables.addAll(queue, elements);
return queue;
}
// LinkedBlockingDeque
Creates an empty LinkedBlockingDeque
with a capacity of Integer.MAX_VALUE
. Since: 12.0
/**
* Creates an empty {@code LinkedBlockingDeque} with a capacity of {@link Integer#MAX_VALUE}.
*
* @since 12.0
*/
@GwtIncompatible // LinkedBlockingDeque
public static <E> LinkedBlockingDeque<E> newLinkedBlockingDeque() {
return new LinkedBlockingDeque<E>();
}
Creates an empty LinkedBlockingDeque
with the given (fixed) capacity. Throws: - IllegalArgumentException – if
capacity
is less than 1
Since: 12.0
/**
* Creates an empty {@code LinkedBlockingDeque} with the given (fixed) capacity.
*
* @throws IllegalArgumentException if {@code capacity} is less than 1
* @since 12.0
*/
@GwtIncompatible // LinkedBlockingDeque
public static <E> LinkedBlockingDeque<E> newLinkedBlockingDeque(int capacity) {
return new LinkedBlockingDeque<E>(capacity);
}
Creates a LinkedBlockingDeque
with a capacity of Integer.MAX_VALUE
, containing the elements of the specified iterable, in the order they are returned by the iterable's iterator. Since: 12.0
/**
* Creates a {@code LinkedBlockingDeque} with a capacity of {@link Integer#MAX_VALUE}, containing
* the elements of the specified iterable, in the order they are returned by the iterable's
* iterator.
*
* @since 12.0
*/
@GwtIncompatible // LinkedBlockingDeque
public static <E> LinkedBlockingDeque<E> newLinkedBlockingDeque(Iterable<? extends E> elements) {
if (elements instanceof Collection) {
return new LinkedBlockingDeque<E>(Collections2.cast(elements));
}
LinkedBlockingDeque<E> deque = new LinkedBlockingDeque<E>();
Iterables.addAll(deque, elements);
return deque;
}
// LinkedBlockingQueue
Creates an empty LinkedBlockingQueue
with a capacity of Integer.MAX_VALUE
. /** Creates an empty {@code LinkedBlockingQueue} with a capacity of {@link Integer#MAX_VALUE}. */
@GwtIncompatible // LinkedBlockingQueue
public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue() {
return new LinkedBlockingQueue<E>();
}
Creates an empty LinkedBlockingQueue
with the given (fixed) capacity. Throws: - IllegalArgumentException – if
capacity
is less than 1
/**
* Creates an empty {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @throws IllegalArgumentException if {@code capacity} is less than 1
*/
@GwtIncompatible // LinkedBlockingQueue
public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue(int capacity) {
return new LinkedBlockingQueue<E>(capacity);
}
Creates a LinkedBlockingQueue
with a capacity of Integer.MAX_VALUE
, containing the elements of the specified iterable, in the order they are returned by the iterable's iterator. Params: - elements – the elements that the queue should contain, in order
Returns: a new LinkedBlockingQueue
containing those elements
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of {@link Integer#MAX_VALUE}, containing
* the elements of the specified iterable, in the order they are returned by the iterable's
* iterator.
*
* @param elements the elements that the queue should contain, in order
* @return a new {@code LinkedBlockingQueue} containing those elements
*/
@GwtIncompatible // LinkedBlockingQueue
public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue(Iterable<? extends E> elements) {
if (elements instanceof Collection) {
return new LinkedBlockingQueue<E>(Collections2.cast(elements));
}
LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<E>();
Iterables.addAll(queue, elements);
return queue;
}
// LinkedList: see {@link com.google.common.collect.Lists}
// PriorityBlockingQueue
Creates an empty PriorityBlockingQueue
with the ordering given by its elements' natural ordering. Since: 11.0 (requires that E
be Comparable
since 15.0).
/**
* Creates an empty {@code PriorityBlockingQueue} with the ordering given by its elements' natural
* ordering.
*
* @since 11.0 (requires that {@code E} be {@code Comparable} since 15.0).
*/
@GwtIncompatible // PriorityBlockingQueue
public static <E extends Comparable> PriorityBlockingQueue<E> newPriorityBlockingQueue() {
return new PriorityBlockingQueue<E>();
}
Creates a PriorityBlockingQueue
containing the given elements. Note: If the specified iterable is a SortedSet
or a PriorityQueue
, this priority queue will be ordered according to the same ordering.
Since: 11.0 (requires that E
be Comparable
since 15.0).
/**
* Creates a {@code PriorityBlockingQueue} containing the given elements.
*
* <p><b>Note:</b> If the specified iterable is a {@code SortedSet} or a {@code PriorityQueue},
* this priority queue will be ordered according to the same ordering.
*
* @since 11.0 (requires that {@code E} be {@code Comparable} since 15.0).
*/
@GwtIncompatible // PriorityBlockingQueue
public static <E extends Comparable> PriorityBlockingQueue<E> newPriorityBlockingQueue(
Iterable<? extends E> elements) {
if (elements instanceof Collection) {
return new PriorityBlockingQueue<E>(Collections2.cast(elements));
}
PriorityBlockingQueue<E> queue = new PriorityBlockingQueue<E>();
Iterables.addAll(queue, elements);
return queue;
}
// PriorityQueue
Creates an empty PriorityQueue
with the ordering given by its elements' natural ordering. Since: 11.0 (requires that E
be Comparable
since 15.0).
/**
* Creates an empty {@code PriorityQueue} with the ordering given by its elements' natural
* ordering.
*
* @since 11.0 (requires that {@code E} be {@code Comparable} since 15.0).
*/
public static <E extends Comparable> PriorityQueue<E> newPriorityQueue() {
return new PriorityQueue<E>();
}
Creates a PriorityQueue
containing the given elements. Note: If the specified iterable is a SortedSet
or a PriorityQueue
, this priority queue will be ordered according to the same ordering.
Since: 11.0 (requires that E
be Comparable
since 15.0).
/**
* Creates a {@code PriorityQueue} containing the given elements.
*
* <p><b>Note:</b> If the specified iterable is a {@code SortedSet} or a {@code PriorityQueue},
* this priority queue will be ordered according to the same ordering.
*
* @since 11.0 (requires that {@code E} be {@code Comparable} since 15.0).
*/
public static <E extends Comparable> PriorityQueue<E> newPriorityQueue(
Iterable<? extends E> elements) {
if (elements instanceof Collection) {
return new PriorityQueue<E>(Collections2.cast(elements));
}
PriorityQueue<E> queue = new PriorityQueue<E>();
Iterables.addAll(queue, elements);
return queue;
}
// SynchronousQueue
Creates an empty SynchronousQueue
with nonfair access policy. /** Creates an empty {@code SynchronousQueue} with nonfair access policy. */
@GwtIncompatible // SynchronousQueue
public static <E> SynchronousQueue<E> newSynchronousQueue() {
return new SynchronousQueue<E>();
}
Drains the queue as BlockingQueue.drainTo(Collection, int)
, but if the requested
numElements
elements are not available, it will wait for them up to the specified timeout. Params: - q – the blocking queue to be drained
- buffer – where to add the transferred elements
- numElements – the number of elements to be waited for
- timeout – how long to wait before giving up, in units of
unit
- unit – a
TimeUnit
determining how to interpret the timeout parameter
Throws: - InterruptedException – if interrupted while waiting
Returns: the number of elements transferred
/**
* Drains the queue as {@link BlockingQueue#drainTo(Collection, int)}, but if the requested {@code
* numElements} elements are not available, it will wait for them up to the specified timeout.
*
* @param q the blocking queue to be drained
* @param buffer where to add the transferred elements
* @param numElements the number of elements to be waited for
* @param timeout how long to wait before giving up, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the timeout parameter
* @return the number of elements transferred
* @throws InterruptedException if interrupted while waiting
*/
@Beta
@CanIgnoreReturnValue
@GwtIncompatible // BlockingQueue
public static <E> int drain(
BlockingQueue<E> q,
Collection<? super E> buffer,
int numElements,
long timeout,
TimeUnit unit)
throws InterruptedException {
Preconditions.checkNotNull(buffer);
/*
* This code performs one System.nanoTime() more than necessary, and in return, the time to
* execute Queue#drainTo is not added *on top* of waiting for the timeout (which could make
* the timeout arbitrarily inaccurate, given a queue that is slow to drain).
*/
long deadline = System.nanoTime() + unit.toNanos(timeout);
int added = 0;
while (added < numElements) {
// we could rely solely on #poll, but #drainTo might be more efficient when there are multiple
// elements already available (e.g. LinkedBlockingQueue#drainTo locks only once)
added += q.drainTo(buffer, numElements - added);
if (added < numElements) { // not enough elements immediately available; will have to poll
E e = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
if (e == null) {
break; // we already waited enough, and there are no more elements in sight
}
buffer.add(e);
added++;
}
}
return added;
}
Drains the queue as drain(BlockingQueue<Object>, Collection<? super Object>, int, long, TimeUnit), but with a different behavior in case it is interrupted while waiting. In that case, the operation will continue as usual, and in the end the thread's interruption status will be set (no
InterruptedException
is thrown). Params: - q – the blocking queue to be drained
- buffer – where to add the transferred elements
- numElements – the number of elements to be waited for
- timeout – how long to wait before giving up, in units of
unit
- unit – a
TimeUnit
determining how to interpret the timeout parameter
Returns: the number of elements transferred
/**
* Drains the queue as {@linkplain #drain(BlockingQueue, Collection, int, long, TimeUnit)}, but
* with a different behavior in case it is interrupted while waiting. In that case, the operation
* will continue as usual, and in the end the thread's interruption status will be set (no {@code
* InterruptedException} is thrown).
*
* @param q the blocking queue to be drained
* @param buffer where to add the transferred elements
* @param numElements the number of elements to be waited for
* @param timeout how long to wait before giving up, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the timeout parameter
* @return the number of elements transferred
*/
@Beta
@CanIgnoreReturnValue
@GwtIncompatible // BlockingQueue
public static <E> int drainUninterruptibly(
BlockingQueue<E> q,
Collection<? super E> buffer,
int numElements,
long timeout,
TimeUnit unit) {
Preconditions.checkNotNull(buffer);
long deadline = System.nanoTime() + unit.toNanos(timeout);
int added = 0;
boolean interrupted = false;
try {
while (added < numElements) {
// we could rely solely on #poll, but #drainTo might be more efficient when there are
// multiple elements already available (e.g. LinkedBlockingQueue#drainTo locks only once)
added += q.drainTo(buffer, numElements - added);
if (added < numElements) { // not enough elements immediately available; will have to poll
E e; // written exactly once, by a successful (uninterrupted) invocation of #poll
while (true) {
try {
e = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
break;
} catch (InterruptedException ex) {
interrupted = true; // note interruption and retry
}
}
if (e == null) {
break; // we already waited enough, and there are no more elements in sight
}
buffer.add(e);
added++;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
return added;
}
Returns a synchronized (thread-safe) queue backed by the specified queue. In order to guarantee
serial access, it is critical that all access to the backing queue is accomplished
through the returned queue.
It is imperative that the user manually synchronize on the returned queue when accessing the
queue's iterator:
Queue<E> queue = Queues.synchronizedQueue(MinMaxPriorityQueue.<E>create());
...
queue.add(element); // Needn't be in synchronized block
...
synchronized (queue) { // Must synchronize on queue!
Iterator<E> i = queue.iterator(); // Must be in synchronized block
while (i.hasNext()) {
foo(i.next());
}
}
Failure to follow this advice may result in non-deterministic behavior.
The returned queue will be serializable if the specified queue is serializable.
Params: - queue – the queue to be wrapped in a synchronized view
Returns: a synchronized view of the specified queue Since: 14.0
/**
* Returns a synchronized (thread-safe) queue backed by the specified queue. In order to guarantee
* serial access, it is critical that <b>all</b> access to the backing queue is accomplished
* through the returned queue.
*
* <p>It is imperative that the user manually synchronize on the returned queue when accessing the
* queue's iterator:
*
* <pre>{@code
* Queue<E> queue = Queues.synchronizedQueue(MinMaxPriorityQueue.<E>create());
* ...
* queue.add(element); // Needn't be in synchronized block
* ...
* synchronized (queue) { // Must synchronize on queue!
* Iterator<E> i = queue.iterator(); // Must be in synchronized block
* while (i.hasNext()) {
* foo(i.next());
* }
* }
* }</pre>
*
* <p>Failure to follow this advice may result in non-deterministic behavior.
*
* <p>The returned queue will be serializable if the specified queue is serializable.
*
* @param queue the queue to be wrapped in a synchronized view
* @return a synchronized view of the specified queue
* @since 14.0
*/
public static <E> Queue<E> synchronizedQueue(Queue<E> queue) {
return Synchronized.queue(queue, null);
}
Returns a synchronized (thread-safe) deque backed by the specified deque. In order to guarantee
serial access, it is critical that all access to the backing deque is accomplished
through the returned deque.
It is imperative that the user manually synchronize on the returned deque when accessing any
of the deque's iterators:
Deque<E> deque = Queues.synchronizedDeque(Queues.<E>newArrayDeque());
...
deque.add(element); // Needn't be in synchronized block
...
synchronized (deque) { // Must synchronize on deque!
Iterator<E> i = deque.iterator(); // Must be in synchronized block
while (i.hasNext()) {
foo(i.next());
}
}
Failure to follow this advice may result in non-deterministic behavior.
The returned deque will be serializable if the specified deque is serializable.
Params: - deque – the deque to be wrapped in a synchronized view
Returns: a synchronized view of the specified deque Since: 15.0
/**
* Returns a synchronized (thread-safe) deque backed by the specified deque. In order to guarantee
* serial access, it is critical that <b>all</b> access to the backing deque is accomplished
* through the returned deque.
*
* <p>It is imperative that the user manually synchronize on the returned deque when accessing any
* of the deque's iterators:
*
* <pre>{@code
* Deque<E> deque = Queues.synchronizedDeque(Queues.<E>newArrayDeque());
* ...
* deque.add(element); // Needn't be in synchronized block
* ...
* synchronized (deque) { // Must synchronize on deque!
* Iterator<E> i = deque.iterator(); // Must be in synchronized block
* while (i.hasNext()) {
* foo(i.next());
* }
* }
* }</pre>
*
* <p>Failure to follow this advice may result in non-deterministic behavior.
*
* <p>The returned deque will be serializable if the specified deque is serializable.
*
* @param deque the deque to be wrapped in a synchronized view
* @return a synchronized view of the specified deque
* @since 15.0
*/
public static <E> Deque<E> synchronizedDeque(Deque<E> deque) {
return Synchronized.deque(deque, null);
}
}