package reactor.core.publisher;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.logging.Level;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.Fuseable;
import reactor.core.publisher.FluxOnAssembly.AssemblySnapshotException;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
/**
 * A {@link SignalPeek} that writes the signals observed around a source
 * {@link Publisher} to a {@link Logger}.
 * <p>
 * Each instance receives a unique {@link #id} taken from {@link #IDS}. The set of
 * signals that are logged is stored as a bit mask in {@link #options}, and the
 * {@link Level} selects which logger method is used (see {@link #log(SignalType, Object)}).
 *
 * @param <IN> the type of the values emitted by the source
 */
final class SignalLogger<IN> implements SignalPeek<IN> {

    // Bit flags, one per loggable signal; they are OR-ed together into 'options'.
    final static int CONTEXT_PARENT = 0b0100000000;
    final static int SUBSCRIBE = 0b0010000000;
    final static int ON_SUBSCRIBE = 0b0001000000;
    final static int ON_NEXT = 0b0000100000;
    final static int ON_ERROR = 0b0000010000;
    final static int ON_COMPLETE = 0b0000001000;
    final static int REQUEST = 0b0000000100;
    final static int CANCEL = 0b0000000010;
    final static int AFTER_TERMINATE = 0b0000000001;

    /**
     * Mask used when no explicit option is given. Note that it does not contain
     * {@link #AFTER_TERMINATE}: that signal is only logged when requested explicitly.
     */
    final static int ALL =
            CONTEXT_PARENT | CANCEL | ON_COMPLETE | ON_ERROR | REQUEST | ON_SUBSCRIBE | ON_NEXT | SUBSCRIBE;

    /** Source of the per-instance {@link #id}; starts at 1. */
    final static AtomicLong IDS = new AtomicLong(1);

    final Publisher<IN> source;
    final Logger log;
    /** True when the source is an instance of {@link Fuseable}; selects the log template. */
    final boolean fuseable;
    /** Bit mask of the signals to log, built from the flags above. */
    final int options;
    final Level level;
    /** Assembly information appended to every log line, or null when stack correlation is off. */
    final String operatorLine;
    final long id;

    static final String LOG_TEMPLATE = "{}({})";
    static final String LOG_TEMPLATE_FUSEABLE = "| {}({})";

    /**
     * Creates a logger that obtains its {@link Logger} from {@link Loggers#getLogger(String)}.
     *
     * @param source the publisher whose signals are logged, must not be null
     * @param category the logger category; see the other constructor for how it is completed
     * @param level the level deciding which logger method is invoked
     * @param correlateStack whether assembly information is captured and appended to log lines
     * @param options the signals to log; null or empty selects {@link #ALL}
     */
    SignalLogger(Publisher<IN> source,
            @Nullable String category,
            Level level,
            boolean correlateStack,
            SignalType... options) {
        this(source, category, level, correlateStack, Loggers::getLogger, options);
    }

    /**
     * Creates a logger with an explicit {@link Logger} factory.
     * <p>
     * When {@code category} is null, empty or ends with a dot, a category is generated:
     * a null category starts from {@code "reactor."}, then {@code "Mono."},
     * {@code "Parallel."} or {@code "Flux."} is appended depending on the source type,
     * followed by the simple class name of the source (with that same word removed)
     * and finally {@code "." + id}.
     *
     * @param source the publisher whose signals are logged, must not be null
     * @param category the logger category, possibly completed as described above
     * @param level the level deciding which logger method is invoked
     * @param correlateStack whether assembly information is captured and appended to log lines
     * @param loggerSupplier factory invoked once with the final category
     * @param options the signals to log; null or empty selects {@link #ALL}
     * @throws NullPointerException if {@code source} is null
     */
    SignalLogger(Publisher<IN> source,
            @Nullable String category,
            Level level,
            boolean correlateStack,
            Function<String, Logger> loggerSupplier,
            @Nullable SignalType... options) {
        this.source = Objects.requireNonNull(source, "source");
        this.id = IDS.getAndIncrement();
        this.fuseable = source instanceof Fuseable;

        if (correlateStack) {
            operatorLine = Traces.extractOperatorAssemblyInformation(new AssemblySnapshotException().toString());
        }
        else {
            operatorLine = null;
        }

        boolean generated =
                category == null || category.isEmpty() || category.endsWith(".");

        // Only a null category gets the "reactor." root; an empty one stays empty.
        category = generated && category == null ? "reactor." : category;
        if (generated) {
            if (source instanceof Mono) {
                category += "Mono." + source.getClass()
                                            .getSimpleName()
                                            .replace("Mono", "");
            }
            else if (source instanceof ParallelFlux) {
                category += "Parallel." + source.getClass()
                                                .getSimpleName()
                                                .replace("Parallel", "");
            }
            else {
                category += "Flux." + source.getClass()
                                            .getSimpleName()
                                            .replace("Flux", "");
            }
            category += "." + id;
        }

        this.log = loggerSupplier.apply(category);
        this.level = level;

        if (options == null || options.length == 0) {
            this.options = ALL;
        }
        else {
            int opts = 0;
            for (SignalType option : options) {
                opts |= flagFor(option);
            }
            this.options = opts;
        }
    }

