package reactor.core.publisher;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.Metrics.MicrometerConfiguration;
import reactor.util.function.Tuple2;
final class FluxMetrics<T> extends InternalFluxOperator<T, T> {
/** Base name for the meters, computed once from the source by {@link #resolveName(Publisher)}. */
final String name;
/** Tags applied to the meters: {@link #DEFAULT_TAGS_FLUX} extended by {@link #resolveTags(Publisher, Tags)}. */
final Tags tags;
/** Registry obtained from {@code MicrometerConfiguration.getRegistry()} when this operator is constructed. */
final MeterRegistry registryCandidate;
/**
 * Creates the metrics operator on top of {@code flux}.
 * <p>
 * The meter name, the tags and the target registry are all resolved here, once,
 * and later handed to every {@link MetricsSubscriber} created by this operator.
 *
 * @param flux the source sequence to instrument
 */
FluxMetrics(Flux<? extends T> flux) {
    super(flux);
    name = resolveName(flux);
    tags = resolveTags(flux, DEFAULT_TAGS_FLUX);
    registryCandidate = MicrometerConfiguration.getRegistry();
}
/**
 * Wraps the downstream subscriber in a {@link MetricsSubscriber} that uses the name, tags
 * and registry resolved by the constructor, timed against {@link Clock#SYSTEM}.
 *
 * @param actual the downstream subscriber
 * @return the instrumenting subscriber to subscribe to the source with
 */
@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
    MetricsSubscriber<T> instrumented =
            new MetricsSubscriber<>(actual, registryCandidate, Clock.SYSTEM, name, tags);
    return instrumented;
}
/**
 * Reports {@code RunStyle.SYNC} for {@code Attr.RUN_STYLE}; every other attribute is
 * answered by the parent implementation.
 *
 * @param key the attribute being scanned
 * @return the attribute value, as described above
 */
@Override
public Object scanUnsafe(Attr key) {
    if (key != Attr.RUN_STYLE) {
        return super.scanUnsafe(key);
    }
    return Attr.RunStyle.SYNC;
}
/**
 * Subscriber/subscription pair inserted between the source and the downstream
 * {@code actual} subscriber. Every signal is forwarded unchanged; on the way through it
 * records:
 * <ul>
 *   <li>a subscription counter on {@code onSubscribe};</li>
 *   <li>the delay between consecutive {@code onNext} signals (or between
 *       {@code onSubscribe} and the first {@code onNext});</li>
 *   <li>the requested amounts, only when the sequence does not use
 *       {@code REACTOR_DEFAULT_NAME};</li>
 *   <li>the subscribe-to-termination duration, tagged with the kind of termination
 *       (completed, completed empty, error, cancelled);</li>
 *   <li>a "malformed" counter when {@code onNext}/{@code onError} arrive after termination.</li>
 * </ul>
 * The class and some of its methods ({@code onNext}, {@code onSubscribe},
 * {@code scanUnsafe}) are not final, so subclasses may override them.
 *
 * @param <T> the type of the relayed elements
 */
static class MetricsSubscriber<T> implements InnerOperator<T, T> {

    final CoreSubscriber<? super T> actual;
    final Clock clock;
    final String sequenceName;
    final Tags commonTags;
    final MeterRegistry registry;
    /** Distribution of requested amounts; {@code null} when the sequence uses the default name. */
    final DistributionSummary requestedCounter;
    /** Timer of the delays between onNext signals, looked up in the registry by name and tags. */
    final Timer onNextIntervalTimer;

    /** Started in {@code onSubscribe}, stopped by whichever terminal/cancel signal comes first. */
    Timer.Sample subscribeToTerminateSample;
    /** Monotonic time of the last {@code onSubscribe}/{@code onNext}; {@code -1} before subscription. */
    long lastNextEventNanos = -1L;
    /** Set once a terminal signal ({@code onComplete}/{@code onError}) has been processed. */
    boolean done;
    /** Set once THIS subscriber has relayed at least one element through {@link #onNext(Object)}. */
    boolean valued;
    Subscription s;

