/*
 * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package reactor.core.publisher;

import java.util.LinkedList;
import java.util.List;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;

Captures the current stacktrace when this publisher is created and makes it available/visible for debugging purposes from the inner Subscriber.

Note that getting a stacktrace is a costly operation.

The operator sanitizes the stacktrace and removes noisy entries such as:

  • java.lang.Thread entries
  • method references with source line of 1 (bridge methods)
  • Tomcat worker thread entries
  • JUnit setup
Type parameters:
  • <T> – the value type passing through
See Also: https://github.com/reactor/reactive-streams-commons
/**
 * Captures the current stacktrace when this publisher is created and
 * makes it available/visible for debugging purposes from
 * the inner Subscriber.
 * <p>
 * Note that getting a stacktrace is a costly operation.
 * <p>
 * The operator sanitizes the stacktrace and removes noisy entries such as:
 * <ul>
 * <li>java.lang.Thread entries</li>
 * <li>method references with source line of 1 (bridge methods)</li>
 * <li>Tomcat worker thread entries</li>
 * <li>JUnit setup</li>
 * </ul>
 *
 * @param <T> the value type passing through
 * @see <a href="https://github.com/reactor/reactive-streams-commons">https://github.com/reactor/reactive-streams-commons</a>
 */
final class FluxOnAssembly<T> extends FluxOperator<T, T> implements Fuseable, AssemblyOp { final AssemblySnapshotException snapshotStack; // assigned exactly once by each constructor; passed to the subscriber wrappers created in subscribe(...)
If set to true, the creation of FluxOnAssembly will capture the raw stacktrace instead of the sanitized version.
/** * If set to true, the creation of FluxOnAssembly will capture the raw stacktrace * instead of the sanitized version. */
static final boolean fullStackTrace = Boolean.parseBoolean(System.getProperty( "reactor.trace.assembly.fullstacktrace", "false"));
Create an assembly trace decorated as a Flux.
/** * Create an assembly trace decorated as a {@link Flux}. */
FluxOnAssembly(Flux<? extends T> source) {
    super(source);
    // No description available: use the no-arg snapshot, which leaves its
    // `checkpointed` flag false.
    snapshotStack = new AssemblySnapshotException();
}
If light, create an assembly marker that has no trace but just shows a custom description (eg. a name for a Flux or a wider correlation ID) and exposed as a Flux.
/** * If light, create an assembly marker that has no trace but just shows a custom * description (eg. a name for a Flux or a wider correlation ID) and exposed as a * {@link Flux}. */
FluxOnAssembly(Flux<? extends T> source, @Nullable String description, boolean light) { super(source); if (light) { this.snapshotStack = new AssemblyLightSnapshotException(description); } else { this.snapshotStack = new AssemblySnapshotException(description); } } @Override public String stepName() { return snapshotStack.operatorAssemblyInformation(); } @Override public Object scanUnsafe(Attr key) { if (key == Attr.ACTUAL_METADATA) return !snapshotStack.checkpointed; return super.scanUnsafe(key); } @Override public String toString() { return snapshotStack.operatorAssemblyInformation(); } static String getStacktrace(AssemblySnapshotException snapshotStack) { StackTraceElement[] stes = snapshotStack.getStackTrace(); if (!fullStackTrace) { return Traces.stackTraceToSanitizedString(stes); } else { return Traces.stackTraceToString(stes); } } static void fillStacktraceHeader(StringBuilder sb, Class<?> sourceClass, AssemblySnapshotException ase) { if (ase.isLight()) { sb.append("\nAssembly site of producer [") .append(sourceClass.getName()) .append("] is identified by light checkpoint [") .append(ase.getMessage()) .append("]."); return; } sb.append("\nAssembly trace from producer [") .append(sourceClass.getName()) .append("]"); if (ase.getMessage() != null) { sb.append(", described as [") .append(ase.getMessage()) .append("]"); } sb.append(" :\n"); } @SuppressWarnings("unchecked") static <T> void subscribe(CoreSubscriber<? super T> s, Flux<? extends T> source, @Nullable AssemblySnapshotException snapshotStack) { if(snapshotStack != null) { if (s instanceof ConditionalSubscriber) { ConditionalSubscriber<? super T> cs = (ConditionalSubscriber<? super T>) s; source.subscribe(new OnAssemblyConditionalSubscriber<>(cs, snapshotStack, source)); } else { source.subscribe(new OnAssemblySubscriber<>(s, snapshotStack, source)); } } } @Override @SuppressWarnings("unchecked") public void subscribe(CoreSubscriber<? 
super T> actual) { if(snapshotStack != null) { if (actual instanceof ConditionalSubscriber) { ConditionalSubscriber<? super T> cs = (ConditionalSubscriber<? super T>) actual; source.subscribe(new OnAssemblyConditionalSubscriber<>(cs, snapshotStack, source)); } else { source.subscribe(new OnAssemblySubscriber<>(actual, snapshotStack, source)); } } }
The exception that captures assembly context, possibly with a user-readable description or a wider correlation ID (which serves as the exception's message). Use the empty constructor if the latter is not relevant.
/**
 * The exception that captures assembly context, possibly with a user-readable
 * description or a wider correlation ID (which serves as the exception's
 * {@link #getMessage() message}). Use the empty constructor if the latter is not
 * relevant.
 */
static class AssemblySnapshotException extends RuntimeException { final boolean checkpointed; String cached; AssemblySnapshotException() { super(); this.checkpointed = false; }
Params:
/** * @param description a description for the assembly traceback. * Use {@link #AssemblySnapshotException()} rather than null if not relevant. */
AssemblySnapshotException(@Nullable String description) { super(description); this.checkpointed = true; } public boolean isLight() { return false; } @Override public String toString() { if(cached == null){ cached = getStacktrace(this); } return cached; } String operatorAssemblyInformation() { return Traces.extractOperatorAssemblyInformation(toString()); } } static final class AssemblyLightSnapshotException extends AssemblySnapshotException { AssemblyLightSnapshotException(@Nullable String description) { super(description); cached = "checkpoint(\""+description+"\")"; } @Override public synchronized Throwable fillInStackTrace() { return this; //intentionally NO-OP } @Override public boolean isLight() { return true; } @Override String operatorAssemblyInformation() { return toString(); } }
The holder for the assembly stacktrace (as its message).
/** * The holder for the assembly stacktrace (as its message). */
static final class OnAssemblyException extends RuntimeException { final List<Tuple3<Integer, String, Integer>> chainOrder = new LinkedList<>(); /** */ private static final long serialVersionUID = 5278398300974016773L; OnAssemblyException(Publisher<?> parent, AssemblySnapshotException ase, String message) { super(message); //skip the "error seen by" if light (no stack) if (!ase.isLight()) { chainOrder.add(Tuples.of(parent.hashCode(), Traces.extractOperatorAssemblyInformation(message, true), 0)); } } void mapLine(int indent, StringBuilder sb, String s) { for (int i = 0; i < indent; i++) { sb.append("\t"); } sb.append("\t|_\t") .append(s) .append("\n"); } @Override public synchronized Throwable fillInStackTrace() { return this; } void add(Publisher<?> parent, String stacktrace) { //noinspection ConstantConditions int key = getParentOrThis(Scannable.from(parent)); synchronized (chainOrder) { int i = 0; int n = chainOrder.size(); int j = n - 1; Tuple3<Integer, String, Integer> tmp; while(j >= 0){ tmp = chainOrder.get(j); //noinspection ConstantConditions if(tmp.getT1() == key){ //noinspection ConstantConditions i = tmp.getT3(); break; } j--; } for(;;){ Tuple3<Integer, String, Integer> t = Tuples.of( parent.hashCode(), Traces.extractOperatorAssemblyInformation(stacktrace, true), i); if(!chainOrder.contains(t)){ chainOrder.add(t); break; } i++; } } } @Override public String getMessage() { //skip the "error has been observed" traceback if mapped traceback is empty synchronized (chainOrder) { if (chainOrder.isEmpty()) { return super.getMessage(); } StringBuilder sb = new StringBuilder(super.getMessage()).append( "Error has been observed by the following operator(s):\n"); for(Tuple3<Integer, String, Integer> t : chainOrder) { //noinspection ConstantConditions mapLine(t.getT3(), sb, t.getT2()); } return sb.toString(); } } } static int getParentOrThis(Scannable parent) { return parent.parents() .filter(s -> !(s instanceof AssemblyOp)) .findFirst() .map(Object::hashCode) 
.orElse(parent.hashCode()); } static class OnAssemblySubscriber<T> implements InnerOperator<T, T>, QueueSubscription<T> { final AssemblySnapshotException snapshotStack; final Publisher<?> parent; final CoreSubscriber<? super T> actual; QueueSubscription<T> qs; Subscription s; int fusionMode; OnAssemblySubscriber(CoreSubscriber<? super T> actual, AssemblySnapshotException snapshotStack, Publisher<?> parent) { this.actual = actual; this.snapshotStack = snapshotStack; this.parent = parent; } @Override public final CoreSubscriber<? super T> actual() { return actual; } @Override @Nullable public Object scanUnsafe(Attr key) { if (key == Attr.PARENT) return s; if (key == Attr.ACTUAL_METADATA) return !snapshotStack.checkpointed; return InnerOperator.super.scanUnsafe(key); } @Override public String toString() { return snapshotStack.operatorAssemblyInformation(); } @Override public String stepName() { return toString(); } @Override final public void onNext(T t) { actual.onNext(t); } @Override final public void onError(Throwable t) { actual.onError(fail(t)); } @Override final public void onComplete() { actual.onComplete(); } @Override final public int requestFusion(int requestedMode) { QueueSubscription<T> qs = this.qs; if (qs != null) { int m = qs.requestFusion(requestedMode); if (m != Fuseable.NONE) { fusionMode = m; } return m; } return Fuseable.NONE; } final Throwable fail(Throwable t) { StringBuilder sb = new StringBuilder(); fillStacktraceHeader(sb, parent.getClass(), snapshotStack); OnAssemblyException set = null; if (!snapshotStack.isLight()) { sb.append(snapshotStack.toString()); } if (t.getSuppressed().length > 0) { for (Throwable e : t.getSuppressed()) { if (e instanceof OnAssemblyException) { OnAssemblyException oae = ((OnAssemblyException) e); oae.add(parent, sb.toString()); set = oae; break; } } } if (set == null) { t = Exceptions.addSuppressed(t, new OnAssemblyException(parent, snapshotStack, sb.toString())); } else if(snapshotStack.checkpointed) { t = 
Exceptions.addSuppressed(t, snapshotStack); } return t; } @Override final public boolean isEmpty() { try { return qs.isEmpty(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); throw Exceptions.propagate(fail(ex)); } } @Override final public void onSubscribe(Subscription s) { if (Operators.validate(this.s, s)) { this.s = s; this.qs = Operators.as(s); actual.onSubscribe(this); } } @Override final public int size() { return qs.size(); } @Override final public void clear() { qs.clear(); } @Override final public void request(long n) { s.request(n); } @Override final public void cancel() { s.cancel(); } @Override @Nullable final public T poll() { try { return qs.poll(); } catch (final Throwable ex) { Exceptions.throwIfFatal(ex); throw Exceptions.propagate(fail(ex)); } } } static final class OnAssemblyConditionalSubscriber<T> extends OnAssemblySubscriber<T> implements ConditionalSubscriber<T> { final ConditionalSubscriber<? super T> actualCS; OnAssemblyConditionalSubscriber(ConditionalSubscriber<? super T> actual, AssemblySnapshotException stacktrace, Publisher<?> parent) { super(actual, stacktrace, parent); this.actualCS = actual; } @Override public boolean tryOnNext(T t) { return actualCS.tryOnNext(t); } } } interface AssemblyOp {}