/*
 * Copyright 2011 LMAX Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lmax.disruptor;


import sun.misc.Unsafe;

import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.Util;

abstract class RingBufferPad
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

abstract class RingBufferFields<E> extends RingBufferPad
{
    private static final int BUFFER_PAD;
    private static final long REF_ARRAY_BASE;
    private static final int REF_ELEMENT_SHIFT;
    private static final Unsafe UNSAFE = Util.getUnsafe();

    static
    {
        final int scale = UNSAFE.arrayIndexScale(Object[].class);
        if (4 == scale)
        {
            REF_ELEMENT_SHIFT = 2;
        }
        else if (8 == scale)
        {
            REF_ELEMENT_SHIFT = 3;
        }
        else
        {
            throw new IllegalStateException("Unknown pointer size");
        }
        BUFFER_PAD = 128 / scale;
        // Including the buffer pad in the array base offset
        REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
    }

    private final long indexMask;
    private final Object[] entries;
    protected final int bufferSize;
    protected final Sequencer sequencer;

    RingBufferFields(
        EventFactory<E> eventFactory,
        Sequencer sequencer)
    {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();

        if (bufferSize < 1)
        {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1)
        {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        this.indexMask = bufferSize - 1;
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        fill(eventFactory);
    }

    private void fill(EventFactory<E> eventFactory)
    {
        for (int i = 0; i < bufferSize; i++)
        {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

    @SuppressWarnings("unchecked")
    protected final E elementAt(long sequence)
    {
        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
    }
}

Ring based store of reusable entries containing the data representing an event being exchanged between event producer and EventProcessors.
Type parameters:
  • <E> – implementation storing the data for sharing during exchange or parallel coordination of an event.
/** * Ring based store of reusable entries containing the data representing * an event being exchanged between event producer and {@link EventProcessor}s. * * @param <E> implementation storing the data for sharing during exchange or parallel coordination of an event. */
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> { public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE; protected long p1, p2, p3, p4, p5, p6, p7;
Construct a RingBuffer with the full option set.
Params:
  • eventFactory – to newInstance entries for filling the RingBuffer
  • sequencer – sequencer to handle the ordering of events moving through the RingBuffer.
Throws:
/** * Construct a RingBuffer with the full option set. * * @param eventFactory to newInstance entries for filling the RingBuffer * @param sequencer sequencer to handle the ordering of events moving through the RingBuffer. * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2 */
RingBuffer( EventFactory<E> eventFactory, Sequencer sequencer) { super(eventFactory, sequencer); }
Create a new multiple producer RingBuffer with the specified wait strategy.
Params:
  • factory – used to create the events within the ring buffer.
  • bufferSize – number of elements to create within the ring buffer.
  • waitStrategy – used to determine how to wait for new elements to become available.
Type parameters:
  • <E> – Class of the event stored in the ring buffer.
Throws:
See Also:
Returns:a constructed ring buffer.
/** * Create a new multiple producer RingBuffer with the specified wait strategy. * * @param <E> Class of the event stored in the ring buffer. * @param factory used to create the events within the ring buffer. * @param bufferSize number of elements to create within the ring buffer. * @param waitStrategy used to determine how to wait for new elements to become available. * @return a constructed ring buffer. * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2 * @see MultiProducerSequencer */
public static <E> RingBuffer<E> createMultiProducer( EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) { MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy); return new RingBuffer<E>(factory, sequencer); }
Create a new multiple producer RingBuffer using the default wait strategy BlockingWaitStrategy.
Params:
  • factory – used to create the events within the ring buffer.
  • bufferSize – number of elements to create within the ring buffer.
Type parameters:
  • <E> – Class of the event stored in the ring buffer.
Throws:
See Also:
Returns:a constructed ring buffer.
/** * Create a new multiple producer RingBuffer using the default wait strategy {@link BlockingWaitStrategy}. * * @param <E> Class of the event stored in the ring buffer. * @param factory used to create the events within the ring buffer. * @param bufferSize number of elements to create within the ring buffer. * @return a constructed ring buffer. * @throws IllegalArgumentException if <tt>bufferSize</tt> is less than 1 or not a power of 2 * @see MultiProducerSequencer */
public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize) { return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy()); }
Create a new single producer RingBuffer with the specified wait strategy.
Params:
  • factory – used to create the events within the ring buffer.
  • bufferSize – number of elements to create within the ring buffer.
  • waitStrategy – used to determine how to wait for new elements to become available.
Type parameters:
  • <E> – Class of the event stored in the ring buffer.
