package org.jruby;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.nio.channels.Channel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.Vector;
import java.util.WeakHashMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import com.headius.backport9.stack.StackWalker;
import org.jcodings.Encoding;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.exceptions.RaiseException;
import org.jruby.exceptions.ThreadKill;
import org.jruby.exceptions.Unrescuable;
import org.jruby.ext.thread.Mutex;
import org.jruby.internal.runtime.NativeThread;
import org.jruby.internal.runtime.RubyRunnable;
import org.jruby.internal.runtime.ThreadLike;
import org.jruby.internal.runtime.ThreadService;
import org.jruby.java.proxies.ConcreteJavaProxy;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.Arity;
import org.jruby.runtime.Block;
import org.jruby.runtime.ClassIndex;
import org.jruby.runtime.Helpers;
import org.jruby.runtime.ObjectAllocator;
import org.jruby.runtime.ObjectMarshal;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.ExecutionContext;
import org.jruby.runtime.backtrace.FrameType;
import org.jruby.runtime.backtrace.RubyStackTraceElement;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.util.ByteList;
import org.jruby.util.StringSupport;
import org.jruby.util.TypeConverter;
import org.jruby.util.io.BlockingIO;
import org.jruby.util.io.ChannelFD;
import org.jruby.util.io.OpenFile;
import org.jruby.util.log.Logger;
import org.jruby.util.log.LoggerFactory;
import org.jruby.common.IRubyWarnings.ID;
import static org.jruby.runtime.Visibility.*;
import static org.jruby.runtime.backtrace.BacktraceData.EMPTY_STACK_TRACE;
import static org.jruby.util.RubyStringBuilder.ids;
import static org.jruby.util.RubyStringBuilder.str;
import static org.jruby.util.RubyStringBuilder.types;
@JRubyClass(name="Thread")
public class RubyThread extends RubyObject implements ExecutionContext {
/** Logger for this class. */
private static final Logger LOG = LoggerFactory.getLogger(RubyThread.class);
/** Stack walker instance shared with {@link ThreadContext}. */
private static final StackWalker WALKER = ThreadContext.WALKER;
/** Backing thread implementation; remains {@code ThreadLike.DUMMY} until the thread is started or adopted. */
private volatile ThreadLike threadImpl = ThreadLike.DUMMY;
/** Lazily created storage behind {@code []}, {@code []=}, {@code key?} and {@code keys}. */
private volatile transient Map<IRubyObject, IRubyObject> fiberLocalVariables;
/** Lazily created storage behind the {@code thread_variable*} methods. */
private volatile transient Map<IRubyObject, IRubyObject> threadLocalVariables;
/** Map handed out by {@link #getContextVariables()}. */
private final Map<Object, IRubyObject> contextVariables = new WeakHashMap<Object, IRubyObject>();
/** Per-thread flag read and written by {@code abort_on_exception} / {@code abort_on_exception=}. */
private volatile boolean abortOnException;
/** Initialised from the runtime-wide setting in the constructor. */
private volatile IRubyObject reportOnException;
/** Result stored by {@link #cleanTerminate(IRubyObject)} and returned by {@code value}. */
private volatile IRubyObject finalResult;
// Source position captured from the creating context in startThread; used for the thread name and inspect.
private String file; private int line;
/** When non-null, {@code join} rethrows this in the joining thread. */
private volatile Throwable exitingException;
/** Group this thread belongs to; may be null, in which case {@code group} answers nil. */
private volatile RubyThreadGroup threadGroup;
/** Value exposed through {@link #getErrorInfo()} / {@link #setErrorInfo(IRubyObject)}. */
private volatile IRubyObject errorInfo;
/** Weak reference to the associated context; null after {@link #clearContext()}. */
private volatile WeakReference<ThreadContext> contextRef;
/** Stack of masks pushed by {@code Thread.handle_interrupt}; the most recent mask is last. */
private final Vector<RubyHash> interruptMaskStack = new Vector<>(4);
// NOTE(review): SleepTask2 is declared outside this chunk; usage not visible here.
private final SleepTask2 sleepTask = new SleepTask2();
/** Lower bound applied by {@code priority=}. */
public static final int RUBY_MIN_THREAD_PRIORITY = -3;
/** Upper bound applied by {@code priority=}. */
public static final int RUBY_MAX_THREAD_PRIORITY = 3;
/**
 * States a {@code RubyThread} can be in. Each constant carries its lower-cased name as a
 * UTF-8 {@link ByteList} in {@link #bytes}.
 */
public enum Status {
    RUN, SLEEP, ABORTING, DEAD, NATIVE;

    /** Lower-cased constant name encoded as UTF-8 bytes. */
    public final ByteList bytes;

    Status() {
        // Lower-case with Locale.ROOT: the default-locale overload maps 'I' to a dotless 'ı' under
        // Turkish/Azeri locales, which would corrupt "aborting" and "native".
        final String lowerName = toString().toLowerCase(java.util.Locale.ROOT);
        bytes = new ByteList(lowerName.getBytes(RubyEncoding.UTF8), false);
    }
}
/** Current lifecycle status; starts out as RUN. */
private final AtomicReference<Status> status = new AtomicReference<>(Status.RUN);
/** Interrupts (exceptions, or fixnum kill markers) queued for delivery to this thread. */
private final Queue<IRubyObject> pendingInterruptQueue = new ConcurrentLinkedQueue<>();
// NOTE(review): unblockFunc/unblockArg are not used in the visible chunk; presumably consulted when interrupting a blocked thread — confirm.
private volatile Unblocker unblockFunc;
private volatile Object unblockArg;
// NOTE(review): presumably the locks released by unlockAll() during dispose — confirm.
private final List<Lock> heldLocks = new Vector<>();
/** Set once by {@link #dispose()} so teardown runs a single time. */
private volatile boolean disposed = false;
/** Bit set of raised interrupts; updated atomically through INTERRUPT_FLAG_UPDATER. */
private volatile int interruptFlag = 0;
/** Bits in interruptFlag that are currently masked (ignored by getInterrupts/anyInterrupted). */
private volatile int interruptMask;
/** True once the pending queue has been scanned and nothing was deliverable; reset by handle_interrupt. */
private volatile boolean pendingInterruptQueueChecked = false;
// NOTE(review): not used in the visible chunk.
private volatile Selector currentSelector;
/** Thread whose thread-local variables this instance delegates to; null means "this". */
private volatile RubyThread fiberCurrentThread;
/** Atomic accessor for the {@code interruptFlag} field. */
private static final AtomicIntegerFieldUpdater<RubyThread> INTERRUPT_FLAG_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(RubyThread.class, "interruptFlag");
// Bit positions within interruptFlag.
private static final int TIMER_INTERRUPT_MASK = 0x01;
private static final int PENDING_INTERRUPT_MASK = 0x02;
private static final int POSTPONED_JOB_INTERRUPT_MASK = 0x04;
private static final int TRAP_INTERRUPT_MASK = 0x08;
// Delivery timings: NONE means no mask applies; the others mirror :immediate, :on_blocking and :never.
private static final int INTERRUPT_NONE = 0;
private static final int INTERRUPT_IMMEDIATE = 1;
private static final int INTERRUPT_ON_BLOCKING = 2;
private static final int INTERRUPT_NEVER = 3;
protected RubyThread(Ruby runtime, RubyClass type) {
super(runtime, type);
finalResult = errorInfo = runtime.getNil();
reportOnException = runtime.getReportOnException();
}
public RubyThread(Ruby runtime, RubyClass klass, Runnable runnable) {
this(runtime, klass);
startThread(runtime.getCurrentContext(), runnable);
}
/**
 * Consumes this thread's unmasked interrupt flags and acts on a deliverable pending interrupt.
 * <p>
 * Loops while {@link #getInterrupts()} reports unmasked bits. When the pending-interrupt bit is set and
 * the queue is active, one entry is taken from the queue: a fixnum 0, 1 or 2 kills this thread, any
 * other entry is raised in this thread via {@code RubyKernel.raise}.
 *
 * @param context        current thread context
 * @param blockingTiming when true, entries masked {@code :on_blocking} are eligible for delivery
 */
private void executeInterrupts(ThreadContext context, boolean blockingTiming) {
Ruby runtime = context.runtime;
int interrupt;
// never read in this method
boolean postponedJobInterrupt = false;
while ((interrupt = getInterrupts()) != 0) {
// computed but not acted upon below
boolean timerInterrupt = (interrupt & TIMER_INTERRUPT_MASK) == TIMER_INTERRUPT_MASK;
boolean pendingInterrupt = (interrupt & PENDING_INTERRUPT_MASK) == PENDING_INTERRUPT_MASK;
if (pendingInterrupt && pendingInterruptActive()) {
IRubyObject err = pendingInterruptDeque(context, blockingTiming ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE);
if (err == UNDEF) {
// nothing in the queue is deliverable under the current masks
} else if (err instanceof RubyFixnum && (((RubyFixnum) err).getLongValue() == 0 ||
((RubyFixnum) err).getLongValue() == 1 ||
((RubyFixnum) err).getLongValue() == 2)) {
// fixnum markers 0..2 are treated as a kill request
toKill();
} else {
afterBlockingCall();
// leave the sleeping state before raising
if (getStatus() == Status.SLEEP) {
exitSleep();
}
IRubyObject[] args;
if (err instanceof RubyException) {
// pass the exception's own cause along as the cause: keyword argument
args = Helpers.arrayOf(err, RubyHash.newKwargs(runtime, "cause", ((RubyException) err).cause(context)));
} else {
args = Helpers.arrayOf(err);
}
RubyKernel.raise(context, this, args, Block.NULL_BLOCK);
}
}
}
}
/**
 * Intentionally empty; nothing in the visible part of this file calls it.
 *
 * @param context current thread context (unused)
 */
private void postponedJobFlush(ThreadContext context) {
}
/**
 * Tells whether the pending-interrupt queue is worth scanning.
 *
 * @return true only when the queue has not already been scanned without result and is non-empty
 */
private boolean pendingInterruptActive() {
    return !pendingInterruptQueueChecked && !pendingInterruptQueue.isEmpty();
}
/**
 * Kills this thread from within: discards queued interrupts, marks the status ABORTING and
 * throws {@link ThreadKill}. Never returns normally.
 */
private void toKill() {
    this.pendingInterruptQueue.clear();
    this.status.set(Status.ABORTING);
    throwThreadKill();
}
/** Drops every queued pending interrupt. */
private void pendingInterruptClear() {
    this.pendingInterruptQueue.clear();
}
/**
 * Atomically takes the unmasked interrupt bits out of {@code interruptFlag}.
 * <p>
 * The flag is replaced by its masked bits only (so masked interrupts stay raised), retrying the
 * compare-and-set until it succeeds.
 *
 * @return the bits that were set and not covered by {@code interruptMask}; 0 when none
 */
private int getInterrupts() {
int interrupt;
while (true) {
interrupt = interruptFlag;
// keep only the masked bits in the flag; retry if another thread changed it meanwhile
if (INTERRUPT_FLAG_UPDATER.compareAndSet(this, interrupt, interrupt & interruptMask)) {
break;
}
}
return interrupt & ~interruptMask;
}
/**
 * Removes and returns the first queued interrupt that the current mask stack allows to be delivered.
 *
 * @param context current thread context
 * @param timing  {@code INTERRUPT_ON_BLOCKING} when called at a blocking point, otherwise {@code INTERRUPT_NONE}
 * @return the dequeued interrupt, or {@code UNDEF} when nothing is deliverable (in which case the
 *         queue is marked as checked)
 */
private IRubyObject pendingInterruptDeque(ThreadContext context, int timing) {
for (Iterator<IRubyObject> iterator = pendingInterruptQueue.iterator(); iterator.hasNext();) {
IRubyObject err = iterator.next();
int maskTiming = pendingInterruptCheckMask(context, err);
switch (maskTiming) {
case INTERRUPT_ON_BLOCKING:
// deliverable only at a blocking point; otherwise skip this entry
if (timing != INTERRUPT_ON_BLOCKING) break;
// intentional fall-through: at a blocking point this entry is delivered like the cases below
case INTERRUPT_NONE:
case INTERRUPT_IMMEDIATE:
iterator.remove();
return err;
case INTERRUPT_NEVER:
break;
}
}
// remember that a full scan found nothing, so pendingInterruptActive() can short-circuit
pendingInterruptQueueChecked = true;
return UNDEF;
}
/**
 * Determines the delivery timing the mask stack assigns to an interrupt.
 * <p>
 * Masks are consulted from the most recently pushed to the oldest; within a mask the
 * interrupt's ancestors are tried in order and the first non-nil entry wins.
 *
 * @param context current thread context
 * @param err     the queued interrupt
 * @return one of the {@code INTERRUPT_*} timings; {@code INTERRUPT_NONE} when no mask matches
 */
private int pendingInterruptCheckMask(ThreadContext context, IRubyObject err) {
    final int depth = interruptMaskStack.size();
    if (depth == 0) return INTERRUPT_NONE;

    final List<IRubyObject> ancestors = getMetaClass(err).getAncestorList();
    final int ancestorCount = ancestors.size();

    for (int level = depth - 1; level >= 0; level--) {
        final RubyHash mask = interruptMaskStack.get(level);
        for (int i = 0; i < ancestorCount; i++) {
            final IRubyObject timing = mask.op_aref(context, ancestors.get(i));
            if (!timing.isNil()) {
                return checkInterruptMask(context, timing);
            }
        }
    }
    return INTERRUPT_NONE;
}
/** @return the error info currently stored on this thread */
public IRubyObject getErrorInfo() {
    return this.errorInfo;
}
/**
 * Stores the error info for this thread.
 *
 * @param errorInfo value to store
 * @return the value that was passed in
 */
public IRubyObject setErrorInfo(IRubyObject errorInfo) {
    return this.errorInfo = errorInfo;
}
/**
 * Associates a context with this thread; only a weak reference to it is kept.
 *
 * @param context the context to remember
 */
public void setContext(final ThreadContext context) {
    final WeakReference<ThreadContext> ref = new WeakReference<>(context);
    this.contextRef = ref;
}
/** Clears and forgets the weak reference to the associated context, if there is one. */
public void clearContext() {
    final WeakReference<ThreadContext> ref = this.contextRef;
    if (ref == null) return;

    ref.clear();
    this.contextRef = null;
}
/** @return the associated context, or null when none is set or it has been cleared/collected */
public ThreadContext getContext() {
    final WeakReference<ThreadContext> ref = this.contextRef;
    if (ref == null) {
        return null;
    }
    return ref.get();
}
/** @return the native thread reported by the backing thread implementation */
public Thread getNativeThread() {
    final ThreadLike impl = this.threadImpl;
    return impl.nativeThread();
}
/**
 * Sets the thread whose thread-local variables this instance should use.
 *
 * @param fiberCurrentThread the thread to delegate to
 */
public void setFiberCurrentThread(RubyThread fiberCurrentThread) {
    this.fiberCurrentThread = fiberCurrentThread;
}
/** @return the thread set via {@link #setFiberCurrentThread(RubyThread)}, or this thread when none is set */
public RubyThread getFiberCurrentThread() {
    final RubyThread delegate = this.fiberCurrentThread;
    if (delegate != null) {
        return delegate;
    }
    return this;
}
/** Hook with an intentionally empty body. */
public void beforeStart() {
}
/**
 * Tears this thread down once: removes it from its thread group, releases its locks, marks it DEAD and
 * unregisters it from the runtime's thread service. Subsequent calls return immediately.
 */
public void dispose() {
// cheap check without the lock
if (disposed) return;
synchronized (this) {
// re-check under the lock so only one caller performs the teardown
if (disposed) return;
disposed = true;
threadGroup.remove(this);
unlockAll();
beDead();
}
// unregistering happens outside the lock
getRuntime().getThreadService().unregisterThread(this);
}
/**
 * Defines the Ruby {@code Thread} class (plus {@code Thread::Backtrace} and
 * {@code Thread::Backtrace::Location}) and registers the calling Java thread as the main Ruby thread.
 *
 * @param runtime the runtime being booted
 * @return the newly defined {@code Thread} class
 */
public static RubyClass createThreadClass(Ruby runtime) {
RubyClass threadClass = runtime.defineClass("Thread", runtime.getObject(), ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR);
runtime.setThread(threadClass);
threadClass.setClassIndex(ClassIndex.THREAD);
threadClass.setReifiedClass(RubyThread.class);
threadClass.defineAnnotatedMethods(RubyThread.class);
// wrap the thread that is booting the runtime and make it the main thread
RubyThread rubyThread = new RubyThread(runtime, threadClass);
rubyThread.threadImpl = new NativeThread(rubyThread, Thread.currentThread());
runtime.getThreadService().setMainThread(Thread.currentThread(), rubyThread);
runtime.getDefaultThreadGroup().addDirectly(rubyThread);
threadClass.setMarshal(ObjectMarshal.NOT_MARSHALABLE_MARSHAL);
// nested classes backing backtrace locations
RubyClass backtrace = threadClass.defineClassUnder("Backtrace", runtime.getObject(), ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR);
RubyClass location = backtrace.defineClassUnder("Location", runtime.getObject(), ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR);
location.defineAnnotatedMethods(Location.class);
runtime.setLocation(location);
return threadClass;
}
/**
 * Ruby-visible wrapper around a single {@link RubyStackTraceElement}, registered as
 * {@code Thread::Backtrace::Location}.
 */
public static class Location extends RubyObject {
    /** The wrapped stack trace element. */
    private final RubyStackTraceElement element;
    /** Cached result of {@link #base_label(ThreadContext)}. */
    private transient RubyString baseLabel = null;
    /** Cached "block in ..." label, only populated for block frames. */
    private transient RubyString label = null;

