package org.glassfish.grizzly.nio;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Grizzly;
import org.glassfish.grizzly.impl.FutureImpl;
import org.glassfish.grizzly.impl.SafeFutureImpl;
import org.glassfish.grizzly.utils.Futures;
public class DefaultSelectorHandler implements SelectorHandler {
/** Select timeout, in milliseconds, used by the no-argument constructor. */
private static final long DEFAULT_SELECT_TIMEOUT_MILLIS = 30000;
/** Logger for this handler, obtained through {@code Grizzly.logger}. */
private static final Logger logger = Grizzly.logger(DefaultSelectorHandler.class);
/**
 * Whether the selector-spin checks in this class are enabled. It is {@code true} when the
 * boolean system property {@code <fully qualified class name>.force-selector-spin-detection}
 * is set to true, or when the {@code os.name} system property equals "linux" ignoring case.
 */
public static final boolean IS_WORKAROUND_SELECTOR_SPIN = Boolean.getBoolean(DefaultSelectorHandler.class.getName() + ".force-selector-spin-detection")
|| System.getProperty("os.name").equalsIgnoreCase("linux");
/**
 * Timeout in milliseconds. It is passed to {@code Selector.select(long)} and is also used as
 * the maximum wait of the blocking register/deregister channel operations.
 */
protected final long selectTimeout;
/** Threshold value handed to {@code SelectorRunner.checkSelectorSpin} after each select. */
private static final int SPIN_RATE_THRESHOLD = 2000;
/**
 * Creates a handler that uses the default select timeout
 * ({@code DEFAULT_SELECT_TIMEOUT_MILLIS} milliseconds).
 */
public DefaultSelectorHandler() {
    this(DEFAULT_SELECT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
}

/**
 * Creates a handler with an explicit select timeout.
 *
 * @param selectTimeout the timeout value, expressed in {@code timeunit}
 * @param timeunit the unit of {@code selectTimeout}; the value is stored in milliseconds
 */
public DefaultSelectorHandler(final long selectTimeout, final TimeUnit timeunit) {
    // Normalise to milliseconds once, so every later use shares the same unit.
    this.selectTimeout = timeunit.toMillis(selectTimeout);
}
/**
 * Returns the select timeout configured for this handler.
 *
 * @return the timeout in milliseconds
 */
@Override
public long getSelectTimeout() {
    return this.selectTimeout;
}
/**
 * Hook invoked before a select pass; runs the runner's queued tasks.
 *
 * @param selectorRunner the runner whose task queues are processed
 * @return the result of processing the queued tasks ({@code false} as soon as one task
 *         returns {@code false})
 * @throws IOException if a task fails with an I/O error
 */
@Override
public boolean preSelect(final SelectorRunner selectorRunner) throws IOException {
    final boolean allTasksContinued = processPendingTasks(selectorRunner);
    return allTasksContinued;
}
/**
 * Performs one selection pass on the runner's selector.
 *
 * <p>When the runner has postponed tasks queued, the selector is polled without blocking;
 * otherwise the call blocks for at most the configured select timeout. When
 * {@code IS_WORKAROUND_SELECTOR_SPIN} is set, the runner's spin check is invoked afterwards.
 *
 * @param selectorRunner the runner that owns the selector
 * @return the selector's selected-key set
 * @throws IOException if the select operation fails
 */
@Override
public Set<SelectionKey> select(final SelectorRunner selectorRunner) throws IOException {
    final Selector runnerSelector = selectorRunner.getSelector();
    final boolean postponedWorkWaiting = !selectorRunner.getPostponedTasks().isEmpty();

    if (postponedWorkWaiting) {
        // Postponed tasks are waiting, so do not block in select.
        runnerSelector.selectNow();
    } else {
        runnerSelector.select(selectTimeout);
    }

    final Set<SelectionKey> readyKeys = runnerSelector.selectedKeys();

    if (IS_WORKAROUND_SELECTOR_SPIN) {
        final boolean hadWork = !readyKeys.isEmpty() || postponedWorkWaiting;
        selectorRunner.checkSelectorSpin(hadWork, SPIN_RATE_THRESHOLD);
    }

    return readyKeys;
}
/**
 * Hook invoked after a select pass. This implementation does nothing.
 *
 * @param selectorRunner the runner that performed the select (unused)
 * @throws IOException never thrown by this implementation
 */
@Override
public void postSelect(final SelectorRunner selectorRunner) throws IOException {
    // Intentionally empty: no post-select work is required here.
}
/**
 * Adds {@code interest} to the interest set of {@code key}.
 *
 * <p>When called on the runner's own thread the key is updated immediately; from any other
 * thread a {@code RegisterKeyTask} is queued on the runner as a pending task.
 *
 * @param selectorRunner the runner that owns the key's selector
 * @param key the selection key to update
 * @param interest the interest operation bits to add
 * @throws IOException declared by the interface; not thrown directly here
 */
@Override
public void registerKeyInterest(final SelectorRunner selectorRunner, final SelectionKey key, final int interest) throws IOException {
    if (!isSelectorRunnerThread(selectorRunner)) {
        // Foreign thread: hand the update over to the runner.
        selectorRunner.addPendingTask(new RegisterKeyTask(key, interest));
        return;
    }

    registerKey0(key, interest);
}
/**
 * Adds {@code interest} to the key's interest ops if the key is valid and does not already
 * contain all of the requested bits.
 *
 * @param selectionKey the key to update
 * @param interest the interest operation bits to add
 */
private static void registerKey0(final SelectionKey selectionKey, final int interest) {
    if (!selectionKey.isValid()) {
        return;
    }

    final int existingOps = selectionKey.interestOps();
    final boolean alreadyInterested = (existingOps & interest) == interest;
    if (!alreadyInterested) {
        selectionKey.interestOps(existingOps | interest);
    }
}
/**
 * Removes {@code interest} from the interest set of {@code key}.
 *
 * <p>The key is modified directly on the calling thread. Nothing happens when the key is
 * invalid or none of the requested bits are currently set.
 *
 * @param selectorRunner the runner associated with the key (not used by this implementation)
 * @param key the selection key to update
 * @param interest the interest operation bits to clear
 * @throws IOException declared by the interface; not thrown directly here
 */
@Override
public void deregisterKeyInterest(final SelectorRunner selectorRunner, final SelectionKey key, final int interest) throws IOException {
    if (!key.isValid()) {
        return;
    }

    final int existingOps = key.interestOps();
    final boolean nothingToClear = (existingOps & interest) == 0;
    if (nothingToClear) {
        return;
    }

    key.interestOps(existingOps & ~interest);
}
/**
 * Registers {@code channel} on the runner's selector and waits for the registration to
 * finish.
 *
 * <p>The work is delegated to
 * {@link #registerChannelAsync(SelectorRunner, SelectableChannel, int, Object, CompletionHandler)}
 * and the calling thread then waits on the resulting future for at most the configured
 * select timeout.
 *
 * @param selectorRunner the runner whose selector the channel is registered on
 * @param channel the channel to register
 * @param interest the initial interest operation bits
 * @param attachment the object to attach to the resulting selection key
 * @throws IOException if the registration fails, times out or the wait is interrupted; the
 *         underlying exception is available as the cause
 */
@Override
public void registerChannel(final SelectorRunner selectorRunner, final SelectableChannel channel, final int interest, final Object attachment)
        throws IOException {
    final FutureImpl<RegisterChannelResult> future = SafeFutureImpl.create();
    registerChannelAsync(selectorRunner, channel, interest, attachment, Futures.toCompletionHandler(future));
    try {
        future.get(selectTimeout, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt status so code further up the stack can observe it.
            Thread.currentThread().interrupt();
        }
        // Keep the original message for compatibility, but chain the cause so that
        // timeouts and execution failures remain diagnosable.
        throw new IOException(e.getMessage(), e);
    }
}
/**
 * Registers {@code channel} on the runner's selector without blocking the caller.
 *
 * <p>On the runner's own thread the registration is performed immediately; from any other
 * thread a {@code RegisterChannelOperation} is queued as a pending task.
 *
 * @param selectorRunner the runner whose selector the channel is registered on
 * @param channel the channel to register
 * @param interest the initial interest operation bits
 * @param attachment the object to attach to the resulting selection key
 * @param completionHandler notified about the outcome; may be {@code null}
 */
@Override
public void registerChannelAsync(SelectorRunner selectorRunner, SelectableChannel channel, int interest, Object attachment,
        CompletionHandler<RegisterChannelResult> completionHandler) {
    if (!isSelectorRunnerThread(selectorRunner)) {
        final RegisterChannelOperation deferred = new RegisterChannelOperation(channel, interest, attachment, completionHandler);
        addPendingTask(selectorRunner, deferred);
        return;
    }

    registerChannel0(selectorRunner, channel, interest, attachment, completionHandler);
}
/**
 * Deregisters {@code channel} from the runner's selector and waits for the operation to
 * finish.
 *
 * <p>The work is delegated to
 * {@link #deregisterChannelAsync(SelectorRunner, SelectableChannel, CompletionHandler)} and the
 * calling thread then waits on the resulting future for at most the configured select
 * timeout.
 *
 * @param selectorRunner the runner whose selector the channel is registered on
 * @param channel the channel to deregister
 * @throws IOException if the deregistration fails, times out or the wait is interrupted; the
 *         underlying exception is available as the cause
 */
@Override
public void deregisterChannel(final SelectorRunner selectorRunner, final SelectableChannel channel) throws IOException {
    final FutureImpl<RegisterChannelResult> future = SafeFutureImpl.create();
    deregisterChannelAsync(selectorRunner, channel, Futures.toCompletionHandler(future));
    try {
        future.get(selectTimeout, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt status so code further up the stack can observe it.
            Thread.currentThread().interrupt();
        }
        // Keep the original message for compatibility, but chain the cause so that
        // timeouts and execution failures remain diagnosable.
        throw new IOException(e.getMessage(), e);
    }
}
/**
 * Deregisters {@code channel} from the runner's selector without blocking the caller.
 *
 * <p>On the runner's own thread the deregistration is performed immediately; from any other
 * thread a {@code DeregisterChannelOperation} is queued as a pending task.
 *
 * @param selectorRunner the runner whose selector the channel is registered on
 * @param channel the channel to deregister
 * @param completionHandler notified about the outcome; may be {@code null}
 */
@Override
public void deregisterChannelAsync(final SelectorRunner selectorRunner, final SelectableChannel channel,
        final CompletionHandler<RegisterChannelResult> completionHandler) {
    if (!isSelectorRunnerThread(selectorRunner)) {
        final DeregisterChannelOperation deferred = new DeregisterChannelOperation(channel, completionHandler);
        addPendingTask(selectorRunner, deferred);
        return;
    }

    deregisterChannel0(selectorRunner, channel, completionHandler);
}
/**
 * Runs {@code task} in the context of the selector runner.
 *
 * <p>On the runner's own thread the task runs immediately and the handler is told about
 * success or about any {@link Exception} that was thrown. From any other thread the task is
 * wrapped in a {@code RunnableTask} and queued as a pending task.
 *
 * @param selectorRunner the runner that should execute the task
 * @param task the task to run
 * @param completionHandler notified about the outcome; may be {@code null}
 */
@Override
public void execute(final SelectorRunner selectorRunner, final Task task, final CompletionHandler<Task> completionHandler) {
    if (!isSelectorRunnerThread(selectorRunner)) {
        addPendingTask(selectorRunner, new RunnableTask(task, completionHandler));
        return;
    }

    final boolean hasHandler = completionHandler != null;
    try {
        task.run();
        if (hasHandler) {
            completionHandler.completed(task);
        }
    } catch (Exception failure) {
        if (hasHandler) {
            completionHandler.failed(failure);
        }
    }
}
/**
 * Queues {@code task} for later execution by the selector runner.
 *
 * <p>On the runner's own thread the wrapped task is offered to the runner's postponed-task
 * queue; from any other thread it is added as a pending task.
 *
 * @param selectorRunner the runner that should execute the task
 * @param task the task to queue
 * @param completionHandler notified about the outcome; may be {@code null}
 */
@Override
public void enque(final SelectorRunner selectorRunner, final Task task, final CompletionHandler<Task> completionHandler) {
    final RunnableTask wrapped = new RunnableTask(task, completionHandler);

    if (isSelectorRunnerThread(selectorRunner)) {
        selectorRunner.getPostponedTasks().offer(wrapped);
    } else {
        addPendingTask(selectorRunner, wrapped);
    }
}
/**
 * Adds {@code task} to the runner's pending tasks, cancelling it when it cannot be run.
 *
 * <p>A {@code null} runner cancels the task immediately. Otherwise the task is added, and if
 * the runner reports that it is stopped and the task can still be removed from the pending
 * queue, the task is removed and cancelled.
 *
 * @param selectorRunner the target runner; may be {@code null}
 * @param task the task to queue
 */
private void addPendingTask(final SelectorRunner selectorRunner, final SelectorHandlerTask task) {
    if (selectorRunner != null) {
        selectorRunner.addPendingTask(task);

        // The runner may have stopped in the meantime; take the task back if it is still queued.
        final boolean withdrawn = selectorRunner.isStop() && selectorRunner.getPendingTasks().remove(task);
        if (withdrawn) {
            task.cancel();
        }
    } else {
        task.cancel();
    }
}
/**
 * Runs the runner's postponed tasks and then, if the runner's {@code hasPendingTasks} flag
 * is set, its pending tasks.
 *
 * @param selectorRunner the runner whose queues are drained
 * @return {@code false} as soon as a task returns {@code false}; {@code true} otherwise
 * @throws IOException if a task fails with an I/O error
 */
private boolean processPendingTasks(final SelectorRunner selectorRunner) throws IOException {
    if (!processPendingTaskQueue(selectorRunner, selectorRunner.obtainPostponedTasks())) {
        return false;
    }

    if (!selectorRunner.hasPendingTasks) {
        return true;
    }

    return processPendingTaskQueue(selectorRunner, selectorRunner.getPendingTasks());
}
/**
 * Polls tasks from the queue one by one and runs each on the given runner, stopping early
 * when a task returns {@code false}.
 *
 * @param selectorRunner the runner passed to each task
 * @param selectorHandlerTasks the queue to drain
 * @return {@code false} if a task returned {@code false}; {@code true} once the queue is empty
 * @throws IOException if a task fails with an I/O error
 */
private boolean processPendingTaskQueue(final SelectorRunner selectorRunner, final Queue<SelectorHandlerTask> selectorHandlerTasks) throws IOException {
    for (SelectorHandlerTask next = selectorHandlerTasks.poll(); next != null; next = selectorHandlerTasks.poll()) {
        final boolean keepGoing = next.run(selectorRunner);
        if (!keepGoing) {
            return false;
        }
    }
    return true;
}
/**
 * Registers {@code channel} on the runner's selector and reports the outcome.
 *
 * <p>A closed channel fails the handler with {@link ClosedChannelException}. If the channel
 * already has a key for this selector and that key is no longer valid, the registration is
 * re-queued on the runner's postponed tasks instead of being performed now. Any
 * {@link IOException} raised on the way is reported to the handler as a failure.
 *
 * @param selectorRunner the runner whose selector is used
 * @param channel the channel to register
 * @param interest the initial interest operation bits
 * @param attachment the object to attach to the selection key
 * @param completionHandler notified about the outcome; may be {@code null}
 */
private static void registerChannel0(final SelectorRunner selectorRunner, final SelectableChannel channel, final int interest, final Object attachment,
        final CompletionHandler<RegisterChannelResult> completionHandler) {
    try {
        if (!channel.isOpen()) {
            failChannelRegistration(completionHandler, new ClosedChannelException());
            return;
        }

        final Selector runnerSelector = selectorRunner.getSelector();
        final SelectionKey existingKey = channel.keyFor(runnerSelector);

        final boolean staleKeyPresent = existingKey != null && !existingKey.isValid();
        if (staleKeyPresent) {
            // Already registered, but the key is not valid: retry later via the postponed queue.
            final Queue<SelectorHandlerTask> postponed = selectorRunner.getPostponedTasks();
            postponed.add(new RegisterChannelOperation(channel, interest, attachment, completionHandler));
            return;
        }

        final SelectionKey newKey = channel.register(runnerSelector, interest, attachment);
        selectorRunner.getTransport().getSelectionKeyHandler().onKeyRegistered(newKey);

        final RegisterChannelResult outcome = new RegisterChannelResult(selectorRunner, newKey, channel);
        if (completionHandler != null) {
            completionHandler.completed(outcome);
        }
    } catch (IOException e) {
        failChannelRegistration(completionHandler, e);
    }
}
/**
 * Reports {@code error} to the handler, if one was supplied.
 *
 * @param completionHandler the handler to notify; may be {@code null}
 * @param error the failure to report
 */
private static void failChannelRegistration(final CompletionHandler<RegisterChannelResult> completionHandler, final Throwable error) {
    if (completionHandler == null) {
        return;
    }
    completionHandler.failed(error);
}
/**
 * Cancels the channel's key on the runner's selector and reports the outcome.
 *
 * <p>A closed channel is reported as {@link ClosedChannelException}; a channel without a key
 * for this selector is reported as {@link IllegalStateException}. Any {@link IOException}
 * raised on the way is reported to the handler as a failure.
 *
 * @param selectorRunner the runner whose selector is used
 * @param channel the channel to deregister
 * @param completionHandler notified about the outcome; may be {@code null}
 */
private static void deregisterChannel0(final SelectorRunner selectorRunner, final SelectableChannel channel,
        final CompletionHandler<RegisterChannelResult> completionHandler) {
    try {
        if (!channel.isOpen()) {
            Futures.notifyFailure(null, completionHandler, new ClosedChannelException());
            return;
        }

        final Selector runnerSelector = selectorRunner.getSelector();
        final SelectionKey registeredKey = channel.keyFor(runnerSelector);

        if (registeredKey == null) {
            Futures.notifyFailure(null, completionHandler, new IllegalStateException("Channel is not registered"));
            return;
        }

        selectorRunner.getTransport().getSelectionKeyHandler().cancel(registeredKey);
        selectorRunner.getTransport().getSelectionKeyHandler().onKeyDeregistered(registeredKey);

        final RegisterChannelResult outcome = new RegisterChannelResult(selectorRunner, registeredKey, channel);
        if (completionHandler != null) {
            completionHandler.completed(outcome);
        }
    } catch (IOException e) {
        Futures.notifyFailure(null, completionHandler, e);
    }
}
/**
 * Reacts to the runner's selector having been closed by invoking the runner's
 * {@code workaroundSelectorSpin()}.
 *
 * @param selectorRunner the runner whose selector was closed
 * @return {@code true} if the workaround completed without throwing; {@code false} if it
 *         threw an {@link Exception}
 */
@Override
public boolean onSelectorClosed(final SelectorRunner selectorRunner) {
    boolean recovered;
    try {
        selectorRunner.workaroundSelectorSpin();
        recovered = true;
    } catch (Exception ignored) {
        // Any failure is reported to the caller through the return value only.
        recovered = false;
    }
    return recovered;
}
/**
 * Tells whether the calling thread is the given runner's own thread.
 *
 * @param selectorRunner the runner to check against; may be {@code null}
 * @return {@code true} only if the runner is non-null and the current thread is its runner
 *         thread
 */
private static boolean isSelectorRunnerThread(final SelectorRunner selectorRunner) {
    if (selectorRunner == null) {
        return false;
    }
    return Thread.currentThread() == selectorRunner.getRunnerThread();
}
/**
 * Deferred task that adds interest operation bits to a selection key when it is run.
 */
protected static final class RegisterKeyTask implements SelectorHandlerTask {
    private final SelectionKey selectionKey;
    private final int interest;

    /**
     * @param selectionKey the key whose interest set should be extended
     * @param interest the interest operation bits to add
     */
    public RegisterKeyTask(final SelectionKey selectionKey, final int interest) {
        this.selectionKey = selectionKey;
        this.interest = interest;
    }

    /**
     * Adds the interest bits to the key. When {@code IS_WORKAROUND_SELECTOR_SPIN} is set, the
     * key returned by {@code selectorRunner.checkIfSpinnedKey} is used in place of the stored
     * one.
     *
     * @return always {@code true}
     */
    @Override
    public boolean run(final SelectorRunner selectorRunner) throws IOException {
        final SelectionKey targetKey = IS_WORKAROUND_SELECTOR_SPIN
                ? selectorRunner.checkIfSpinnedKey(selectionKey)
                : selectionKey;
        registerKey0(targetKey, interest);
        return true;
    }

    /** Does nothing: this task has nobody to notify. */
    @Override
    public void cancel() {
    }
}
/**
 * Deferred task that registers a channel on the runner's selector when it is run.
 */
protected static final class RegisterChannelOperation implements SelectorHandlerTask {
    private final SelectableChannel channel;
    private final int interest;
    private final Object attachment;
    private final CompletionHandler<RegisterChannelResult> completionHandler;

    /**
     * @param channel the channel to register
     * @param interest the initial interest operation bits
     * @param attachment the object to attach to the selection key
     * @param completionHandler notified about the outcome; may be {@code null}
     */
    private RegisterChannelOperation(final SelectableChannel channel, final int interest, final Object attachment,
            final CompletionHandler<RegisterChannelResult> completionHandler) {
        this.channel = channel;
        this.interest = interest;
        this.attachment = attachment;
        this.completionHandler = completionHandler;
    }

    /**
     * Performs the registration on the given runner.
     *
     * @return always {@code true}
     */
    @Override
    public boolean run(final SelectorRunner selectorRunner) throws IOException {
        registerChannel0(selectorRunner, channel, interest, attachment, completionHandler);
        return true;
    }

    /** Fails the completion handler, if any, with an {@link IOException}. */
    @Override
    public void cancel() {
        if (completionHandler == null) {
            return;
        }
        completionHandler.failed(new IOException("Selector is closed"));
    }
}
/**
 * Deferred task that wraps a {@code Task} together with an optional completion handler.
 */
protected static final class RunnableTask implements SelectorHandlerTask {
    private final Task task;
    private final CompletionHandler<Task> completionHandler;

    /**
     * @param task the task to run
     * @param completionHandler notified about the outcome; may be {@code null}
     */
    private RunnableTask(final Task task, final CompletionHandler<Task> completionHandler) {
        this.task = task;
        this.completionHandler = completionHandler;
    }

    /**
     * Runs the wrapped task and notifies the handler. Any {@link Throwable} is logged at
     * {@code FINEST} level and forwarded to the handler instead of being propagated.
     *
     * @return the value returned by the wrapped task, or {@code true} if the task itself threw
     */
    @Override
    public boolean run(final SelectorRunner selectorRunner) throws IOException {
        final boolean hasHandler = completionHandler != null;
        // Defaults to true so that a throwing task does not stop queue processing.
        boolean keepProcessing = true;
        try {
            keepProcessing = task.run();
            if (hasHandler) {
                completionHandler.completed(task);
            }
        } catch (Throwable failure) {
            logger.log(Level.FINEST, "doExecutePendiongIO failed.", failure);
            if (hasHandler) {
                completionHandler.failed(failure);
            }
        }
        return keepProcessing;
    }

    /** Fails the completion handler, if any, with an {@link IOException}. */
    @Override
    public void cancel() {
        if (completionHandler == null) {
            return;
        }
        completionHandler.failed(new IOException("Selector is closed"));
    }
}
/**
 * Deferred task that deregisters a channel from the runner's selector when it is run.
 */
protected static final class DeregisterChannelOperation implements SelectorHandlerTask {
    private final SelectableChannel channel;
    private final CompletionHandler<RegisterChannelResult> completionHandler;

    /**
     * @param channel the channel to deregister
     * @param completionHandler notified about the outcome; may be {@code null}
     */
    private DeregisterChannelOperation(final SelectableChannel channel, final CompletionHandler<RegisterChannelResult> completionHandler) {
        this.channel = channel;
        this.completionHandler = completionHandler;
    }

    /**
     * Performs the deregistration on the given runner.
     *
     * @return always {@code true}
     */
    @Override
    public boolean run(final SelectorRunner selectorRunner) throws IOException {
        deregisterChannel0(selectorRunner, channel, completionHandler);
        return true;
    }

    /** Fails the completion handler, if any, with an {@link IOException}. */
    @Override
    public void cancel() {
        if (completionHandler == null) {
            return;
        }
        completionHandler.failed(new IOException("Selector is closed"));
    }
}
}