Throws:
See Also:
Returns:a constructed ring buffer.
/** * Create a new single producer RingBuffer with the specified wait strategy. * * @param <E> Class of the event stored in the ring buffer. * @param factory used to create the events within the ring buffer. * @param bufferSize number of elements to create within the ring buffer. * @param waitStrategy used to determine how to wait for new elements to become available. * @return a constructed ring buffer. * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2 * @see SingleProducerSequencer */
public static <E> RingBuffer<E> createSingleProducer( EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) { SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy); return new RingBuffer<E>(factory, sequencer); }
Create a new single producer RingBuffer using the default wait strategy BlockingWaitStrategy.
Params:
  • factory – used to create the events within the ring buffer.
  • bufferSize – number of elements to create within the ring buffer.
Type parameters:
  • <E> – Class of the event stored in the ring buffer.
Throws:
See Also:
Returns:a constructed ring buffer.
/** * Create a new single producer RingBuffer using the default wait strategy {@link BlockingWaitStrategy}. * * @param <E> Class of the event stored in the ring buffer. * @param factory used to create the events within the ring buffer. * @param bufferSize number of elements to create within the ring buffer. * @return a constructed ring buffer. * @throws IllegalArgumentException if <tt>bufferSize</tt> is less than 1 or not a power of 2 * @see MultiProducerSequencer */
public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize) { return createSingleProducer(factory, bufferSize, new BlockingWaitStrategy()); }
Create a new Ring Buffer with the specified producer type (SINGLE or MULTI)
Params:
  • producerType – producer type to use ProducerType.
  • factory – used to create events within the ring buffer.
  • bufferSize – number of elements to create within the ring buffer.
  • waitStrategy – used to determine how to wait for new elements to become available.
Type parameters:
  • <E> – Class of the event stored in the ring buffer.
Throws:
Returns:a constructed ring buffer.
/** * Create a new Ring Buffer with the specified producer type (SINGLE or MULTI) * * @param <E> Class of the event stored in the ring buffer. * @param producerType producer type to use {@link ProducerType}. * @param factory used to create events within the ring buffer. * @param bufferSize number of elements to create within the ring buffer. * @param waitStrategy used to determine how to wait for new elements to become available. * @return a constructed ring buffer. * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2 */
public static <E> RingBuffer<E> create( ProducerType producerType, EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) { switch (producerType) { case SINGLE: return createSingleProducer(factory, bufferSize, waitStrategy); case MULTI: return createMultiProducer(factory, bufferSize, waitStrategy); default: throw new IllegalStateException(producerType.toString()); } }

Get the event for a given sequence in the RingBuffer.

This call has 2 uses. Firstly use this call when publishing to a ring buffer. After calling next() use this call to get hold of the preallocated event to fill with data before calling publish(long).

Secondly use this call when consuming data from the ring buffer. After calling SequenceBarrier.waitFor(long) call this method with any value greater than that your current consumer sequence and less than or equal to the value returned from the SequenceBarrier.waitFor(long) method.

Params:
  • sequence – for the event
Returns:the event for the given sequence
/** * <p>Get the event for a given sequence in the RingBuffer.</p> * * <p>This call has 2 uses. Firstly use this call when publishing to a ring buffer. * After calling {@link RingBuffer#next()} use this call to get hold of the * preallocated event to fill with data before calling {@link RingBuffer#publish(long)}.</p> * * <p>Secondly use this call when consuming data from the ring buffer. After calling * {@link SequenceBarrier#waitFor(long)} call this method with any value greater than * that your current consumer sequence and less than or equal to the value returned from * the {@link SequenceBarrier#waitFor(long)} method.</p> * * @param sequence for the event * @return the event for the given sequence */
@Override public E get(long sequence) { return elementAt(sequence); }
Increment and return the next sequence for the ring buffer. Calls of this method should ensure that they always publish the sequence afterward. E.g.
long sequence = ringBuffer.next();
try {
    Event e = ringBuffer.get(sequence);
    // Do some work with the event.
} finally {
    ringBuffer.publish(sequence);
}
See Also:
Returns:The next sequence to publish to.
/** * Increment and return the next sequence for the ring buffer. Calls of this * method should ensure that they always publish the sequence afterward. E.g. * <pre> * long sequence = ringBuffer.next(); * try { * Event e = ringBuffer.get(sequence); * // Do some work with the event. * } finally { * ringBuffer.publish(sequence); * } * </pre> * * @return The next sequence to publish to. * @see RingBuffer#publish(long) * @see RingBuffer#get(long) */
@Override public long next() { return sequencer.next(); }
The same functionality as next(), but allows the caller to claim the next n sequences.
Params:
  • n – number of slots to claim
