/*
 * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package reactor.util.concurrent;

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongSupplier;


/**
 * Strategy employed to wait for specific {@link LongSupplier} values with various spinning strategies.
 * <p>
 * Instances are obtained through the static factory methods of this class. Every strategy
 * repeatedly reads a cursor and invokes a caller-supplied spin observer between reads; the
 * strategies differ only in what they do between two reads (spin, yield, park, sleep or block
 * on a condition).
 */
public abstract class WaitStrategy {

    /**
     * A no-op {@link Runnable} that can be used as a placeholder spin observer with
     * {@link #waitFor(long, LongSupplier, Runnable)}.
     */
    public static final Runnable NOOP_SPIN_OBSERVER = () -> { };

    /**
     * Blocking strategy that uses a lock and condition variable for consumers waiting on a barrier.
     * <p>
     * This strategy can be used when throughput and low-latency are not as important as CPU resource.
     * A waiter is only woken up by {@link #signalAllWhenBlocking()}.
     *
     * @return a new blocking wait strategy
     */
    public static WaitStrategy blocking() {
        return new Blocking();
    }

    /**
     * Busy Spin strategy that uses a busy spin loop for consumers waiting on a barrier.
     * <p>
     * This strategy will use CPU resource to avoid syscalls which can introduce latency jitter.
     * It is best used when threads can be bound to specific CPU cores.
     *
     * @return the shared busy-spin wait strategy
     */
    public static WaitStrategy busySpin() {
        return BusySpin.INSTANCE;
    }

    /**
     * Test if the given exception is the alert signal thrown by {@link #alert()}.
     *
     * @param t exception checked
     * @return true if this is the alert signal singleton
     */
    public static boolean isAlert(Throwable t) {
        return t == AlertException.INSTANCE;
    }

    /**
     * Variation of {@link #blocking()} that attempts to elide conditional wake-ups when no
     * waiter has announced itself. Shows performance improvements on microbenchmarks. However,
     * this wait strategy should be considered experimental, as the correctness of the lock
     * elision code has not been fully proven.
     *
     * @return a new lite-blocking wait strategy
     */
    public static WaitStrategy liteBlocking() {
        return new LiteBlocking();
    }

    /**
     * Parking strategy that initially spins, then uses a {@link Thread#yield()}, and eventually
     * sleeps (<code>LockSupport.parkNanos(1)</code>) for the minimum number of nanos the OS and
     * JVM will allow while the consumers are waiting on a barrier.
     * <p>
     * This strategy is a good compromise between performance and CPU resource. Latency spikes
     * can occur after quiet periods.
     *
     * @return the shared parking wait strategy, using the default retry count
     */
    public static WaitStrategy parking() {
        return Parking.INSTANCE;
    }

    /**
     * Parking strategy that initially spins, then uses a {@link Thread#yield()}, and eventually
     * sleeps (<code>LockSupport.parkNanos(1)</code>) for the minimum number of nanos the OS and
     * JVM will allow while the consumers are waiting on a barrier.
     * <p>
     * This strategy is a good compromise between performance and CPU resource. Latency spikes
     * can occur after quiet periods.
     *
     * @param retries the spin cycle count before parking; cycles above 100 are pure spins,
     * the remaining ones yield
     * @return a new parking wait strategy
     */
    public static WaitStrategy parking(int retries) {
        return new Parking(retries);
    }

    /**
     * Phased wait strategy for waiting consumers on a barrier.
     * <p>
     * This strategy can be used when throughput and low-latency are not as important as CPU
     * resource. Spins, then yields, then waits using the configured fallback WaitStrategy.
     *
     * @param spinTimeout the spin timeout
     * @param yieldTimeout the yield timeout, counted from the end of the spin timeout
     * @param units the time unit of both timeouts
     * @param delegate the target wait strategy to fallback on, must not be null
     * @return a new phased-off wait strategy
     * @throws NullPointerException if {@code units} or {@code delegate} is null
     */
    public static WaitStrategy phasedOff(long spinTimeout, long yieldTimeout, TimeUnit units, WaitStrategy delegate) {
        return new PhasedOff(spinTimeout, yieldTimeout, units, delegate);
    }

