package io.undertow.server.handlers;
import io.undertow.UndertowLogger;
import io.undertow.server.HandlerWrapper;
import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import io.undertow.server.handlers.builder.HandlerBuilder;
import io.undertow.util.WorkerUtils;
import org.xnio.XnioExecutor;
import org.xnio.XnioIoThread;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class StuckThreadDetectionHandler implements HttpHandler {
/** Threshold, in seconds, used when the caller does not supply one. */
public static final int DEFAULT_THRESHOLD = 600;

/** Number of threads reported as stuck whose completion has not yet been reported. */
private final AtomicInteger stuckCount = new AtomicInteger();

/** Time in seconds a request may run before its thread is reported as stuck. */
private final int threshold;

/** Threads currently inside {@code handleRequest}, keyed by thread id. */
private final ConcurrentHashMap<Long, MonitoredThread> activeThreads = new ConcurrentHashMap<>();

/** Threads that finished after being marked stuck; drained by the monitoring task. */
private final Queue<CompletedStuckThread> completedStuckThreadsQueue = new ConcurrentLinkedQueue<>();

/** Handler the request is delegated to. */
private final HttpHandler next;
/**
 * Monitoring task that re-schedules itself once per second while requests are active.
 *
 * <p>Each run:
 * <ol>
 *   <li>marks every active thread whose active time has reached the threshold as stuck
 *       (only if it is still running), bumps {@code stuckCount} and logs it;</li>
 *   <li>drains the queue of completed stuck threads, decrementing {@code stuckCount}
 *       and logging each completion;</li>
 *   <li>re-schedules itself on the current thread, or clears {@code timerKey} when no
 *       request is active.</li>
 * </ol>
 *
 * <p>The current thread is cast to {@link XnioIoThread}, so the task must run on one.
 */
private final Runnable stuckThreadTask = new Runnable() {
    @Override
    public void run() {
        try {
            final long thresholdInMillis = threshold * 1000L;

            // Phase 1: flag threads that have been active for too long.
            for (MonitoredThread monitoredThread : activeThreads.values()) {
                final long activeTime = monitoredThread.getActiveTimeInMillis();
                if (activeTime >= thresholdInMillis && monitoredThread.markAsStuckIfStillRunning()) {
                    final int numStuckThreads = stuckCount.incrementAndGet();
                    notifyStuckThreadDetected(monitoredThread, activeTime, numStuckThreads);
                }
            }

            // Phase 2: report stuck threads that have finished since the last run.
            CompletedStuckThread completedStuckThread;
            while ((completedStuckThread = completedStuckThreadsQueue.poll()) != null) {
                final int numStuckThreads = stuckCount.decrementAndGet();
                notifyStuckThreadCompleted(completedStuckThread, numStuckThreads);
            }
        } finally {
            // Phase 3 runs in finally: if a notification above throws, the timer chain must
            // not be left broken with a stale non-null timerKey, otherwise handleRequest
            // would never start a new timer and detection would silently stop.
            synchronized (StuckThreadDetectionHandler.this) {
                boolean idle = activeThreads.isEmpty();
                if (idle) {
                    // Publish "no timer" first, then look again. handleRequest registers its
                    // thread before reading timerKey without the lock, so a request arriving
                    // between the first check and this write may have seen the old key and
                    // skipped scheduling; the second check catches that request.
                    timerKey = null;
                    idle = activeThreads.isEmpty();
                }
                if (!idle) {
                    timerKey = WorkerUtils.executeAfter((XnioIoThread) Thread.currentThread(),
                            stuckThreadTask, 1, TimeUnit.SECONDS);
                }
            }
        }
    }
};

/**
 * Key of the currently scheduled run of {@code stuckThreadTask}, or {@code null} when no
 * run is scheduled. Written while holding this handler's monitor.
 */
private volatile XnioExecutor.Key timerKey;
/**
 * Creates a handler that uses {@link #DEFAULT_THRESHOLD} as its threshold.
 *
 * @param next the handler to delegate requests to
 */
public StuckThreadDetectionHandler(HttpHandler next) {
    this(StuckThreadDetectionHandler.DEFAULT_THRESHOLD, next);
}
/**
 * Creates a handler with an explicit threshold.
 *
 * @param threshold time in seconds after which a request's thread is reported as stuck
 * @param next      the handler to delegate requests to
 */
public StuckThreadDetectionHandler(int threshold, HttpHandler next) {
    this.next = next;
    this.threshold = threshold;
}
/**
 * Returns the threshold this handler was constructed with.
 *
 * @return the stuck-thread threshold in seconds
 */
public int getThreshold() {
    return this.threshold;
}
/**
 * Logs that a thread has been detected as stuck, attaching the thread's current stack trace.
 *
 * @param monitoredThread the thread that was just marked as stuck
 * @param activeTime      how long, in milliseconds, the thread had been active when it was checked
 * @param numStuckThreads the stuck-thread count after including this thread
 */
private void notifyStuckThreadDetected(MonitoredThread monitoredThread,
        long activeTime, int numStuckThreads) {
    final Thread stuck = monitoredThread.getThread();

    // The Throwable is only a carrier: its own trace is replaced by the stuck thread's trace.
    final Throwable traceCarrier = new Throwable();
    traceCarrier.setStackTrace(stuck.getStackTrace());

    UndertowLogger.REQUEST_LOGGER.stuckThreadDetected(
            stuck.getName(),
            stuck.getId(),
            activeTime,
            monitoredThread.getStartTime(),
            monitoredThread.getRequestUri(),
            threshold,
            numStuckThreads,
            traceCarrier);
}
/**
 * Logs that a thread previously reported as stuck has finished its request.
 *
 * @param thread          snapshot of the completed thread
 * @param numStuckThreads the stuck-thread count after removing this thread
 */
private void notifyStuckThreadCompleted(CompletedStuckThread thread, int numStuckThreads) {
    UndertowLogger.REQUEST_LOGGER.stuckThreadCompleted(
            thread.getName(),
            thread.getId(),
            thread.getTotalActiveTime(),
            numStuckThreads);
}
/**
 * Registers the current thread as active, makes sure the monitoring task is scheduled,
 * and delegates to the next handler. When the next handler returns or throws, the thread
 * is unregistered; if it had been marked as stuck in the meantime, a completion record is
 * queued for the monitoring task.
 *
 * @param exchange the exchange being handled
 * @throws Exception whatever the next handler throws
 */
@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
    final Thread current = Thread.currentThread();
    final Long key = current.getId();

    // Build "<uri>?<query>" for logging; the separator is only added when there is a
    // query string, so a request without one is logged as the plain URI.
    final String query = exchange.getQueryString();
    final String requestUri = (query == null || query.isEmpty())
            ? exchange.getRequestURI()
            : exchange.getRequestURI() + '?' + query;

    final MonitoredThread monitoredThread = new MonitoredThread(current, requestUri);
    activeThreads.put(key, monitoredThread);

    // Start the periodic task if none is scheduled. The unlocked read keeps the common
    // path cheap; the second read under the lock prevents scheduling it twice.
    if (timerKey == null) {
        synchronized (this) {
            if (timerKey == null) {
                timerKey = exchange.getIoThread().executeAfter(stuckThreadTask, 1, TimeUnit.SECONDS);
            }
        }
    }

    try {
        next.handleRequest(exchange);
    } finally {
        activeThreads.remove(key);
        // markAsDone returns the previous state: STUCK means the task already reported
        // this thread, so its completion has to be reported as well.
        if (monitoredThread.markAsDone() == MonitoredThreadState.STUCK) {
            completedStuckThreadsQueue.add(
                    new CompletedStuckThread(monitoredThread.getThread(),
                            monitoredThread.getActiveTimeInMillis()));
        }
    }
}
/**
 * Returns the ids of the active threads that are currently marked as stuck.
 *
 * @return a newly allocated array of thread ids; empty when no active thread is marked stuck
 */
