package com.oracle.truffle.api.instrumentation;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import com.oracle.truffle.api.source.Source;
final class SourceInstrumentationHandler {
private final InstrumentationHandler.CopyOnWriteList<EventBinding.Source<?>> bindings = new InstrumentationHandler.CopyOnWriteList<>(new EventBinding.Source<?>[0]);
private final WeakHashMap<Source, Void> sources = new WeakHashMap<>();
private final InstrumentationHandler.WeakAsyncList<Source> sourcesList = new InstrumentationHandler.WeakAsyncList<>(16);
private final AtomicBoolean sourcesInitialized = new AtomicBoolean();
private final ReentrantReadWriteLock bindingsLock = new ReentrantReadWriteLock();
private final BiConsumer<EventBinding.Source<?>[], Source> notificationConsumer;
private SourcesNotificationQueue notifications = new SourcesNotificationQueue();
SourceInstrumentationHandler(BiConsumer<EventBinding.Source<?>[], Source> notificationConsumer) {
this.notificationConsumer = notificationConsumer;
}
private SourcesNotificationQueue addInitializeSourcesNotification() {
assert bindingsLock.getWriteHoldCount() > 0;
assert bindings.size() == 1;
notifications.enqueue(new InitializeSourcesNotification());
assert notifications.shouldProcess() : "Thread that added InitializeSourcesNotification is not the one to process the notification queue.";
assert notifications.isSourcesInitializationRequired();
return notifications;
}
private SourcesNotificationQueue addAllSourcesNotification(EventBinding.Source<?> binding) {
assert bindingsLock.getWriteHoldCount() > 0;
notifications.enqueue(new AllSourcesNotification(new EventBinding.Source<?>[]{binding}));
if (notifications.shouldProcess()) {
assert (!notifications.isSourcesInitializationRequired() && bindings.size() > 1) || (notifications.isSourcesInitializationRequired() && bindings.size() == 1);
return notifications;
} else {
return null;
}
}
private SourcesNotificationQueue addNotification(Map<Source, Void> collectedSources, EventBinding.Source<?>[] bindingsToNotify) {
assert bindingsLock.getReadHoldCount() > 0;
assert Thread.holdsLock(sources);
assert !bindings.isEmpty();
notifications.enqueue(new NewSourcesNotification(bindingsToNotify, collectedSources.keySet()));
if (notifications.shouldProcess()) {
assert !notifications.isSourcesInitializationRequired();
return notifications;
} else {
return null;
}
}
void setInitialized() {
sourcesInitialized.set(true);
}
boolean hasBindings() {
return !bindings.isEmpty();
}
EventBinding.Source<?>[] getBindingsArray() {
return bindings.getArray();
}
void clearForDisposedBinding(EventBinding.Source<?> disposedBinding) {
Lock lock = bindingsLock.writeLock();
lock.lock();
try {
bindings.remove(disposedBinding);
if (bindings.isEmpty()) {
clearAllInternal();
}
} finally {
lock.unlock();
}
}
void clearForDisposedInstrumenter(InstrumentationHandler.AbstractInstrumenter disposedInstrumenter) {
Lock lock = bindingsLock.writeLock();
lock.lock();
try {
Collection<EventBinding<?>> disposedSourceLoadedBindings = InstrumentationHandler.filterBindingsForInstrumenter(bindings, disposedInstrumenter);
InstrumentationHandler.disposeBindingsBulk(disposedSourceLoadedBindings);
bindings.removeAll(disposedSourceLoadedBindings);
if (bindings.isEmpty()) {
clearAllInternal();
}
} finally {
lock.unlock();
}
}
void clearAll() {
Lock lock = bindingsLock.writeLock();
lock.lock();
try {
clearAllInternal();
} finally {
lock.unlock();
}
}
private void clearAllInternal() {
assert bindingsLock.getWriteHoldCount() > 0;
bindings.clear();
sources.clear();
sourcesList.clear();
sourcesInitialized.set(false);
notifications.clear();
notifications.invalidate();
notifications = new SourcesNotificationQueue();
}
SourcesNotificationQueue addBinding(EventBinding.Source<?> binding, boolean notify) {
SourcesNotificationQueue notificationsToProcess = null;
Lock lock = bindingsLock.writeLock();
lock.lock();
try {
boolean initializeSources = false;
if (bindings.isEmpty()) {
initializeSources = true;
}
bindings.add(binding);
if (notify) {
notificationsToProcess = addAllSourcesNotification(binding);
} else if (initializeSources) {
notificationsToProcess = addInitializeSourcesNotification();
}
} finally {
lock.unlock();
}
return notificationsToProcess;
}
SourcesNotificationQueue addNewSources(Map<Source, Void> newSources, boolean notify) {
SourcesNotificationQueue notificationsToProcess = null;
Lock lock = bindingsLock.readLock();
lock.lock();
try {
if (!bindings.isEmpty()) {
synchronized (sources) {
if (notify) {
notificationsToProcess = addNotification(newSources, bindings.getArray());
} else {
for (Source src : newSources.keySet()) {
if (!sources.containsKey(src)) {
sources.put(src, null);
sourcesList.add(src);
}
}
}
}
}
} finally {
lock.unlock();
}
return notificationsToProcess;
}
final class SourcesNotificationQueue {
private boolean sourcesInitializationRequired;
private boolean valid = true;
private final Deque<SourcesNotification> notificationQueue = new ArrayDeque<>();
SourcesNotificationQueue() {
this.sourcesInitializationRequired = true;
}
private boolean shouldProcess() {
assert bindingsLock.getWriteHoldCount() > 0 || (bindingsLock.getReadHoldCount() > 0 && Thread.holdsLock(sources));
return notificationQueue.size() == 1;
}
private void enqueue(SourcesNotification notification) {
assert bindingsLock.getWriteHoldCount() > 0 || (bindingsLock.getReadHoldCount() > 0 && Thread.holdsLock(sources));
notificationQueue.add(notification);
}
private SourcesNotification resolveFirst() {
SourcesNotification notification = null;
Lock lock = bindingsLock.readLock();
lock.lock();
try {
if (valid) {
synchronized (sources) {
notification = notificationQueue.peekFirst();
sourcesInitializationRequired = false;
if (notification != null) {
notification.resolveSources();
}
}
}
} finally {
lock.unlock();
}
return notification;
}
private boolean removeFirst() {
boolean queueNotEmpty = false;
Lock lock = bindingsLock.readLock();
lock.lock();
try {
if (valid) {
synchronized (sources) {
notificationQueue.removeFirst();
if (!notificationQueue.isEmpty()) {
queueNotEmpty = true;
}
}
}
} finally {
lock.unlock();
}
return queueNotEmpty;
}
private void clear() {
assert bindingsLock.getWriteHoldCount() > 0;
notificationQueue.clear();
}
private void invalidate() {
assert bindingsLock.getWriteHoldCount() > 0;
valid = false;
}
void process() {
do {
SourcesNotification notification = resolveFirst();
if (notification != null) {
try {
notification.runNotifications();
} catch (Throwable t) {
Lock writeLock = bindingsLock.writeLock();
writeLock.lock();
try {
clear();
} finally {
writeLock.unlock();
}
throw t;
}
}
} while (removeFirst());
}
boolean isSourcesInitializationRequired() {
return sourcesInitializationRequired;
}
}
private abstract static class SourcesNotification {
protected final AtomicBoolean sourcesResolved = new AtomicBoolean();
protected final AtomicBoolean notificationsRun = new AtomicBoolean();
protected abstract void resolveSources();
protected abstract void runNotifications();
}
private class InitializeSourcesNotification extends SourcesNotification {
@Override
protected void runNotifications() {
boolean firstCall = notificationsRun.compareAndSet(false, true);
assert firstCall : "runNotifications called more than once.";
}
@Override
protected void resolveSources() {
assert bindingsLock.getReadHoldCount() > 0;
assert Thread.holdsLock(sources);
assert sourcesInitialized.get();
boolean firstCall = sourcesResolved.compareAndSet(false, true);
assert firstCall : "resolveSources called more than once.";
}
}
private class AllSourcesNotification extends InitializeSourcesNotification {
protected final EventBinding.Source<?>[] bindingsToNotify;
protected Collection<Source> sourcesForNotification;
AllSourcesNotification(EventBinding.Source<?>[] bindingsToNotify) {
this.bindingsToNotify = bindingsToNotify;
}
@Override
protected void resolveSources() {
assert bindingsLock.getReadHoldCount() > 0;
assert Thread.holdsLock(sources);
assert sourcesInitialized.get();
boolean firstCall = sourcesResolved.compareAndSet(false, true);
assert firstCall : "resolveSources called more than once.";
sourcesForNotification = new ArrayList<>(sourcesList.getNextInsertionIndex());
for (Source source : sourcesList) {
sourcesForNotification.add(source);
}
}
@Override
protected final void runNotifications() {
assert bindingsLock.getReadHoldCount() + bindingsLock.getWriteHoldCount() == 0;
boolean firstCall = notificationsRun.compareAndSet(false, true);
assert firstCall : "runNotifications called more than once.";
if (sourcesForNotification != null) {
for (Source src : sourcesForNotification) {
notificationConsumer.accept(bindingsToNotify, src);
}
}
}
}
private class NewSourcesNotification extends AllSourcesNotification {
protected final Collection<Source> collectedSources;
NewSourcesNotification(EventBinding.Source<?>[] bindingsToNotify, Collection<Source> collectedSources) {
super(bindingsToNotify);
this.collectedSources = collectedSources;
}
@Override
protected void resolveSources() {
assert bindingsLock.getReadHoldCount() > 0;
assert Thread.holdsLock(sources);
assert sourcesInitialized.get();
boolean firstCall = sourcesResolved.compareAndSet(false, true);
assert firstCall : "resolveSources called more than once.";
sourcesForNotification = new ArrayList<>();
for (Source src : collectedSources) {
if (!sources.containsKey(src)) {
sources.put(src, null);
sourcesList.add(src);
sourcesForNotification.add(src);
}
}
}
}
}