/*
 * Copyright (c) 2002, 2011, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */




package sun.nio.ch;

import java.nio.channels.spi.SelectorProvider;
import java.nio.channels.Selector;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;

A multi-threaded implementation of Selector for Windows.
Author:Konstantin Kladko, Mark Reinhold
/** * A multi-threaded implementation of Selector for Windows. * * @author Konstantin Kladko * @author Mark Reinhold */
final class WindowsSelectorImpl extends SelectorImpl { // Initial capacity of the poll array private final int INIT_CAP = 8; // Maximum number of sockets for select(). // Should be INIT_CAP times a power of 2 private final static int MAX_SELECTABLE_FDS = 1024; // The list of SelectableChannels serviced by this Selector. Every mod // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll // array, where the corresponding entry is occupied by the wakeupSocket private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP]; // The global native poll array holds file decriptors and event masks private PollArrayWrapper pollWrapper; // The number of valid entries in poll array, including entries occupied // by wakeup socket handle. private int totalChannels = 1; // Number of helper threads needed for select. We need one thread per // each additional set of MAX_SELECTABLE_FDS - 1 channels. private int threadsCount = 0; // A list of helper threads for select. private final List<Thread> threads = new ArrayList<Thread>(); //Pipe used as a wakeup object. private final Pipe wakeupPipe; // File descriptors corresponding to source and sink private final int wakeupSourceFd, wakeupSinkFd; // Maps file descriptors to their indices in pollArray private final static class FdMap extends HashMap<Integer, MapEntry> { static final long serialVersionUID = 0L; private MapEntry get(int desc) { return get(new Integer(desc)); } private MapEntry put(SelectionKeyImpl ski) { return put(new Integer(ski.channel.getFDVal()), new MapEntry(ski)); } private MapEntry remove(SelectionKeyImpl ski) { Integer fd = new Integer(ski.channel.getFDVal()); MapEntry x = get(fd); if ((x != null) && (x.ski.channel == ski.channel)) return remove(fd); return null; } } // class for fdMap entries private final static class MapEntry { SelectionKeyImpl ski; long updateCount = 0; long clearedCount = 0; MapEntry(SelectionKeyImpl ski) { this.ski = ski; } } private final FdMap fdMap = new FdMap(); // SubSelector for the main thread private final SubSelector subSelector = new SubSelector(); private long timeout; //timeout for poll // Lock for interrupt triggering and clearing private final Object interruptLock = new Object(); private volatile boolean interruptTriggered = false; WindowsSelectorImpl(SelectorProvider sp) throws IOException { super(sp); pollWrapper = new PollArrayWrapper(INIT_CAP); wakeupPipe = Pipe.open(); wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal(); // Disable the Nagle algorithm so that the wakeup is more immediate SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink(); (sink.sc).socket().setTcpNoDelay(true); wakeupSinkFd = ((SelChImpl)sink).getFDVal(); pollWrapper.addWakeupSocket(wakeupSourceFd, 0); } protected int doSelect(long timeout) throws IOException { if (channelArray == null) throw new ClosedSelectorException(); this.timeout = timeout; // set selector timeout processDeregisterQueue(); if (interruptTriggered) { resetWakeupSocket(); return 0; } // Calculate number of helper threads needed for poll. If necessary // threads are created here and start waiting on startLock adjustThreadsCount(); finishLock.reset(); // reset finishLock // Wakeup helper threads, waiting on startLock, so they start polling. // Redundant threads will exit here after wakeup. startLock.startThreads(); // do polling in the main thread. Main thread is responsible for // first MAX_SELECTABLE_FDS entries in pollArray. try { begin(); try { subSelector.poll(); } catch (IOException e) { finishLock.setException(e); // Save this exception } // Main thread is out of poll(). Wakeup others and wait for them if (threads.size() > 0) finishLock.waitForHelperThreads(); } finally { end(); } // Done with poll(). Set wakeupSocket to nonsignaled for the next run. finishLock.checkForException(); processDeregisterQueue(); int updated = updateSelectedKeys(); // Done with poll(). Set wakeupSocket to nonsignaled for the next run. resetWakeupSocket(); return updated; } // Helper threads wait on this lock for the next poll. private final StartLock startLock = new StartLock(); private final class StartLock { // A variable which distinguishes the current run of doSelect from the // previous one. Incrementing runsCounter and notifying threads will // trigger another round of poll. private long runsCounter; // Triggers threads, waiting on this lock to start polling. private synchronized void startThreads() { runsCounter++; // next run notifyAll(); // wake up threads. } // This function is called by a helper thread to wait for the // next round of poll(). It also checks, if this thread became // redundant. If yes, it returns true, notifying the thread // that it should exit. private synchronized boolean waitForStart(SelectThread thread) { while (true) { while (runsCounter == thread.lastRun) { try { startLock.wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } if (thread.index >= threads.size()) { // redundant thread return true; // will cause run() to exit. } else { thread.lastRun = runsCounter; // update lastRun return false; // will cause run() to poll. } } } } // Main thread waits on this lock, until all helper threads are done // with poll(). private final FinishLock finishLock = new FinishLock(); private final class FinishLock { // Number of helper threads, that did not finish yet. private int threadsToFinish; // IOException which occured during the last run. IOException exception = null; // Called before polling. private void reset() { threadsToFinish = threads.size(); // helper threads } // Each helper thread invokes this function on finishLock, when // the thread is done with poll(). private synchronized void threadFinished() { if (threadsToFinish == threads.size()) { // finished poll() first // if finished first, wakeup others wakeup(); } threadsToFinish--; if (threadsToFinish == 0) // all helper threads finished poll(). notify(); // notify the main thread } // The main thread invokes this function on finishLock to wait // for helper threads to finish poll(). private synchronized void waitForHelperThreads() { if (threadsToFinish == threads.size()) { // no helper threads finished yet. Wakeup them up. wakeup(); } while (threadsToFinish != 0) { try { finishLock.wait(); } catch (InterruptedException e) { // Interrupted - set interrupted state. Thread.currentThread().interrupt(); } } } // sets IOException for this run private synchronized void setException(IOException e) { exception = e; } // Checks if there was any exception during the last run. // If yes, throws it private void checkForException() throws IOException { if (exception == null) return; StringBuffer message = new StringBuffer("An exception occured" + " during the execution of select(): \n"); message.append(exception); message.append('\n'); exception = null; throw new IOException(message.toString()); } } private final class SubSelector { private final int pollArrayIndex; // starting index in pollArray to poll // These arrays will hold result of native select(). // The first element of each array is the number of selected sockets. // Other elements are file descriptors of selected sockets. private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1]; private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1]; private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; private SubSelector() { this.pollArrayIndex = 0; // main thread } private SubSelector(int threadIndex) { // helper threads this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS; } private int poll() throws IOException{ // poll for the main thread return poll0(pollWrapper.pollArrayAddress, Math.min(totalChannels, MAX_SELECTABLE_FDS), readFds, writeFds, exceptFds, timeout); } private int poll(int index) throws IOException { // poll for helper threads return poll0(pollWrapper.pollArrayAddress + (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD), Math.min(MAX_SELECTABLE_FDS, totalChannels - (index + 1) * MAX_SELECTABLE_FDS), readFds, writeFds, exceptFds, timeout); } private native int poll0(long pollAddress, int numfds, int[] readFds, int[] writeFds, int[] exceptFds, long timeout); private int processSelectedKeys(long updateCount) { int numKeysUpdated = 0; numKeysUpdated += processFDSet(updateCount, readFds, PollArrayWrapper.POLLIN, false); numKeysUpdated += processFDSet(updateCount, writeFds, PollArrayWrapper.POLLCONN | PollArrayWrapper.POLLOUT, false); numKeysUpdated += processFDSet(updateCount, exceptFds, PollArrayWrapper.POLLIN | PollArrayWrapper.POLLCONN | PollArrayWrapper.POLLOUT, true); return numKeysUpdated; }
Note, clearedCount is used to determine if the readyOps have been reset in this select operation. updateCount is used to tell if a key has been counted as updated in this select operation. me.updateCount <= me.clearedCount <= updateCount
/** * Note, clearedCount is used to determine if the readyOps have * been reset in this select operation. updateCount is used to * tell if a key has been counted as updated in this select * operation. * * me.updateCount <= me.clearedCount <= updateCount */
private int processFDSet(long updateCount, int[] fds, int rOps, boolean isExceptFds) { int numKeysUpdated = 0; for (int i = 1; i <= fds[0]; i++) { int desc = fds[i]; if (desc == wakeupSourceFd) { synchronized (interruptLock) { interruptTriggered = true; } continue; } MapEntry me = fdMap.get(desc); // If me is null, the key was deregistered in the previous // processDeregisterQueue. if (me == null) continue; SelectionKeyImpl sk = me.ski; // The descriptor may be in the exceptfds set because there is // OOB data queued to the socket. If there is OOB data then it // is discarded and the key is not added to the selected set. if (isExceptFds && (sk.channel() instanceof SocketChannelImpl) && discardUrgentData(desc)) { continue; } if (selectedKeys.contains(sk)) { // Key in selected set if (me.clearedCount != updateCount) { if (sk.channel.translateAndSetReadyOps(rOps, sk) && (me.updateCount != updateCount)) { me.updateCount = updateCount; numKeysUpdated++; } } else { // The readyOps have been set; now add if (sk.channel.translateAndUpdateReadyOps(rOps, sk) && (me.updateCount != updateCount)) { me.updateCount = updateCount; numKeysUpdated++; } } me.clearedCount = updateCount; } else { // Key is not in selected set yet if (me.clearedCount != updateCount) { sk.channel.translateAndSetReadyOps(rOps, sk); if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) { selectedKeys.add(sk); me.updateCount = updateCount; numKeysUpdated++; } } else { // The readyOps have been set; now add sk.channel.translateAndUpdateReadyOps(rOps, sk); if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) { selectedKeys.add(sk); me.updateCount = updateCount; numKeysUpdated++; } } me.clearedCount = updateCount; } } return numKeysUpdated; } } // Represents a helper thread used for select. private final class SelectThread extends Thread { private int index; // index of this thread SubSelector subSelector; private long lastRun = 0; // last run number // Creates a new thread private SelectThread(int i) { this.index = i; this.subSelector = new SubSelector(i); //make sure we wait for next round of poll this.lastRun = startLock.runsCounter; } public void run() { while (true) { // poll loop // wait for the start of poll. If this thread has become // redundant, then exit. if (startLock.waitForStart(this)) return; // call poll() try { subSelector.poll(index); } catch (IOException e) { // Save this exception and let other threads finish. finishLock.setException(e); } // notify main thread, that this thread has finished, and // wakeup others, if this thread is the first to finish. finishLock.threadFinished(); } } } // After some channels registered/deregistered, the number of required // helper threads may have changed. Adjust this number. private void adjustThreadsCount() { if (threadsCount > threads.size()) { // More threads needed. Start more threads. for (int i = threads.size(); i < threadsCount; i++) { SelectThread newThread = new SelectThread(i); threads.add(newThread); newThread.setDaemon(true); newThread.start(); } } else if (threadsCount < threads.size()) { // Some threads become redundant. Remove them from the threads List. for (int i = threads.size() - 1 ; i >= threadsCount; i--) threads.remove(i); } } // Sets Windows wakeup socket to a signaled state. private void setWakeupSocket() { setWakeupSocket0(wakeupSinkFd); } private native void setWakeupSocket0(int wakeupSinkFd); // Sets Windows wakeup socket to a non-signaled state. private void resetWakeupSocket() { synchronized (interruptLock) { if (interruptTriggered == false) return; resetWakeupSocket0(wakeupSourceFd); interruptTriggered = false; } } private native void resetWakeupSocket0(int wakeupSourceFd); private native boolean discardUrgentData(int fd); // We increment this counter on each call to updateSelectedKeys() // each entry in SubSelector.fdsMap has a memorized value of // updateCount. When we increment numKeysUpdated we set updateCount // for the corresponding entry to its current value. This is used to // avoid counting the same key more than once - the same key can // appear in readfds and writefds. private long updateCount = 0; // Update ops of the corresponding Channels. Add the ready keys to the // ready queue. private int updateSelectedKeys() { updateCount++; int numKeysUpdated = 0; numKeysUpdated += subSelector.processSelectedKeys(updateCount); Iterator it = threads.iterator(); while (it.hasNext()) numKeysUpdated += ((SelectThread)it.next()).subSelector. processSelectedKeys(updateCount); return numKeysUpdated; } protected void implClose() throws IOException { if (channelArray != null) { if (pollWrapper != null) { // prevent further wakeup synchronized (interruptLock) { interruptTriggered = true; } wakeupPipe.sink().close(); wakeupPipe.source().close(); for(int i = 1; i < totalChannels; i++) { // Deregister channels if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent deregister(channelArray[i]); SelectableChannel selch = channelArray[i].channel(); if (!selch.isOpen() && !selch.isRegistered()) ((SelChImpl)selch).kill(); } } pollWrapper.free(); pollWrapper = null; selectedKeys = null; channelArray = null; threads.clear(); // Call startThreads. All remaining helper threads now exit, // since threads.size() = 0; startLock.startThreads(); } } } protected void implRegister(SelectionKeyImpl ski) { growIfNeeded(); channelArray[totalChannels] = ski; ski.setIndex(totalChannels); fdMap.put(ski); keys.add(ski); pollWrapper.addEntry(totalChannels, ski); totalChannels++; } private void growIfNeeded() { if (channelArray.length == totalChannels) { int newSize = totalChannels * 2; // Make a larger array SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize]; System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1); channelArray = temp; pollWrapper.grow(newSize); } if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels); totalChannels++; threadsCount++; } } protected void implDereg(SelectionKeyImpl ski) throws IOException{ int i = ski.getIndex(); assert (i >= 0); if (i != totalChannels - 1) { // Copy end one over it SelectionKeyImpl endChannel = channelArray[totalChannels-1]; channelArray[i] = endChannel; endChannel.setIndex(i); pollWrapper.replaceEntry(pollWrapper, totalChannels - 1, pollWrapper, i); } channelArray[totalChannels - 1] = null; totalChannels--; ski.setIndex(-1); if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) { totalChannels--; threadsCount--; // The last thread has become redundant. } fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys keys.remove(ski); selectedKeys.remove(ski); deregister(ski); SelectableChannel selch = ski.channel(); if (!selch.isOpen() && !selch.isRegistered()) ((SelChImpl)selch).kill(); } void putEventOps(SelectionKeyImpl sk, int ops) { pollWrapper.putEventOps(sk.getIndex(), ops); } public Selector wakeup() { synchronized (interruptLock) { if (!interruptTriggered) { setWakeupSocket(); interruptTriggered = true; } } return this; } static { IOUtil.load(); } }