/*
 * Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package reactor.core.publisher;

import java.util.ArrayList;
import java.util.List;

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuple2;

Activate metrics gathering on a Mono, assumes Micrometer is on the classpath.
Author:Simon Baslé
Implementation Note:Metrics.isMicrometerAvailable() test should be performed BEFORE instantiating or referencing this class, otherwise a NoClassDefFoundError will be thrown if Micrometer is not there.
/** * Activate metrics gathering on a {@link Mono}, assumes Micrometer is on the classpath. * * @implNote Metrics.isMicrometerAvailable() test should be performed BEFORE instantiating * or referencing this class, otherwise a {@link NoClassDefFoundError} will be thrown if * Micrometer is not there. * * @author Simon Baslé */
final class MonoMetrics<T> extends MonoOperator<T, T> { final String name; final List<Tag> tags; @Nullable final MeterRegistry meterRegistry; MonoMetrics(Mono<? extends T> mono) { this(mono, null); }
For testing purposes.
Params:
  • meterRegistry – the registry to use
/** * For testing purposes. * * @param meterRegistry the registry to use */
MonoMetrics(Mono<? extends T> mono, @Nullable MeterRegistry meterRegistry) { super(mono); Tuple2<String, List<Tag>> nameAndTags = FluxMetrics.resolveNameAndTags(mono); this.name = nameAndTags.getT1(); this.tags = nameAndTags.getT2(); this.meterRegistry = meterRegistry; } @Override public void subscribe(CoreSubscriber<? super T> actual) { MeterRegistry registry = Metrics.globalRegistry; if (meterRegistry != null) { registry = meterRegistry; } source.subscribe(new MicrometerMonoMetricsSubscriber<>(actual, registry, Clock.SYSTEM, this.name, this.tags)); } static class MicrometerMonoMetricsSubscriber<T> implements InnerOperator<T,T> { final CoreSubscriber<? super T> actual; final MeterRegistry registry; final Clock clock; final Counter malformedSourceCounter; final Counter subscribedCounter; Timer.Sample subscribeToTerminateSample; boolean done; @Nullable Fuseable.QueueSubscription<T> qs; Subscription s; final Timer subscribeToCompleteTimer; final Timer.Builder subscribeToErrorTimerBuilder; final Timer subscribeToCancelTimer; MicrometerMonoMetricsSubscriber(CoreSubscriber<? super T> actual, MeterRegistry registry, Clock clock, String sequenceName, List<Tag> sequenceTags) { this.actual = actual; this.registry = registry; this.clock = clock; List<Tag> commonTags = new ArrayList<>(); commonTags.add(Tag.of(FluxMetrics.TAG_SEQUENCE_NAME, sequenceName)); commonTags.add(Tag.of(FluxMetrics.TAG_SEQUENCE_TYPE, FluxMetrics.TAGVALUE_MONO)); commonTags.addAll(sequenceTags); this.subscribeToCompleteTimer = Timer .builder(FluxMetrics.METER_FLOW_DURATION) .tags(commonTags) .tag(FluxMetrics.TAG_STATUS, FluxMetrics.TAGVALUE_ON_COMPLETE) .description("Times the duration elapsed between a subscription and the onComplete termination of the sequence") .register(registry); this.subscribeToCancelTimer = Timer .builder(FluxMetrics.METER_FLOW_DURATION) .tags(commonTags) .tag(FluxMetrics.TAG_STATUS, FluxMetrics.TAGVALUE_CANCEL) .description("Times the duration elapsed between a subscription and the cancellation of the sequence") .register(registry); //note that Builder ISN'T TRULY IMMUTABLE. This is ok though as there will only ever be one usage. this.subscribeToErrorTimerBuilder = Timer .builder(FluxMetrics.METER_FLOW_DURATION) .tags(commonTags) .tag(FluxMetrics.TAG_STATUS, FluxMetrics.TAGVALUE_ON_ERROR) .description("Times the duration elapsed between a subscription and the onError termination of the sequence, with the exception name as a tag."); this.subscribedCounter = Counter .builder(FluxMetrics.METER_SUBSCRIBED) .tags(commonTags) .baseUnit("subscribers") .description("Counts how many Reactor sequences have been subscribed to") .register(registry); this.malformedSourceCounter = registry.counter(FluxMetrics.METER_MALFORMED, commonTags); } @Override public CoreSubscriber<? super T> actual() { return actual; } @Override public void onNext(T t) { if (done) { this.malformedSourceCounter.increment(); Operators.onNextDropped(t, actual.currentContext()); return; } actual.onNext(t); } @Override public void onError(Throwable e) { if (done) { this.malformedSourceCounter.increment(); Operators.onErrorDropped(e, actual.currentContext()); return; } done = true; //register a timer for that particular exception Timer timer = subscribeToErrorTimerBuilder .tag(FluxMetrics.TAG_EXCEPTION, e.getClass().getName()) .register(registry); //record error termination this.subscribeToTerminateSample.stop(timer); actual.onError(e); } @Override public void onComplete() { if (done) { this.malformedSourceCounter.increment(); return; } done = true; this.subscribeToTerminateSample.stop(subscribeToCompleteTimer); actual.onComplete(); } @Override public void onSubscribe(Subscription s) { if (Operators.validate(this.s, s)) { this.subscribedCounter.increment(); this.subscribeToTerminateSample = Timer.start(registry); if (s instanceof Fuseable.QueueSubscription) { //noinspection unchecked this.qs = (Fuseable.QueueSubscription<T>) s; } this.s = s; actual.onSubscribe(this); } } @Override public void request(long l) { if (Operators.validate(l)) { s.request(l); } } @Override public void cancel() { this.subscribeToTerminateSample.stop(subscribeToCancelTimer); s.cancel(); } }
Type parameters:
  • <T> –
Implementation Note:we don't want to particularly track fusion-specific calls, as this subscriber would only detect fused subsequences immediately upstream of it, so Counters would be a bit irrelevant. We however want to instrument onNext counts.
/** * @implNote we don't want to particularly track fusion-specific calls, as this * subscriber would only detect fused subsequences immediately upstream of it, so * Counters would be a bit irrelevant. We however want to instrument onNext counts. * * @param <T> */
static final class MicrometerMonoMetricsFuseableSubscriber<T> extends MicrometerMonoMetricsSubscriber<T> implements Fuseable, Fuseable.QueueSubscription<T> { private int mode; MicrometerMonoMetricsFuseableSubscriber(CoreSubscriber<? super T> actual, MeterRegistry registry, Clock clock, String sequenceName, List<Tag> sequenceTags) { super(actual, registry, clock, sequenceName, sequenceTags); } @Override public void onNext(T t) { // if (this.mode == ASYNC) { // actual.onNext(null); // } // else { super.onNext(t); // } } @Override public int requestFusion(int mode) { //Simply negotiate the fusion by delegating: if (qs != null) { this.mode = qs.requestFusion(mode); return this.mode; } return Fuseable.NONE; //should not happen unless requestFusion called before subscribe } @Override @Nullable public T poll() { if (qs == null) { return null; } try { T v = qs.poll(); if (v == null && this.mode == SYNC) { //this is also a complete event this.subscribeToTerminateSample.stop(subscribeToCompleteTimer); } return v; } catch (Throwable e) { //register a timer for that particular exception Timer timer = subscribeToErrorTimerBuilder .tag(FluxMetrics.TAG_EXCEPTION, e.getClass().getName()) .register(registry); //record error termination this.subscribeToTerminateSample.stop(timer); throw e; } } @Override public void clear() { if (qs != null) { qs.clear(); } } @Override public boolean isEmpty() { return qs == null || qs.isEmpty(); } @Override public int size() { return qs == null ? 0 : qs.size(); } } }