    /**
     * Phased wait strategy (see {@link #phasedOff(long, long, TimeUnit, WaitStrategy)}) that
     * falls back on a new {@link #liteBlocking()} strategy.
     *
     * @param spinTimeout the spin timeout
     * @param yieldTimeout the yield timeout
     * @param units the time unit
     * @return a new phased-off wait strategy
     */
    public static WaitStrategy phasedOffLiteLock(long spinTimeout, long yieldTimeout, TimeUnit units) {
        return phasedOff(spinTimeout, yieldTimeout, units, liteBlocking());
    }

    /**
     * Phased wait strategy (see {@link #phasedOff(long, long, TimeUnit, WaitStrategy)}) that
     * falls back on a new {@link #blocking()} strategy.
     *
     * @param spinTimeout the spin timeout
     * @param yieldTimeout the yield timeout
     * @param units the time unit
     * @return a new phased-off wait strategy
     */
    public static WaitStrategy phasedOffLock(long spinTimeout, long yieldTimeout, TimeUnit units) {
        return phasedOff(spinTimeout, yieldTimeout, units, blocking());
    }

    /**
     * Phased wait strategy (see {@link #phasedOff(long, long, TimeUnit, WaitStrategy)}) that
     * falls back on parking in a loop, i.e. {@link #parking(int) parking(0)}.
     *
     * @param spinTimeout the spin timeout
     * @param yieldTimeout the yield timeout
     * @param units the time unit
     * @return a new phased-off wait strategy
     */
    public static WaitStrategy phasedOffSleep(long spinTimeout, long yieldTimeout, TimeUnit units) {
        return phasedOff(spinTimeout, yieldTimeout, units, parking(0));
    }

    /**
     * Sleeping strategy that calls <code>Thread.sleep(1)</code> between two reads of the cursor
     * for consumers waiting on a barrier.
     * <p>
     * This strategy will incur up to a latency of 1ms and save a maximum of CPU resources.
     *
     * @return the shared sleeping wait strategy
     */
    public static WaitStrategy sleeping() {
        return Sleeping.INSTANCE;
    }

    /**
     * Yielding strategy that uses a {@link Thread#yield()} for consumers waiting on a barrier
     * after initially spinning.
     * <p>
     * This strategy is a good compromise between performance and CPU resource without incurring
     * significant latency spikes.
     *
     * @return the shared yielding wait strategy
     */
    public static WaitStrategy yielding() {
        return Yielding.INSTANCE;
    }

    /**
     * Implementations should signal the waiting consumers that the cursor has advanced.
     * The default implementation does nothing.
     */
    public void signalAllWhenBlocking() {
    }

    /**
     * Wait for the given sequence to be available. It is possible for this method to return a
     * value less than the sequence number supplied depending on the implementation of the
     * WaitStrategy. A common use for this is to signal a timeout. Any EventProcessor that is
     * using a WaitStrategy to get notifications about message becoming available should
     * remember to handle this case.
     *
     * @param sequence to be waited on.
     * @param cursor the main sequence from ringbuffer. Wait/notify strategies will need this as
     * is notified upon update.
     * @param spinObserver spin observer, run between two reads of the cursor; anything it throws
     * (for instance the signal raised by {@link #alert()}) propagates to the caller
     * @return the sequence that is available which may be greater than the requested sequence.
     * @throws InterruptedException if the thread is interrupted.
     */
    public abstract long waitFor(long sequence, LongSupplier cursor, Runnable spinObserver)
            throws InterruptedException;

    /**
     * Throw an Alert signal exception (singleton) that can be checked against
     * {@link #isAlert(Throwable)}.
     */
    public static void alert() {
        throw AlertException.INSTANCE;
    }

    /**
     * Used to alert consumers waiting with a {@link WaitStrategy} for status changes.
     * <p>
     * It does not fill in a stack trace for performance reasons.
     */
    @SuppressWarnings("serial")
    static final class AlertException extends RuntimeException {

        /** Pre-allocated exception to avoid garbage generation. */
        public static final AlertException INSTANCE = new AlertException();

        /**
         * Private constructor so that only a single instance exists.
         */
        private AlertException() {
        }

        /**
         * Overridden so the stack trace is not filled in for this exception for performance reasons.
         *
         * @return this instance.
         */
        @Override
        public Throwable fillInStackTrace() {
            return this;
        }
    }

