package reactor.core.publisher;
import java.util.ArrayList;
import java.util.List;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuple2;
final class MonoMetrics<T> extends MonoOperator<T, T> {
final String name;
final List<Tag> tags;
@Nullable
final MeterRegistry meterRegistry;
/**
 * Creates a metrics operator around {@code mono} without an explicit registry.
 * Delegates to the two-argument constructor, passing {@code null} as the
 * {@link MeterRegistry}.
 *
 * @param mono the source {@link Mono} to instrument
 */
MonoMetrics(Mono<? extends T> mono) {
this(mono, null);
}
/**
 * Creates a metrics operator around {@code mono}, optionally bound to a specific
 * registry. The sequence name and tags are obtained from
 * {@code FluxMetrics.resolveNameAndTags(mono)} and stored for later use by
 * {@code subscribe}.
 *
 * @param mono the source {@link Mono} to instrument
 * @param meterRegistry the registry to record meters in, or {@code null} when no
 * explicit registry is provided
 */
MonoMetrics(Mono<? extends T> mono, @Nullable MeterRegistry meterRegistry) {
    super(mono);
    this.meterRegistry = meterRegistry;

    Tuple2<String, List<Tag>> resolved = FluxMetrics.resolveNameAndTags(mono);
    this.name = resolved.getT1();
    this.tags = resolved.getT2();
}
/**
 * Subscribes to the source through a {@link MicrometerMonoMetricsSubscriber} that
 * wraps {@code actual}. Meters are registered in the registry given at
 * construction time when there is one, otherwise in {@link Metrics#globalRegistry}.
 * The subscriber is handed {@link Clock#SYSTEM} along with the resolved name and tags.
 *
 * @param actual the downstream subscriber to decorate
 */
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    final MeterRegistry globalDefault = Metrics.globalRegistry;
    final MeterRegistry target = (meterRegistry == null) ? globalDefault : meterRegistry;

    MicrometerMonoMetricsSubscriber<T> metricsSubscriber =
            new MicrometerMonoMetricsSubscriber<>(actual, target, Clock.SYSTEM, this.name, this.tags);
    source.subscribe(metricsSubscriber);
}
/**
 * Subscriber that relays every signal to a downstream {@link CoreSubscriber} while
 * recording Micrometer meters about the sequence:
 * <ul>
 *   <li>a counter incremented on each accepted subscription,</li>
 *   <li>a counter incremented when a signal arrives after termination,</li>
 *   <li>timers measuring the time between subscription and completion, error
 *       (tagged with the exception class name) or cancellation.</li>
 * </ul>
 * All meters share the sequence name tag, the "Mono" sequence type tag and the
 * user-provided sequence tags.
 *
 * @param <T> the type of value relayed
 */
static class MicrometerMonoMetricsSubscriber<T> implements InnerOperator<T,T> {

    /** Downstream subscriber that receives the relayed signals. */
    final CoreSubscriber<? super T> actual;
    /** Registry in which all meters are registered. */
    final MeterRegistry registry;
    /** Clock used to time the subscribe-to-termination duration. */
    final Clock clock;
    /** Counts signals received after the sequence already terminated. */
    final Counter malformedSourceCounter;
    /** Counts accepted subscriptions. */
    final Counter subscribedCounter;

    /** Sample started in {@link #onSubscribe(Subscription)} and stopped on termination or cancel. */
    Timer.Sample subscribeToTerminateSample;
    /** Set once onError or onComplete has been processed. */
    boolean done;
    /** The upstream subscription viewed as a queue subscription, when it is one. */
    @Nullable
    Fuseable.QueueSubscription<T> qs;
    /** The upstream subscription. */
    Subscription s;

    final Timer subscribeToCompleteTimer;
    /** Kept as a builder because the exception tag is only known when the error occurs. */
    final Timer.Builder subscribeToErrorTimerBuilder;
    final Timer subscribeToCancelTimer;