    /**
     * @param runtime owning runtime
     * @param klass   Ruby class of the new object
     * @param element stack trace element to expose
     */
    public Location(Ruby runtime, RubyClass klass, RubyStackTraceElement element) {
        super(runtime, klass);
        this.element = element;
    }

    /** @return the element's file name as a new Ruby string */
    @JRubyMethod
    public IRubyObject absolute_path(ThreadContext context) {
        final String fileName = element.getFileName();
        return context.runtime.newString(fileName);
    }

    /** @return the element's method name as a Ruby string (created once, then reused) */
    @JRubyMethod
    public IRubyObject base_label(ThreadContext context) {
        RubyString cached = baseLabel;
        if (cached == null) {
            cached = context.runtime.newString(element.getMethodName());
            baseLabel = cached;
        }
        return cached;
    }

    /** @return the inspected form of {@link #to_s(ThreadContext)} */
    @JRubyMethod
    public IRubyObject inspect(ThreadContext context) {
        final IRubyObject text = to_s(context);
        return text.inspect();
    }

    /** @return "block in NAME" for block frames, otherwise the base label */
    @JRubyMethod
    public IRubyObject label(ThreadContext context) {
        if (element.getFrameType() != FrameType.BLOCK) {
            return base_label(context);
        }
        RubyString cached = label;
        if (cached == null) {
            cached = context.runtime.newString("block in " + element.getMethodName());
            label = cached;
        }
        return cached;
    }

    /** @return the element's line number as a fixnum */
    @JRubyMethod
    public IRubyObject lineno(ThreadContext context) {
        final int lineNumber = element.getLineNumber();
        return context.runtime.newFixnum(lineNumber);
    }

    /** @return the element's file name as a new Ruby string */
    @JRubyMethod
    public IRubyObject path(ThreadContext context) {
        final String fileName = element.getFileName();
        return context.runtime.newString(fileName);
    }

    /** @return the textual form produced by {@code RubyStackTraceElement.to_s_mri} */
    @JRubyMethod
    public IRubyObject to_s(ThreadContext context) {
        return RubyStackTraceElement.to_s_mri(context, element);
    }

    /**
     * Wraps every element of {@code elements} in a {@code Location}.
     *
     * @return a Ruby array with one location per element
     */
    public static RubyArray newLocationArray(Ruby runtime, RubyStackTraceElement[] elements) {
        final int count = elements.length;
        return newLocationArray(runtime, elements, 0, count);
    }