public long[] getStuckThreadIds() {
    final List<Long> stuckIds = new ArrayList<>();
    for (MonitoredThread candidate : activeThreads.values()) {
        if (!candidate.isMarkedAsStuck()) {
            continue;
        }
        stuckIds.add(candidate.getThread().getId());
    }

    // Unbox into a primitive array for the caller.
    final long[] ids = new long[stuckIds.size()];
    int index = 0;
    for (Long id : stuckIds) {
        ids[index++] = id;
    }
    return ids;
}
/**
 * Bookkeeping for one thread that is currently processing a request: the thread, the
 * request it is serving, when it started, and its monitoring state.
 */
private static class MonitoredThread {
    private final Thread thread;
    private final String requestUri;
    /** Wall-clock start time in milliseconds; used only for reporting. */
    private final long start;
    /** {@link System#nanoTime()} reading at start; used to measure elapsed time. */
    private final long startNanos;
    /** Current state, held as the enum itself so transitions do not depend on ordinals. */
    private final AtomicReference<MonitoredThreadState> state =
            new AtomicReference<>(MonitoredThreadState.RUNNING);

    /**
     * @param thread     the thread handling the request
     * @param requestUri description of the request, used in log messages
     */
    MonitoredThread(Thread thread, String requestUri) {
        this.thread = thread;
        this.requestUri = requestUri;
        this.start = System.currentTimeMillis();
        this.startNanos = System.nanoTime();
    }

    /** @return the monitored thread */
    public Thread getThread() {
        return this.thread;
    }

