/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tomcat.util.net;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.Channel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.nio.channels.FileChannel;
import java.nio.channels.NetworkChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSession;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.ExceptionUtils;
import org.apache.tomcat.util.collections.SynchronizedQueue;
import org.apache.tomcat.util.collections.SynchronizedStack;
import org.apache.tomcat.util.compat.JreCompat;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.net.jsse.JSSESupport;
NIO tailored thread pool, providing the following services:
- Socket acceptor thread
- Socket poller thread
- Worker threads pool
When switching to Java 5, there's an opportunity to use the virtual
machine's thread pool.
Author: Mladen Turk, Remy Maucherat
/**
* NIO tailored thread pool, providing the following services:
* <ul>
* <li>Socket acceptor thread</li>
* <li>Socket poller thread</li>
* <li>Worker threads pool</li>
* </ul>
*
* When switching to Java 5, there's an opportunity to use the virtual
* machine's thread pool.
*
* @author Mladen Turk
* @author Remy Maucherat
*/
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> {
// -------------------------------------------------------------- Constants
private static final Log log = LogFactory.getLog(NioEndpoint.class);
public static final int OP_REGISTER = 0x100; //register interest op
// ----------------------------------------------------------------- Fields
Server socket "pointer".
/**
* Server socket "pointer".
*/
private volatile ServerSocketChannel serverSock = null;
Stop latch used to wait for poller stop
/**
* Stop latch used to wait for poller stop
*/
private volatile CountDownLatch stopLatch = null;
Cache for poller events
/**
* Cache for poller events
*/
private SynchronizedStack<PollerEvent> eventCache;
Bytebuffer cache, each channel holds a set of buffers (two, except for SSL holds four)
/**
* Bytebuffer cache, each channel holds a set of buffers (two, except for SSL holds four)
*/
private SynchronizedStack<NioChannel> nioChannels;
// ------------------------------------------------------------- Properties
Use System.inheritableChannel to obtain channel from stdin/stdout.
/**
* Use System.inheritableChannel to obtain channel from stdin/stdout.
*/
private boolean useInheritedChannel = false;
public void setUseInheritedChannel(boolean useInheritedChannel) { this.useInheritedChannel = useInheritedChannel; }
public boolean getUseInheritedChannel() { return useInheritedChannel; }
Priority of the poller thread.
/**
* Priority of the poller thread.
*/
private int pollerThreadPriority = Thread.NORM_PRIORITY;
public void setPollerThreadPriority(int pollerThreadPriority) { this.pollerThreadPriority = pollerThreadPriority; }
public int getPollerThreadPriority() { return pollerThreadPriority; }
private long selectorTimeout = 1000;
public void setSelectorTimeout(long timeout) { this.selectorTimeout = timeout;}
public long getSelectorTimeout() { return this.selectorTimeout; }
The socket poller.
/**
* The socket poller.
*/
private Poller poller = null;
// --------------------------------------------------------- Public Methods
Number of keep-alive sockets.
Returns: The number of sockets currently in the keep-alive state waiting
for the next request to be received on the socket
/**
* Number of keep-alive sockets.
*
* @return The number of sockets currently in the keep-alive state waiting
* for the next request to be received on the socket
*/
public int getKeepAliveCount() {
if (poller == null) {
return 0;
} else {
return poller.getKeyCount();
}
}
// ----------------------------------------------- Public Lifecycle Methods
Initialize the endpoint.
/**
* Initialize the endpoint.
*/
@Override
public void bind() throws Exception {
initServerSocket();
setStopLatch(new CountDownLatch(1));
// Initialize SSL if needed
initialiseSsl();
}
// Separated out to make it easier for folks that extend NioEndpoint to
// implement custom [server]sockets
protected void initServerSocket() throws Exception {
if (!getUseInheritedChannel()) {
serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
serverSock.socket().bind(addr,getAcceptCount());
} else {
// Retrieve the channel provided by the OS
Channel ic = System.inheritedChannel();
if (ic instanceof ServerSocketChannel) {
serverSock = (ServerSocketChannel) ic;
}
if (serverSock == null) {
throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
}
}
serverSock.configureBlocking(true); //mimic APR behavior
}
Start the NIO endpoint, creating acceptor, poller threads.
/**
* Start the NIO endpoint, creating acceptor, poller threads.
*/
@Override
public void startInternal() throws Exception {
if (!running) {
running = true;
paused = false;
if (socketProperties.getProcessorCache() != 0) {
processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getProcessorCache());
}
if (socketProperties.getEventCache() != 0) {
eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getEventCache());
}
int actualBufferPool =
socketProperties.getActualBufferPool(isSSLEnabled() ? getSniParseLimit() * 2 : 0);
if (actualBufferPool != 0) {
nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
actualBufferPool);
}
// Create worker collection
if (getExecutor() == null) {
createExecutor();
}
initializeConnectionLatch();
// Start poller thread
poller = new Poller();
Thread pollerThread = new Thread(poller, getName() + "-Poller");
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
startAcceptorThread();
}
}
Stop the endpoint. This will cause all processing threads to stop.
/**
* Stop the endpoint. This will cause all processing threads to stop.
*/
@Override
public void stopInternal() {
if (!paused) {
pause();
}
if (running) {
running = false;
acceptor.stop(10);
if (poller != null) {
poller.destroy();
poller = null;
}
try {
if (!getStopLatch().await(selectorTimeout + 100, TimeUnit.MILLISECONDS)) {
log.warn(sm.getString("endpoint.nio.stopLatchAwaitFail"));
}
} catch (InterruptedException e) {
log.warn(sm.getString("endpoint.nio.stopLatchAwaitInterrupted"), e);
}
shutdownExecutor();
if (eventCache != null) {
eventCache.clear();
eventCache = null;
}
if (nioChannels != null) {
nioChannels.clear();
nioChannels = null;
}
if (processorCache != null) {
processorCache.clear();
processorCache = null;
}
}
}
Deallocate NIO memory pools, and close server socket.
/**
* Deallocate NIO memory pools, and close server socket.
*/
@Override
public void unbind() throws Exception {
if (log.isDebugEnabled()) {
log.debug("Destroy initiated for " +
new InetSocketAddress(getAddress(),getPortWithOffset()));
}
if (running) {
stop();
}
try {
doCloseServerSocket();
} catch (IOException ioe) {
getLog().warn(sm.getString("endpoint.serverSocket.closeFailed", getName()), ioe);
}
destroySsl();
super.unbind();
if (getHandler() != null ) {
getHandler().recycle();
}
if (log.isDebugEnabled()) {
log.debug("Destroy completed for " +
new InetSocketAddress(getAddress(), getPortWithOffset()));
}
}
@Override
protected void doCloseServerSocket() throws IOException {
if (!getUseInheritedChannel() && serverSock != null) {
// Close server socket
serverSock.close();
}
serverSock = null;
}
// ------------------------------------------------------ Protected Methods
protected SynchronizedStack<NioChannel> getNioChannels() {
return nioChannels;
}
protected Poller getPoller() {
return poller;
}
protected CountDownLatch getStopLatch() {
return stopLatch;
}
protected void setStopLatch(CountDownLatch stopLatch) {
this.stopLatch = stopLatch;
}
Process the specified connection.
Params: - socket – The socket channel
Returns: true
if the socket was correctly configured
and processing may continue, false
if the socket needs to be
close immediately
/**
* Process the specified connection.
* @param socket The socket channel
* @return <code>true</code> if the socket was correctly configured
* and processing may continue, <code>false</code> if the socket needs to be
* close immediately
*/
@Override
protected boolean setSocketOptions(SocketChannel socket) {
NioSocketWrapper socketWrapper = null;
try {
// Allocate channel and wrapper
NioChannel channel = null;
if (nioChannels != null) {
channel = nioChannels.pop();
}
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(bufhandler, this);
} else {
channel = new NioChannel(bufhandler);
}
}
NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
channel.reset(socket, newWrapper);
connections.put(socket, newWrapper);
socketWrapper = newWrapper;
// Set socket properties
// Disable blocking, polling will be used
socket.configureBlocking(false);
socketProperties.setProperties(socket.socket());
socketWrapper.setReadTimeout(getConnectionTimeout());
socketWrapper.setWriteTimeout(getConnectionTimeout());
socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
poller.register(socketWrapper);
return true;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error(sm.getString("endpoint.socketOptionsError"), t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
if (socketWrapper == null) {
destroySocket(socket);
}
}
// Tell to close the socket if needed
return false;
}
@Override
protected void destroySocket(SocketChannel socket) {
countDownConnection();
try {
socket.close();
} catch (IOException ioe) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("endpoint.err.close"), ioe);
}
}
}
@Override
protected NetworkChannel getServerSocket() {
return serverSock;
}
@Override
protected SocketChannel serverSocketAccept() throws Exception {
return serverSock.accept();
}
@Override
protected Log getLog() {
return log;
}
@Override
protected SocketProcessorBase<NioChannel> createSocketProcessor(
SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
return new SocketProcessor(socketWrapper, event);
}
// ----------------------------------------------------- Poller Inner Classes
PollerEvent, cacheable object for poller events to avoid GC
/**
* PollerEvent, cacheable object for poller events to avoid GC
*/
public static class PollerEvent {
private NioSocketWrapper socketWrapper;
private int interestOps;
public PollerEvent(NioSocketWrapper socketWrapper, int intOps) {
reset(socketWrapper, intOps);
}
public void reset(NioSocketWrapper socketWrapper, int intOps) {
this.socketWrapper = socketWrapper;
interestOps = intOps;
}
public NioSocketWrapper getSocketWrapper() {
return socketWrapper;
}
public int getInterestOps() {
return interestOps;
}
public void reset() {
reset(null, 0);
}
@Override
public String toString() {
return "Poller event: socket [" + socketWrapper.getSocket() + "], socketWrapper [" + socketWrapper +
"], interestOps [" + interestOps + "]";
}
}
Poller class.
/**
* Poller class.
*/
public class Poller implements Runnable {
private Selector selector;
private final SynchronizedQueue<PollerEvent> events =
new SynchronizedQueue<>();
private volatile boolean close = false;
// Optimize expiration handling
private long nextExpiration = 0;
private AtomicLong wakeupCounter = new AtomicLong(0);
private volatile int keyCount = 0;
public Poller() throws IOException {
this.selector = Selector.open();
}
public int getKeyCount() { return keyCount; }
public Selector getSelector() { return selector; }
Destroy the poller.
/**
* Destroy the poller.
*/
protected void destroy() {
// Wait for polltime before doing anything, so that the poller threads
// exit, otherwise parallel closure of sockets which are still
// in the poller can cause problems
close = true;
selector.wakeup();
}
private void addEvent(PollerEvent event) {
events.offer(event);
if (wakeupCounter.incrementAndGet() == 0) {
selector.wakeup();
}
}
Add specified socket and associated pool to the poller. The socket will
be added to a temporary array, and polled first after a maximum amount
of time equal to pollTime (in most cases, latency will be much lower,
however).
Params: - socketWrapper – to add to the poller
- interestOps – Operations for which to register this socket with
the Poller
/**
* Add specified socket and associated pool to the poller. The socket will
* be added to a temporary array, and polled first after a maximum amount
* of time equal to pollTime (in most cases, latency will be much lower,
* however).
*
* @param socketWrapper to add to the poller
* @param interestOps Operations for which to register this socket with
* the Poller
*/
public void add(NioSocketWrapper socketWrapper, int interestOps) {
PollerEvent r = null;
if (eventCache != null) {
r = eventCache.pop();
}
if (r == null) {
r = new PollerEvent(socketWrapper, interestOps);
} else {
r.reset(socketWrapper, interestOps);
}
addEvent(r);
if (close) {
processSocket(socketWrapper, SocketEvent.STOP, false);
}
}
Processes events in the event queue of the Poller.
Returns: true
if some events were processed,
false
if queue was empty
/**
* Processes events in the event queue of the Poller.
*
* @return <code>true</code> if some events were processed,
* <code>false</code> if queue was empty
*/
public boolean events() {
boolean result = false;
PollerEvent pe = null;
for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
result = true;
NioSocketWrapper socketWrapper = pe.getSocketWrapper();
SocketChannel sc = socketWrapper.getSocket().getIOChannel();
int interestOps = pe.getInterestOps();
if (sc == null) {
log.warn(sm.getString("endpoint.nio.nullSocketChannel"));
socketWrapper.close();
} else if (interestOps == OP_REGISTER) {
try {
sc.register(getSelector(), SelectionKey.OP_READ, socketWrapper);
} catch (Exception x) {
log.error(sm.getString("endpoint.nio.registerFail"), x);
}
} else {
final SelectionKey key = sc.keyFor(getSelector());
if (key == null) {
// The key was cancelled (e.g. due to socket closure)
// and removed from the selector while it was being
// processed. Count down the connections at this point
// since it won't have been counted down when the socket
// closed.
socketWrapper.close();
} else {
final NioSocketWrapper attachment = (NioSocketWrapper) key.attachment();
if (attachment != null) {
// We are registering the key to start with, reset the fairness counter.
try {
int ops = key.interestOps() | interestOps;
attachment.interestOps(ops);
key.interestOps(ops);
} catch (CancelledKeyException ckx) {
cancelledKey(key, socketWrapper);
}
} else {
cancelledKey(key, socketWrapper);
}
}
}
if (running && !paused && eventCache != null) {
pe.reset();
eventCache.push(pe);
}
}
return result;
}
Registers a newly created socket with the poller.
Params: - socketWrapper – The socket wrapper
/**
* Registers a newly created socket with the poller.
*
* @param socketWrapper The socket wrapper
*/
public void register(final NioSocketWrapper socketWrapper) {
socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
PollerEvent event = null;
if (eventCache != null) {
event = eventCache.pop();
}
if (event == null) {
event = new PollerEvent(socketWrapper, OP_REGISTER);
} else {
event.reset(socketWrapper, OP_REGISTER);
}
addEvent(event);
}
public void cancelledKey(SelectionKey sk, SocketWrapperBase<NioChannel> socketWrapper) {
if (JreCompat.isJre11Available() && socketWrapper != null) {
socketWrapper.close();
} else {
try {
// If is important to cancel the key first, otherwise a deadlock may occur between the
// poller select and the socket channel close which would cancel the key
// This workaround is not needed on Java 11+
// TODO: verify, if not fixed in Java 11+ then 14+ is needed, see BZ 64007
// TODO: the cancelledKey method can be removed with Java 11 as minimum
if (sk != null) {
sk.attach(null);
if (sk.isValid()) {
sk.cancel();
}
}
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
if (log.isDebugEnabled()) {
log.error(sm.getString("endpoint.debug.channelCloseFail"), e);
}
} finally {
if (socketWrapper != null) {
socketWrapper.close();
}
}
}
}
The background thread that adds sockets to the Poller, checks the
poller for triggered events and hands the associated socket off to an
appropriate processor as events occur.
/**
* The background thread that adds sockets to the Poller, checks the
* poller for triggered events and hands the associated socket off to an
* appropriate processor as events occur.
*/
@Override
public void run() {
// Loop until destroy() is called
while (true) {
boolean hasEvents = false;
try {
if (!close) {
hasEvents = events();
if (wakeupCounter.getAndSet(-1) > 0) {
// If we are here, means we have other stuff to do
// Do a non blocking select
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
// Either we timed out or we woke up, process events first
if (keyCount == 0) {
hasEvents = (hasEvents | events());
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
continue;
}
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
iterator.remove();
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (socketWrapper != null) {
processKey(sk, socketWrapper);
}
}
// Process timeouts
timeout(keyCount,hasEvents);
}
getStopLatch().countDown();
}
protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) {
try {
if (close) {
cancelledKey(sk, socketWrapper);
} else if (sk.isValid()) {
if (sk.isReadable() || sk.isWritable()) {
if (socketWrapper.getSendfileData() != null) {
processSendfile(sk, socketWrapper, false);
} else {
unreg(sk, socketWrapper, sk.readyOps());
boolean closeSocket = false;
// Read goes before write
if (sk.isReadable()) {
if (socketWrapper.readOperation != null) {
if (!socketWrapper.readOperation.process()) {
closeSocket = true;
}
} else if (socketWrapper.readBlocking) {
synchronized (socketWrapper.readLock) {
socketWrapper.readBlocking = false;
socketWrapper.readLock.notify();
}
} else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
if (socketWrapper.writeOperation != null) {
if (!socketWrapper.writeOperation.process()) {
closeSocket = true;
}
} else if (socketWrapper.writeBlocking) {
synchronized (socketWrapper.writeLock) {
socketWrapper.writeBlocking = false;
socketWrapper.writeLock.notify();
}
} else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk, socketWrapper);
}
}
}
} else {
// Invalid key
cancelledKey(sk, socketWrapper);
}
} catch (CancelledKeyException ckx) {
cancelledKey(sk, socketWrapper);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.nio.keyProcessingError"), t);
}
}
public SendfileState processSendfile(SelectionKey sk, NioSocketWrapper socketWrapper,
boolean calledByProcessor) {
NioChannel sc = null;
try {
unreg(sk, socketWrapper, sk.readyOps());
SendfileData sd = socketWrapper.getSendfileData();
if (log.isTraceEnabled()) {
log.trace("Processing send file for: " + sd.fileName);
}
if (sd.fchannel == null) {
// Setup the file channel
File f = new File(sd.fileName);
@SuppressWarnings("resource") // Closed when channel is closed
FileInputStream fis = new FileInputStream(f);
sd.fchannel = fis.getChannel();
}
// Configure output channel
sc = socketWrapper.getSocket();
// TLS/SSL channel is slightly different
WritableByteChannel wc = ((sc instanceof SecureNioChannel) ? sc : sc.getIOChannel());
// We still have data in the buffer
if (sc.getOutboundRemaining() > 0) {
if (sc.flushOutbound()) {
socketWrapper.updateLastWrite();
}
} else {
long written = sd.fchannel.transferTo(sd.pos, sd.length, wc);
if (written > 0) {
sd.pos += written;
sd.length -= written;
socketWrapper.updateLastWrite();
} else {
// Unusual not to be able to transfer any bytes
// Check the length was set correctly
if (sd.fchannel.size() <= sd.pos) {
throw new IOException(sm.getString("endpoint.sendfile.tooMuchData"));
}
}
}
if (sd.length <= 0 && sc.getOutboundRemaining()<=0) {
if (log.isDebugEnabled()) {
log.debug("Send file complete for: " + sd.fileName);
}
socketWrapper.setSendfileData(null);
try {
sd.fchannel.close();
} catch (Exception ignore) {
}
// For calls from outside the Poller, the caller is
// responsible for registering the socket for the
// appropriate event(s) if sendfile completes.
if (!calledByProcessor) {
switch (sd.keepAliveState) {
case NONE: {
if (log.isDebugEnabled()) {
log.debug("Send file connection is being closed");
}
poller.cancelledKey(sk, socketWrapper);
break;
}
case PIPELINED: {
if (log.isDebugEnabled()) {
log.debug("Connection is keep alive, processing pipe-lined data");
}
if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
poller.cancelledKey(sk, socketWrapper);
}
break;
}
case OPEN: {
if (log.isDebugEnabled()) {
log.debug("Connection is keep alive, registering back for OP_READ");
}
reg(sk, socketWrapper, SelectionKey.OP_READ);
break;
}
}
}
return SendfileState.DONE;
} else {
if (log.isDebugEnabled()) {
log.debug("OP_WRITE for sendfile: " + sd.fileName);
}
if (calledByProcessor) {
add(socketWrapper, SelectionKey.OP_WRITE);
} else {
reg(sk, socketWrapper, SelectionKey.OP_WRITE);
}
return SendfileState.PENDING;
}
} catch (IOException e) {
if (log.isDebugEnabled()) {
log.debug("Unable to complete sendfile request:", e);
}
if (!calledByProcessor && sc != null) {
poller.cancelledKey(sk, socketWrapper);
}
return SendfileState.ERROR;
} catch (Throwable t) {
log.error(sm.getString("endpoint.sendfile.error"), t);
if (!calledByProcessor && sc != null) {
poller.cancelledKey(sk, socketWrapper);
}
return SendfileState.ERROR;
}
}
protected void unreg(SelectionKey sk, NioSocketWrapper socketWrapper, int readyOps) {
// This is a must, so that we don't have multiple threads messing with the socket
reg(sk, socketWrapper, sk.interestOps() & (~readyOps));
}
protected void reg(SelectionKey sk, NioSocketWrapper socketWrapper, int intops) {
sk.interestOps(intops);
socketWrapper.interestOps(intops);
}
protected void timeout(int keyCount, boolean hasEvents) {
long now = System.currentTimeMillis();
// This method is called on every loop of the Poller. Don't process
// timeouts on every loop of the Poller since that would create too
// much load and timeouts can afford to wait a few seconds.
// However, do process timeouts if any of the following are true:
// - the selector simply timed out (suggests there isn't much load)
// - the nextExpiration time has passed
// - the server socket is being closed
if (nextExpiration > 0 && (keyCount > 0 || hasEvents) && (now < nextExpiration) && !close) {
return;
}
int keycount = 0;
try {
for (SelectionKey key : selector.keys()) {
keycount++;
NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
try {
if (socketWrapper == null) {
// We don't support any keys without attachments
cancelledKey(key, null);
} else if (close) {
key.interestOps(0);
// Avoid duplicate stop calls
socketWrapper.interestOps(0);
cancelledKey(key, socketWrapper);
} else if ((socketWrapper.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ ||
(socketWrapper.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
boolean readTimeout = false;
boolean writeTimeout = false;
// Check for read timeout
if ((socketWrapper.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
long delta = now - socketWrapper.getLastRead();
long timeout = socketWrapper.getReadTimeout();
if (timeout > 0 && delta > timeout) {
readTimeout = true;
}
}
// Check for write timeout
if (!readTimeout && (socketWrapper.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
long delta = now - socketWrapper.getLastWrite();
long timeout = socketWrapper.getWriteTimeout();
if (timeout > 0 && delta > timeout) {
writeTimeout = true;
}
}
if (readTimeout || writeTimeout) {
key.interestOps(0);
// Avoid duplicate timeout calls
socketWrapper.interestOps(0);
socketWrapper.setError(new SocketTimeoutException());
if (readTimeout && socketWrapper.readOperation != null) {
if (!socketWrapper.readOperation.process()) {
cancelledKey(key, socketWrapper);
}
} else if (writeTimeout && socketWrapper.writeOperation != null) {
if (!socketWrapper.writeOperation.process()) {
cancelledKey(key, socketWrapper);
}
} else if (!processSocket(socketWrapper, SocketEvent.ERROR, true)) {
cancelledKey(key, socketWrapper);
}
}
}
} catch (CancelledKeyException ckx) {
cancelledKey(key, socketWrapper);
}
}
} catch (ConcurrentModificationException cme) {
// See https://bz.apache.org/bugzilla/show_bug.cgi?id=57943
log.warn(sm.getString("endpoint.nio.timeoutCme"), cme);
}
// For logging purposes only
long prevExp = nextExpiration;
nextExpiration = System.currentTimeMillis() +
socketProperties.getTimeoutInterval();
if (log.isTraceEnabled()) {
log.trace("timeout completed: keys processed=" + keycount +
"; now=" + now + "; nextExpiration=" + prevExp +
"; keyCount=" + keyCount + "; hasEvents=" + hasEvents +
"; eval=" + ((now < prevExp) && (keyCount>0 || hasEvents) && (!close) ));
}
}
}
// --------------------------------------------------- Socket Wrapper Class
public static class NioSocketWrapper extends SocketWrapperBase<NioChannel> {
private final SynchronizedStack<NioChannel> nioChannels;
private final Poller poller;
private int interestOps = 0;
private volatile SendfileData sendfileData = null;
private volatile long lastRead = System.currentTimeMillis();
private volatile long lastWrite = lastRead;
private final Object readLock;
private volatile boolean readBlocking = false;
private final Object writeLock;
private volatile boolean writeBlocking = false;
public NioSocketWrapper(NioChannel channel, NioEndpoint endpoint) {
super(channel, endpoint);
nioChannels = endpoint.getNioChannels();
poller = endpoint.getPoller();
socketBufferHandler = channel.getBufHandler();
readLock = (readPending == null) ? new Object() : readPending;
writeLock = (writePending == null) ? new Object() : writePending;
}
public Poller getPoller() { return poller; }
public int interestOps() { return interestOps; }
public int interestOps(int ops) { this.interestOps = ops; return ops; }
public void setSendfileData(SendfileData sf) { this.sendfileData = sf;}
public SendfileData getSendfileData() { return this.sendfileData; }
public void updateLastWrite() { lastWrite = System.currentTimeMillis(); }
public long getLastWrite() { return lastWrite; }
public void updateLastRead() { lastRead = System.currentTimeMillis(); }
public long getLastRead() { return lastRead; }
@Override
public boolean isReadyForRead() throws IOException {
socketBufferHandler.configureReadBufferForRead();
if (socketBufferHandler.getReadBuffer().remaining() > 0) {
return true;
}
fillReadBuffer(false);
boolean isReady = socketBufferHandler.getReadBuffer().position() > 0;
return isReady;
}
@Override
public int read(boolean block, byte[] b, int off, int len) throws IOException {
int nRead = populateReadBuffer(b, off, len);
if (nRead > 0) {
return nRead;
/*
* Since more bytes may have arrived since the buffer was last
* filled, it is an option at this point to perform a
* non-blocking read. However correctly handling the case if
* that read returns end of stream adds complexity. Therefore,
* at the moment, the preference is for simplicity.
*/
}
// Fill the read buffer as best we can.
nRead = fillReadBuffer(block);
updateLastRead();
// Fill as much of the remaining byte array as possible with the
// data that was just read
if (nRead > 0) {
socketBufferHandler.configureReadBufferForRead();
nRead = Math.min(nRead, len);
socketBufferHandler.getReadBuffer().get(b, off, nRead);
}
return nRead;
}
@Override
public int read(boolean block, ByteBuffer to) throws IOException {
int nRead = populateReadBuffer(to);
if (nRead > 0) {
return nRead;
/*
* Since more bytes may have arrived since the buffer was last
* filled, it is an option at this point to perform a
* non-blocking read. However correctly handling the case if
* that read returns end of stream adds complexity. Therefore,
* at the moment, the preference is for simplicity.
*/
}
// The socket read buffer capacity is socket.appReadBufSize
int limit = socketBufferHandler.getReadBuffer().capacity();
if (to.remaining() >= limit) {
to.limit(to.position() + limit);
nRead = fillReadBuffer(block, to);
if (log.isDebugEnabled()) {
log.debug("Socket: [" + this + "], Read direct from socket: [" + nRead + "]");
}
updateLastRead();
} else {
// Fill the read buffer as best we can.
nRead = fillReadBuffer(block);
if (log.isDebugEnabled()) {
log.debug("Socket: [" + this + "], Read into buffer: [" + nRead + "]");
}
updateLastRead();
// Fill as much of the remaining byte array as possible with the
// data that was just read
if (nRead > 0) {
nRead = populateReadBuffer(to);
}
}
return nRead;
}
@Override
protected void doClose() {
if (log.isDebugEnabled()) {
log.debug("Calling [" + getEndpoint() + "].closeSocket([" + this + "])");
}
try {
getEndpoint().connections.remove(getSocket().getIOChannel());
if (getSocket().isOpen()) {
getSocket().close(true);
}
if (getEndpoint().running && !getEndpoint().paused) {
if (nioChannels == null || !nioChannels.push(getSocket())) {
getSocket().free();
}
}
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
if (log.isDebugEnabled()) {
log.error(sm.getString("endpoint.debug.channelCloseFail"), e);
}
} finally {
socketBufferHandler = SocketBufferHandler.EMPTY;
nonBlockingWriteBuffer.clear();
reset(NioChannel.CLOSED_NIO_CHANNEL);
}
try {
SendfileData data = getSendfileData();
if (data != null && data.fchannel != null && data.fchannel.isOpen()) {
data.fchannel.close();
}
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
if (log.isDebugEnabled()) {
log.error(sm.getString("endpoint.sendfile.closeError"), e);
}
}
}
private int fillReadBuffer(boolean block) throws IOException {
socketBufferHandler.configureReadBufferForWrite();
return fillReadBuffer(block, socketBufferHandler.getReadBuffer());
}
private int fillReadBuffer(boolean block, ByteBuffer buffer) throws IOException {
int n = 0;
if (getSocket() == NioChannel.CLOSED_NIO_CHANNEL) {
throw new ClosedChannelException();
}
if (block) {
long timeout = getReadTimeout();
long startNanos = 0;
do {
if (startNanos > 0) {
long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
if (elapsedMillis == 0) {
elapsedMillis = 1;
}
timeout -= elapsedMillis;
if (timeout <= 0) {
throw new SocketTimeoutException();
}
}
n = getSocket().read(buffer);
if (n == -1) {
throw new EOFException();
} else if (n == 0) {
readBlocking = true;
registerReadInterest();
synchronized (readLock) {
if (readBlocking) {
try {
if (timeout > 0) {
startNanos = System.nanoTime();
readLock.wait(timeout);
} else {
readLock.wait();
}
} catch (InterruptedException e) {
// Continue
}
readBlocking = false;
}
}
}
} while (n == 0); // TLS needs to loop as reading zero application bytes is possible
} else {
n = getSocket().read(buffer);
if (n == -1) {
throw new EOFException();
}
}
return n;
}
@Override
protected void doWrite(boolean block, ByteBuffer buffer) throws IOException {
int n = 0;
if (getSocket() == NioChannel.CLOSED_NIO_CHANNEL) {
throw new ClosedChannelException();
}
if (block) {
long timeout = getWriteTimeout();
long startNanos = 0;
do {
if (startNanos > 0) {
long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
if (elapsedMillis == 0) {
elapsedMillis = 1;
}
timeout -= elapsedMillis;
if (timeout <= 0) {
throw new SocketTimeoutException();
}
}
n = getSocket().write(buffer);
if (n == -1) {
throw new EOFException();
} else if (n == 0) {
writeBlocking = true;
registerWriteInterest();
synchronized (writeLock) {
if (writeBlocking) {
try {
if (timeout > 0) {
startNanos = System.nanoTime();
writeLock.wait(timeout);
} else {
writeLock.wait();
}
} catch (InterruptedException e) {
// Continue
}
writeBlocking = false;
}
}
} else if (startNanos > 0) {
// If something was written, reset timeout
timeout = getWriteTimeout();
startNanos = 0;
}
} while (buffer.hasRemaining());
// If there is data left in the buffer the socket will be registered for
// write further up the stack. This is to ensure the socket is only
// registered for write once as both container and user code can trigger
// write registration.
} else {
do {
n = getSocket().write(buffer);
if (n == -1) {
throw new EOFException();
}
} while (n > 0 && buffer.hasRemaining());
}
updateLastWrite();
}
@Override
public void registerReadInterest() {
if (log.isDebugEnabled()) {
log.debug(sm.getString("endpoint.debug.registerRead", this));
}
getPoller().add(this, SelectionKey.OP_READ);
}
@Override
public void registerWriteInterest() {
if (log.isDebugEnabled()) {
log.debug(sm.getString("endpoint.debug.registerWrite", this));
}
getPoller().add(this, SelectionKey.OP_WRITE);
}
@Override
public SendfileDataBase createSendfileData(String filename, long pos, long length) {
return new SendfileData(filename, pos, length);
}
@Override
public SendfileState processSendfile(SendfileDataBase sendfileData) {
setSendfileData((SendfileData) sendfileData);
SelectionKey key = getSocket().getIOChannel().keyFor(getPoller().getSelector());
// Might as well do the first write on this thread
return getPoller().processSendfile(key, this, true);
}
@Override
protected void populateRemoteAddr() {
SocketChannel sc = getSocket().getIOChannel();
if (sc != null) {
InetAddress inetAddr = sc.socket().getInetAddress();
if (inetAddr != null) {
remoteAddr = inetAddr.getHostAddress();
}
}
}
@Override
protected void populateRemoteHost() {
SocketChannel sc = getSocket().getIOChannel();
if (sc != null) {
InetAddress inetAddr = sc.socket().getInetAddress();
if (inetAddr != null) {
remoteHost = inetAddr.getHostName();
if (remoteAddr == null) {
remoteAddr = inetAddr.getHostAddress();
}
}
}
}
@Override
protected void populateRemotePort() {
SocketChannel sc = getSocket().getIOChannel();
if (sc != null) {
remotePort = sc.socket().getPort();
}
}
@Override
protected void populateLocalName() {
SocketChannel sc = getSocket().getIOChannel();
if (sc != null) {
InetAddress inetAddr = sc.socket().getLocalAddress();
if (inetAddr != null) {
localName = inetAddr.getHostName();
}
}
}
@Override
protected void populateLocalAddr() {
SocketChannel sc = getSocket().getIOChannel();
if (sc != null) {
InetAddress inetAddr = sc.socket().getLocalAddress();
if (inetAddr != null) {
localAddr = inetAddr.getHostAddress();
}
}
}
@Override
protected void populateLocalPort() {
SocketChannel sc = getSocket().getIOChannel();
if (sc != null) {
localPort = sc.socket().getLocalPort();
}
}
{@inheritDoc}
Params: - clientCertProvider – Ignored for this implementation
/**
* {@inheritDoc}
* @param clientCertProvider Ignored for this implementation
*/
@Override
public SSLSupport getSslSupport(String clientCertProvider) {
if (getSocket() instanceof SecureNioChannel) {
SecureNioChannel ch = (SecureNioChannel) getSocket();
SSLEngine sslEngine = ch.getSslEngine();
if (sslEngine != null) {
SSLSession session = sslEngine.getSession();
return ((NioEndpoint) getEndpoint()).getSslImplementation().getSSLSupport(session);
}
}
return null;
}
@Override
public void doClientAuth(SSLSupport sslSupport) throws IOException {
SecureNioChannel sslChannel = (SecureNioChannel) getSocket();
SSLEngine engine = sslChannel.getSslEngine();
if (!engine.getNeedClientAuth()) {
// Need to re-negotiate SSL connection
engine.setNeedClientAuth(true);
sslChannel.rehandshake(getEndpoint().getConnectionTimeout());
((JSSESupport) sslSupport).setSession(engine.getSession());
}
}
@Override
public void setAppReadBufHandler(ApplicationBufferHandler handler) {
getSocket().setAppReadBufHandler(handler);
}
@Override
protected <A> OperationState<A> newOperationState(boolean read,
ByteBuffer[] buffers, int offset, int length,
BlockingMode block, long timeout, TimeUnit unit, A attachment,
CompletionCheck check, CompletionHandler<Long, ? super A> handler,
Semaphore semaphore, VectoredIOCompletionHandler<A> completion) {
return new NioOperationState<>(read, buffers, offset, length, block,
timeout, unit, attachment, check, handler, semaphore, completion);
}
private class NioOperationState<A> extends OperationState<A> {
private volatile boolean inline = true;
private NioOperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
BlockingMode block, long timeout, TimeUnit unit, A attachment, CompletionCheck check,
CompletionHandler<Long, ? super A> handler, Semaphore semaphore,
VectoredIOCompletionHandler<A> completion) {
super(read, buffers, offset, length, block,
timeout, unit, attachment, check, handler, semaphore, completion);
}
@Override
protected boolean isInline() {
return inline;
}
@Override
public void run() {
// Perform the IO operation
// Called from the poller to continue the IO operation
long nBytes = 0;
if (getError() == null) {
try {
synchronized (this) {
if (!completionDone) {
// This filters out same notification until processing
// of the current one is done
if (log.isDebugEnabled()) {
log.debug("Skip concurrent " + (read ? "read" : "write") + " notification");
}
return;
}
if (read) {
// Read from main buffer first
if (!socketBufferHandler.isReadBufferEmpty()) {
// There is still data inside the main read buffer, it needs to be read first
socketBufferHandler.configureReadBufferForRead();
for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) {
nBytes += transfer(socketBufferHandler.getReadBuffer(), buffers[offset + i]);
}
}
if (nBytes == 0) {
nBytes = getSocket().read(buffers, offset, length);
updateLastRead();
}
} else {
boolean doWrite = true;
// Write from main buffer first
if (!socketBufferHandler.isWriteBufferEmpty()) {
// There is still data inside the main write buffer, it needs to be written first
socketBufferHandler.configureWriteBufferForRead();
do {
nBytes = getSocket().write(socketBufferHandler.getWriteBuffer());
} while (!socketBufferHandler.isWriteBufferEmpty() && nBytes > 0);
if (!socketBufferHandler.isWriteBufferEmpty()) {
doWrite = false;
}
// Preserve a negative value since it is an error
if (nBytes > 0) {
nBytes = 0;
}
}
if (doWrite) {
long n = 0;
do {
n = getSocket().write(buffers, offset, length);
if (n == -1) {
nBytes = n;
} else {
nBytes += n;
}
} while (n > 0);
updateLastWrite();
}
}
if (nBytes != 0 || !buffersArrayHasRemaining(buffers, offset, length)) {
completionDone = false;
}
}
} catch (IOException e) {
setError(e);
}
}
if (nBytes > 0 || (nBytes == 0 && !buffersArrayHasRemaining(buffers, offset, length))) {
// The bytes processed are only updated in the completion handler
completion.completed(Long.valueOf(nBytes), this);
} else if (nBytes < 0 || getError() != null) {
IOException error = getError();
if (error == null) {
error = new EOFException();
}
completion.failed(error, this);
} else {
// As soon as the operation uses the poller, it is no longer inline
inline = false;
if (read) {
registerReadInterest();
} else {
registerWriteInterest();
}
}
}
}
}
// ---------------------------------------------- SocketProcessor Inner Class
This class is the equivalent of the Worker, but will simply use in an
external Executor thread pool.
/**
* This class is the equivalent of the Worker, but will simply use in an
* external Executor thread pool.
*/
protected class SocketProcessor extends SocketProcessorBase<NioChannel> {
public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
super(socketWrapper, event);
}
@Override
protected void doRun() {
/*
* Do not cache and re-use the value of socketWrapper.getSocket() in
* this method. If the socket closes the value will be updated to
* CLOSED_NIO_CHANNEL and the previous value potentially re-used for
* a new connection. That can result in a stale cached value which
* in turn can result in unintentionally closing currently active
* connections.
*/
Poller poller = NioEndpoint.this.poller;
if (poller == null) {
socketWrapper.close();
return;
}
try {
int handshake = -1;
try {
if (socketWrapper.getSocket().isHandshakeComplete()) {
// No TLS handshaking required. Let the handler
// process this socket / event combination.
handshake = 0;
} else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
event == SocketEvent.ERROR) {
// Unable to complete the TLS handshake. Treat it as
// if the handshake failed.
handshake = -1;
} else {
handshake = socketWrapper.getSocket().handshake(event == SocketEvent.OPEN_READ, event == SocketEvent.OPEN_WRITE);
// The handshake process reads/writes from/to the
// socket. status may therefore be OPEN_WRITE once
// the handshake completes. However, the handshake
// happens when the socket is opened so the status
// must always be OPEN_READ after it completes. It
// is OK to always set this as it is only used if
// the handshake completes.
event = SocketEvent.OPEN_READ;
}
} catch (IOException x) {
handshake = -1;
if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
} catch (CancelledKeyException ckx) {
handshake = -1;
}
if (handshake == 0) {
SocketState state = SocketState.OPEN;
// Process the request from this socket
if (event == null) {
state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
} else {
state = getHandler().process(socketWrapper, event);
}
if (state == SocketState.CLOSED) {
poller.cancelledKey(getSelectionKey(), socketWrapper);
}
} else if (handshake == -1 ) {
getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);
poller.cancelledKey(getSelectionKey(), socketWrapper);
} else if (handshake == SelectionKey.OP_READ){
socketWrapper.registerReadInterest();
} else if (handshake == SelectionKey.OP_WRITE){
socketWrapper.registerWriteInterest();
}
} catch (CancelledKeyException cx) {
poller.cancelledKey(getSelectionKey(), socketWrapper);
} catch (VirtualMachineError vme) {
ExceptionUtils.handleThrowable(vme);
} catch (Throwable t) {
log.error(sm.getString("endpoint.processing.fail"), t);
poller.cancelledKey(getSelectionKey(), socketWrapper);
} finally {
socketWrapper = null;
event = null;
//return to cache
if (running && !paused && processorCache != null) {
processorCache.push(this);
}
}
}
private SelectionKey getSelectionKey() {
// Shortcut for Java 11 onwards
if (JreCompat.isJre11Available()) {
return null;
}
SocketChannel socketChannel = socketWrapper.getSocket().getIOChannel();
if (socketChannel == null) {
return null;
}
return socketChannel.keyFor(NioEndpoint.this.poller.getSelector());
}
}
// ----------------------------------------------- SendfileData Inner Class
SendfileData class.
/**
* SendfileData class.
*/
public static class SendfileData extends SendfileDataBase {
public SendfileData(String filename, long pos, long length) {
super(filename, pos, length);
}
protected volatile FileChannel fchannel;
}
}