    /**
     * Wraps {@code length} elements of {@code elements}, starting at {@code offset}, in
     * {@code Location} objects.
     *
     * @param offset index of the first element to wrap
     * @param length number of elements to wrap
     * @return a Ruby array backed directly by the freshly built array
     */
    public static RubyArray newLocationArray(Ruby runtime, RubyStackTraceElement[] elements,
            final int offset, final int length) {
        final RubyClass locationClass = runtime.getLocation();
        final IRubyObject[] locations = new IRubyObject[length];
        int source = offset;
        for (int target = 0; target < length; target++, source++) {
            locations[target] = new RubyThread.Location(runtime, locationClass, elements[source]);
        }
        return RubyArray.newArrayNoCopy(runtime, locations);
    }
}
/**
 * {@code Thread.new} / {@code Thread.fork}: creates a thread object, runs its Ruby-level
 * initializer and returns it.
 */
@JRubyMethod(name = {"new", "fork"}, rest = true, meta = true)
public static IRubyObject newInstance(IRubyObject recv, IRubyObject[] args, Block block) {
    final boolean callInit = true;
    return startThread(recv, args, callInit, block);
}
/**
 * {@code Thread.start}: starts a thread running {@code block} without dispatching to the
 * Ruby-level initializer.
 *
 * @throws RaiseException ArgumentError when no block is given
 */
@JRubyMethod(rest = true, name = "start", meta = true)
public static RubyThread start(IRubyObject recv, IRubyObject[] args, Block block) {
    if (block.isGiven()) {
        return startThread(recv, args, false, block);
    }
    throw recv.getRuntime().newArgumentError("tried to create Proc object without a block");
}
/**
 * Legacy alias that forwards to {@link #start(IRubyObject, IRubyObject[], Block)}.
 *
 * @deprecated call {@code start} directly
 */
@Deprecated
public static RubyThread start19(IRubyObject recv, IRubyObject[] args, Block block) {
    final RubyThread started = start(recv, args, block);
    return started;
}
/**
 * Wraps an existing Java thread in a Ruby thread of class {@code recv}.
 *
 * @param recv the Ruby class to instantiate (must be a {@code RubyClass})
 * @param t    the Java thread to adopt
 */
public static RubyThread adopt(IRubyObject recv, Thread t) {
    final Ruby runtime = recv.getRuntime();
    final ThreadService service = runtime.getThreadService();
    return adoptThread(runtime, service, (RubyClass) recv, t);
}
/**
 * Wraps an existing Java thread in an instance of the runtime's {@code Thread} class.
 *
 * @param runtime owning runtime
 * @param service thread service to register with
 * @param thread  the Java thread to adopt
 */
public static RubyThread adopt(Ruby runtime, ThreadService service, Thread thread) {
    final RubyClass threadClass = runtime.getThread();
    return adoptThread(runtime, service, threadClass, thread);
}
/**
 * Shared adoption logic: builds the Ruby thread around {@code thread}, registers and
 * associates it with the service, prepares its context and adds it to the default group.
 */
private static RubyThread adoptThread(final Ruby runtime, final ThreadService service,
        final RubyClass recv, final Thread thread) {
    final RubyThread adopted = new RubyThread(runtime, recv);
    adopted.threadImpl = new NativeThread(adopted, thread);

    final ThreadContext adoptedContext = service.registerNewThread(adopted);
    service.associateThread(thread, adopted);
    adoptedContext.preAdoptThread();

    runtime.getDefaultThreadGroup().addDirectly(adopted);
    return adopted;
}
/**
 * Ruby-level initializer: starts this thread running {@code block} with {@code args}.
 *
 * @throws RaiseException ThreadError when no block is given or the thread was already initialized
 */
@JRubyMethod(rest = true, visibility = PRIVATE)
public IRubyObject initialize(ThreadContext context, IRubyObject[] args, Block block) {
    if (!block.isGiven()) {
        throw context.runtime.newThreadError("must be called with a block");
    }
    if (threadImpl != ThreadLike.DUMMY) {
        throw context.runtime.newThreadError("already initialized thread");
    }
    final RubyRunnable body = new RubyRunnable(this, args, block);
    return startThread(context, body);
}
/**
 * Creates the daemon Java thread for {@code runnable}, wires it into the runtime and starts it.
 *
 * @param context  context of the creating thread; supplies file/line, thread group and interrupt masks
 * @param runnable body to run on the new thread
 * @return this thread
 * @throws RaiseException ThreadError when the JVM cannot create a native thread or a
 *         SecurityException is raised
 * @throws OutOfMemoryError any other out-of-memory condition is rethrown untouched
 */
private IRubyObject startThread(ThreadContext context, Runnable runnable) throws RaiseException, OutOfMemoryError {
final Ruby runtime = context.runtime;
try {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
// remember where the thread was created; used for its name and for inspect
this.file = context.getFile();
this.line = context.getLine();
initThreadName(runtime, thread, file, line);
threadImpl = new NativeThread(this, thread);
addToCorrectThreadGroup(context);
// associate with the service before starting the thread
runtime.getThreadService().associateThread(thread, this);
// the new thread starts with copies of the creator's interrupt masks
copyInterrupts(context, context.getThread().interruptMaskStack, this.interruptMaskStack);
threadImpl.start();
// give the new thread a chance to get scheduled
Thread.yield();
return this;
}
catch (OutOfMemoryError oome) {
// only the "cannot create thread" flavour is translated into a Ruby ThreadError
if ("unable to create new native thread".equals(oome.getMessage())) {
throw runtime.newThreadError(oome.getMessage());
}
throw oome;
}
catch (SecurityException ex) {
throw runtime.newThreadError(ex.getMessage());
}
}
/** Shared empty array, used as the type token for {@code toArray}. */
private static final RubyHash[] NULL_ARRAY = new RubyHash[0];
/**
 * Appends a fast duplicate of every mask in {@code sourceStack} to {@code targetStack},
 * working from a snapshot of the source.
 */
private static void copyInterrupts(ThreadContext context, Vector<RubyHash> sourceStack, Vector<RubyHash> targetStack) {
    final RubyHash[] snapshot = sourceStack.toArray(NULL_ARRAY);
    for (int i = 0; i < snapshot.length; i++) {
        targetStack.add(snapshot[i].dupFast(context));
    }
}
/** Prefix of every Java thread name assigned by {@code initThreadName}. */
private static final String RUBY_THREAD_PREFIX = "Ruby-";
/**
 * Names the Java thread {@code Ruby-<runtime#>-Thread-<n>}, followed by
 * {@code ": <file>:<line + 1>"} when a file is known.
 */
private static void initThreadName(final Ruby runtime, final Thread thread, final String file, final int line) {
    final StringBuilder buf = new StringBuilder(24);
    buf.append(RUBY_THREAD_PREFIX);
    buf.append(runtime.getRuntimeNumber());
    buf.append("-Thread-");
    buf.append(incAndGetThreadCount(runtime));
    if (file != null) {
        buf.append(": ").append(file).append(':').append(line + 1);
    }
    thread.setName(buf.toString());
}
/** @return the thread service's thread counter after incrementing it */
private static long incAndGetThreadCount(final Ruby runtime) {
    final ThreadService service = runtime.getThreadService();
    return service.incrementAndGetThreadCount();
}
/**
 * Creates a thread of class {@code recv} and starts it.
 *
 * @param callInit when true the Ruby-level initializer is dispatched (and must actually start
 *                 the thread); when false {@code initialize} is invoked directly
 * @throws RaiseException ThreadError when the Ruby-level initializer left the thread unstarted
 */
private static RubyThread startThread(final IRubyObject recv, final IRubyObject[] args, boolean callInit, Block block) {
    final Ruby runtime = recv.getRuntime();
    final RubyThread created = new RubyThread(runtime, (RubyClass) recv);

    if (!callInit) {
        created.initialize(runtime.getCurrentContext(), args, block);
        return created;
    }

    created.callInit(args, block);
    if (created.threadImpl == ThreadLike.DUMMY) {
        throw runtime.newThreadError(str(runtime, "uninitialized thread - check " , types(runtime, (RubyClass) recv), "#initialize"));
    }
    return created;
}
/**
 * Creates an instance of the {@code Waiter} constant found on the runtime's Process module,
 * stores {@code pid} under its {@code :pid} key and runs its initializer with {@code block}.
 */
protected static RubyThread startWaiterThread(final Ruby runtime, int pid, Block block) {
    final RubyClass waiterClass = (RubyClass) runtime.getProcess().getConstantAt("Waiter");
    final RubyThread waiterThread = new RubyThread(runtime, waiterClass);
    waiterThread.op_aset(runtime.newSymbol("pid"), runtime.newFixnum(pid));
    waiterThread.callInit(IRubyObject.NULL_ARRAY, block);
    return waiterThread;
}
/**
 * Records the thread's final result while holding this object's monitor.
 *
 * @param result value later returned by {@code value}
 */
public synchronized void cleanTerminate(IRubyObject result) {
    this.finalResult = result;
}
/** Marks this thread's status as DEAD. */
public void beDead() {
    this.status.set(Status.DEAD);
}
/** Polls for interrupts using the runtime's current context. */
public void pollThreadEvents() {
    final ThreadContext current = metaClass.runtime.getCurrentContext();
    pollThreadEvents(current);
}
/**
 * Processes interrupts when any are raised; does nothing otherwise.
 *
 * @param context current thread context
 */
public void pollThreadEvents(ThreadContext context) {
    if (!anyInterrupted()) return;
    executeInterrupts(context, true);
}
/**
 * @return true when the calling Java thread was interrupted (which also clears that flag) or
 *         any unmasked interrupt bit is set
 */
private boolean anyInterrupted() {
    if (Thread.interrupted()) {
        return true;
    }
    return (interruptFlag & ~interruptMask) != 0;
}
/** Always throws a new {@link ThreadKill}. */
private static void throwThreadKill() {
    final ThreadKill kill = new ThreadKill();
    throw kill;
}
/**
 * {@code Thread.handle_interrupt(mask) { ... }}: runs {@code block} with {@code mask} pushed onto
 * the current thread's interrupt mask stack, then pops it again.
 * <p>
 * Interrupts are polled right after the mask is pushed and again after it is popped, so entries
 * whose timing changed become deliverable at those points.
 *
 * @param context current thread context
 * @param self    the receiver (unused)
 * @param _mask   hash (or object convertible via {@code to_hash}) mapping exception classes to
 *                {@code :immediate}, {@code :on_blocking} or {@code :never}
 * @param block   body to run under the mask
 * @return the block's result
 * @throws RaiseException ArgumentError when no block is given or a timing symbol is unknown
 */
@JRubyMethod(meta = true)
public static IRubyObject handle_interrupt(ThreadContext context, IRubyObject self, IRubyObject _mask, Block block) {
    if (!block.isGiven()) {
        throw context.runtime.newArgumentError("block is needed");
    }
    final RubyHash mask = (RubyHash) TypeConverter.convertToType(_mask, context.runtime.getHash(), "to_hash");
    // validate every timing value up front; an unknown one raises ArgumentError
    mask.visitAll(context, HandleInterruptVisitor, null);
    final RubyThread thread = context.getThread();
    thread.interruptMaskStack.add(mask);
    // Entries deferred under the previous masks may be deliverable under the new one. When something
    // IS queued, forget the "already checked" state and re-raise the interrupt so the poll below
    // rescans the queue. (The previous code did this only for an empty queue, so deferred
    // interrupts were never re-examined here.)
    if (!thread.pendingInterruptQueue.isEmpty()) {
        thread.pendingInterruptQueueChecked = false;
        thread.setInterrupt();
    }
    try {
        // deliver anything the new mask makes deliverable before running the block
        thread.pollThreadEvents();
        return block.call(context);
    } finally {
        // pop the mask and give interrupts deferred by it a chance to fire
        thread.interruptMaskStack.remove(thread.interruptMaskStack.size() - 1);
        thread.setInterrupt();
        thread.pollThreadEvents(context);
    }
}
/**
 * Hash visitor used by {@code handle_interrupt} to validate a mask: each value is passed through
 * {@code checkInterruptMask}, which raises ArgumentError for anything but a known timing symbol.
 */
private static final RubyHash.VisitorWithState HandleInterruptVisitor = new RubyHash.VisitorWithState<Void>() {
@Override
public void visit(ThreadContext context, RubyHash self, IRubyObject key, IRubyObject value, int index, Void state) {
// result is discarded; only the validation side effect matters
checkInterruptMask(context, value);
}
};
/**
 * Translates a timing symbol into its {@code INTERRUPT_*} constant.
 *
 * @param sym expected to be {@code :immediate}, {@code :on_blocking} or {@code :never}
 * @return the matching constant
 * @throws RaiseException ArgumentError for any other value
 */
private static int checkInterruptMask(final ThreadContext context, final IRubyObject sym) {
    if (sym instanceof RubySymbol) {
        final String id = ((RubySymbol) sym).idString();
        if (id.equals("immediate")) return INTERRUPT_IMMEDIATE;
        if (id.equals("on_blocking")) return INTERRUPT_ON_BLOCKING;
        if (id.equals("never")) return INTERRUPT_NEVER;
    }
    throw context.runtime.newArgumentError("unknown mask signature");
}
/** {@code Thread.pending_interrupt?}: asks the current thread's instance-level method. */
@JRubyMethod(name = "pending_interrupt?", meta = true, optional = 1)
public static IRubyObject pending_interrupt_p(ThreadContext context, IRubyObject self, IRubyObject[] args) {
    final RubyThread current = context.getThread();
    return current.pending_interrupt_p(context, args);
}
/**
 * {@code Thread#pending_interrupt?}: reports whether interrupts are queued for this thread.
 *
 * @param args optionally one class or module to restrict the check to
 * @return false when the queue is empty; with an argument, whether a matching entry is queued;
 *         otherwise true
 * @throws RaiseException TypeError when the argument is not a class or module
 */
@JRubyMethod(name = "pending_interrupt?", optional = 1)
public IRubyObject pending_interrupt_p(ThreadContext context, IRubyObject[] args) {
    if (pendingInterruptQueue.isEmpty()) return context.fals;
    if (args.length != 1) return context.tru;

    final IRubyObject filter = args[0];
    if (!(filter instanceof RubyModule)) {
        throw context.runtime.newTypeError("class or module required for rescue clause");
    }
    if (pendingInterruptInclude((RubyModule) filter)) {
        return context.tru;
    }
    return context.fals;
}
/**
 * Tells whether any queued interrupt is an instance of {@code err} (or of a descendant).
 * <p>
 * Queue entries are interrupt objects (exception instances, or fixnum kill markers), not
 * classes, so each entry's class is what gets compared; casting the entry itself to
 * {@code RubyModule}, as was done before, fails with a ClassCastException.
 *
 * @param err class or module to look for
 * @return true when a matching entry is queued
 */
private boolean pendingInterruptInclude(RubyModule err) {
    for (IRubyObject pending : pendingInterruptQueue) {
        if (pending.getMetaClass().isKindOfModule(err)) return true;
    }
    return false;
}
/**
 * {@code Thread#name=}: sets or, for nil, clears the thread's Ruby name.
 *
 * @param name nil or a string; embedded NULs are rejected by {@code checkEmbeddedNulls}
 * @return the argument as given
 * @throws RaiseException ArgumentError when the string's encoding is not ASCII compatible
 */
@JRubyMethod(name = "name=", required = 1)
public IRubyObject setName(IRubyObject name) {
    final Ruby runtime = getRuntime();
    if (name.isNil()) {
        threadImpl.setRubyName(null);
        return name;
    }

    final RubyString nameStr = StringSupport.checkEmbeddedNulls(runtime, name);
    final Encoding enc = nameStr.getEncoding();
    if (!enc.isAsciiCompatible()) {
        throw runtime.newArgumentError("ASCII incompatible encoding (" + enc + ")");
    }
    final String javaName = runtime.freezeAndDedupString(nameStr).asJavaString();
    threadImpl.setRubyName(javaName);
    return name;
}
/** {@code Thread#name}: the Ruby name as a new string, or nil when none is set. */
@JRubyMethod(name = "name")
public IRubyObject getName() {
    final Ruby runtime = getRuntime();
    final CharSequence current = threadImpl.getRubyName();
    if (current != null) {
        return RubyString.newString(runtime, current);
    }
    return runtime.getNil();
}
/** {@code Thread.abort_on_exception}: the runtime-wide abort-on-exception setting. */
@JRubyMethod(name = "abort_on_exception", meta = true)
public static RubyBoolean abort_on_exception_x(IRubyObject recv) {
    final Ruby runtime = recv.getRuntime();
    if (runtime.isGlobalAbortOnExceptionEnabled()) {
        return runtime.getTrue();
    }
    return runtime.getFalse();
}
/**
 * {@code Thread.abort_on_exception=}: sets the runtime-wide flag from the truthiness of
 * {@code value}.
 *
 * @return the argument as given
 */
@JRubyMethod(name = "abort_on_exception=", required = 1, meta = true)
public static IRubyObject abort_on_exception_set_x(IRubyObject recv, IRubyObject value) {
    final boolean enabled = value.isTrue();
    recv.getRuntime().setGlobalAbortOnExceptionEnabled(enabled);
    return value;
}
/** {@code Thread.current}: the Ruby thread belonging to the runtime's current context. */
@JRubyMethod(meta = true)
public static RubyThread current(IRubyObject recv) {
    final ThreadContext context = recv.getRuntime().getCurrentContext();
    return context.getThread();
}
/** {@code Thread.main}: the main thread registered with the thread service. */
@JRubyMethod(meta = true)
public static RubyThread main(IRubyObject recv) {
    final ThreadService service = recv.getRuntime().getThreadService();
    return service.getMainThread();
}
/** {@code Thread.pass}: calls {@link Thread#yield()} and answers nil. */
@JRubyMethod(meta = true)
public static IRubyObject pass(ThreadContext context, IRubyObject recv) {
    final IRubyObject nil = context.nil;
    Thread.yield();
    return nil;
}
/** {@code Thread.list}: the thread service's active Ruby threads as an array. */
@JRubyMethod(meta = true)
public static RubyArray list(IRubyObject recv) {
    final Ruby runtime = recv.getRuntime();
    final ThreadService service = runtime.getThreadService();
    final RubyThread[] active = service.getActiveRubyThreads();
    return RubyArray.newArrayMayCopy(runtime, active);
}
/**
 * Adds this thread to the creating thread's group, or to the runtime's default group when the
 * creator has none.
 */
private void addToCorrectThreadGroup(ThreadContext context) {
    final IRubyObject creatorGroup = context.getThread().group();
    if (creatorGroup.isNil()) {
        context.runtime.getDefaultThreadGroup().addDirectly(this);
        return;
    }
    ((RubyThreadGroup) creatorGroup).addDirectly(this);
}
/**
 * Normalises a local-variable key to a symbol.
 *
 * @param originalKey a symbol (returned as is) or a string (interned from its bytes)
 * @throws RaiseException TypeError for any other kind of object
 */
private RubySymbol getSymbolKey(IRubyObject originalKey) {
    if (originalKey instanceof RubySymbol) {
        return (RubySymbol) originalKey;
    }
    final Ruby runtime = getRuntime();
    if (originalKey instanceof RubyString) {
        final ByteList bytes = ((RubyString) originalKey).getByteList();
        return runtime.newSymbol(bytes);
    }
    throw runtime.newTypeError(str(runtime, ids(runtime, originalKey), " is not a symbol nor a string"));
}
/**
 * Returns the fiber-local variable map, creating it on first use. Creation is guarded by this
 * object's monitor so only one map is ever installed.
 */
private Map<IRubyObject, IRubyObject> getFiberLocals() {
    Map<IRubyObject, IRubyObject> map = fiberLocalVariables;
    if (map != null) return map;

    synchronized (this) {
        map = fiberLocalVariables;
        if (map == null) {
            map = new HashMap<>();
            fiberLocalVariables = map;
        }
        return map;
    }
}
/** @return the thread-local variable map of {@link #getFiberCurrentThread()} */
private Map<IRubyObject, IRubyObject> getThreadLocals() {
    final RubyThread owner = getFiberCurrentThread();
    return owner.getThreadLocals0();
}
/**
 * Returns this instance's own thread-local variable map, creating it on first use. Creation is
 * guarded by this object's monitor so only one map is ever installed.
 */
private Map<IRubyObject, IRubyObject> getThreadLocals0() {
    Map<IRubyObject, IRubyObject> map = threadLocalVariables;
    if (map != null) return map;

    synchronized (this) {
        map = threadLocalVariables;
        if (map == null) {
            map = new HashMap<>();
            threadLocalVariables = map;
        }
        return map;
    }
}
/** @return the context variable map owned by this thread (the live map, not a copy) */
@Override
public final Map<Object, IRubyObject> getContextVariables() {
    return this.contextVariables;
}
/** @return true when the backing thread is alive and the status is not DEAD */
public boolean isAlive(){
    if (!threadImpl.isAlive()) {
        return false;
    }
    return getStatus() != Status.DEAD;
}
/**
 * {@code Thread#fetch(key)}: looks up a fiber-local variable.
 *
 * @return the stored value; when it is nil, the block's result for {@code key} if a block is given
 * @throws RaiseException KeyError when the value is nil and no block is given
 */
@JRubyMethod
public IRubyObject fetch(ThreadContext context, IRubyObject key, Block block) {
    final IRubyObject found = op_aref(context, key);
    if (!found.isNil()) {
        return found;
    }
    if (block.isGiven()) {
        return block.yield(context, key);
    }
    throw context.runtime.newKeyError("key not found: " + key.inspect(), this, key);
}
/**
 * {@code Thread#fetch(key, default)}: looks up a fiber-local variable with a fallback.
 * A warning is issued when both a default and a block are supplied; the block then wins.
 *
 * @return the stored value; when it is nil, the block's result if a block is given, else
 *         {@code _default}
 */
@JRubyMethod
public IRubyObject fetch(ThreadContext context, IRubyObject key, IRubyObject _default, Block block) {
    final boolean hasBlock = block.isGiven();
    if (hasBlock) {
        context.runtime.getWarnings().warn(ID.BLOCK_BEATS_DEFAULT_VALUE, "block supersedes default value argument");
    }

    final IRubyObject found = op_aref(context, key);
    if (found != context.nil) {
        return found;
    }
    return hasBlock ? block.yield(context, key) : _default;
}
/**
 * {@code Thread#[]}: reads a fiber-local variable.
 *
 * @param key symbol or string naming the variable
 * @return the stored value, or nil when nothing is stored under {@code key}
 */
@JRubyMethod(name = "[]", required = 1)
public IRubyObject op_aref(ThreadContext context, IRubyObject key) {
    final RubySymbol name = getSymbolKey(key);
    final Map<IRubyObject, IRubyObject> locals = getFiberLocals();
    final IRubyObject stored;
    synchronized (locals) {
        stored = locals.get(name);
    }
    return stored == null ? context.nil : stored;
}
/**
 * Context-less variant of {@link #op_aref(ThreadContext, IRubyObject)}.
 *
 * @deprecated pass the context explicitly
 */
@Deprecated
public IRubyObject op_aref(IRubyObject key) {
    final ThreadContext current = getRuntime().getCurrentContext();
    return op_aref(current, key);
}
/**
 * {@code Thread#[]=}: writes a fiber-local variable. The frozen check runs before anything else.
 *
 * @param key   symbol or string naming the variable
 * @param value value to store
 * @return {@code value}
 */
@JRubyMethod(name = "[]=", required = 2)
public IRubyObject op_aset(IRubyObject key, IRubyObject value) {
    checkFrozen();
    final RubySymbol name = getSymbolKey(key);
    final Map<IRubyObject, IRubyObject> locals = getFiberLocals();
    synchronized (locals) {
        locals.put(name, value);
    }
    return value;
}
/** {@code Thread#key?}: whether a fiber-local variable is stored under {@code key}. */
@JRubyMethod(name = "key?", required = 1)
public RubyBoolean key_p(ThreadContext context, IRubyObject key) {
    final RubySymbol name = getSymbolKey(key);
    final Map<IRubyObject, IRubyObject> locals = getFiberLocals();
    synchronized (locals) {
        final boolean present = locals.containsKey(name);
        return context.runtime.newBoolean(present);
    }
}
/**
 * Context-less variant of {@link #key_p(ThreadContext, IRubyObject)}.
 *
 * @deprecated pass the context explicitly
 */
@Deprecated
public RubyBoolean key_p(IRubyObject key) {
    final ThreadContext current = getRuntime().getCurrentContext();
    return key_p(current, key);
}
/** {@code Thread#keys}: the names of all fiber-local variables, snapshotted under the map's lock. */
@JRubyMethod
public RubyArray keys() {
    final Map<IRubyObject, IRubyObject> locals = getFiberLocals();
    final IRubyObject[] names;
    synchronized (locals) {
        names = locals.keySet().toArray(new IRubyObject[locals.size()]);
    }
    return RubyArray.newArrayMayCopy(getRuntime(), names);
}
/** {@code Thread#thread_variable?}: whether a thread-local variable is stored under {@code key}. */
@JRubyMethod(name = "thread_variable?", required = 1)
public IRubyObject thread_variable_p(ThreadContext context, IRubyObject key) {
    final RubySymbol name = getSymbolKey(key);
    final Map<IRubyObject, IRubyObject> locals = getThreadLocals();
    synchronized (locals) {
        final boolean present = locals.containsKey(name);
        return context.runtime.newBoolean(present);
    }
}
/**
 * {@code Thread#thread_variable_get}: reads a thread-local variable.
 *
 * @return the stored value, or nil when nothing is stored under {@code key}
 */
@JRubyMethod(name = "thread_variable_get", required = 1)
public IRubyObject thread_variable_get(ThreadContext context, IRubyObject key) {
    final RubySymbol name = getSymbolKey(key);
    final Map<IRubyObject, IRubyObject> locals = getThreadLocals();
    final IRubyObject stored;
    synchronized (locals) {
        stored = locals.get(name);
    }
    return stored == null ? context.nil : stored;
}
/**
 * {@code Thread#thread_variable_set}: writes a thread-local variable. The frozen check runs
 * before anything else.
 *
 * @return {@code value}
 */
@JRubyMethod(name = "thread_variable_set", required = 2)
public IRubyObject thread_variable_set(ThreadContext context, IRubyObject key, IRubyObject value) {
    checkFrozen();
    final RubySymbol name = getSymbolKey(key);
    final Map<IRubyObject, IRubyObject> locals = getThreadLocals();
    synchronized (locals) {
        locals.put(name, value);
    }
    return value;
}
/** {@code Thread#thread_variables}: the names of all thread-local variables, snapshotted under the map's lock. */
@JRubyMethod(name = "thread_variables")
public IRubyObject thread_variables(ThreadContext context) {
    final Map<IRubyObject, IRubyObject> locals = getThreadLocals();
    final IRubyObject[] names;
    synchronized (locals) {
        names = locals.keySet().toArray(new IRubyObject[locals.size()]);
    }
    return RubyArray.newArrayMayCopy(context.runtime, names);
}
/** {@code Thread#abort_on_exception}: this thread's abort-on-exception flag. */
@JRubyMethod
public RubyBoolean abort_on_exception() {
    if (abortOnException) {
        return getRuntime().getTrue();
    }
    return getRuntime().getFalse();
}
/**
 * {@code Thread#abort_on_exception=}: sets this thread's flag from the truthiness of {@code val}.
 *
 * @return the argument as given
 */
@JRubyMethod(name = "abort_on_exception=", required = 1)
public IRubyObject abort_on_exception_set(IRubyObject val) {
    this.abortOnException = val.isTrue();
    return val;
}
/** {@code Thread#alive?}: Ruby boolean form of {@link #isAlive()}. */
@JRubyMethod(name = "alive?")
public RubyBoolean alive_p() {
    if (isAlive()) {
        return getRuntime().getTrue();
    }
    return getRuntime().getFalse();
}
/**
 * Context-less variant of {@link #join(ThreadContext, IRubyObject[])}.
 *
 * @deprecated pass the context explicitly
 */
@Deprecated
public IRubyObject join(IRubyObject[] args) {
    final ThreadContext current = getRuntime().getCurrentContext();
    return join(current, args);
}
/**
 * {@code Thread#join([timeout])}: waits for this thread to finish.
 * <p>
 * The wait is performed in slices of at most 200 ms so the calling thread can poll its own
 * interrupts between slices. If this thread ended with an exception, that exception is stored in
 * {@code $!} and rethrown in the caller.
 *
 * @param context context of the calling thread
 * @param args    optionally one timeout in seconds (nil means wait indefinitely)
 * @return this thread once it has finished, or nil when it is still alive after the timeout
 * @throws RaiseException ThreadError when called on the current thread
 */
@JRubyMethod(optional = 1)
public IRubyObject join(ThreadContext context, IRubyObject[] args) {
Ruby runtime = context.runtime;
long timeoutMillis = Long.MAX_VALUE;
if (args.length > 0 && !args[0].isNil()) {
if (args.length > 1) {
throw runtime.newArgumentError(args.length, 1);
}
// seconds (possibly fractional) to whole milliseconds
timeoutMillis = (long) (1000 * RubyNumeric.num2dbl(args[0]));
if (timeoutMillis <= 0) {
// non-positive timeout: answer immediately without waiting
if (threadImpl.isAlive()) {
return context.nil;
} else {
return this;
}
}
}
if (isCurrent()) {
throw runtime.newThreadError("Target thread must not be current thread");
}
RubyThread currentThread = context.getThread();
try {
currentThread.enterSleep();
// wait in short slices so interrupts aimed at the caller are noticed
final long timeToWait = Math.min(timeoutMillis, 200);
long start = System.currentTimeMillis();
while (true) {
currentThread.pollThreadEvents(context);
threadImpl.join(timeToWait);
if (!threadImpl.isAlive()) {
break;
}
if (System.currentTimeMillis() - start > timeoutMillis) {
break;
}
}
} catch (InterruptedException ie) {
// not expected; printed, and fails only when assertions are enabled
ie.printStackTrace();
assert false : ie;
} catch (ExecutionException ie) {
ie.printStackTrace();
assert false : ie;
} finally {
currentThread.exitSleep();
}
final Throwable exception = this.exitingException;
if (exception != null) {
// expose the failure as $! in the joining thread before rethrowing it
if (exception instanceof RaiseException) {
runtime.getGlobalVariables().set("$!", ((RaiseException) exception).getException());
} else {
runtime.getGlobalVariables().set("$!", JavaUtil.convertJavaToUsableRubyObject(runtime, exception));
}
Helpers.throwException(exception);
}
// one more poll before returning
currentThread.pollThreadEvents(context);
if (threadImpl.isAlive()) {
return context.nil;
} else {
return this;
}
}
/**
 * {@code Thread#value}: joins this thread without a timeout, then returns its final result
 * (read while holding this object's monitor).
 */
@JRubyMethod
public IRubyObject value(ThreadContext context) {
    join(context, NULL_ARRAY);
    final IRubyObject result;
    synchronized (this) {
        result = finalResult;
    }
    return result;
}
/**
 * Context-less variant of {@link #value(ThreadContext)}.
 *
 * @deprecated pass the context explicitly
 */
@Deprecated
public IRubyObject value() {
    final ThreadContext current = getRuntime().getCurrentContext();
    return value(current);
}
/** {@code Thread#group}: this thread's group, or nil when it has none. */
@JRubyMethod
public IRubyObject group() {
    final RubyThreadGroup current = this.threadGroup;
    if (current == null) {
        return getRuntime().getNil();
    }
    return current;
}
/**
 * Records the group this thread belongs to.
 *
 * @param rubyThreadGroup the new group
 */
void setThreadGroup(RubyThreadGroup rubyThreadGroup) {
    this.threadGroup = rubyThreadGroup;
}
/** Context-less {@code inspect}; uses the runtime's current context. */
@Override
public IRubyObject inspect() {
    final ThreadContext current = metaClass.runtime.getCurrentContext();
    return inspect(current);
}
/**
 * {@code Thread#inspect}: builds {@code #<Class:identity[@name][@file:line] status>}.
 * <p>
 * The name part appears only when a non-empty Ruby name is set; the file part only when a
 * creation file is known and the line is non-negative (the line is shown as {@code line + 1}).
 *
 * @param context current thread context
 * @return the description as a new Ruby string
 */
@JRubyMethod
public RubyString inspect(ThreadContext context) {
    final Ruby runtime = context.runtime;
    RubyString result = runtime.newString("#<");
    result.cat(getMetaClass().getRealClass().toRubyString(context));
    result.cat(':');
    result.catString(identityString());
    synchronized (this) {
        String id = threadImpl.getRubyName();
        if (notEmpty(id)) {
            result.cat('@');
            result.cat(runtime.newSymbol(id).getBytes());
        }
        if (notEmpty(file) && line >= 0) {
            result.cat('@');
            result.catString(file);
            result.cat(':');
            result.catString(Integer.toString(line + 1));
        }
        result.cat(' ');
        // Locale.ROOT: the default-locale overload turns 'I' into a dotless 'ı' under
        // Turkish/Azeri locales, which would garble "aborting" and "native".
        result.catString(status.toString().toLowerCase(java.util.Locale.ROOT));
        result.cat('>');
        return result;
    }
}
/** @return true when {@code str} is neither null nor the empty string */
private static boolean notEmpty(String str) {
    if (str == null) {
        return false;
    }
    return !str.isEmpty();
}
/**
 * {@code Thread.stop}: puts the current thread to sleep until it is woken.
 * <p>
 * The thread waits on its own monitor with status SLEEP; afterwards interrupts are polled and the
 * previous status is restored.
 *
 * @param context  current thread context
 * @param receiver the receiver (unused)
 * @return nil
 * @throws RaiseException ThreadError when the current thread is the only active Ruby thread
 */
@JRubyMethod(meta = true)
public static IRubyObject stop(ThreadContext context, IRubyObject receiver) {
RubyThread rubyThread = context.getThread();
if (context.runtime.getThreadService().getActiveRubyThreads().length == 1) {
throw context.runtime.newThreadError("stopping only thread\n\tnote: use sleep to stop forever");
}
synchronized (rubyThread) {
// deliver anything already pending before going to sleep
rubyThread.pollThreadEvents(context);
Status oldStatus = rubyThread.getStatus();
try {
rubyThread.status.set(Status.SLEEP);
rubyThread.wait();
} catch (InterruptedException ie) {
// deliberately ignored; the finally block polls for pending interrupts
} finally {
rubyThread.pollThreadEvents(context);
rubyThread.status.set(oldStatus);
}
}
return context.nil;
}
/**
 * {@code Thread.kill(thread)}: kills the given thread.
 *
 * @throws RaiseException TypeError when {@code rubyThread} is not a thread
 */
@JRubyMethod(required = 1, meta = true)
public static IRubyObject kill(IRubyObject receiver, IRubyObject rubyThread, Block block) {
    if (rubyThread instanceof RubyThread) {
        return ((RubyThread) rubyThread).kill();
    }
    throw receiver.getRuntime().newTypeError(rubyThread, receiver.getRuntime().getThread());
}
/** {@code Thread.exit}: kills the thread belonging to the thread service's current context. */
@JRubyMethod(meta = true)
public static IRubyObject exit(IRubyObject receiver, Block block) {
    final ThreadContext current = receiver.getRuntime().getThreadService().getCurrentContext();
    final RubyThread self = current.getThread();
    return self.kill();
}
/** {@code Thread#stop?}: true when the status is SLEEP or DEAD. */
@JRubyMethod(name = "stop?")
public RubyBoolean stop_p() {
    final Ruby runtime = getRuntime();
    final boolean stopped = getStatus() == Status.SLEEP || getStatus() == Status.DEAD;
    return runtime.newBoolean(stopped);
}
/**
 * {@code Thread#wakeup}: marks this thread runnable and interrupts it.
 *
 * @return this thread
 * @throws RaiseException ThreadError when the backing thread is not alive and the status is DEAD
 */
@JRubyMethod
public synchronized RubyThread wakeup() {
    final boolean killed = !threadImpl.isAlive() && getStatus() == Status.DEAD;
    if (killed) {
        throw getRuntime().newThreadError("killed thread");
    }
    this.status.set(Status.RUN);
    interrupt();
    return this;
}
/** {@code Thread#priority}: the backing thread's Java priority translated to Ruby's scale. */
@JRubyMethod
public RubyFixnum priority() {
    final int rubyPriority = javaPriorityToRubyPriority(threadImpl.getPriority());
    return RubyFixnum.newFixnum(getRuntime(), rubyPriority);
}
/**
 * {@code Thread#priority=}: clamps the requested priority to the Ruby range and, when the backing
 * thread is alive, applies the corresponding Java priority (itself clamped to Java's range).
 *
 * @param priority requested Ruby priority
 * @return the clamped Ruby priority as a fixnum
 */
@JRubyMethod(name = "priority=", required = 1)
public IRubyObject priority_set(IRubyObject priority) {
    final int requested = RubyNumeric.fix2int(priority);
    final int rubyPriority = Math.max(RUBY_MIN_THREAD_PRIORITY, Math.min(RUBY_MAX_THREAD_PRIORITY, requested));

    if (threadImpl.isAlive()) {
        final int translated = rubyPriorityToJavaPriority(rubyPriority);
        final int javaPriority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, translated));
        threadImpl.setPriority(javaPriority);
    }
    return RubyFixnum.newFixnum(getRuntime(), rubyPriority);
}
/**
 * Converts a Java thread priority to the Ruby priority scale.
 *
 * Computes {@code 1.5 * sqrt(8 * javaPriority + 41) - 13.5} and rounds the result, which maps
 * Java priorities 1, 5 and 10 to Ruby priorities -3, 0 and 3.
 *
 * @param javaPriority a Java thread priority
 * @return the corresponding Ruby priority
 */
