//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
An Executor using preallocated/reserved Threads from a wrapped Executor.
Calls to execute(Runnable)
on a ReservedThreadExecutor
will either succeed with a Thread immediately being assigned the Runnable task, or fail if no Thread is available.
Threads are reserved lazily, with a new reserved thread being allocated from a wrapped Executor
when an execution fails. If the setIdleTimeout(long, TimeUnit)
is set to non zero (default 1 minute), then the reserved thread pool will shrink by 1 thread whenever it has been idle for that period.
/**
* An Executor using preallocated/reserved Threads from a wrapped Executor.
* <p>Calls to {@link #execute(Runnable)} on a {@link ReservedThreadExecutor} will either succeed
* with a Thread immediately being assigned the Runnable task, or fail if no Thread is
* available.
* <p>Threads are reserved lazily, with a new reserved thread being allocated from a
* wrapped {@link Executor} when an execution fails. If the {@link #setIdleTimeout(long, TimeUnit)}
* is set to non zero (default 1 minute), then the reserved thread pool will shrink by 1 thread
* whenever it has been idle for that period.
*/
@ManagedObject("A pool for reserved threads")
public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExecutor
{
private static final Logger LOG = Log.getLogger(ReservedThreadExecutor.class);
private static final Runnable STOP = new Runnable()
{
@Override
public void run()
{
}
@Override
public String toString()
{
return "STOP!";
}
};
private final Executor _executor;
private final int _capacity;
private final ConcurrentLinkedDeque<ReservedThread> _stack;
private final AtomicInteger _size = new AtomicInteger();
private final AtomicInteger _pending = new AtomicInteger();
private ThreadPoolBudget.Lease _lease;
private long _idleTime = 1L;
private TimeUnit _idleTimeUnit = TimeUnit.MINUTES;
Params: - executor – The executor to use to obtain threads
- capacity – The number of threads to preallocate. If less than 0 then capacity
is calculated based on a heuristic from the number of available processors and
thread pool size.
/**
* @param executor The executor to use to obtain threads
* @param capacity The number of threads to preallocate. If less than 0 then capacity
* is calculated based on a heuristic from the number of available processors and
* thread pool size.
*/
public ReservedThreadExecutor(Executor executor, int capacity)
{
_executor = executor;
_capacity = reservedThreads(executor, capacity);
_stack = new ConcurrentLinkedDeque<>();
LOG.debug("{}", this);
}
Params: - executor – The executor to use to obtain threads
- capacity – The number of threads to preallocate, If less than 0 then capacity
is calculated based on a heuristic from the number of available processors and
thread pool size.
Returns: the number of reserved threads that would be used by a ReservedThreadExecutor
constructed with these arguments.
/**
* @param executor The executor to use to obtain threads
* @param capacity The number of threads to preallocate, If less than 0 then capacity
* is calculated based on a heuristic from the number of available processors and
* thread pool size.
* @return the number of reserved threads that would be used by a ReservedThreadExecutor
* constructed with these arguments.
*/
private static int reservedThreads(Executor executor, int capacity)
{
if (capacity >= 0)
return capacity;
int cpus = ProcessorUtils.availableProcessors();
if (executor instanceof ThreadPool.SizedThreadPool)
{
int threads = ((ThreadPool.SizedThreadPool)executor).getMaxThreads();
return Math.max(1, Math.min(cpus, threads / 10));
}
return cpus;
}
public Executor getExecutor()
{
return _executor;
}
@ManagedAttribute(value = "max number of reserved threads", readonly = true)
public int getCapacity()
{
return _capacity;
}
@ManagedAttribute(value = "available reserved threads", readonly = true)
public int getAvailable()
{
return _stack.size();
}
@ManagedAttribute(value = "pending reserved threads", readonly = true)
public int getPending()
{
return _pending.get();
}
@ManagedAttribute(value = "idletimeout in MS", readonly = true)
public long getIdleTimeoutMs()
{
if (_idleTimeUnit == null)
return 0;
return _idleTimeUnit.toMillis(_idleTime);
}
Set the idle timeout for shrinking the reserved thread pool
Params: - idleTime – Time to wait before shrinking, or 0 for no timeout.
- idleTimeUnit – Time units for idle timeout
/**
* Set the idle timeout for shrinking the reserved thread pool
*
* @param idleTime Time to wait before shrinking, or 0 for no timeout.
* @param idleTimeUnit Time units for idle timeout
*/
public void setIdleTimeout(long idleTime, TimeUnit idleTimeUnit)
{
if (isRunning())
throw new IllegalStateException();
_idleTime = idleTime;
_idleTimeUnit = idleTimeUnit;
}
@Override
public void doStart() throws Exception
{
_lease = ThreadPoolBudget.leaseFrom(getExecutor(), this, _capacity);
super.doStart();
}
@Override
public void doStop() throws Exception
{
if (_lease != null)
_lease.close();
while (true)
{
ReservedThread thread = _stack.pollFirst();
if (thread == null)
break;
_size.decrementAndGet();
thread.stop();
}
super.doStop();
}
@Override
public void execute(Runnable task) throws RejectedExecutionException
{
_executor.execute(task);
}
Params: - task – The task to run
Returns: True iff a reserved thread was available and has been assigned the task to run.
/**
* @param task The task to run
* @return True iff a reserved thread was available and has been assigned the task to run.
*/
@Override
public boolean tryExecute(Runnable task)
{
if (LOG.isDebugEnabled())
LOG.debug("{} tryExecute {}", this, task);
if (task == null)
return false;
ReservedThread thread = _stack.pollFirst();
if (thread == null && task != STOP)
{
startReservedThread();
return false;
}
int size = _size.decrementAndGet();
thread.offer(task);
if (size == 0 && task != STOP)
startReservedThread();
return true;
}
private void startReservedThread()
{
try
{
while (true)
{
// Not atomic, but there is a re-check in ReservedThread.run().
int pending = _pending.get();
int size = _size.get();
if (pending + size >= _capacity)
return;
if (_pending.compareAndSet(pending, pending + 1))
{
if (LOG.isDebugEnabled())
LOG.debug("{} startReservedThread p={}", this, pending + 1);
_executor.execute(new ReservedThread());
return;
}
}
}
catch (RejectedExecutionException e)
{
LOG.ignore(e);
}
}
@Override
public String toString()
{
return String.format("%s@%x{s=%d/%d,p=%d}",
getClass().getSimpleName(),
hashCode(),
_size.get(),
_capacity,
_pending.get());
}
private class ReservedThread implements Runnable
{
private final Locker _locker = new Locker();
private final Condition _wakeup = _locker.newCondition();
private boolean _starting = true;
private Runnable _task = null;
public void offer(Runnable task)
{
if (LOG.isDebugEnabled())
LOG.debug("{} offer {}", this, task);
try (Locker.Lock lock = _locker.lock())
{
_task = task;
_wakeup.signal();
}
}
public void stop()
{
offer(STOP);
}
private Runnable reservedWait()
{
if (LOG.isDebugEnabled())
LOG.debug("{} waiting", this);
Runnable task = null;
while (task == null)
{
boolean idle = false;
try (Locker.Lock lock = _locker.lock())
{
if (_task == null)
{
try
{
if (_idleTime == 0)
_wakeup.await();
else
idle = !_wakeup.await(_idleTime, _idleTimeUnit);
}
catch (InterruptedException e)
{
LOG.ignore(e);
}
}
task = _task;
_task = null;
}
if (idle)
{
// Because threads are held in a stack, excess threads will be
// idle. However, we cannot remove threads from the bottom of
// the stack, so we submit a poison pill job to stop the thread
// on top of the stack (which unfortunately will be the most
// recently used)
if (LOG.isDebugEnabled())
LOG.debug("{} IDLE", this);
tryExecute(STOP);
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} task={}", this, task);
return task;
}
@Override
public void run()
{
while (isRunning())
{
// test and increment size BEFORE decrementing pending,
// so that we don't have a race starting new pending.
while (true)
{
int size = _size.get();
if (size >= _capacity)
{
if (LOG.isDebugEnabled())
LOG.debug("{} size {} > capacity", this, size, _capacity);
if (_starting)
_pending.decrementAndGet();
return;
}
if (_size.compareAndSet(size, size + 1))
break;
}
if (_starting)
{
if (LOG.isDebugEnabled())
LOG.debug("{} started", this);
_pending.decrementAndGet();
_starting = false;
}
// Insert ourselves in the stack. Size is already incremented, but
// that only effects the decision to keep other threads reserved.
_stack.offerFirst(this);
// Wait for a task
Runnable task = reservedWait();
if (task == STOP)
// return on STOP poison pill
break;
// Run the task
try
{
task.run();
}
catch (Throwable e)
{
LOG.warn(e);
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} Exited", this);
}
@Override
public String toString()
{
return String.format("%s@%x", ReservedThreadExecutor.this, hashCode());
}
}
}