/*
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.core.publisher;
import java.lang.reflect.Field;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.LockSupport;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.concurrent.WaitStrategy;
import sun.misc.Unsafe;
import static java.util.Arrays.copyOf;
Ring based store of reusable entries containing the data representing an event being exchanged between event producer
and ringbuffer consumers.
Type parameters: - <E> – implementation storing the data for sharing during exchange or parallel coordination of an event.
This is an adaption of the original LMAX Disruptor RingBuffer code from
https://lmax-exchange.github.io/disruptor/.
/**
* Ring based store of reusable entries containing the data representing an event being exchanged between event producer
* and ringbuffer consumers.
* @param <E> implementation storing the data for sharing during exchange or parallel coordination of an event.
*
* This is an adaption of the original LMAX Disruptor RingBuffer code from
* https://lmax-exchange.github.io/disruptor/.
*/
abstract class RingBuffer<E> implements LongSupplier {
static <T> void addSequence(final T holder,
final AtomicReferenceFieldUpdater<T, Sequence[]> updater,
final Sequence sequence) {
Sequence[] updatedSequences;
Sequence[] currentSequences;
do {
currentSequences = updater.get(holder);
updatedSequences = copyOf(currentSequences, currentSequences.length + 1);
updatedSequences[currentSequences.length] = sequence;
}
while (!updater.compareAndSet(holder, currentSequences, updatedSequences));
}
private static <T> int countMatching(T[] values, final T toMatch) {
int numToRemove = 0;
for (T value : values) {
if (value == toMatch) // Specifically uses identity
{
numToRemove++;
}
}
return numToRemove;
}
static <T> boolean removeSequence(final T holder,
final AtomicReferenceFieldUpdater<T, Sequence[]> sequenceUpdater,
final Sequence sequence) {
int numToRemove;
Sequence[] oldSequences;
Sequence[] newSequences;
do {
oldSequences = sequenceUpdater.get(holder);
numToRemove = countMatching(oldSequences, sequence);
if (0 == numToRemove) {
break;
}
final int oldSize = oldSequences.length;
newSequences = new Sequence[oldSize - numToRemove];
for (int i = 0, pos = 0; i < oldSize; i++) {
final Sequence testSequence = oldSequences[i];
if (sequence != testSequence) {
newSequences[pos++] = testSequence;
}
}
}
while (!sequenceUpdater.compareAndSet(holder, oldSequences, newSequences));
return numToRemove != 0;
}
Set to -1 as sequence starting point
/**
* Set to -1 as sequence starting point
*/
static final long INITIAL_CURSOR_VALUE = -1L;
Create a new multiple producer RingBuffer with the specified wait strategy.
See MultiProducerRingBuffer
.
Params: - factory – used to create the events within the ring buffer.
- bufferSize – number of elements to create within the ring buffer.
- waitStrategy – used to determine how to wait for new elements to become available.
- spinObserver – the Runnable to call on a spin loop wait
Type parameters: - <E> – the element type
Returns: the new RingBuffer instance
/**
* Create a new multiple producer RingBuffer with the specified wait strategy.
* <p>See {@code MultiProducerRingBuffer}.
* @param <E> the element type
* @param factory used to create the events within the ring buffer.
* @param bufferSize number of elements to create within the ring buffer.
* @param waitStrategy used to determine how to wait for new elements to become available.
* @param spinObserver the Runnable to call on a spin loop wait
* @return the new RingBuffer instance
*/
static <E> RingBuffer<E> createMultiProducer(Supplier<E> factory,
int bufferSize,
WaitStrategy waitStrategy, Runnable spinObserver) {
if (hasUnsafe()) {
MultiProducerRingBuffer sequencer = new MultiProducerRingBuffer(bufferSize, waitStrategy, spinObserver);
return new UnsafeRingBuffer<>(factory, sequencer);
}
else {
throw new IllegalStateException("This JVM does not support sun.misc.Unsafe");
}
}
Create a new single producer RingBuffer with the specified wait strategy.
See MultiProducerRingBuffer
.
Params: - factory – used to create the events within the ring buffer.
- bufferSize – number of elements to create within the ring buffer.
- waitStrategy – used to determine how to wait for new elements to become available.
Type parameters: - <E> – the element type
Returns: the new RingBuffer instance
/**
* Create a new single producer RingBuffer with the specified wait strategy.
* <p>See {@code MultiProducerRingBuffer}.
* @param <E> the element type
* @param factory used to create the events within the ring buffer.
* @param bufferSize number of elements to create within the ring buffer.
* @param waitStrategy used to determine how to wait for new elements to become available.
* @return the new RingBuffer instance
*/
static <E> RingBuffer<E> createSingleProducer(Supplier<E> factory,
int bufferSize,
WaitStrategy waitStrategy) {
return createSingleProducer(factory, bufferSize, waitStrategy, null);
}
Create a new single producer RingBuffer with the specified wait strategy.
See MultiProducerRingBuffer
.
Params: - factory – used to create the events within the ring buffer.
- bufferSize – number of elements to create within the ring buffer.
- waitStrategy – used to determine how to wait for new elements to become available.
- spinObserver – called each time the next claim is spinning and waiting for a slot
Type parameters: - <E> – the element type
Returns: the new RingBuffer instance
/**
* Create a new single producer RingBuffer with the specified wait strategy.
* <p>See {@code MultiProducerRingBuffer}.
* @param <E> the element type
* @param factory used to create the events within the ring buffer.
* @param bufferSize number of elements to create within the ring buffer.
* @param waitStrategy used to determine how to wait for new elements to become available.
* @param spinObserver called each time the next claim is spinning and waiting for a slot
* @return the new RingBuffer instance
*/
static <E> RingBuffer<E> createSingleProducer(Supplier<E> factory,
int bufferSize,
WaitStrategy waitStrategy,
@Nullable Runnable spinObserver) {
SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy, spinObserver);
if (hasUnsafe() && Queues.isPowerOfTwo(bufferSize)) {
return new UnsafeRingBuffer<>(factory, sequencer);
}
else {
return new NotFunRingBuffer<>(factory, sequencer);
}
}
Get the minimum sequence from an array of Sequence
s. Params: - sequences – to compare.
- minimum – an initial default minimum. If the array is empty this value will be returned.
Returns: the minimum sequence found or Long.MAX_VALUE if the array is empty.
/**
* Get the minimum sequence from an array of {@link Sequence}s.
*
* @param sequences to compare.
* @param minimum an initial default minimum. If the array is empty this value will be returned.
*
* @return the minimum sequence found or Long.MAX_VALUE if the array is empty.
*/
static long getMinimumSequence(final Sequence[] sequences, long minimum) {
for (int i = 0, n = sequences.length; i < n; i++) {
long value = sequences[i].getAsLong();
minimum = Math.min(minimum, value);
}
return minimum;
}
Get the minimum sequence from an array of Sequence
s. Params: - excludeSequence – to exclude from search.
- sequences – to compare.
- minimum – an initial default minimum. If the array is empty this value will be returned.
Returns: the minimum sequence found or Long.MAX_VALUE if the array is empty.
/**
* Get the minimum sequence from an array of {@link Sequence}s.
*
* @param excludeSequence to exclude from search.
* @param sequences to compare.
* @param minimum an initial default minimum. If the array is empty this value will be returned.
*
* @return the minimum sequence found or Long.MAX_VALUE if the array is empty.
*/
static long getMinimumSequence(@Nullable Sequence excludeSequence, final Sequence[] sequences, long minimum) {
for (int i = 0, n = sequences.length; i < n; i++) {
if (excludeSequence == null || sequences[i] != excludeSequence) {
long value = sequences[i].getAsLong();
minimum = Math.min(minimum, value);
}
}
return minimum;
}
Return the sun.misc.Unsafe
instance if found on the classpath and can be used for acclerated direct memory access. Type parameters: - <T> – the Unsafe type
Returns: the Unsafe instance
/**
* Return the {@code sun.misc.Unsafe} instance if found on the classpath and can be used for acclerated
* direct memory access.
*
* @param <T> the Unsafe type
* @return the Unsafe instance
*/
@SuppressWarnings("unchecked")
static <T> T getUnsafe() {
return (T) UnsafeSupport.getUnsafe();
}
Calculate the log base 2 of the supplied integer, essentially reports the location of the highest bit.
Params: - i – Value to calculate log2 for.
Returns: The log2 value
/**
* Calculate the log base 2 of the supplied integer, essentially reports the location of the highest bit.
*
* @param i Value to calculate log2 for.
*
* @return The log2 value
*/
static int log2(int i) {
int r = 0;
while ((i >>= 1) != 0) {
++r;
}
return r;
}
Params: - init – the initial value
Returns: a safe or unsafe sequence set to the passed init value
/**
* @param init the initial value
*
* @return a safe or unsafe sequence set to the passed init value
*/
static Sequence newSequence(long init) {
if (hasUnsafe()) {
return new UnsafeSequence(init);
}
else {
return new AtomicSequence(init);
}
}
Add the specified gating sequence to this instance of the Disruptor. It will safely and atomically be added to
the list of gating sequences and not RESET to the current ringbuffer cursor.
Params: - gatingSequence – The sequences to add.
/**
* Add the specified gating sequence to this instance of the Disruptor. It will safely and atomically be added to
* the list of gating sequences and not RESET to the current ringbuffer cursor.
* @param gatingSequence The sequences to add.
*/
abstract void addGatingSequence(Sequence gatingSequence);
Returns: the fixed buffer size
/**
* @return the fixed buffer size
*/
abstract int bufferSize();
Get the event for a given sequence in the RingBuffer.
This call has 2 uses. Firstly use this call when publishing to a ring buffer. After calling next()
use this call to get hold of the preallocated event to fill with data before calling publish(long)
.
Secondly use this call when consuming data from the ring buffer. After calling
* Reader#waitFor)
call this method with any value greater than that your current consumer sequence and less than or equal to the value returned from the Reader#waitFor)
method.
Params: - sequence – for the event
Returns: the event for the given sequence
/**
* <p>Get the event for a given sequence in the RingBuffer.</p>
*
* <p>This call has 2 uses. Firstly use this call when publishing to a ring buffer. After calling {@link
* RingBuffer#next()} use this call to get hold of the preallocated event to fill with data before calling {@link
* RingBuffer#publish(long)}.</p>
*
* <p>Secondly use this call when consuming data from the ring buffer. After calling {@link
* Reader#waitFor)} call this method with any value greater than that your
* current consumer sequence
* and less than or equal to the value returned from the {@link Reader#waitFor)} method.</p>
* @param sequence for the event
* @return the event for the given sequence
*/
abstract E get(long sequence);
@Override
public long getAsLong() {
return getCursor();
}
Get the current cursor value for the ring buffer. The actual value recieved will depend on the type of
RingBufferProducer
that is being used. See MultiProducerRingBuffer
. See SingleProducerSequencer
Returns: the current cursor value
/**
* Get the current cursor value for the ring buffer. The actual value recieved will depend on the type of {@code
* RingBufferProducer} that is being used.
* <p>
* See {@code MultiProducerRingBuffer}.
* See {@code SingleProducerSequencer}
* @return the current cursor value
*/
abstract long getCursor();
Get the minimum sequence value from all of the gating sequences added to this ringBuffer.
Returns: The minimum gating sequence or the cursor sequence if no sequences have been added.
/**
* Get the minimum sequence value from all of the gating sequences added to this ringBuffer.
* @return The minimum gating sequence or the cursor sequence if no sequences have been added.
*/
abstract long getMinimumGatingSequence();
Get the minimum sequence value from all of the gating sequences added to this ringBuffer.
Params: - sequence – the target sequence
Returns: The minimum gating sequence or the cursor sequence if no sequences have been added.
/**
* Get the minimum sequence value from all of the gating sequences added to this ringBuffer.
* @param sequence the target sequence
* @return The minimum gating sequence or the cursor sequence if no sequences have been added.
*/
abstract long getMinimumGatingSequence(Sequence sequence);
Get the buffered count
Returns: the buffered count
/**
* Get the buffered count
* @return the buffered count
*/
abstract int getPending();
Returns: the current list of read cursors
/**
*
* @return the current list of read cursors
*/
Sequence[] getSequenceReceivers() {
return getSequencer().getGatingSequences();
}
Create a new Reader
to track which messages are available to be read from the ring buffer given a list of sequences to track. See Also: Returns: A sequence barrier that will track the ringbuffer.
/**
* Create a new {@link Reader} to track
* which
* messages are available to be read
* from the ring buffer given a list of sequences to track.
* @return A sequence barrier that will track the ringbuffer.
* @see Reader
*/
abstract Reader newReader();
Increment and return the next sequence for the ring buffer. Calls of this method should ensure that they always
publish the sequence afterward. E.g.
long sequence = ringBuffer.next();
try {
Event e = ringBuffer.get(sequence);
// Do some work with the event.
} finally {
ringBuffer.publish(sequence);
}
See Also: Returns: The next sequence to publish to.
/**
* Increment and return the next sequence for the ring buffer. Calls of this method should ensure that they always
* publish the sequence afterward. E.g.
* <pre>
* long sequence = ringBuffer.next();
* try {
* Event e = ringBuffer.get(sequence);
* // Do some work with the event.
* } finally {
* ringBuffer.publish(sequence);
* }
* </pre>
* @return The next sequence to publish to.
* @see RingBuffer#publish(long)
* @see RingBuffer#get(long)
*/
abstract long next();
The same functionality as next()
, but allows the caller to claim the next n sequences. See RingBufferProducer.next(int)
Params: - n – number of slots to claim
Returns: sequence number of the highest slot claimed
/**
* The same functionality as {@link RingBuffer#next()}, but allows the caller to claim the next n sequences.
* <p>
* See {@code RingBufferProducer.next(int)}
* @param n number of slots to claim
* @return sequence number of the highest slot claimed
*/
abstract long next(int n);
Publish the specified sequence. This action marks this particular message as being available to be read.
Params: - sequence – the sequence to publish.
/**
* Publish the specified sequence. This action marks this particular message as being available to be read.
* @param sequence the sequence to publish.
*/
abstract void publish(long sequence);
Remove the specified sequence from this ringBuffer.
Params: - sequence – to be removed.
Returns: true if this sequence was found, false otherwise.
/**
* Remove the specified sequence from this ringBuffer.
* @param sequence to be removed.
* @return <tt>true</tt> if this sequence was found, <tt>false</tt> otherwise.
*/
abstract boolean removeGatingSequence(Sequence sequence);
abstract RingBufferProducer getSequencer();/*
/**
* Return {@code true} if {@code sun.misc.Unsafe} was found on the classpath and can be used for acclerated
* direct memory access.
* @return true if unsafe is present
*/
static boolean hasUnsafe() {
return HAS_UNSAFE;
}
static boolean hasUnsafe0() {
return UnsafeSupport.hasUnsafe();
}
private static final boolean HAS_UNSAFE = hasUnsafe0();
Concurrent sequence class used for tracking the progress of
the ring buffer and event processors. Support a number
of concurrent operations including CAS and order writes.
Also attempts to be more efficient with regards to false
sharing by adding padding around the volatile field.
/**
* <p>Concurrent sequence class used for tracking the progress of
* the ring buffer and event processors. Support a number
* of concurrent operations including CAS and order writes.
*
* <p>Also attempts to be more efficient with regards to false
* sharing by adding padding around the volatile field.
*/
interface Sequence extends LongSupplier
{
long INITIAL_VALUE = INITIAL_CURSOR_VALUE;
Perform an ordered write of this sequence. The intent is
a Store/Store barrier between this write and any previous
store.
Params: - value – The new value for the sequence.
/**
* Perform an ordered write of this sequence. The intent is
* a Store/Store barrier between this write and any previous
* store.
*
* @param value The new value for the sequence.
*/
void set(long value);
Perform a compare and set operation on the sequence.
Params: - expectedValue – The expected current value.
- newValue – The value to update to.
Returns: true if the operation succeeds, false otherwise.
/**
* Perform a compare and set operation on the sequence.
*
* @param expectedValue The expected current value.
* @param newValue The value to update to.
* @return true if the operation succeeds, false otherwise.
*/
boolean compareAndSet(long expectedValue, long newValue);
}
Used for Gating ringbuffer consumers on a cursor sequence and optional dependent ringbuffer consumer(s),
using the given WaitStrategy.
/**
* Used for Gating ringbuffer consumers on a cursor sequence and optional dependent ringbuffer consumer(s),
* using the given WaitStrategy.
*/
static final class Reader {
private final WaitStrategy waitStrategy;
private volatile boolean alerted = false;
private final Sequence cursorSequence;
private final RingBufferProducer sequenceProducer;
Reader(final RingBufferProducer sequenceProducer,
final WaitStrategy waitStrategy,
final Sequence cursorSequence) {
this.sequenceProducer = sequenceProducer;
this.waitStrategy = waitStrategy;
this.cursorSequence = cursorSequence;
}
Wait for the given sequence to be available for consumption.
Params: - consumer – a spin observer to invoke when nothing is available to read
- sequence – to wait for
Throws: - InterruptedException – if the thread needs awaking on a condition variable.
Returns: the sequence up to which is available
/**
* Wait for the given sequence to be available for consumption.
*
* @param consumer a spin observer to invoke when nothing is available to read
* @param sequence to wait for
* @return the sequence up to which is available
* @throws InterruptedException if the thread needs awaking on a condition variable.
*/
long waitFor(final long sequence, Runnable consumer)
throws InterruptedException {
if (alerted)
{
WaitStrategy.alert();
}
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, consumer);
if (availableSequence < sequence) {
return availableSequence;
}
return sequenceProducer.getHighestPublishedSequence(sequence, availableSequence);
}
The current alert status for the barrier.
Returns: true if in alert otherwise false.
/**
* The current alert status for the barrier.
*
* @return true if in alert otherwise false.
*/
boolean isAlerted() {
return alerted;
}
Alert the ringbuffer consumers of a status change and stay in this status until cleared.
/**
* Alert the ringbuffer consumers of a status change and stay in this status until cleared.
*/
void alert() {
alerted = true;
waitStrategy.signalAllWhenBlocking();
}
Signal the ringbuffer consumers.
/**
* Signal the ringbuffer consumers.
*/
void signal() {
waitStrategy.signalAllWhenBlocking();
}
Clear the current alert status.
/**
* Clear the current alert status.
*/
void clearAlert() {
alerted = false;
}
}
}
// UnsafeSupport static initialization is derived from Netty's PlatformDependent0
// static initialization, the original licence of which is included below, verbatim.
// Modifications to the source material are:
// - a shorter amount of checks and fields (focusing on Unsafe and buffer address)
// - modifications to the logging messages and their level
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
enum UnsafeSupport {
;
static final Logger logger = Loggers.getLogger(UnsafeSupport.class);
static {
String javaSpecVersion = System.getProperty("java.specification.version");
logger.debug("Starting UnsafeSupport init in Java " + javaSpecVersion);
ByteBuffer direct = ByteBuffer.allocateDirect(1);
Unsafe unsafe;
// Get an Unsafe instance first, via the (still legit as of Java 9)
// deep reflection trick on theUnsafe Field
Object maybeUnsafe;
try {
final Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
unsafeField.setAccessible(true);
// the unsafe instance
maybeUnsafe = unsafeField.get(null);
} catch (NoSuchFieldException | SecurityException | IllegalAccessException e) {
maybeUnsafe = e;
}
// the conditional check here can not be replaced with checking that maybeUnsafe
// is an instanceof Unsafe and reversing the if and else blocks; this is because an
// instanceof check against Unsafe will trigger a class load and we might not have
// the runtime permission accessClassInPackage.sun.misc
if (maybeUnsafe instanceof Throwable) {
unsafe = null;
logger.debug("Unsafe unavailable - Could not get it via Field sun.misc.Unsafe.theUnsafe", (Throwable) maybeUnsafe);
} else {
unsafe = (Unsafe) maybeUnsafe;
logger.trace("sun.misc.Unsafe.theUnsafe ok");
}
// ensure the unsafe supports all necessary methods to work around the mistake in the latest OpenJDK
// https://github.com/netty/netty/issues/1061
// http://www.mail-archive.com/jdk6-dev@openjdk.java.net/msg00698.html
if (unsafe != null) {
final Unsafe finalUnsafe = unsafe;
Object maybeException;
try {
finalUnsafe.getClass().getDeclaredMethod(
"copyMemory", Object.class, long.class, Object.class, long.class, long.class);
maybeException = null;
} catch (NoSuchMethodException | SecurityException e) {
maybeException = e;
}
if (maybeException == null) {
logger.trace("sun.misc.Unsafe.copyMemory ok");
} else {
unsafe = null;
logger.debug("Unsafe unavailable - failed on sun.misc.Unsafe.copyMemory", (Throwable) maybeException);
}
}
// finally check the Buffer#address
if (unsafe != null) {
final Unsafe finalUnsafe = unsafe;
Object maybeAddressField;
try {
final Field field = Buffer.class.getDeclaredField("address");
// Use Unsafe to read value of the address field.
// This way it will not fail on JDK9+ which forbids changing the
// access level via reflection.
final long offset = finalUnsafe.objectFieldOffset(field);
final long heapAddress = finalUnsafe.getLong(ByteBuffer.allocate(1), offset);
final long directAddress = finalUnsafe.getLong(direct, offset);
if (heapAddress != 0 && "1.8".equals(javaSpecVersion)) {
maybeAddressField = new IllegalStateException("A heap buffer must have 0 address in Java 8, got " + heapAddress);
}
else if (heapAddress == 0 && !"1.8".equals(javaSpecVersion)) {
maybeAddressField = new IllegalStateException("A heap buffer must have non-zero address in Java " + javaSpecVersion);
}
else if (directAddress == 0) {
maybeAddressField = new IllegalStateException("A direct buffer must have non-zero address");
}
else {
maybeAddressField = field;
}
} catch (NoSuchFieldException | SecurityException e) {
maybeAddressField = e;
}
if (maybeAddressField instanceof Throwable) {
logger.debug("Unsafe unavailable - failed on java.nio.Buffer.address", (Throwable) maybeAddressField);
// If we cannot access the address of a direct buffer, there's no point in using unsafe.
// Let's just pretend unsafe is unavailable for overall simplicity.
unsafe = null;
}
else {
logger.trace("java.nio.Buffer.address ok");
logger.debug("Unsafe is available");
}
}
UNSAFE = unsafe;
}
static Unsafe getUnsafe(){
return UNSAFE;
}
static boolean hasUnsafe() {
return UNSAFE != null;
}
private static final Unsafe UNSAFE;
}
Base class for the various sequencer types (single/multi). Provides common functionality like the management of
gating sequences (add/remove) and ownership of the current cursor.
/**
* Base class for the various sequencer types (single/multi). Provides common functionality like the management of
* gating sequences (add/remove) and ownership of the current cursor.
*/
abstract class RingBufferProducer {
static final AtomicReferenceFieldUpdater<RingBufferProducer, RingBuffer.Sequence[]>
SEQUENCE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(RingBufferProducer.class, RingBuffer.Sequence[].class,
"gatingSequences");
final Runnable spinObserver;
final int bufferSize;
final WaitStrategy waitStrategy;
final RingBuffer.Sequence cursor = RingBuffer.newSequence(RingBuffer.INITIAL_CURSOR_VALUE);
volatile RingBuffer.Sequence[] gatingSequences = new RingBuffer.Sequence[0];
Create with the specified buffer size and wait strategy.
Params: - bufferSize – The total number of entries, must be a positive power of 2.
- waitStrategy – The
WaitStrategy
to use. - spinObserver – an iteration observer
/**
* Create with the specified buffer size and wait strategy.
*
* @param bufferSize The total number of entries, must be a positive power of 2.
* @param waitStrategy The {@link WaitStrategy} to use.
* @param spinObserver an iteration observer
*/
RingBufferProducer(int bufferSize, WaitStrategy waitStrategy, @Nullable Runnable spinObserver) {
this.spinObserver = spinObserver;
this.bufferSize = bufferSize;
this.waitStrategy = waitStrategy;
}
Get the current cursor value.
Returns: current cursor value
/**
* Get the current cursor value.
*
* @return current cursor value
*/
final long getCursor() {
return cursor.getAsLong();
}
The capacity of the data structure to hold entries.
Returns: the size of the RingBuffer.
/**
* The capacity of the data structure to hold entries.
*
* @return the size of the RingBuffer.
*/
final int getBufferSize() {
return bufferSize;
}
Add the specified gating sequences to this instance of the Disruptor. They will
safely and atomically added to the list of gating sequences.
Params: - gatingSequence – The sequences to add.
/**
* Add the specified gating sequences to this instance of the Disruptor. They will
* safely and atomically added to the list of gating sequences.
*
* @param gatingSequence The sequences to add.
*/
final void addGatingSequence(RingBuffer.Sequence gatingSequence) {
RingBuffer.addSequence(this, SEQUENCE_UPDATER, gatingSequence);
}
Remove the specified sequence from this sequencer.
Params: - sequence – to be removed.
Returns: true if this sequence was found, false otherwise.
/**
* Remove the specified sequence from this sequencer.
*
* @param sequence to be removed.
* @return <tt>true</tt> if this sequence was found, <tt>false</tt> otherwise.
*/
boolean removeGatingSequence(RingBuffer.Sequence sequence) {
return RingBuffer.removeSequence(this, SEQUENCE_UPDATER, sequence);
}
Get the minimum sequence value from all of the gating sequences
added to this ringBuffer.
Params: - excludeSequence – to exclude from search
Returns: The minimum gating sequence or the cursor sequence if
no sequences have been added.
/**
* Get the minimum sequence value from all of the gating sequences
* added to this ringBuffer.
*
* @param excludeSequence to exclude from search
* @return The minimum gating sequence or the cursor sequence if
* no sequences have been added.
*/
long getMinimumSequence(@Nullable RingBuffer.Sequence excludeSequence) {
return RingBuffer.getMinimumSequence(excludeSequence, gatingSequences, cursor.getAsLong());
}
Create a new Reader
to be used by an EventProcessor to track which messages are available to be read from the ring buffer See Also: Returns: A sequence barrier that will track the specified sequences.
/**
* Create a new {@link RingBuffer.Reader} to be used by an EventProcessor to track which messages
* are available to be read from the ring buffer
*
* @see RingBuffer.Reader
* @return A sequence barrier that will track the specified sequences.
*/
RingBuffer.Reader newBarrier() {
return new RingBuffer.Reader(this, waitStrategy, cursor);
}
Get the highest sequence number that can be safely read from the ring buffer. Depending
on the implementation of the Sequencer this call may need to get a number of values
in the Sequencer. The get will range from nextSequence to availableSequence. If
there are no available values >= nextSequence
the return value will be
nextSequence - 1
. To work correctly a consumer should pass a value that
it 1 higher than the last sequence that was successfully processed.
Params: - nextSequence – The sequence to start scanning from.
- availableSequence – The sequence to get to.
Returns: The highest value that can be safely read, will be at least nextSequence - 1
.
/**
* Get the highest sequence number that can be safely read from the ring buffer. Depending
* on the implementation of the Sequencer this call may need to get a number of values
* in the Sequencer. The get will range from nextSequence to availableSequence. If
* there are no available values <code>>= nextSequence</code> the return value will be
* <code>nextSequence - 1</code>. To work correctly a consumer should pass a value that
* it 1 higher than the last sequence that was successfully processed.
*
* @param nextSequence The sequence to start scanning from.
* @param availableSequence The sequence to get to.
* @return The highest value that can be safely read, will be at least <code>nextSequence - 1</code>.
*/
abstract long getHighestPublishedSequence(long nextSequence, long availableSequence);
Get the pending capacity for this sequencer.
Returns: The number of slots pending consuming.
/**
* Get the pending capacity for this sequencer.
* @return The number of slots pending consuming.
*/
abstract long getPending();
Claim the next event in sequence for publishing.
Returns: the claimed sequence value
/**
* Claim the next event in sequence for publishing.
* @return the claimed sequence value
*/
abstract long next();
Claim the next n events in sequence for publishing. This is for batch event producing. Using batch producing
requires a little care and some math.
int n = 10;
long hi = sequencer.next(n);
long lo = hi - (n - 1);
for (long sequence = lo; sequence <= hi; sequence++) {
// Do work.
}
sequencer.publish(lo, hi);
Params: - n – the number of sequences to claim
Returns: the highest claimed sequence value
/**
* Claim the next n events in sequence for publishing. This is for batch event producing. Using batch producing
* requires a little care and some math.
* <pre>
* int n = 10;
* long hi = sequencer.next(n);
* long lo = hi - (n - 1);
* for (long sequence = lo; sequence <= hi; sequence++) {
* // Do work.
* }
* sequencer.publish(lo, hi);
* </pre>
*
* @param n the number of sequences to claim
* @return the highest claimed sequence value
*/
abstract long next(int n);
Publishes a sequence. Call when the event has been filled.
Params: - sequence – the sequence number to be published
/**
* Publishes a sequence. Call when the event has been filled.
*
* @param sequence the sequence number to be published
*/
abstract void publish(long sequence);
Returns: the gating sequences array
/**
*
* @return the gating sequences array
*/
RingBuffer.Sequence[] getGatingSequences() {
return gatingSequences;
}
}
abstract class SingleProducerSequencerPad extends RingBufferProducer
{
protected long p1, p2, p3, p4, p5, p6, p7;
SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy, @Nullable Runnable spinObserver)
{
super(bufferSize, waitStrategy, spinObserver);
}
}
abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad
{
SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy, @Nullable Runnable spinObserver)
{
super(bufferSize, waitStrategy, spinObserver);
}
Set to -1 as sequence starting point /** Set to -1 as sequence starting point */
protected long nextValue = RingBuffer.Sequence.INITIAL_VALUE;
protected long cachedValue = RingBuffer.Sequence.INITIAL_VALUE;
}
Coordinator for claiming sequences for access to a data structure while tracking dependent Sequence
s. Not safe for use from multiple threads as it does not implement any barriers.
Note on RingBufferProducer.getCursor()
: With this sequencer the cursor value is updated after the call to RingBufferProducer.publish(long)
is made.
/**
* <p>Coordinator for claiming sequences for access to a data structure while tracking dependent {@link RingBuffer.Sequence}s.
* Not safe for use from multiple threads as it does not implement any barriers.</p>
*
* <p>Note on {@code RingBufferProducer.getCursor()}: With this sequencer the cursor value is updated after the call
* to {@code RingBufferProducer.publish(long)} is made.
*/
final class SingleProducerSequencer extends SingleProducerSequencerFields {
protected long p1, p2, p3, p4, p5, p6, p7;
Construct a Sequencer with the selected wait strategy and buffer size.
Params: - bufferSize – the size of the buffer that this will sequence over.
- waitStrategy – for those waiting on sequences.
- spinObserver – the runnable to call on a spin-wait
/**
* Construct a Sequencer with the selected wait strategy and buffer size.
*
* @param bufferSize the size of the buffer that this will sequence over.
* @param waitStrategy for those waiting on sequences.
* @param spinObserver the runnable to call on a spin-wait
*/
SingleProducerSequencer(int bufferSize, final WaitStrategy waitStrategy, @Nullable Runnable spinObserver) {
super(bufferSize, waitStrategy, spinObserver);
}
See RingBufferProducer.next()
. /**
* See {@code RingBufferProducer.next()}.
*/
@Override
long next() {
return next(1);
}
See RingBufferProducer.next(int)
. /**
* See {@code RingBufferProducer.next(int)}.
*/
@Override
long next(int n) {
long nextValue = this.nextValue;
long nextSequence = nextValue + n;
long wrapPoint = nextSequence - bufferSize;
long cachedGatingSequence = this.cachedValue;
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
long minSequence;
while (wrapPoint > (minSequence = RingBuffer.getMinimumSequence(gatingSequences, nextValue)))
{
if(spinObserver != null) {
spinObserver.run();
}
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
this.cachedValue = minSequence;
}
this.nextValue = nextSequence;
return nextSequence;
}
See RingBufferProducer.producerCapacity()
. /**
* See {@code RingBufferProducer.producerCapacity()}.
*/
@Override
public long getPending() {
long nextValue = this.nextValue;
long consumed = RingBuffer.getMinimumSequence(gatingSequences, nextValue);
long produced = nextValue;
return produced - consumed;
}
See RingBufferProducer.publish(long)
. /**
* See {@code RingBufferProducer.publish(long)}.
*/
@Override
void publish(long sequence) {
cursor.set(sequence);
waitStrategy.signalAllWhenBlocking();
}
@Override
long getHighestPublishedSequence(long lowerBound, long availableSequence) {
return availableSequence;
}
}
abstract class NotFunRingBufferFields<E> extends RingBuffer<E>
{
private final long indexMask;
private final Object[] entries;
final int bufferSize;
final RingBufferProducer sequenceProducer;
NotFunRingBufferFields(Supplier<E> eventFactory,
RingBufferProducer sequenceProducer)
{
this.sequenceProducer = sequenceProducer;
this.bufferSize = sequenceProducer.getBufferSize();
this.indexMask = bufferSize - 1;
this.entries = new Object[sequenceProducer.getBufferSize()];
fill(eventFactory);
}
private void fill(Supplier<E> eventFactory)
{
for (int i = 0; i < bufferSize; i++)
{
entries[i] = eventFactory.get();
}
}
@SuppressWarnings("unchecked")
final E elementAt(long sequence)
{
return (E) entries[(int) (sequence & indexMask)];
}
}
Ring based store of reusable entries containing the data representing
an event being exchanged between event producer and ringbuffer consumers.
Type parameters: - <E> – implementation storing the data for sharing during exchange or parallel coordination of an event.
/**
* Ring based store of reusable entries containing the data representing
* an event being exchanged between event producer and ringbuffer consumers.
*
* @param <E> implementation storing the data for sharing during exchange or parallel coordination of an event.
*/
final class NotFunRingBuffer<E> extends NotFunRingBufferFields<E>
{
Construct a RingBuffer with the full option set.
Params: - eventFactory – to newInstance entries for filling the RingBuffer
- sequenceProducer – sequencer to handle the ordering of events moving through the RingBuffer.
Throws: - IllegalArgumentException – if bufferSize is less than 1 or not a power of 2
/**
* Construct a RingBuffer with the full option set.
*
* @param eventFactory to newInstance entries for filling the RingBuffer
* @param sequenceProducer sequencer to handle the ordering of events moving through the RingBuffer.
* @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
*/
NotFunRingBuffer(Supplier<E> eventFactory,
RingBufferProducer sequenceProducer)
{
super(eventFactory, sequenceProducer);
}
@Override
E get(long sequence)
{
return elementAt(sequence);
}
@Override
long next()
{
return next(1);
}
@Override
long next(int n)
{
return sequenceProducer.next(n);
}
@Override
void addGatingSequence(Sequence gatingSequence)
{
sequenceProducer.addGatingSequence(gatingSequence);
}
@Override
long getMinimumGatingSequence()
{
return getMinimumGatingSequence(null);
}
@Override
long getMinimumGatingSequence(@Nullable Sequence sequence)
{
return sequenceProducer.getMinimumSequence(sequence);
}
@Override
boolean removeGatingSequence(Sequence sequence)
{
return sequenceProducer.removeGatingSequence(sequence);
}
@Override
Reader newReader()
{
return sequenceProducer.newBarrier();
}
@Override
long getCursor()
{
return sequenceProducer.getCursor();
}
@Override
int bufferSize()
{
return bufferSize;
}
@Override
void publish(long sequence)
{
sequenceProducer.publish(sequence);
}
@Override
int getPending() {
return (int)sequenceProducer.getPending();
}
@Override
RingBufferProducer getSequencer() {
return sequenceProducer;
}
}
final class AtomicSequence extends RhsPadding implements LongSupplier, RingBuffer.Sequence
{
private static final AtomicLongFieldUpdater<Value> UPDATER =
AtomicLongFieldUpdater.newUpdater(Value.class, "value");
Create a sequence with a specified initial value.
Params: - initialValue – The initial value for this sequence.
/**
* Create a sequence with a specified initial value.
*
* @param initialValue The initial value for this sequence.
*/
AtomicSequence(final long initialValue)
{
UPDATER.lazySet(this, initialValue);
}
@Override
public long getAsLong()
{
return value;
}
@Override
public void set(final long value)
{
UPDATER.set(this, value);
}
@Override
public boolean compareAndSet(final long expectedValue, final long newValue)
{
return UPDATER.compareAndSet(this, expectedValue, newValue);
}
}
abstract class RingBufferPad<E> extends RingBuffer<E>
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
abstract class RingBufferFields<E> extends RingBufferPad<E>
{
private static final int BUFFER_PAD;
private static final long REF_ARRAY_BASE;
private static final int REF_ELEMENT_SHIFT;
private static final Unsafe UNSAFE = RingBuffer.getUnsafe();
static {
final int scale = UNSAFE.arrayIndexScale(Object[].class);
if (4 == scale) {
REF_ELEMENT_SHIFT = 2;
} else if (8 == scale) {
REF_ELEMENT_SHIFT = 3;
} else {
throw new IllegalStateException("Unknown pointer size");
}
BUFFER_PAD = 128 / scale;
// Including the buffer pad in the array base offset
REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
}
private final long indexMask;
private final Object[] entries;
protected final int bufferSize;
protected final RingBufferProducer sequenceProducer;
RingBufferFields(Supplier<E> eventFactory,
RingBufferProducer sequenceProducer) {
this.sequenceProducer = sequenceProducer;
this.bufferSize = sequenceProducer.getBufferSize();
this.indexMask = bufferSize - 1;
this.entries = new Object[sequenceProducer.getBufferSize() + 2 * BUFFER_PAD];
fill(eventFactory);
}
private void fill(Supplier<E> eventFactory)
{
for (int i = 0; i < bufferSize; i++)
{
entries[BUFFER_PAD + i] = eventFactory.get();
}
}
@SuppressWarnings("unchecked")
final E elementAt(long sequence)
{
return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}
}
Ring based store of reusable entries containing the data representing
an event being exchanged between event producer and ringbuffer consumers.
Type parameters: - <E> – implementation storing the data for sharing during exchange or parallel coordination of an event.
/**
* Ring based store of reusable entries containing the data representing
* an event being exchanged between event producer and ringbuffer consumers.
*
* @param <E> implementation storing the data for sharing during exchange or parallel coordination of an event.
*/
final class UnsafeRingBuffer<E> extends RingBufferFields<E>
{
protected long p1, p2, p3, p4, p5, p6, p7;
Construct a RingBuffer with the full option set.
Params: - eventFactory – to newInstance entries for filling the RingBuffer
- sequenceProducer – sequencer to handle the ordering of events moving through the RingBuffer.
Throws: - IllegalArgumentException – if bufferSize is less than 1 or not a power of 2
/**
* Construct a RingBuffer with the full option set.
*
* @param eventFactory to newInstance entries for filling the RingBuffer
* @param sequenceProducer sequencer to handle the ordering of events moving through the RingBuffer.
* @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
*/
UnsafeRingBuffer(Supplier<E> eventFactory,
RingBufferProducer sequenceProducer)
{
super(eventFactory, sequenceProducer);
}
@Override
E get(long sequence)
{
return elementAt(sequence);
}
@Override
long next()
{
return sequenceProducer.next();
}
@Override
long next(int n)
{
return sequenceProducer.next(n);
}
@Override
void addGatingSequence(Sequence gatingSequence)
{
sequenceProducer.addGatingSequence(gatingSequence);
}
@Override
long getMinimumGatingSequence()
{
return getMinimumGatingSequence(null);
}
@Override
long getMinimumGatingSequence(@Nullable Sequence sequence)
{
return sequenceProducer.getMinimumSequence(sequence);
}
@Override
boolean removeGatingSequence(Sequence sequence)
{
return sequenceProducer.removeGatingSequence(sequence);
}
@Override
Reader newReader()
{
return sequenceProducer.newBarrier();
}
@Override
long getCursor()
{
return sequenceProducer.getCursor();
}
@Override
int bufferSize()
{
return bufferSize;
}
@Override
void publish(long sequence)
{
sequenceProducer.publish(sequence);
}
@Override
int getPending() {
return (int)sequenceProducer.getPending();
}
@Override
RingBufferProducer getSequencer() {
return sequenceProducer;
}
}
class LhsPadding
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding
{
protected volatile long value;
}
class RhsPadding extends Value
{
protected long p9, p10, p11, p12, p13, p14, p15;
}
Concurrent sequence class used for tracking the progress of
the ring buffer and event processors. Support a number
of concurrent operations including CAS and order writes.
Also attempts to be more efficient with regards to false
sharing by adding padding around the volatile field.
/**
* <p>Concurrent sequence class used for tracking the progress of
* the ring buffer and event processors. Support a number
* of concurrent operations including CAS and order writes.
*
* <p>Also attempts to be more efficient with regards to false
* sharing by adding padding around the volatile field.
*/
final class UnsafeSequence extends RhsPadding implements RingBuffer.Sequence, LongSupplier
{
private static final Unsafe UNSAFE;
private static final long VALUE_OFFSET;
static
{
UNSAFE = RingBuffer.getUnsafe();
try
{
VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
}
catch (final Exception e)
{
throw new RuntimeException(e);
}
}
Create a sequence with a specified initial value.
Params: - initialValue – The initial value for this sequence.
/**
* Create a sequence with a specified initial value.
*
* @param initialValue The initial value for this sequence.
*/
UnsafeSequence(final long initialValue)
{
UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
}
@Override
public long getAsLong()
{
return value;
}
@Override
public void set(final long value)
{
UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
}
@Override
public boolean compareAndSet(final long expectedValue, final long newValue)
{
return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
}
}
Coordinator for claiming sequences for access to a data structure while tracking dependent Sequence
s. Suitable for use for sequencing across multiple publisher threads.
Note on RingBufferProducer.getCursor()
: With this sequencer the cursor value is updated after the call to RingBufferProducer.next()
, to determine the highest available sequence that can be read, then RingBufferProducer.getHighestPublishedSequence(long, long)
should be used.
/**
* <p>Coordinator for claiming sequences for access to a data structure while tracking dependent {@link RingBuffer.Sequence}s.
* Suitable for use for sequencing across multiple publisher threads.</p>
*
* <p>
* <p>Note on {@code RingBufferProducer.getCursor()}: With this sequencer the cursor value is updated after the call
* to {@code RingBufferProducer.next()}, to determine the highest available sequence that can be read, then
* {@code RingBufferProducer.getHighestPublishedSequence(long, long)} should be used.
*/
final class MultiProducerRingBuffer extends RingBufferProducer
{
private static final Unsafe UNSAFE = RingBuffer.getUnsafe();
private static final long BASE = UNSAFE.arrayBaseOffset(int[].class);
private static final long SCALE = UNSAFE.arrayIndexScale(int[].class);
private final RingBuffer.Sequence gatingSequenceCache = new UnsafeSequence(RingBuffer.INITIAL_CURSOR_VALUE);
// availableBuffer tracks the state of each ringbuffer slot
// see below for more details on the approach
private final int[] availableBuffer;
private final int indexMask;
private final int indexShift;
Construct a Sequencer with the selected wait strategy and buffer size.
Params: - bufferSize – the size of the buffer that this will sequence over.
- waitStrategy – for those waiting on sequences.
/**
* Construct a Sequencer with the selected wait strategy and buffer size.
*
* @param bufferSize the size of the buffer that this will sequence over.
* @param waitStrategy for those waiting on sequences.
*/
MultiProducerRingBuffer(int bufferSize, final WaitStrategy waitStrategy, Runnable spinObserver) {
super(bufferSize, waitStrategy, spinObserver);
availableBuffer = new int[bufferSize];
indexMask = bufferSize - 1;
indexShift = RingBuffer.log2(bufferSize);
initialiseAvailableBuffer();
}
See RingBufferProducer.next()
. /**
* See {@code RingBufferProducer.next()}.
*/
@Override
long next()
{
return next(1);
}
See RingBufferProducer.next(int)
. /**
* See {@code RingBufferProducer.next(int)}.
*/
@Override
long next(int n)
{
long current;
long next;
do
{
current = cursor.getAsLong();
next = current + n;
long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.getAsLong();
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
long gatingSequence = RingBuffer.getMinimumSequence(gatingSequences, current);
if (wrapPoint > gatingSequence)
{
if(spinObserver != null) {
spinObserver.run();
}
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
gatingSequenceCache.set(gatingSequence);
}
else if (cursor.compareAndSet(current, next))
{
break;
}
}
while (true);
return next;
}
See RingBufferProducer.producerCapacity()
. /**
* See {@code RingBufferProducer.producerCapacity()}.
*/
@Override
long getPending()
{
long consumed = RingBuffer.getMinimumSequence(gatingSequences, cursor.getAsLong());
long produced = cursor.getAsLong();
return produced - consumed;
}
private void initialiseAvailableBuffer()
{
for (int i = availableBuffer.length - 1; i != 0; i--)
{
setAvailableBufferValue(i, -1);
}
setAvailableBufferValue(0, -1);
}
See RingBufferProducer.publish(long)
. /**
* See {@code RingBufferProducer.publish(long)}.
*/
@Override
void publish(final long sequence)
{
setAvailable(sequence);
waitStrategy.signalAllWhenBlocking();
}
The below methods work on the availableBuffer flag.
The prime reason is to avoid a shared sequence object between publisher threads.
(Keeping single pointers tracking start and end would require coordination
between the threads).
-- Firstly we have the constraint that the delta between the cursor and minimum
gating sequence will never be larger than the buffer size (the code in
next/tryNext in the Sequence takes care of that).
-- Given that; take the sequence value and mask off the lower portion of the
sequence as the index into the buffer (indexMask). (aka modulo operator)
-- The upper portion of the sequence becomes the value to check for availability.
ie: it tells us how many times around the ring buffer we've been (aka division)
-- Because we can't wrap without the gating sequences moving forward (i.e. the
minimum gating sequence is effectively our last available position in the
buffer), when we have new data and successfully claimed a slot we can simply
write over the top.
/**
* The below methods work on the availableBuffer flag.
*
* The prime reason is to avoid a shared sequence object between publisher threads.
* (Keeping single pointers tracking start and end would require coordination
* between the threads).
*
* -- Firstly we have the constraint that the delta between the cursor and minimum
* gating sequence will never be larger than the buffer size (the code in
* next/tryNext in the Sequence takes care of that).
* -- Given that; take the sequence value and mask off the lower portion of the
* sequence as the index into the buffer (indexMask). (aka modulo operator)
* -- The upper portion of the sequence becomes the value to check for availability.
* ie: it tells us how many times around the ring buffer we've been (aka division)
* -- Because we can't wrap without the gating sequences moving forward (i.e. the
* minimum gating sequence is effectively our last available position in the
* buffer), when we have new data and successfully claimed a slot we can simply
* write over the top.
*/
private void setAvailable(final long sequence)
{
setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}
private void setAvailableBufferValue(int index, int flag)
{
long bufferAddress = (index * SCALE) + BASE;
UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}
See RingBufferProducer.isAvailable(long)
/**
* See {@code RingBufferProducer.isAvailable(long)}
*/
boolean isAvailable(long sequence)
{
int index = calculateIndex(sequence);
int flag = calculateAvailabilityFlag(sequence);
long bufferAddress = (index * SCALE) + BASE;
return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
}
@Override
long getHighestPublishedSequence(long lowerBound, long availableSequence)
{
for (long sequence = lowerBound; sequence <= availableSequence; sequence++)
{
if (!isAvailable(sequence))
{
return sequence - 1;
}
}
return availableSequence;
}
private int calculateAvailabilityFlag(final long sequence)
{
return (int) (sequence >>> indexShift);
}
private int calculateIndex(final long sequence)
{
return ((int) sequence) & indexMask;
}
}