Copyright (C) 2009-2013 Barchart, Inc. All rights reserved. Licensed under the OSI BSD License. http://www.opensource.org/licenses/bsd-license.php
/** * Copyright (C) 2009-2013 Barchart, Inc. <http://www.barchart.com/> * * All rights reserved. Licensed under the OSI BSD License. * * http://www.opensource.org/licenses/bsd-license.php */
package com.barchart.udt.nio; import static com.barchart.udt.SocketUDT.*; import java.io.IOException; import java.nio.IntBuffer; import java.nio.channels.ClosedSelectorException; import java.nio.channels.IllegalSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.spi.AbstractSelectableChannel; import java.nio.channels.spi.AbstractSelector; import java.nio.channels.spi.SelectorProvider; import java.util.Iterator; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.barchart.udt.EpollUDT; import com.barchart.udt.ExceptionUDT; import com.barchart.udt.SocketUDT; import com.barchart.udt.TypeUDT; import com.barchart.udt.util.HelpUDT;
selector

design guidelines:

1) follow general contracts of jdk 6 nio; see barchart-udt-reference-jdk6

2) adapt to how netty is doing select; see NioEventLoop

note: you must use SelectorProviderUDT.openSelector() to obtain instance of this class; do not use JDK Selector.open()

/** * selector * <p> * design guidelines: * <p> * 1) follow general contracts of jdk 6 nio; see <a href= * "https://github.com/barchart/barchart-udt/tree/master/barchart-udt-reference-jdk6" * >barchart-udt-reference-jdk6</a> * <p> * 2) adapt to how netty is doing select; see <a href= * "https://github.com/netty/netty/blob/master/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java" * >NioEventLoop</a> * <p> * note: you must use {@link SelectorProviderUDT#openSelector()} to obtain * instance of this class; do not use JDK * {@link java.nio.channels.Selector#open()} */
public class SelectorUDT extends AbstractSelector { protected static final Logger log = LoggerFactory .getLogger(SelectorUDT.class);
use this call to instantiate a selector for UDT
/** * use this call to instantiate a selector for UDT */
protected static Selector open(final TypeUDT type) throws IOException { final SelectorProviderUDT provider; switch (type) { case DATAGRAM: provider = SelectorProviderUDT.DATAGRAM; break; case STREAM: provider = SelectorProviderUDT.STREAM; break; default: log.error("unsupported type={}", type); throw new IOException("unsupported type"); } return provider.openSelector(); } private final EpollUDT epollUDT = new EpollUDT(); /** */ public final int maximimSelectorSize;
list of epoll sockets with read interest
/** * list of epoll sockets with read interest */
private final IntBuffer readBuffer;
[ socket-id : selection-key ]
/** * [ socket-id : selection-key ] */
private final ConcurrentMap<Integer, SelectionKeyUDT> // registeredKeyMap = new ConcurrentHashMap<Integer, SelectionKeyUDT>();
public view : immutable
/** * public view : immutable */
private final Set<? extends SelectionKey> // registeredKeySet = HelpUDT.unmodifiableSet(registeredKeyMap.values());
tracks correlation read with write for the same key
/** * tracks correlation read with write for the same key */
private volatile int resultIndex;
set of keys with data ready for an operation
/** * set of keys with data ready for an operation */
private final ConcurrentMap<SelectionKeyUDT, SelectionKeyUDT> // selectedKeyMap = new ConcurrentHashMap<SelectionKeyUDT, SelectionKeyUDT>();
public view : removal allowed, but not addition
/** * public view : removal allowed, but not addition */
private final Set<? extends SelectionKey> // selectedKeySet = HelpUDT.ungrowableSet(selectedKeyMap.keySet());
select is exclusive
/** select is exclusive */
private final Lock selectLock = new ReentrantLock();
reported epoll socket list sizes
/** reported epoll socket list sizes */
private final IntBuffer sizeBuffer;
Canceled keys.
/** * Canceled keys. */
private final ConcurrentMap<SelectionKeyUDT, SelectionKeyUDT> // terminatedKeyMap = new ConcurrentHashMap<SelectionKeyUDT, SelectionKeyUDT>();
guarded by doSelectLocked
/** guarded by {@link #doSelectLocked} */
private volatile int wakeupBaseCount; private volatile int wakeupStepCount;
list of epoll sockets with write interest
/** list of epoll sockets with write interest */
private final IntBuffer writeBuffer; protected SelectorUDT( // final SelectorProvider provider, // final int maximumSelectorSize // ) throws ExceptionUDT { super(provider); this.maximimSelectorSize = maximumSelectorSize; readBuffer = HelpUDT.newDirectIntBufer(maximumSelectorSize); writeBuffer = HelpUDT.newDirectIntBufer(maximumSelectorSize); sizeBuffer = HelpUDT.newDirectIntBufer(UDT_SIZE_COUNT); }
Enqueue cancel request.
/** * Enqueue cancel request. */
protected void cancel(final SelectionKeyUDT keyUDT) { terminatedKeyMap.putIfAbsent(keyUDT, keyUDT); }
Process pending cancel requests.
/** * Process pending cancel requests. */
protected void doCancel() { if (terminatedKeyMap.isEmpty()) { return; } final Iterator<SelectionKeyUDT> iterator = terminatedKeyMap.values() .iterator(); while (iterator.hasNext()) { final SelectionKeyUDT keyUDT = iterator.next(); iterator.remove(); if (keyUDT.isValid()) { keyUDT.makeValid(false); registeredKeyMap.remove(keyUDT.socketId()); } } }
Params:
  • millisTimeout – <0 : invinite; =0 : immediate; >0 : finite;
/** * @param millisTimeout * <0 : invinite; =0 : immediate; >0 : finite; */
protected int doEpollEnter(final long millisTimeout) throws IOException { if (!isOpen()) { log.error("slector is closed"); throw new ClosedSelectorException(); } try { selectLock.lock(); return doEpollExclusive(millisTimeout); } finally { selectLock.unlock(); } }
Params:
  • millisTimeout – <0 : invinite; =0 : immediate; >0 : finite;
Returns: <0 : should not happen =0 : means nothing was selected/timeout >0 : number of selected keys
/** * @param millisTimeout * * <0 : invinite; * * =0 : immediate; * * >0 : finite; * @return * * <0 : should not happen * * =0 : means nothing was selected/timeout * * >0 : number of selected keys */
protected int doEpollExclusive(final long millisTimeout) throws IOException { try { /** java.nio.Selector contract for wakeup() */ // begin(); /** pre select */ doCancel(); /** select proper */ doEpollSelect(millisTimeout); /** post select */ doResults(); } finally { /** java.nio.Selector contract for wakeup() */ // end(); } return selectedKeyMap.size(); }
Params:
  • millisTimeout – <0 : infinite =0 : immediate >0 : finite
/** * @param millisTimeout * * <0 : infinite * * =0 : immediate * * >0 : finite */
protected int doEpollSelect(long millisTimeout) throws ExceptionUDT { wakeupMarkBase(); int readyCount = 0; if (millisTimeout < 0) { /** infinite: do select in slices; check for wakeup; */ do { readyCount = doEpollSelectUDT(DEFAULT_MIN_SELECTOR_TIMEOUT); if (readyCount > 0 || wakeupIsPending()) { break; } } while (true); } else if (millisTimeout > 0) { /** finite: do select in slices; check for wakeup; count down */ do { readyCount = doEpollSelectUDT(DEFAULT_MIN_SELECTOR_TIMEOUT); if (readyCount > 0 || wakeupIsPending()) { break; } millisTimeout -= DEFAULT_MIN_SELECTOR_TIMEOUT; } while (millisTimeout > 0); } else { /** immediate */ readyCount = doEpollSelectUDT(0); } return readyCount; } protected int doEpollSelectUDT(final long timeout) throws ExceptionUDT { return SocketUDT.selectEpoll(// epollUDT.id(), // readBuffer, // writeBuffer, // sizeBuffer, // timeout // ); } protected void doResults() { final int resultIndex = this.resultIndex++; doResultsRead(resultIndex); doResultsWrite(resultIndex); } protected void doResultsRead(final int resultIndex) { final int readSize = sizeBuffer.get(UDT_READ_INDEX); for (int index = 0; index < readSize; index++) { final int socketId = readBuffer.get(index); final SelectionKeyUDT keyUDT = registeredKeyMap.get(socketId); /** * Epoll will report closed socket once in both read and write sets. * But selector consumer may cancel the key before close. */ if (keyUDT == null) { logSocketId("missing from read ", socketId); continue; } if (keyUDT.doRead(resultIndex)) { selectedKeyMap.putIfAbsent(keyUDT, keyUDT); } } } protected void doResultsWrite(final int resultIndex) { final int writeSize = sizeBuffer.get(UDT_WRITE_INDEX); for (int index = 0; index < writeSize; index++) { final int socketId = writeBuffer.get(index); final SelectionKeyUDT keyUDT = registeredKeyMap.get(socketId); /** * Epoll will report closed socket once in both read and write sets. * But selector consumer may cancel the key before close. */ if (keyUDT == null) { logSocketId("missing from write", socketId); continue; } if (keyUDT.doWrite(resultIndex)) { selectedKeyMap.putIfAbsent(keyUDT, keyUDT); } } } protected EpollUDT epollUDT() { return epollUDT; } @Override protected void implCloseSelector() throws IOException { wakeup(); try { selectLock.lock(); for (final SelectionKeyUDT keyUDT : registeredKeyMap.values()) { cancel(keyUDT); } } finally { selectLock.unlock(); } doCancel(); } @SuppressWarnings("unchecked") @Override public Set<SelectionKey> keys() { if (!isOpen()) { throw new ClosedSelectorException(); } return (Set<SelectionKey>) registeredKeySet; } protected void logSocketId(final String title, final int socketId) { if (log.isDebugEnabled()) { log.debug("{} {}", title, String.format("[id: 0x%08x]", socketId)); } } /** */ @Override protected SelectionKey register( // final AbstractSelectableChannel channel, // final int interestOps, // final Object attachment // ) { if (registeredKeyMap.size() >= maximimSelectorSize) { log.error("reached maximimSelectorSize"); throw new IllegalSelectorException(); } if (!(channel instanceof ChannelUDT)) { log.error("!(channel instanceof ChannelUDT)"); throw new IllegalSelectorException(); } final ChannelUDT channelUDT = (ChannelUDT) channel; final Integer socketId = channelUDT.socketUDT().id(); SelectionKeyUDT keyUDT = registeredKeyMap.get(socketId); if (keyUDT == null) { keyUDT = new SelectionKeyUDT(this, channelUDT, attachment); registeredKeyMap.putIfAbsent(socketId, keyUDT); keyUDT = registeredKeyMap.get(socketId); } keyUDT.interestOps(interestOps); return keyUDT; } @Override public int select() throws IOException { return select(0); } @Override public int select(final long timeout) throws IOException { if (timeout < 0) { throw new IllegalArgumentException("negative timeout"); } else if (timeout > 0) { return doEpollEnter(timeout); } else { return doEpollEnter(SocketUDT.TIMEOUT_INFINITE); } } @SuppressWarnings("unchecked") @Override public Set<SelectionKey> selectedKeys() { if (!isOpen()) { throw new ClosedSelectorException(); } return (Set<SelectionKey>) selectedKeySet; } @Override public int selectNow() throws IOException { return doEpollEnter(SocketUDT.TIMEOUT_NONE); } @Override public Selector wakeup() { wakeupStepCount++; return this; } protected boolean wakeupIsPending() { return wakeupBaseCount != wakeupStepCount; } protected void wakeupMarkBase() { wakeupBaseCount = wakeupStepCount; } }