See Also:
Returns:sequence number of the highest slot claimed
/** * The same functionality as {@link RingBuffer#next()}, but allows the caller to claim * the next n sequences. * * @param n number of slots to claim * @return sequence number of the highest slot claimed * @see Sequencer#next(int) */
@Override public long next(int n) { return sequencer.next(n); }

Increment and return the next sequence for the ring buffer. Calls of this method should ensure that they always publish the sequence afterward. E.g.

long sequence = ringBuffer.next();
try {
    Event e = ringBuffer.get(sequence);
    // Do some work with the event.
} finally {
    ringBuffer.publish(sequence);
}

This method will not block if there is not space available in the ring buffer, instead it will throw an InsufficientCapacityException.

Throws:
See Also:
Returns:The next sequence to publish to.
/** * <p>Increment and return the next sequence for the ring buffer. Calls of this * method should ensure that they always publish the sequence afterward. E.g.</p> * <pre> * long sequence = ringBuffer.next(); * try { * Event e = ringBuffer.get(sequence); * // Do some work with the event. * } finally { * ringBuffer.publish(sequence); * } * </pre> * <p>This method will not block if there is not space available in the ring * buffer, instead it will throw an {@link InsufficientCapacityException}.</p> * * @return The next sequence to publish to. * @throws InsufficientCapacityException if the necessary space in the ring buffer is not available * @see RingBuffer#publish(long) * @see RingBuffer#get(long) */
@Override public long tryNext() throws InsufficientCapacityException { return sequencer.tryNext(); }
The same functionality as tryNext(), but allows the caller to attempt to claim the next n sequences.
Params:
  • n – number of slots to claim
Throws:
Returns:sequence number of the highest slot claimed
/** * The same functionality as {@link RingBuffer#tryNext()}, but allows the caller to attempt * to claim the next n sequences. * * @param n number of slots to claim * @return sequence number of the highest slot claimed * @throws InsufficientCapacityException if the necessary space in the ring buffer is not available */
@Override public long tryNext(int n) throws InsufficientCapacityException { return sequencer.tryNext(n); }
Resets the cursor to a specific value. This can be applied at any time, but it is worth noting that it can cause a data race and should only be used in controlled circumstances. E.g. during initialisation.
Params:
  • sequence – The sequence to reset too.
Throws:
/** * Resets the cursor to a specific value. This can be applied at any time, but it is worth noting * that it can cause a data race and should only be used in controlled circumstances. E.g. during * initialisation. * * @param sequence The sequence to reset too. * @throws IllegalStateException If any gating sequences have already been specified. */
@Deprecated public void resetTo(long sequence) { sequencer.claim(sequence); sequencer.publish(sequence); }
Sets the cursor to a specific sequence and returns the preallocated entry that is stored there. This can cause a data race and should only be done in controlled circumstances, e.g. during initialisation.
Params:
  • sequence – The sequence to claim.
Returns:The preallocated event.
/** * Sets the cursor to a specific sequence and returns the preallocated entry that is stored there. This * can cause a data race and should only be done in controlled circumstances, e.g. during initialisation. * * @param sequence The sequence to claim. * @return The preallocated event. */
public E claimAndGetPreallocated(long sequence) { sequencer.claim(sequence); return get(sequence); }
Determines if a particular entry is available. Note that using this when not within a context that is maintaining a sequence barrier, it is likely that using this to determine if you can read a value is likely to result in a race condition and broken code.
Params:
  • sequence – The sequence to identify the entry.
Returns:If the value can be read or not.
Deprecated:Please don't use this method. It probably won't do what you think that it does.
/** * Determines if a particular entry is available. Note that using this when not within a context that is * maintaining a sequence barrier, it is likely that using this to determine if you can read a value is likely * to result in a race condition and broken code. * * @param sequence The sequence to identify the entry. * @return If the value can be read or not. * @deprecated Please don't use this method. It probably won't * do what you think that it does. */
@Deprecated public boolean isPublished(long sequence) { return sequencer.isAvailable(sequence); }
Add the specified gating sequences to this instance of the Disruptor. They will safely and atomically added to the list of gating sequences.
Params:
  • gatingSequences – The sequences to add.
