package io.vertx.micrometer.impl;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.micrometer.Label;
import io.vertx.micrometer.MetricsDomain;
import io.vertx.micrometer.impl.meters.Counters;
import io.vertx.micrometer.impl.meters.Gauges;
import io.vertx.micrometer.impl.meters.Summaries;
import io.vertx.micrometer.impl.meters.Timers;
import java.util.concurrent.atomic.LongAdder;
/**
 * {@link EventBusMetrics} implementation that records event bus activity into the
 * {@link MeterRegistry} supplied at construction time, under {@link MetricsDomain#EVENT_BUS}.
 *
 * <p>Addresses starting with {@code "__vertx."} are treated as internal: no meter is ever
 * updated for them. Handlers registered on such addresses are represented by the shared
 * {@link #IGNORED} instance, whose {@code null} address makes {@link #isValid(Handler)} reject it.
 */
class VertxEventBusMetrics extends AbstractMetrics implements EventBusMetrics<VertxEventBusMetrics.Handler> {

  /** Placeholder returned for internal addresses; it carries no address and is never measured. */
  private static final Handler IGNORED = new Handler(null);

  private final Gauges<LongAdder> handlers;
  private final Gauges<LongAdder> pending;
  private final Counters published;
  private final Counters sent;
  private final Counters received;
  private final Counters delivered;
  private final Counters errorCount;
  private final Counters replyFailures;
  private final Timers processTime;
  private final Summaries bytesRead;
  private final Summaries bytesWritten;

  /**
   * Declares every event bus meter family (gauges, counters, timer, summaries) with its
   * name, description and label keys.
   *
   * @param registry the registry passed on to {@link AbstractMetrics}
   */
  VertxEventBusMetrics(MeterRegistry registry) {
    super(registry, MetricsDomain.EVENT_BUS);
    handlers = longGauges("handlers", "Number of event bus handlers in use", Label.EB_ADDRESS);
    pending = longGauges("pending", "Number of messages not processed yet", Label.EB_ADDRESS, Label.EB_SIDE);
    published = counters("published", "Number of messages published (publish / subscribe)", Label.EB_ADDRESS, Label.EB_SIDE);
    sent = counters("sent", "Number of messages sent (point-to-point)", Label.EB_ADDRESS, Label.EB_SIDE);
    received = counters("received", "Number of messages received", Label.EB_ADDRESS, Label.EB_SIDE);
    delivered = counters("delivered", "Number of messages delivered to handlers", Label.EB_ADDRESS, Label.EB_SIDE);
    errorCount = counters("errors", "Number of errors", Label.EB_ADDRESS, Label.CLASS_NAME);
    replyFailures = counters("replyFailures", "Number of message reply failures", Label.EB_ADDRESS, Label.EB_FAILURE);
    processTime = timers("processingTime", "Processing time", Label.EB_ADDRESS);
    bytesRead = summaries("bytesRead", "Number of bytes received while reading messages from event bus cluster peers", Label.EB_ADDRESS);
    bytesWritten = summaries("bytesWritten", "Number of bytes sent while sending messages to event bus cluster peers", Label.EB_ADDRESS);
  }

  /**
   * Tells whether an address is excluded from measurements.
   *
   * @param address the event bus address to test
   * @return {@code true} when the address starts with {@code "__vertx."}
   */
  private static boolean isInternal(String address) {
    return address.startsWith("__vertx.");
  }

  /**
   * Increments the handlers gauge for the address and creates the metric object that is
   * passed back to the other handler callbacks.
   *
   * @param address the address the handler is registered on
   * @param repliedAddress not used by this implementation
   * @return {@link #IGNORED} for an internal address, otherwise a new {@link Handler}
   */
  @Override
  public Handler handlerRegistered(String address, String repliedAddress) {
    if (isInternal(address)) {
      return IGNORED;
    }
    handlers.get(address).increment();
    return new Handler(address);
  }

  /**
   * Decrements the handlers gauge for the handler's address; does nothing for a
   * {@code null} or ignored handler.
   *
   * @param handler the metric object returned by {@link #handlerRegistered(String, String)}
   */
  @Override
  public void handlerUnregistered(Handler handler) {
    if (isValid(handler)) {
      handlers.get(handler.address).decrement();
    }
  }

  /** Intentionally empty: nothing is recorded when a message is scheduled. */
  @Override
  public void scheduleMessage(Handler handler, boolean b) {
  }