    /** @return the request description given at construction */
    public String getRequestUri() {
        return requestUri;
    }

    /**
     * Returns how long this thread has been monitored. Measured with
     * {@link System#nanoTime()} so that wall-clock adjustments cannot shrink or inflate
     * the result.
     *
     * @return elapsed time in milliseconds since construction
     */
    public long getActiveTimeInMillis() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    }

    /** @return the wall-clock time at which monitoring of this thread started */
    public Date getStartTime() {
        return new Date(start);
    }

    /**
     * Atomically moves the state from RUNNING to STUCK.
     *
     * @return {@code true} if the state was RUNNING and is now STUCK; {@code false} otherwise
     */
    public boolean markAsStuckIfStillRunning() {
        return this.state.compareAndSet(MonitoredThreadState.RUNNING, MonitoredThreadState.STUCK);
    }

    /**
     * Atomically sets the state to DONE.
     *
     * @return the state held immediately before the call
     */
    public MonitoredThreadState markAsDone() {
        return this.state.getAndSet(MonitoredThreadState.DONE);
    }

    /** @return {@code true} if the current state is STUCK */
    boolean isMarkedAsStuck() {
        return this.state.get() == MonitoredThreadState.STUCK;
    }
}
/**
 * Immutable snapshot of a thread that finished its request after having been marked as
 * stuck. The name and id are copied at construction time.
 */
private static class CompletedStuckThread {
    private final String threadName;
    private final long threadId;
    private final long totalActiveTime;

    /**
     * @param thread          the thread that completed; its name and id are captured
     * @param totalActiveTime the total time value to report for the thread
     */
    CompletedStuckThread(Thread thread, long totalActiveTime) {
        this.totalActiveTime = totalActiveTime;
        this.threadId = thread.getId();
        this.threadName = thread.getName();
    }

    /** @return the thread name captured at construction */
    public String getName() {
        return threadName;
    }

    /** @return the thread id captured at construction */
    public long getId() {
        return threadId;
    }

    /** @return the total active time passed at construction */
    public long getTotalActiveTime() {
        return totalActiveTime;
    }
}
/** Monitoring states of a request-processing thread. */
private enum MonitoredThreadState {
    /** Initial state of a newly monitored thread. */
    RUNNING,
    /** The thread was still running when the monitoring task flagged it. */
    STUCK,
    /** The thread has finished its request. */
    DONE
}
/**
 * {@link HandlerWrapper} that wraps a handler in a {@link StuckThreadDetectionHandler}
 * using a fixed threshold.
 */
public static final class Wrapper implements HandlerWrapper {
    private final int threshhold;

    /** Creates a wrapper that uses {@link #DEFAULT_THRESHOLD}. */
    public Wrapper() {
        this(DEFAULT_THRESHOLD);
    }

    /**
     * @param threshhold the threshold, in seconds, handed to every handler this wrapper creates
     */
    public Wrapper(int threshhold) {
        this.threshhold = threshhold;
    }

    /**
     * @param handler the handler to wrap
     * @return a new stuck-thread detection handler delegating to {@code handler}
     */
    @Override
    public HttpHandler wrap(HttpHandler handler) {
        return new StuckThreadDetectionHandler(this.threshhold, handler);
    }
}
/**
 * @return the handler name followed by the configured threshold in parentheses
 */
@Override
public String toString() {
    return new StringBuilder("stuck-thread-detector( ")
            .append(threshold)
            .append(" )")
            .toString();
}
/**
 * {@link HandlerBuilder} for the {@code stuck-thread-detector} handler. It accepts one
 * optional integer parameter, {@code threshhold} (spelled this way in the configuration key).
 */
public static class Builder implements HandlerBuilder {

    /** @return the handler name, {@code stuck-thread-detector} */
    @Override
    public String name() {
        return "stuck-thread-detector";
    }

    /** @return a single-entry map declaring the integer {@code threshhold} parameter */
    @Override
    public Map<String, Class<?>> parameters() {
        return Collections.<String, Class<?>>singletonMap("threshhold", Integer.class);
    }

    /** @return an empty set; no parameter is required */
    @Override
    public Set<String> requiredParameters() {
        return Collections.emptySet();
    }

    /** @return {@code threshhold}, the parameter used when a value is given without a name */
    @Override
    public String defaultParameter() {
        return "threshhold";
    }

    /**
     * Builds the wrapper from the parsed configuration.
     *
     * @param config parameter values keyed by parameter name
     * @return a wrapper using the configured threshold, or the default when none is configured
     */
    @Override
    public HandlerWrapper build(Map<String, Object> config) {
        final Integer configured = (Integer) config.get("threshhold");
        if (configured != null) {
            return new Wrapper(configured);
        }
        return new Wrapper();
    }
}
}