    /** Maps a signal type to its bit flag; null and unsupported types map to 0. */
    private static int flagFor(@Nullable SignalType option) {
        if (option == SignalType.CANCEL) {
            return CANCEL;
        }
        if (option == SignalType.CURRENT_CONTEXT) {
            return CONTEXT_PARENT;
        }
        if (option == SignalType.ON_SUBSCRIBE) {
            return ON_SUBSCRIBE;
        }
        if (option == SignalType.REQUEST) {
            return REQUEST;
        }
        if (option == SignalType.ON_NEXT) {
            return ON_NEXT;
        }
        if (option == SignalType.ON_ERROR) {
            return ON_ERROR;
        }
        if (option == SignalType.ON_COMPLETE) {
            return ON_COMPLETE;
        }
        if (option == SignalType.SUBSCRIBE) {
            return SUBSCRIBE;
        }
        if (option == SignalType.AFTER_TERMINATE) {
            return AFTER_TERMINATE;
        }
        return 0;
    }

    /** Returns the source for {@code Attr.PARENT} and null for every other key. */
    @Override
    @Nullable
    public Object scanUnsafe(Attr key) {
        if (key == Attr.PARENT) return source;

        return null;
    }

    /** Builds the message template: fuseable or plain form, plus the operator line if any. */
    private String template() {
        String line = fuseable ? LOG_TEMPLATE_FUSEABLE : LOG_TEMPLATE;
        if (operatorLine != null) {
            line = line + " " + operatorLine;
        }
        return line;
    }

    /**
     * True when the given flag is set in {@link #options} and, for the INFO level only,
     * the logger has info enabled. Other levels are not pre-checked here.
     */
    private boolean isLoggable(int flag) {
        return (options & flag) == flag && (level != Level.INFO || log.isInfoEnabled());
    }

    /**
     * Logs one signal with the logger method matching {@link #level}:
     * FINEST to trace, FINE to debug, INFO to info, WARNING to warn, SEVERE to error.
     * Any other level results in nothing being logged.
     *
     * @param signalType the signal being logged, first template argument
     * @param signalValue the associated value, second template argument
     */
    void log(SignalType signalType, Object signalValue) {
        String line = template();
        if (level == Level.FINEST) {
            log.trace(line, signalType, signalValue);
        }
        else if (level == Level.FINE) {
            log.debug(line, signalType, signalValue);
        }
        else if (level == Level.INFO) {
            log.info(line, signalType, signalValue);
        }
        else if (level == Level.WARNING) {
            log.warn(line, signalType, signalValue);
        }
        else if (level == Level.SEVERE) {
            log.error(line, signalType, signalValue);
        }
    }

    /**
     * Same as {@link #log(SignalType, Object)}, but if the call throws an
     * {@link UnsupportedOperationException} the value is logged again as
     * {@code String.valueOf(signalValue)} and, when debug is enabled, a hint about
     * the placement of {@code log()} is emitted together with the exception.
     *
     * @param signalType the signal being logged
     * @param signalValue the associated value
     */
    void safeLog(SignalType signalType, Object signalValue) {
        try {
            log(signalType, signalValue);
        }
        catch (UnsupportedOperationException uoe) {
            log(signalType, String.valueOf(signalValue));
            if (log.isDebugEnabled()) {
                log.debug("UnsupportedOperationException has been raised by the logging framework, does your log() placement make sense? " +
                        "eg. 'window(2).log()' instead of 'window(2).flatMap(w -> w.log())'", uoe);
            }
        }
    }

    /**
     * Describes a subscription for logging: an optional fusion marker followed by the
     * class name with its own package prefix removed.
     * <p>
     * The canonical name is used when available, otherwise the binary name. The package
     * prefix is stripped literally and only when the class reports a non-empty package,
     * so classes without package information keep their full name.
     *
     * @param s the subscription to describe, may be null
     * @return {@code "null subscription"} for null, otherwise the description
     */
    static String subscriptionAsString(@Nullable Subscription s) {
        if (s == null) {
            return "null subscription";
        }
        StringBuilder asString = new StringBuilder();
        if (s instanceof Fuseable.SynchronousSubscription) {
            asString.append("[Synchronous Fuseable] ");
        }
        else if (s instanceof Fuseable.QueueSubscription) {
            asString.append("[Fuseable] ");
        }

        Class<? extends Subscription> clazz = s.getClass();
        String name = clazz.getCanonicalName();
        if (name == null) {
            name = clazz.getName();
        }

        // getPackage() may be null, and the unnamed package has an empty name: in both
        // cases there is no prefix to remove. A plain prefix check also avoids treating
        // the dots of the package name as regex wildcards.
        Package pkg = clazz.getPackage();
        if (pkg != null && !pkg.getName().isEmpty()) {
            String prefix = pkg.getName() + ".";
            if (name.startsWith(prefix)) {
                name = name.substring(prefix.length());
            }
        }
        asString.append(name);

        return asString.toString();
    }

