package com.oracle.svm.core.thread;
import static com.oracle.svm.core.SubstrateOptions.MultiThreaded;
import static com.oracle.svm.core.thread.ThreadingSupportImpl.Options.SupportRecurringCallback;
import java.util.concurrent.TimeUnit;
import org.graalvm.compiler.api.replacements.Fold;
import org.graalvm.compiler.options.Option;
import org.graalvm.nativeimage.CurrentIsolate;
import org.graalvm.nativeimage.ImageSingletons;
import org.graalvm.nativeimage.IsolateThread;
import org.graalvm.nativeimage.Threading.RecurringCallback;
import org.graalvm.nativeimage.Threading.RecurringCallbackAccess;
import org.graalvm.nativeimage.hosted.Feature;
import org.graalvm.nativeimage.impl.ThreadingSupport;
import com.oracle.svm.core.annotate.AutomaticFeature;
import com.oracle.svm.core.annotate.RestrictHeapAccess;
import com.oracle.svm.core.annotate.Uninterruptible;
import com.oracle.svm.core.option.HostedOptionKey;
import com.oracle.svm.core.option.SubstrateOptionsParser;
import com.oracle.svm.core.thread.Safepoint.SafepointException;
import com.oracle.svm.core.thread.VMThreads.ActionOnTransitionToJavaSupport;
import com.oracle.svm.core.thread.VMThreads.StatusSupport;
import com.oracle.svm.core.threadlocal.FastThreadLocalFactory;
import com.oracle.svm.core.threadlocal.FastThreadLocalInt;
import com.oracle.svm.core.threadlocal.FastThreadLocalObject;
import com.oracle.svm.core.util.UserError;
/**
 * Implementation of {@link ThreadingSupport} that runs a per-thread {@link RecurringCallback} from
 * the safepoint-check slow path ({@link #onSafepointCheckSlowpath()}).
 *
 * <p>
 * Each thread can have at most one active {@link RecurringCallbackTimer} (stored in a thread
 * local). Callback execution can be temporarily suppressed with
 * {@link #pauseRecurringCallback(String)}; pauses nest and are tracked by a per-thread depth
 * counter.
 */
public class ThreadingSupportImpl implements ThreadingSupport {

    /** Image-build-time options that control recurring callback support. */
    public static class Options {
        @Option(help = "Support a per-thread timer that is called at a specific interval.")
        public static final HostedOptionKey<Boolean> SupportRecurringCallback = new HostedOptionKey<>(true);

        @Option(help = "Test whether a thread's recurring callback is pending on each transition from native code to Java.")
        public static final HostedOptionKey<Boolean> CheckRecurringCallbackOnNativeToJavaTransition = new HostedOptionKey<>(false);
    }

    /** Registers a new {@link ThreadingSupportImpl} as the {@link ThreadingSupport} image singleton. */
    static void initialize() {
        ImageSingletons.add(ThreadingSupport.class, new ThreadingSupportImpl());
    }

    /**
     * Per-thread timer that decides when the registered callback is due and how the thread's
     * safepoint-requested counter should be set so that {@link #evaluate()} is reached again
     * around the next deadline.
     *
     * <p>
     * The timer keeps an exponentially weighted moving average (EWMA) of the observed number of
     * safepoint checks per nanosecond and multiplies it by the time remaining until the next
     * deadline to obtain the next counter value.
     */
    private static class RecurringCallbackTimer {
        /**
         * Handed to the callback so that it can throw an exception into the interrupted code; the
         * exception is wrapped in a {@link SafepointException} so that {@link #invokeCallback()}
         * can tell it apart from exceptions that merely escaped the callback.
         */
        private static final RecurringCallbackAccess CALLBACK_ACCESS = new RecurringCallbackAccess() {
            @Override
            public void throwException(Throwable t) {
                throw new SafepointException(t);
            }
        };

        /** Weight of the newest sample in the checks-per-nanosecond moving average. */
        private static final double EWMA_LAMBDA = 0.3;
        /** Fraction of the target interval after which the callback may already be executed. */
        private static final double TARGET_INTERVAL_FLEXIBILITY = 0.95;
        /** Counter value used until the first rate estimate is available. */
        private static final int INITIAL_CHECKS = 100;
        /** Lower bound for the target interval and for the remaining time used in estimates. */
        private static final long MINIMUM_INTERVAL_NANOS = 1_000;

        /** Requested interval between callback executions, clamped to the minimum interval. */
        private final long targetIntervalNanos;
        /** Slightly shortened target interval; the callback is considered due once it elapsed. */
        private final long flexibleTargetIntervalNanos;
        private final RecurringCallback callback;

