package reactor.core.scheduler;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import io.micrometer.core.instrument.search.Search;
import reactor.core.Disposable;
import reactor.core.Scannable;
import reactor.core.Scannable.Attr;
import reactor.util.Metrics;
/**
 * Decorates the {@link ScheduledExecutorService} backing a {@link Scheduler} with
 * Micrometer's {@link ExecutorServiceMetrics} instrumentation.
 * <p>
 * Every decorated executor is registered under a unique name derived from the owning
 * scheduler, and its meters are removed from the registry again when the executor is
 * shut down. All mutable bookkeeping is guarded by this instance's monitor: both
 * {@link #apply(Scheduler, ScheduledExecutorService)} and {@link #dispose()} are
 * {@code synchronized}.
 */
final class SchedulerMetricDecorator
        implements BiFunction<Scheduler, ScheduledExecutorService, ScheduledExecutorService>,
                   Disposable {

    /** Key of the tag carrying the scheduler id on every meter created by this decorator. */
    static final String TAG_SCHEDULER_ID = "reactor.scheduler.id";

    /**
     * Not referenced inside this class. NOTE(review): presumably the key under which this
     * decorator is registered by its caller - confirm against the usage site.
     */
    static final String METRICS_DECORATOR_KEY = "reactor.metrics.decorator";

    /** Scheduler to assigned id; weak keys so a tracked scheduler is not kept reachable by this map. */
    final WeakHashMap<Scheduler, String> seenSchedulers = new WeakHashMap<>();

    /** Scheduler name to count of distinct schedulers already seen with that name. */
    final Map<String, AtomicInteger> schedulerDifferentiator = new HashMap<>();

    /** Scheduler to count of executors already decorated for it; weak keys as above. */
    final WeakHashMap<Scheduler, AtomicInteger> executorDifferentiator = new WeakHashMap<>();

    /** Registry that receives (and later loses) the executor meters. */
    final MeterRegistry registry;

    /** Creates a decorator bound to the registry exposed by {@code Metrics.MicrometerConfiguration}. */
    SchedulerMetricDecorator() {
        registry = Metrics.MicrometerConfiguration.getRegistry();
    }

    /**
     * Wraps {@code service} so that it is monitored through {@link ExecutorServiceMetrics}.
     * <p>
     * The scheduler id is the scheduler's {@link Attr#NAME} (falling back to its class name);
     * the second and later schedulers sharing a name get a {@code "#<n>"} suffix. The executor
     * id is {@code "<schedulerId>-<n>"}, where {@code n} counts, from zero, the executors
     * already decorated for that scheduler.
     *
     * @param scheduler the scheduler that owns the executor
     * @param service the executor to instrument
     * @return a delegating executor that removes its meters on {@code shutdown()} and
     * {@code shutdownNow()}
     */
    @Override
    public synchronized ScheduledExecutorService apply(Scheduler scheduler, ScheduledExecutorService service) {
        String schedulerName = Scannable
                .from(scheduler)
                .scanOrDefault(Attr.NAME, scheduler.getClass().getName());

        // The id is computed once per scheduler instance and then reused for all its executors.
        String schedulerId = seenSchedulers.computeIfAbsent(scheduler, s -> {
            int sameNameIndex = this.schedulerDifferentiator
                    .computeIfAbsent(schedulerName, k -> new AtomicInteger(0))
                    .getAndIncrement();
            // The first scheduler with a given name keeps the bare name.
            return (sameNameIndex == 0) ? schedulerName
                                        : schedulerName + "#" + sameNameIndex;
        });

        String executorId = schedulerId + "-" +
                executorDifferentiator.computeIfAbsent(scheduler, key -> new AtomicInteger(0))
                                      .getAndIncrement();

        Tags tags = Tags.of(TAG_SCHEDULER_ID, schedulerId);

        /* Local class so that it can capture executorId, tags and service directly. */
        class MetricsRemovingScheduledExecutorService extends DelegatingScheduledExecutorService {

            MetricsRemovingScheduledExecutorService() {
                super(ExecutorServiceMetrics.monitor(registry, service, executorId, tags));
            }

            @Override
            public List<Runnable> shutdownNow() {
                removeMetrics();
                return super.shutdownNow();
            }

            @Override
            public void shutdown() {
                removeMetrics();
                super.shutdown();
            }

            /**
             * Removes every meter whose {@code "name"} tag equals this executor's id.
             * NOTE(review): relies on ExecutorServiceMetrics tagging its meters with
             * {@code name=<executorServiceName>} - confirm for the Micrometer version in use.
             */
            void removeMetrics() {
                Search.in(registry)
                      .tag("name", executorId)
                      .meters()
                      .forEach(registry::remove);
            }
        }
        return new MetricsRemovingScheduledExecutorService();
    }

    /**
     * Removes every meter carrying the {@link #TAG_SCHEDULER_ID} tag key from the registry and
     * forgets all id bookkeeping, so numbering restarts if the decorator is applied again.
     * <p>
     * Synchronized on the same monitor as {@link #apply(Scheduler, ScheduledExecutorService)}:
     * the three maps are plain, non-thread-safe collections, so clearing them while
     * {@code apply} is mutating them would be a data race.
     */
    @Override
    public synchronized void dispose() {
        Search.in(registry)
              .tagKeys(TAG_SCHEDULER_ID)
              .meters()
              .forEach(registry::remove);

        this.seenSchedulers.clear();
        this.schedulerDifferentiator.clear();
        this.executorDifferentiator.clear();
    }
}