package com.conversantmedia.util.concurrent;
/*
* #%L
* Conversant Disruptor
* ~~
* Conversantmedia.com © 2016, Conversant, Inc. Conversant® is a trademark of Conversant, Inc.
* ~~
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import java.util.concurrent.atomic.LongAdder;
Tuned version of Martin Thompson's push pull queue
Transfers from a single thread writer to a single thread reader are orders of nanoseconds (3-5)
This code is optimized and tested using a 64bit HotSpot JVM on an Intel x86-64 environment. Other
environments should be carefully tested before using in production.
Created by jcairns on 5/28/14.
/**
* Tuned version of Martin Thompson's push pull queue
*
* Transfers from a single thread writer to a single thread reader are orders of nanoseconds (3-5)
*
* This code is optimized and tested using a 64bit HotSpot JVM on an Intel x86-64 environment. Other
* environments should be carefully tested before using in production.
*
* Created by jcairns on 5/28/14.
*/
public class PushPullConcurrentQueue<E> implements ConcurrentQueue<E> {
final int size;
final long mask;
final LongAdder tail = new LongAdder();
long p1, p2, p3, p4, p5, p6, p7;
long tailCache = 0L;
long a1, a2, a3, a4, a5, a6, a7, a8;
final E[] buffer;
long r1, r2, r3, r4, r5, r6, r7;
long headCache = 0L;
long c1, c2, c3, c4, c5, c6, c7, c8;
final LongAdder head = new LongAdder();
public PushPullConcurrentQueue(final int capacity) {
this.size = Capacity.getCapacity(capacity);
this.mask = this.size-1;
buffer = (E[])new Object[this.size];
}
@Override
public boolean offer(final E e) {
if(e != null) {
final long tail = this.tail.sum();
final long queueStart = tail - size;
if((headCache > queueStart) || ((headCache = head.sum()) > queueStart)) {
final int dx = (int) (tail & mask);
buffer[dx] = e;
this.tail.increment();
return true;
} else {
return false;
}
} else {
throw new NullPointerException("Invalid element");
}
}
@Override
public E poll() {
final long head = this.head.sum();
if((head < tailCache) || (head < (tailCache = tail.sum()))) {
final int dx = (int)(head & mask);
final E e = buffer[dx];
buffer[dx] = null;
this.head.increment();
return e;
} else {
return null;
}
}
@Override
public int remove(final E[] e) {
int n = 0;
headCache = this.head.sum();
final int nMax = e.length;
for(long i = headCache, end = tail.sum(); n<nMax && i<end; i++) {
final int dx = (int) (i & mask);
e[n++] = buffer[dx];
buffer[dx] = null;
}
this.head.add(n);
return n;
}
@Override
public void clear() {
for(int i=0; i<buffer.length; i++) {
buffer[i] = null;
}
head.add(tail.sum()-head.sum());
}
@Override
public final E peek() {
return buffer[(int)(head.sum() & mask)];
}
This implemention is known to be broken if preemption were to occur after
reading the tail pointer.
Code should not depend on size for a correct result.
Returns: int - possibly the size, or possibly any value less than capacity()
/**
* This implemention is known to be broken if preemption were to occur after
* reading the tail pointer.
*
* Code should not depend on size for a correct result.
*
* @return int - possibly the size, or possibly any value less than capacity()
*/
@Override
public final int size() {
return (int)Math.max(tail.sum() - head.sum(), 0);
}
@Override
public int capacity() {
return size;
}
@Override
public final boolean isEmpty() {
return tail.sum() == head.sum();
}
@Override
public final boolean contains(Object o) {
if(o != null) {
for(long i = head.sum(), end = tail.sum(); i<end; i++) {
final E e = buffer[(int)(i & mask)];
if(o.equals(e)) {
return true;
}
}
}
return false;
}
long sumToAvoidOptimization() {
return p1+p2+p3+p4+p5+p6+p7+a1+a2+a3+a4+a5+a6+a7+a8+r1+r2+r3+r4+r5+r6+r7+c1+c2+c3+c4+c5+c6+c7+c8+headCache+tailCache;
}
}