/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tomcat.util.net;
import java.io.EOFException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
Thread safe non blocking selector pool
/**
* Thread safe non blocking selector pool
*/
public class NioSelectorPool {
protected NioBlockingSelector blockingSelector;
protected volatile Selector sharedSelector;
protected boolean shared = Boolean.parseBoolean(System.getProperty("org.apache.tomcat.util.net.NioSelectorShared", "true"));
protected int maxSelectors = 200;
protected long sharedSelectorTimeout = 30000;
protected int maxSpareSelectors = -1;
protected boolean enabled = true;
protected AtomicInteger active = new AtomicInteger(0);
protected AtomicInteger spare = new AtomicInteger(0);
protected ConcurrentLinkedQueue<Selector> selectors = new ConcurrentLinkedQueue<>();
protected Selector getSharedSelector() throws IOException {
if (shared && sharedSelector == null) {
synchronized (NioSelectorPool.class) {
if (sharedSelector == null) {
sharedSelector = Selector.open();
}
}
}
return sharedSelector;
}
public Selector get() throws IOException{
if (shared) {
return getSharedSelector();
}
if ((!enabled) || active.incrementAndGet() >= maxSelectors) {
if (enabled) {
active.decrementAndGet();
}
return null;
}
Selector s = null;
try {
s = selectors.size() > 0 ? selectors.poll() : null;
if (s == null) {
s = Selector.open();
} else {
spare.decrementAndGet();
}
} catch (NoSuchElementException x) {
try {
s = Selector.open();
} catch (IOException iox) {
}
} finally {
if (s == null) {
active.decrementAndGet();// we were unable to find a selector
}
}
return s;
}
public void put(Selector s) throws IOException {
if (shared) {
return;
}
if (enabled) {
active.decrementAndGet();
}
if (enabled && (maxSpareSelectors == -1
|| spare.get() < Math.min(maxSpareSelectors, maxSelectors))) {
spare.incrementAndGet();
selectors.offer(s);
} else {
s.close();
}
}
public void close() throws IOException {
enabled = false;
Selector s;
while ((s = selectors.poll()) != null) {
s.close();
}
spare.set(0);
active.set(0);
if (blockingSelector != null) {
blockingSelector.close();
}
if (shared && getSharedSelector() != null) {
getSharedSelector().close();
sharedSelector = null;
}
}
public void open(String name) throws IOException {
enabled = true;
getSharedSelector();
if (shared) {
blockingSelector = new NioBlockingSelector();
blockingSelector.open(name, getSharedSelector());
}
}
Performs a write using the bytebuffer for data to be written and a
selector to block (if blocking is requested). If the
selector
parameter is null, and blocking is requested then
it will perform a busy write that could take up a lot of CPU cycles.
Params: - buf – The buffer containing the data, we will write as long as
(buf.hasRemaining()==true)
- socket – The socket to write data to
- selector – The selector to use for blocking, if null then a busy write will be initiated
- writeTimeout – The timeout for this write operation in milliseconds, -1 means no timeout
Throws: - EOFException – if write returns -1
- SocketTimeoutException – if the write times out
- IOException – if an IO Exception occurs in the underlying socket logic
Returns: the number of bytes written
/**
* Performs a write using the bytebuffer for data to be written and a
* selector to block (if blocking is requested). If the
* <code>selector</code> parameter is null, and blocking is requested then
* it will perform a busy write that could take up a lot of CPU cycles.
* @param buf The buffer containing the data, we will write as long as <code>(buf.hasRemaining()==true)</code>
* @param socket The socket to write data to
* @param selector The selector to use for blocking, if null then a busy write will be initiated
* @param writeTimeout The timeout for this write operation in milliseconds, -1 means no timeout
* @return the number of bytes written
* @throws EOFException if write returns -1
* @throws SocketTimeoutException if the write times out
* @throws IOException if an IO Exception occurs in the underlying socket logic
*/
public int write(ByteBuffer buf, NioChannel socket, Selector selector, long writeTimeout)
throws IOException {
if (shared) {
return blockingSelector.write(buf, socket, writeTimeout);
}
SelectionKey key = null;
int written = 0;
boolean timedout = false;
int keycount = 1; //assume we can write
long time = System.currentTimeMillis(); //start the timeout timer
try {
while ((!timedout) && buf.hasRemaining()) {
int cnt = 0;
if ( keycount > 0 ) { //only write if we were registered for a write
cnt = socket.write(buf); //write the data
if (cnt == -1) {
throw new EOFException();
}
written += cnt;
if (cnt > 0) {
time = System.currentTimeMillis(); //reset our timeout timer
continue; //we successfully wrote, try again without a selector
}
}
if (selector != null) {
//register OP_WRITE to the selector
if (key == null) {
key = socket.getIOChannel().register(selector, SelectionKey.OP_WRITE);
} else {
key.interestOps(SelectionKey.OP_WRITE);
}
if (writeTimeout == 0) {
timedout = buf.hasRemaining();
} else if (writeTimeout < 0) {
keycount = selector.select();
} else {
keycount = selector.select(writeTimeout);
}
}
if (writeTimeout > 0 && (selector == null || keycount == 0)) {
timedout = (System.currentTimeMillis() - time) >= writeTimeout;
}
}
if (timedout) {
throw new SocketTimeoutException();
}
} finally {
if (key != null) {
key.cancel();
if (selector != null) selector.selectNow();//removes the key from this selector
}
}
return written;
}
Performs a blocking read using the bytebuffer for data to be read and a selector to block.
If the selector
parameter is null, then it will perform a busy read that could
take up a lot of CPU cycles.
Params: - buf – ByteBuffer - the buffer containing the data, we will read as until we have read at least one byte or we timed out
- socket – SocketChannel - the socket to write data to
- selector – Selector - the selector to use for blocking, if null then a busy read will be initiated
- readTimeout – long - the timeout for this read operation in milliseconds, -1 means no timeout
Throws: - EOFException – if read returns -1
- SocketTimeoutException – if the read times out
- IOException – if an IO Exception occurs in the underlying socket logic
Returns: the number of bytes read
/**
* Performs a blocking read using the bytebuffer for data to be read and a selector to block.
* If the <code>selector</code> parameter is null, then it will perform a busy read that could
* take up a lot of CPU cycles.
* @param buf ByteBuffer - the buffer containing the data, we will read as until we have read at least one byte or we timed out
* @param socket SocketChannel - the socket to write data to
* @param selector Selector - the selector to use for blocking, if null then a busy read will be initiated
* @param readTimeout long - the timeout for this read operation in milliseconds, -1 means no timeout
* @return the number of bytes read
* @throws EOFException if read returns -1
* @throws SocketTimeoutException if the read times out
* @throws IOException if an IO Exception occurs in the underlying socket logic
*/
public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout)
throws IOException {
if (shared) {
return blockingSelector.read(buf, socket, readTimeout);
}
SelectionKey key = null;
int read = 0;
boolean timedout = false;
int keycount = 1; //assume we can write
long time = System.currentTimeMillis(); //start the timeout timer
try {
while (!timedout) {
int cnt = 0;
if (keycount > 0) { //only read if we were registered for a read
cnt = socket.read(buf);
if (cnt == -1) {
if (read == 0) {
read = -1;
}
break;
}
read += cnt;
if (cnt > 0) continue; //read some more
if (cnt == 0 && read > 0) {
break; //we are done reading
}
}
if (selector != null) {//perform a blocking read
//register OP_WRITE to the selector
if (key == null) {
key = socket.getIOChannel().register(selector, SelectionKey.OP_READ);
}
else key.interestOps(SelectionKey.OP_READ);
if (readTimeout == 0) {
timedout = (read == 0);
} else if (readTimeout < 0) {
keycount = selector.select();
} else {
keycount = selector.select(readTimeout);
}
}
if (readTimeout > 0 && (selector == null || keycount == 0) ) {
timedout = (System.currentTimeMillis() - time) >= readTimeout;
}
}
if (timedout) {
throw new SocketTimeoutException();
}
} finally {
if (key != null) {
key.cancel();
if (selector != null) {
selector.selectNow();//removes the key from this selector
}
}
}
return read;
}
public void setMaxSelectors(int maxSelectors) {
this.maxSelectors = maxSelectors;
}
public void setMaxSpareSelectors(int maxSpareSelectors) {
this.maxSpareSelectors = maxSpareSelectors;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public void setSharedSelectorTimeout(long sharedSelectorTimeout) {
this.sharedSelectorTimeout = sharedSelectorTimeout;
}
public int getMaxSelectors() {
return maxSelectors;
}
public int getMaxSpareSelectors() {
return maxSpareSelectors;
}
public boolean isEnabled() {
return enabled;
}
public long getSharedSelectorTimeout() {
return sharedSelectorTimeout;
}
public ConcurrentLinkedQueue<Selector> getSelectors() {
return selectors;
}
public AtomicInteger getSpare() {
return spare;
}
public boolean isShared() {
return shared;
}
public void setShared(boolean shared) {
this.shared = shared;
}
}