/*
 * Copyright (c) 2002, 2018, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package sun.nio.ch;

import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// A multi-threaded implementation of Selector for Windows.
// Authors: Konstantin Kladko, Mark Reinhold
/** * A multi-threaded implementation of Selector for Windows. * * @author Konstantin Kladko * @author Mark Reinhold */
class WindowsSelectorImpl extends SelectorImpl { // Initial capacity of the poll array private final int INIT_CAP = 8; // Maximum number of sockets for select(). // Should be INIT_CAP times a power of 2 private static final int MAX_SELECTABLE_FDS = 1024; // The list of SelectableChannels serviced by this Selector. Every mod // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll // array, where the corresponding entry is occupied by the wakeupSocket private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP]; // The global native poll array holds file decriptors and event masks private PollArrayWrapper pollWrapper; // The number of valid entries in poll array, including entries occupied // by wakeup socket handle. private int totalChannels = 1; // Number of helper threads needed for select. We need one thread per // each additional set of MAX_SELECTABLE_FDS - 1 channels. private int threadsCount = 0; // A list of helper threads for select. private final List<SelectThread> threads = new ArrayList<SelectThread>(); //Pipe used as a wakeup object. 
private final Pipe wakeupPipe; // File descriptors corresponding to source and sink private final int wakeupSourceFd, wakeupSinkFd; // Maps file descriptors to their indices in pollArray private static final class FdMap extends HashMap<Integer, MapEntry> { static final long serialVersionUID = 0L; private MapEntry get(int desc) { return get(Integer.valueOf(desc)); } private MapEntry put(SelectionKeyImpl ski) { return put(Integer.valueOf(ski.getFDVal()), new MapEntry(ski)); } private MapEntry remove(SelectionKeyImpl ski) { Integer fd = Integer.valueOf(ski.getFDVal()); MapEntry x = get(fd); if ((x != null) && (x.ski.channel() == ski.channel())) return remove(fd); return null; } } // class for fdMap entries private static final class MapEntry { final SelectionKeyImpl ski; long updateCount = 0; MapEntry(SelectionKeyImpl ski) { this.ski = ski; } } private final FdMap fdMap = new FdMap(); // SubSelector for the main thread private final SubSelector subSelector = new SubSelector(); private long timeout; //timeout for poll // Lock for interrupt triggering and clearing private final Object interruptLock = new Object(); private volatile boolean interruptTriggered; // pending new registrations/updates, queued by implRegister and setEventOps private final Object updateLock = new Object(); private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>(); private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); WindowsSelectorImpl(SelectorProvider sp) throws IOException { super(sp); pollWrapper = new PollArrayWrapper(INIT_CAP); wakeupPipe = Pipe.open(); wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal(); // Disable the Nagle algorithm so that the wakeup is more immediate SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink(); (sink.sc).socket().setTcpNoDelay(true); wakeupSinkFd = ((SelChImpl)sink).getFDVal(); pollWrapper.addWakeupSocket(wakeupSourceFd, 0); } private void ensureOpen() { if (!isOpen()) throw new ClosedSelectorException(); } @Override 
protected int doSelect(Consumer<SelectionKey> action, long timeout) throws IOException { assert Thread.holdsLock(this); this.timeout = timeout; // set selector timeout processUpdateQueue(); processDeregisterQueue(); if (interruptTriggered) { resetWakeupSocket(); return 0; } // Calculate number of helper threads needed for poll. If necessary // threads are created here and start waiting on startLock adjustThreadsCount(); finishLock.reset(); // reset finishLock // Wakeup helper threads, waiting on startLock, so they start polling. // Redundant threads will exit here after wakeup. startLock.startThreads(); // do polling in the main thread. Main thread is responsible for // first MAX_SELECTABLE_FDS entries in pollArray. try { begin(); try { subSelector.poll(); } catch (IOException e) { finishLock.setException(e); // Save this exception } // Main thread is out of poll(). Wakeup others and wait for them if (threads.size() > 0) finishLock.waitForHelperThreads(); } finally { end(); } // Done with poll(). Set wakeupSocket to nonsignaled for the next run. finishLock.checkForException(); processDeregisterQueue(); int updated = updateSelectedKeys(action); // Done with poll(). Set wakeupSocket to nonsignaled for the next run. resetWakeupSocket(); return updated; }
Process new registrations and changes to the interest ops.
/** * Process new registrations and changes to the interest ops. */
private void processUpdateQueue() { assert Thread.holdsLock(this); synchronized (updateLock) { SelectionKeyImpl ski; // new registrations while ((ski = newKeys.pollFirst()) != null) { if (ski.isValid()) { growIfNeeded(); channelArray[totalChannels] = ski; ski.setIndex(totalChannels); pollWrapper.putEntry(totalChannels, ski); totalChannels++; MapEntry previous = fdMap.put(ski); assert previous == null; } } // changes to interest ops while ((ski = updateKeys.pollFirst()) != null) { int events = ski.translateInterestOps(); int fd = ski.getFDVal(); if (ski.isValid() && fdMap.containsKey(fd)) { int index = ski.getIndex(); assert index >= 0 && index < totalChannels; pollWrapper.putEventOps(index, events); } } } } // Helper threads wait on this lock for the next poll. private final StartLock startLock = new StartLock(); private final class StartLock { // A variable which distinguishes the current run of doSelect from the // previous one. Incrementing runsCounter and notifying threads will // trigger another round of poll. private long runsCounter; // Triggers threads, waiting on this lock to start polling. private synchronized void startThreads() { runsCounter++; // next run notifyAll(); // wake up threads. } // This function is called by a helper thread to wait for the // next round of poll(). It also checks, if this thread became // redundant. If yes, it returns true, notifying the thread // that it should exit. private synchronized boolean waitForStart(SelectThread thread) { while (true) { while (runsCounter == thread.lastRun) { try { startLock.wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } if (thread.isZombie()) { // redundant thread return true; // will cause run() to exit. } else { thread.lastRun = runsCounter; // update lastRun return false; // will cause run() to poll. } } } } // Main thread waits on this lock, until all helper threads are done // with poll(). 
private final FinishLock finishLock = new FinishLock(); private final class FinishLock { // Number of helper threads, that did not finish yet. private int threadsToFinish; // IOException which occurred during the last run. IOException exception = null; // Called before polling. private void reset() { threadsToFinish = threads.size(); // helper threads } // Each helper thread invokes this function on finishLock, when // the thread is done with poll(). private synchronized void threadFinished() { if (threadsToFinish == threads.size()) { // finished poll() first // if finished first, wakeup others wakeup(); } threadsToFinish--; if (threadsToFinish == 0) // all helper threads finished poll(). notify(); // notify the main thread } // The main thread invokes this function on finishLock to wait // for helper threads to finish poll(). private synchronized void waitForHelperThreads() { if (threadsToFinish == threads.size()) { // no helper threads finished yet. Wakeup them up. wakeup(); } while (threadsToFinish != 0) { try { finishLock.wait(); } catch (InterruptedException e) { // Interrupted - set interrupted state. Thread.currentThread().interrupt(); } } } // sets IOException for this run private synchronized void setException(IOException e) { exception = e; } // Checks if there was any exception during the last run. // If yes, throws it private void checkForException() throws IOException { if (exception == null) return; StringBuffer message = new StringBuffer("An exception occurred" + " during the execution of select(): \n"); message.append(exception); message.append('\n'); exception = null; throw new IOException(message.toString()); } } private final class SubSelector { private final int pollArrayIndex; // starting index in pollArray to poll // These arrays will hold result of native select(). // The first element of each array is the number of selected sockets. // Other elements are file descriptors of selected sockets. 
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1]; private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1]; private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; private SubSelector() { this.pollArrayIndex = 0; // main thread } private SubSelector(int threadIndex) { // helper threads this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS; } private int poll() throws IOException{ // poll for the main thread return poll0(pollWrapper.pollArrayAddress, Math.min(totalChannels, MAX_SELECTABLE_FDS), readFds, writeFds, exceptFds, timeout); } private int poll(int index) throws IOException { // poll for helper threads return poll0(pollWrapper.pollArrayAddress + (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD), Math.min(MAX_SELECTABLE_FDS, totalChannels - (index + 1) * MAX_SELECTABLE_FDS), readFds, writeFds, exceptFds, timeout); } private native int poll0(long pollAddress, int numfds, int[] readFds, int[] writeFds, int[] exceptFds, long timeout); private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) { int numKeysUpdated = 0; numKeysUpdated += processFDSet(updateCount, action, readFds, Net.POLLIN, false); numKeysUpdated += processFDSet(updateCount, action, writeFds, Net.POLLCONN | Net.POLLOUT, false); numKeysUpdated += processFDSet(updateCount, action, exceptFds, Net.POLLIN | Net.POLLCONN | Net.POLLOUT, true); return numKeysUpdated; }
updateCount is used to tell if a key has been counted as updated in this select operation. me.updateCount <= updateCount
/** * updateCount is used to tell if a key has been counted as updated * in this select operation. * * me.updateCount <= updateCount */
private int processFDSet(long updateCount, Consumer<SelectionKey> action, int[] fds, int rOps, boolean isExceptFds) { int numKeysUpdated = 0; for (int i = 1; i <= fds[0]; i++) { int desc = fds[i]; if (desc == wakeupSourceFd) { synchronized (interruptLock) { interruptTriggered = true; } continue; } MapEntry me = fdMap.get(desc); // If me is null, the key was deregistered in the previous // processDeregisterQueue. if (me == null) continue; SelectionKeyImpl sk = me.ski; // The descriptor may be in the exceptfds set because there is // OOB data queued to the socket. If there is OOB data then it // is discarded and the key is not added to the selected set. if (isExceptFds && (sk.channel() instanceof SocketChannelImpl) && discardUrgentData(desc)) { continue; } int updated = processReadyEvents(rOps, sk, action); if (updated > 0 && me.updateCount != updateCount) { me.updateCount = updateCount; numKeysUpdated++; } } return numKeysUpdated; } } // Represents a helper thread used for select. private final class SelectThread extends Thread { private final int index; // index of this thread final SubSelector subSelector; private long lastRun = 0; // last run number private volatile boolean zombie; // Creates a new thread private SelectThread(int i) { super(null, null, "SelectorHelper", 0, false); this.index = i; this.subSelector = new SubSelector(i); //make sure we wait for next round of poll this.lastRun = startLock.runsCounter; } void makeZombie() { zombie = true; } boolean isZombie() { return zombie; } public void run() { while (true) { // poll loop // wait for the start of poll. If this thread has become // redundant, then exit. if (startLock.waitForStart(this)) return; // call poll() try { subSelector.poll(index); } catch (IOException e) { // Save this exception and let other threads finish. finishLock.setException(e); } // notify main thread, that this thread has finished, and // wakeup others, if this thread is the first to finish. 
finishLock.threadFinished(); } } } // After some channels registered/deregistered, the number of required // helper threads may have changed. Adjust this number. private void adjustThreadsCount() { if (threadsCount > threads.size()) { // More threads needed. Start more threads. for (int i = threads.size(); i < threadsCount; i++) { SelectThread newThread = new SelectThread(i); threads.add(newThread); newThread.setDaemon(true); newThread.start(); } } else if (threadsCount < threads.size()) { // Some threads become redundant. Remove them from the threads List. for (int i = threads.size() - 1 ; i >= threadsCount; i--) threads.remove(i).makeZombie(); } } // Sets Windows wakeup socket to a signaled state. private void setWakeupSocket() { setWakeupSocket0(wakeupSinkFd); } private native void setWakeupSocket0(int wakeupSinkFd); // Sets Windows wakeup socket to a non-signaled state. private void resetWakeupSocket() { synchronized (interruptLock) { if (interruptTriggered == false) return; resetWakeupSocket0(wakeupSourceFd); interruptTriggered = false; } } private native void resetWakeupSocket0(int wakeupSourceFd); private native boolean discardUrgentData(int fd); // We increment this counter on each call to updateSelectedKeys() // each entry in SubSelector.fdsMap has a memorized value of // updateCount. When we increment numKeysUpdated we set updateCount // for the corresponding entry to its current value. This is used to // avoid counting the same key more than once - the same key can // appear in readfds and writefds. private long updateCount = 0; // Update ops of the corresponding Channels. Add the ready keys to the // ready queue. 
private int updateSelectedKeys(Consumer<SelectionKey> action) { updateCount++; int numKeysUpdated = 0; numKeysUpdated += subSelector.processSelectedKeys(updateCount, action); for (SelectThread t: threads) { numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, action); } return numKeysUpdated; } @Override protected void implClose() throws IOException { assert !isOpen(); assert Thread.holdsLock(this); // prevent further wakeup synchronized (interruptLock) { interruptTriggered = true; } wakeupPipe.sink().close(); wakeupPipe.source().close(); pollWrapper.free(); // Make all remaining helper threads exit for (SelectThread t: threads) t.makeZombie(); startLock.startThreads(); } @Override protected void implRegister(SelectionKeyImpl ski) { ensureOpen(); synchronized (updateLock) { newKeys.addLast(ski); } } private void growIfNeeded() { if (channelArray.length == totalChannels) { int newSize = totalChannels * 2; // Make a larger array SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize]; System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1); channelArray = temp; pollWrapper.grow(newSize); } if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels); totalChannels++; threadsCount++; } } @Override protected void implDereg(SelectionKeyImpl ski) { assert !ski.isValid(); assert Thread.holdsLock(this); if (fdMap.remove(ski) != null) { int i = ski.getIndex(); assert (i >= 0); if (i != totalChannels - 1) { // Copy end one over it SelectionKeyImpl endChannel = channelArray[totalChannels-1]; channelArray[i] = endChannel; endChannel.setIndex(i); pollWrapper.replaceEntry(pollWrapper, totalChannels-1, pollWrapper, i); } ski.setIndex(-1); channelArray[totalChannels - 1] = null; totalChannels--; if (totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) { totalChannels--; threadsCount--; // The last thread has become redundant. 
} } } @Override public void setEventOps(SelectionKeyImpl ski) { ensureOpen(); synchronized (updateLock) { updateKeys.addLast(ski); } } @Override public Selector wakeup() { synchronized (interruptLock) { if (!interruptTriggered) { setWakeupSocket(); interruptTriggered = true; } } return this; } static { IOUtil.load(); } }