    /**
     * Registers the complete/cancel timers and both counters up front, and prepares
     * the error timer builder.
     *
     * @param actual the downstream subscriber
     * @param registry the registry to register meters in
     * @param clock the clock used to time the sequence
     * @param sequenceName value of the sequence name tag
     * @param sequenceTags additional tags applied to every meter
     */
    MicrometerMonoMetricsSubscriber(CoreSubscriber<? super T> actual,
            MeterRegistry registry,
            Clock clock,
            String sequenceName,
            List<Tag> sequenceTags) {
        this.actual = actual;
        this.registry = registry;
        this.clock = clock;

        List<Tag> commonTags = new ArrayList<>();
        commonTags.add(Tag.of(FluxMetrics.TAG_SEQUENCE_NAME, sequenceName));
        commonTags.add(Tag.of(FluxMetrics.TAG_SEQUENCE_TYPE, FluxMetrics.TAGVALUE_MONO));
        commonTags.addAll(sequenceTags);

        this.subscribeToCompleteTimer = Timer
                .builder(FluxMetrics.METER_FLOW_DURATION)
                .tags(commonTags)
                .tag(FluxMetrics.TAG_STATUS, FluxMetrics.TAGVALUE_ON_COMPLETE)
                .description("Times the duration elapsed between a subscription and the onComplete termination of the sequence")
                .register(registry);
        this.subscribeToCancelTimer = Timer
                .builder(FluxMetrics.METER_FLOW_DURATION)
                .tags(commonTags)
                .tag(FluxMetrics.TAG_STATUS, FluxMetrics.TAGVALUE_CANCEL)
                .description("Times the duration elapsed between a subscription and the cancellation of the sequence")
                .register(registry);

        // Not registered yet: the exception tag is added in onError.
        this.subscribeToErrorTimerBuilder = Timer
                .builder(FluxMetrics.METER_FLOW_DURATION)
                .tags(commonTags)
                .tag(FluxMetrics.TAG_STATUS, FluxMetrics.TAGVALUE_ON_ERROR)
                .description("Times the duration elapsed between a subscription and the onError termination of the sequence, with the exception name as a tag.");

        this.subscribedCounter = Counter
                .builder(FluxMetrics.METER_SUBSCRIBED)
                .tags(commonTags)
                .baseUnit("subscribers")
                .description("Counts how many Reactor sequences have been subscribed to")
                .register(registry);
        this.malformedSourceCounter = registry.counter(FluxMetrics.METER_MALFORMED, commonTags);
    }

    @Override
    public CoreSubscriber<? super T> actual() {
        return actual;
    }

    /**
     * Relays the value downstream. A value arriving after termination is counted as
     * malformed and dropped instead.
     */
    @Override
    public void onNext(T t) {
        if (done) {
            this.malformedSourceCounter.increment();
            Operators.onNextDropped(t, actual.currentContext());
            return;
        }
        actual.onNext(t);
    }

    /**
     * Records the subscribe-to-error duration in a timer tagged with the exception
     * class name, then relays the error. An error arriving after termination is
     * counted as malformed and dropped instead.
     */
    @Override
    public void onError(Throwable e) {
        if (done) {
            this.malformedSourceCounter.increment();
            Operators.onErrorDropped(e, actual.currentContext());
            return;
        }
        done = true;
        Timer timer = subscribeToErrorTimerBuilder
                .tag(FluxMetrics.TAG_EXCEPTION, e.getClass().getName())
                .register(registry);
        this.subscribeToTerminateSample.stop(timer);
        actual.onError(e);
    }

    /**
     * Records the subscribe-to-complete duration, then relays completion. A
     * completion arriving after termination only increments the malformed counter.
     */
    @Override
    public void onComplete() {
        if (done) {
            this.malformedSourceCounter.increment();
            return;
        }
        done = true;
        this.subscribeToTerminateSample.stop(subscribeToCompleteTimer);
        actual.onComplete();
    }