        /** The value most recently written to the safepoint-requested counter by this timer. */
        private int requestedChecks;
        /** Moving average of executed checks per nanosecond; {@code 0} until the first sample. */
        private double ewmaChecksPerNano;
        /** {@link System#nanoTime()} of the last sample that was folded into the average. */
        private long lastCapture;
        /** {@link System#nanoTime()} at which the callback last finished (or the creation time). */
        private long lastCallbackExecution;
        /** Set while the callback runs so that it is not invoked recursively. */
        private volatile boolean isExecuting = false;

        /**
         * @param targetIntervalNanos requested interval in nanoseconds; values below
         *            {@link #MINIMUM_INTERVAL_NANOS} are raised to that minimum
         * @param callback the callback to invoke
         */
        RecurringCallbackTimer(long targetIntervalNanos, RecurringCallback callback) {
            this.targetIntervalNanos = Math.max(MINIMUM_INTERVAL_NANOS, targetIntervalNanos);
            // Derive the early-trigger threshold from the clamped interval so that both values
            // stay consistent for requests below the minimum interval.
            this.flexibleTargetIntervalNanos = (long) (this.targetIntervalNanos * TARGET_INTERVAL_FLEXIBILITY);
            this.callback = callback;

            long now = System.nanoTime();
            this.lastCapture = now;
            this.lastCallbackExecution = now;
            this.requestedChecks = INITIAL_CHECKS;
        }

        /**
         * Updates the rate estimate, runs the callback if it is due and not disabled, and then
         * always recomputes the safepoint-requested counter (even if the callback threw).
         */
        @Uninterruptible(reason = "Must not contain safepoint checks.")
        public void evaluate() {
            updateStatistics();
            try {
                executeCallback();
            } finally {
                updateSafepointRequested();
            }
        }

        /**
         * Folds the checks executed since the last capture into the moving average. Nothing is
         * recorded (and the capture time is left unchanged) if no time elapsed or no checks were
         * executed.
         */
        @Uninterruptible(reason = "Must be uninterruptible to avoid races with the safepoint code.")
        public void updateStatistics() {
            long now = System.nanoTime();
            long elapsedNanos = now - lastCapture;

            // Executed checks = what was requested minus what is still left in the counter.
            int skippedChecks = getSkippedChecks(CurrentIsolate.getCurrentThread());
            int executedChecks = requestedChecks - skippedChecks;
            assert executedChecks >= 0;
            if (elapsedNanos > 0 && executedChecks > 0) {
                double checksPerNano = executedChecks / (double) elapsedNanos;
                if (ewmaChecksPerNano == 0) {
                    // First sample: use it directly instead of averaging with zero.
                    ewmaChecksPerNano = checksPerNano;
                } else {
                    ewmaChecksPerNano = EWMA_LAMBDA * checksPerNano + (1 - EWMA_LAMBDA) * ewmaChecksPerNano;
                }
                lastCapture = now;
            }
        }

        /**
         * Returns the absolute value of the thread's current safepoint-requested counter.
         * NOTE(review): presumably a negative raw value encodes a pending request whose magnitude
         * is still the number of remaining checks - confirm against {@code Safepoint}.
         */
        @Uninterruptible(reason = "Must be uninterruptible to avoid races with the safepoint code.")
        private static int getSkippedChecks(IsolateThread thread) {
            int rawValue = Safepoint.getSafepointRequested(thread);
            return rawValue >= 0 ? rawValue : -rawValue;
        }

        /**
         * Invokes the callback if it is not disabled and at least the flexible target interval has
         * elapsed since the last execution.
         */
        @Uninterruptible(reason = "Called by uninterruptible code.")
        private void executeCallback() {
            if (isCallbackDisabled()) {
                return;
            }

            isExecuting = true;
            try {
                // Compare elapsed time rather than absolute timestamps: System.nanoTime() values
                // may only be compared via their difference, and "last + interval" could overflow
                // for very large intervals.
                if (System.nanoTime() - lastCallbackExecution >= flexibleTargetIntervalNanos) {
                    // Reset the counter before running the callback; the statistics update in the
                    // finally block below measures against this value.
                    setSafepointRequested(Safepoint.THREAD_REQUEST_RESET);
                    try {
                        invokeCallback();
                    } finally {
                        // Runs even if the callback threw via RecurringCallbackAccess.
                        lastCallbackExecution = System.nanoTime();
                        updateStatistics();
                    }
                }
            } finally {
                isExecuting = false;
            }
        }