public static int javaPriorityToRubyPriority(int javaPriority) {
    final double root = Math.sqrt(8.0 * javaPriority + 41);
    final double rubyScale = 1.5 * root - 13.5;
    return Math.round((float) rubyScale);
}
/**
 * Converts a Ruby thread priority to the Java priority scale.
 *
 * Computes {@code rubyPriority^2 / 18 + 1.5 * rubyPriority + 5} and rounds the result, which maps
 * Ruby priorities -3, 0 and 3 to Java priorities 1, 5 and 10.
 *
 * @param rubyPriority a Ruby thread priority
 * @return the corresponding Java priority
 */
public static int rubyPriorityToJavaPriority(int rubyPriority) {
    final int squared = rubyPriority * rubyPriority;
    final double javaScale = squared / 18.0 + 1.5 * rubyPriority + 5;
    return Math.round((float) javaScale);
}
/**
 * Raises the given exception in this thread, using the runtime's current context as the caller.
 *
 * @param exception the exception class or object to raise
 * @return the result of {@code genericRaise}
 */
public final IRubyObject raise(IRubyObject exception) {
    final ThreadContext callerContext = metaClass.runtime.getCurrentContext();
    final RubyThread callerThread = callerContext.getThread();
    return genericRaise(callerContext, callerThread, exception);
}
/**
 * Raises the given exception with a message in this thread, using the runtime's current
 * context as the caller.
 *
 * @param exception the exception class or object to raise
 * @param message the message passed along with the exception
 * @return the result of {@code genericRaise}
 */
