package org.eclipse.jetty.util.thread;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ManagedObject("A thread pool")
public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactory, SizedThreadPool, Dumpable, TryExecutor
{
private static final Logger LOG = LoggerFactory.getLogger(QueuedThreadPool.class);
private static Runnable NOOP = () ->
{
};
private final AtomicBiInteger _counts = new AtomicBiInteger(Integer.MIN_VALUE, 0);
private final AtomicLong _lastShrink = new AtomicLong();
private final Set<Thread> _threads = ConcurrentHashMap.newKeySet();
private final AutoLock.WithCondition _joinLock = new AutoLock.WithCondition();
private final BlockingQueue<Runnable> _jobs;
private final ThreadGroup _threadGroup;
private final ThreadFactory _threadFactory;
private String _name = "qtp" + hashCode();
private int _idleTimeout;
private int _maxThreads;
private int _minThreads;
private int _reservedThreads = -1;
private TryExecutor _tryExecutor = TryExecutor.NO_TRY;
private int _priority = Thread.NORM_PRIORITY;
private boolean _daemon = false;
private boolean _detailedDump = false;
private int _lowThreadsThreshold = 1;
private ThreadPoolBudget _budget;
private long _stopTimeout;
public QueuedThreadPool()
{
this(200);
}
public QueuedThreadPool(@Name("maxThreads") int maxThreads)
{
this(maxThreads, Math.min(8, maxThreads));
}
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads)
{
this(maxThreads, minThreads, 60000);
}
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("queue") BlockingQueue<Runnable> queue)
{
this(maxThreads, minThreads, 60000, -1, queue, null);
}
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout)
{
this(maxThreads, minThreads, idleTimeout, null);
}
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue)
{
this(maxThreads, minThreads, idleTimeout, queue, null);
}
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup)
{
this(maxThreads, minThreads, idleTimeout, -1, queue, threadGroup);
}
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads,
@Name("idleTimeout") int idleTimeout, @Name("reservedThreads") int reservedThreads,
@Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup)
{
this(maxThreads, minThreads, idleTimeout, reservedThreads, queue, threadGroup, null);
}
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads,
@Name("idleTimeout") int idleTimeout, @Name("reservedThreads") int reservedThreads,
@Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup,
@Name("threadFactory") ThreadFactory threadFactory)
{
if (maxThreads < minThreads)
throw new IllegalArgumentException("max threads (" + maxThreads + ") less than min threads (" + minThreads + ")");
setMinThreads(minThreads);
setMaxThreads(maxThreads);
setIdleTimeout(idleTimeout);
setStopTimeout(5000);
setReservedThreads(reservedThreads);
if (queue == null)
{
int capacity = Math.max(_minThreads, 8) * 1024;
queue = new BlockingArrayQueue<>(capacity, capacity);
}
_jobs = queue;
_threadGroup = threadGroup;
setThreadPoolBudget(new ThreadPoolBudget(this));
_threadFactory = threadFactory == null ? this : threadFactory;
}
@Override
public ThreadPoolBudget getThreadPoolBudget()
{
return _budget;
}
public void setThreadPoolBudget(ThreadPoolBudget budget)
{
if (budget != null && budget.getSizedThreadPool() != this)
throw new IllegalArgumentException();
_budget = budget;
}
public void setStopTimeout(long stopTimeout)
{
_stopTimeout = stopTimeout;
}
public long getStopTimeout()
{
return _stopTimeout;
}
@Override
protected void doStart() throws Exception
{
if (_reservedThreads == 0)
{
_tryExecutor = NO_TRY;
}
else
{
ReservedThreadExecutor reserved = new ReservedThreadExecutor(this, _reservedThreads);
reserved.setIdleTimeout(_idleTimeout, TimeUnit.MILLISECONDS);
_tryExecutor = reserved;
}
addBean(_tryExecutor);
_lastShrink.set(System.nanoTime());
super.doStart();
_counts.set(0, 0);
ensureThreads();
}
@Override
protected void doStop() throws Exception
{
if (LOG.isDebugEnabled())
LOG.debug("Stopping {}", this);
super.doStop();
removeBean(_tryExecutor);
_tryExecutor = TryExecutor.NO_TRY;
int threads = _counts.getAndSetHi(Integer.MIN_VALUE);
long timeout = getStopTimeout();
BlockingQueue<Runnable> jobs = getQueue();
if (timeout > 0)
{
for (int i = 0; i < threads; ++i)
{
jobs.offer(NOOP);
}
joinThreads(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2);
for (Thread thread : _threads)
{
if (LOG.isDebugEnabled())
LOG.debug("Interrupting {}", thread);
thread.interrupt();
}
joinThreads(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2);
Thread.yield();
if (LOG.isDebugEnabled())
{
for (Thread unstopped : _threads)
{
StringBuilder dmp = new StringBuilder();
for (StackTraceElement element : unstopped.getStackTrace())
{
dmp.append(System.lineSeparator()).append("\tat ").append(element);
}
LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
}
}
else
{
for (Thread unstopped : _threads)
{
LOG.warn("{} Couldn't stop {}", this, unstopped);
}
}
}
while (!_jobs.isEmpty())
{
Runnable job = _jobs.poll();
if (job instanceof Closeable)
{
try
{
((Closeable)job).close();
}
catch (Throwable t)
{
LOG.warn("Unable to close job: {}", job, t);
}
}
else if (job != NOOP)
LOG.warn("Stopped without executing or closing {}", job);
}
if (_budget != null)
_budget.reset();
try (AutoLock.WithCondition l = _joinLock.lock())
{
l.signalAll();
}
}
private void joinThreads(long stopByNanos) throws InterruptedException
{
for (Thread thread : _threads)
{
long canWait = TimeUnit.NANOSECONDS.toMillis(stopByNanos - System.nanoTime());
if (LOG.isDebugEnabled())
LOG.debug("Waiting for {} for {}", thread, canWait);
if (canWait > 0)
thread.join(canWait);
}
}
public void setDaemon(boolean daemon)
{
_daemon = daemon;
}
public void setIdleTimeout(int idleTimeout)
{
_idleTimeout = idleTimeout;
ReservedThreadExecutor reserved = getBean(ReservedThreadExecutor.class);
if (reserved != null)
reserved.setIdleTimeout(idleTimeout, TimeUnit.MILLISECONDS);
}
@Override
public void setMaxThreads(int maxThreads)
{
if (_budget != null)
_budget.check(maxThreads);
_maxThreads = maxThreads;
if (_minThreads > _maxThreads)
_minThreads = _maxThreads;
}
@Override
public void setMinThreads(int minThreads)
{
_minThreads = minThreads;
if (_minThreads > _maxThreads)
_maxThreads = _minThreads;
if (isStarted())
ensureThreads();
}
public void setReservedThreads(int reservedThreads)
{
if (isRunning())
throw new IllegalStateException(getState());
_reservedThreads = reservedThreads;
}
public void setName(String name)
{
if (isRunning())
throw new IllegalStateException("started");
_name = name;
}
public void setThreadsPriority(int priority)
{
_priority = priority;
}
@ManagedAttribute("maximum time a thread may be idle in ms")
public int getIdleTimeout()
{
return _idleTimeout;
}
@Override
@ManagedAttribute("maximum number of threads in the pool")
public int getMaxThreads()
{
return _maxThreads;
}
@Override
@ManagedAttribute("minimum number of threads in the pool")
public int getMinThreads()
{
return _minThreads;
}
@ManagedAttribute("the number of reserved threads in the pool")
public int getReservedThreads()
{
if (isStarted())
{
ReservedThreadExecutor reservedThreadExecutor = getBean(ReservedThreadExecutor.class);
if (reservedThreadExecutor != null)
return reservedThreadExecutor.getCapacity();
}
return _reservedThreads;
}
@ManagedAttribute("name of the thread pool")
public String getName()
{
return _name;
}
@ManagedAttribute("priority of threads in the pool")
public int getThreadsPriority()
{
return _priority;
}
@ManagedAttribute("size of the job queue")
public int getQueueSize()
{
int idle = _counts.getLo();
return Math.max(0, -idle);
}
@ManagedAttribute("thread pool uses daemon threads")
public boolean isDaemon()
{
return _daemon;
}
@ManagedAttribute("reports additional details in the dump")
public boolean isDetailedDump()
{
return _detailedDump;
}
public void setDetailedDump(boolean detailedDump)
{
_detailedDump = detailedDump;
}
@ManagedAttribute("threshold at which the pool is low on threads")
public int getLowThreadsThreshold()
{
return _lowThreadsThreshold;
}
public void setLowThreadsThreshold(int lowThreadsThreshold)
{
_lowThreadsThreshold = lowThreadsThreshold;
}
@Override
public void execute(Runnable job)
{
int startThread;
while (true)
{
long counts = _counts.get();
int threads = AtomicBiInteger.getHi(counts);
if (threads == Integer.MIN_VALUE)
throw new RejectedExecutionException(job.toString());
int idle = AtomicBiInteger.getLo(counts);
startThread = (idle <= 0 && threads < _maxThreads) ? 1 : 0;
if (!_counts.compareAndSet(counts, threads + startThread, idle + startThread - 1))
continue;
break;
}
if (!_jobs.offer(job))
{
if (addCounts(-startThread, 1 - startThread))
LOG.warn("{} rejected {}", this, job);
throw new RejectedExecutionException(job.toString());
}
if (LOG.isDebugEnabled())
LOG.debug("queue {} startThread={}", job, startThread);
while (startThread-- > 0)
startThread();
}
@Override
public boolean tryExecute(Runnable task)
{
TryExecutor tryExecutor = _tryExecutor;
return tryExecutor != null && tryExecutor.tryExecute(task);
}
@Override
public void join() throws InterruptedException
{
try (AutoLock.WithCondition l = _joinLock.lock())
{
while (isRunning())
{
l.await();
}
}
while (isStopping())
{
Thread.sleep(1);
}
}
@Override
@ManagedAttribute("number of threads in the pool")
public int getThreads()
{
int threads = _counts.getHi();
return Math.max(0, threads);
}
@Override
@ManagedAttribute("number of idle threads in the pool")
public int getIdleThreads()
{
int idle = _counts.getLo();
return Math.max(0, idle);
}
@ManagedAttribute("number of busy threads in the pool")
public int getBusyThreads()
{
int reserved = _tryExecutor instanceof ReservedThreadExecutor ? ((ReservedThreadExecutor)_tryExecutor).getAvailable() : 0;
return getThreads() - getIdleThreads() - reserved;
}
@Override
@ManagedAttribute(value = "thread pool is low on threads", readonly = true)
public boolean isLowOnThreads()
{
return getMaxThreads() - getThreads() + getIdleThreads() - getQueueSize() <= getLowThreadsThreshold();
}
private void ensureThreads()
{
while (true)
{
long counts = _counts.get();
int threads = AtomicBiInteger.getHi(counts);
if (threads == Integer.MIN_VALUE)
break;
int idle = AtomicBiInteger.getLo(counts);
if (threads < _minThreads || (idle < 0 && threads < _maxThreads))
{
if (_counts.compareAndSet(counts, threads + 1, idle + 1))
startThread();
continue;
}
break;
}
}
protected void startThread()
{
boolean started = false;
try
{
Thread thread = _threadFactory.newThread(_runnable);
if (LOG.isDebugEnabled())
LOG.debug("Starting {}", thread);
_threads.add(thread);
_lastShrink.set(System.nanoTime());
thread.start();
started = true;
}
finally
{
if (!started)
addCounts(-1, -1);
}
}
private boolean addCounts(int deltaThreads, int deltaIdle)
{
while (true)
{
long encoded = _counts.get();
int threads = AtomicBiInteger.getHi(encoded);
int idle = AtomicBiInteger.getLo(encoded);
if (threads == Integer.MIN_VALUE)
{
long update = AtomicBiInteger.encode(threads, idle + deltaIdle);
if (_counts.compareAndSet(encoded, update))
return false;
}
else
{
long update = AtomicBiInteger.encode(threads + deltaThreads, idle + deltaIdle);
if (_counts.compareAndSet(encoded, update))
return true;
}
}
}
@Override
public Thread newThread(Runnable runnable)
{
Thread thread = new Thread(_threadGroup, runnable);
thread.setDaemon(isDaemon());
thread.setPriority(getThreadsPriority());
thread.setName(_name + "-" + thread.getId());
return thread;
}
protected void removeThread(Thread thread)
{
_threads.remove(thread);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
List<Object> threads = new ArrayList<>(getMaxThreads());
for (Thread thread : _threads)
{
StackTraceElement[] trace = thread.getStackTrace();
String stackTag = getCompressedStackTag(trace);
String baseThreadInfo = String.format("%s %s tid=%d prio=%d", thread.getName(), thread.getState(), thread.getId(), thread.getPriority());
if (!StringUtil.isBlank(stackTag))
threads.add(baseThreadInfo + " " + stackTag);
else if (isDetailedDump())
threads.add((Dumpable)(o, i) -> Dumpable.dumpObjects(o, i, baseThreadInfo, (Object[])trace));
else
threads.add(baseThreadInfo + " @ " + (trace.length > 0 ? trace[0].toString() : "???"));
}
DumpableCollection threadsDump = new DumpableCollection("threads", threads);
if (isDetailedDump())
dumpObjects(out, indent, threadsDump, new DumpableCollection("jobs", new ArrayList<>(getQueue())));
else
dumpObjects(out, indent, threadsDump);
}
private String getCompressedStackTag(StackTraceElement[] trace)
{
for (StackTraceElement t : trace)
{
if ("idleJobPoll".equals(t.getMethodName()) && t.getClassName().equals(Runner.class.getName()))
return "IDLE";
if ("reservedWait".equals(t.getMethodName()) && t.getClassName().endsWith("ReservedThread"))
return "RESERVED";
if ("select".equals(t.getMethodName()) && t.getClassName().endsWith("SelectorProducer"))
return "SELECTING";
if ("accept".equals(t.getMethodName()) && t.getClassName().contains("ServerConnector"))
return "ACCEPTING";
}
return "";
}
@Override
public String toString()
{
long count = _counts.get();
int threads = Math.max(0, AtomicBiInteger.getHi(count));
int idle = Math.max(0, AtomicBiInteger.getLo(count));
int queue = getQueueSize();
return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,r=%d,q=%d}[%s]",
getClass().getSimpleName(),
_name,
hashCode(),
getState(),
getMinThreads(),
threads,
getMaxThreads(),
idle,
getReservedThreads(),
queue,
_tryExecutor);
}
private final Runnable _runnable = new Runner();
protected void runJob(Runnable job)
{
job.run();
}
protected BlockingQueue<Runnable> getQueue()
{
return _jobs;
}
@ManagedOperation("interrupts a pool thread")
public boolean interruptThread(@Name("id") long id)
{
for (Thread thread : _threads)
{
if (thread.getId() == id)
{
thread.interrupt();
return true;
}
}
return false;
}
@ManagedOperation("dumps a pool thread stack")
public String dumpThread(@Name("id") long id)
{
for (Thread thread : _threads)
{
if (thread.getId() == id)
{
StringBuilder buf = new StringBuilder();
buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ");
buf.append(thread.getState()).append(":").append(System.lineSeparator());
for (StackTraceElement element : thread.getStackTrace())
{
buf.append(" at ").append(element.toString()).append(System.lineSeparator());
}
return buf.toString();
}
}
return null;
}
private class Runner implements Runnable
{
private Runnable idleJobPoll(long idleTimeout) throws InterruptedException
{
if (idleTimeout <= 0)
return _jobs.take();
return _jobs.poll(idleTimeout, TimeUnit.MILLISECONDS);
}
@Override
public void run()
{
if (LOG.isDebugEnabled())
LOG.debug("Runner started for {}", QueuedThreadPool.this);
boolean idle = true;
try
{
Runnable job = null;
while (true)
{
if (job != null)
{
idle = true;
if (!addCounts(0, 1))
break;
}
else if (_counts.getHi() == Integer.MIN_VALUE)
{
break;
}
try
{
job = _jobs.poll();
if (job == null)
{
long idleTimeout = getIdleTimeout();
if (idleTimeout > 0 && getThreads() > _minThreads)
{
long last = _lastShrink.get();
long now = System.nanoTime();
if ((now - last) > TimeUnit.MILLISECONDS.toNanos(idleTimeout) && _lastShrink.compareAndSet(last, now))
{
if (LOG.isDebugEnabled())
LOG.debug("shrinking {}", QueuedThreadPool.this);
break;
}
}
job = idleJobPoll(idleTimeout);
if (job == null)
continue;
}
idle = false;
if (LOG.isDebugEnabled())
LOG.debug("run {} in {}", job, QueuedThreadPool.this);
runJob(job);
if (LOG.isDebugEnabled())
LOG.debug("ran {} in {}", job, QueuedThreadPool.this);
}
catch (InterruptedException e)
{
if (LOG.isDebugEnabled())
LOG.debug("interrupted {} in {}", job, QueuedThreadPool.this);
LOG.trace("IGNORED", e);
}
catch (Throwable e)
{
LOG.warn("Job failed", e);
}
finally
{
Thread.interrupted();
}
}
}
finally
{
Thread thread = Thread.currentThread();
removeThread(thread);
addCounts(-1, idle ? -1 : 0);
if (LOG.isDebugEnabled())
LOG.debug("{} exited for {}", thread, QueuedThreadPool.this);
ensureThreads();
}
}
}
}