        /**
         * Sets the safepoint-requested counter for the time remaining until the next deadline.
         */
        @Uninterruptible(reason = "Called by uninterruptible code.")
        private void updateSafepointRequested() {
            long nextDeadline = lastCallbackExecution + targetIntervalNanos;
            long remainingNanos = nextDeadline - System.nanoTime();
            if (remainingNanos < 0 && isCallbackDisabled()) {
                // Already past the deadline but the callback cannot run right now: fall back to
                // the reset value instead of requesting another check almost immediately.
                setSafepointRequested(Safepoint.THREAD_REQUEST_RESET);
            } else {
                remainingNanos = (remainingNanos < MINIMUM_INTERVAL_NANOS) ? MINIMUM_INTERVAL_NANOS : remainingNanos;
                double checks = ewmaChecksPerNano * remainingNanos;
                // Clamp the estimate to the range [1, THREAD_REQUEST_RESET].
                setSafepointRequested(checks > Safepoint.THREAD_REQUEST_RESET ? Safepoint.THREAD_REQUEST_RESET : ((checks < 1) ? 1 : (int) checks));
            }
        }

        /** Records {@code value} as the requested number of checks and writes it to the counter. */
        @Uninterruptible(reason = "Called by uninterruptible code.")
        public void setSafepointRequested(int value) {
            requestedChecks = value;
            Safepoint.setSafepointRequested(value);
        }

        /** Returns true while the callback is running or callbacks are paused for this thread. */
        @Uninterruptible(reason = "Called by uninterruptible code.")
        private boolean isCallbackDisabled() {
            return isExecuting || isRecurringCallbackPaused();
        }

        /**
         * Runs the user callback. Only a {@link SafepointException} (raised through
         * {@link RecurringCallbackAccess#throwException}) is propagated to the caller.
         */
        @Uninterruptible(reason = "Required by caller, but does not apply to callee.", calleeMustBe = false)
        @RestrictHeapAccess(reason = "Callee may allocate", access = RestrictHeapAccess.Access.UNRESTRICTED, overridesCallers = true)
        private void invokeCallback() {
            try {
                callback.run(CALLBACK_ACCESS);
            } catch (SafepointException se) {
                throw se;
            } catch (Throwable t) {
                // Intentionally ignored: any other throwable escaping the callback is dropped so
                // that it cannot surface at an arbitrary safepoint check of the interrupted code.
            }
        }
    }

    /** The current thread's timer, or {@code null} if no recurring callback is registered. */
    private static final FastThreadLocalObject<RecurringCallbackTimer> activeTimer = FastThreadLocalFactory.createObject(RecurringCallbackTimer.class);
    /** Nesting depth of {@link #pauseRecurringCallback(String)} calls on the current thread. */
    private static final FastThreadLocalInt currentPauseDepth = FastThreadLocalFactory.createInt();

    /** Command line argument that enables recurring callback support; used in error messages. */
    private static final String enableSupportOption = SubstrateOptionsParser.commandArgument(SupportRecurringCallback, "+");

    /**
     * Registers {@code callback} for the current thread, replacing any previous one, or removes
     * the current thread's callback if {@code callback} is {@code null}.
     *
     * @param interval the requested interval between callback executions
     * @param unit the unit of {@code interval}
     * @param callback the callback to register, or {@code null} to unregister
     * @throws IllegalArgumentException if the interval is shorter than one nanosecond
     */
    @Override
    public void registerRecurringCallback(long interval, TimeUnit unit, RecurringCallback callback) {
        if (callback != null) {
            UserError.guarantee(SupportRecurringCallback.getValue(), "Recurring callbacks must be enabled during image build with option %s", enableSupportOption);
            UserError.guarantee(MultiThreaded.getValue(), "Recurring callbacks are only supported in multi-threaded mode.");

            long intervalNanos = unit.toNanos(interval);
            if (intervalNanos < 1) {
                throw new IllegalArgumentException("intervalNanos");
            }

            RecurringCallbackTimer timer = new RecurringCallbackTimer(intervalNanos, callback);
            activeTimer.set(timer);
            // Arm the counter with the timer's initial number of checks.
            Safepoint.setSafepointRequested(timer.requestedChecks);
        } else {
            activeTimer.set(null);
        }
    }

    /**
     * Evaluates the current thread's timer if one is registered; otherwise resets the
     * safepoint-requested counter.
     */
    @Uninterruptible(reason = "Must not contain safepoint checks.")
    static void onSafepointCheckSlowpath() {
        assert StatusSupport.isStatusJava() : "must only be executed when the thread is in Java state";
        RecurringCallbackTimer timer = isRecurringCallbackSupported() ? activeTimer.get() : null;
        if (timer != null) {
            timer.evaluate();
        } else {
            Safepoint.setSafepointRequested(Safepoint.THREAD_REQUEST_RESET);
        }
    }

