/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.concurrent;
import java.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.cassandra.tracing.Tracing.isTracing;
This class encorporates some Executor best practices for Cassandra. Most of the executors in the system
should use or extend this. There are two main improvements over a vanilla TPE:
- If a task throws an exception, the default uncaught exception handler will be invoked; if there is
no such handler, the exception will be logged.
- MaximumPoolSize is not supported. Here is what that means (quoting TPE javadoc):
If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
We don't want this last stage of creating new threads if the queue is full; it makes it needlessly difficult to
reason about the system's behavior. In other words, if DebuggableTPE has allocated our maximum number of (core)
threads and the queue is full, we want the enqueuer to block. But to allow the number of threads to drop if a
stage is less busy, core thread timeout is enabled.
/**
* This class encorporates some Executor best practices for Cassandra. Most of the executors in the system
* should use or extend this. There are two main improvements over a vanilla TPE:
*
* - If a task throws an exception, the default uncaught exception handler will be invoked; if there is
* no such handler, the exception will be logged.
* - MaximumPoolSize is not supported. Here is what that means (quoting TPE javadoc):
*
* If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
* If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
* If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
*
* We don't want this last stage of creating new threads if the queue is full; it makes it needlessly difficult to
* reason about the system's behavior. In other words, if DebuggableTPE has allocated our maximum number of (core)
* threads and the queue is full, we want the enqueuer to block. But to allow the number of threads to drop if a
* stage is less busy, core thread timeout is enabled.
*/
public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements LocalAwareExecutorService
{
protected static final Logger logger = LoggerFactory.getLogger(DebuggableThreadPoolExecutor.class);
public static final RejectedExecutionHandler blockingExecutionHandler = new RejectedExecutionHandler()
{
public void rejectedExecution(Runnable task, ThreadPoolExecutor executor)
{
((DebuggableThreadPoolExecutor) executor).onInitialRejection(task);
BlockingQueue<Runnable> queue = executor.getQueue();
while (true)
{
if (executor.isShutdown())
{
((DebuggableThreadPoolExecutor) executor).onFinalRejection(task);
throw new RejectedExecutionException("ThreadPoolExecutor has shut down");
}
try
{
if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
{
((DebuggableThreadPoolExecutor) executor).onFinalAccept(task);
break;
}
}
catch (InterruptedException e)
{
throw new AssertionError(e);
}
}
}
};
public DebuggableThreadPoolExecutor(String threadPoolName, int priority)
{
this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName, priority));
}
public DebuggableThreadPoolExecutor(int corePoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> queue, ThreadFactory factory)
{
this(corePoolSize, corePoolSize, keepAliveTime, unit, queue, factory);
}
public DebuggableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
{
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
allowCoreThreadTimeOut(true);
// block task submissions until queue has room.
// this is fighting TPE's design a bit because TPE rejects if queue.offer reports a full queue.
// we'll just override this with a handler that retries until it gets in. ugly, but effective.
// (there is an extensive analysis of the options here at
// http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html)
this.setRejectedExecutionHandler(blockingExecutionHandler);
}
Creates a thread pool that creates new threads as needed, but
will reuse previously constructed threads when they are
available.
Params: - threadPoolName – the name of the threads created by this executor
Returns: The new DebuggableThreadPoolExecutor
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available.
* @param threadPoolName the name of the threads created by this executor
* @return The new DebuggableThreadPoolExecutor
*/
public static DebuggableThreadPoolExecutor createCachedThreadpoolWithMaxSize(String threadPoolName)
{
return new DebuggableThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new NamedThreadFactory(threadPoolName));
}
Returns a ThreadPoolExecutor with a fixed number of threads.
When all threads are actively executing tasks, new tasks are queued.
If (most) threads are expected to be idle most of the time, prefer createWithMaxSize() instead.
Params: - threadPoolName – the name of the threads created by this executor
- size – the fixed number of threads for this executor
Returns: the new DebuggableThreadPoolExecutor
/**
* Returns a ThreadPoolExecutor with a fixed number of threads.
* When all threads are actively executing tasks, new tasks are queued.
* If (most) threads are expected to be idle most of the time, prefer createWithMaxSize() instead.
* @param threadPoolName the name of the threads created by this executor
* @param size the fixed number of threads for this executor
* @return the new DebuggableThreadPoolExecutor
*/
public static DebuggableThreadPoolExecutor createWithFixedPoolSize(String threadPoolName, int size)
{
return createWithMaximumPoolSize(threadPoolName, size, Integer.MAX_VALUE, TimeUnit.SECONDS);
}
Returns a ThreadPoolExecutor with a fixed maximum number of threads, but whose
threads are terminated when idle for too long.
When all threads are actively executing tasks, new tasks are queued.
Params: - threadPoolName – the name of the threads created by this executor
- size – the maximum number of threads for this executor
- keepAliveTime – the time an idle thread is kept alive before being terminated
- unit – tht time unit for
keepAliveTime
Returns: the new DebuggableThreadPoolExecutor
/**
* Returns a ThreadPoolExecutor with a fixed maximum number of threads, but whose
* threads are terminated when idle for too long.
* When all threads are actively executing tasks, new tasks are queued.
* @param threadPoolName the name of the threads created by this executor
* @param size the maximum number of threads for this executor
* @param keepAliveTime the time an idle thread is kept alive before being terminated
* @param unit tht time unit for {@code keepAliveTime}
* @return the new DebuggableThreadPoolExecutor
*/
public static DebuggableThreadPoolExecutor createWithMaximumPoolSize(String threadPoolName, int size, int keepAliveTime, TimeUnit unit)
{
return new DebuggableThreadPoolExecutor(size, Integer.MAX_VALUE, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName));
}
protected void onInitialRejection(Runnable task) {}
protected void onFinalAccept(Runnable task) {}
protected void onFinalRejection(Runnable task) {}
public void execute(Runnable command, ExecutorLocals locals)
{
super.execute(locals == null || command instanceof LocalSessionWrapper
? command
: new LocalSessionWrapper<Object>(command, locals));
}
public void maybeExecuteImmediately(Runnable command)
{
execute(command);
}
// execute does not call newTaskFor
@Override
public void execute(Runnable command)
{
super.execute(isTracing() && !(command instanceof LocalSessionWrapper)
? new LocalSessionWrapper<Object>(Executors.callable(command, null))
: command);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T result)
{
if (isTracing() && !(runnable instanceof LocalSessionWrapper))
{
return new LocalSessionWrapper<T>(Executors.callable(runnable, result));
}
return super.newTaskFor(runnable, result);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
{
if (isTracing() && !(callable instanceof LocalSessionWrapper))
{
return new LocalSessionWrapper<T>(callable);
}
return super.newTaskFor(callable);
}
@Override
protected void afterExecute(Runnable r, Throwable t)
{
super.afterExecute(r, t);
maybeResetTraceSessionWrapper(r);
logExceptionsAfterExecute(r, t);
}
protected static void maybeResetTraceSessionWrapper(Runnable r)
{
if (r instanceof LocalSessionWrapper)
{
LocalSessionWrapper tsw = (LocalSessionWrapper) r;
// we have to reset trace state as its presence is what denotes the current thread is tracing
// and if left this thread might start tracing unrelated tasks
tsw.reset();
}
}
@Override
protected void beforeExecute(Thread t, Runnable r)
{
if (r instanceof LocalSessionWrapper)
((LocalSessionWrapper) r).setupContext();
super.beforeExecute(t, r);
}
Send @param t and any exception wrapped by @param r to the default uncaught exception handler,
or log them if none such is set up
/**
* Send @param t and any exception wrapped by @param r to the default uncaught exception handler,
* or log them if none such is set up
*/
public static void logExceptionsAfterExecute(Runnable r, Throwable t)
{
Throwable hiddenThrowable = extractThrowable(r);
if (hiddenThrowable != null)
handleOrLog(hiddenThrowable);
// ThreadPoolExecutor will re-throw exceptions thrown by its Task (which will be seen by
// the default uncaught exception handler) so we only need to do anything if that handler
// isn't set up yet.
if (t != null && Thread.getDefaultUncaughtExceptionHandler() == null)
handleOrLog(t);
}
Send @param t to the default uncaught exception handler, or log it if none such is set up
/**
* Send @param t to the default uncaught exception handler, or log it if none such is set up
*/
public static void handleOrLog(Throwable t)
{
if (Thread.getDefaultUncaughtExceptionHandler() == null)
logger.error("Error in ThreadPoolExecutor", t);
else
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), t);
}
Returns: any exception wrapped by @param runnable, i.e., if it is a FutureTask
/**
* @return any exception wrapped by @param runnable, i.e., if it is a FutureTask
*/
public static Throwable extractThrowable(Runnable runnable)
{
// Check for exceptions wrapped by FutureTask. We do this by calling get(), which will
// cause it to throw any saved exception.
//
// Complicating things, calling get() on a ScheduledFutureTask will block until the task
// is cancelled. Hence, the extra isDone check beforehand.
if ((runnable instanceof Future<?>) && ((Future<?>) runnable).isDone())
{
try
{
((Future<?>) runnable).get();
}
catch (InterruptedException e)
{
throw new AssertionError(e);
}
catch (CancellationException e)
{
logger.trace("Task cancelled", e);
}
catch (ExecutionException e)
{
return e.getCause();
}
}
return null;
}
Used to wrap a Runnable or Callable passed to submit or execute so we can clone the ExecutorLocals and move
them into the worker thread.
Type parameters: - <T> –
/**
* Used to wrap a Runnable or Callable passed to submit or execute so we can clone the ExecutorLocals and move
* them into the worker thread.
*
* @param <T>
*/
private static class LocalSessionWrapper<T> extends FutureTask<T>
{
private final ExecutorLocals locals;
public LocalSessionWrapper(Callable<T> callable)
{
super(callable);
locals = ExecutorLocals.create();
}
public LocalSessionWrapper(Runnable command, ExecutorLocals locals)
{
super(command, null);
this.locals = locals;
}
private void setupContext()
{
ExecutorLocals.set(locals);
}
private void reset()
{
ExecutorLocals.set(null);
}
}
}