/** * Add the specified gating sequences to this instance of the Disruptor. They will * safely and atomically added to the list of gating sequences. * * @param gatingSequences The sequences to add. */
public void addGatingSequences(Sequence... gatingSequences) { sequencer.addGatingSequences(gatingSequences); }
Get the minimum sequence value from all of the gating sequences added to this ringBuffer.
Returns:The minimum gating sequence or the cursor sequence if no sequences have been added.
/** * Get the minimum sequence value from all of the gating sequences * added to this ringBuffer. * * @return The minimum gating sequence or the cursor sequence if * no sequences have been added. */
public long getMinimumGatingSequence() { return sequencer.getMinimumSequence(); }
Remove the specified sequence from this ringBuffer.
Params:
  • sequence – to be removed.
Returns:true if this sequence was found, false otherwise.
/** * Remove the specified sequence from this ringBuffer. * * @param sequence to be removed. * @return <tt>true</tt> if this sequence was found, <tt>false</tt> otherwise. */
public boolean removeGatingSequence(Sequence sequence) { return sequencer.removeGatingSequence(sequence); }
Create a new SequenceBarrier to be used by an EventProcessor to track which messages are available to be read from the ring buffer given a list of sequences to track.
Params:
  • sequencesToTrack – the additional sequences to track
See Also:
Returns:A sequence barrier that will track the specified sequences.
/** * Create a new SequenceBarrier to be used by an EventProcessor to track which messages * are available to be read from the ring buffer given a list of sequences to track. * * @param sequencesToTrack the additional sequences to track * @return A sequence barrier that will track the specified sequences. * @see SequenceBarrier */
public SequenceBarrier newBarrier(Sequence... sequencesToTrack) { return sequencer.newBarrier(sequencesToTrack); }
Creates an event poller for this ring buffer gated on the supplied sequences.
Params:
  • gatingSequences – to be gated on.
Returns:A poller that will gate on this ring buffer and the supplied sequences.
/** * Creates an event poller for this ring buffer gated on the supplied sequences. * * @param gatingSequences to be gated on. * @return A poller that will gate on this ring buffer and the supplied sequences. */
public EventPoller<E> newPoller(Sequence... gatingSequences) { return sequencer.newPoller(this, gatingSequences); }
Get the current cursor value for the ring buffer. The actual value received will depend on the type of Sequencer that is being used.
See Also:
/** * Get the current cursor value for the ring buffer. The actual value received * will depend on the type of {@link Sequencer} that is being used. * * @see MultiProducerSequencer * @see SingleProducerSequencer */
@Override public long getCursor() { return sequencer.getCursor(); }
The size of the buffer.
/** * The size of the buffer. */
public int getBufferSize() { return bufferSize; }
Given specified requiredCapacity determines if that amount of space is available. Note, you can not assume that if this method returns true that a call to next() will not block. Especially true if this ring buffer is set up to handle multiple producers.
Params:
  • requiredCapacity – The capacity to check for.
Returns:true If the specified requiredCapacity is available false if not.
/** * Given specified <tt>requiredCapacity</tt> determines if that amount of space * is available. Note, you can not assume that if this method returns <tt>true</tt> * that a call to {@link RingBuffer#next()} will not block. Especially true if this * ring buffer is set up to handle multiple producers. * * @param requiredCapacity The capacity to check for. * @return <tt>true</tt> If the specified <tt>requiredCapacity</tt> is available * <tt>false</tt> if not. */
public boolean hasAvailableCapacity(int requiredCapacity) { return sequencer.hasAvailableCapacity(requiredCapacity); }
See Also:
  • publishEvent.publishEvent(EventTranslator)
/** * @see com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslator) */
@Override public void publishEvent(EventTranslator<E> translator) { final long sequence = sequencer.next(); translateAndPublish(translator, sequence); }
See Also:
  • tryPublishEvent.tryPublishEvent(EventTranslator)
/** * @see com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslator) */
@Override public boolean tryPublishEvent(EventTranslator<E> translator) { try { final long sequence = sequencer.tryNext(); translateAndPublish(translator, sequence); return true; } catch (InsufficientCapacityException e) { return false; } }
See Also:
  • com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorOneArg, A)
/** * @see com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorOneArg, Object) * com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorOneArg, A) */
@Override public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) { final long sequence = sequencer.next(); translateAndPublish(translator, sequence, arg0); }
See Also:
  • com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorOneArg, A)
/** * @see com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorOneArg, Object) * com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorOneArg, A) */
@Override public <A> boolean tryPublishEvent(EventTranslatorOneArg<E, A> translator, A arg0) { try { final long sequence = sequencer.tryNext(); translateAndPublish(translator, sequence, arg0); return true; } catch (InsufficientCapacityException e) { return false; } }
See Also:
  • com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorTwoArg, A, B)
