/*
 * Copyright (c) 2010, 2017 Oracle and/or its affiliates. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0, which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * This Source Code may also be made available under the following Secondary
 * Licenses when the conditions for such availability set forth in the
 * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
 * version 2 with the GNU Classpath Exception, which is available at
 * https://www.gnu.org/software/classpath/license.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
 */

package org.glassfish.grizzly.threadpool;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

ExecutorService implementation, which function the similar way as former Grizzly 1.x Pipeline based thread pools. The SyncThreadPool is synchronized similar way as Grizzly 1.x Pipeline, which makes thread pool more accurate when deciding to create or not additional worker threads.
Author:Alexey Stashok
/** * {@link ExecutorService} implementation, which function the similar way as * former Grizzly 1.x Pipeline based thread pools. * * The <tt>SyncThreadPool</tt> is synchronized similar way as Grizzly 1.x Pipeline, * which makes thread pool more accurate when deciding to create or not * additional worker threads. * * * @author Alexey Stashok */
public class SyncThreadPool extends AbstractThreadPool { private final Queue<Runnable> workQueue; protected int maxQueuedTasks = -1; private int currentPoolSize; private int activeThreadsCount; /** * */ public SyncThreadPool(ThreadPoolConfig config) { super(config); if (config.getKeepAliveTime(TimeUnit.MILLISECONDS) < 0) { throw new IllegalArgumentException("keepAliveTime < 0"); } workQueue = config.getQueue() != null ? config.getQueue() : config.setQueue(new LinkedList<Runnable>()).getQueue(); this.maxQueuedTasks = config.getQueueLimit(); final int corePoolSize = config.getCorePoolSize(); while (currentPoolSize < corePoolSize) { startWorker(new SyncThreadWorker(true)); } ProbeNotifier.notifyThreadPoolStarted(this); } @Override public void execute(Runnable task) { if (task == null) { throw new IllegalArgumentException("Runnable task is null"); } synchronized (stateLock) { if (!running) { throw new RejectedExecutionException("ThreadPool is not running"); } final int workQueueSize = workQueue.size() + 1; if ((maxQueuedTasks < 0 || workQueueSize <= maxQueuedTasks) && workQueue.offer(task)) { onTaskQueued(task); } else { onTaskQueueOverflow(); assert false; // should not reach this point } final int idleThreadsNumber = currentPoolSize - activeThreadsCount; if (idleThreadsNumber >= workQueueSize) { stateLock.notify(); return; } if (currentPoolSize < config.getMaxPoolSize()) { final boolean isCore = (currentPoolSize < config.getCorePoolSize()); startWorker(new SyncThreadWorker(isCore)); if (currentPoolSize == config.getMaxPoolSize()) { onMaxNumberOfThreadsReached(); } } } } @Override protected void startWorker(Worker worker) { synchronized (stateLock) { super.startWorker(worker); activeThreadsCount++; currentPoolSize++; } } @Override protected void onWorkerExit(Worker worker) { super.onWorkerExit(worker); synchronized (stateLock) { currentPoolSize--; activeThreadsCount--; } } @Override protected void poisonAll() { int size = currentPoolSize; final Queue<Runnable> q = getQueue(); while (size-- > 0) { q.offer(poison); } } @Override public String toString() { synchronized (stateLock) { return super.toString() + ", max-queue-size=" + maxQueuedTasks; } } protected class SyncThreadWorker extends Worker { private final boolean core; public SyncThreadWorker(boolean core) { this.core = core; } @Override protected Runnable getTask() throws InterruptedException { synchronized (stateLock) { activeThreadsCount--; try { if (!running || (!core && currentPoolSize > config.getMaxPoolSize())) { // if maxpoolsize becomes lower during runtime we kill of the return null; } Runnable r = workQueue.poll(); if (r != null) { return r; } long keepAliveMillis = config.getKeepAliveTime(TimeUnit.MILLISECONDS); final boolean hasKeepAlive = !core && keepAliveMillis >= 0; long endTime = -1; if (hasKeepAlive) { endTime = System.currentTimeMillis() + keepAliveMillis; } do { if (!hasKeepAlive) { stateLock.wait(); } else { stateLock.wait(keepAliveMillis); } r = workQueue.poll(); if (r != null) { return r; } // Less than 20 millis remainder will consider as keepalive timeout if (!running) { return null; } else if (hasKeepAlive) { keepAliveMillis = endTime - System.currentTimeMillis(); if (keepAliveMillis < 20) { return null; } } } while (true); } finally { activeThreadsCount++; } } } } }