/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.concurrent;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.cassandra.metrics.SEPMetrics;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import org.apache.cassandra.utils.concurrent.WaitQueue;

import static org.apache.cassandra.concurrent.SEPWorker.Work;

/**
 * An executor whose work is handed to a {@link SharedExecutorPool}. It bounds its own concurrency with
 * {@code maxWorkers} work permits and applies back-pressure to producers once more than
 * {@code maxTasksQueued} tasks are pending.
 *
 * All permit accounting is performed with compare-and-set loops on a single {@link AtomicLong}
 * (see {@link #permits}); only {@link #shutdown()} and {@link #shutdownNow()} are synchronized.
 */
public class SEPExecutor extends AbstractLocalAwareExecutorService
{
    private final SharedExecutorPool pool;

    /** The number of work permits this executor starts with, i.e. the upper bound on concurrently active workers. */
    public final int maxWorkers;
    public final String name;
    /** Queue length beyond which producers calling {@link #addTask} block until the queue drains below it. */
    private final int maxTasksQueued;
    private final SEPMetrics metrics;

    // stores both a set of work permits and task permits:
    //  bottom 32 bits are number of queued tasks (initially 0); producers always add their permit before
    //                 checking the limit, so this may transiently exceed maxTasksQueued
    //  top 32 bits are number of work permits available in the range [0..maxWorkers]   (initially maxWorkers)
    private final AtomicLong permits = new AtomicLong();

    // producers wait on this when there is no room on the queue
    private final WaitQueue hasRoom = new WaitQueue();
    // incremented by onCompletion()
    private final AtomicLong completedTasks = new AtomicLong();

    // set (once) by shutdown(); read without locking by returnWorkPermit(), hence volatile
    volatile boolean shuttingDown = false;
    // signalled once the executor is shutting down and no workers remain active
    final SimpleCondition shutdown = new SimpleCondition();

    // TODO: see if other queue implementations might improve throughput
    protected final ConcurrentLinkedQueue<FutureTask<?>> tasks = new ConcurrentLinkedQueue<>();

    /**
     * @param pool           the shared pool that work for this executor is scheduled on
     * @param maxWorkers     initial (and maximum) number of work permits
     * @param maxTasksQueued queue length above which producers block
     * @param jmxPath        passed through to {@link SEPMetrics}
     * @param name           name of this executor; also passed through to {@link SEPMetrics}
     */
    SEPExecutor(SharedExecutorPool pool, int maxWorkers, int maxTasksQueued, String jmxPath, String name)
    {
        this.pool = pool;
        this.name = name;
        this.maxWorkers = maxWorkers;
        this.maxTasksQueued = maxTasksQueued;
        // no tasks queued, every work permit available
        this.permits.set(combine(0, maxWorkers));
        this.metrics = new SEPMetrics(this, jmxPath, name);
    }

    /** Records that one task has finished; backs {@link #getCompletedTasks()}. */
    protected void onCompletion()
    {
        completedTasks.incrementAndGet();
    }

    // schedules another worker for this pool if there is work outstanding and there are no spinning threads that
    // will self-assign to it in the immediate future
    //
    // returns true iff a work permit and a task permit were both taken and a Work item was handed to the pool
    boolean maybeSchedule()
    {
        if (pool.spinningCount.get() > 0 || !takeWorkPermit(true))
            return false;

        pool.schedule(new Work(this));
        return true;
    }