    /** Returns a callback logging ON_SUBSCRIBE with the subscription description, or null if disabled. */
    @Override
    @Nullable
    public Consumer<? super Subscription> onSubscribeCall() {
        if (isLoggable(ON_SUBSCRIBE)) {
            return s -> log(SignalType.ON_SUBSCRIBE, subscriptionAsString(s));
        }
        return null;
    }

    /**
     * Returns a callback logging the current context, or null if disabled.
     * The option is enabled through {@code SignalType.CURRENT_CONTEXT} while the line
     * is labelled {@code SignalType.ON_CONTEXT}.
     */
    @Nullable
    @Override
    public Consumer<? super Context> onCurrentContextCall() {
        if (isLoggable(CONTEXT_PARENT)) {
            return c -> log(SignalType.ON_CONTEXT, c);
        }
        return null;
    }

    /** Returns a callback logging each value through {@link #safeLog}, or null if disabled. */
    @Override
    @Nullable
    public Consumer<? super IN> onNextCall() {
        if (isLoggable(ON_NEXT)) {
            return d -> safeLog(SignalType.ON_NEXT, d);
        }
        return null;
    }

    /**
     * Returns a callback logging errors, or null if disabled.
     * <p>
     * Errors go to trace for FINEST and to debug for FINE; for every other level they
     * go to the error method. The callback is only created when the corresponding
     * logger level is enabled. Each error produces two log calls: the templated line,
     * then an empty message carrying the throwable.
     */
    @Override
    @Nullable
    public Consumer<? super Throwable> onErrorCall() {
        boolean shouldLogAsDebug = level == Level.FINE && log.isDebugEnabled();
        boolean shouldLogAsTrace = level == Level.FINEST && log.isTraceEnabled();
        boolean shouldLogAsError = level != Level.FINE && level != Level.FINEST && log.isErrorEnabled();
        boolean enabled = (options & ON_ERROR) == ON_ERROR;

        if (!enabled || !(shouldLogAsError || shouldLogAsDebug || shouldLogAsTrace)) {
            return null;
        }

        // The template is computed once, when the callback is created.
        String s = template();
        if (shouldLogAsTrace) {
            return e -> {
                log.trace(s, SignalType.ON_ERROR, e, source);
                log.trace("", e);
            };
        }
        if (shouldLogAsDebug) {
            return e -> {
                log.debug(s, SignalType.ON_ERROR, e, source);
                log.debug("", e);
            };
        }
        return e -> {
            log.error(s, SignalType.ON_ERROR, e, source);
            log.error("", e);
        };
    }

    /** Returns a callback logging ON_COMPLETE with an empty value, or null if disabled. */
    @Override
    @Nullable
    public Runnable onCompleteCall() {
        if (isLoggable(ON_COMPLETE)) {
            return () -> log(SignalType.ON_COMPLETE, "");
        }
        return null;
    }

    /** Returns a callback logging AFTER_TERMINATE with an empty value, or null if disabled. */
    @Override
    @Nullable
    public Runnable onAfterTerminateCall() {
        if (isLoggable(AFTER_TERMINATE)) {
            return () -> log(SignalType.AFTER_TERMINATE, "");
        }
        return null;
    }

    /**
     * Returns a callback logging each request amount, or null if disabled.
     * {@code Long.MAX_VALUE} is printed as {@code "unbounded"}.
     */
    @Override
    @Nullable
    public LongConsumer onRequestCall() {
        if (isLoggable(REQUEST)) {
            return n -> log(SignalType.REQUEST,
                    Long.MAX_VALUE == n ? "unbounded" : n);
        }
        return null;
    }

    /** Returns a callback logging CANCEL with an empty value, or null if disabled. */
    @Override
    @Nullable
    public Runnable onCancelCall() {
        if (isLoggable(CANCEL)) {
            return () -> log(SignalType.CANCEL, "");
        }
        return null;
    }

    /** Returns {@code "/loggers/" + logger name + "/" + id}. */
    @Override
    public String toString() {
        return "/loggers/" + log.getName() + "/" + id;
    }
}