    /** Returns true if recurring callbacks are supported and {@code thread} has a timer. */
    @Uninterruptible(reason = "Called by uninterruptible code.", mayBeInlined = true)
    static boolean isRecurringCallbackRegistered(IsolateThread thread) {
        return isRecurringCallbackSupported() && activeTimer.get(thread) != null;
    }

    /**
     * Returns true if a transition action is pending, or if checking on native-to-Java
     * transitions is enabled and the current thread has a registered recurring callback.
     */
    static boolean needsNativeToJavaSlowpath() {
        return ActionOnTransitionToJavaSupport.isActionPending() || (isRecurringCallbackSupported() && Options.CheckRecurringCallbackOnNativeToJavaTransition.getValue() && activeTimer.get() != null);
    }

    /**
     * Suppresses callback execution on the current thread until a matching resume call. Calls
     * nest: each pause increments the per-thread pause depth.
     *
     * @param reason documentation only; not used at run time
     */
    @Uninterruptible(reason = "Must not contain safepoint checks.")
    public static void pauseRecurringCallback(@SuppressWarnings("unused") String reason) {
        if (!isRecurringCallbackSupported()) {
            return;
        }
        assert currentPauseDepth.get() >= 0;
        currentPauseDepth.set(currentPauseDepth.get() + 1);
    }

    /**
     * Undoes one pause. If this was the outermost pause and a callback is registered, the
     * statistics are updated and the counter is set to 1 (presumably so that the slow path is
     * entered at the next safepoint check) instead of evaluating the timer right away.
     */
    @Uninterruptible(reason = "Must not contain safepoint checks.")
    public static void resumeRecurringCallbackAtNextSafepoint() {
        if (resumeCallbackExecution()) {
            RecurringCallbackTimer timer = activeTimer.get();
            assert timer != null;
            timer.updateStatistics();
            timer.setSafepointRequested(1);
        }
    }

    /**
     * Undoes one pause. If this was the outermost pause and a callback is registered, the timer is
     * evaluated immediately; an exception thrown by the callback through
     * {@link RecurringCallbackAccess} is unwrapped and rethrown to the caller.
     */
    public static void resumeRecurringCallback() {
        if (resumeCallbackExecution()) {
            try {
                onSafepointCheckSlowpath();
            } catch (SafepointException e) {
                throwUnchecked(e.inner);
            }
        }
    }

    /**
     * Decrements the pause depth.
     *
     * @return true if callbacks are no longer paused and the current thread has a registered
     *         callback
     */
    @Uninterruptible(reason = "Called from uninterruptible code.", mayBeInlined = true)
    private static boolean resumeCallbackExecution() {
        if (!isRecurringCallbackSupported()) {
            return false;
        }
        assert currentPauseDepth.get() > 0;
        currentPauseDepth.set(currentPauseDepth.get() - 1);
        return !isRecurringCallbackPaused() && isRecurringCallbackRegistered(CurrentIsolate.getCurrentThread());
    }

    /** Returns true if the current thread's pause depth is non-zero. */
    @Uninterruptible(reason = "Called from uninterruptible code.", mayBeInlined = true)
    public static boolean isRecurringCallbackPaused() {
        if (!isRecurringCallbackSupported()) {
            return false;
        }
        return currentPauseDepth.get() != 0;
    }

    /** Returns true if the image was built with recurring callback support and multi-threading. */
    @Fold
    public static boolean isRecurringCallbackSupported() {
        return SupportRecurringCallback.getValue() && MultiThreaded.getValue();
    }

    /**
     * Rethrows {@code exception} without wrapping it, even if it is a checked exception; the
     * unchecked generic cast is what lets the compiler accept this.
     */
    @Uninterruptible(reason = "Called by uninterruptible code.")
    @SuppressWarnings("unchecked")
    private static <T extends Throwable> void throwUnchecked(Throwable exception) throws T {
        throw (T) exception;
    }
}
/**
 * Automatically registered feature that installs the threading support implementation while the
 * image is being built.
 */
@AutomaticFeature
class ThreadingSupportFeature implements Feature {

    /**
     * Delegates to {@link ThreadingSupportImpl#initialize()} once features have been registered.
     *
     * @param access registration access provided by the image builder; not used here
     */
    @Override
    public void afterRegistration(AfterRegistrationAccess access) {
        ThreadingSupportImpl.initialize();
    }
}