    /**
     * Registers (or looks up) the meters that live for the whole subscription.
     *
     * @param actual       the downstream subscriber
     * @param registry     the registry in which meters are registered
     * @param clock        the clock used for delays and durations
     * @param sequenceName the base name of the meters
     * @param commonTags   the tags applied to every meter
     */
    MetricsSubscriber(CoreSubscriber<? super T> actual,
            MeterRegistry registry,
            Clock clock,
            String sequenceName,
            Tags commonTags) {
        this.actual = actual;
        this.clock = clock;
        this.sequenceName = sequenceName;
        this.commonTags = commonTags;
        this.registry = registry;

        this.onNextIntervalTimer = Timer.builder(sequenceName + METER_ON_NEXT_DELAY)
                .tags(commonTags)
                .description(
                        "Measures delays between onNext signals (or between onSubscribe and first onNext)")
                .register(registry);

        // The requested-amount meter is only created for sequences that were given a name.
        if (!REACTOR_DEFAULT_NAME.equals(sequenceName)) {
            this.requestedCounter = DistributionSummary.builder(sequenceName + METER_REQUESTED)
                    .tags(commonTags)
                    .description(
                            "Counts the amount requested to a named Flux by all subscribers, until at least one requests an unbounded amount")
                    .register(registry);
        }
        else {
            this.requestedCounter = null;
        }
    }

    @Override
    public final CoreSubscriber<? super T> actual() {
        return actual;
    }

    /** Records the flow duration with the "cancelled" status, then cancels upstream. */
    @Override
    public final void cancel() {
        recordCancel(sequenceName, commonTags, registry, subscribeToTerminateSample);
        s.cancel();
    }

    /**
     * Records the flow duration as "completed" or "completedEmpty" and completes downstream.
     * A repeated {@code onComplete} is ignored.
     */
    @Override
    public final void onComplete() {
        if (done) {
            return;
        }
        done = true;

        // The timer comes from the registry by name + tags, so it is not private to this
        // subscriber, and a no-op timer (e.g. a filtered-out meter) always reports 0.
        // The per-subscriber flag is therefore authoritative when set. The count is kept as
        // a fallback so that a subclass overriding onNext without setting the flag behaves
        // exactly as before.
        // NOTE(review): when the flag is unset but another subscriber already recorded into
        // the same timer, an empty sequence is still reported as "completed"; fixing that
        // requires every onNext override to maintain `valued`.
        boolean empty = !valued && this.onNextIntervalTimer.count() == 0;
        if (empty) {
            recordOnCompleteEmpty(sequenceName, commonTags, registry, subscribeToTerminateSample);
        }
        else {
            recordOnComplete(sequenceName, commonTags, registry, subscribeToTerminateSample);
        }

        actual.onComplete();
    }

    /**
     * Records the flow duration with the "error" status and forwards the error. An error
     * received after termination is counted as malformed and dropped.
     */
    @Override
    public final void onError(Throwable e) {
        if (done) {
            recordMalformed(sequenceName, commonTags, registry);
            Operators.onErrorDropped(e, actual.currentContext());
            return;
        }
        done = true;
        recordOnError(sequenceName, commonTags, registry, subscribeToTerminateSample, e);
        actual.onError(e);
    }

    /**
     * Records the delay since the previous {@code onNext}/{@code onSubscribe} and forwards
     * the element. An element received after termination is counted as malformed and dropped.
     */
    @Override
    public void onNext(T t) {
        if (done) {
            recordMalformed(sequenceName, commonTags, registry);
            Operators.onNextDropped(t, actual.currentContext());
            return;
        }

        // Remember that this very subscriber emitted, independently of the timer's state.
        this.valued = true;

        long last = this.lastNextEventNanos;
        this.lastNextEventNanos = clock.monotonicTime();
        this.onNextIntervalTimer.record(lastNextEventNanos - last, TimeUnit.NANOSECONDS);

        actual.onNext(t);
    }

    /**
     * On a valid subscription: counts the subscription, starts the flow-duration sample,
     * initialises the onNext delay baseline and passes {@code this} downstream.
     */
    @Override
    public void onSubscribe(Subscription s) {
        if (Operators.validate(this.s, s)) {
            recordOnSubscribe(sequenceName, commonTags, registry);
            this.subscribeToTerminateSample = Timer.start(clock);
            this.lastNextEventNanos = clock.monotonicTime();
            this.s = s;
            actual.onSubscribe(this);
        }
    }

    /**
     * Records the requested amount (when the requested meter exists) and propagates the
     * request upstream. Amounts rejected by {@code Operators.validate(long)} are neither
     * recorded nor propagated.
     */
    @Override
    public final void request(long l) {
        if (Operators.validate(l)) {
            if (requestedCounter != null) {
                requestedCounter.record(l);
            }
            s.request(l);
        }
    }

