package reactor.core.publisher;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
final class FluxMetrics<T> extends FluxOperator<T, T> {
private static final Logger LOGGER = Loggers.getLogger(FluxMetrics.class);
private static final boolean isMicrometerAvailable;
static {
boolean micrometer;
try {
io.micrometer.core.instrument.Metrics.globalRegistry.getRegistries();
micrometer = true;
}
catch (Throwable t) {
micrometer = false;
}
isMicrometerAvailable = micrometer;
}
static boolean isMicrometerAvailable() {
return isMicrometerAvailable;
}
static final String REACTOR_DEFAULT_NAME = "reactor";
static final String METER_MALFORMED = "reactor.malformed.source";
static final String METER_SUBSCRIBED = "reactor.subscribed";
static final String METER_FLOW_DURATION = "reactor.flow.duration";
static final String METER_ON_NEXT_DELAY = "reactor.onNext.delay";
static final String METER_REQUESTED = "reactor.requested";
static final String TAG_STATUS = "status";
static final String TAG_EXCEPTION = "exception";
static final String TAG_SEQUENCE_NAME = "flow";
static final String TAG_SEQUENCE_TYPE = "type";
static final String TAGVALUE_ON_ERROR = "error";
static final String TAGVALUE_ON_COMPLETE = "completed";
static final String TAGVALUE_CANCEL = "cancelled";
static final String TAGVALUE_FLUX = "Flux";
static final String TAGVALUE_MONO = "Mono";
static Tuple2<String, List<Tag>> resolveNameAndTags(Publisher<?> source) {
String name;
List<Tag> tags;
Scannable scannable = Scannable.from(source);
if (scannable.isScanAvailable()) {
String nameOrDefault = scannable.name();
if (scannable.stepName().equals(nameOrDefault)) {
name = REACTOR_DEFAULT_NAME;
}
else {
name = nameOrDefault;
}
tags = scannable.tags()
.map(tuple -> Tag.of(tuple.getT1(), tuple.getT2()))
.collect(Collectors.toList());
}
else {
LOGGER.warn("Attempting to activate metrics but the upstream is not Scannable. " +
"You might want to use `name()` (and optionally `tags()`) right before `metrics()`");
name = REACTOR_DEFAULT_NAME;
tags = Collections.emptyList();
}
return Tuples.of(name, tags);
}
final String name;
final List<Tag> tags;
@Nullable
final MeterRegistry registryCandidate;
FluxMetrics(Flux<? extends T> flux) {
this(flux, null);
}
FluxMetrics(Flux<? extends T> flux, @Nullable MeterRegistry registry) {
super(flux);
Tuple2<String, List<Tag>> nameAndTags = resolveNameAndTags(flux);
this.name = nameAndTags.getT1();
this.tags = nameAndTags.getT2();
this.registryCandidate = registry;
}
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
MeterRegistry registry = Metrics.globalRegistry;
if (registryCandidate != null) {
registry = registryCandidate;
}
source.subscribe(new MicrometerFluxMetricsSubscriber<>(actual, registry,
Clock.SYSTEM, this.name, this.tags));
}
static class MicrometerFluxMetricsSubscriber<T> implements InnerOperator<T,T> {
final CoreSubscriber<? super T> actual;
final MeterRegistry registry;
final Clock clock;
final Counter malformedSourceCounter;
final Counter subscribedCounter;
final DistributionSummary requestedCounter;
Timer.Sample subscribeToTerminateSample;
long lastNextEventNanos = -1L;
boolean done;
@Nullable
Fuseable.QueueSubscription<T> qs;
Subscription s;
final Timer onNextIntervalTimer;
final Timer subscribeToCompleteTimer;
final Timer.Builder subscribeToErrorTimerBuilder;
final Timer subscribeToCancelTimer;
MicrometerFluxMetricsSubscriber(CoreSubscriber<? super T> actual,
MeterRegistry registry,
Clock clock,
String sequenceName,
List<Tag> sequenceTags) {
this.actual = actual;
this.registry = registry;
this.clock = clock;
List<Tag> commonTags = new ArrayList<>();
commonTags.add(Tag.of(TAG_SEQUENCE_NAME, sequenceName));
commonTags.add(Tag.of(TAG_SEQUENCE_TYPE, TAGVALUE_FLUX));
commonTags.addAll(sequenceTags);
this.subscribeToCompleteTimer = Timer
.builder(METER_FLOW_DURATION)
.tags(commonTags)
.tag(TAG_STATUS, TAGVALUE_ON_COMPLETE)
.description("Times the duration elapsed between a subscription and the onComplete termination of the sequence")
.register(registry);
this.subscribeToCancelTimer = Timer
.builder(METER_FLOW_DURATION)
.tags(commonTags)
.tag(TAG_STATUS, TAGVALUE_CANCEL)
.description("Times the duration elapsed between a subscription and the cancellation of the sequence")
.register(registry);
this.subscribeToErrorTimerBuilder = Timer
.builder(METER_FLOW_DURATION)
.tags(commonTags)
.tag(TAG_STATUS, TAGVALUE_ON_ERROR)
.description("Times the duration elapsed between a subscription and the onError termination of the sequence, with the exception name as a tag");
this.onNextIntervalTimer = Timer
.builder(METER_ON_NEXT_DELAY)
.tags(commonTags)
.description("Measures delays between onNext signals (or between onSubscribe and first onNext)")
.register(registry);
this.subscribedCounter = Counter
.builder(METER_SUBSCRIBED)
.tags(commonTags)
.baseUnit("subscribers")
.description("Counts how many Reactor sequences have been subscribed to")
.register(registry);
this.malformedSourceCounter = registry.counter(METER_MALFORMED, commonTags);
if (!REACTOR_DEFAULT_NAME.equals(sequenceName)) {
this.requestedCounter = DistributionSummary
.builder(METER_REQUESTED)
.tags(commonTags)
.description("Counts the amount requested to a named Flux by all subscribers, until at least one requests an unbounded amount")
.baseUnit("requested amount")
.register(registry);
}
else {
requestedCounter = null;
}
}
@Override
public CoreSubscriber<? super T> actual() {
return actual;
}
@Override
public void onNext(T t) {
if (done) {
this.malformedSourceCounter.increment();
Operators.onNextDropped(t, actual.currentContext());
return;
}
long last = this.lastNextEventNanos;
this.lastNextEventNanos = clock.monotonicTime();
this.onNextIntervalTimer.record(lastNextEventNanos - last, TimeUnit.NANOSECONDS);
actual.onNext(t);
}
@Override
public void onError(Throwable e) {
if (done) {
this.malformedSourceCounter.increment();
Operators.onErrorDropped(e, actual.currentContext());
return;
}
done = true;
Timer timer = subscribeToErrorTimerBuilder
.tag(FluxMetrics.TAG_EXCEPTION, e.getClass().getName())
.register(registry);
this.subscribeToTerminateSample.stop(timer);
actual.onError(e);
}
@Override
public void onComplete() {
if (done) {
this.malformedSourceCounter.increment();
return;
}
done = true;
this.subscribeToTerminateSample.stop(subscribeToCompleteTimer);
actual.onComplete();
}
@Override
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.subscribedCounter.increment();
this.subscribeToTerminateSample = Timer.start(registry);
this.lastNextEventNanos = clock.monotonicTime();
if (s instanceof Fuseable.QueueSubscription) {
this.qs = (Fuseable.QueueSubscription<T>) s;
}
this.s = s;
actual.onSubscribe(this);
}
}
@Override
public void request(long l) {
if (Operators.validate(l)) {
if (requestedCounter != null) {
requestedCounter.record(l);
}
s.request(l);
}
}
@Override
public void cancel() {
this.subscribeToTerminateSample.stop(subscribeToCancelTimer);
s.cancel();
}
}
static final class MicrometerFluxMetricsFuseableSubscriber<T> extends MicrometerFluxMetricsSubscriber<T>
implements Fuseable, Fuseable.QueueSubscription<T> {
private int fusionMode;
MicrometerFluxMetricsFuseableSubscriber(CoreSubscriber<? super T> actual,
MeterRegistry registry, Clock clock, String sequenceName, List<Tag> sequenceTags) {
super(actual, registry, clock, sequenceName, sequenceTags);
}
@Override
public void onNext(T t) {
if (this.fusionMode == Fuseable.ASYNC) {
actual.onNext(null);
return;
}
if (done) {
this.malformedSourceCounter.increment();
Operators.onNextDropped(t, actual.currentContext());
return;
}
long last = this.lastNextEventNanos;
this.lastNextEventNanos = clock.monotonicTime();
this.onNextIntervalTimer.record(lastNextEventNanos - last, TimeUnit.NANOSECONDS);
actual.onNext(t);
}
@Override
public int requestFusion(int mode) {
if (qs != null) {
this.fusionMode = qs.requestFusion(mode);
return fusionMode;
}
return Fuseable.NONE;
}
@Override
@Nullable
public T poll() {
if (qs == null) {
return null;
}
try {
T v = qs.poll();
if (v == null && fusionMode == SYNC) {
this.subscribeToTerminateSample.stop(subscribeToCompleteTimer);
}
if (v != null) {
long last = this.lastNextEventNanos;
this.lastNextEventNanos = clock.monotonicTime();
this.onNextIntervalTimer.record(lastNextEventNanos - last, TimeUnit.NANOSECONDS);
}
return v;
} catch (Throwable e) {
Timer timer = subscribeToErrorTimerBuilder
.tag(FluxMetrics.TAG_EXCEPTION, e.getClass().getName())
.register(registry);
this.subscribeToTerminateSample.stop(timer);
throw e;
}
}
@Override
public void clear() {
if (qs != null) {
qs.clear();
}
}
@Override
public boolean isEmpty() {
return qs == null || qs.isEmpty();
}
@Override
public int size() {
return qs == null ? 0 : qs.size();
}
}
}