package org.jruby.ext.fiber;
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.RubyObject;
import org.jruby.RubyThread;
import org.jruby.anno.JRubyMethod;
import org.jruby.exceptions.JumpException;
import org.jruby.exceptions.RaiseException;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.Block;
import org.jruby.runtime.ExecutionContext;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.Visibility;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.util.log.Logger;
import org.jruby.util.log.LoggerFactory;
import org.jruby.ir.runtime.IRBreakJump;
import org.jruby.ir.runtime.IRReturnJump;
import org.jruby.ir.operands.IRException;
public class ThreadFiber extends RubyObject implements ExecutionContext {
/** Logger for this class; used to warn when an exception cannot be delivered to a fiber's thread. */
private static final Logger LOG = LoggerFactory.getLogger(ThreadFiber.class);
/**
 * Creates a fiber object of the given Ruby class.
 * <p>
 * The constructor only delegates to the superclass; {@code data} and {@code thread}
 * are assigned later by {@code initialize} or {@code initRootFiber}.
 *
 * @param runtime the runtime the object belongs to
 * @param klass   the Ruby class of the new object
 */
public ThreadFiber(Ruby runtime, RubyClass klass) {
super(runtime, klass);
}
/**
 * Builds the root fiber for a thread and installs it on the given context.
 * <p>
 * The root fiber gets a fresh queue, uses {@code currentThread} both as its
 * {@code FiberData} parent and as its own thread, and is registered through
 * {@code context.setRootFiber}.
 *
 * @param context       the context that will own the root fiber
 * @param currentThread the thread the root fiber is bound to
 */
public static void initRootFiber(ThreadContext context, RubyThread currentThread) {
    final Ruby ruby = context.runtime;
    final ThreadFiber root = new ThreadFiber(ruby, ruby.getFiber());

    final FiberQueue rootQueue = new FiberQueue(ruby);
    root.data = new FiberData(rootQueue, currentThread, root);
    root.thread = currentThread;

    context.setRootFiber(root);
}
/**
 * Ruby-level initializer: binds this fiber to the given block and starts the
 * thread that will run it.
 *
 * @param context the calling thread's context
 * @param block   the fiber body; must be given
 * @return {@code nil}
 * @throws RaiseException an ArgumentError when no block is given
 */
@JRubyMethod(visibility = Visibility.PRIVATE)
public IRubyObject initialize(ThreadContext context, Block block) {
    final Ruby ruby = context.runtime;

    if (!block.isGiven()) {
        throw ruby.newArgumentError("tried to create Proc object without block");
    }

    // The new fiber's parent is the thread that owns the fiber creating it.
    final FiberQueue ownQueue = new FiberQueue(ruby);
    this.data = new FiberData(ownQueue, context.getFiberCurrentThread(), this);

    // The creating fiber's queue is handed to createThread along with our own data.
    final FiberData creator = context.getFiber().data;
    this.thread = createThread(ruby, this.data, creator.queue, block);

    return context.nil;
}
/**
 * Resumes this fiber, passing it the given values, and waits for the value it
 * hands back.
 * <p>
 * Zero values are sent as the {@code NEVER} marker, one value is sent as-is, and
 * several values are packed into an array. Resuming the fiber that is currently
 * running simply returns the (packed) arguments.
 *
 * @param context the calling thread's context
 * @param values  the values to pass into the fiber
 * @return the value produced by the exchange with the fiber
 * @throws RaiseException a FiberError for a double resume, a dead fiber, or a
 *                        fiber that belongs to another thread
 */
@JRubyMethod(rest = true)
public IRubyObject resume(ThreadContext context, IRubyObject[] values) {
    final Ruby ruby = context.runtime;
    final FiberData target = this.data;

    // A fiber that already has a resumer, or that transferred away, cannot be resumed.
    if (target.prev != null || target.transferred) {
        throw ruby.newFiberError("double resume");
    }
    if (!alive()) {
        throw ruby.newFiberError("dead fiber called");
    }

    final FiberData current = context.getFiber().data;
    final int argc = values.length;

    if (target == current) {
        // Resuming ourselves: nothing to switch to, echo the arguments back.
        if (argc == 0) return context.nil;
        if (argc == 1) return values[0];
        return RubyArray.newArrayMayCopy(ruby, values);
    }

    IRubyObject payload;
    if (argc == 0) {
        payload = NEVER;
    } else if (argc == 1) {
        payload = values[0];
    } else {
        payload = RubyArray.newArrayMayCopy(ruby, values);
    }

    if (target.parent != context.getFiberCurrentThread()) {
        throw ruby.newFiberError("fiber called across threads");
    }

    // Record who resumed the target so it knows where to deliver its next value.
    target.prev = context.getFiber();
    try {
        return exchangeWithFiber(context, current, target, payload);
    } finally {
        target.prev = null;
    }
}
/**
 * Sends {@code val} to the target fiber's queue, then waits on the current
 * fiber's queue for the reply.
 * <p>
 * A {@link RaiseException} raised while pushing or popping is passed to
 * {@code handleExceptionDuringExchange}; if that call returns normally the wait
 * for a reply continues.
 *
 * @param context          the calling thread's context
 * @param currentFiberData data of the fiber giving up control
 * @param targetFiberData  data of the fiber receiving control
 * @param val              the value to deliver to the target
 * @return the value popped from the current fiber's queue, with {@code NEVER}
 *         mapped to {@code nil}
 */
private static IRubyObject exchangeWithFiber(ThreadContext context, FiberData currentFiberData, FiberData targetFiberData, IRubyObject val) {
    final IRubyObject[] message = { val };
    try {
        targetFiberData.queue.push(context, message);
    } catch (RaiseException pushFailure) {
        handleExceptionDuringExchange(context, currentFiberData, targetFiberData, pushFailure);
    }

    for (;;) {
        IRubyObject reply;
        try {
            reply = currentFiberData.queue.pop(context);
        } catch (RaiseException popFailure) {
            // Either rethrows, or forwards the exception to the target and lets us keep waiting.
            handleExceptionDuringExchange(context, currentFiberData, targetFiberData, popFailure);
            continue;
        }
        return reply == NEVER ? context.nil : reply;
    }
}
/**
 * Decides what to do with an exception raised while exchanging values between
 * two fibers.
 * <p>
 * The exception is rethrown to the caller when it is a LocalJumpError, or when
 * either fiber's queue has been shut down. Otherwise it is raised in the target
 * fiber's thread if that fiber is still alive; if not, a warning is logged.
 *
 * @param context          the calling thread's context
 * @param currentFiberData data of the fiber that observed the exception
 * @param targetFiberData  data of the fiber on the other side of the exchange
 * @param re               the exception that interrupted the exchange
 */
private static void handleExceptionDuringExchange(ThreadContext context, FiberData currentFiberData, FiberData targetFiberData, RaiseException re) {
    // Short-circuit order matches the sequence of the individual checks.
    final boolean propagateToCaller =
            context.runtime.getLocalJumpError().isInstance(re.getException())
            || currentFiberData.queue.isShutdown()
            || targetFiberData.queue.isShutdown();
    if (propagateToCaller) {
        throw re;
    }

    final ThreadFiber target = targetFiberData.fiber.get();
    if (target == null || !target.alive()) {
        LOG.warn("no fiber thread to raise: {}", re.getException().inspect(context));
        return;
    }
    target.thread.raise(re.getException());
}
/**
 * Transfers control to this fiber, passing it the given values, and waits for
 * the value handed back.
 * <p>
 * Values are packed as in {@code resume}: none becomes {@code NEVER}, one is
 * sent as-is, several become an array. If the current fiber was itself resumed
 * (it has a {@code prev}), that resumer is handed over to the target and the
 * current fiber is flagged as transferred for the duration of the exchange.
 *
 * @param context the calling thread's context
 * @param values  the values to pass into the fiber
 * @return the value produced by the exchange with the fiber
 * @throws RaiseException a FiberError for a double resume, a dead fiber, or a
 *                        fiber that belongs to another thread
 */
@JRubyMethod(rest = true)
public IRubyObject __transfer__(ThreadContext context, IRubyObject[] values) {
    final Ruby ruby = context.runtime;
    final FiberData target = this.data;

    if (target.prev != null) {
        throw ruby.newFiberError("double resume");
    }
    if (!alive()) {
        throw ruby.newFiberError("dead fiber called");
    }

    final FiberData current = context.getFiber().data;
    final int argc = values.length;

    if (target == current) {
        // Transferring to ourselves: echo the arguments back.
        if (argc == 0) return context.nil;
        if (argc == 1) return values[0];
        return RubyArray.newArrayMayCopy(ruby, values);
    }

    IRubyObject payload;
    if (argc == 0) {
        payload = NEVER;
    } else if (argc == 1) {
        payload = values[0];
    } else {
        payload = RubyArray.newArrayMayCopy(ruby, values);
    }

    if (target.parent != context.getFiberCurrentThread()) {
        throw ruby.newFiberError("fiber called across threads");
    }

    if (current.prev == null) {
        target.prev = context.getFiber();
    } else {
        // Pass our resumer on to the target and mark ourselves as transferred away.
        target.prev = current.prev;
        current.prev = null;
        current.transferred = true;
    }

    try {
        return exchangeWithFiber(context, current, target, payload);
    } finally {
        target.prev = null;
        current.transferred = false;
    }
}
@JRubyMethod(meta = true)
public static IRubyObject yield(ThreadContext context, IRubyObject recv) {
return yield(context, recv, context.nil);
}
/**
 * Yields a single value from the current fiber to the fiber recorded as its
 * {@code prev}, then waits to be given control again.
 *
 * @param context the calling thread's context
 * @param recv    the receiver of the class-level call (unused)
 * @param value   the value to hand to the previous fiber
 * @return the value passed to the fiber when it is next given control
 * @throws RaiseException a FiberError when {@code verifyCurrentFiber} rejects the current fiber
 */
@JRubyMethod(meta = true)
public static IRubyObject yield(ThreadContext context, IRubyObject recv, IRubyObject value) {
    final FiberData yielding = verifyCurrentFiber(context, context.runtime);
    final FiberData resumer = yielding.prev.data;
    return exchangeWithFiber(context, yielding, resumer, value);
}
@JRubyMethod(meta = true, rest = true)
public static IRubyObject yield(ThreadContext context, IRubyObject recv, IRubyObject[] value) {
switch (value.length) {
case 0: return yield(context, recv);
case 1: return yield(context, recv, value[0]);
}
Ruby runtime = context.runtime;
FiberData currentFiberData = verifyCurrentFiber(context, runtime);
FiberData prevFiberData = currentFiberData.prev.data;
return exchangeWithFiber(context, currentFiberData, prevFiberData, RubyArray.newArrayNoCopy(runtime, value));
}
/**
 * Checks that the fiber currently running on {@code context} is allowed to yield
 * and returns its data.
 *
 * @param context the calling thread's context
 * @param runtime the runtime used to build errors
 * @return the current fiber's data
 * @throws RaiseException a FiberError when the fiber has no parent, has no
 *                        previous fiber, or its queue is already shut down
 */
private static FiberData verifyCurrentFiber(ThreadContext context, Ruby runtime) {
    final FiberData current = context.getFiber().data;

    // A fiber without a parent thread is treated as the root fiber here.
    if (current.parent == null) {
        throw runtime.newFiberError("can't yield from root fiber");
    }
    if (current.prev == null) {
        throw runtime.newFiberError("BUG: yield occurred with null previous fiber. Report this at http://bugs.jruby.org");
    }
    if (current.queue.isShutdown()) {
        throw runtime.newFiberError("dead fiber yielded");
    }

    return current;
}
/**
 * Ruby-visible liveness check.
 *
 * @param context the calling thread's context
 * @return a Ruby boolean wrapping the result of {@code alive()}
 */
@JRubyMethod
public IRubyObject __alive__(ThreadContext context) {
    final boolean stillAlive = alive();
    return context.runtime.newBoolean(stillAlive);
}
/**
 * Returns the fiber currently associated with the calling context.
 *
 * @param context the calling thread's context
 * @param recv    the receiver of the class-level call (unused)
 * @return the value of {@code context.getFiber()}
 */
@JRubyMethod(meta = true)
public static IRubyObject __current__(ThreadContext context, IRubyObject recv) {
return context.getFiber();
}
/**
 * Returns the context variables of the thread backing this fiber.
 * <p>
 * NOTE(review): {@code thread} is set to {@code null} once the fiber body
 * finishes or the fiber is finalized, in which case this call throws a
 * {@link NullPointerException} - confirm callers never reach it in that state.
 *
 * @return the backing thread's context-variable map
 */
@Override
public Map<Object, IRubyObject> getContextVariables() {
    final RubyThread backing = this.thread;
    return backing.getContextVariables();
}
/**
 * Reports whether this fiber can still be given control.
 *
 * @return {@code true} only when the fiber has a backing thread, that thread is
 *         alive, and the fiber's queue has not been shut down
 */
final boolean alive() {
    // Read the volatile field once so the null check and isAlive() see the same thread.
    final RubyThread backing = this.thread;
    return backing != null && backing.isAlive() && !data.queue.isShutdown();
}
/**
 * Submits the fiber body to the runtime's fiber executor and returns the
 * {@link RubyThread} that ends up running it.
 * <p>
 * The submitted task binds the executing thread's context to the fiber, waits
 * for the first value on the fiber's queue, runs the block, and pushes the
 * block's result to the queue of the fiber recorded in {@code data.prev}.
 * Exceptions escaping the block are raised in that previous fiber's thread.
 * <p>
 * If submission fails with an {@link OutOfMemoryError} whose message contains
 * "unable to create new native thread", a GC is forced and submission is
 * attempted exactly once more; any other or repeated error is rethrown.
 *
 * @param runtime the runtime providing the executor and thread service
 * @param data    the data of the fiber being started
 * @param queue   the creating fiber's queue (not used by this method body)
 * @param block   the fiber body
 * @return the thread on which the fiber body runs
 */
static RubyThread createThread(final Ruby runtime, final FiberData data, final FiberQueue queue, final Block block) {
    final AtomicReference<RubyThread> fiberThread = new AtomicReference<>();

    // Set once the single GC-and-retry attempt has been used up.
    boolean retried = false;

    // Loop until the task is accepted. The catch block below bounds this to one retry;
    // the loop must not be conditioned on !retried or the retry itself never runs.
    while (true) {
        try {
            runtime.getFiberExecutor().execute(() -> {
                ThreadContext context = runtime.getCurrentContext();
                context.setFiber(data.fiber.get());
                context.useRecursionGuardsFrom(data.parent.getContext());
                // Publish the thread so the creator, spinning below, can return it.
                fiberThread.set(context.getThread());
                context.getThread().setFiberCurrentThread(data.parent);

                Thread thread = Thread.currentThread();
                String oldName = thread.getName();
                thread.setName("Fiber thread for block at: " + block.getBody().getFile() + ":" + block.getBody().getLine());

                try {
                    // First value delivered to the fiber; NEVER means "no arguments".
                    IRubyObject init = data.queue.pop(context);

                    try {
                        IRubyObject result;

                        if (init == NEVER) {
                            result = block.yieldSpecific(context);
                        } else {
                            result = block.yieldArray(context, init, null);
                        }

                        // Clear the thread before handing back the result so the fiber
                        // already reports as not alive when the previous fiber wakes up.
                        ThreadFiber tf = data.fiber.get();
                        if (tf != null) tf.thread = null;

                        data.prev.data.queue.push(context, new IRubyObject[]{result});
                    } finally {
                        // Whatever happened, this fiber is finished.
                        data.queue.shutdown();
                        runtime.getThreadService().unregisterCurrentThread(context);
                        ThreadFiber tf = data.fiber.get();
                        if (tf != null) tf.thread = null;
                    }
                } catch (JumpException.FlowControlException fce) {
                    if (data.prev != null) {
                        data.prev.thread.raise(fce.buildException(runtime).getException());
                    }
                } catch (IRBreakJump bj) {
                    // A break escaping the fiber body surfaces as a LocalJumpError in the previous fiber.
                    if (data.prev != null) {
                        data.prev.thread.raise(((RaiseException) IRException.BREAK_LocalJumpError.getException(runtime)).getException());
                    }
                } catch (IRReturnJump rj) {
                    // Likewise for a return escaping the fiber body.
                    if (data.prev != null) {
                        data.prev.thread.raise(((RaiseException) IRException.RETURN_LocalJumpError.getException(runtime)).getException());
                    }
                } catch (RaiseException re) {
                    if (data.prev != null) {
                        data.prev.thread.raise(re.getException());
                    }
                } catch (Throwable t) {
                    // Non-Ruby throwables are converted to a Ruby object before being raised.
                    if (data.prev != null) {
                        data.prev.thread.raise(JavaUtil.convertJavaToUsableRubyObject(runtime, t));
                    }
                } finally {
                    thread.setName(oldName);
                }
            });

            // Successfully submitted.
            break;
        } catch (OutOfMemoryError oome) {
            String oomeMessage = oome.getMessage();
            if (!retried && oomeMessage != null && oomeMessage.contains("unable to create new native thread")) {
                // Force a GC in the hope of freeing native threads, then try once more.
                System.gc();
                retried = true;
            } else {
                throw oome;
            }
        }
    }

    // Wait for the task to start and publish its RubyThread.
    while (fiberThread.get() == null) { Thread.yield(); }

    return fiberThread.get();
}
/**
 * Ruby-callable finalizer hook that tears this fiber down.
 *
 * @param context the calling thread's context
 * @return {@code nil} when teardown completed, {@code false} when it threw an exception
 */
@JRubyMethod(visibility = Visibility.PRIVATE)
public IRubyObject __finalize__(ThreadContext context) {
    try {
        doFinalize();
        return context.nil;
    } catch (Exception ignored) {
        // Teardown is best-effort; failure is reported through the return value only.
        return context.fals;
    }
}
/**
 * Releases this fiber's resources: detaches its data and thread, shuts down its
 * queue, and kills and interrupts the backing thread.
 * <p>
 * A fiber whose data has no parent thread is left alone after its {@code data}
 * reference is cleared: its queue is not shut down and its thread is not touched.
 */
private void doFinalize() {
    final FiberData data = this.data;
    this.data = null;
    if (data != null) {
        // Fibers without a parent are never shut down or interrupted.
        if (data.parent == null) return;
        data.queue.shutdown();
    }

    final RubyThread thread = this.thread;
    this.thread = null;
    if (thread != null) {
        thread.dieFromFinalizer();
        // Interrupt after marking the thread as dying, presumably to wake it if it is blocked.
        thread.interrupt();
    }
}
/**
 * JVM finalizer: runs the same teardown as {@code __finalize__} when the fiber
 * object is garbage collected, then always delegates to the superclass finalizer.
 *
 * @throws Throwable whatever {@code doFinalize()} or {@code super.finalize()} throws
 */
@Override
protected void finalize() throws Throwable {
try {
doFinalize();
} finally {
// Run the superclass finalizer even if our own teardown failed.
super.finalize();
}
}
/**
 * Returns this fiber's shared state.
 *
 * @return the current {@code data} value; {@code null} after {@code doFinalize()} has run
 */
public FiberData getData() {
return data;
}
/**
 * Returns the thread backing this fiber.
 *
 * @return the current {@code thread} value; {@code null} once the fiber body has
 *         finished or the fiber has been finalized
 */
public RubyThread getThread() {
return thread;
}
/**
 * State shared between a fiber object and the thread that runs its body.
 */
public static class FiberData {
    /** Queue on which this fiber receives values from other fibers. */
    final FiberQueue queue;

    /** Thread this fiber is bound to; compared against the caller's thread on resume/transfer. */
    final RubyThread parent;

    /** Weak reference back to the owning fiber object. */
    final WeakReference<ThreadFiber> fiber;

    /** Fiber that most recently resumed or transferred to this one; {@code null} when none. */
    volatile ThreadFiber prev;

    /** Set while this fiber has transferred control away during {@code __transfer__}. */
    volatile boolean transferred;

    /**
     * @param queue  the fiber's own queue
     * @param parent the thread the fiber is bound to
     * @param fiber  the owning fiber; only weakly referenced
     */
    FiberData(FiberQueue queue, RubyThread parent, ThreadFiber fiber) {
        this.queue = queue;
        this.parent = parent;
        this.fiber = new WeakReference<>(fiber);
    }

    /**
     * @return the fiber that most recently resumed or transferred to this one, or {@code null}
     */
    public ThreadFiber getPrev() {
        return prev;
    }
}
/** Shared fiber state; assigned by {@code initialize}/{@code initRootFiber} and cleared by {@code doFinalize()}. */
volatile FiberData data;
/** Thread backing this fiber; set to {@code null} when the fiber body finishes or the fiber is finalized. */
volatile RubyThread thread;
}