    /** Reports {@code RunStyle.SYNC} for {@code Attr.RUN_STYLE}, otherwise defers to {@link InnerOperator}. */
    @Override
    public Object scanUnsafe(Attr key) {
        if (key == Attr.RUN_STYLE) {
            return Attr.RunStyle.SYNC;
        }
        return InnerOperator.super.scanUnsafe(key);
    }
}
/** Meter base name returned by {@link #resolveName(Publisher)} when the source provides no name of its own. */
static final String REACTOR_DEFAULT_NAME = "reactor";

// Suffixes appended to the sequence name to build each meter's name.
/** Suffix of the counter incremented when a signal arrives after termination. */
static final String METER_MALFORMED = ".malformed.source";
/** Suffix of the counter incremented on each subscription. */
static final String METER_SUBSCRIBED = ".subscribed";
/** Suffix of the timer measuring subscribe-to-termination (or cancellation) duration. */
static final String METER_FLOW_DURATION = ".flow.duration";
/** Suffix of the timer measuring delays between onNext signals. */
static final String METER_ON_NEXT_DELAY = ".onNext.delay";
/** Suffix of the distribution summary recording requested amounts. */
static final String METER_REQUESTED = ".requested";

/** Key of the tag carrying the exception class name (empty for non-error terminations). */
static final String TAG_KEY_EXCEPTION = "exception";

/** Base tags for Flux sequences. */
static final Tags DEFAULT_TAGS_FLUX = Tags.of("type", "Flux");
/** Base tags for Mono sequences (not used by the code in this class). */
static final Tags DEFAULT_TAGS_MONO = Tags.of("type", "Mono");

/** Status tag for an onError termination; the exception tag is added separately in {@code recordOnError}. */
static final Tag TAG_ON_ERROR = Tag.of("status", "error");
/** Status and (empty) exception tags for an onComplete termination after at least one element. */
static final Tags TAG_ON_COMPLETE = Tags.of("status", "completed", TAG_KEY_EXCEPTION, "");
/** Status and (empty) exception tags for an onComplete termination without any element. */
static final Tags TAG_ON_COMPLETE_EMPTY = Tags.of("status", "completedEmpty", TAG_KEY_EXCEPTION, "");
/** Status and (empty) exception tags for a cancellation. */
static final Tags TAG_CANCEL = Tags.of("status", "cancelled", TAG_KEY_EXCEPTION, "");

static final Logger log = Loggers.getLogger(FluxMetrics.class);

/** Folds one (key, value) tuple into an existing set of tags. */
static final BiFunction<Tags, Tuple2<String, String>, Tags> TAG_ACCUMULATOR =
        (accumulated, keyAndValue) -> {
            Tag extra = Tag.of(keyAndValue.getT1(), keyAndValue.getT2());
            return accumulated.and(extra);
        };
/** Merges two sets of tags. */
static final BinaryOperator<Tags> TAG_COMBINER = (left, right) -> left.and(right);
/**
 * Computes the base meter name for {@code source}.
 * <p>
 * If the source cannot be scanned, a warning is logged and {@link #REACTOR_DEFAULT_NAME}
 * is returned. Otherwise the scanned name is returned, unless it is equal to the scanned
 * step name, in which case {@link #REACTOR_DEFAULT_NAME} is returned instead.
 *
 * @param source the publisher being instrumented
 * @return the scanned name or {@link #REACTOR_DEFAULT_NAME}
 */
static String resolveName(Publisher<?> source) {
    Scannable scannable = Scannable.from(source);
    if (!scannable.isScanAvailable()) {
        log.warn("Attempting to activate metrics but the upstream is not Scannable. "
                + "You might want to use `name()` (and optionally `tags()`) right before `metrics()`");
        return REACTOR_DEFAULT_NAME;
    }

    String scannedName = scannable.name();
    // A name identical to the step name is treated as "no usable name".
    boolean sameAsStepName = scannable.stepName().equals(scannedName);
    return sameAsStepName ? REACTOR_DEFAULT_NAME : scannedName;
}
/**
 * Extends {@code tags} with the tags scanned from {@code source}.
 *
 * @param source the publisher being instrumented
 * @param tags   the base tags to start from
 * @return {@code tags} as-is when the source cannot be scanned, otherwise {@code tags}
 *         reduced with every scanned (key, value) tuple via {@link #TAG_ACCUMULATOR}
 */
