package org.jruby.util.io;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyInteger;
import org.jruby.RubyFloat;
import org.jruby.RubyIO;
import org.jruby.RubyThread;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.jruby.exceptions.RaiseException;
import static com.headius.backport9.buffer.Buffers.flipBuffer;
@Deprecated
public class SelectBlob {
public SelectBlob() {}
public IRubyObject goForIt(ThreadContext context, Ruby runtime, IRubyObject[] args) {
this.runtime = runtime;
try {
processReads(runtime, args, context);
processWrites(runtime, args, context);
if (args.length > 2 && !args[2].isNil()) {
checkArrayType(runtime, args[2]);
}
final boolean has_timeout = args.length > 3 && !args[3].isNil();
final long timeout = !has_timeout ? 0 : convertTimeout(context, args[3]);
if (timeout < 0) {
throw runtime.newArgumentError("time interval must be positive");
}
if (args[0].isNil() && args[1].isNil() && args[2].isNil()) {
RubyThread thread = context.getThread();
if (has_timeout) {
if (timeout > 0) {
long now = System.currentTimeMillis();
thread.sleep(timeout);
while (System.currentTimeMillis() < now + timeout) {
thread.sleep(1);
}
}
} else {
thread.sleep(0);
}
} else {
doSelect(runtime, has_timeout, timeout);
processSelectedKeys(runtime);
processPendingAndUnselectable();
tidyUp();
}
if (readResults == null && writeResults == null && errorResults == null) {
return runtime.getNil();
}
return constructResults(runtime);
} catch (BadDescriptorException e) {
throw runtime.newErrnoEBADFError();
} catch (CancelledKeyException e) {
throw runtime.newErrnoEBADFError();
} catch (IOException e) {
throw runtime.newIOErrorFromException(e);
} catch (InterruptedException ie) {
throw runtime.newThreadError("select interrupted");
} finally {
for (Selector selector : selectors.values()) {
try {
selector.close();
} catch (Exception e) {
}
}
}
}
private void processReads(Ruby runtime, IRubyObject[] args, ThreadContext context) throws BadDescriptorException, IOException {
if (!args[0].isNil()) {
checkArrayType(runtime, args[0]);
readArray = (RubyArray) args[0];
readSize = readArray.size();
if (readSize == 0) {
readArray = null;
} else {
readIOs = new RubyIO[readSize];
Map<Character,Integer> attachment = new HashMap<Character,Integer>(1);
for (int i = 0; i < readSize; i++) {
RubyIO ioObj = saveReadIO(i, context);
saveReadBlocking(ioObj, i);
saveBufferedRead(ioObj, i);
attachment.clear();
attachment.put('r', i);
trySelectRead(context, attachment, ioObj.getOpenFileChecked());
}
}
}
}
private RubyIO saveReadIO(int i, ThreadContext context) {
IRubyObject obj = readArray.eltOk(i);
RubyIO ioObj = RubyIO.convertToIO(context, obj);
readIOs[i] = ioObj;
return ioObj;
}
private void saveReadBlocking(RubyIO ioObj, int i) {
if (ioObj.getChannel() instanceof SelectableChannel) {
getReadBlocking()[i] = ((SelectableChannel) ioObj.getChannel()).isBlocking();
}
}
private void saveBufferedRead(RubyIO ioObj, int i) throws BadDescriptorException {
if (ioObj.getOpenFile().READ_DATA_BUFFERED()) {
getUnselectableReads()[i] = true;
}
}
private void trySelectRead(ThreadContext context, Map<Character,Integer> attachment, OpenFile fptr) throws IOException {
if (fptr.selectChannel() != null && registerSelect(getSelector(context, fptr.selectChannel()), attachment, fptr.selectChannel(), READ_ACCEPT_OPS)) {
selectedReads++;
if (fptr.READ_CHAR_PENDING() || fptr.READ_DATA_PENDING()) {
getPendingReads()[attachment.get('r')] = true;
}
} else {
if (fptr.isReadable()) {
getUnselectableReads()[attachment.get('r')] = true;
}
}
}
private void processWrites(Ruby runtime, IRubyObject[] args, ThreadContext context) throws IOException {
if (args.length > 1 && !args[1].isNil()) {
checkArrayType(runtime, args[1]);
writeArray = (RubyArray) args[1];
writeSize = writeArray.size();
if (writeArray.size() == 0) {
writeArray = null;
} else {
writeIOs = new RubyIO[writeSize];
Map<Character,Integer> attachment = new HashMap<Character,Integer>(1);
for (int i = 0; i < writeSize; i++) {
RubyIO ioObj = saveWriteIO(i, context);
saveWriteBlocking(ioObj, i);
attachment.clear();
attachment.put('w', i);
trySelectWrite(context, attachment, ioObj.getOpenFileChecked());
}
}
}
}
private RubyIO saveWriteIO(int i, ThreadContext context) {
IRubyObject obj = writeArray.eltOk(i);
RubyIO ioObj = RubyIO.convertToIO(context, obj);
writeIOs[i] = ioObj.GetWriteIO();
return ioObj;
}
private void saveWriteBlocking(RubyIO ioObj, int i) {
if (ioObj.getChannel() instanceof SelectableChannel) {
if (readBlocking != null) {
int readIndex = fastSearch(readIOs, ioObj);
if (readIndex == -1) {
getWriteBlocking()[i] = ((SelectableChannel) ioObj.getChannel()).isBlocking();
}
} else {
getWriteBlocking()[i] = ((SelectableChannel) ioObj.getChannel()).isBlocking();
}
}
}
private void trySelectWrite(ThreadContext context, Map<Character,Integer> attachment, OpenFile fptr) throws IOException {
if (fptr.selectChannel() == null
|| false == registerSelect(getSelector(context, fptr.selectChannel()), attachment, fptr.selectChannel(), WRITE_CONNECT_OPS)) {
selectedReads++;
if (fptr.isWritable()) {
getUnselectableWrites()[attachment.get('w')] = true;
}
}
}
private static long convertTimeout(final ThreadContext context, IRubyObject timeoutArg) {
final long timeout;
if (timeoutArg instanceof RubyFloat) {
timeout = Math.round(((RubyFloat) timeoutArg).getDoubleValue() * 1000);
}
else if (timeoutArg instanceof RubyInteger) {
timeout = Math.round(((RubyInteger) timeoutArg).getDoubleValue() * 1000);
}
else {
final Ruby runtime = context.runtime;
if ( ! runtime.is1_8() ) {
RubyFloat t = null;
try {
t = timeoutArg.callMethod(context, "to_f").convertToFloat();
}
catch (RaiseException e) { }
timeout = t != null ? Math.round(t.getDoubleValue() * 1000) : -1;
}
else timeout = -1;
if ( timeout == -1 ) {
throw runtime.newTypeError("can't convert " + timeoutArg.getMetaClass().getName() + " into time interval");
}
}
if ( timeout < 0 ) throw context.runtime.newArgumentError("negative timeout given");
return timeout;
}
private void doSelect(Ruby runtime, final boolean has_timeout, long timeout) throws IOException {
if (mainSelector != null) {
if (pendingReads == null && unselectableReads == null && unselectableWrites == null) {
if (has_timeout && timeout == 0) {
for (Selector selector : selectors.values()) selector.selectNow();
} else {
List<Future> futures = new ArrayList<Future>(enxioSelectors.size());
for (ENXIOSelector enxioSelector : enxioSelectors) {
futures.add(runtime.getExecutor().submit(enxioSelector));
}
mainSelector.select(has_timeout ? timeout : 0);
for (ENXIOSelector enxioSelector : enxioSelectors) enxioSelector.selector.wakeup();
for (Future f : futures) try {
f.get();
} catch (InterruptedException iex) {
} catch (ExecutionException eex) {
if (eex.getCause() instanceof IOException) {
throw (IOException) eex.getCause();
}
}
}
} else {
for (Selector selector : selectors.values()) selector.selectNow();
}
}
for (ENXIOSelector enxioSelector : enxioSelectors) {
Pipe.SourceChannel source = enxioSelector.pipe.source();
SelectionKey key = source.keyFor(mainSelector);
if (key != null && mainSelector.selectedKeys().contains(key)) {
mainSelector.selectedKeys().remove(key);
ByteBuffer buf = ByteBuffer.allocate(1);
source.read(buf);
}
}
}
public static final int READ_ACCEPT_OPS = SelectExecutor.READ_ACCEPT_OPS;
public static final int WRITE_CONNECT_OPS = SelectExecutor.WRITE_CONNECT_OPS;
private static final int CANCELLED_OPS = SelectionKey.OP_READ | SelectionKey.OP_ACCEPT | SelectionKey.OP_CONNECT;
private static boolean ready(int ops, int mask) {
return (ops & mask) != 0;
}
private static boolean readAcceptReady(int ops) {
return ready(ops, READ_ACCEPT_OPS);
}
private static boolean writeConnectReady(int ops) {
return ready(ops, WRITE_CONNECT_OPS);
}
private static boolean cancelReady(int ops) {
return ready(ops, CANCELLED_OPS);
}
private static boolean writeReady(int ops) {
return ready(ops, SelectionKey.OP_WRITE);
}
@SuppressWarnings("unchecked")
private void processSelectedKeys(Ruby runtime) throws IOException {
for (Selector selector : selectors.values()) {
for (SelectionKey key : selector.selectedKeys()) {
int readIoIndex = 0;
int writeIoIndex = 0;
int interestAndReady = key.interestOps() & key.readyOps();
if (readArray != null && readAcceptReady(interestAndReady)) {
readIoIndex = ((Map<Character,Integer>)key.attachment()).get('r');
getReadResults().append(readArray.eltOk(readIoIndex));
if (pendingReads != null) {
pendingReads[readIoIndex] = false;
}
}
if (writeArray != null && writeConnectReady(interestAndReady)) {
writeIoIndex = ((Map<Character,Integer>)key.attachment()).get('w');
getWriteResults().append(writeArray.eltOk(writeIoIndex));
}
}
}
}
private void processPendingAndUnselectable() {
if (pendingReads != null) {
for (int i = 0; i < pendingReads.length; i++) {
if (pendingReads[i]) {
getReadResults().append(readArray.eltOk(i));
}
}
}
if (unselectableReads != null) {
for (int i = 0; i < unselectableReads.length; i++) {
if (unselectableReads[i]) {
getReadResults().append(readArray.eltOk(i));
}
}
}
if (unselectableWrites != null) {
for (int i = 0; i < unselectableWrites.length; i++) {
if (unselectableWrites[i]) {
getWriteResults().append(writeArray.eltOk(i));
}
}
}
}
private void tidyUp() throws IOException {
for (Selector selector : selectors.values()) {
selector.close();
}
for (ENXIOSelector enxioSelector : enxioSelectors) {
enxioSelector.pipe.sink().close();
enxioSelector.pipe.source().close();
}
if (readBlocking != null) {
for (int i = 0; i < readBlocking.length; i++) {
if (readBlocking[i] != null) {
try {
((SelectableChannel) readIOs[i].getChannel()).configureBlocking(readBlocking[i]);
} catch (IllegalBlockingModeException ibme) {
throw runtime.newConcurrencyError("can not set IO blocking after select; concurrent select detected?");
}
}
}
}
if (writeBlocking != null) {
for (int i = 0; i < writeBlocking.length; i++) {
if (writeBlocking[i] != null) {
try {
((SelectableChannel) writeIOs[i].getChannel()).configureBlocking(writeBlocking[i]);
} catch (IllegalBlockingModeException ibme) {
throw runtime.newConcurrencyError("can not set IO blocking after select; concurrent select detected?");
}
}
}
}
}
private RubyArray getReadResults() {
if (readResults == null) {
readResults = RubyArray.newArray(runtime, readArray.size());
}
return readResults;
}
private RubyArray getWriteResults() {
if (writeResults == null) {
writeResults = RubyArray.newArray(runtime, writeArray.size());
}
return writeResults;
}
private RubyArray getErrorResults() {
if (errorResults != null) {
errorResults = RubyArray.newArray(runtime, readArray.size() + writeArray.size());
}
return errorResults;
}
private Selector getSelector(ThreadContext context, SelectableChannel channel) throws IOException {
Selector selector = selectors.get(channel.provider());
if (selector == null) {
selector = SelectorFactory.openWithRetryFrom(context.runtime, channel.provider());
if (selectors.isEmpty()) {
selectors = new HashMap<SelectorProvider, Selector>();
}
selectors.put(channel.provider(), selector);
if (!selector.provider().equals(SelectorProvider.provider())) {
Pipe pipe = Pipe.open();
ENXIOSelector enxioSelector = new ENXIOSelector(selector, pipe);
if (enxioSelectors.isEmpty()) enxioSelectors = new ArrayList<ENXIOSelector>();
enxioSelectors.add(enxioSelector);
pipe.source().configureBlocking(false);
pipe.source().register(getSelector(context, pipe.source()), SelectionKey.OP_READ, enxioSelector);
} else if (mainSelector == null) {
mainSelector = selector;
}
}
return selector;
}
private Boolean[] getReadBlocking() {
if (readBlocking == null) {
readBlocking = new Boolean[readSize];
}
return readBlocking;
}
private Boolean[] getWriteBlocking() {
if (writeBlocking == null) {
writeBlocking = new Boolean[writeSize];
}
return writeBlocking;
}
private boolean[] getUnselectableReads() {
if (unselectableReads == null) {
unselectableReads = new boolean[readSize];
}
return unselectableReads;
}
private boolean[] getUnselectableWrites() {
if (unselectableWrites == null) {
unselectableWrites = new boolean[writeSize];
}
return unselectableWrites;
}
private boolean[] getPendingReads() {
if (pendingReads == null) {
pendingReads = new boolean[readSize];
}
return pendingReads;
}
private IRubyObject constructResults(Ruby runtime) {
return RubyArray.newArrayLight(
runtime,
readResults == null ? RubyArray.newEmptyArray(runtime) : readResults,
writeResults == null ? RubyArray.newEmptyArray(runtime) : writeResults,
errorResults == null ? RubyArray.newEmptyArray(runtime) : errorResults);
}
private int fastSearch(Object[] ary, Object obj) {
for (int i = 0; i < ary.length; i++) {
if (ary[i] == obj) {
return i;
}
}
return -1;
}
private static void checkArrayType(Ruby runtime, IRubyObject obj) {
if (!(obj instanceof RubyArray)) {
throw runtime.newTypeError("wrong argument type "
+ obj.getMetaClass().getName() + " (expected Array)");
}
}
@SuppressWarnings("unchecked")
private static boolean registerSelect(Selector selector, Map<Character,Integer> obj, SelectableChannel channel, int ops) throws IOException {
channel.configureBlocking(false);
int real_ops = channel.validOps() & ops;
SelectionKey key = channel.keyFor(selector);
if (key == null) {
Map<Character,Integer> attachment = new HashMap<Character,Integer> (1);
attachment.putAll(obj);
channel.register(selector, real_ops, attachment );
} else {
key.interestOps(key.interestOps() | real_ops);
Map<Character,Integer> att = (Map<Character,Integer>)key.attachment();
att.putAll(obj);
key.attach(att);
}
return true;
}
private static final class ENXIOSelector implements Callable<Object> {
private final Selector selector;
private final Pipe pipe;
private ENXIOSelector(Selector selector, Pipe pipe) {
this.selector = selector;
this.pipe = pipe;
}
public Object call() throws Exception {
try {
selector.select();
} finally {
ByteBuffer buf = ByteBuffer.allocate(1);
buf.put((byte) 0);
flipBuffer(buf);
pipe.sink().write(buf);
}
return null;
}
}
Ruby runtime;
RubyArray readArray = null;
int readSize = 0;
RubyIO[] readIOs = null;
boolean[] unselectableReads = null;
boolean[] pendingReads = null;
Boolean[] readBlocking = null;
int selectedReads = 0;
RubyArray writeArray = null;
int writeSize = 0;
RubyIO[] writeIOs = null;
boolean[] unselectableWrites = null;
Boolean[] writeBlocking = null;
int selectedWrites = 0;
Selector mainSelector = null;
Map<SelectorProvider, Selector> selectors = Collections.emptyMap();
Collection<ENXIOSelector> enxioSelectors = Collections.emptyList();
RubyArray readResults = null;
RubyArray writeResults = null;
RubyArray errorResults = null;
}