/*
 * Copyright 2011 LMAX Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lmax.disruptor.dsl;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventProcessor;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkHandler;

import java.util.Arrays;

/**
 * A group of {@link EventProcessor}s used as part of the {@link Disruptor}.
 *
 * @param <T> the type of entry used by the event processors.
 */
public class EventHandlerGroup<T>
{
    private final Disruptor<T> disruptor;
    private final ConsumerRepository<T> consumerRepository;
    private final Sequence[] sequences;

    EventHandlerGroup(
        final Disruptor<T> disruptor,
        final ConsumerRepository<T> consumerRepository,
        final Sequence[] sequences)
    {
        this.disruptor = disruptor;
        this.consumerRepository = consumerRepository;
        // Defensive copy so later changes to the caller's array cannot affect this group.
        this.sequences = Arrays.copyOf(sequences, sequences.length);
    }

    /**
     * Create a new event handler group that combines the consumers in this group with
     * <code>otherHandlerGroup</code>.
     *
     * @param otherHandlerGroup the event handler group to combine.
     * @return a new EventHandlerGroup combining the existing and new consumers into a single dependency group.
     */
    public EventHandlerGroup<T> and(final EventHandlerGroup<T> otherHandlerGroup)
    {
        final Sequence[] combinedSequences = new Sequence[this.sequences.length + otherHandlerGroup.sequences.length];
        // This group's sequences come first, followed by those of the other group.
        System.arraycopy(this.sequences, 0, combinedSequences, 0, this.sequences.length);
        System.arraycopy(
            otherHandlerGroup.sequences, 0,
            combinedSequences, this.sequences.length, otherHandlerGroup.sequences.length);
        return new EventHandlerGroup<>(disruptor, consumerRepository, combinedSequences);
    }

    /**
     * Create a new event handler group that combines the handlers in this group with
     * <code>processors</code>.
     *
     * @param processors the processors to combine.
     * @return a new EventHandlerGroup combining the existing and new processors into a single dependency group.
     */
    public EventHandlerGroup<T> and(final EventProcessor... processors)
    {
        Sequence[] combinedSequences = new Sequence[sequences.length + processors.length];

        // Register each new processor and place its sequence at the front of the combined array.
        for (int i = 0; i < processors.length; i++)
        {
            consumerRepository.add(processors[i]);
            combinedSequences[i] = processors[i].getSequence();
        }
        // Append this group's existing sequences after the new ones.
        System.arraycopy(sequences, 0, combinedSequences, processors.length, sequences.length);

        return new EventHandlerGroup<>(disruptor, consumerRepository, combinedSequences);
    }

    /**
     * <p>Set up batch handlers to consume events from the ring buffer. These handlers will only process events
     * after every {@link EventProcessor} in this group has processed the event.</p>
     *
     * <p>This method is generally used as part of a chain. For example, if the handler <code>A</code> must
     * process events before handler <code>B</code>:</p>
     *
     * <pre><code>dw.handleEventsWith(A).then(B);</code></pre>
     *
     * @param handlers the batch handlers that will process events.
     * @return an {@link EventHandlerGroup} that can be used to set up an event processor barrier over the
     * created event processors.
     */
    @SafeVarargs
    public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers)
    {
        return handleEventsWith(handlers);
    }

    /**
     * <p>Set up custom event processors to handle events from the ring buffer. The Disruptor will
     * automatically start these processors when {@link Disruptor#start()} is called.</p>
     *
     * <p>This method is generally used as part of a chain. For example, if the handler <code>A</code> must
     * process events before handler <code>B</code>:</p>
     *
     * <pre><code>dw.handleEventsWith(A).then(B);</code></pre>
     *
     * @param eventProcessorFactories the event processor factories to use to create the event processors
     * that will process events.
     * @return an {@link EventHandlerGroup} that can be used to chain dependencies.
     */
    @SafeVarargs
    public final EventHandlerGroup<T> then(final EventProcessorFactory<T>... eventProcessorFactories)
    {
        return handleEventsWith(eventProcessorFactories);
    }

    /**
     * <p>Set up a worker pool to handle events from the ring buffer. The worker pool will only process events
     * after every {@link EventProcessor} in this group has processed the event. Each event will be processed
     * by one of the work handler instances.</p>
     *
     * <p>This method is generally used as part of a chain. For example, if the handler <code>A</code> must
     * process events before the worker pool with handlers <code>B, C</code>:</p>
     *
     * <pre><code>dw.handleEventsWith(A).thenHandleEventsWithWorkerPool(B, C);</code></pre>
     *
     * @param handlers the work handlers that will process events. Each work handler instance will provide
     * an extra thread in the worker pool.
     * @return an {@link EventHandlerGroup} that can be used to set up an event processor barrier over the
     * created event processors.
     */
    @SafeVarargs
    public final EventHandlerGroup<T> thenHandleEventsWithWorkerPool(final WorkHandler<? super T>... handlers)
    {
        return handleEventsWithWorkerPool(handlers);
    }

    /**
     * <p>Set up batch handlers to handle events from the ring buffer. These handlers will only process events
     * after every {@link EventProcessor} in this group has processed the event.</p>
     *
     * <p>This method is generally used as part of a chain. For example, if <code>A</code> must
     * process events before <code>B</code>:</p>
     *
     * <pre><code>dw.after(A).handleEventsWith(B);</code></pre>
     *
     * @param handlers the batch handlers that will process events.
     * @return an {@link EventHandlerGroup} that can be used to set up an event processor barrier over the
     * created event processors.
     */
    @SafeVarargs
    public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
    {
        return disruptor.createEventProcessors(sequences, handlers);
    }

    /**
     * <p>Set up custom event processors to handle events from the ring buffer. The Disruptor will
     * automatically start these processors when {@link Disruptor#start()} is called.</p>
     *
     * <p>This method is generally used as part of a chain. For example, if <code>A</code> must
     * process events before <code>B</code>:</p>
     *
     * <pre><code>dw.after(A).handleEventsWith(B);</code></pre>
     *
     * @param eventProcessorFactories the event processor factories to use to create the event processors
     * that will process events.
     * @return an {@link EventHandlerGroup} that can be used to chain dependencies.
     */
    @SafeVarargs
    public final EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories)
    {
        return disruptor.createEventProcessors(sequences, eventProcessorFactories);
    }

    /**
     * <p>Set up a worker pool to handle events from the ring buffer. The worker pool will only process events
     * after every {@link EventProcessor} in this group has processed the event. Each event will be processed
     * by one of the work handler instances.</p>
     *
     * <p>This method is generally used as part of a chain. For example, if the handler <code>A</code> must
     * process events before the worker pool with handlers <code>B, C</code>:</p>
     *
     * <pre><code>dw.after(A).handleEventsWithWorkerPool(B, C);</code></pre>
     *
     * @param handlers the work handlers that will process events. Each work handler instance will provide
     * an extra thread in the worker pool.
     * @return an {@link EventHandlerGroup} that can be used to set up an event processor barrier over the
     * created event processors.
     */
    @SafeVarargs
    public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<? super T>... handlers)
    {
        return disruptor.createWorkerPool(sequences, handlers);
    }

    /**
     * Create a dependency barrier for the processors in this group.
     * This allows custom event processors to have dependencies on
     * {@link com.lmax.disruptor.BatchEventProcessor}s created by the disruptor.
     *
     * @return a {@link SequenceBarrier} including all the processors in this group.
     */
    public SequenceBarrier asSequenceBarrier()
    {
        return disruptor.getRingBuffer().newBarrier(sequences);
    }
}