/*
 * Copyright 2011 LMAX Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lmax.disruptor;

import com.lmax.disruptor.util.Util;

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

WorkerPool contains a pool of WorkProcessors that will consume sequences so jobs can be farmed out across a pool of workers. Each of the WorkProcessors manages and calls a WorkHandler to process the events.
Type parameters:
  • <T> – event to be processed by a pool of workers
/**
 * WorkerPool contains a pool of {@link WorkProcessor}s that will consume sequences so jobs can be
 * farmed out across a pool of workers. Each {@link WorkProcessor} manages and calls a
 * {@link WorkHandler} to process the events.
 *
 * @param <T> event to be processed by a pool of workers
 */
public final class WorkerPool<T> {
    /** Set to {@code true} by {@code start} and reset to {@code false} by the halt methods. */
    private final AtomicBoolean started = new AtomicBoolean(false);

    /** Sequence handed to every {@link WorkProcessor} created by this pool. */
    private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);

    /** Ring buffer the workers consume from. */
    private final RingBuffer<T> ringBuffer;

    // WorkProcessors are created to wrap each of the provided WorkHandlers.
    // NOTE: this comment must stay on its own line; when it shared a line with the
    // declaration below, the line comment swallowed the field.
    private final WorkProcessor<?>[] workProcessors;
Create a worker pool to enable an array of WorkHandlers to consume published sequences.

This option requires a pre-configured RingBuffer which must have RingBuffer.addGatingSequences(Sequence...) called before the work pool is started.

Params:
  • ringBuffer – of events to be consumed.
  • sequenceBarrier – on which the workers will depend.
  • exceptionHandler – to callback when an error occurs which is not handled by the WorkHandlers.
  • workHandlers – to distribute the work load across.
/**
 * Build a worker pool in which an array of {@link WorkHandler}s consumes published sequences.
 * <p>
 * The caller supplies an already configured {@link RingBuffer}; it must have had
 * {@link RingBuffer#addGatingSequences(Sequence...)} called before the work pool is started.
 *
 * @param ringBuffer       of events to be consumed.
 * @param sequenceBarrier  on which the workers will depend.
 * @param exceptionHandler to callback when an error occurs which is not handled by the {@link WorkHandler}s.
 * @param workHandlers     to distribute the work load across.
 */
@SafeVarargs
public WorkerPool(
    final RingBuffer<T> ringBuffer,
    final SequenceBarrier sequenceBarrier,
    final ExceptionHandler<? super T> exceptionHandler,
    final WorkHandler<? super T>... workHandlers) {
    this.ringBuffer = ringBuffer;
    this.workProcessors = new WorkProcessor[workHandlers.length];

    // One WorkProcessor per handler, all sharing the same work sequence.
    int slot = 0;
    for (final WorkHandler<? super T> handler : workHandlers) {
        workProcessors[slot++] =
            new WorkProcessor<>(ringBuffer, sequenceBarrier, handler, exceptionHandler, workSequence);
    }
}
Construct a work pool with an internal RingBuffer for convenience.

This option does not require RingBuffer.addGatingSequences(Sequence...) to be called before the work pool is started.

Params:
  • eventFactory – for filling the RingBuffer
  • exceptionHandler – to callback when an error occurs which is not handled by the WorkHandlers.
  • workHandlers – to distribute the work load across.
/**
 * Build a work pool that owns its own internal {@link RingBuffer}, for convenience.
 * <p>
 * The internal buffer is a multi-producer ring buffer of 1024 slots using a
 * {@link BlockingWaitStrategy}. The worker sequences are registered as gating sequences here, so
 * {@link RingBuffer#addGatingSequences(Sequence...)} does not need to be called before the work
 * pool is started.
 *
 * @param eventFactory     for filling the {@link RingBuffer}
 * @param exceptionHandler to callback when an error occurs which is not handled by the {@link WorkHandler}s.
 * @param workHandlers     to distribute the work load across.
 */
