/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tomcat.util.net;

import java.io.EOFException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.ExceptionUtils;
import org.apache.tomcat.util.collections.SynchronizedQueue;
import org.apache.tomcat.util.collections.SynchronizedStack;
import org.apache.tomcat.util.net.NioEndpoint.NioSocketWrapper;
import org.apache.tomcat.util.res.StringManager;

public class NioBlockingSelector {

    private static final Log log = LogFactory.getLog(NioBlockingSelector.class);
    protected static final StringManager sm = StringManager.getManager(NioBlockingSelector.class);

    private final SynchronizedStack<KeyReference> keyReferenceStack =
            new SynchronizedStack<>();

    protected Selector sharedSelector;

    protected BlockPoller poller;

    public void open(String name, Selector selector) {
        sharedSelector = selector;
        poller = new BlockPoller();
        poller.selector = sharedSelector;
        poller.setDaemon(true);
        poller.setName(name + "-BlockPoller");
        poller.start();
    }

    public void close() {
        if (poller != null) {
            poller.disable();
            poller.interrupt();
            poller = null;
        }
    }

    
Performs a blocking write using the byte buffer for data to be written If the selector parameter is null, then it will perform a busy write that could take up a lot of CPU cycles.
Params:
  • buf – ByteBuffer - the buffer containing the data, we will write as long as (buf.hasRemaining()==true)
  • socket – SocketChannel - the socket to write data to
  • writeTimeout – long - the timeout for this write operation in milliseconds, -1 means no timeout
Throws:
Returns:the number of bytes written
/** * Performs a blocking write using the byte buffer for data to be written * If the <code>selector</code> parameter is null, then it will perform a busy write that could * take up a lot of CPU cycles. * * @param buf ByteBuffer - the buffer containing the data, we will write as long as <code>(buf.hasRemaining()==true)</code> * @param socket SocketChannel - the socket to write data to * @param writeTimeout long - the timeout for this write operation in milliseconds, -1 means no timeout * @return the number of bytes written * @throws EOFException if write returns -1 * @throws SocketTimeoutException if the write times out * @throws IOException if an IO Exception occurs in the underlying socket logic */
public int write(ByteBuffer buf, NioChannel socket, long writeTimeout) throws IOException { SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector()); if (key == null) { throw new IOException(sm.getString("nioBlockingSelector.keyNotRegistered")); } KeyReference reference = keyReferenceStack.pop(); if (reference == null) { reference = new KeyReference(); } NioSocketWrapper att = (NioSocketWrapper) key.attachment(); int written = 0; boolean timedout = false; int keycount = 1; //assume we can write long time = System.currentTimeMillis(); //start the timeout timer try { while (!timedout && buf.hasRemaining()) { if (keycount > 0) { //only write if we were registered for a write int cnt = socket.write(buf); //write the data if (cnt == -1) { throw new EOFException(); } written += cnt; if (cnt > 0) { time = System.currentTimeMillis(); //reset our timeout timer continue; //we successfully wrote, try again without a selector } } try { if (att.getWriteLatch() == null || att.getWriteLatch().getCount() == 0) { att.startWriteLatch(1); } poller.add(att, SelectionKey.OP_WRITE, reference); att.awaitWriteLatch(AbstractEndpoint.toTimeout(writeTimeout), TimeUnit.MILLISECONDS); } catch (InterruptedException ignore) { // Ignore } if (att.getWriteLatch() != null && att.getWriteLatch().getCount() > 0) { //we got interrupted, but we haven't received notification from the poller. keycount = 0; } else { //latch countdown has happened keycount = 1; att.resetWriteLatch(); } if (writeTimeout > 0 && (keycount == 0)) { timedout = (System.currentTimeMillis() - time) >= writeTimeout; } } if (timedout) { throw new SocketTimeoutException(); } } finally { poller.remove(att, SelectionKey.OP_WRITE); if (timedout && reference.key != null) { poller.cancelKey(reference.key); } reference.key = null; keyReferenceStack.push(reference); } return written; }
Performs a blocking read using the bytebuffer for data to be read If the selector parameter is null, then it will perform a busy read that could take up a lot of CPU cycles.
Params:
  • buf – ByteBuffer - the buffer containing the data, we will read as until we have read at least one byte or we timed out
  • socket – SocketChannel - the socket to write data to
  • readTimeout – long - the timeout for this read operation in milliseconds, -1 means no timeout
Throws:
Returns:the number of bytes read
/** * Performs a blocking read using the bytebuffer for data to be read * If the <code>selector</code> parameter is null, then it will perform a busy read that could * take up a lot of CPU cycles. * * @param buf ByteBuffer - the buffer containing the data, we will read as until we have read at least one byte or we timed out * @param socket SocketChannel - the socket to write data to * @param readTimeout long - the timeout for this read operation in milliseconds, -1 means no timeout * @return the number of bytes read * @throws EOFException if read returns -1 * @throws SocketTimeoutException if the read times out * @throws IOException if an IO Exception occurs in the underlying socket logic */
public int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException { SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector()); if (key == null) { throw new IOException(sm.getString("nioBlockingSelector.keyNotRegistered")); } KeyReference reference = keyReferenceStack.pop(); if (reference == null) { reference = new KeyReference(); } NioSocketWrapper att = (NioSocketWrapper) key.attachment(); int read = 0; boolean timedout = false; int keycount = 1; //assume we can read long time = System.currentTimeMillis(); //start the timeout timer try { while (!timedout) { if (keycount > 0) { //only read if we were registered for a read read = socket.read(buf); if (read != 0) { break; } } try { if (att.getReadLatch()==null || att.getReadLatch().getCount()==0) { att.startReadLatch(1); } poller.add(att,SelectionKey.OP_READ, reference); att.awaitReadLatch(AbstractEndpoint.toTimeout(readTimeout), TimeUnit.MILLISECONDS); } catch (InterruptedException ignore) { // Ignore } if ( att.getReadLatch()!=null && att.getReadLatch().getCount()> 0) { //we got interrupted, but we haven't received notification from the poller. keycount = 0; }else { //latch countdown has happened keycount = 1; att.resetReadLatch(); } if (readTimeout >= 0 && (keycount == 0)) { timedout = (System.currentTimeMillis() - time) >= readTimeout; } } if (timedout) { throw new SocketTimeoutException(); } } finally { poller.remove(att,SelectionKey.OP_READ); if (timedout && reference.key != null) { poller.cancelKey(reference.key); } reference.key = null; keyReferenceStack.push(reference); } return read; } protected static class BlockPoller extends Thread { protected volatile boolean run = true; protected Selector selector = null; protected final SynchronizedQueue<Runnable> events = new SynchronizedQueue<>(); public void disable() { run = false; selector.wakeup(); } protected final AtomicInteger wakeupCounter = new AtomicInteger(0); public void cancelKey(final SelectionKey key) { Runnable r = new RunnableCancel(key); events.offer(r); wakeup(); } public void wakeup() { if (wakeupCounter.addAndGet(1)==0) selector.wakeup(); } public void cancel(SelectionKey sk, NioSocketWrapper key, int ops){ if (sk != null) { sk.cancel(); sk.attach(null); if (SelectionKey.OP_WRITE == (ops & SelectionKey.OP_WRITE)) { countDown(key.getWriteLatch()); } if (SelectionKey.OP_READ == (ops & SelectionKey.OP_READ)) { countDown(key.getReadLatch()); } } } public void add(final NioSocketWrapper key, final int ops, final KeyReference ref) { if (key == null) { return; } NioChannel nch = key.getSocket(); final SocketChannel ch = nch.getIOChannel(); if (ch == null) { return; } Runnable r = new RunnableAdd(ch, key, ops, ref); events.offer(r); wakeup(); } public void remove(final NioSocketWrapper key, final int ops) { if (key == null) { return; } NioChannel nch = key.getSocket(); final SocketChannel ch = nch.getIOChannel(); if (ch == null) { return; } Runnable r = new RunnableRemove(ch, key, ops); events.offer(r); wakeup(); } public boolean events() { Runnable r = null; /* We only poll and run the runnable events when we start this * method. Further events added to the queue later will be delayed * to the next execution of this method. * * We do in this way, because running event from the events queue * may lead the working thread to add more events to the queue (for * example, the worker thread may add another RunnableAdd event when * waken up by a previous RunnableAdd event who got an invalid * SelectionKey). Trying to consume all the events in an increasing * queue till it's empty, will make the loop hard to be terminated, * which will kill a lot of time, and greatly affect performance of * the poller loop. */ int size = events.size(); for (int i = 0; i < size && (r = events.poll()) != null; i++) { r.run(); } return (size > 0); } @Override public void run() { while (run) { try { events(); int keyCount = 0; try { int i = wakeupCounter.get(); if (i > 0) { keyCount = selector.selectNow(); } else { wakeupCounter.set(-1); keyCount = selector.select(1000); } wakeupCounter.set(0); if (!run) { break; } } catch (NullPointerException x) { // sun bug 5076772 on windows JDK 1.5 if (selector == null) { throw x; } if (log.isDebugEnabled()) { log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5", x); } continue; } catch (CancelledKeyException x) { // sun bug 5076772 on windows JDK 1.5 if (log.isDebugEnabled()) { log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5", x); } continue; } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error(sm.getString("nioBlockingSelector.selectError"), x); continue; } Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (run && iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment(); try { iterator.remove(); sk.interestOps(sk.interestOps() & (~sk.readyOps())); if (sk.isReadable()) { countDown(socketWrapper.getReadLatch()); } if (sk.isWritable()) { countDown(socketWrapper.getWriteLatch()); } } catch (CancelledKeyException ckx) { sk.cancel(); countDown(socketWrapper.getReadLatch()); countDown(socketWrapper.getWriteLatch()); } } } catch (Throwable t) { log.error(sm.getString("nioBlockingSelector.processingError"), t); } } events.clear(); // If using a shared selector, the NioSelectorPool will also try and // close the selector. Try and avoid the ClosedSelectorException // although because multiple threads are involved there is always // the possibility of an Exception here. if (selector.isOpen()) { try { // Cancels all remaining keys selector.selectNow(); } catch (Exception ignore) { if (log.isDebugEnabled()) log.debug("", ignore); } } try { selector.close(); } catch (Exception ignore) { if (log.isDebugEnabled()) log.debug("", ignore); } } public void countDown(CountDownLatch latch) { if (latch == null) { return; } latch.countDown(); } private class RunnableAdd implements Runnable { private final SocketChannel ch; private final NioSocketWrapper key; private final int ops; private final KeyReference ref; public RunnableAdd(SocketChannel ch, NioSocketWrapper key, int ops, KeyReference ref) { this.ch = ch; this.key = key; this.ops = ops; this.ref = ref; } @Override public void run() { SelectionKey sk = ch.keyFor(selector); try { if (sk == null) { sk = ch.register(selector, ops, key); ref.key = sk; } else if (!sk.isValid()) { cancel(sk, key, ops); } else { sk.interestOps(sk.interestOps() | ops); } } catch (CancelledKeyException cx) { cancel(sk, key, ops); } catch (ClosedChannelException cx) { cancel(null, key, ops); } } } private class RunnableRemove implements Runnable { private final SocketChannel ch; private final NioSocketWrapper key; private final int ops; public RunnableRemove(SocketChannel ch, NioSocketWrapper key, int ops) { this.ch = ch; this.key = key; this.ops = ops; } @Override public void run() { SelectionKey sk = ch.keyFor(selector); try { if (sk == null) { if (SelectionKey.OP_WRITE == (ops & SelectionKey.OP_WRITE)) { countDown(key.getWriteLatch()); } if (SelectionKey.OP_READ == (ops & SelectionKey.OP_READ)) { countDown(key.getReadLatch()); } } else { if (sk.isValid()) { sk.interestOps(sk.interestOps() & (~ops)); if (SelectionKey.OP_WRITE == (ops & SelectionKey.OP_WRITE)) { countDown(key.getWriteLatch()); } if (SelectionKey.OP_READ == (ops & SelectionKey.OP_READ)) { countDown(key.getReadLatch()); } if (sk.interestOps() == 0) { sk.cancel(); sk.attach(null); } } else { sk.cancel(); sk.attach(null); } } } catch (CancelledKeyException cx) { if (sk != null) { sk.cancel(); sk.attach(null); } } } } public static class RunnableCancel implements Runnable { private final SelectionKey key; public RunnableCancel(SelectionKey key) { this.key = key; } @Override public void run() { key.cancel(); } } } public static class KeyReference { SelectionKey key = null; @Override protected void finalize() { if (key != null && key.isValid()) { log.warn(sm.getString("nioBlockingSelector.possibleLeak")); try { key.cancel(); } catch (Exception ignore) { } } } } }