/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.concurrent;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.util.concurrent.FastThreadLocalThread;
import org.apache.cassandra.utils.JVMStabilityInspector;

final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnable
{
    private static final Logger logger = LoggerFactory.getLogger(SEPWorker.class);
    private static final boolean SET_THREAD_NAME = Boolean.parseBoolean(System.getProperty("cassandra.set_sep_thread_name", "true"));

    final Long workerId;
    final Thread thread;
    final SharedExecutorPool pool;

    // prevStopCheck stores the value of pool.stopCheck after we last incremented it; if it hasn't changed,
    // we know nobody else was spinning in the interval, so we increment our soleSpinnerSpinTime accordingly,
    // and otherwise we set it to zero; this is then used to terminate the final spinning thread, as the coordinated
    // strategy can only work when there are multiple threads spinning (as more sleep time must elapse than real time)
    long prevStopCheck = 0;
    long soleSpinnerSpinTime = 0;

    SEPWorker(Long workerId, Work initialState, SharedExecutorPool pool)
    {
        this.pool = pool;
        this.workerId = workerId;
        thread = new FastThreadLocalThread(this, pool.poolName + "-Worker-" + workerId);
        thread.setDaemon(true);
        set(initialState);
        thread.start();
    }

    public void run()
    {
        /**
         * we maintain two important invariants:
         * 1)   after exiting spinning phase, we ensure at least one more task on _each_ queue will be processed
         *      promptly after we begin, assuming any are outstanding on any pools. this is to permit producers to
         *      avoid signalling if there are _any_ spinning threads. we achieve this by simply calling maybeSchedule()
         *      on each queue if on decrementing the spin counter we hit zero.
         * 2)   before processing a task on a given queue, we attempt to assign another worker to the _same queue only_;
         *      this allows a producer to skip signalling work if the task queue is currently non-empty, and in conjunction
         *      with invariant (1) ensures that if any thread was spinning when a task was added to any executor, that
         *      task will be processed immediately if work permits are available
         */

        SEPExecutor assigned = null;
        Runnable task = null;
        try
        {
            while (true)
            {
                if (pool.shuttingDown)
                    return;

                if (isSpinning() && !selfAssign())
                {
                    doWaitSpin();
                    // if the pool is terminating, but we have been assigned STOP_SIGNALLED, if we do not re-check
                    // whether the pool is shutting down this thread will go to sleep and block forever
                    continue;
                }

                // if stop was signalled, go to sleep (don't try self-assign; being put to sleep is rare, so let's obey it
                // whenever we receive it - though we don't apply this constraint to producers, who may reschedule us before
                // we go to sleep)
                if (stop())
                    while (isStopped())
                        LockSupport.park();

                // we can be assigned any state from STOPPED, so loop if we don't actually have any tasks assigned
                assigned = get().assigned;
                if (assigned == null)
                    continue;
                if (SET_THREAD_NAME)
                    Thread.currentThread().setName(assigned.name + "-" + workerId);
                task = assigned.tasks.poll();

                // if we do have tasks assigned, nobody will change our state so we can simply set it to WORKING
                // (which is also a state that will never be interrupted externally)
                set(Work.WORKING);
                while (true)
                {
                    // before we process any task, we maybe schedule a new worker _to our executor only_; this
                    // ensures that even once all spinning threads have found work, if more work is left to be serviced
                    // and permits are available, it will be dealt with immediately.
                    assigned.maybeSchedule();

                    // we know there is work waiting, as we have a work permit, so poll() will always succeed
                    task.run();
                    task = null;

                    // if we're shutting down, or we fail to take a permit, we don't perform any more work
                    if (!assigned.takeTaskPermit())
                        break;
                    task = assigned.tasks.poll();
                }

                // return our work permit, and maybe signal shutdown
                assigned.returnWorkPermit();
                assigned = null;

                // try to immediately reassign ourselves some work; if we fail, start spinning
                if (!selfAssign())
                    startSpinning();
            }
        }
        catch (Throwable t)
        {
            JVMStabilityInspector.inspectThrowable(t);
            if (task != null)
                logger.error("Failed to execute task, unexpected exception killed worker: {}", t);
            else
                logger.error("Unexpected exception killed worker: {}", t);
        }
        finally
        {
            if (assigned != null)
                assigned.returnWorkPermit();

            do
            {
                if (get().assigned != null)
                {
                    get().assigned.returnWorkPermit();
                    set(Work.WORKING);
                }
            } while (!assign(Work.STOPPED, true));
        }
    }

    // try to assign this worker the provided work
    // valid states to assign are SPINNING, STOP_SIGNALLED, (ASSIGNED);
    // restores invariants of the various states (e.g. spinningCount, descheduled collection and thread park status)
    boolean assign(Work work, boolean self)
    {
        Work state = get();
        while (state.canAssign(self))
        {
            if (!compareAndSet(state, work))
            {
                state = get();
                continue;
            }
            // if we were spinning, exit the state (decrement the count); this is valid even if we are already spinning,
            // as the assigning thread will have incremented the spinningCount
            if (state.isSpinning())
                stopSpinning();

            // if we're being descheduled, place ourselves in the descheduled collection
            if (work.isStop())
            {
                pool.descheduled.put(workerId, this);
                if (pool.shuttingDown)
                    return true;
            }

            // if we're currently stopped, and the new state is not a stop signal
            // (which we can immediately convert to stopped), unpark the worker
            if (state.isStopped() && (!work.isStop() || !stop()))
                LockSupport.unpark(thread);
            return true;
        }
        return false;
    }

    // try to assign ourselves an executor with work available
    private boolean selfAssign()
    {
        // if we aren't permitted to assign in this state, fail
        if (!get().canAssign(true))
            return false;
        for (SEPExecutor exec : pool.executors)
        {
            if (exec.takeWorkPermit(true))
            {
                Work work = new Work(exec);
                // we successfully started work on this executor, so we must either assign it to ourselves or ...
                if (assign(work, true))
                    return true;
                // ... if we fail, schedule it to another worker
                pool.schedule(work);
                // and return success as we must have already been assigned a task
                assert get().assigned != null;
                return true;
            }
        }
        return false;
    }

    // we can only call this when our state is WORKING, and no other thread may change our state in this case;
    // so in this case only we do not need to CAS. We increment the spinningCount and add ourselves to the spinning
    // collection at the same time
    private void startSpinning()
    {
        assert get() == Work.WORKING;
        pool.spinningCount.incrementAndGet();
        set(Work.SPINNING);
    }

    // exit the spinning state; if there are no remaining spinners, we immediately try and schedule work for all executors
    // so that any producer is safe to not spin up a worker when they see a spinning thread (invariant (1) above)
    private void stopSpinning()
    {
        if (pool.spinningCount.decrementAndGet() == 0)
            for (SEPExecutor executor : pool.executors)
                executor.maybeSchedule();
        prevStopCheck = soleSpinnerSpinTime = 0;
    }

    // perform a sleep-spin, incrementing pool.stopCheck accordingly
    private void doWaitSpin()
    {
        // pick a random sleep interval based on the number of threads spinning, so that
        // we should always have a thread about to wake up, but most threads are sleeping
        long sleep = 10000L * pool.spinningCount.get();
        sleep = Math.min(1000000, sleep);
        sleep *= Math.random();
        sleep = Math.max(10000, sleep);

        long start = System.nanoTime();

        // place ourselves in the spinning collection; if we clash with another thread just exit
        Long target = start + sleep;
        if (pool.spinning.putIfAbsent(target, this) != null)
            return;
        LockSupport.parkNanos(sleep);

        // remove ourselves (if haven't been already) - we should be at or near the front, so should be cheap-ish
        pool.spinning.remove(target, this);

        // finish timing and grab spinningTime (before we finish timing so it is under rather than overestimated)
        long end = System.nanoTime();
        long spin = end - start;
        long stopCheck = pool.stopCheck.addAndGet(spin);
        maybeStop(stopCheck, end);
        if (prevStopCheck + spin == stopCheck)
            soleSpinnerSpinTime += spin;
        else
            soleSpinnerSpinTime = 0;
        prevStopCheck = stopCheck;
    }

    private static final long stopCheckInterval = TimeUnit.MILLISECONDS.toNanos(10L);

    // stops a worker if elapsed real time is less than elapsed spin time, as this implies the equivalent of
    // at least one worker achieved nothing in the interval. we achieve this by maintaining a stopCheck which
    // is initialised to a negative offset from realtime; as we spin we add to this value, and if we ever exceed
    // realtime we have spun too much and deschedule; if we get too far behind realtime, we reset to our initial offset
    private void maybeStop(long stopCheck, long now)
    {
        long delta = now - stopCheck;
        if (delta <= 0)
        {
            // if stopCheck has caught up with present, we've been spinning too much, so if we can atomically
            // set it to the past again, we should stop a worker
            if (pool.stopCheck.compareAndSet(stopCheck, now - stopCheckInterval))
            {
                // try and stop ourselves;
                // if we've already been assigned work stop another worker
                if (!assign(Work.STOP_SIGNALLED, true))
                    pool.schedule(Work.STOP_SIGNALLED);
            }
        }
        else if (soleSpinnerSpinTime > stopCheckInterval && pool.spinningCount.get() == 1)
        {
            // permit self-stopping
            assign(Work.STOP_SIGNALLED, true);
        }
        else
        {
            // if stop check has gotten too far behind present, update it so new spins can affect it
            while (delta > stopCheckInterval * 2 && !pool.stopCheck.compareAndSet(stopCheck, now - stopCheckInterval))
            {
                stopCheck = pool.stopCheck.get();
                delta = now - stopCheck;
            }
        }
    }

    private boolean isSpinning()
    {
        return get().isSpinning();
    }

    private boolean stop()
    {
        return get().isStop() && compareAndSet(Work.STOP_SIGNALLED, Work.STOPPED);
    }

    private boolean isStopped()
    {
        return get().isStopped();
    }

    
Represents, and communicates changes to, a worker's work state - there are three non-actively-working states (STOP_SIGNALLED, STOPPED, AND SPINNING) and two working states: WORKING, and (ASSIGNED), the last being represented by a non-static instance with its "assigned" executor set. STOPPED: indicates the worker is descheduled, and whilst accepts work in this state (causing it to be rescheduled) it will generally not be considered for work until all other worker threads are busy. In this state we should be present in the pool.descheduled collection, and should be parked -> (ASSIGNED)|SPINNING STOP_SIGNALLED: the worker has been asked to deschedule itself, but has not yet done so; only entered from a SPINNING state, and generally communicated to itself, but maybe set from any worker. this state may be preempted and replaced with (ASSIGNED) or SPINNING In this state we should be present in the pool.descheduled collection -> (ASSIGNED)|STOPPED|SPINNING SPINNING: indicates the worker has no work to perform, so is performing a friendly wait-based-spinning until it either is (ASSIGNED) some work (by itself or another thread), or sent STOP_SIGNALLED In this state we _may_ be in the pool.spinning collection (but only if we are in the middle of a sleep) -> (ASSIGNED)|STOP_SIGNALLED|SPINNING (ASSIGNED): asks the worker to perform some work against the specified executor, and preassigns a task permit from that executor so that in this state there is always work to perform. In general a worker assigns itself this state, but sometimes it may assign another worker the state either if there is work outstanding and no-spinning threads, or there is a race to self-assign -> WORKING WORKING: indicates the worker is actively processing an executor's task queue; in this state it accepts no state changes/communications, except from itself; it usually exits this mode into SPINNING, but if work is immediately available on another executor it self-triggers (ASSIGNED) -> SPINNING|(ASSIGNED)
/** * Represents, and communicates changes to, a worker's work state - there are three non-actively-working * states (STOP_SIGNALLED, STOPPED, AND SPINNING) and two working states: WORKING, and (ASSIGNED), the last * being represented by a non-static instance with its "assigned" executor set. * * STOPPED: indicates the worker is descheduled, and whilst accepts work in this state (causing it to * be rescheduled) it will generally not be considered for work until all other worker threads are busy. * In this state we should be present in the pool.descheduled collection, and should be parked * -> (ASSIGNED)|SPINNING * STOP_SIGNALLED: the worker has been asked to deschedule itself, but has not yet done so; only entered from a SPINNING * state, and generally communicated to itself, but maybe set from any worker. this state may be preempted * and replaced with (ASSIGNED) or SPINNING * In this state we should be present in the pool.descheduled collection * -> (ASSIGNED)|STOPPED|SPINNING * SPINNING: indicates the worker has no work to perform, so is performing a friendly wait-based-spinning * until it either is (ASSIGNED) some work (by itself or another thread), or sent STOP_SIGNALLED * In this state we _may_ be in the pool.spinning collection (but only if we are in the middle of a sleep) * -> (ASSIGNED)|STOP_SIGNALLED|SPINNING * (ASSIGNED): asks the worker to perform some work against the specified executor, and preassigns a task permit * from that executor so that in this state there is always work to perform. * In general a worker assigns itself this state, but sometimes it may assign another worker the state * either if there is work outstanding and no-spinning threads, or there is a race to self-assign * -> WORKING * WORKING: indicates the worker is actively processing an executor's task queue; in this state it accepts * no state changes/communications, except from itself; it usually exits this mode into SPINNING, * but if work is immediately available on another executor it self-triggers (ASSIGNED) * -> SPINNING|(ASSIGNED) */
static final class Work { static final Work STOP_SIGNALLED = new Work(); static final Work STOPPED = new Work(); static final Work SPINNING = new Work(); static final Work WORKING = new Work(); final SEPExecutor assigned; Work(SEPExecutor executor) { this.assigned = executor; } private Work() { this.assigned = null; } boolean canAssign(boolean self) { // we can assign work if there isn't new work already assigned and either // 1) we are assigning to ourselves // 2) the worker we are assigning to is not already in the middle of WORKING return assigned == null && (self || !isWorking()); } boolean isSpinning() { return this == Work.SPINNING; } boolean isWorking() { return this == Work.WORKING; } boolean isStop() { return this == Work.STOP_SIGNALLED; } boolean isStopped() { return this == Work.STOPPED; } boolean isAssigned() { return assigned != null; } } }