/** * @see com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorTwoArg, Object, Object) * com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorTwoArg, A, B) */
@Override public <A, B> void publishEvent(EventTranslatorTwoArg<E, A, B> translator, A arg0, B arg1) { final long sequence = sequencer.next(); translateAndPublish(translator, sequence, arg0, arg1); }
See Also:
  • com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorTwoArg, A, B)
/** * @see com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorTwoArg, Object, Object) * com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorTwoArg, A, B) */
@Override public <A, B> boolean tryPublishEvent(EventTranslatorTwoArg<E, A, B> translator, A arg0, B arg1) { try { final long sequence = sequencer.tryNext(); translateAndPublish(translator, sequence, arg0, arg1); return true; } catch (InsufficientCapacityException e) { return false; } }
See Also:
  • com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorThreeArg, A, B, C)
/** * @see com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorThreeArg, Object, Object, Object) * com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorThreeArg, A, B, C) */
@Override public <A, B, C> void publishEvent(EventTranslatorThreeArg<E, A, B, C> translator, A arg0, B arg1, C arg2) { final long sequence = sequencer.next(); translateAndPublish(translator, sequence, arg0, arg1, arg2); }
See Also:
  • com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorThreeArg, A, B, C)
/** * @see com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorThreeArg, Object, Object, Object) * com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorThreeArg, A, B, C) */
@Override public <A, B, C> boolean tryPublishEvent(EventTranslatorThreeArg<E, A, B, C> translator, A arg0, B arg1, C arg2) { try { final long sequence = sequencer.tryNext(); translateAndPublish(translator, sequence, arg0, arg1, arg2); return true; } catch (InsufficientCapacityException e) { return false; } }
See Also:
  • publishEvent.publishEvent(EventTranslatorVararg, Object...)
/** * @see com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorVararg, java.lang.Object...) */
@Override public void publishEvent(EventTranslatorVararg<E> translator, Object... args) { final long sequence = sequencer.next(); translateAndPublish(translator, sequence, args); }
See Also:
  • tryPublishEvent.tryPublishEvent(EventTranslatorVararg, Object...)
/** * @see com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorVararg, java.lang.Object...) */
@Override public boolean tryPublishEvent(EventTranslatorVararg<E> translator, Object... args) { try { final long sequence = sequencer.tryNext(); translateAndPublish(translator, sequence, args); return true; } catch (InsufficientCapacityException e) { return false; } }
See Also:
  • publishEvents.publishEvents(EventTranslator[])
/** * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslator[]) */
@Override public void publishEvents(EventTranslator<E>[] translators) { publishEvents(translators, 0, translators.length); }
See Also:
  • publishEvents.publishEvents(EventTranslator[], int, int)
/** * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslator[], int, int) */
@Override public void publishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize) { checkBounds(translators, batchStartsAt, batchSize); final long finalSequence = sequencer.next(batchSize); translateAndPublishBatch(translators, batchStartsAt, batchSize, finalSequence); }
See Also:
  • tryPublishEvents.tryPublishEvents(EventTranslator[])
/** * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslator[]) */
@Override public boolean tryPublishEvents(EventTranslator<E>[] translators) { return tryPublishEvents(translators, 0, translators.length); }
See Also:
  • tryPublishEvents.tryPublishEvents(EventTranslator[], int, int)
/** * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslator[], int, int) */
@Override public boolean tryPublishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize) { checkBounds(translators, batchStartsAt, batchSize); try { final long finalSequence = sequencer.tryNext(batchSize); translateAndPublishBatch(translators, batchStartsAt, batchSize, finalSequence); return true; } catch (InsufficientCapacityException e) { return false; } }
See Also:
  • com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorOneArg, A[])
/** * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorOneArg, Object[]) * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorOneArg, A[]) */
@Override public <A> void publishEvents(EventTranslatorOneArg<E, A> translator, A[] arg0) { publishEvents(translator, 0, arg0.length, arg0); }
See Also:
  • com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorOneArg, int, int, A[])
/** * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorOneArg, int, int, Object[]) * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorOneArg, int, int, A[]) */
@Override public <A> void publishEvents(EventTranslatorOneArg<E, A> translator, int batchStartsAt, int batchSize, A[] arg0) { checkBounds(arg0, batchStartsAt, batchSize); final long finalSequence = sequencer.next(batchSize); translateAndPublishBatch(translator, arg0, batchStartsAt, batchSize, finalSequence); }
See Also:
  • com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorOneArg, A[])
/** * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorOneArg, Object[]) * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorOneArg, A[]) */
@Override public <A> boolean tryPublishEvents(EventTranslatorOneArg<E, A> translator, A[] arg0) { return tryPublishEvents(translator, 0, arg0.length, arg0); }
See Also:
  • com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorOneArg, int, int, A[])
/** * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorOneArg, int, int, Object[]) * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorOneArg, int, int, A[]) */
@Override public <A> boolean tryPublishEvents( EventTranslatorOneArg<E, A> translator, int batchStartsAt, int batchSize, A[] arg0) { checkBounds(arg0, batchStartsAt, batchSize); try { final long finalSequence = sequencer.tryNext(batchSize); translateAndPublishBatch(translator, arg0, batchStartsAt, batchSize, finalSequence); return true; } catch (InsufficientCapacityException e) { return false; } }
See Also:
  • com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorTwoArg, A[], B[])
/** * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorTwoArg, Object[], Object[]) * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorTwoArg, A[], B[]) */
@Override public <A, B> void publishEvents(EventTranslatorTwoArg<E, A, B> translator, A[] arg0, B[] arg1) { publishEvents(translator, 0, arg0.length, arg0, arg1); }
See Also:
  • com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorTwoArg, int, int, A[], B[])
/** * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorTwoArg, int, int, Object[], Object[]) * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorTwoArg, int, int, A[], B[]) */
@Override public <A, B> void publishEvents( EventTranslatorTwoArg<E, A, B> translator, int batchStartsAt, int batchSize, A[] arg0, B[] arg1) { checkBounds(arg0, arg1, batchStartsAt, batchSize); final long finalSequence = sequencer.next(batchSize); translateAndPublishBatch(translator, arg0, arg1, batchStartsAt, batchSize, finalSequence); }
See Also:
  • com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorTwoArg, A[], B[])
/** * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorTwoArg, Object[], Object[]) * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorTwoArg, A[], B[]) */
@Override public <A, B> boolean tryPublishEvents(EventTranslatorTwoArg<E, A, B> translator, A[] arg0, B[] arg1) { return tryPublishEvents(translator, 0, arg0.length, arg0, arg1); }
See Also:
  • com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorTwoArg, int, int, A[], B[])
/** * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorTwoArg, int, int, Object[], Object[]) * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorTwoArg, int, int, A[], B[]) */
@Override public <A, B> boolean tryPublishEvents( EventTranslatorTwoArg<E, A, B> translator, int batchStartsAt, int batchSize, A[] arg0, B[] arg1) { checkBounds(arg0, arg1, batchStartsAt, batchSize); try { final long finalSequence = sequencer.tryNext(batchSize); translateAndPublishBatch(translator, arg0, arg1, batchStartsAt, batchSize, finalSequence); return true; } catch (InsufficientCapacityException e) { return false; } }
See Also:
  • com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorThreeArg, A[], B[], C[])
/** * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorThreeArg, Object[], Object[], Object[]) * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorThreeArg, A[], B[], C[]) */
@Override public <A, B, C> void publishEvents(EventTranslatorThreeArg<E, A, B, C> translator, A[] arg0, B[] arg1, C[] arg2) { publishEvents(translator, 0, arg0.length, arg0, arg1, arg2); }
See Also:
  • com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorThreeArg, int, int, A[], B[], C[])
/** * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorThreeArg, int, int, Object[], Object[], Object[]) * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorThreeArg, int, int, A[], B[], C[]) */
@Override public <A, B, C> void publishEvents( EventTranslatorThreeArg<E, A, B, C> translator, int batchStartsAt, int batchSize, A[] arg0, B[] arg1, C[] arg2) { checkBounds(arg0, arg1, arg2, batchStartsAt, batchSize); final long finalSequence = sequencer.next(batchSize); translateAndPublishBatch(translator, arg0, arg1, arg2, batchStartsAt, batchSize, finalSequence); }
See Also:
  • com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorThreeArg, A[], B[], C[])
/** * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorThreeArg, Object[], Object[], Object[]) * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorThreeArg, A[], B[], C[]) */
@Override public <A, B, C> boolean tryPublishEvents( EventTranslatorThreeArg<E, A, B, C> translator, A[] arg0, B[] arg1, C[] arg2) { return tryPublishEvents(translator, 0, arg0.length, arg0, arg1, arg2); }
See Also:
  • com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorThreeArg, int, int, A[], B[], C[])
/** * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorThreeArg, int, int, Object[], Object[], Object[]) * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorThreeArg, int, int, A[], B[], C[]) */
@Override public <A, B, C> boolean tryPublishEvents( EventTranslatorThreeArg<E, A, B, C> translator, int batchStartsAt, int batchSize, A[] arg0, B[] arg1, C[] arg2) { checkBounds(arg0, arg1, arg2, batchStartsAt, batchSize); try { final long finalSequence = sequencer.tryNext(batchSize); translateAndPublishBatch(translator, arg0, arg1, arg2, batchStartsAt, batchSize, finalSequence); return true; } catch (InsufficientCapacityException e) { return false; } }
See Also:
  • publishEvents.publishEvents(EventTranslatorVararg, Object[][])
/** * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorVararg, java.lang.Object[][]) */
@Override public void publishEvents(EventTranslatorVararg<E> translator, Object[]... args) { publishEvents(translator, 0, args.length, args); }
See Also:
  • publishEvents.publishEvents(EventTranslatorVararg, int, int, Object[][])
/** * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorVararg, int, int, java.lang.Object[][]) */
@Override public void publishEvents(EventTranslatorVararg<E> translator, int batchStartsAt, int batchSize, Object[]... args) { checkBounds(batchStartsAt, batchSize, args); final long finalSequence = sequencer.next(batchSize); translateAndPublishBatch(translator, batchStartsAt, batchSize, finalSequence, args); }
See Also:
  • tryPublishEvents.tryPublishEvents(EventTranslatorVararg, Object[][])
/** * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorVararg, java.lang.Object[][]) */
@Override public boolean tryPublishEvents(EventTranslatorVararg<E> translator, Object[]... args) { return tryPublishEvents(translator, 0, args.length, args); }
See Also:
  • tryPublishEvents.tryPublishEvents(EventTranslatorVararg, int, int, Object[][])
/** * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorVararg, int, int, java.lang.Object[][]) */
@Override public boolean tryPublishEvents( EventTranslatorVararg<E> translator, int batchStartsAt, int batchSize, Object[]... args) { checkBounds(args, batchStartsAt, batchSize); try { final long finalSequence = sequencer.tryNext(batchSize); translateAndPublishBatch(translator, batchStartsAt, batchSize, finalSequence, args); return true; } catch (InsufficientCapacityException e) { return false; } }
Publish the specified sequence. This action marks this particular message as being available to be read.
Params:
  • sequence – the sequence to publish.
/** * Publish the specified sequence. This action marks this particular * message as being available to be read. * * @param sequence the sequence to publish. */
@Override public void publish(long sequence) { sequencer.publish(sequence); }
Publish the specified sequences. This action marks these particular messages as being available to be read.
Params:
  • lo – the lowest sequence number to be published
  • hi – the highest sequence number to be published