    /**
     * Accepts the upstream subscription (once), counts the subscription, starts the
     * timing sample and passes this subscriber downstream as the subscription.
     */
    @Override
    public void onSubscribe(Subscription s) {
        if (Operators.validate(this.s, s)) {
            this.subscribedCounter.increment();
            // Time with the injected clock rather than the registry's own clock,
            // so the clock passed to the constructor is actually honored.
            this.subscribeToTerminateSample = Timer.start(clock);
            if (s instanceof Fuseable.QueueSubscription) {
                // The element type cannot be checked at runtime; the upstream of a
                // Subscriber<T> is expected to produce T.
                @SuppressWarnings("unchecked")
                Fuseable.QueueSubscription<T> fused = (Fuseable.QueueSubscription<T>) s;
                this.qs = fused;
            }
            this.s = s;
            actual.onSubscribe(this);
        }
    }

    /** Forwards a valid request amount to the upstream subscription. */
    @Override
    public void request(long l) {
        if (Operators.validate(l)) {
            s.request(l);
        }
    }

    /** Records the subscribe-to-cancel duration, then cancels the upstream subscription. */
    @Override
    public void cancel() {
        this.subscribeToTerminateSample.stop(subscribeToCancelTimer);
        s.cancel();
    }
}
/**
 * Variant of {@link MicrometerMonoMetricsSubscriber} that also acts as a
 * {@link Fuseable.QueueSubscription}, delegating every queue operation to the
 * upstream queue subscription captured by the parent class (if any). Termination
 * observed through {@link #poll()} is recorded in the same timers the parent uses
 * for signal-based termination.
 *
 * @param <T> the type of value relayed
 */
static final class MicrometerMonoMetricsFuseableSubscriber<T> extends MicrometerMonoMetricsSubscriber<T>
        implements Fuseable, Fuseable.QueueSubscription<T> {

    /** Fusion mode negotiated with the upstream in {@link #requestFusion(int)}. */
    private int mode;

    MicrometerMonoMetricsFuseableSubscriber(CoreSubscriber<? super T> actual,
            MeterRegistry registry, Clock clock, String sequenceName, List<Tag> sequenceTags) {
        super(actual, registry, clock, sequenceName, sequenceTags);
    }

    @Override
    public void onNext(T t) {
        super.onNext(t);
    }

    /**
     * Negotiates fusion by delegating to the upstream queue subscription and
     * remembering the mode it granted.
     *
     * @return the mode granted by the upstream, or {@link Fuseable#NONE} when no
     * upstream queue subscription is available
     */
    @Override
    public int requestFusion(int mode) {
        if (qs != null) {
            this.mode = qs.requestFusion(mode);
            return this.mode;
        }
        return Fuseable.NONE;
    }

    /**
     * Polls the upstream queue. In SYNC mode a {@code null} result is treated as
     * completion and the subscribe-to-complete duration is recorded, at most once.
     * If the upstream poll throws, the subscribe-to-error duration is recorded
     * (tagged with the exception class name) and the exception is rethrown.
     *
     * @return the polled value, or {@code null} when nothing is available or no
     * upstream queue subscription is present
     */
    @Override
    @Nullable
    public T poll() {
        if (qs == null) {
            return null;
        }
        try {
            T v = qs.poll();
            // Guard with `done` so that polling again after exhaustion does not
            // record the completion timing more than once.
            if (v == null && this.mode == SYNC && !done) {
                done = true;
                this.subscribeToTerminateSample.stop(subscribeToCompleteTimer);
            }
            return v;
        } catch (Throwable e) {
            Timer timer = subscribeToErrorTimerBuilder
                    .tag(FluxMetrics.TAG_EXCEPTION, e.getClass().getName())
                    .register(registry);
            this.subscribeToTerminateSample.stop(timer);
            throw e;
        }
    }

    /** Clears the upstream queue, if there is one. */
    @Override
    public void clear() {
        if (qs != null) {
            qs.clear();
        }
    }

    /** @return {@code true} when there is no upstream queue or it is empty */
    @Override
    public boolean isEmpty() {
        return qs == null || qs.isEmpty();
    }

    /** @return the upstream queue size, or 0 when there is no upstream queue */
    @Override
    public int size() {
        return qs == null ? 0 : qs.size();
    }
}
}