Copyright (C) 2009-2013 Barchart, Inc. All rights reserved. Licensed under the OSI BSD License. http://www.opensource.org/licenses/bsd-license.php
/** * Copyright (C) 2009-2013 Barchart, Inc. <http://www.barchart.com/> * * All rights reserved. Licensed under the OSI BSD License. * * http://www.opensource.org/licenses/bsd-license.php */
package com.barchart.udt.nio; import java.nio.channels.CancelledKeyException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.barchart.udt.EpollUDT; import com.barchart.udt.EpollUDT.Opt; import com.barchart.udt.ExceptionUDT; import com.barchart.udt.OptionUDT; import com.barchart.udt.SocketUDT; import com.barchart.udt.StatusUDT;
UDT selection key implementation.
/** * UDT selection key implementation. */
public class SelectionKeyUDT extends SelectionKey implements Comparable<SelectionKeyUDT> {
JDK interest to Epoll READ mapping.
/** * JDK interest to Epoll READ mapping. */
protected static final int HAS_READ = OP_ACCEPT | OP_READ;
JDK interest to Epoll WRITE mapping.
/** * JDK interest to Epoll WRITE mapping. */
protected static final int HAS_WRITE = OP_CONNECT | OP_WRITE; protected static final Logger log = LoggerFactory .getLogger(SelectionKeyUDT.class);
Convert select options : from jdk into epoll.
/** * Convert select options : from jdk into epoll. */
protected static Opt from(final int interestOps) { final boolean hasRead = (interestOps & HAS_READ) != 0; final boolean hasWrite = (interestOps & HAS_WRITE) != 0; if (hasRead && hasWrite) { return Opt.ALL; } if (hasRead) { return Opt.ERROR_READ; } if (hasWrite) { return Opt.ERROR_WRITE; } return Opt.ERROR; }
Render select options.
/** * Render select options. */
public static final String toStringOps(final int selectOps) { final char A = (OP_ACCEPT & selectOps) != 0 ? 'A' : '-'; final char C = (OP_CONNECT & selectOps) != 0 ? 'C' : '-'; final char R = (OP_READ & selectOps) != 0 ? 'R' : '-'; final char W = (OP_WRITE & selectOps) != 0 ? 'W' : '-'; return String.format("%c%c%c%c", A, C, R, W); }
Channel bound to the key.
/** * Channel bound to the key. */
private final ChannelUDT channelUDT;
Requested interest in epoll format.
/** * Requested interest in epoll format. */
private volatile Opt epollOpt;
Requested interest in JDK format.
/** * Requested interest in JDK format. */
private volatile int interestOps;
Key validity state. Key is valid when created, and invalid when canceled.
/** * Key validity state. Key is valid when created, and invalid when canceled. */
private volatile boolean isValid;
Reported ready interest.
/** * Reported ready interest. */
private volatile int readyOps;
Correlation index for doRead(int) vs doWrite(int)
/** * Correlation index for {@link #doRead(int)} vs {@link #doWrite(int)} */
private volatile int resultIndex;
Selector bound to the key.
/** * Selector bound to the key. */
private final SelectorUDT selectorUDT; protected SelectionKeyUDT( // final SelectorUDT selectorUDT, // final ChannelUDT channelUDT, // final Object attachment // ) { super.attach(attachment); this.selectorUDT = selectorUDT; this.channelUDT = channelUDT; makeValid(true); }
Ensure key is NOT canceled.
/** * Ensure key is NOT canceled. */
protected void assertValidKey() throws CancelledKeyException { if (isValid()) { return; } throw new CancelledKeyException(); }
Ensure only permitted interest mask bits are present.
/** * Ensure only permitted interest mask bits are present. */
protected void assertValidOps(final int interestOps) { if ((interestOps & ~(channel().validOps())) != 0) { throw new IllegalArgumentException("invalid interestOps=" + interestOps); } } @Override public void cancel() { if (isValid()) { selector().cancel(this); } } @Override public SelectableChannel channel() { return (SelectableChannel) channelUDT; }
Underlying UDT channel.
/** * Underlying UDT channel. */
protected ChannelUDT channelUDT() { return channelUDT; } @Override public int compareTo(final SelectionKeyUDT that) { final int thisId = this.socketId(); final int thatId = that.socketId(); if (thisId > thatId) { return +1; } if (thisId < thatId) { return -1; } return 0; }
Apply READ readiness according to KindUDT channel role.

Note: doRead(int) is invoked before doWrite(int)

Sockets with exceptions are returned to both read and write sets.

Returns:Should report ready-state change?
/** * Apply READ readiness according to {@link KindUDT} channel role. * <p> * Note: {@link #doRead(int)} is invoked before {@link #doWrite(int)} * <p> * Sockets with exceptions are returned to both read and write sets. * * @return Should report ready-state change? */
protected boolean doRead(final int resultIndex) { int readyOps = 0; final int interestOps = this.interestOps; /** Store read/write verifier. */ this.resultIndex = resultIndex; try { /** Check error report. */ if (!epollOpt.hasRead()) { if (isSocketBroken()) { readyOps = channel().validOps(); return true; } else { logError("Unexpected error report."); return false; } } switch (kindUDT()) { case ACCEPTOR: if ((interestOps & OP_ACCEPT) != 0) { readyOps = OP_ACCEPT; return true; } else { logError("Ready to ACCEPT while not interested."); return false; } case CONNECTOR: case RENDEZVOUS: if ((interestOps & OP_READ) != 0) { readyOps = OP_READ; return true; } else { logError("Ready to READ while not interested."); return false; } default: logError("Wrong kind."); return false; } } finally { this.readyOps = readyOps; } }
Apply WRITE readiness according to KindUDT channel role.

Note: doRead(int) is invoked before doWrite(int)

Sockets with exceptions are returned to both read and write sets.

Returns:Should report ready-state change?
/** * Apply WRITE readiness according to {@link KindUDT} channel role. * <p> * Note: {@link #doRead(int)} is invoked before {@link #doWrite(int)} * <p> * Sockets with exceptions are returned to both read and write sets. * * @return Should report ready-state change? */
protected boolean doWrite(final int resultIndex) { int readyOps = 0; final int interestOps = this.interestOps; /** Verify read/write relationship. */ final boolean hadReadBeforeWrite = this.resultIndex == resultIndex; try { /** Check error report. */ if (!epollOpt.hasWrite()) { if (isSocketBroken()) { readyOps = channel().validOps(); return true; } else { logError("Unexpected error report."); return false; } } switch (kindUDT()) { case ACCEPTOR: logError("Ready to WRITE for acceptor."); return false; case CONNECTOR: case RENDEZVOUS: if (channelUDT().isConnectFinished()) { if ((interestOps & OP_WRITE) != 0) { readyOps = OP_WRITE; return true; } else { logError("Ready to WRITE when not insterested."); return false; } } else { if ((interestOps & OP_CONNECT) != 0) { readyOps = OP_CONNECT; return true; } else { logError("Ready to CONNECT when not interested."); return false; } } default: logError("Wrong kind."); return false; } } finally { if (hadReadBeforeWrite) { this.readyOps |= readyOps; } else { this.readyOps = readyOps; } } }
Requested interest in epoll format.
/** * Requested interest in epoll format. */
protected Opt epollOpt() { return epollOpt; }
Epoll bound to this key.
/** * Epoll bound to this key. */
protected EpollUDT epollUDT() { return selector().epollUDT(); }
Key equality based on socket-id.
/** * Key equality based on socket-id. */
@Override public boolean equals(final Object otherKey) { if (otherKey instanceof SelectionKeyUDT) { final SelectionKeyUDT other = (SelectionKeyUDT) otherKey; return other.socketId() == this.socketId(); } return false; }
Check socket error condition.
/** * Check socket error condition. */
boolean hasError() throws ExceptionUDT { final int code = socketUDT().getOption(OptionUDT.Epoll_Event_Mask); return EpollUDT.Opt.from(code).hasError(); }
Key hach code based on socket-id.
/** * Key hach code based on socket-id. */
@Override public int hashCode() { return socketId(); } @Override public int interestOps() { return interestOps; } @Override public SelectionKey interestOps(final int interestOps) { assertValidKey(); assertValidOps(interestOps); try { final Opt epollNew = from(interestOps); if (epollNew != epollOpt) { if (Opt.ERROR == epollNew) { epollUDT().remove(socketUDT()); } else { epollUDT().remove(socketUDT()); epollUDT().add(socketUDT(), epollNew); } epollOpt = epollNew; } } catch (final Exception e) { log.error("epoll udpate failure", e); } finally { this.interestOps = interestOps; } return this; }
Check socket termination status.
Returns:true if status is StatusUDT.BROKEN or worse
/** * Check socket termination status. * * @return true if status is {@link StatusUDT#BROKEN} or worse */
protected boolean isSocketBroken() { switch (socketUDT().status()) { case INIT: case OPENED: case LISTENING: case CONNECTING: case CONNECTED: return false; case BROKEN: case CLOSING: case CLOSED: case NONEXIST: return true; default: logError("Unknown socket status."); return true; } } @Override public boolean isValid() { return isValid; }
Channel role.
/** * Channel role. */
protected KindUDT kindUDT() { return channelUDT.kindUDT(); }
Key processing logic error logger.
/** * Key processing logic error logger. */
protected void logError(final String comment) { final String message = "logic error : \n\t" + this; log.warn(message, new Exception("" + comment)); }
Change socket registration with epoll, and change key validity status.
/** * Change socket registration with epoll, and change key validity status. */
protected void makeValid(final boolean isValid) { try { if (isValid) { epollOpt = Opt.ERROR; epollUDT().add(socketUDT(), epollOpt); } else { epollUDT().remove(socketUDT()); } } catch (final Throwable e) { log.error("Epoll failure.", e); } finally { this.isValid = isValid; } } @Override public int readyOps() { return readyOps; } protected void readyOps(final int ops) { readyOps = ops; } @Override public SelectorUDT selector() { return selectorUDT; }
Id of a socket bound to this key.
/** * Id of a socket bound to this key. */
protected int socketId() { return socketUDT().id(); }
Socket bound to this key.
/** * Socket bound to this key. */
protected SocketUDT socketUDT() { return channelUDT.socketUDT(); } @Override public String toString() { return String .format("[id: 0x%08x] poll=%s ready=%s inter=%s %s %s %s bind=%s:%s peer=%s:%s", // socketUDT().id(), // epollOpt, // toStringOps(readyOps), // toStringOps(interestOps), // channelUDT.typeUDT(), // channelUDT.kindUDT(), // socketUDT().status(), // socketUDT().getLocalInetAddress(), // socketUDT().getLocalInetPort(), // socketUDT().getRemoteInetAddress(), // socketUDT().getRemoteInetPort() // ); } }