public final IRubyObject raise(IRubyObject exception, RubyString message) {
    final ThreadContext callerContext = metaClass.runtime.getCurrentContext();
    final RubyThread callerThread = callerContext.getThread();
    return genericRaise(callerContext, callerThread, exception, message);
}
/**
 * Raises an exception in this thread on behalf of the calling thread.
 *
 * @param context the calling thread's context
 * @param args zero to three raise arguments, interpreted by {@code prepareRaiseException}
 * @param block unused
 * @return the result of {@code genericRaise}
 */
@JRubyMethod(optional = 3)
public IRubyObject raise(ThreadContext context, IRubyObject[] args, Block block) {
    final RubyThread callerThread = context.getThread();
    return genericRaise(context, callerThread, args);
}
/**
 * Context-less variant of {@link #raise(ThreadContext, IRubyObject[], Block)} that looks up the
 * runtime's current context itself.
 *
 * @param args the raise arguments
 * @param block passed through unchanged
 * @return the result of the context-taking overload
 */
@Deprecated
public IRubyObject raise(IRubyObject[] args, Block block) {
    final ThreadContext callerContext = getRuntime().getCurrentContext();
    return raise(callerContext, args, block);
}
/**
 * Shared implementation behind the {@code raise} overloads.
 *
 * A dead target is a no-op. When the caller is this thread the exception is raised directly
 * through {@code RubyKernel.raise}; otherwise the prepared exception is queued as a pending
 * interrupt and the target thread is interrupted.
 *
 * @param context the calling thread's context
 * @param currentThread the calling thread
 * @param args the raise arguments
 * @return {@code context.nil}
 */
private IRubyObject genericRaise(ThreadContext context, RubyThread currentThread, IRubyObject... args) {
    if (!isAlive()) {
        return context.nil;
    }

    if (currentThread == this) {
        RubyKernel.raise(context, this, args, Block.NULL_BLOCK);
        // RubyKernel.raise is expected not to return normally.
        assert false;
    }

    final IRubyObject pending = prepareRaiseException(context, args, Block.NULL_BLOCK);
    pendingInterruptEnqueue(pending);
    interrupt();

    return context.nil;
}
/**
 * Turns raise-style arguments into the exception object to deliver.
 *
 * <ul>
 *   <li>no arguments: this thread's {@code errorInfo}, or a fresh RuntimeError with an empty
 *       message when {@code errorInfo} is nil;</li>
 *   <li>one String: a new RuntimeError built from the arguments;</li>
 *   <li>one Java proxy: returned as-is;</li>
 *   <li>otherwise: the result of calling {@code exception} on the first argument (passing the
 *       second argument when present), with the third argument used as backtrace.</li>
 * </ul>
 *
 * @param context the calling thread's context
 * @param args the raise arguments
 * @param block block forwarded when instantiating a RuntimeError from a String
 * @return the exception object to raise
 * @throws RaiseException a Ruby TypeError when the argument cannot produce an exception object
 */
private IRubyObject prepareRaiseException(ThreadContext context, IRubyObject[] args, Block block) {
    final Ruby runtime = context.runtime;

    if (args.length == 0) {
        return errorInfo.isNil()
                ? RaiseException.from(runtime, runtime.getRuntimeError(), "").getException()
                : errorInfo;
    }

    final IRubyObject first = args[0];
    final IRubyObject candidate;

    if (args.length == 1) {
        if (first instanceof RubyString) {
            candidate = runtime.getRuntimeError().newInstance(context, args, block);
        } else if (first instanceof ConcreteJavaProxy) {
            return first;
        } else if (first.respondsTo("exception")) {
            candidate = first.callMethod(context, "exception");
        } else {
            throw runtime.newTypeError("exception class/object expected");
        }
    } else {
        if (!first.respondsTo("exception")) {
            throw runtime.newTypeError("exception class/object expected");
        }
        candidate = first.callMethod(context, "exception", args[1]);
    }

    if (!runtime.getException().isInstance(candidate)) {
        throw runtime.newTypeError("exception object expected");
    }

    final RubyException prepared = (RubyException) candidate;

    if (args.length == 3) {
        prepared.set_backtrace(args[2]);
    }

    // Chain the caller's current error as cause, unless it is the very exception being raised.
    final IRubyObject cause = context.getErrorInfo();
    if (cause != prepared) {
        prepared.setCause(cause);
    }

    return prepared;
}
/**
 * Wakes this thread up; delegates entirely to {@link #wakeup()}.
 *
 * @return this thread, as returned by {@code wakeup()}
 */
@JRubyMethod
public synchronized IRubyObject run() {
    final IRubyObject woken = wakeup();
    return woken;
}
/**
 * Sleeps the current thread using the shared sleep task.
 *
 * @param millis the time to sleep in milliseconds; 0 is treated by the sleep task as "no timeout"
 * @return true when {@code millis} is 0 or at least {@code millis} milliseconds elapsed,
 *         false when the sleep ended early
 * @throws InterruptedException if the underlying task is interrupted
 */
