/*
 * Copyright 2002-2020 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.scheduling.concurrent;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import javax.enterprise.concurrent.ManagedExecutors;
import javax.enterprise.concurrent.ManagedTask;

import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.TaskDecorator;
import org.springframework.core.task.support.TaskExecutorAdapter;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.util.ClassUtils;
import org.springframework.util.concurrent.ListenableFuture;

Adapter that takes a java.util.concurrent.Executor and exposes a Spring TaskExecutor for it. Also detects an extended java.util.concurrent.ExecutorService, adapting the AsyncTaskExecutor interface accordingly.

Autodetects a JSR-236 ManagedExecutorService in order to expose ManagedTask adapters for it, exposing a long-running hint based on SchedulingAwareRunnable and an identity name based on the given Runnable/Callable's toString(). For JSR-236 style lookup in a Java EE 7 environment, consider using DefaultManagedTaskExecutor.

Note that there is a pre-built ThreadPoolTaskExecutor that allows for defining a ThreadPoolExecutor in bean style, exposing it as a Spring TaskExecutor directly. This is a convenient alternative to a raw ThreadPoolExecutor definition with a separate definition of the present adapter class.

Author:Juergen Hoeller
See Also:
Since:2.0
/** * Adapter that takes a {@code java.util.concurrent.Executor} and exposes * a Spring {@link org.springframework.core.task.TaskExecutor} for it. * Also detects an extended {@code java.util.concurrent.ExecutorService}, adapting * the {@link org.springframework.core.task.AsyncTaskExecutor} interface accordingly. * * <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService} * in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it, * exposing a long-running hint based on {@link SchedulingAwareRunnable} and an identity * name based on the given Runnable/Callable's {@code toString()}. For JSR-236 style * lookup in a Java EE 7 environment, consider using {@link DefaultManagedTaskExecutor}. * * <p>Note that there is a pre-built {@link ThreadPoolTaskExecutor} that allows * for defining a {@link java.util.concurrent.ThreadPoolExecutor} in bean style, * exposing it as a Spring {@link org.springframework.core.task.TaskExecutor} directly. * This is a convenient alternative to a raw ThreadPoolExecutor definition with * a separate definition of the present adapter class. * * @author Juergen Hoeller * @since 2.0 * @see java.util.concurrent.Executor * @see java.util.concurrent.ExecutorService * @see java.util.concurrent.ThreadPoolExecutor * @see java.util.concurrent.Executors * @see DefaultManagedTaskExecutor * @see ThreadPoolTaskExecutor */
public class ConcurrentTaskExecutor implements AsyncListenableTaskExecutor, SchedulingTaskExecutor { @Nullable private static Class<?> managedExecutorServiceClass; static { try { managedExecutorServiceClass = ClassUtils.forName( "javax.enterprise.concurrent.ManagedExecutorService", ConcurrentTaskScheduler.class.getClassLoader()); } catch (ClassNotFoundException ex) { // JSR-236 API not available... managedExecutorServiceClass = null; } } private Executor concurrentExecutor; private TaskExecutorAdapter adaptedExecutor;
Create a new ConcurrentTaskExecutor, using a single thread executor as default.
See Also:
  • newSingleThreadExecutor.newSingleThreadExecutor()
/** * Create a new ConcurrentTaskExecutor, using a single thread executor as default. * @see java.util.concurrent.Executors#newSingleThreadExecutor() */
public ConcurrentTaskExecutor() { this.concurrentExecutor = Executors.newSingleThreadExecutor(); this.adaptedExecutor = new TaskExecutorAdapter(this.concurrentExecutor); }
Create a new ConcurrentTaskExecutor, using the given Executor.

Autodetects a JSR-236 ManagedExecutorService in order to expose ManagedTask adapters for it.

Params:
  • executor – the Executor to delegate to
/** * Create a new ConcurrentTaskExecutor, using the given {@link java.util.concurrent.Executor}. * <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService} * in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it. * @param executor the {@link java.util.concurrent.Executor} to delegate to */
public ConcurrentTaskExecutor(@Nullable Executor executor) { this.concurrentExecutor = (executor != null ? executor : Executors.newSingleThreadExecutor()); this.adaptedExecutor = getAdaptedExecutor(this.concurrentExecutor); }
Specify the Executor to delegate to.

Autodetects a JSR-236 ManagedExecutorService in order to expose ManagedTask adapters for it.

/** * Specify the {@link java.util.concurrent.Executor} to delegate to. * <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService} * in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it. */
public final void setConcurrentExecutor(@Nullable Executor executor) { this.concurrentExecutor = (executor != null ? executor : Executors.newSingleThreadExecutor()); this.adaptedExecutor = getAdaptedExecutor(this.concurrentExecutor); }
Return the Executor that this adapter delegates to.
/** * Return the {@link java.util.concurrent.Executor} that this adapter delegates to. */
public final Executor getConcurrentExecutor() { return this.concurrentExecutor; }
Specify a custom TaskDecorator to be applied to any Runnable about to be executed.

Note that such a decorator is not necessarily being applied to the user-supplied Runnable/Callable but rather to the actual execution callback (which may be a wrapper around the user-supplied task).

The primary use case is to set some execution context around the task's invocation, or to provide some monitoring/statistics for task execution.

NOTE: Exception handling in TaskDecorator implementations is limited to plain Runnable execution via execute calls. In case of #submit calls, the exposed Runnable will be a FutureTask which does not propagate any exceptions; you might have to cast it and call Future#get to evaluate exceptions.

Since:4.3
/** * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable} * about to be executed. * <p>Note that such a decorator is not necessarily being applied to the * user-supplied {@code Runnable}/{@code Callable} but rather to the actual * execution callback (which may be a wrapper around the user-supplied task). * <p>The primary use case is to set some execution context around the task's * invocation, or to provide some monitoring/statistics for task execution. * <p><b>NOTE:</b> Exception handling in {@code TaskDecorator} implementations * is limited to plain {@code Runnable} execution via {@code execute} calls. * In case of {@code #submit} calls, the exposed {@code Runnable} will be a * {@code FutureTask} which does not propagate any exceptions; you might * have to cast it and call {@code Future#get} to evaluate exceptions. * @since 4.3 */
public final void setTaskDecorator(TaskDecorator taskDecorator) { this.adaptedExecutor.setTaskDecorator(taskDecorator); } @Override public void execute(Runnable task) { this.adaptedExecutor.execute(task); } @Override public void execute(Runnable task, long startTimeout) { this.adaptedExecutor.execute(task, startTimeout); } @Override public Future<?> submit(Runnable task) { return this.adaptedExecutor.submit(task); } @Override public <T> Future<T> submit(Callable<T> task) { return this.adaptedExecutor.submit(task); } @Override public ListenableFuture<?> submitListenable(Runnable task) { return this.adaptedExecutor.submitListenable(task); } @Override public <T> ListenableFuture<T> submitListenable(Callable<T> task) { return this.adaptedExecutor.submitListenable(task); } private static TaskExecutorAdapter getAdaptedExecutor(Executor concurrentExecutor) { if (managedExecutorServiceClass != null && managedExecutorServiceClass.isInstance(concurrentExecutor)) { return new ManagedTaskExecutorAdapter(concurrentExecutor); } return new TaskExecutorAdapter(concurrentExecutor); }
TaskExecutorAdapter subclass that wraps all provided Runnables and Callables with a JSR-236 ManagedTask, exposing a long-running hint based on SchedulingAwareRunnable and an identity name based on the task's toString() representation.
/** * TaskExecutorAdapter subclass that wraps all provided Runnables and Callables * with a JSR-236 ManagedTask, exposing a long-running hint based on * {@link SchedulingAwareRunnable} and an identity name based on the task's * {@code toString()} representation. */
private static class ManagedTaskExecutorAdapter extends TaskExecutorAdapter { public ManagedTaskExecutorAdapter(Executor concurrentExecutor) { super(concurrentExecutor); } @Override public void execute(Runnable task) { super.execute(ManagedTaskBuilder.buildManagedTask(task, task.toString())); } @Override public Future<?> submit(Runnable task) { return super.submit(ManagedTaskBuilder.buildManagedTask(task, task.toString())); } @Override public <T> Future<T> submit(Callable<T> task) { return super.submit(ManagedTaskBuilder.buildManagedTask(task, task.toString())); } @Override public ListenableFuture<?> submitListenable(Runnable task) { return super.submitListenable(ManagedTaskBuilder.buildManagedTask(task, task.toString())); } @Override public <T> ListenableFuture<T> submitListenable(Callable<T> task) { return super.submitListenable(ManagedTaskBuilder.buildManagedTask(task, task.toString())); } }
Delegate that wraps a given Runnable/Callable with a JSR-236 ManagedTask, exposing a long-running hint based on SchedulingAwareRunnable and a given identity name.
/** * Delegate that wraps a given Runnable/Callable with a JSR-236 ManagedTask, * exposing a long-running hint based on {@link SchedulingAwareRunnable} * and a given identity name. */
protected static class ManagedTaskBuilder { public static Runnable buildManagedTask(Runnable task, String identityName) { Map<String, String> properties; if (task instanceof SchedulingAwareRunnable) { properties = new HashMap<>(4); properties.put(ManagedTask.LONGRUNNING_HINT, Boolean.toString(((SchedulingAwareRunnable) task).isLongLived())); } else { properties = new HashMap<>(2); } properties.put(ManagedTask.IDENTITY_NAME, identityName); return ManagedExecutors.managedTask(task, properties, null); } public static <T> Callable<T> buildManagedTask(Callable<T> task, String identityName) { Map<String, String> properties = new HashMap<>(2); properties.put(ManagedTask.IDENTITY_NAME, identityName); return ManagedExecutors.managedTask(task, properties, null); } } }