package io.vertx.micrometer.impl;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.micrometer.Label;
import io.vertx.micrometer.MetricsDomain;
import io.vertx.micrometer.MetricsNaming;
import io.vertx.micrometer.impl.meters.Counters;
import io.vertx.micrometer.impl.meters.Gauges;
import io.vertx.micrometer.impl.meters.Summaries;
import java.util.concurrent.atomic.LongAdder;
/**
 * {@link EventBusMetrics} implementation that records event bus activity into gauges, counters and
 * summaries created through the {@link AbstractMetrics} factory methods.
 *
 * <p>Any address starting with {@code "__vertx."} is treated as internal and is never recorded.
 * Handlers registered on such an address all receive the shared {@code IGNORED} handle, which the
 * handler-based callbacks skip.
 */
class VertxEventBusMetrics extends AbstractMetrics implements EventBusMetrics<VertxEventBusMetrics.Handler> {

  /** Prefix of the addresses that are excluded from all meters of this class. */
  private static final String INTERNAL_ADDRESS_PREFIX = "__vertx.";

  /** Handle returned for internal addresses; its {@code null} address makes {@code isValid} reject it. */
  private static final Handler IGNORED = new Handler(null);

  private final Gauges<LongAdder> handlers;
  private final Gauges<LongAdder> pending;
  private final Counters processed;
  private final Counters published;
  private final Counters sent;
  private final Counters received;
  private final Counters delivered;
  private final Counters discarded;
  private final Counters replyFailures;
  private final Summaries bytesRead;
  private final Summaries bytesWritten;

  /**
   * Creates every event bus meter under the {@link MetricsDomain#EVENT_BUS} domain.
   *
   * @param registry the Micrometer registry handed to the parent class
   * @param names    provider of the metric names used for each meter
   */
  VertxEventBusMetrics(MeterRegistry registry, MetricsNaming names) {
    super(registry, MetricsDomain.EVENT_BUS);

    // Gauges: values that go up and down.
    handlers = longGauges(names.getEbHandlers(), "Number of event bus handlers in use", Label.EB_ADDRESS);
    pending = longGauges(names.getEbPending(), "Number of messages not processed yet", Label.EB_ADDRESS, Label.EB_SIDE);

    // Counters labelled by address and side.
    processed = counters(names.getEbProcessed(), "Number of processed messages", Label.EB_ADDRESS, Label.EB_SIDE);
    published = counters(names.getEbPublished(), "Number of messages published (publish / subscribe)", Label.EB_ADDRESS, Label.EB_SIDE);
    sent = counters(names.getEbSent(), "Number of messages sent (point-to-point)", Label.EB_ADDRESS, Label.EB_SIDE);
    received = counters(names.getEbReceived(), "Number of messages received", Label.EB_ADDRESS, Label.EB_SIDE);
    delivered = counters(names.getEbDelivered(), "Number of messages delivered to handlers", Label.EB_ADDRESS, Label.EB_SIDE);
    discarded = counters(names.getEbDiscarded(), "Number of discarded messages", Label.EB_ADDRESS, Label.EB_SIDE);

    // Reply failures are labelled by address and failure name instead of side.
    replyFailures = counters(names.getEbReplyFailures(), "Number of message reply failures", Label.EB_ADDRESS, Label.EB_FAILURE);

    // Byte summaries labelled by address only.
    bytesRead = summaries(names.getEbBytesRead(), "Number of bytes received while reading messages from event bus cluster peers", Label.EB_ADDRESS);
    bytesWritten = summaries(names.getEbBytesWritten(), "Number of bytes sent while sending messages to event bus cluster peers", Label.EB_ADDRESS);
  }

  /**
   * Tells whether an address is excluded from metrics.
   *
   * @param address the event bus address to test
   * @return {@code true} when the address starts with {@code "__vertx."}
   */
  private static boolean isInternal(String address) {
    return address.startsWith(INTERNAL_ADDRESS_PREFIX);
  }