static Tags resolveTags(Publisher<?> source, Tags tags) {
    Scannable scannable = Scannable.from(source);
    if (!scannable.isScanAvailable()) {
        return tags;
    }
    return scannable.tags().reduce(tags, TAG_ACCUMULATOR, TAG_COMBINER);
}
/**
 * Stops {@code flowDuration} against the flow-duration timer tagged with {@link #TAG_CANCEL}.
 *
 * @param name         the base meter name
 * @param commonTags   the tags common to all meters of the sequence
 * @param registry     the registry in which the timer is registered
 * @param flowDuration the sample started at subscription time
 */
static void recordCancel(String name, Tags commonTags, MeterRegistry registry, Timer.Sample flowDuration) {
    Tags cancelTags = commonTags.and(TAG_CANCEL);
    Timer cancelTimer = Timer.builder(name + METER_FLOW_DURATION)
            .tags(cancelTags)
            .description(
                    "Times the duration elapsed between a subscription and the cancellation of the sequence")
            .register(registry);
    flowDuration.stop(cancelTimer);
}
/**
 * Increments the "malformed source" counter of the sequence.
 *
 * @param name       the base meter name
 * @param commonTags the tags common to all meters of the sequence
 * @param registry   the registry holding the counter
 */
static void recordMalformed(String name, Tags commonTags, MeterRegistry registry) {
    Counter malformedCounter = registry.counter(name + METER_MALFORMED, commonTags);
    malformedCounter.increment();
}
/**
 * Stops {@code flowDuration} against the flow-duration timer tagged with
 * {@link #TAG_ON_ERROR} and with the class name of {@code e} under
 * {@link #TAG_KEY_EXCEPTION}.
 *
 * @param name         the base meter name
 * @param commonTags   the tags common to all meters of the sequence
 * @param registry     the registry in which the timer is registered
 * @param flowDuration the sample started at subscription time
 * @param e            the error that terminated the sequence
 */
static void recordOnError(String name, Tags commonTags, MeterRegistry registry, Timer.Sample flowDuration, Throwable e) {
    String exceptionName = e.getClass().getName();
    Tags errorTags = commonTags.and(TAG_ON_ERROR);
    Timer errorTimer = Timer.builder(name + METER_FLOW_DURATION)
            .tags(errorTags)
            .tag(TAG_KEY_EXCEPTION, exceptionName)
            .description(
                    "Times the duration elapsed between a subscription and the onError termination of the sequence, with the exception name as a tag.")
            .register(registry);
    flowDuration.stop(errorTimer);
}
/**
 * Stops {@code flowDuration} against the flow-duration timer tagged with
 * {@link #TAG_ON_COMPLETE}.
 *
 * @param name         the base meter name
 * @param commonTags   the tags common to all meters of the sequence
 * @param registry     the registry in which the timer is registered
 * @param flowDuration the sample started at subscription time
 */
static void recordOnComplete(String name, Tags commonTags, MeterRegistry registry, Timer.Sample flowDuration) {
    Tags completeTags = commonTags.and(TAG_ON_COMPLETE);
    Timer completeTimer = Timer.builder(name + METER_FLOW_DURATION)
            .tags(completeTags)
            .description(
                    "Times the duration elapsed between a subscription and the onComplete termination of a sequence that did emit some elements")
            .register(registry);
    flowDuration.stop(completeTimer);
}
/**
 * Stops {@code flowDuration} against the flow-duration timer tagged with
 * {@link #TAG_ON_COMPLETE_EMPTY}.
 *
 * @param name         the base meter name
 * @param commonTags   the tags common to all meters of the sequence
 * @param registry     the registry in which the timer is registered
 * @param flowDuration the sample started at subscription time
 */
static void recordOnCompleteEmpty(String name, Tags commonTags, MeterRegistry registry, Timer.Sample flowDuration) {
    Tags emptyCompleteTags = commonTags.and(TAG_ON_COMPLETE_EMPTY);
    Timer emptyCompleteTimer = Timer.builder(name + METER_FLOW_DURATION)
            .tags(emptyCompleteTags)
            .description(
                    "Times the duration elapsed between a subscription and the onComplete termination of a sequence that didn't emit any element")
            .register(registry);
    flowDuration.stop(emptyCompleteTimer);
}
/**
 * Increments the subscription counter of the sequence, registering it first if needed.
 *
 * @param name       the base meter name
 * @param commonTags the tags common to all meters of the sequence
 * @param registry   the registry in which the counter is registered
 */
static void recordOnSubscribe(String name, Tags commonTags, MeterRegistry registry) {
    Counter subscribedCounter = Counter.builder(name + METER_SUBSCRIBED)
            .tags(commonTags)
            .description("Counts how many Reactor sequences have been subscribed to")
            .register(registry);
    subscribedCounter.increment();
}
}