See Also:
/** * Publish the specified sequences. This action marks these particular * messages as being available to be read. * * @param lo the lowest sequence number to be published * @param hi the highest sequence number to be published * @see Sequencer#next(int) */
@Override public void publish(long lo, long hi) { sequencer.publish(lo, hi); }
Get the remaining capacity for this ringBuffer.
Returns:The number of slots remaining.
/** * Get the remaining capacity for this ringBuffer. * * @return The number of slots remaining. */
public long remainingCapacity() { return sequencer.remainingCapacity(); } private void checkBounds(final EventTranslator<E>[] translators, final int batchStartsAt, final int batchSize) { checkBatchSizing(batchStartsAt, batchSize); batchOverRuns(translators, batchStartsAt, batchSize); } private void checkBatchSizing(int batchStartsAt, int batchSize) { if (batchStartsAt < 0 || batchSize < 0) { throw new IllegalArgumentException("Both batchStartsAt and batchSize must be positive but got: batchStartsAt " + batchStartsAt + " and batchSize " + batchSize); } else if (batchSize > bufferSize) { throw new IllegalArgumentException("The ring buffer cannot accommodate " + batchSize + " it only has space for " + bufferSize + " entities."); } } private <A> void checkBounds(final A[] arg0, final int batchStartsAt, final int batchSize) { checkBatchSizing(batchStartsAt, batchSize); batchOverRuns(arg0, batchStartsAt, batchSize); } private <A, B> void checkBounds(final A[] arg0, final B[] arg1, final int batchStartsAt, final int batchSize) { checkBatchSizing(batchStartsAt, batchSize); batchOverRuns(arg0, batchStartsAt, batchSize); batchOverRuns(arg1, batchStartsAt, batchSize); } private <A, B, C> void checkBounds( final A[] arg0, final B[] arg1, final C[] arg2, final int batchStartsAt, final int batchSize) { checkBatchSizing(batchStartsAt, batchSize); batchOverRuns(arg0, batchStartsAt, batchSize); batchOverRuns(arg1, batchStartsAt, batchSize); batchOverRuns(arg2, batchStartsAt, batchSize); } private void checkBounds(final int batchStartsAt, final int batchSize, final Object[][] args) { checkBatchSizing(batchStartsAt, batchSize); batchOverRuns(args, batchStartsAt, batchSize); } private <A> void batchOverRuns(final A[] arg0, final int batchStartsAt, final int batchSize) { if (batchStartsAt + batchSize > arg0.length) { throw new IllegalArgumentException( "A batchSize of: " + batchSize + " with batchStatsAt of: " + batchStartsAt + " will overrun the available number of arguments: " + (arg0.length - batchStartsAt)); } } private void translateAndPublish(EventTranslator<E> translator, long sequence) { try { translator.translateTo(get(sequence), sequence); } finally { sequencer.publish(sequence); } } private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) { try { translator.translateTo(get(sequence), sequence, arg0); } finally { sequencer.publish(sequence); } } private <A, B> void translateAndPublish(EventTranslatorTwoArg<E, A, B> translator, long sequence, A arg0, B arg1) { try { translator.translateTo(get(sequence), sequence, arg0, arg1); } finally { sequencer.publish(sequence); } } private <A, B, C> void translateAndPublish( EventTranslatorThreeArg<E, A, B, C> translator, long sequence, A arg0, B arg1, C arg2) { try { translator.translateTo(get(sequence), sequence, arg0, arg1, arg2); } finally { sequencer.publish(sequence); } } private void translateAndPublish(EventTranslatorVararg<E> translator, long sequence, Object... args) { try { translator.translateTo(get(sequence), sequence, args); } finally { sequencer.publish(sequence); } } private void translateAndPublishBatch( final EventTranslator<E>[] translators, int batchStartsAt, final int batchSize, final long finalSequence) { final long initialSequence = finalSequence - (batchSize - 1); try { long sequence = initialSequence; final int batchEndsAt = batchStartsAt + batchSize; for (int i = batchStartsAt; i < batchEndsAt; i++) { final EventTranslator<E> translator = translators[i]; translator.translateTo(get(sequence), sequence++); } } finally { sequencer.publish(initialSequence, finalSequence); } } private <A> void translateAndPublishBatch( final EventTranslatorOneArg<E, A> translator, final A[] arg0, int batchStartsAt, final int batchSize, final long finalSequence) { final long initialSequence = finalSequence - (batchSize - 1); try { long sequence = initialSequence; final int batchEndsAt = batchStartsAt + batchSize; for (int i = batchStartsAt; i < batchEndsAt; i++) { translator.translateTo(get(sequence), sequence++, arg0[i]); } } finally { sequencer.publish(initialSequence, finalSequence); } } private <A, B> void translateAndPublishBatch( final EventTranslatorTwoArg<E, A, B> translator, final A[] arg0, final B[] arg1, int batchStartsAt, int batchSize, final long finalSequence) { final long initialSequence = finalSequence - (batchSize - 1); try { long sequence = initialSequence; final int batchEndsAt = batchStartsAt + batchSize; for (int i = batchStartsAt; i < batchEndsAt; i++) { translator.translateTo(get(sequence), sequence++, arg0[i], arg1[i]); } } finally { sequencer.publish(initialSequence, finalSequence); } } private <A, B, C> void translateAndPublishBatch( final EventTranslatorThreeArg<E, A, B, C> translator, final A[] arg0, final B[] arg1, final C[] arg2, int batchStartsAt, final int batchSize, final long finalSequence) { final long initialSequence = finalSequence - (batchSize - 1); try { long sequence = initialSequence; final int batchEndsAt = batchStartsAt + batchSize; for (int i = batchStartsAt; i < batchEndsAt; i++) { translator.translateTo(get(sequence), sequence++, arg0[i], arg1[i], arg2[i]); } } finally { sequencer.publish(initialSequence, finalSequence); } } private void translateAndPublishBatch( final EventTranslatorVararg<E> translator, int batchStartsAt, final int batchSize, final long finalSequence, final Object[][] args) { final long initialSequence = finalSequence - (batchSize - 1); try { long sequence = initialSequence; final int batchEndsAt = batchStartsAt + batchSize; for (int i = batchStartsAt; i < batchEndsAt; i++) { translator.translateTo(get(sequence), sequence++, args[i]); } } finally { sequencer.publish(initialSequence, finalSequence); } } @Override public String toString() { return "RingBuffer{" + "bufferSize=" + bufferSize + ", sequencer=" + sequencer + "}"; } }