//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.util.thread;

import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

An Executor using preallocated/reserved Threads from a wrapped Executor.

Calls to execute(Runnable) on a ReservedThreadExecutor will either succeed with a Thread immediately being assigned the Runnable task, or fail if no Thread is available.

Threads are reserved lazily, with a new reserved thread being allocated from a wrapped Executor when an execution fails. If the setIdleTimeout(long, TimeUnit) is set to non zero (default 1 minute), then the reserved thread pool will shrink by 1 thread whenever it has been idle for that period.

/** * An Executor using preallocated/reserved Threads from a wrapped Executor. * <p>Calls to {@link #execute(Runnable)} on a {@link ReservedThreadExecutor} will either succeed * with a Thread immediately being assigned the Runnable task, or fail if no Thread is * available. * <p>Threads are reserved lazily, with a new reserved thread being allocated from a * wrapped {@link Executor} when an execution fails. If the {@link #setIdleTimeout(long, TimeUnit)} * is set to non zero (default 1 minute), then the reserved thread pool will shrink by 1 thread * whenever it has been idle for that period. */
@ManagedObject("A pool for reserved threads") public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExecutor { private static final Logger LOG = LoggerFactory.getLogger(ReservedThreadExecutor.class); private static final Runnable STOP = new Runnable() { @Override public void run() { } @Override public String toString() { return "STOP!"; } }; private final Executor _executor; private final int _capacity; private final ConcurrentLinkedDeque<ReservedThread> _stack; private final AtomicInteger _size = new AtomicInteger(); private final AtomicInteger _pending = new AtomicInteger(); private ThreadPoolBudget.Lease _lease; private long _idleTime = 1L; private TimeUnit _idleTimeUnit = TimeUnit.MINUTES;
Params:
  • executor – The executor to use to obtain threads
  • capacity – The number of threads to preallocate. If less than 0 then capacity is calculated based on a heuristic from the number of available processors and thread pool size.
/** * @param executor The executor to use to obtain threads * @param capacity The number of threads to preallocate. If less than 0 then capacity * is calculated based on a heuristic from the number of available processors and * thread pool size. */
public ReservedThreadExecutor(Executor executor, int capacity) { _executor = executor; _capacity = reservedThreads(executor, capacity); _stack = new ConcurrentLinkedDeque<>(); if (LOG.isDebugEnabled()) LOG.debug("{}", this); }
Params:
  • executor – The executor to use to obtain threads
  • capacity – The number of threads to preallocate, If less than 0 then capacity is calculated based on a heuristic from the number of available processors and thread pool size.
Returns:the number of reserved threads that would be used by a ReservedThreadExecutor constructed with these arguments.
/** * @param executor The executor to use to obtain threads * @param capacity The number of threads to preallocate, If less than 0 then capacity * is calculated based on a heuristic from the number of available processors and * thread pool size. * @return the number of reserved threads that would be used by a ReservedThreadExecutor * constructed with these arguments. */
private static int reservedThreads(Executor executor, int capacity) { if (capacity >= 0) return capacity; int cpus = ProcessorUtils.availableProcessors(); if (executor instanceof ThreadPool.SizedThreadPool) { int threads = ((ThreadPool.SizedThreadPool)executor).getMaxThreads(); return Math.max(1, Math.min(cpus, threads / 10)); } return cpus; } public Executor getExecutor() { return _executor; } @ManagedAttribute(value = "max number of reserved threads", readonly = true) public int getCapacity() { return _capacity; } @ManagedAttribute(value = "available reserved threads", readonly = true) public int getAvailable() { return _stack.size(); } @ManagedAttribute(value = "pending reserved threads", readonly = true) public int getPending() { return _pending.get(); } @ManagedAttribute(value = "idletimeout in MS", readonly = true) public long getIdleTimeoutMs() { if (_idleTimeUnit == null) return 0; return _idleTimeUnit.toMillis(_idleTime); }
Set the idle timeout for shrinking the reserved thread pool
Params:
  • idleTime – Time to wait before shrinking, or 0 for no timeout.
  • idleTimeUnit – Time units for idle timeout
/** * Set the idle timeout for shrinking the reserved thread pool * * @param idleTime Time to wait before shrinking, or 0 for no timeout. * @param idleTimeUnit Time units for idle timeout */
public void setIdleTimeout(long idleTime, TimeUnit idleTimeUnit) { if (isRunning()) throw new IllegalStateException(); _idleTime = idleTime; _idleTimeUnit = idleTimeUnit; } @Override public void doStart() throws Exception { _lease = ThreadPoolBudget.leaseFrom(getExecutor(), this, _capacity); _size.set(0); super.doStart(); } @Override public void doStop() throws Exception { if (_lease != null) _lease.close(); super.doStop(); while (true) { int size = _size.get(); // If no reserved threads left try setting size to -1 to // atomically prevent other threads adding themselves to stack. if (size == 0 && _size.compareAndSet(size, -1)) break; ReservedThread thread = _stack.pollFirst(); if (thread == null) { // Reserved thread must have incremented size but not yet added itself to queue. // We will spin until it is added. Thread.onSpinWait(); continue; } _size.decrementAndGet(); thread.stop(); } } @Override public void execute(Runnable task) throws RejectedExecutionException { _executor.execute(task); }
Params:
  • task – The task to run
Returns:True iff a reserved thread was available and has been assigned the task to run.
/** * @param task The task to run * @return True iff a reserved thread was available and has been assigned the task to run. */
@Override public boolean tryExecute(Runnable task) { if (LOG.isDebugEnabled()) LOG.debug("{} tryExecute {}", this, task); if (task == null) return false; ReservedThread thread = _stack.pollFirst(); if (thread == null) { if (task != STOP) startReservedThread(); return false; } int size = _size.decrementAndGet(); if (!thread.offer(task)) return false; if (size == 0 && task != STOP) startReservedThread(); return true; } private void startReservedThread() { try { while (true) { // Not atomic, but there is a re-check in ReservedThread.run(). int pending = _pending.get(); int size = _size.get(); if (pending + size >= _capacity) return; if (_pending.compareAndSet(pending, pending + 1)) { if (LOG.isDebugEnabled()) LOG.debug("{} startReservedThread p={}", this, pending + 1); _executor.execute(new ReservedThread()); return; } } } catch (RejectedExecutionException e) { LOG.trace("IGNORED", e); } } @Override public String toString() { return String.format("%s@%x{s=%d/%d,p=%d}", getClass().getSimpleName(), hashCode(), _size.get(), _capacity, _pending.get()); } private class ReservedThread implements Runnable { private final SynchronousQueue<Runnable> _task = new SynchronousQueue<>(); private boolean _starting = true; public boolean offer(Runnable task) { if (LOG.isDebugEnabled()) LOG.debug("{} offer {}", this, task); try { _task.put(task); return true; } catch (Throwable e) { LOG.trace("IGNORED", e); _size.getAndIncrement(); _stack.offerFirst(this); return false; } } public void stop() { offer(STOP); } private Runnable reservedWait() { if (LOG.isDebugEnabled()) LOG.debug("{} waiting", this); while (true) { try { Runnable task = _idleTime <= 0 ? _task.take() : _task.poll(_idleTime, _idleTimeUnit); if (LOG.isDebugEnabled()) LOG.debug("{} task={}", this, task); if (task != null) return task; if (_stack.remove(this)) { if (LOG.isDebugEnabled()) LOG.debug("{} IDLE", this); _size.decrementAndGet(); return STOP; } } catch (InterruptedException e) { LOG.trace("IGNORED", e); } } } @Override public void run() { while (isRunning()) { // test and increment size BEFORE decrementing pending, // so that we don't have a race starting new pending. int size = _size.get(); // Are we stopped? if (size < 0) return; // Are we surplus to capacity? if (size >= _capacity) { if (LOG.isDebugEnabled()) LOG.debug("{} size {} > capacity {}", this, size, _capacity); if (_starting) _pending.decrementAndGet(); return; } // If we cannot update size then recalculate if (!_size.compareAndSet(size, size + 1)) continue; if (_starting) { if (LOG.isDebugEnabled()) LOG.debug("{} started", this); _pending.decrementAndGet(); _starting = false; } // Insert ourselves in the stack. Size is already incremented, but // that only effects the decision to keep other threads reserved. _stack.offerFirst(this); // Once added to the stack, we must always wait for a job on the _task Queue // and never return early, else we may leave a thread blocked offering a _task. Runnable task = reservedWait(); if (task == STOP) // return on STOP poison pill break; // Run the task try { task.run(); } catch (Throwable e) { LOG.warn("Unable to run task", e); } } if (LOG.isDebugEnabled()) LOG.debug("{} Exited", this); } @Override public String toString() { return String.format("%s@%x", ReservedThreadExecutor.this, hashCode()); } } }