/*
* Copyright (c) 2015, 2019 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/
package org.glassfish.jersey.spi;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.glassfish.jersey.internal.LocalizationMessages;
import org.glassfish.jersey.internal.guava.ThreadFactoryBuilder;
import org.glassfish.jersey.internal.util.ExtendedLogger;
import org.glassfish.jersey.internal.util.collection.LazyValue;
import org.glassfish.jersey.internal.util.collection.Value;
import org.glassfish.jersey.internal.util.collection.Values;
import org.glassfish.jersey.process.JerseyProcessingUncaughtExceptionHandler;
Abstract thread pool executor provider.
This class provides a skeleton implementation for provisioning and basic lifecycle management of thread pool executors. Every instance of the concrete implementation of this provider class creates at most one shared and lazily initialized thread pool executor instance, which can be retrieved by invoking the getExecutor()
method. This provider also makes sure that the provisioned thread pool executor instance is properly shut down when the managing provider instance is closed
(in case it has not been already shut down).
At minimum, concrete subclasses of this provider are expected to implement the createExecutor
method that is used as a thread pool instance factory. The method is invoked lazily, with the first call to the getExecutor()
method. The result returned from the createExecutor()
method is cached internally and is used as a return value for subsequent calls to the getExecutor()
method. This means, that createExecutor()
method is guaranteed to be invoked at most once during the lifetime of any particular provider instance.
Author: Marek Potociar Since: 2.18
/**
* Abstract thread pool executor provider.
* <p>
* This class provides a skeleton implementation for provisioning and basic lifecycle management of thread pool executors.
* Every instance of the concrete implementation of this provider class creates at most one shared and lazily initialized
* thread pool executor instance, which can be retrieved by invoking the {@link #getExecutor()} method. This provider also makes
* sure that the provisioned thread pool executor instance is properly shut down when the managing provider instance is
* {@link #close() closed} (in case it has not been already shut down).
* </p>
* <p>
* At minimum, concrete subclasses of this provider are expected to implement the {@link #createExecutor} method that is used
* as a thread pool instance factory. The method is invoked lazily, with the first call to the {@code getExecutor()} method.
* The result returned from the {@code createExecutor()} method is cached internally and is used as a return value for subsequent
* calls to the {@code getExecutor()} method. This means, that {@code createExecutor()} method is guaranteed to be invoked
* <em>at most once</em> during the lifetime of any particular provider instance.
* </p>
*
* @author Marek Potociar
* @since 2.18
*/
public abstract class AbstractThreadPoolProvider<E extends ThreadPoolExecutor> implements AutoCloseable {
private static final ExtendedLogger LOGGER = new ExtendedLogger(
Logger.getLogger(AbstractThreadPoolProvider.class.getName()), Level.FINEST);
Default thread pool executor termination timeout in milliseconds.
/**
* Default thread pool executor termination timeout in milliseconds.
*/
public static final int DEFAULT_TERMINATION_TIMEOUT = 5000;
private final String name;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final LazyValue<E> lazyExecutorServiceProvider =
Values.lazy((Value<E>) () -> createExecutor(getCorePoolSize(), createThreadFactory(), getRejectedExecutionHandler()));
Inheritance constructor.
Params: - name – name of the provided thread pool executor. Will be used in the names of threads created & used by the
provided thread pool executor.
/**
* Inheritance constructor.
*
* @param name name of the provided thread pool executor. Will be used in the names of threads created & used by the
* provided thread pool executor.
*/
protected AbstractThreadPoolProvider(final String name) {
this.name = name;
}
Get the thread pool executor.
The first invocation of this method will invoke the overridden createExecutor
method to retrieve the provided thread pool executor instance. The created thread pool executor instance is then cached and will be returned upon subsequent calls to this method. Throws: - IllegalStateException – in case the provider has been
closed
already.
See Also: Returns: provided thread pool executor.
/**
* Get the thread pool executor.
* <p/>
* The first invocation of this method will invoke the overridden {@link #createExecutor} method to retrieve the
* provided thread pool executor instance. The created thread pool executor instance is then cached and will be returned upon
* subsequent calls to this method.
*
* @return provided thread pool executor.
* @throws java.lang.IllegalStateException in case the provider has been {@link #close() closed} already.
* @see #createExecutor
* @see #close()
* @see #isClosed()
*/
protected final E getExecutor() {
if (isClosed()) {
throw new IllegalStateException(LocalizationMessages.THREAD_POOL_EXECUTOR_PROVIDER_CLOSED());
}
return lazyExecutorServiceProvider.get();
}
Create a new instance of the thread pool executor that should be provided by the getExecutor()
method. Concrete implementations of this class must override this method and implement the logic that creates the executor service to be provided. The returned thread pool executor will be shut down when this provider instance is closed
.
This method is invoked at most once, during the first call to the getExecutor()
method.
Params: - corePoolSize – number of core threads the provisioned thread pool executor should provide.
- threadFactory – thread factory to be used by the provisioned thread pool executor when creating new threads.
- handler – handler for tasks that cannot be executed by the provisioned thread pool executor (e.g. due to a
shutdown).
See Also: Returns: new instance of the provided thread pool executor.
/**
* Create a new instance of the thread pool executor that should be provided by the {@link #getExecutor()} method.
* <p>
* Concrete implementations of this class must override this method and implement the logic that creates the executor
* service to be provided. The returned thread pool executor will be shut down when this provider instance is
* {@link #close() closed}.
* </p>
* <p>
* This method is invoked at most once, during the first call to the {@code getExecutor()} method.
* </p>
*
* @param corePoolSize number of core threads the provisioned thread pool executor should provide.
* @param threadFactory thread factory to be used by the provisioned thread pool executor when creating new threads.
* @param handler handler for tasks that cannot be executed by the provisioned thread pool executor (e.g. due to a
* shutdown).
* @return new instance of the provided thread pool executor.
* @see #getExecutor()
* @see #close()
* @see #getCorePoolSize()
* @see #getBackingThreadFactory()
* @see #getRejectedExecutionHandler()
*/
protected abstract E createExecutor(
final int corePoolSize, final ThreadFactory threadFactory, final RejectedExecutionHandler handler);
Get the provisioned thread pool executor termination time out (in milliseconds).
The method is used during the thread pool executor shutdown sequence to determine the shutdown timeout, when this provider instance is closed
. In case the thread pool executor shutdown is interrupted or the timeout expires, the provisioned thread pool executor is shutdown forcefully
.
The method can be overridden to customize the thread pool executor termination time out. If not customized, the method defaults to 5000 ms.
See Also: Returns: provisioned thread pool executor termination time out (in milliseconds).
/**
* Get the provisioned thread pool executor termination time out (in milliseconds).
* <p>
* The method is used during the thread pool executor shutdown sequence to determine the shutdown timeout, when this provider
* instance is {@link #close() closed}.
* In case the thread pool executor shutdown is interrupted or the timeout expires, the provisioned thread pool executor is
* {@link java.util.concurrent.ExecutorService#shutdownNow() shutdown forcefully}.
* </p>
* <p>
* The method can be overridden to customize the thread pool executor termination time out. If not customized, the
* method defaults to {@value #DEFAULT_TERMINATION_TIMEOUT} ms.
* </p>
*
* @return provisioned thread pool executor termination time out (in milliseconds).
* @see #close()
* @see java.util.concurrent.ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)
*/
protected int getTerminationTimeout() {
return DEFAULT_TERMINATION_TIMEOUT;
}
Get the number of the core threads of the the provisioned thread pool executor.
The value from this method is passed as one of the input parameters in a call to the createExecutor
method.
The method can be overridden to customize the number of core threads of the provisioned thread pool executor. If not customized, the method defaults to the number of available processors
in the system.
See Also: Returns: number of core threads in the provisioned thread pool executor.
/**
* Get the number of the core threads of the the provisioned thread pool executor.
* <p>
* The value from this method is passed as one of the input parameters in a call to the {@link #createExecutor} method.
* </p>
* <p>
* The method can be overridden to customize the number of core threads of the provisioned thread pool executor.
* If not customized, the method defaults to the number of {@link Runtime#availableProcessors() available processors}
* in the system.
* </p>
*
* @return number of core threads in the provisioned thread pool executor.
* @see #createExecutor
*/
protected int getCorePoolSize() {
return Runtime.getRuntime().availableProcessors();
}
Get the handler for tasks that could not be executed by the provisioned thread pool executor.
The value from this method is passed as one of the input parameters in a call to the createExecutor
method.
The method can be overridden to customize the rejected task handler used by the provisioned thread pool executor.
If not customized, the method provides a basic default NO-OP implementation.
See Also: Returns: handler for tasks that could not be executed by the provisioned thread pool executor.
/**
* Get the handler for tasks that could not be executed by the provisioned thread pool executor.
* <p>
* The value from this method is passed as one of the input parameters in a call to the {@link #createExecutor} method.
* </p>
* <p>
* The method can be overridden to customize the rejected task handler used by the provisioned thread pool executor.
* If not customized, the method provides a basic default NO-OP implementation.
* </p>
*
* @return handler for tasks that could not be executed by the provisioned thread pool executor.
* @see #createExecutor
*/
protected RejectedExecutionHandler getRejectedExecutionHandler() {
return (r, executor) -> {
// TODO: implement the rejected execution handler method.
};
}
Get a backing thread factory that should be used as a delegate for creating the new threads for the provisioned executor
service.
The value from this method is used as a backing ThreadFactory
for an internally constructed thread factory instance that is passed as one of the input parameters in a call to the createExecutor
method. When not null
, the new threads will be created by invoking the ThreadFactory.newThread(Runnable)
on this backing ThreadFactory
.
The method can be overridden to customize the backing thread factory for the provisioned thread pool executor. If not customized, the method returns null
by default.
See Also: Returns: backing thread factory for the provisioned thread pool executor. May return null
, in which case no backing thread factory will be used.
/**
* Get a backing thread factory that should be used as a delegate for creating the new threads for the provisioned executor
* service.
* <p>
* The value from this method is used as a backing {@link ThreadFactory} for an internally constructed thread factory
* instance
* that is passed as one of the input parameters in a call to the {@link #createExecutor} method.
* When not {@code null}, the new threads will be created by invoking the {@link ThreadFactory#newThread(Runnable)} on
* this backing {@code ThreadFactory}.
* </p>
* <p>
* The method can be overridden to customize the backing thread factory for the provisioned thread pool executor.
* If not customized, the method returns {@code null} by default.
* </p>
*
* @return backing thread factory for the provisioned thread pool executor. May return {@code null}, in which case no backing
* thread factory will be used.
* @see #createExecutor
*/
protected ThreadFactory getBackingThreadFactory() {
return null;
}
private ThreadFactory createThreadFactory() {
final ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder()
.setNameFormat(name + "-%d")
.setUncaughtExceptionHandler(new JerseyProcessingUncaughtExceptionHandler());
final ThreadFactory backingThreadFactory = getBackingThreadFactory();
if (backingThreadFactory != null) {
factoryBuilder.setThreadFactory(backingThreadFactory);
}
return factoryBuilder.build();
}
Check if this thread pool executor provider has been closed
. See Also: Returns: true
if this provider has been closed, false
otherwise.
/**
* Check if this thread pool executor provider has been {@link #close() closed}.
*
* @return {@code true} if this provider has been closed, {@code false} otherwise.
* @see #close()
*/
public final boolean isClosed() {
return closed.get();
}
Close event handler, that invoked during the close()
operation.
Concrete implementations of this provider class may override this method to perform any additional resource clean-up.
Default implementation is a NO-OP.
See Also:
/**
* Close event handler, that invoked during the {@link #close()} operation.
* <p>
* Concrete implementations of this provider class may override this method to perform any additional resource clean-up.
* Default implementation is a NO-OP.
* </p>
*
* @see #close()
*/
protected void onClose() {
// NO-OP default implementation.
}
Close this thread pool executor provider.
Once the provider is closed, it will stop providing the thread pool executor and subsequent invocations to getExecutor()
method will result in an IllegalStateException
being thrown. The current status of the provider can be checked via isClosed()
method.
Upon invocation, the following tasks are performed:
- The thread pool executor instance provisioning via
getExecutor()
method is stopped.
- The
onClose()
event handler is invoked.
- The thread pool executor, if previously
created
and provisioned
, is shut down.
The actual thread pool executor shutdown is performed as follows:
First, a graceful shutdown
is attempted. The value returned from a call to getTerminationTimeout()
method is used to determine the graceful shutdown timeout period.
In case the thread pool executor graceful shutdown is interrupted or the timeout expires, the provisioned thread pool executor is shutdown forcefully
. All tasks that have never commenced execution are then cancelled interruptingly
, if possible.
See Also:
/**
* Close this thread pool executor provider.
* <p>
* Once the provider is closed, it will stop providing the thread pool executor and subsequent invocations to
* {@link #getExecutor()} method will result in an {@link java.lang.IllegalStateException} being thrown. The current
* status of the provider can be checked via {@link #isClosed()} method.
* </p>
* <p>
* Upon invocation, the following tasks are performed:
* <ul>
* <li>The thread pool executor instance provisioning via {@code getExecutor()} method is stopped.</li>
* <li>The {@link #onClose()} event handler is invoked.</li>
* <li>The thread pool executor, if previously {@link #createExecutor created} and {@link #getExecutor() provisioned},
* is shut down.</li>
* </ul>
* The actual thread pool executor shutdown is performed as follows:
* </p>
* <p>
* First, a {@link java.util.concurrent.ExecutorService#shutdown() graceful shutdown} is attempted. The value returned from
* a call to {@link #getTerminationTimeout()} method is used to determine the graceful shutdown timeout period.
* </p>
* <p>
* In case the thread pool executor graceful shutdown is interrupted or the timeout expires, the provisioned thread pool
* executor is
* {@link java.util.concurrent.ExecutorService#shutdownNow() shutdown forcefully}. All tasks that have never commenced
* execution are then {@link java.util.concurrent.Future#cancel cancelled interruptingly}, if possible.
* </p>
*
* @see #isClosed()
* @see #onClose()
* @see #getExecutor()
* @see #getTerminationTimeout()
*/
public final void close() {
if (!closed.compareAndSet(false, true)) {
return;
}
try {
onClose();
} finally {
if (lazyExecutorServiceProvider.isInitialized()) {
AccessController.doPrivileged(shutdownExecutor(
name,
lazyExecutorServiceProvider.get(),
getTerminationTimeout(),
TimeUnit.MILLISECONDS));
}
}
}
Create a PrivilegedAction
that contains logic for a proper shut-down sequence of an executor service. Params: - executorName – executor service identification.
- executorService – executor service instance.
- terminationTimeout – orderly shut-down termination time-out value (maximum time to wait for the termination).
- terminationTimeUnit – orderly shut-down termination time-out time unit.
Returns: an executor shut-down logic wrapped in a privileged action.
/**
* Create a {@link java.security.PrivilegedAction} that contains logic for a proper shut-down sequence of an executor
* service.
*
* @param executorName executor service identification.
* @param executorService executor service instance.
* @param terminationTimeout orderly shut-down termination time-out value (maximum time to wait for the termination).
* @param terminationTimeUnit orderly shut-down termination time-out time unit.
* @return an executor shut-down logic wrapped in a privileged action.
*/
private static PrivilegedAction<?> shutdownExecutor(
final String executorName,
final ExecutorService executorService,
final int terminationTimeout,
final TimeUnit terminationTimeUnit) {
return (PrivilegedAction<Void>) () -> {
if (!executorService.isShutdown()) {
executorService.shutdown();
}
if (executorService.isTerminated()) {
return null;
}
boolean terminated = false;
boolean interrupted = false;
try {
terminated = executorService.awaitTermination(terminationTimeout, terminationTimeUnit);
} catch (InterruptedException e) {
if (LOGGER.isDebugLoggable()) {
LOGGER.log(LOGGER.getDebugLevel(),
"Interrupted while waiting for thread pool executor " + executorName + " to shutdown.", e);
}
interrupted = true;
}
try {
if (!terminated) {
final List<Runnable> cancelledTasks = executorService.shutdownNow();
for (Runnable cancelledTask : cancelledTasks) {
if (cancelledTask instanceof Future) {
((Future) cancelledTask).cancel(true);
}
}
if (LOGGER.isDebugLoggable()) {
LOGGER.debugLog("Thread pool executor {0} forced-shut down. List of cancelled tasks: {1}",
executorName, cancelledTasks);
}
}
} finally {
if (interrupted) {
// restoring the interrupt flag
Thread.currentThread().interrupt();
}
}
return null;
};
}
}