    /**
     * Lock/condition based strategy: waiters block on a {@link Condition} until
     * {@link #signalAllWhenBlocking()} is invoked.
     */
    static final class Blocking extends WaitStrategy {

        private final Lock lock = new ReentrantLock();
        private final Condition processorNotifyCondition = lock.newCondition();

        @Override
        public void signalAllWhenBlocking() {
            lock.lock();
            try {
                processorNotifyCondition.signalAll();
            }
            finally {
                lock.unlock();
            }
        }

        @Override
        public long waitFor(long sequence, LongSupplier cursorSequence, Runnable barrier)
                throws InterruptedException {
            // Only take the lock when the cursor is actually behind.
            if (cursorSequence.getAsLong() < sequence) {
                lock.lock();
                try {
                    // The cursor is re-checked while holding the lock, so a signal sent
                    // between the check and await() cannot be missed.
                    while (cursorSequence.getAsLong() < sequence) {
                        barrier.run();
                        processorNotifyCondition.await();
                    }
                }
                finally {
                    lock.unlock();
                }
            }

            // Final read outside the lock provides the value returned to the caller.
            long availableSequence;
            while ((availableSequence = cursorSequence.getAsLong()) < sequence) {
                barrier.run();
            }
            return availableSequence;
        }
    }

    /**
     * Pure spin loop: reads the cursor and runs the spin observer until the sequence is reached.
     */
    static final class BusySpin extends WaitStrategy {

        static final BusySpin INSTANCE = new BusySpin();

        @Override
        public long waitFor(final long sequence, LongSupplier cursor, final Runnable barrier)
                throws InterruptedException {
            long availableSequence;
            while ((availableSequence = cursor.getAsLong()) < sequence) {
                barrier.run();
            }
            return availableSequence;
        }
    }

    /**
     * Runs the spin observer then sleeps 1ms between two reads of the cursor.
     */
    static final class Sleeping extends WaitStrategy {

        static final Sleeping INSTANCE = new Sleeping();

        @Override
        public long waitFor(final long sequence, LongSupplier cursor, final Runnable barrier)
                throws InterruptedException {
            long availableSequence;
            while ((availableSequence = cursor.getAsLong()) < sequence) {
                barrier.run();
                Thread.sleep(1);
            }
            return availableSequence;
        }
    }

    /**
     * Variation of {@link Blocking} in which the signalling side only takes the lock when a
     * waiter has raised the {@code signalNeeded} flag.
     */
    static final class LiteBlocking extends WaitStrategy {

        private final Lock lock = new ReentrantLock();
        private final Condition processorNotifyCondition = lock.newCondition();
        private final AtomicBoolean signalNeeded = new AtomicBoolean(false);

        @Override
        public void signalAllWhenBlocking() {
            // Skip the lock entirely unless a waiter asked to be signalled.
            if (signalNeeded.getAndSet(false)) {
                lock.lock();
                try {
                    processorNotifyCondition.signalAll();
                }
                finally {
                    lock.unlock();
                }
            }
        }

        @Override
        public long waitFor(long sequence, LongSupplier cursorSequence, Runnable barrier)
                throws InterruptedException {
            if (cursorSequence.getAsLong() < sequence) {
                lock.lock();
                try {
                    do {
                        // Raise the flag BEFORE re-reading the cursor: the order of these
                        // two operations is what the wake-up elision relies on.
                        signalNeeded.getAndSet(true);

                        if (cursorSequence.getAsLong() >= sequence) {
                            break;
                        }

                        barrier.run();
                        processorNotifyCondition.await();
                    }
                    while (cursorSequence.getAsLong() < sequence);
                }
                finally {
                    lock.unlock();
                }
            }

            // Final read outside the lock provides the value returned to the caller.
            long availableSequence;
            while ((availableSequence = cursorSequence.getAsLong()) < sequence) {
                barrier.run();
            }
            return availableSequence;
        }
    }

    /**
     * Spins, then yields, then hands over to a fallback strategy once the configured
     * timeouts have elapsed.
     */
    static final class PhasedOff extends WaitStrategy {

        /** Number of cursor reads between two looks at the clock. */
        private static final int SPIN_TRIES = 10000;