public boolean sleep(long millis) throws InterruptedException {
    assert this == getRuntime().getCurrentContext().getThread();

    sleepTask.millis = millis;
    try {
        final long elapsed = executeTask(getContext(), null, sleepTask);
        return millis == 0 || elapsed >= millis;
    } finally {
        // Discard any wakeup permits that were released while (or after) sleeping.
        sleepTask.semaphore.drainPermits();
    }
}
/**
 * Context-less variant of {@link #status(ThreadContext)} using the runtime's current context.
 *
 * @return this thread's status object
 */
public IRubyObject status() {
    final ThreadContext callerContext = getRuntime().getCurrentContext();
    return status(callerContext);
}
/**
 * Returns this thread's status.
 *
 * @param context the calling thread's context
 * @return the runtime's status object for a live thread; for a finished thread, nil when it
 *         ended with an exception and false otherwise
 */
@JRubyMethod
public IRubyObject status(ThreadContext context) {
    final Status current = getStatus();

    if (!threadImpl.isAlive() || current == Status.DEAD) {
        return exitingException == null ? context.fals : context.nil;
    }

    return context.runtime.getThreadStatus(current);
}
/**
 * Legacy blocking-task contract used by {@code executeBlockingTask}: a blocking {@code run}
 * plus a {@code wakeup} hook that another thread can call to unblock it.
 */
@Deprecated
public interface BlockingTask {
    /** Performs the blocking operation. */
    void run() throws InterruptedException;

    /** Asks a blocked {@link #run()} to return. */
    void wakeup();
}
/**
 * Callback able to unblock a thread that is blocked on some piece of data.
 *
 * @param <Data> the type of the value the thread is blocked on
 */
public interface Unblocker<Data> {
    /**
     * @param thread the blocked thread
     * @param self the data the thread is blocked on
     */
    void wakeup(RubyThread thread, Data self);
}
public interface Task<Data, Return> extends Unblocker<Data> {
public Return run(ThreadContext context, Data data) throws InterruptedException;
public void wakeup(RubyThread thread, Data data);
}
/**
 * Blocking task that waits on an object's monitor for a given time and is woken with
 * {@code notify()} on the same monitor.
 */
public static final class SleepTask implements BlockingTask {
    private final Object monitor;
    private final long waitMillis;
    private final int waitNanos;

    /**
     * @param object the object whose monitor is waited on
     * @param millis milliseconds passed to {@link Object#wait(long, int)}
     * @param nanos additional nanoseconds passed to {@link Object#wait(long, int)}
     */
    public SleepTask(Object object, long millis, int nanos) {
        this.monitor = object;
        this.waitMillis = millis;
        this.waitNanos = nanos;
    }

    /** Waits on the monitor for the configured time. */
    @Override
    public void run() throws InterruptedException {
        synchronized (monitor) {
            monitor.wait(waitMillis, waitNanos);
        }
    }

    /** Notifies one waiter on the monitor. */
    @Override
    public void wakeup() {
        synchronized (monitor) {
            monitor.notify();
        }
    }
}
/**
 * Sleep task backed by a semaphore: {@code run} blocks trying to acquire a permit that is only
 * ever supplied by {@code wakeup}, so it either times out or is woken early.
 */
private static class SleepTask2 implements Task<Object, Long> {
    /** Starts with no permits so that the first acquire blocks. */
    final Semaphore semaphore = new Semaphore(0);

    /** Requested sleep time in milliseconds; 0 means wait without a practical time limit. */
    long millis;

    /**
     * Blocks until woken or until the requested time has passed.
     *
     * @return the elapsed time in milliseconds, measured with the monotonic
     *         {@link System#nanoTime()} clock so wall-clock adjustments cannot distort it
     */
    @Override
    public Long run(ThreadContext context, Object data) throws InterruptedException {
        final long startNanos = System.nanoTime();
        try {
            final long timeoutMillis = millis == 0 ? Long.MAX_VALUE : millis;
            semaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        } finally {
            // Drop any permits released by concurrent wakeups so the next sleep blocks again.
            semaphore.drainPermits();
        }
    }

    /** Releases a permit, letting a blocked {@code run} return early. */
    @Override
    public void wakeup(RubyThread thread, Object data) {
        semaphore.release();
    }
}
/**
 * Runs a legacy {@link BlockingTask} while this thread is marked as sleeping.
 *
 * The task is published in {@code currentBlockingTask} for the duration of the call, which is
 * where {@code interrupt()} finds it in order to call its {@code wakeup()}.
 *
 * @param task the blocking task to run
 * @throws InterruptedException if the task is interrupted
 */
@Deprecated
public void executeBlockingTask(BlockingTask task) throws InterruptedException {
try {
this.currentBlockingTask = task;
enterSleep();
// Handle pending thread events before blocking.
pollThreadEvents();
task.run();
} finally {
exitSleep();
currentBlockingTask = null;
// Handle thread events that arrived while blocked.
pollThreadEvents();
}
}
/**
 * Runs a task with this thread's status set to {@code SLEEP} while it executes.
 *
 * @param context the current thread context
 * @param data the value handed to the task
 * @param task the task to run
 * @return the task's result
 * @throws InterruptedException if the task is interrupted
 */
public <Data, Return> Return executeTask(ThreadContext context, Data data, Task<Data, Return> task) throws InterruptedException {
    final Status statusWhileRunning = Status.SLEEP;
    return executeTask(context, data, statusWhileRunning, task);
}
/**
 * Runs a task with this thread's status temporarily set to {@code status}.
 *
 * While the task runs, the task and its data are published in {@code unblockFunc} and
 * {@code unblockArg}; {@code interrupt()} reads those fields to call the task's
 * {@code wakeup}. Both fields and the previous status are restored afterwards.
 *
 * @param context the current thread context
 * @param data the value handed to the task
 * @param status the status to report while the task runs
 * @param task the task to run
 * @return the task's result
 * @throws InterruptedException if the task is interrupted
 */
public <Data, Return> Return executeTask(ThreadContext context, Data data, Status status, Task<Data, Return> task) throws InterruptedException {
Status oldStatus = this.status.get();
try {
this.unblockArg = data;
this.unblockFunc = task;
// Poll after publishing the unblocker and before changing status/blocking.
pollThreadEvents(context);
this.status.set(status);
return task.run(context, data);
} finally {
this.status.set(oldStatus);
this.unblockFunc = null;
this.unblockArg = null;
// Handle thread events that arrived while the task was running.
pollThreadEvents(context);
}
}
/** Marks this thread's status as {@code SLEEP}. */
public void enterSleep() {
    this.status.set(Status.SLEEP);
}
/** Marks this thread's status as {@code RUN}, unless it is currently {@code ABORTING}. */
public void exitSleep() {
    if (getStatus() == Status.ABORTING) {
        return;
    }
    status.set(Status.RUN);
}
/**
 * Returns the effective status of this thread.
 *
 * @return the stored status, or the status derived from the native thread's state when the
 *         stored status is {@code NATIVE}
 */
private Status getStatus() {
    final Status stored = this.status.get();
    return stored == Status.NATIVE ? nativeStatus() : stored;
}
/**
 * Maps the native thread's {@link Thread.State} onto a Ruby thread status.
 *
 * @return {@code DEAD} for a terminated thread, {@code SLEEP} for a blocked or waiting thread,
 *         {@code RUN} for anything else
 */
private Status nativeStatus() {
    final Thread.State state = getNativeThread().getState();
    switch (state) {
        case TERMINATED:
            return Status.DEAD;
        case BLOCKED:
        case WAITING:
        case TIMED_WAITING:
            return Status.SLEEP;
        default:
            // NEW, RUNNABLE and any other state count as running.
            return Status.RUN;
    }
}
/**
 * Kills this thread: marks it {@code ABORTING} and delegates to {@code genericKill}.
 *
 * No special handling is applied when the caller is the main thread.
 *
 * @return the result of {@code genericKill}, which is this thread when the caller is a
 *         different thread
 */
@JRubyMethod(name = {"kill", "exit", "terminate"})
public IRubyObject kill() {
    Ruby runtime = getRuntime();
    RubyThread currentThread = runtime.getCurrentContext().getThread();

    status.set(Status.ABORTING);

    return genericKill(runtime, currentThread);
}
/**
 * Shared kill implementation.
 *
 * When the caller is this thread, {@code throwThreadKill()} is invoked directly; otherwise a
 * zero fixnum is queued as a pending interrupt and the thread is interrupted.
 *
 * @param runtime the runtime, used to obtain the zero fixnum
 * @param currentThread the calling thread, or null when there is no calling Ruby thread
 * @return this thread
 */
private IRubyObject genericKill(Ruby runtime, RubyThread currentThread) {
    if (currentThread == this) {
        throwThreadKill();
    }

    final IRubyObject killMarker = RubyFixnum.zero(runtime);
    pendingInterruptEnqueue(killMarker);
    interrupt();

    return this;
}
/**
 * Queues a value on this thread's pending-interrupt queue.
 *
 * @param v the value to queue (an exception from raise, or a zero fixnum from kill)
 */
private void pendingInterruptEnqueue(IRubyObject v) {
pendingInterruptQueue.add(v);
// NOTE(review): presumably tells the consumer that the queue has unexamined entries — confirm where the flag is read.
pendingInterruptQueueChecked = false;
}
/**
 * Kills this thread without a calling Ruby thread: {@code genericKill} is invoked with a null
 * current thread, so the kill is always delivered as a pending interrupt.
 */
public void dieFromFinalizer() {
    final Ruby runtime = getRuntime();
    genericKill(runtime, null);
}
/**
 * Not supported: always raises.
 *
 * @throws RaiseException a Ruby NotImplementedError, unconditionally
 */
@JRubyMethod
public IRubyObject safe_level() {
    final Ruby runtime = getRuntime();
    throw runtime.newNotImplementedError("Thread-specific SAFE levels are not supported");
}
/**
 * Returns this thread's backtrace with no level or length restriction.
 *
 * @param context the calling thread's context
 * @return the backtrace, or nil when it cannot be produced
 */
@JRubyMethod(name = "backtrace")
public IRubyObject backtrace(ThreadContext context) {
    final IRubyObject level = null;
    final IRubyObject length = null;
    return backtraceInternal(context, level, length);
}
/**
 * Returns this thread's backtrace starting at the given level.
 *
 * @param context the calling thread's context
 * @param level the starting level
 * @return the backtrace, or nil when it cannot be produced
 */
@JRubyMethod(name = "backtrace")
public IRubyObject backtrace(ThreadContext context, IRubyObject level) {
    final IRubyObject length = null;
    return backtraceInternal(context, level, length);
}
/**
 * Returns this thread's backtrace restricted by level and length.
 *
 * @param context the calling thread's context
 * @param level the starting level
 * @param length the maximum number of entries
 * @return the backtrace, or nil when it cannot be produced
 */
@JRubyMethod(name = "backtrace")
public IRubyObject backtrace(ThreadContext context, IRubyObject level, IRubyObject length) {
    final IRubyObject trace = backtraceInternal(context, level, length);
    return trace;
}
/**
 * Builds this thread's backtrace from its native thread's current stack trace.
 *
 * @param callerContext the calling thread's context, used only for the nil result
 * @param level the starting level, or null
 * @param length the maximum number of entries, or null
 * @return the backtrace, or nil when this thread has no context, has no native thread, or its
 *         native thread is not alive
 */
private IRubyObject backtraceInternal(ThreadContext callerContext, IRubyObject level, IRubyObject length) {
    ThreadContext context = getContext();
    Thread nativeThread = getNativeThread();

    if (context == null || nativeThread == null || !nativeThread.isAlive()) return callerContext.nil;

    // Use the reference that was null-checked above; looking the native thread up a second time
    // could yield null if it went away in between.
    return RubyKernel.withLevelAndLength(
            context, level, length, 0,
            (ctx, lev, len) -> WALKER.walk(nativeThread.getStackTrace(), stream -> ctx.createCallerBacktrace(lev, len, stream)));
}
/**
 * Returns this thread's backtrace locations with no level or length restriction.
 *
 * @param context the calling thread's context
 * @return the locations, or nil when they cannot be produced
 */
@JRubyMethod
public IRubyObject backtrace_locations(ThreadContext context) {
    final IRubyObject level = null;
    final IRubyObject length = null;
    return backtraceLocationsInternal(context, level, length);
}
/**
 * Returns this thread's backtrace locations starting at the given level.
 *
 * @param context the calling thread's context
 * @param level the starting level
 * @return the locations, or nil when they cannot be produced
 */
@JRubyMethod
public IRubyObject backtrace_locations(ThreadContext context, IRubyObject level) {
    final IRubyObject length = null;
    return backtraceLocationsInternal(context, level, length);
}
/**
 * Returns this thread's backtrace locations restricted by level and length.
 *
 * @param context the calling thread's context
 * @param level the starting level
 * @param length the maximum number of entries
 * @return the locations, or nil when they cannot be produced
 */
