Copyright (c) 2016-present, RxJava Contributors.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is
distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
the License for the specific language governing permissions and limitations under the License.
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
/*
* The code was inspired by the similarly named JCTools class:
* https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic
*/
package io.reactivex.internal.queue;
import java.util.concurrent.atomic.*;
import io.reactivex.annotations.Nullable;
import io.reactivex.internal.fuseable.SimplePlainQueue;
import io.reactivex.internal.util.Pow2;
A Single-Producer-Single-Consumer queue backed by a pre-allocated buffer.
This implementation is a mashup of the Fast Flow
algorithm with an optimization of the offer method taken from the BQueue algorithm (a variation on Fast
Flow), and adjusted to comply with Queue.offer semantics with regards to capacity.
For convenience the relevant papers are available in the resources folder:
2010 - Pisa - SPSC Queues on Shared Cache Multi-Core Systems.pdf
2012 - Junchang- BQueue- Efficient and Practical Queuing.pdf
This implementation is wait free.
Type parameters: - <E> – the element type of the queue
/**
* A Single-Producer-Single-Consumer queue backed by a pre-allocated buffer.
* <p>
* This implementation is a mashup of the <a href="http://sourceforge.net/projects/mc-fastflow/">Fast Flow</a>
* algorithm with an optimization of the offer method taken from the <a
* href="http://staff.ustc.edu.cn/~bhua/publications/IJPP_draft.pdf">BQueue</a> algorithm (a variation on Fast
* Flow), and adjusted to comply with Queue.offer semantics with regards to capacity.<br>
* For convenience the relevant papers are available in the resources folder:<br>
* <i>2010 - Pisa - SPSC Queues on Shared Cache Multi-Core Systems.pdf<br>
* 2012 - Junchang- BQueue- Efficient and Practical Queuing.pdf <br>
* </i> This implementation is wait free.
*
* @param <E> the element type of the queue
*/
public final class SpscArrayQueue<E> extends AtomicReferenceArray<E> implements SimplePlainQueue<E> {
private static final long serialVersionUID = -1296597691183856449L;
private static final Integer MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096);
final int mask;
final AtomicLong producerIndex;
long producerLookAhead;
final AtomicLong consumerIndex;
final int lookAheadStep;
public SpscArrayQueue(int capacity) {
super(Pow2.roundToPowerOfTwo(capacity));
this.mask = length() - 1;
this.producerIndex = new AtomicLong();
this.consumerIndex = new AtomicLong();
lookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP);
}
@Override
public boolean offer(E e) {
if (null == e) {
throw new NullPointerException("Null is not a valid element");
}
// local load of field to avoid repeated loads after volatile reads
final int mask = this.mask;
final long index = producerIndex.get();
final int offset = calcElementOffset(index, mask);
if (index >= producerLookAhead) {
int step = lookAheadStep;
if (null == lvElement(calcElementOffset(index + step, mask))) { // LoadLoad
producerLookAhead = index + step;
} else if (null != lvElement(offset)) {
return false;
}
}
soElement(offset, e); // StoreStore
soProducerIndex(index + 1); // ordered store -> atomic and ordered for size()
return true;
}
@Override
public boolean offer(E v1, E v2) {
// FIXME
return offer(v1) && offer(v2);
}
@Nullable
@Override
public E poll() {
final long index = consumerIndex.get();
final int offset = calcElementOffset(index);
// local load of field to avoid repeated loads after volatile reads
final E e = lvElement(offset); // LoadLoad
if (null == e) {
return null;
}
soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size()
soElement(offset, null); // StoreStore
return e;
}
@Override
public boolean isEmpty() {
return producerIndex.get() == consumerIndex.get();
}
void soProducerIndex(long newIndex) {
producerIndex.lazySet(newIndex);
}
void soConsumerIndex(long newIndex) {
consumerIndex.lazySet(newIndex);
}
@Override
public void clear() {
// we have to test isEmpty because of the weaker poll() guarantee
while (poll() != null || !isEmpty()) { } // NOPMD
}
int calcElementOffset(long index, int mask) {
return (int)index & mask;
}
int calcElementOffset(long index) {
return (int)index & mask;
}
void soElement(int offset, E value) {
lazySet(offset, value);
}
E lvElement(int offset) {
return get(offset);
}
}