package org.eclipse.jetty.util.thread;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
@ManagedObject("A thread pool")
public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool.SizedThreadPool, TryExecutor
{
private final ThreadPoolExecutor _executor;
private final ThreadPoolBudget _budget;
private final ThreadGroup _group;
private String _name = "etp" + hashCode();
private int _minThreads;
private int _reservedThreads = -1;
private TryExecutor _tryExecutor = TryExecutor.NO_TRY;
private int _priority = Thread.NORM_PRIORITY;
private boolean _daemon;
private boolean _detailedDump;
public ExecutorThreadPool()
{
this(200, 8);
}
public ExecutorThreadPool(int maxThreads)
{
this(maxThreads, Math.min(8, maxThreads));
}
public ExecutorThreadPool(int maxThreads, int minThreads)
{
this(maxThreads, minThreads, new LinkedBlockingQueue<>());
}
public ExecutorThreadPool(int maxThreads, int minThreads, BlockingQueue<Runnable> queue)
{
this(new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS, queue), minThreads, -1, null);
}
public ExecutorThreadPool(ThreadPoolExecutor executor)
{
this(executor, -1);
}
public ExecutorThreadPool(ThreadPoolExecutor executor, int reservedThreads)
{
this(executor, reservedThreads, null);
}
public ExecutorThreadPool(ThreadPoolExecutor executor, int reservedThreads, ThreadGroup group)
{
this(executor, Math.min(ProcessorUtils.availableProcessors(), executor.getCorePoolSize()), reservedThreads, group);
}
private ExecutorThreadPool(ThreadPoolExecutor executor, int minThreads, int reservedThreads, ThreadGroup group)
{
int maxThreads = executor.getMaximumPoolSize();
if (maxThreads < minThreads)
{
executor.shutdownNow();
throw new IllegalArgumentException("max threads (" + maxThreads + ") cannot be less than min threads (" + minThreads + ")");
}
_executor = executor;
_executor.setThreadFactory(this::newThread);
_group = group;
_minThreads = minThreads;
_reservedThreads = reservedThreads;
_budget = new ThreadPoolBudget(this);
}
@ManagedAttribute("name of this thread pool")
public String getName()
{
return _name;
}
public void setName(String name)
{
if (isRunning())
throw new IllegalStateException(getState());
_name = name;
}
@Override
@ManagedAttribute("minimum number of threads in the pool")
public int getMinThreads()
{
return _minThreads;
}
@Override
public void setMinThreads(int threads)
{
_minThreads = threads;
}
@Override
@ManagedAttribute("maximum number of threads in the pool")
public int getMaxThreads()
{
return _executor.getMaximumPoolSize();
}
@Override
public void setMaxThreads(int threads)
{
if (_budget != null)
_budget.check(threads);
_executor.setCorePoolSize(threads);
_executor.setMaximumPoolSize(threads);
}
@ManagedAttribute("maximum time a thread may be idle in ms")
public int getIdleTimeout()
{
return (int)_executor.getKeepAliveTime(TimeUnit.MILLISECONDS);
}
public void setIdleTimeout(int idleTimeout)
{
_executor.setKeepAliveTime(idleTimeout, TimeUnit.MILLISECONDS);
}
@ManagedAttribute("the number of reserved threads in the pool")
public int getReservedThreads()
{
if (isStarted())
return getBean(ReservedThreadExecutor.class).getCapacity();
return _reservedThreads;
}
public void setReservedThreads(int reservedThreads)
{
if (isRunning())
throw new IllegalStateException(getState());
_reservedThreads = reservedThreads;
}
public void setThreadsPriority(int priority)
{
_priority = priority;
}
public int getThreadsPriority()
{
return _priority;
}
@ManagedAttribute("whether this thread pool uses daemon threads")
public boolean isDaemon()
{
return _daemon;
}
public void setDaemon(boolean daemon)
{
_daemon = daemon;
}
@ManagedAttribute("reports additional details in the dump")
public boolean isDetailedDump()
{
return _detailedDump;
}
public void setDetailedDump(boolean detailedDump)
{
_detailedDump = detailedDump;
}
@Override
@ManagedAttribute("number of threads in the pool")
public int getThreads()
{
return _executor.getPoolSize();
}
@Override
@ManagedAttribute("number of idle threads in the pool")
public int getIdleThreads()
{
return _executor.getPoolSize() - _executor.getActiveCount();
}
@Override
public void execute(Runnable command)
{
_executor.execute(command);
}
@Override
public boolean tryExecute(Runnable task)
{
TryExecutor tryExecutor = _tryExecutor;
return tryExecutor != null && tryExecutor.tryExecute(task);
}
@Override
@ManagedAttribute(value = "thread pool is low on threads", readonly = true)
public boolean isLowOnThreads()
{
return getThreads() == getMaxThreads() && _executor.getQueue().size() >= getIdleThreads();
}
@Override
protected void doStart() throws Exception
{
if (_executor.isShutdown())
throw new IllegalStateException("This thread pool is not restartable");
for (int i = 0; i < _minThreads; ++i)
{
_executor.prestartCoreThread();
}
_tryExecutor = new ReservedThreadExecutor(this, _reservedThreads);
addBean(_tryExecutor);
super.doStart();
}
@Override
protected void doStop() throws Exception
{
super.doStop();
removeBean(_tryExecutor);
_tryExecutor = TryExecutor.NO_TRY;
_executor.shutdownNow();
_budget.reset();
}
@Override
public void join() throws InterruptedException
{
_executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
@Override
public ThreadPoolBudget getThreadPoolBudget()
{
return _budget;
}
protected Thread newThread(Runnable job)
{
Thread thread = new Thread(_group, job);
thread.setDaemon(isDaemon());
thread.setPriority(getThreadsPriority());
thread.setName(getName() + "-" + thread.getId());
return thread;
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
String prefix = getName() + "-";
List<Dumpable> threads = Thread.getAllStackTraces().entrySet().stream()
.filter(entry -> entry.getKey().getName().startsWith(prefix))
.map(entry ->
{
Thread thread = entry.getKey();
StackTraceElement[] frames = entry.getValue();
String knownMethod = null;
for (StackTraceElement frame : frames)
{
if ("getTask".equals(frame.getMethodName()) && frame.getClassName().endsWith("ThreadPoolExecutor"))
{
knownMethod = "IDLE ";
break;
}
else if ("reservedWait".equals(frame.getMethodName()) && frame.getClassName().endsWith("ReservedThread"))
{
knownMethod = "RESERVED ";
break;
}
else if ("select".equals(frame.getMethodName()) && frame.getClassName().endsWith("SelectorProducer"))
{
knownMethod = "SELECTING ";
break;
}
else if ("accept".equals(frame.getMethodName()) && frame.getClassName().contains("ServerConnector"))
{
knownMethod = "ACCEPTING ";
break;
}
}
String known = knownMethod == null ? "" : knownMethod;
return new Dumpable()
{
@Override
public void dump(Appendable out, String indent) throws IOException
{
StringBuilder b = new StringBuilder();
b.append(String.valueOf(thread.getId()))
.append(" ")
.append(thread.getName())
.append(" p=").append(String.valueOf(thread.getPriority()))
.append(" ")
.append(known)
.append(thread.getState().toString());
if (isDetailedDump())
{
if (known.isEmpty())
Dumpable.dumpObjects(out, indent, b.toString(), (Object[])frames);
else
Dumpable.dumpObject(out, b.toString());
}
else
{
b.append(" @ ").append(frames.length > 0 ? String.valueOf(frames[0]) : "<no_stack_frames>");
Dumpable.dumpObject(out, b.toString());
}
}
@Override
public String dump()
{
return null;
}
};
})
.collect(Collectors.toList());
List<Runnable> jobs = Collections.emptyList();
if (isDetailedDump())
jobs = new ArrayList<>(_executor.getQueue());
dumpObjects(out, indent, threads, new DumpableCollection("jobs", jobs));
}
@Override
public String toString()
{
return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,q=%d,%s}",
getClass().getSimpleName(),
getName(),
hashCode(),
getState(),
getMinThreads(),
getThreads(),
getMaxThreads(),
getIdleThreads(),
_executor.getQueue().size(),
_tryExecutor);
}
}