  /**
   * Decrements the pending gauge for the handler's address and side, then starts the
   * processing timer and stores it on the handler.
   *
   * @param handler the metric object of the handler about to process a message
   * @param local passed to {@code Labels.getSide} to compute the side label
   */
  @Override
  public void beginHandleMessage(Handler handler, boolean local) {
    if (isValid(handler)) {
      pending.get(handler.address, Labels.getSide(local)).decrement();
      handler.timer = processTime.start();
    }
  }

  /**
   * Ends the processing timer stored on the handler, if any, and counts the failure when
   * one is reported.
   *
   * @param handler the metric object of the handler that processed the message
   * @param failure the error raised while handling, or {@code null} when there was none
   */
  @Override
  public void endHandleMessage(Handler handler, Throwable failure) {
    if (!isValid(handler)) {
      return;
    }
    // The timer is assigned only by beginHandleMessage, after the pending gauge update.
    // When it was never set, skip the timing sample instead of throwing a
    // NullPointerException, so the failure below is still counted.
    Timers.EventTiming timing = handler.timer;
    if (timing != null) {
      timing.end(handler.address);
    }
    if (failure != null) {
      // The error counter is labelled with the simple class name of the failure.
      errorCount.get(handler.address, failure.getClass().getSimpleName()).increment();
    }
  }

  /**
   * Increments either the published or the sent counter for a non-internal address.
   *
   * @param address the destination address
   * @param publish {@code true} selects the published counter, {@code false} the sent counter
   * @param local passed to {@code Labels.getSide} to compute the side label
   * @param remote not used by this implementation
   */
  @Override
  public void messageSent(String address, boolean publish, boolean local, boolean remote) {
    if (!isInternal(address)) {
      if (publish) {
        published.get(address, Labels.getSide(local)).increment();
      } else {
        sent.get(address, Labels.getSide(local)).increment();
      }
    }
  }

  /**
   * Records a received message for a non-internal address: adds {@code handlers} to the
   * pending gauge, increments the received counter, and increments the delivered counter
   * when {@code handlers} is positive.
   *
   * @param address the address the message was received on
   * @param publish not used by this implementation
   * @param local passed to {@code Labels.getSide} to compute the side label
   * @param handlers amount added to the pending gauge (this parameter shadows the
   *     {@code handlers} gauge field inside the method)
   */
  @Override
  public void messageReceived(String address, boolean publish, boolean local, int handlers) {
    if (!isInternal(address)) {
      String origin = Labels.getSide(local);
      pending.get(address, origin).add(handlers);
      received.get(address, origin).increment();
      if (handlers > 0) {
        delivered.get(address, origin).increment();
      }
    }
  }

  /**
   * Records {@code numberOfBytes} in the bytes-written summary of a non-internal address.
   *
   * @param address the address the message was written to
   * @param numberOfBytes the value recorded in the summary
   */
  @Override
  public void messageWritten(String address, int numberOfBytes) {
    if (!isInternal(address)) {
      bytesWritten.get(address).record(numberOfBytes);
    }
  }

  /**
   * Records {@code numberOfBytes} in the bytes-read summary of a non-internal address.
   *
   * @param address the address the message was read from
   * @param numberOfBytes the value recorded in the summary
   */
  @Override
  public void messageRead(String address, int numberOfBytes) {
    if (!isInternal(address)) {
      bytesRead.get(address).record(numberOfBytes);
    }
  }

  /**
   * Increments the reply-failure counter of a non-internal address, labelled with the
   * enum constant name of the failure.
   *
   * @param address the address the failed reply relates to
   * @param failure the kind of reply failure
   */
  @Override
  public void replyFailure(String address, ReplyFailure failure) {
    if (!isInternal(address)) {
      replyFailures.get(address, failure.name()).increment();
    }
  }

  /**
   * @return always {@code true}
   */
  @Override
  public boolean isEnabled() {
    return true;
  }

  /** Intentionally empty: this implementation does nothing on close. */
  @Override
  public void close() {
  }

  /**
   * Tells whether a handler metric should be measured.
   *
   * @param handler the metric object to test, may be {@code null}
   * @return {@code true} when the handler is non-null and carries an address
   */
  private static boolean isValid(Handler handler) {
    return handler != null && handler.address != null;
  }

  /** Per-registration metric object: the handler's address plus its current processing timer. */
  static class Handler {
    /** Address of the registration; {@code null} only for the ignored placeholder. */
    private final String address;
    /** Timing started by the latest {@code beginHandleMessage}; {@code null} until then. */
    private Timers.EventTiming timer;

    Handler(String address) {
      this.address = address;
    }
  }
}