@JRubyMethod
public IRubyObject backtrace_locations(ThreadContext context, IRubyObject level, IRubyObject length) {
    final IRubyObject locations = backtraceLocationsInternal(context, level, length);
    return locations;
}
/**
 * Builds this thread's backtrace locations from its native thread's current stack trace.
 *
 * @param callerContext the calling thread's context, used only for the nil result
 * @param level the starting level, or null
 * @param length the maximum number of entries, or null
 * @return the locations, or nil when this thread has no context, has no native thread, or its
 *         native thread is not alive
 */
private IRubyObject backtraceLocationsInternal(ThreadContext callerContext, IRubyObject level, IRubyObject length) {
    ThreadContext context = getContext();
    Thread nativeThread = getNativeThread();

    if (context == null || nativeThread == null || !nativeThread.isAlive()) return callerContext.nil;

    // Use the reference that was null-checked above; looking the native thread up a second time
    // could yield null if it went away in between.
    return RubyKernel.withLevelAndLength(
            context, level, length, 0,
            (ctx, lev, len) -> WALKER.walk(nativeThread.getStackTrace(), stream -> ctx.createCallerLocations(lev, len, stream)));
}
/**
 * Sets this thread's report_on_exception flag.
 *
 * @param context the calling thread's context
 * @param state nil is stored as-is; any other value is normalized to true or false
 * @return this thread
 */
@JRubyMethod(name = "report_on_exception=")
public IRubyObject report_on_exception_set(ThreadContext context, IRubyObject state) {
    if (!state.isNil()) {
        reportOnException = state.isTrue() ? context.tru : context.fals;
    } else {
        reportOnException = state;
    }
    return this;
}
/**
 * Returns this thread's report_on_exception flag.
 *
 * @param context the calling thread's context (unused)
 * @return the stored flag value
 */
@JRubyMethod(name = "report_on_exception")
public IRubyObject report_on_exception(ThreadContext context) {
    return this.reportOnException;
}
/**
 * Sets the runtime-wide report_on_exception flag.
 *
 * @param context the calling thread's context
 * @param self the receiver, returned unchanged
 * @param state nil is stored as-is; any other value is normalized to a Ruby boolean
 * @return {@code self}
 */
@JRubyMethod(name = "report_on_exception=", meta = true)
public static IRubyObject report_on_exception_set(ThreadContext context, IRubyObject self, IRubyObject state) {
    final Ruby runtime = context.runtime;

    if (!state.isNil()) {
        runtime.setReportOnException(runtime.newBoolean(state.isTrue()));
    } else {
        runtime.setReportOnException(state);
    }

    return self;
}
/**
 * Returns the runtime-wide report_on_exception flag.
 *
 * @param context the calling thread's context
 * @param self the receiver (unused)
 * @return the runtime's flag value
 */
@JRubyMethod(name = "report_on_exception", meta = true)
public static IRubyObject report_on_exception(ThreadContext context, IRubyObject self) {
    final Ruby runtime = context.runtime;
    return runtime.getReportOnException();
}
/**
 * Returns the Java stack trace of the native thread backing this Ruby thread.
 *
 * @return the native thread's stack trace, or {@code EMPTY_STACK_TRACE} when this thread is not
 *         backed by a {@link NativeThread} or the native thread is unavailable
 */
public StackTraceElement[] javaBacktrace() {
    if (threadImpl instanceof NativeThread) {
        // NOTE(review): getThread() presumably can return null once the native thread is gone
        // (backtraceInternal guards against a null native thread) — confirm.
        Thread thread = ((NativeThread) threadImpl).getThread();
        if (thread != null) {
            return thread.getStackTrace();
        }
    }
    return EMPTY_STACK_TRACE;
}
/** @return whether the underlying thread implementation reports itself as the current thread */
private boolean isCurrent() {
    final boolean current = threadImpl.isCurrent();
    return current;
}
/**
 * Overload for Ruby exceptions; forwards to {@link #exceptionRaised(Throwable)}.
 *
 * @param exception the exception that terminated this thread
 */
public void exceptionRaised(RaiseException exception) {
    final Throwable asThrowable = exception;
    exceptionRaised(asThrowable);
}
/**
 * Prints the "terminated with exception" warning header for this thread to the runtime's
 * error stream.
 */
protected void printReportExceptionWarning() {
    final String reportName = threadImpl.getReportName();
    final String warning =
            "warning: thread \"" + reportName + "\" terminated with exception (report_on_exception is true):";
    getRuntime().getErrorStream().println(warning);
}
/**
 * Handles an exception that escaped this thread's body.
 *
 * Unrescuable throwables are rethrown. A SystemExit is forwarded to the main thread. Otherwise
 * the exception is printed when report_on_exception is true, forwarded to the main thread when
 * abort-on-exception is in effect, or printed when the runtime is in debug mode. The throwable
 * is finally recorded as this thread's exiting exception.
 *
 * @param throwable the exception that terminated this thread
 */
public void exceptionRaised(Throwable throwable) {
    if (throwable instanceof Unrescuable) {
        Helpers.throwException(throwable);
    }

    final Ruby runtime = getRuntime();
    assert isCurrent();

    // Obtain a Ruby-level view of the throwable.
    final IRubyObject rubyException = throwable instanceof RaiseException
            ? ((RaiseException) throwable).getException()
            : JavaUtil.convertJavaToUsableRubyObject(runtime, throwable);

    if (runtime.getSystemExit().isInstance(rubyException)) {
        runtime.getThreadService().getMainThread().raise(rubyException);
    } else {
        final boolean report = reportOnException.isTrue();
        if (report || abortOnException(runtime)) {
            if (report) {
                printReportExceptionWarning();
                runtime.printError(throwable);
            }
            if (abortOnException(runtime)) {
                runtime.getThreadService().getMainThread().raise(rubyException);
            }
        } else if (runtime.isDebug()) {
            runtime.printError(throwable);
        }
    }

    exitingException = throwable;
}
/**
 * @param runtime the runtime whose global setting is consulted first
 * @return true when abort-on-exception is enabled globally or for this thread
 */
private boolean abortOnException(Ruby runtime) {
    if (runtime.isGlobalAbortOnExceptionEnabled()) {
        return true;
    }
    return abortOnException;
}
/**
 * @param receiver any object of the runtime of interest
 * @return the main thread registered with that runtime's thread service
 */
public static RubyThread mainThread(IRubyObject receiver) {
    final ThreadService threadService = receiver.getRuntime().getThreadService();
    return threadService.getMainThread();
}
/**
 * Selects on the IO's channel for the given operations, without a timeout.
 *
 * @param io the IO whose channel and open file are used
 * @param ops the selection operations of interest
 * @return the result of the channel-based select
 */
public boolean select(RubyIO io, int ops) {
    final Channel channel = io.getChannel();
    final OpenFile fptr = io.getOpenFile();
    return select(channel, fptr, ops);
}
/**
 * Selects on the IO's channel for the given operations with a timeout.
 *
 * @param io the IO whose channel and open file are used
 * @param ops the selection operations of interest
 * @param timeout negative to block indefinitely, 0 to poll, otherwise the select timeout
 * @return the result of the channel-based select
 */
public boolean select(RubyIO io, int ops, long timeout) {
    final Channel channel = io.getChannel();
    final OpenFile fptr = io.getOpenFile();
    return select(channel, fptr, ops, timeout);
}
/**
 * Selects on the channel for the given operations, blocking indefinitely (timeout -1).
 *
 * @param channel the channel to select on
 * @param fptr the open file associated with the channel, or null
 * @param ops the selection operations of interest
 * @return the result of the four-argument select
 */
public boolean select(Channel channel, OpenFile fptr, int ops) {
    final long noTimeout = -1;
    return select(channel, fptr, ops, noTimeout);
}
/**
 * Selects on the channel for the given operations, blocking indefinitely (timeout -1).
 *
 * @param channel the channel to select on
 * @param io the IO providing the open file, or null
 * @param ops the selection operations of interest
 * @return the result of the open-file-based select
 */
public boolean select(Channel channel, RubyIO io, int ops) {
    final OpenFile fptr = io == null ? null : io.getOpenFile();
    return select(channel, fptr, ops, -1);
}
/**
 * Selects on the channel for the given operations with a timeout.
 *
 * @param channel the channel to select on
 * @param io the IO providing the open file, or null
 * @param ops the selection operations of interest
 * @param timeout negative to block indefinitely, 0 to poll, otherwise the select timeout
 * @return the result of the open-file-based select
 */
public boolean select(Channel channel, RubyIO io, int ops, long timeout) {
    final OpenFile fptr = io == null ? null : io.getOpenFile();
    return select(channel, fptr, ops, timeout);
}
/**
 * Waits for the channel to become ready for the given operations using a pooled selector.
 *
 * Channels that are not selectable, or that have no file descriptor (including a null
 * {@code fptr}), are reported as ready immediately.
 *
 * @param channel the channel to select on
 * @param fptr the open file associated with the channel, or null
 * @param ops the selection operations of interest
 * @param timeout negative to block indefinitely, 0 to poll without blocking, otherwise the
 *        timeout handed to {@code Selector.select(long)}
 * @return true when the channel is not selectable/has no descriptor, or when exactly one key was
 *         selected and it is this channel's key; false otherwise
 * @throws RaiseException a Ruby IOError wrapping any IOException from the selection
 */
public boolean select(Channel channel, OpenFile fptr, int ops, long timeout) {
ChannelFD fd = fptr == null ? null : fptr.fd();
if (channel instanceof SelectableChannel && fd != null) {
SelectableChannel selectable = (SelectableChannel)channel;
boolean locked = false;
// NOTE(review): presumably lock() reports whether the lock was already held, and the lock is
// released here so it is not held while blocked in select — confirm against OpenFile.
if (fptr != null) {
locked = fptr.lock();
fptr.unlock();
}
try {
synchronized (selectable.blockingLock()) {
// Remember the blocking mode so it can be restored afterwards.
boolean oldBlocking = selectable.isBlocking();
SelectionKey key;
try {
// Channels must be non-blocking to be registered with a selector.
selectable.configureBlocking(false);
if (fptr != null) fptr.addBlockingThread(this);
// Published in a field so interrupt() can wake the selector up.
currentSelector = getRuntime().getSelectorPool().get(selectable.provider());
key = selectable.register(currentSelector, ops);
beforeBlockingCall();
int result;
if (timeout < 0) {
result = currentSelector.select();
} else if (timeout == 0) {
result = currentSelector.selectNow();
} else {
result = currentSelector.select(timeout);
}
pollThreadEvents();
if (result == 1) {
Set<SelectionKey> keySet = currentSelector.selectedKeys();
if (keySet.iterator().next() == key) {
return true;
}
}
return false;
} catch (IOException ioe) {
throw getRuntime().newIOErrorFromException(ioe);
} finally {
try {
// Return the selector to the pool.
if (currentSelector != null) {
getRuntime().getSelectorPool().put(currentSelector);
}
} catch (Exception e) {
// Best effort: failures returning the selector are ignored.
} finally {
currentSelector = null;
}
if (fptr != null) fptr.removeBlockingThread(this);
try {
selectable.configureBlocking(oldBlocking);
} catch (Exception e) {
// Best effort: failures restoring the blocking mode are ignored.
}
afterBlockingCall();
}
}
} finally {
// Re-acquire the open-file lock, then unlock once more when the flag recorded before
// selecting is set.
if (fptr != null) {
fptr.lock();
if (locked) fptr.unlock();
}
}
} else {
return true;
}
}
/**
 * Flags this thread as having a pending interrupt and wakes it from whatever it is blocked on.
 *
 * Every known blocking mechanism is poked in turn: an active selector, a blocking-IO condition,
 * the unblocker registered by {@code executeTask}, the legacy blocking task registered by
 * {@code executeBlockingTask}, and finally a waiter on this object's monitor.
 */
@SuppressWarnings({"deprecation", "rawtypes", "unchecked"})
public synchronized void interrupt() {
    setInterrupt();

    // Each field is copied to a local before the null check because the blocked thread clears
    // these fields itself when it stops blocking.
    Selector activeSelector = currentSelector;
    if (activeSelector != null) {
        activeSelector.wakeup();
    }

    BlockingIO.Condition iowait = blockingIO;
    if (iowait != null) {
        iowait.cancel();
    }

    Unblocker task = this.unblockFunc;
    if (task != null) {
        task.wakeup(this, unblockArg);
    }

    // Legacy (deprecated) blocking-task mechanism.
    {
        BlockingTask t = currentBlockingTask;
        if (t != null) {
            t.wakeup();
        }
    }

    // Wake a thread waiting on this object's monitor.
    notify();
}
/**
 * Atomically sets the pending-interrupt bit in {@code interruptFlag}, retrying the
 * compare-and-set until it succeeds.
 */
public void setInterrupt() {
    int observed;
    do {
        observed = interruptFlag;
    } while (!INTERRUPT_FLAG_UPDATER.compareAndSet(this, observed, observed | PENDING_INTERRUPT_MASK));
}
/**
 * The blocking-IO condition this thread is currently awaiting in {@code waitForIO}, or null
 * when it is not waiting; {@code interrupt()} cancels it to unblock the thread.
 */