    /**
     * Enqueues {@code task} and publishes a task permit for it. If the queue was already at or beyond
     * {@code maxTasksQueued} before this task was added, the calling thread blocks (uninterruptibly)
     * until a consumer brings the queue back below the limit.
     *
     * @param task the task to enqueue
     */
    protected void addTask(FutureTask<?> task)
    {
        // we add to the queue first, so that when a worker takes a task permit it can be certain there is a task available
        // this permits us to schedule threads non-spuriously; it also means work is serviced fairly
        tasks.add(task);
        // number of task permits observed immediately BEFORE ours was added
        int taskPermits;
        while (true)
        {
            long current = permits.get();
            taskPermits = taskPermits(current);
            // because there is no difference in practical terms between the task permit being added or not (the work is already in existence)
            // we always add our permit, but block after the fact if we breached the queue limit
            if (permits.compareAndSet(current, updateTaskPermits(current, taskPermits + 1)))
                break;
        }

        if (taskPermits == 0)
        {
            // we only need to schedule a thread if there are no tasks already waiting to be processed, as
            // the original enqueue will have started a thread to service its work which will have itself
            // spawned helper workers that would have either exhausted the available tasks or are still being spawned.
            // to avoid incurring any unnecessary signalling penalties we also do not take any work to hand to the new
            // worker, we simply start a worker in a spinning state
            pool.maybeStartSpinningWorker();
        }
        else if (taskPermits >= maxTasksQueued)
        {
            // register to receive a signal once a task is processed bringing the queue below its threshold
            // (registration must precede the re-check below so that a concurrent signal cannot be missed)
            WaitQueue.Signal s = hasRoom.register();

            // we will only be signalled once the queue drops below full, so this creates equivalent external behaviour
            // however the advantage is that we never wake-up spuriously;
            // we choose to always sleep, even if in the intervening time the queue has dropped below limit,
            // so long as we _will_ eventually receive a signal
            if (taskPermits(permits.get()) > maxTasksQueued)
            {
                // if we're blocking, we might as well directly schedule a worker if we aren't already at max
                if (takeWorkPermit(true))
                    pool.schedule(new Work(this));

                metrics.totalBlocked.inc();
                metrics.currentBlocked.inc();
                s.awaitUninterruptibly();
                metrics.currentBlocked.dec();
            }
            else // don't propagate our signal when we cancel, just cancel
                s.cancel();
        }
    }

    // takes permission to perform a task, if any are available; once taken it is guaranteed
    // that a subsequent call to tasks.poll() will return some work
    //
    // returns false if there were no task permits to take
    boolean takeTaskPermit()
    {
        while (true)
        {
            long current = permits.get();
            int taskPermits = taskPermits(current);
            if (taskPermits == 0)
                return false;
            if (permits.compareAndSet(current, updateTaskPermits(current, taskPermits - 1)))
            {
                // we just took the queue from exactly full to below full: wake any blocked producers
                if (taskPermits == maxTasksQueued && hasRoom.hasWaiters())
                    hasRoom.signalAll();
                return true;
            }
        }
    }

    // takes a worker permit and (optionally) a task permit simultaneously; if one of the two is unavailable, returns false
    //
    // NOTE: the availability check requires at least one task permit even when takeTaskPermit is false,
    // in which case the task permit count is left untouched and only the work permit is consumed
    boolean takeWorkPermit(boolean takeTaskPermit)
    {
        int taskDelta = takeTaskPermit ? 1 : 0;
        while (true)
        {
            long current = permits.get();
            int workPermits = workPermits(current);
            int taskPermits = taskPermits(current);
            if (workPermits == 0 || taskPermits == 0)
                return false;
            if (permits.compareAndSet(current, combine(taskPermits - taskDelta, workPermits - 1)))
            {
                // if we consumed a task permit and took the queue from exactly full to below full, wake blocked producers
                if (takeTaskPermit && taskPermits == maxTasksQueued && hasRoom.hasWaiters())
                    hasRoom.signalAll();
                return true;
            }
        }
    }

    // gives up a work permit; if we are shutting down and this was the last outstanding
    // work permit, signals the shutdown condition
    void returnWorkPermit()
    {
        while (true)
        {
            long current = permits.get();
            int workPermits = workPermits(current);
            if (permits.compareAndSet(current, updateWorkPermits(current, workPermits + 1)))
            {
                if (shuttingDown && workPermits + 1 == maxWorkers)
                    shutdown.signalAll();
                break;
            }
        }
    }