@SafeVarargs
public WorkerPool(
    final EventFactory<T> eventFactory,
    final ExceptionHandler<? super T> exceptionHandler,
    final WorkHandler<? super T>... workHandlers) {
    this.ringBuffer = RingBuffer.createMultiProducer(eventFactory, 1024, new BlockingWaitStrategy());
    final SequenceBarrier sharedBarrier = ringBuffer.newBarrier();

    this.workProcessors = new WorkProcessor[workHandlers.length];
    int slot = 0;
    for (final WorkHandler<? super T> handler : workHandlers) {
        workProcessors[slot++] =
            new WorkProcessor<>(ringBuffer, sharedBarrier, handler, exceptionHandler, workSequence);
    }

    // Gate the buffer on the workers' progress now that every processor exists.
    final Sequence[] gatingSequences = getWorkerSequences();
    ringBuffer.addGatingSequences(gatingSequences);
}
Get an array of Sequences representing the progress of the workers.
Returns: an array of Sequences representing the progress of the workers.
/**
 * Collect the {@link Sequence}s that represent the progress of the workers.
 * <p>
 * The returned array holds one sequence per {@link WorkProcessor}, in processor order, followed by
 * the pool's shared work sequence in the last slot. A new array is allocated on every call.
 *
 * @return an array of {@link Sequence}s representing the progress of the workers.
 */
public Sequence[] getWorkerSequences() {
    final int workerCount = workProcessors.length;
    final Sequence[] result = new Sequence[workerCount + 1];

    int index = 0;
    for (final WorkProcessor<?> processor : workProcessors) {
        result[index++] = processor.getSequence();
    }

    // The final slot is reserved for the shared work sequence.
    result[workerCount] = workSequence;
    return result;
}
Start the worker pool processing events in sequence.
Params:
  • executor – providing threads for running the workers.
Throws:
  • IllegalStateException – if the pool has already been started and not halted yet
Returns: the RingBuffer used for the work queue.
/**
 * Start the worker pool processing events in sequence.
 * <p>
 * The shared work sequence and every worker's own sequence are set to the ring buffer's current
 * cursor, and each worker is then handed to the executor.
 *
 * @param executor providing threads for running the workers.
 * @return the {@link RingBuffer} used for the work queue.
 * @throws IllegalStateException if the pool has already been started and not halted yet
 */
public RingBuffer<T> start(final Executor executor) {
    final boolean claimedStart = started.compareAndSet(false, true);
    if (!claimedStart) {
        throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");
    }

    final long startPosition = ringBuffer.getCursor();
    workSequence.set(startPosition);

    for (int i = 0; i < workProcessors.length; i++) {
        final WorkProcessor<?> worker = workProcessors[i];
        worker.getSequence().set(startPosition);
        executor.execute(worker);
    }

    return ringBuffer;
}
Wait for the RingBuffer to drain of published events then halt the workers.
/**
 * Wait for the {@link RingBuffer} to drain of published events then halt the workers.
 * <p>
 * The calling thread yields repeatedly until the minimum worker sequence is no longer behind
 * the ring buffer cursor; afterwards every worker is halted and the pool is marked as not started.
 */
public void drainAndHalt() {
    final Sequence[] progress = getWorkerSequences();

    // Read the cursor first, then the slowest sequence, and keep yielding while work remains.
    for (;;) {
        if (ringBuffer.getCursor() <= Util.getMinimumSequence(progress)) {
            break;
        }
        Thread.yield();
    }

    for (int i = 0; i < workProcessors.length; i++) {
        workProcessors[i].halt();
    }

    started.set(false);
}
Halt all workers immediately at the end of their current cycle.
/**
 * Halt all workers immediately at the end of their current cycle.
 * <p>
 * Once every worker has been asked to halt, the pool is marked as not started.
 */
public void halt() {
    for (int i = 0; i < workProcessors.length; i++) {
        workProcessors[i].halt();
    }

    started.set(false);
}

/**
 * Report whether the pool is currently flagged as started.
 *
 * @return the current value of the started flag.
 */
public boolean isRunning() {
    final boolean running = started.get();
    return running;
}
}