/*
* Copyright (c) 2002, 2011, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package sun.nio.ch;
import java.nio.channels.spi.SelectorProvider;
import java.nio.channels.Selector;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
A multi-threaded implementation of Selector for Windows.
Author: Konstantin Kladko, Mark Reinhold
/**
* A multi-threaded implementation of Selector for Windows.
*
* @author Konstantin Kladko
* @author Mark Reinhold
*/
final class WindowsSelectorImpl extends SelectorImpl {
// Initial capacity of the poll array
private final int INIT_CAP = 8;
// Maximum number of sockets for select().
// Should be INIT_CAP times a power of 2
private final static int MAX_SELECTABLE_FDS = 1024;
// The list of SelectableChannels serviced by this Selector. Every mod
// MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
// array, where the corresponding entry is occupied by the wakeupSocket
private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
// The global native poll array holds file decriptors and event masks
private PollArrayWrapper pollWrapper;
// The number of valid entries in poll array, including entries occupied
// by wakeup socket handle.
private int totalChannels = 1;
// Number of helper threads needed for select. We need one thread per
// each additional set of MAX_SELECTABLE_FDS - 1 channels.
private int threadsCount = 0;
// A list of helper threads for select.
private final List<Thread> threads = new ArrayList<Thread>();
//Pipe used as a wakeup object.
private final Pipe wakeupPipe;
// File descriptors corresponding to source and sink
private final int wakeupSourceFd, wakeupSinkFd;
// Maps file descriptors to their indices in pollArray
private final static class FdMap extends HashMap<Integer, MapEntry> {
static final long serialVersionUID = 0L;
private MapEntry get(int desc) {
return get(new Integer(desc));
}
private MapEntry put(SelectionKeyImpl ski) {
return put(new Integer(ski.channel.getFDVal()), new MapEntry(ski));
}
private MapEntry remove(SelectionKeyImpl ski) {
Integer fd = new Integer(ski.channel.getFDVal());
MapEntry x = get(fd);
if ((x != null) && (x.ski.channel == ski.channel))
return remove(fd);
return null;
}
}
// class for fdMap entries
private final static class MapEntry {
SelectionKeyImpl ski;
long updateCount = 0;
long clearedCount = 0;
MapEntry(SelectionKeyImpl ski) {
this.ski = ski;
}
}
private final FdMap fdMap = new FdMap();
// SubSelector for the main thread
private final SubSelector subSelector = new SubSelector();
private long timeout; //timeout for poll
// Lock for interrupt triggering and clearing
private final Object interruptLock = new Object();
private volatile boolean interruptTriggered = false;
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
wakeupPipe = Pipe.open();
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
// Disable the Nagle algorithm so that the wakeup is more immediate
SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
(sink.sc).socket().setTcpNoDelay(true);
wakeupSinkFd = ((SelChImpl)sink).getFDVal();
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
protected int doSelect(long timeout) throws IOException {
if (channelArray == null)
throw new ClosedSelectorException();
this.timeout = timeout; // set selector timeout
processDeregisterQueue();
if (interruptTriggered) {
resetWakeupSocket();
return 0;
}
// Calculate number of helper threads needed for poll. If necessary
// threads are created here and start waiting on startLock
adjustThreadsCount();
finishLock.reset(); // reset finishLock
// Wakeup helper threads, waiting on startLock, so they start polling.
// Redundant threads will exit here after wakeup.
startLock.startThreads();
// do polling in the main thread. Main thread is responsible for
// first MAX_SELECTABLE_FDS entries in pollArray.
try {
begin();
try {
subSelector.poll();
} catch (IOException e) {
finishLock.setException(e); // Save this exception
}
// Main thread is out of poll(). Wakeup others and wait for them
if (threads.size() > 0)
finishLock.waitForHelperThreads();
} finally {
end();
}
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
finishLock.checkForException();
processDeregisterQueue();
int updated = updateSelectedKeys();
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
resetWakeupSocket();
return updated;
}
// Helper threads wait on this lock for the next poll.
private final StartLock startLock = new StartLock();
private final class StartLock {
// A variable which distinguishes the current run of doSelect from the
// previous one. Incrementing runsCounter and notifying threads will
// trigger another round of poll.
private long runsCounter;
// Triggers threads, waiting on this lock to start polling.
private synchronized void startThreads() {
runsCounter++; // next run
notifyAll(); // wake up threads.
}
// This function is called by a helper thread to wait for the
// next round of poll(). It also checks, if this thread became
// redundant. If yes, it returns true, notifying the thread
// that it should exit.
private synchronized boolean waitForStart(SelectThread thread) {
while (true) {
while (runsCounter == thread.lastRun) {
try {
startLock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (thread.index >= threads.size()) { // redundant thread
return true; // will cause run() to exit.
} else {
thread.lastRun = runsCounter; // update lastRun
return false; // will cause run() to poll.
}
}
}
}
// Main thread waits on this lock, until all helper threads are done
// with poll().
private final FinishLock finishLock = new FinishLock();
private final class FinishLock {
// Number of helper threads, that did not finish yet.
private int threadsToFinish;
// IOException which occured during the last run.
IOException exception = null;
// Called before polling.
private void reset() {
threadsToFinish = threads.size(); // helper threads
}
// Each helper thread invokes this function on finishLock, when
// the thread is done with poll().
private synchronized void threadFinished() {
if (threadsToFinish == threads.size()) { // finished poll() first
// if finished first, wakeup others
wakeup();
}
threadsToFinish--;
if (threadsToFinish == 0) // all helper threads finished poll().
notify(); // notify the main thread
}
// The main thread invokes this function on finishLock to wait
// for helper threads to finish poll().
private synchronized void waitForHelperThreads() {
if (threadsToFinish == threads.size()) {
// no helper threads finished yet. Wakeup them up.
wakeup();
}
while (threadsToFinish != 0) {
try {
finishLock.wait();
} catch (InterruptedException e) {
// Interrupted - set interrupted state.
Thread.currentThread().interrupt();
}
}
}
// sets IOException for this run
private synchronized void setException(IOException e) {
exception = e;
}
// Checks if there was any exception during the last run.
// If yes, throws it
private void checkForException() throws IOException {
if (exception == null)
return;
StringBuffer message = new StringBuffer("An exception occured" +
" during the execution of select(): \n");
message.append(exception);
message.append('\n');
exception = null;
throw new IOException(message.toString());
}
}
private final class SubSelector {
private final int pollArrayIndex; // starting index in pollArray to poll
// These arrays will hold result of native select().
// The first element of each array is the number of selected sockets.
// Other elements are file descriptors of selected sockets.
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
private SubSelector() {
this.pollArrayIndex = 0; // main thread
}
private SubSelector(int threadIndex) { // helper threads
this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;
}
private int poll() throws IOException{ // poll for the main thread
return poll0(pollWrapper.pollArrayAddress,
Math.min(totalChannels, MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
private int poll(int index) throws IOException {
// poll for helper threads
return poll0(pollWrapper.pollArrayAddress +
(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
Math.min(MAX_SELECTABLE_FDS,
totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
private native int poll0(long pollAddress, int numfds,
int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
private int processSelectedKeys(long updateCount) {
int numKeysUpdated = 0;
numKeysUpdated += processFDSet(updateCount, readFds,
PollArrayWrapper.POLLIN,
false);
numKeysUpdated += processFDSet(updateCount, writeFds,
PollArrayWrapper.POLLCONN |
PollArrayWrapper.POLLOUT,
false);
numKeysUpdated += processFDSet(updateCount, exceptFds,
PollArrayWrapper.POLLIN |
PollArrayWrapper.POLLCONN |
PollArrayWrapper.POLLOUT,
true);
return numKeysUpdated;
}
Note, clearedCount is used to determine if the readyOps have
been reset in this select operation. updateCount is used to
tell if a key has been counted as updated in this select
operation.
me.updateCount <= me.clearedCount <= updateCount
/**
* Note, clearedCount is used to determine if the readyOps have
* been reset in this select operation. updateCount is used to
* tell if a key has been counted as updated in this select
* operation.
*
* me.updateCount <= me.clearedCount <= updateCount
*/
private int processFDSet(long updateCount, int[] fds, int rOps,
boolean isExceptFds) {
int numKeysUpdated = 0;
for (int i = 1; i <= fds[0]; i++) {
int desc = fds[i];
if (desc == wakeupSourceFd) {
synchronized (interruptLock) {
interruptTriggered = true;
}
continue;
}
MapEntry me = fdMap.get(desc);
// If me is null, the key was deregistered in the previous
// processDeregisterQueue.
if (me == null)
continue;
SelectionKeyImpl sk = me.ski;
// The descriptor may be in the exceptfds set because there is
// OOB data queued to the socket. If there is OOB data then it
// is discarded and the key is not added to the selected set.
if (isExceptFds &&
(sk.channel() instanceof SocketChannelImpl) &&
discardUrgentData(desc))
{
continue;
}
if (selectedKeys.contains(sk)) { // Key in selected set
if (me.clearedCount != updateCount) {
if (sk.channel.translateAndSetReadyOps(rOps, sk) &&
(me.updateCount != updateCount)) {
me.updateCount = updateCount;
numKeysUpdated++;
}
} else { // The readyOps have been set; now add
if (sk.channel.translateAndUpdateReadyOps(rOps, sk) &&
(me.updateCount != updateCount)) {
me.updateCount = updateCount;
numKeysUpdated++;
}
}
me.clearedCount = updateCount;
} else { // Key is not in selected set yet
if (me.clearedCount != updateCount) {
sk.channel.translateAndSetReadyOps(rOps, sk);
if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
selectedKeys.add(sk);
me.updateCount = updateCount;
numKeysUpdated++;
}
} else { // The readyOps have been set; now add
sk.channel.translateAndUpdateReadyOps(rOps, sk);
if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
selectedKeys.add(sk);
me.updateCount = updateCount;
numKeysUpdated++;
}
}
me.clearedCount = updateCount;
}
}
return numKeysUpdated;
}
}
// Represents a helper thread used for select.
private final class SelectThread extends Thread {
private int index; // index of this thread
SubSelector subSelector;
private long lastRun = 0; // last run number
// Creates a new thread
private SelectThread(int i) {
this.index = i;
this.subSelector = new SubSelector(i);
//make sure we wait for next round of poll
this.lastRun = startLock.runsCounter;
}
public void run() {
while (true) { // poll loop
// wait for the start of poll. If this thread has become
// redundant, then exit.
if (startLock.waitForStart(this))
return;
// call poll()
try {
subSelector.poll(index);
} catch (IOException e) {
// Save this exception and let other threads finish.
finishLock.setException(e);
}
// notify main thread, that this thread has finished, and
// wakeup others, if this thread is the first to finish.
finishLock.threadFinished();
}
}
}
// After some channels registered/deregistered, the number of required
// helper threads may have changed. Adjust this number.
private void adjustThreadsCount() {
if (threadsCount > threads.size()) {
// More threads needed. Start more threads.
for (int i = threads.size(); i < threadsCount; i++) {
SelectThread newThread = new SelectThread(i);
threads.add(newThread);
newThread.setDaemon(true);
newThread.start();
}
} else if (threadsCount < threads.size()) {
// Some threads become redundant. Remove them from the threads List.
for (int i = threads.size() - 1 ; i >= threadsCount; i--)
threads.remove(i);
}
}
// Sets Windows wakeup socket to a signaled state.
private void setWakeupSocket() {
setWakeupSocket0(wakeupSinkFd);
}
private native void setWakeupSocket0(int wakeupSinkFd);
// Sets Windows wakeup socket to a non-signaled state.
private void resetWakeupSocket() {
synchronized (interruptLock) {
if (interruptTriggered == false)
return;
resetWakeupSocket0(wakeupSourceFd);
interruptTriggered = false;
}
}
private native void resetWakeupSocket0(int wakeupSourceFd);
private native boolean discardUrgentData(int fd);
// We increment this counter on each call to updateSelectedKeys()
// each entry in SubSelector.fdsMap has a memorized value of
// updateCount. When we increment numKeysUpdated we set updateCount
// for the corresponding entry to its current value. This is used to
// avoid counting the same key more than once - the same key can
// appear in readfds and writefds.
private long updateCount = 0;
// Update ops of the corresponding Channels. Add the ready keys to the
// ready queue.
private int updateSelectedKeys() {
updateCount++;
int numKeysUpdated = 0;
numKeysUpdated += subSelector.processSelectedKeys(updateCount);
Iterator it = threads.iterator();
while (it.hasNext())
numKeysUpdated += ((SelectThread)it.next()).subSelector.
processSelectedKeys(updateCount);
return numKeysUpdated;
}
protected void implClose() throws IOException {
if (channelArray != null) {
if (pollWrapper != null) {
// prevent further wakeup
synchronized (interruptLock) {
interruptTriggered = true;
}
wakeupPipe.sink().close();
wakeupPipe.source().close();
for(int i = 1; i < totalChannels; i++) { // Deregister channels
if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent
deregister(channelArray[i]);
SelectableChannel selch = channelArray[i].channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
}
}
pollWrapper.free();
pollWrapper = null;
selectedKeys = null;
channelArray = null;
threads.clear();
// Call startThreads. All remaining helper threads now exit,
// since threads.size() = 0;
startLock.startThreads();
}
}
}
protected void implRegister(SelectionKeyImpl ski) {
growIfNeeded();
channelArray[totalChannels] = ski;
ski.setIndex(totalChannels);
fdMap.put(ski);
keys.add(ski);
pollWrapper.addEntry(totalChannels, ski);
totalChannels++;
}
private void growIfNeeded() {
if (channelArray.length == totalChannels) {
int newSize = totalChannels * 2; // Make a larger array
SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
channelArray = temp;
pollWrapper.grow(newSize);
}
if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
totalChannels++;
threadsCount++;
}
}
protected void implDereg(SelectionKeyImpl ski) throws IOException{
int i = ski.getIndex();
assert (i >= 0);
if (i != totalChannels - 1) {
// Copy end one over it
SelectionKeyImpl endChannel = channelArray[totalChannels-1];
channelArray[i] = endChannel;
endChannel.setIndex(i);
pollWrapper.replaceEntry(pollWrapper, totalChannels - 1,
pollWrapper, i);
}
channelArray[totalChannels - 1] = null;
totalChannels--;
ski.setIndex(-1);
if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
totalChannels--;
threadsCount--; // The last thread has become redundant.
}
fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys
keys.remove(ski);
selectedKeys.remove(ski);
deregister(ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
}
void putEventOps(SelectionKeyImpl sk, int ops) {
pollWrapper.putEventOps(sk.getIndex(), ops);
}
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {
setWakeupSocket();
interruptTriggered = true;
}
}
return this;
}
static {
IOUtil.load();
}
}