        private final long spinTimeoutNanos;
        private final long yieldTimeoutNanos;
        private final WaitStrategy fallbackStrategy;

        PhasedOff(long spinTimeout, long yieldTimeout, TimeUnit units, WaitStrategy fallbackStrategy) {
            this.spinTimeoutNanos = units.toNanos(spinTimeout);
            // The yield phase ends yieldTimeout after the end of the spin phase.
            this.yieldTimeoutNanos = spinTimeoutNanos + units.toNanos(yieldTimeout);
            // Fail fast rather than on the first waitFor/signalAllWhenBlocking call.
            this.fallbackStrategy = Objects.requireNonNull(fallbackStrategy, "delegate");
        }

        @Override
        public void signalAllWhenBlocking() {
            fallbackStrategy.signalAllWhenBlocking();
        }

        @Override
        public long waitFor(long sequence, LongSupplier cursor, Runnable barrier)
                throws InterruptedException {
            long availableSequence;
            long startTime = 0;
            // Explicit flag instead of a "startTime == 0" sentinel, since
            // System.nanoTime() is allowed to return 0.
            boolean timerStarted = false;
            int counter = SPIN_TRIES;

            do {
                if ((availableSequence = cursor.getAsLong()) >= sequence) {
                    return availableSequence;
                }

                // The clock is only consulted once every SPIN_TRIES reads.
                if (0 == --counter) {
                    if (!timerStarted) {
                        startTime = System.nanoTime();
                        timerStarted = true;
                    }
                    else {
                        long timeDelta = System.nanoTime() - startTime;
                        if (timeDelta > yieldTimeoutNanos) {
                            return fallbackStrategy.waitFor(sequence, cursor, barrier);
                        }
                        else if (timeDelta > spinTimeoutNanos) {
                            Thread.yield();
                        }
                    }
                    counter = SPIN_TRIES;
                }
                barrier.run();
            }
            while (true);
        }
    }

    /**
     * Spins, then yields, then parks for one nanosecond between two reads of the cursor.
     */
    static final class Parking extends WaitStrategy {

        private static final int DEFAULT_RETRIES = 200;

        static final Parking INSTANCE = new Parking();

        private final int retries;

        Parking() {
            this(DEFAULT_RETRIES);
        }

        Parking(int retries) {
            this.retries = retries;
        }

        @Override
        public long waitFor(final long sequence, LongSupplier cursor, final Runnable barrier)
                throws InterruptedException {
            long availableSequence;
            int counter = retries;

            while ((availableSequence = cursor.getAsLong()) < sequence) {
                counter = applyWaitMethod(barrier, counter);
            }
            return availableSequence;
        }

        /**
         * Runs the spin observer, then spins (counter above 100), yields (counter in 1..100)
         * or parks for one nanosecond (counter exhausted).
         *
         * @param barrier the spin observer
         * @param counter the remaining retries
         * @return the updated retry counter
         */
        private int applyWaitMethod(final Runnable barrier, int counter) {
            barrier.run();

            if (counter > 100) {
                --counter;
            }
            else if (counter > 0) {
                --counter;
                Thread.yield();
            }
            else {
                LockSupport.parkNanos(1L);
            }
            return counter;
        }
    }

    /**
     * Spins for a fixed number of cursor reads, then yields between every subsequent read.
     */
    static final class Yielding extends WaitStrategy {

        private static final int SPIN_TRIES = 100;

        static final Yielding INSTANCE = new Yielding();

        @Override
        public long waitFor(final long sequence, LongSupplier cursor, final Runnable barrier)
                throws InterruptedException {
            long availableSequence;
            int counter = SPIN_TRIES;

            while ((availableSequence = cursor.getAsLong()) < sequence) {
                counter = applyWaitMethod(barrier, counter);
            }
            return availableSequence;
        }

        /**
         * Runs the spin observer, then either consumes one spin try or, once they are all
         * consumed, yields.
         *
         * @param barrier the spin observer
         * @param counter the remaining spin tries
         * @return the updated spin counter
         */
        private int applyWaitMethod(final Runnable barrier, int counter) {
            barrier.run();

            if (0 == counter) {
                Thread.yield();
            }
            else {
                --counter;
            }
            return counter;
        }
    }
}