    /**
     * Runs {@code command} on the calling thread if a work permit can be taken without taking a task
     * permit; otherwise enqueues it via {@link #addTask}.
     *
     * As {@link #takeWorkPermit(boolean)} is currently written, the inline path is only taken when a work
     * permit is free AND at least one task is already queued; with an empty queue the command is enqueued.
     *
     * @param command the work to execute
     */
    public void maybeExecuteImmediately(Runnable command)
    {
        FutureTask<?> ft = newTaskFor(command, null);
        if (!takeWorkPermit(false))
        {
            addTask(ft);
        }
        else
        {
            try
            {
                ft.run();
            }
            finally
            {
                returnWorkPermit();
                // we have to maintain our invariant of always scheduling after any work is performed
                // in this case in particular we are not processing the rest of the queue anyway, and so
                // the work permit may go wasted if we don't immediately attempt to spawn another worker
                maybeSchedule();
            }
        }
    }

    /**
     * Marks this executor as shutting down, removes it from the pool's executor collection, signals
     * termination immediately if there are no active workers and no pending tasks, and releases its metrics.
     *
     * Calling this method again after the executor has begun shutting down has no further effect, so the
     * pool de-registration and metrics release happen exactly once.
     */
    public synchronized void shutdown()
    {
        // already shut down (possibly via shutdownNow()): do not de-register or release metrics a second time.
        // any later transition to zero active workers is signalled by returnWorkPermit()
        if (shuttingDown)
            return;

        shuttingDown = true;
        pool.executors.remove(this);
        if (getActiveCount() == 0 && getPendingTasks() == 0)
            shutdown.signalAll();

        // release metrics
        metrics.release();
    }

    /**
     * Shuts down as per {@link #shutdown()}, then removes every task that is still queued.
     *
     * @return the tasks that were queued but never started
     */
    public synchronized List<Runnable> shutdownNow()
    {
        shutdown();
        List<Runnable> aborted = new ArrayList<>();
        // each task permit we manage to take corresponds to exactly one queued task
        while (takeTaskPermit())
            aborted.add(tasks.poll());
        // the queue is now drained; if nothing is running either, we have terminated
        if (getActiveCount() == 0)
            shutdown.signalAll();
        return aborted;
    }

    /** @return true once {@link #shutdown()} or {@link #shutdownNow()} has been invoked */
    public boolean isShutdown()
    {
        return shuttingDown;
    }

    /** @return true if shutdown was requested and the shutdown condition has been signalled */
    public boolean isTerminated()
    {
        return shuttingDown && shutdown.isSignaled();
    }

    /**
     * Waits on the shutdown condition for at most the given time.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the value of {@link #isTerminated()} after waiting
     * @throws InterruptedException propagated from the underlying wait on the shutdown condition
     */
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
    {
        shutdown.await(timeout, unit);
        return isTerminated();
    }

    /** @return the current number of task permits, i.e. tasks queued but not yet claimed by a worker */
    public long getPendingTasks()
    {
        return taskPermits(permits.get());
    }

    /** @return the number of times {@link #onCompletion()} has been invoked */
    public long getCompletedTasks()
    {
        return completedTasks.get();
    }

    /** @return the number of work permits currently taken ({@code maxWorkers} minus those still available) */
    public int getActiveCount()
    {
        return maxWorkers - workPermits(permits.get());
    }

    // extracts the task permit count: the low 32 bits of the packed value
    private static int taskPermits(long both)
    {
        return (int) both;
    }

    // extracts the work permit count: the high 32 bits of the packed value
    private static int workPermits(long both)
    {
        return (int) (both >>> 32);
    }

    // returns prev with its low 32 bits (task permits) replaced; taskPermits must be non-negative,
    // as a negative int would be sign-extended into the work permit half
    private static long updateTaskPermits(long prev, int taskPermits)
    {
        return (prev & (-1L << 32)) | taskPermits;
    }

    // returns prev with its high 32 bits (work permits) replaced, preserving the task permits
    private static long updateWorkPermits(long prev, int workPermits)
    {
        return (((long) workPermits) << 32) | (prev & (-1L >>> 32));
    }

    // packs the two counters into one long; taskPermits must be non-negative (see updateTaskPermits)
    private static long combine(int taskPermits, int workPermits)
    {
        return (((long) workPermits) << 32) | taskPermits;
    }
}