package com.conversantmedia.util.concurrent;
/*
* #%L
* Conversant Disruptor
* ~~
* Conversantmedia.com © 2016, Conversant, Inc. Conversant® is a trademark of Conversant, Inc.
* ~~
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceArray;
Concurrent "lock-free" version of a stack.
Author: John Cairns
Date: 7/9/12
/**
* Concurrent "lock-free" version of a stack.
*
* @author John Cairns
* <p>Date: 7/9/12</p>
*/
public final class ConcurrentStack<N> implements BlockingStack<N> {
private final int size;
private final AtomicReferenceArray<N> stack;
// representing the top of the stack
private final ContendedAtomicInteger stackTop = new ContendedAtomicInteger(0);
private final SequenceLock seqLock = new SequenceLock();
private final Condition stackNotFullCondition;
private final Condition stackNotEmptyCondition;
public ConcurrentStack(final int size) {
this(size, SpinPolicy.WAITING);
}
construct a new stack of given capacity
@param size - the stack size
@param spinPolicy - determine the level of cpu aggressiveness in waiting
/**
* construct a new stack of given capacity
*
* @param size - the stack size
* @param spinPolicy - determine the level of cpu aggressiveness in waiting
*/
public ConcurrentStack(final int size, final SpinPolicy spinPolicy) {
int stackSize = 1;
while(stackSize < size) stackSize <<=1;
this.size = stackSize;
stack = new AtomicReferenceArray<N>(stackSize);
switch(spinPolicy) {
case BLOCKING:
stackNotFullCondition = new StackNotFull();
stackNotEmptyCondition = new StackNotEmpty();
break;
case SPINNING:
stackNotFullCondition = new SpinningStackNotFull();
stackNotEmptyCondition = new SpinningStackNotEmpty();
break;
case WAITING:
default:
stackNotFullCondition = new WaitingStackNotFull();
stackNotEmptyCondition = new WaitingStackNotEmpty();
}
}
@Override
public final boolean push(final N n, final long time, final TimeUnit unit) throws InterruptedException {
final long endDate = System.nanoTime() + unit.toNanos(time);
while(!push(n)) {
if(endDate - System.nanoTime() < 0) {
return false;
}
Condition.waitStatus(time, unit, stackNotFullCondition);
}
stackNotEmptyCondition.signal();
return true;
}
@Override
public final void pushInterruptibly(final N n) throws InterruptedException {
while(!push(n)) {
if(Thread.currentThread().isInterrupted()) {
throw new InterruptedException();
}
stackNotFullCondition.await();
}
stackNotEmptyCondition.signal();
}
@Override
public final boolean contains(final N n) {
if(n != null) {
for(int i = 0; i<stackTop.get(); i++) {
if(n.equals(stack.get(i))) return true;
}
}
return false;
}
add an element to the stack, failing if the stack is unable to grow
Params: - n – - the element to push
Returns: boolean - false if stack overflow, true otherwise
/**
* add an element to the stack, failing if the stack is unable to grow
*
* @param n - the element to push
*
* @return boolean - false if stack overflow, true otherwise
*/
@Override
public final boolean push(final N n) {
int spin = 0;
for(;;) {
final long writeLock = seqLock.tryWriteLock();
if(writeLock>0L) {
try {
final int stackTop = this.stackTop.get();
if(size>stackTop) {
try {
stack.set(stackTop, n);
stackNotEmptyCondition.signal();
return true;
} finally {
this.stackTop.set(stackTop+1);
}
} else {
return false;
}
} finally {
seqLock.unlock(writeLock);
}
}
spin = Condition.progressiveYield(spin);
}
}
peek at the top of the stack
Returns: N - the object at the top of the stack
/**
* peek at the top of the stack
*
* @return N - the object at the top of the stack
*/
@Override
public final N peek() {
// read the current cursor
int spin = 0;
for(;;) {
final long readLock = seqLock.readLock();
final int stackTop = this.stackTop.get();
if(stackTop > 0) {
final N n = stack.get(stackTop-1);
if(seqLock.readLockHeld(readLock)) {
return n;
} // else loop again
} else {
return null;
}
spin = Condition.progressiveYield(spin);
}
}
pop the next element off the stack
Returns: N - The object on the top of the stack
/**
* pop the next element off the stack
* @return N - The object on the top of the stack
*/
@Override
public final N pop() {
int spin = 0;
// now pop the stack
for(;;) {
final long writeLock = seqLock.tryWriteLock();
if(writeLock > 0) {
try {
final int stackTop = this.stackTop.get();
final int lastRef = stackTop-1;
if(stackTop>0) {
try {
// if we can modify the stack - i.e. nobody else is modifying
final N n = stack.get(lastRef);
stack.set(lastRef, null);
stackNotFullCondition.signal();
return n;
} finally {
this.stackTop.set(lastRef);
}
} else {
return null;
}
} finally {
seqLock.unlock(writeLock);
}
}
spin = Condition.progressiveYield(spin);
}
}
@Override
public final N pop(final long time, final TimeUnit unit) throws InterruptedException {
final long endTime = System.nanoTime() + unit.toNanos(time);
for(;;) {
final N n = pop();
if(n != null) {
stackNotFullCondition.signal();
return n;
} else {
if(endTime - System.nanoTime() < 0) {
return null;
}
}
Condition.waitStatus(time, unit, stackNotEmptyCondition);
}
}
@Override
public final N popInterruptibly() throws InterruptedException {
for(;;) {
final N n = pop();
if(n != null) {
stackNotFullCondition.signal();
return n;
} else {
if(Thread.currentThread().isInterrupted()) {
throw new InterruptedException();
}
}
stackNotEmptyCondition.await();
}
}
Return the size of the stack
Returns: int - number of elements in the stack
/**
* Return the size of the stack
* @return int - number of elements in the stack
*/
@Override
public final int size() {
return stackTop.get();
}
how much available space in the stack
/**
* how much available space in the stack
*/
@Override
public final int remainingCapacity() {
return size - stackTop.get();
}
Returns: boolean - true if stack is currently empty
/**
* @return boolean - true if stack is currently empty
*/
@Override
public final boolean isEmpty() {
return stackTop.get()==0;
}
clear the stack - does not null old references
/**
* clear the stack - does not null old references
*/
@Override
public final void clear() {
int spin = 0;
for(;;) {
final long writeLock = seqLock.tryWriteLock();
if(writeLock > 0L) {
final int stackTop = this.stackTop.get();
if(stackTop>0) {
try {
for(int i = 0; i<stackTop; i++) {
stack.set(i, null);
}
stackNotFullCondition.signal();
return;
} finally {
this.stackTop.set(0);
}
} else {
return;
}
}
spin = Condition.progressiveYield(spin);
}
}
private boolean isFull() {
return size == stackTop.get();
}
// condition used for signaling queue is full
private final class WaitingStackNotFull extends AbstractWaitingCondition {
@Override
// @return boolean - true if the queue is full
public final boolean test() {
return isFull();
}
}
// condition used for signaling queue is empty
private final class WaitingStackNotEmpty extends AbstractWaitingCondition {
@Override
// @return boolean - true if the queue is empty
public final boolean test() {
return isEmpty();
}
}
// condition used for signaling queue is full
private final class SpinningStackNotFull extends AbstractSpinningCondition {
@Override
// @return boolean - true if the queue is full
public final boolean test() {
return isFull();
}
}
// condition used for signaling queue is empty
private final class SpinningStackNotEmpty extends AbstractSpinningCondition {
@Override
// @return boolean - true if the queue is empty
public final boolean test() {
return isEmpty();
}
}
// condition used for signaling queue is full
private final class StackNotFull extends AbstractCondition {
@Override
// @return boolean - true if the queue is full
public final boolean test() {
return isFull();
}
}
// condition used for signaling queue is empty
private final class StackNotEmpty extends AbstractCondition {
@Override
// @return boolean - true if the queue is empty
public final boolean test() {
return isEmpty();
}
}
}