private volatile BlockingIO.Condition blockingIO = null;
/**
 * Waits until the IO's channel is ready for the given operations.
 *
 * @param context the current thread context
 * @param io the IO to wait on
 * @param ops the operations of interest
 * @return true immediately for a non-selectable channel; otherwise the result of awaiting the
 *         blocking-IO condition
 * @throws RaiseException a Ruby RuntimeError when the wait fails with an IOException or is
 *         interrupted
 */
public boolean waitForIO(ThreadContext context, RubyIO io, int ops) {
    final Channel channel = io.getChannel();

    if (channel instanceof SelectableChannel) {
        try {
            io.addBlockingThread(this);
            // Published in a field so interrupt() can cancel the wait.
            blockingIO = BlockingIO.newCondition(channel, ops);
            final boolean ready = blockingIO.await();
            pollThreadEvents(context);
            return ready;
        } catch (IOException ioError) {
            throw context.runtime.newRuntimeError("Error with selector: " + ioError);
        } catch (InterruptedException interrupted) {
            throw context.runtime.newRuntimeError("Interrupted");
        } finally {
            blockingIO = null;
            io.removeBlockingThread(this);
        }
    }

    return true;
}
/** Call before blocking: polls pending thread events, then marks this thread as sleeping. */
public void beforeBlockingCall() {
    this.pollThreadEvents();
    this.enterSleep();
}
/** Call after blocking: leaves the sleeping state, then polls pending thread events. */
public void afterBlockingCall() {
    this.exitSleep();
    this.pollThreadEvents();
}
/**
 * Waits on the given object's monitor, optionally with a timeout.
 *
 * @param o the object whose monitor is waited on
 * @param timeout the timeout in seconds, or null to wait without a timeout
 * @return true when no timeout was given; otherwise whether the measured elapsed time did not
 *         exceed the requested delay
 * @throws InterruptedException if the wait is interrupted
 */
public boolean wait_timeout(IRubyObject o, Double timeout) throws InterruptedException {
    if (timeout == null) {
        executeBlockingTask(new SleepTask(o, 0, 0));
        return true;
    }

    final long delayNanos = (long) (timeout.doubleValue() * 1000000000.0);
    final long startNanos = System.nanoTime();

    // A non-positive delay skips the wait entirely.
    if (delayNanos > 0) {
        final long wholeMillis = delayNanos / 1000000;
        final int leftoverNanos = (int) (delayNanos % 1000000);
        executeBlockingTask(new SleepTask(o, wholeMillis, leftoverNanos));
    }

    final long endNanos = System.nanoTime();
    return (endNanos - startNanos) <= delayNanos;
}
/** @return the thread group stored on this thread */
public RubyThreadGroup getThreadGroup() {
    return this.threadGroup;
}
/**
 * Two Ruby threads are equal when they are of the same class and wrap the same thread
 * implementation, or equal implementations provided this one is not the dummy placeholder.
 */
@Override
public boolean equals(Object obj) {
    if (obj == null || getClass() != obj.getClass()) {
        return false;
    }

    final RubyThread that = (RubyThread) obj;
    if (this.threadImpl == that.threadImpl) {
        return true;
    }

    return this.threadImpl != ThreadLike.DUMMY && this.threadImpl.equals(that.threadImpl);
}
/** Hash derived from the thread implementation; the dummy placeholder contributes 0. */
@Override
public int hashCode() {
    final int implHash = this.threadImpl == ThreadLike.DUMMY ? 0 : this.threadImpl.hashCode();
    return 97 * (3 + implHash);
}
/** @return the string form of the underlying thread implementation */
@Override
public String toString() {
    final String description = threadImpl.toString();
    return description;
}
/**
 * Acquires the lock (blocking, not interruptible) and records it as held by this thread.
 * Must be called on this thread's own native thread.
 *
 * @param lock the lock to acquire
 */
public void lock(Lock lock) {
    assert Thread.currentThread() == getNativeThread();

    lock.lock();
    this.heldLocks.add(lock);
}
/**
 * Acquires the lock interruptibly as a task, so that interrupting this Ruby thread interrupts
 * the native thread blocked on the lock; the lock is then recorded as held by this thread.
 * Must be called on this thread's own native thread.
 *
 * @param lock the lock to acquire
 * @throws InterruptedException if the acquisition is interrupted
 */
public void lockInterruptibly(Lock lock) throws InterruptedException {
    assert Thread.currentThread() == getNativeThread();

    final Task<Lock, Object> acquire = new RubyThread.Task<Lock, Object>() {
        @Override
        public Object run(ThreadContext context, Lock target) throws InterruptedException {
            target.lockInterruptibly();
            return target;
        }

        @Override
        public void wakeup(RubyThread thread, Lock target) {
            thread.getNativeThread().interrupt();
        }
    };

    executeTask(getContext(), lock, acquire);
    heldLocks.add(lock);
}
/**
 * Attempts to acquire the lock without blocking, recording it as held on success.
 * Must be called on this thread's own native thread.
 *
 * @param lock the lock to try
 * @return whether the lock was acquired
 */
public boolean tryLock(Lock lock) {
    assert Thread.currentThread() == getNativeThread();

    if (!lock.tryLock()) {
        return false;
    }

    heldLocks.add(lock);
    return true;
}
/**
 * Releases the lock and removes it from the locks recorded as held by this thread.
 * Must be called on this thread's own native thread.
 *
 * @param lock the lock to release
 */
public void unlock(Lock lock) {
    assert Thread.currentThread() == getNativeThread();

    lock.unlock();
    this.heldLocks.remove(lock);
}
/**
 * Releases every lock recorded as held by this thread. The record itself is left untouched.
 * Must be called on this thread's own native thread.
 */
public void unlockAll() {
    assert Thread.currentThread() == getNativeThread();

    for (final Lock held : heldLocks) {
        held.unlock();
    }
}
/**
 * Sleeps on the lock without a timeout (a timeout of 0 means "wait indefinitely").
 *
 * @param lock the lock to sleep on
 * @throws InterruptedException if the wait is interrupted
 */
public void sleep(Lock lock) throws InterruptedException {
    final long noTimeout = 0;
    sleep(lock, noTimeout);
}
/**
 * Sleeps by awaiting a freshly created condition of the given lock, with this thread's status
 * set to {@code NATIVE} for the duration. The wakeup hook interrupts the native thread.
 * Must be called on this thread's own native thread.
 *
 * @param lock the lock whose new condition is awaited
 * @param millis the timeout in milliseconds; 0 waits without a timeout
 * @throws InterruptedException if the wait is interrupted
 */
public void sleep(Lock lock, long millis) throws InterruptedException {
    assert Thread.currentThread() == getNativeThread();

    final Condition waitCondition = lock.newCondition();

    executeTask(getContext(), waitCondition, Status.NATIVE, new Task<Condition, Object>() {
        @Override
        public Object run(ThreadContext context, Condition condition) throws InterruptedException {
            if (millis != 0) {
                condition.await(millis, TimeUnit.MILLISECONDS);
            } else {
                condition.await();
            }
            return null;
        }

        @Override
        public void wakeup(RubyThread thread, Condition condition) {
            thread.getNativeThread().interrupt();
        }
    });
}
/** @return this object's identity hash code as a {@code 0x}-prefixed hexadecimal string */
private String identityString() {
    final int identity = System.identityHashCode(this);
    return "0x" + Integer.toHexString(identity);
}
/** Name of the constant under which the mutex backing {@code Thread.exclusive} is stored. */
private static final String MUTEX_FOR_THREAD_EXCLUSIVE = "MUTEX_FOR_THREAD_EXCLUSIVE";
/**
 * Runs the block while holding the shared Thread.exclusive mutex, after emitting a
 * deprecation warning through the receiver's {@code warn} method.
 *
 * @param context the current thread context
 * @param recv the receiver class
 * @param block the block to run under the mutex
 * @return the result of synchronizing the block on the mutex
 */
@Deprecated
@JRubyMethod(meta = true)
public static IRubyObject exclusive(ThreadContext context, IRubyObject recv, Block block) {
    final RubyString warning = context.runtime.newString("Thread.exclusive is deprecated, use Thread::Mutex");
    recv.callMethod(context, "warn", warning);

    final Mutex exclusiveMutex = getMutexForThreadExclusive(context, (RubyClass) recv);
    return exclusiveMutex.synchronize(context, block);
}
/**
 * Returns the mutex backing {@code Thread.exclusive}, creating it and storing it as a constant
 * on {@code recv} on first use.
 *
 * The constant is looked up once without locking and, if absent, looked up again while
 * synchronized on {@code recv} before being created.
 *
 * @param context the current thread context
 * @param recv the class holding the mutex constant
 * @return the shared mutex
 */
private static Mutex getMutexForThreadExclusive(ThreadContext context, RubyClass recv) {
    Mutex shared = (Mutex) recv.getConstantNoConstMissing(MUTEX_FOR_THREAD_EXCLUSIVE, false, false);

    if (shared == null) {
        synchronized (recv) {
            shared = (Mutex) recv.getConstantNoConstMissing(MUTEX_FOR_THREAD_EXCLUSIVE, false, false);
            if (shared != null) {
                return shared;
            }
            shared = Mutex.newInstance(context, context.runtime.getThread().getClass("Mutex"), NULL_ARRAY, Block.NULL_BLOCK);
            recv.setConstant(MUTEX_FOR_THREAD_EXCLUSIVE, shared, true);
        }
    }

    return shared;
}
/**
 * Raises in this thread using the runtime's current context as the caller; the result of the
 * raise is discarded.
 *
 * @param args the raise arguments
 */
@Deprecated
public void internalRaise(IRubyObject[] args) {
    final ThreadContext callerContext = getRuntime().getCurrentContext();
    final RubyThread callerThread = callerContext.getThread();
    genericRaise(callerContext, callerThread, args);
}
/**
 * Deprecated no-op: the event is ignored.
 *
 * @param event ignored
 */
@Deprecated
public void receiveMail(ThreadService.Event event) {
}
/**
 * Deprecated no-op.
 *
 * @param context ignored
 */
@Deprecated
public void checkMail(ThreadContext context) {
}
/**
 * The legacy blocking task currently being run by {@code executeBlockingTask}, or null;
 * {@code interrupt()} calls its {@code wakeup()} when set.
 */
@Deprecated
private volatile BlockingTask currentBlockingTask;
/**
 * Selects on the IO for {@link SelectionKey#OP_ACCEPT}.
 *
 * @param io the IO to select on
 * @return the result of {@code select(io, OP_ACCEPT)}
 */
@Deprecated
public boolean selectForAccept(RubyIO io) {
    final int acceptOps = SelectionKey.OP_ACCEPT;
    return select(io, acceptOps);
}
/**
 * Deprecated alias that ignores its arguments and returns the unrestricted backtrace.
 *
 * @param context the calling thread's context
 * @param args ignored
 * @return the result of {@link #backtrace(ThreadContext)}
 */
@Deprecated
public IRubyObject backtrace20(ThreadContext context, IRubyObject[] args) {
    final IRubyObject trace = backtrace(context);
    return trace;
}
/**
 * Array-argument variant that dispatches to the fixed-arity {@code backtrace} overloads.
 *
 * @param context the calling thread's context
 * @param args zero to two arguments (level, length)
 * @return the backtrace; with more than two arguments the arity check is run and null is
 *         returned if it does not raise
 */
@Deprecated
public IRubyObject backtrace(ThreadContext context, IRubyObject[] args) {
    final int argc = args.length;

    if (argc == 0) {
        return backtrace(context);
    }
    if (argc == 1) {
        return backtrace(context, args[0]);
    }
    if (argc == 2) {
        return backtrace(context, args[0], args[1]);
    }

    Arity.checkArgumentCount(context.runtime, args, 0, 2);
    return null;
}
/**
 * Array-argument variant that dispatches to the fixed-arity {@code backtrace_locations}
 * overloads.
 *
 * @param context the calling thread's context
 * @param args zero to two arguments (level, length)
 * @return the locations; with more than two arguments the arity check is run and null is
 *         returned if it does not raise
 */
@Deprecated
public IRubyObject backtrace_locations(ThreadContext context, IRubyObject[] args) {
    final int argc = args.length;

    if (argc == 0) {
        return backtrace_locations(context);
    }
    if (argc == 1) {
        return backtrace_locations(context, args[0]);
    }
    if (argc == 2) {
        return backtrace_locations(context, args[0], args[1]);
    }

    Arity.checkArgumentCount(context.runtime, args, 0, 2);
    return null;
}
/**
 * Context-less variant of {@code pass} that looks up the current context from the receiver's
 * runtime.
 *
 * @param recv the receiver
 * @return the result of the context-taking {@code pass}
 */
@Deprecated
public static IRubyObject pass(IRubyObject recv) {
    final ThreadContext current = recv.getRuntime().getCurrentContext();
    return pass(current, recv);
}
}