  /**
   * Tells whether a handle refers to a tracked address.
   *
   * @param handler the handle to test, possibly {@code null}
   * @return {@code false} for {@code null} or for a handle without address (such as {@code IGNORED})
   */
  private static boolean isValid(Handler handler) {
    return !(handler == null || handler.address == null);
  }

  /**
   * Increments the handlers gauge of the address and returns a handle bound to it.
   * The {@code repliedAddress} argument is not used.
   *
   * @return a new handle for the address, or {@code IGNORED} when the address is internal
   */
  @Override
  public Handler handlerRegistered(String address, String repliedAddress) {
    if (!isInternal(address)) {
      handlers.get(address).increment();
      return new Handler(address);
    }
    return IGNORED;
  }

  /** Decrements the handlers gauge of the handle's address; does nothing for an invalid handle. */
  @Override
  public void handlerUnregistered(Handler handler) {
    if (!isValid(handler)) {
      return;
    }
    handlers.get(handler.address).decrement();
  }

  /** Intentionally records nothing. */
  @Override
  public void scheduleMessage(Handler handler, boolean b) {
  }

  /**
   * Moves one message from pending to processed for the handle's address and side;
   * does nothing for an invalid handle.
   */
  @Override
  public void messageDelivered(Handler handler, boolean local) {
    if (!isValid(handler)) {
      return;
    }
    String side = Labels.getSide(local);
    pending.get(handler.address, side).decrement();
    processed.get(handler.address, side).increment();
  }

  /**
   * Moves one message from pending to discarded for the handle's address and side;
   * does nothing for an invalid handle. The {@code msg} argument is not used.
   */
  @Override
  public void discardMessage(Handler handler, boolean local, Message<?> msg) {
    if (!isValid(handler)) {
      return;
    }
    String side = Labels.getSide(local);
    pending.get(handler.address, side).decrement();
    discarded.get(handler.address, side).increment();
  }

  /**
   * Increments the published counter when {@code publish} is set, the sent counter otherwise.
   * Internal addresses are skipped; the {@code remote} argument is not used.
   */
  @Override
  public void messageSent(String address, boolean publish, boolean local, boolean remote) {
    if (isInternal(address)) {
      return;
    }
    Counters target = publish ? published : sent;
    target.get(address, Labels.getSide(local)).increment();
  }

  /**
   * Adds {@code handlers} to the pending gauge and increments the received counter; the delivered
   * counter is incremented only when {@code handlers} is positive. Internal addresses are skipped;
   * the {@code publish} argument is not used.
   */
  @Override
  public void messageReceived(String address, boolean publish, boolean local, int handlers) {
    if (isInternal(address)) {
      return;
    }
    String side = Labels.getSide(local);
    // The int parameter shadows the gauge field of the same name here.
    pending.get(address, side).add(handlers);
    received.get(address, side).increment();
    if (handlers > 0) {
      delivered.get(address, side).increment();
    }
  }

  /** Records {@code numberOfBytes} in the bytes-written summary unless the address is internal. */
  @Override
  public void messageWritten(String address, int numberOfBytes) {
    if (isInternal(address)) {
      return;
    }
    bytesWritten.get(address).record(numberOfBytes);
  }

  /** Records {@code numberOfBytes} in the bytes-read summary unless the address is internal. */
  @Override
  public void messageRead(String address, int numberOfBytes) {
    if (isInternal(address)) {
      return;
    }
    bytesRead.get(address).record(numberOfBytes);
  }

  /**
   * Increments the reply failures counter labelled with the address and the failure's enum name.
   * Internal addresses are skipped.
   */
  @Override
  public void replyFailure(String address, ReplyFailure failure) {
    if (isInternal(address)) {
      return;
    }
    replyFailures.get(address, failure.name()).increment();
  }

  /** Intentionally does nothing. */
  @Override
  public void close() {
  }

  /** Handle returned by {@link #handlerRegistered(String, String)}; carries the registered address. */
  static class Handler {
    /** Address the handler was registered on; {@code null} for the ignored handle. */
    private final String address;

    Handler(String address) {
      this.address = address;
    }
  }
}