/*
 * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package reactor.core;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.reactivestreams.Subscriber;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Scheduler.Worker;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuple2;

A Scannable component exposes state in a non strictly memory consistent way and results should be understood as best-effort hint of the underlying state. This is useful to retro-engineer a component graph such as a flux operator chain via Stream queries from actuals(), parents() and inners(). This allows for visiting patterns and possibly enable serviceability features.

Scannable is also a useful tool for the advanced user eager to learn which kind of state we usually manage in the package-scope schedulers or operators implementations.

Author:Stephane Maldini
/** * A Scannable component exposes state in a non strictly memory consistent way and * results should be understood as best-effort hint of the underlying state. This is * useful to retro-engineer a component graph such as a flux operator chain via * {@link Stream} queries from * {@link #actuals()}, {@link #parents()} and {@link #inners()}. This allows for * visiting patterns and possibly enable serviceability features. * <p> * Scannable is also a useful tool for the advanced user eager to learn which kind * of state we usually manage in the package-scope schedulers or operators * implementations. * * @author Stephane Maldini */
@FunctionalInterface public interface Scannable {
The pattern for matching words unrelated to operator name. Used to strip an operator name of various prefixes and suffixes.
/** * The pattern for matching words unrelated to operator name. * Used to strip an operator name of various prefixes and suffixes. */
Pattern OPERATOR_NAME_UNRELATED_WORDS_PATTERN = Pattern.compile("Parallel|Flux|Mono|Publisher|Subscriber|Fuseable|Operator|Conditional");
Base class for Scannable attributes, which all can define a meaningful default.
Type parameters:
  • <T> – the type of data associated with an attribute
/** * Base class for {@link Scannable} attributes, which all can define a meaningful * default. * * @param <T> the type of data associated with an attribute */
class Attr<T> { /* IMPLEMENTATION NOTE Note that some attributes define a object-to-T converter, which means their private {@link #tryConvert(Object)} method can safely be used by {@link Scannable#scan(Attr)}, making them resilient to class cast exceptions. */
The direct dependent component downstream reference if any. Operators in Flux/Mono for instance delegate to a target Subscriber, which is going to be the actual chain navigated with this reference key. Subscribers are not always Scannable, but this attribute will convert these raw results to an unavailable scan object in this case.

A reference chain downstream can be navigated via Scannable.actuals().

/** * The direct dependent component downstream reference if any. Operators in * Flux/Mono for instance delegate to a target Subscriber, which is going to be * the actual chain navigated with this reference key. Subscribers are not always * {@link Scannable}, but this attribute will convert these raw results to an * {@link Scannable#isScanAvailable() unavailable scan} object in this case. * <p> * A reference chain downstream can be navigated via {@link Scannable#actuals()}. */
public static final Attr<Scannable> ACTUAL = new Attr<>(null, Scannable::from);
Indicate that for some purposes a Scannable should be used as additional source of information about a contiguous Scannable in the chain.

For example Scannable.steps() uses this to collate the stepName of an assembly trace to its wrapped operator (the one before it in the assembly chain).

/** * Indicate that for some purposes a {@link Scannable} should be used as additional * source of information about a contiguous {@link Scannable} in the chain. * <p> * For example {@link Scannable#steps()} uses this to collate the * {@link Scannable#stepName() stepName} of an assembly trace to its * wrapped operator (the one before it in the assembly chain). */
public static final Attr<Boolean> ACTUAL_METADATA = new Attr<>(false);
A Integer attribute implemented by components with a backlog capacity. It will expose current queue size or similar related to user-provided held data. Note that some operators and processors CAN keep a backlog larger than Integer.MAX_VALUE, in which case the Attr LARGE_BUFFERED should be used instead. Such operators will attempt to serve a BUFFERED query but will return Integer.MIN_VALUE when actual buffer size is oversized for int.
/** * A {@link Integer} attribute implemented by components with a backlog * capacity. It will expose current queue size or similar related to * user-provided held data. Note that some operators and processors CAN keep * a backlog larger than {@code Integer.MAX_VALUE}, in which case * the {@link Attr#LARGE_BUFFERED Attr} {@literal LARGE_BUFFERED} * should be used instead. Such operators will attempt to serve a BUFFERED * query but will return {@link Integer#MIN_VALUE} when actual buffer size is * oversized for int. */
public static final Attr<Integer> BUFFERED = new Attr<>(0);
Return an an Integer capacity when no Attr<T>.PREFETCH is defined or when an arbitrary maximum limit is applied to the backlog capacity of the scanned component. Integer.MAX_VALUE signal unlimited capacity.

Note: This attribute usually resolves to a constant value.

/** * Return an an {@link Integer} capacity when no {@link #PREFETCH} is defined or * when an arbitrary maximum limit is applied to the backlog capacity of the * scanned component. {@link Integer#MAX_VALUE} signal unlimited capacity. * <p> * Note: This attribute usually resolves to a constant value. */
public static final Attr<Integer> CAPACITY = new Attr<>(0);
A Boolean attribute indicating whether or not a downstream component has interrupted consuming this scanned component, e.g., a cancelled subscription. Note that it differs from Attr<T>.TERMINATED which is intended for "normal" shutdown cycles.
/** * A {@link Boolean} attribute indicating whether or not a downstream component * has interrupted consuming this scanned component, e.g., a cancelled * subscription. Note that it differs from {@link #TERMINATED} which is * intended for "normal" shutdown cycles. */
public static final Attr<Boolean> CANCELLED = new Attr<>(false);
Delay_Error exposes a Boolean whether the scanned component actively supports error delaying if it manages a backlog instead of fast error-passing which might drop pending backlog.

Note: This attribute usually resolves to a constant value.

/** * Delay_Error exposes a {@link Boolean} whether the scanned component * actively supports error delaying if it manages a backlog instead of fast * error-passing which might drop pending backlog. * <p> * Note: This attribute usually resolves to a constant value. */
public static final Attr<Boolean> DELAY_ERROR = new Attr<>(false);
a Throwable attribute which indicate an error state if the scanned component keeps track of it.
/** * a {@link Throwable} attribute which indicate an error state if the scanned * component keeps track of it. */
public static final Attr<Throwable> ERROR = new Attr<>(null);
Similar to BUFFERED, but reserved for operators that can hold a backlog of items that can grow beyond Integer.MAX_VALUE. These operators will also answer to a BUFFERED query up to the point where their buffer is actually too large, at which point they'll return Integer.MIN_VALUE, which serves as a signal that this attribute should be used instead. Defaults to null.

Flux.flatMap, Flux.filterWhen, TopicProcessor, and Flux.window (with overlap) are known to use this attribute.

/** * Similar to {@link Attr#BUFFERED}, but reserved for operators that can hold * a backlog of items that can grow beyond {@literal Integer.MAX_VALUE}. These * operators will also answer to a {@link Attr#BUFFERED} query up to the point * where their buffer is actually too large, at which point they'll return * {@literal Integer.MIN_VALUE}, which serves as a signal that this attribute * should be used instead. Defaults to {@literal null}. * <p> * {@code Flux.flatMap}, {@code Flux.filterWhen}, {@link reactor.core.publisher.TopicProcessor}, * and {@code Flux.window} (with overlap) are known to use this attribute. */
public static final Attr<Long> LARGE_BUFFERED = new Attr<>(null);
An arbitrary name given to the operator component. Defaults to null.
/** * An arbitrary name given to the operator component. Defaults to {@literal null}. */
public static final Attr<String> NAME = new Attr<>(null);
Parent key exposes the direct upstream relationship of the scanned component. It can be a Publisher source to an operator, a Subscription to a Subscriber (main flow if ambiguous with inner Subscriptions like flatMap), a Scheduler to a Worker. These types are not always Scannable, but this attribute will convert such raw results to an unavailable scan object in this case.

Scannable.parents() can be used to navigate the parent chain.

/** * Parent key exposes the direct upstream relationship of the scanned component. * It can be a Publisher source to an operator, a Subscription to a Subscriber * (main flow if ambiguous with inner Subscriptions like flatMap), a Scheduler to * a Worker. These types are not always {@link Scannable}, but this attribute * will convert such raw results to an {@link Scannable#isScanAvailable() unavailable scan} * object in this case. * <p> * {@link Scannable#parents()} can be used to navigate the parent chain. */
public static final Attr<Scannable> PARENT = new Attr<>(null, Scannable::from);
A key that links a Scannable to another Scannable it runs on. Usually exposes a link between an operator/subscriber and its Worker or Scheduler, provided these are Scannable. Will return UNAVAILABLE_SCAN if the supporting execution is not Scannable or NULL_SCAN if the operator doesn't define a specific runtime.
/** * A key that links a {@link Scannable} to another {@link Scannable} it runs on. * Usually exposes a link between an operator/subscriber and its {@link Worker} or * {@link Scheduler}, provided these are {@link Scannable}. Will return * {@link Attr#UNAVAILABLE_SCAN} if the supporting execution is not Scannable or * {@link Attr#NULL_SCAN} if the operator doesn't define a specific runtime. */
public static final Attr<Scannable> RUN_ON = new Attr<>(null, Scannable::from);
Prefetch is an Integer attribute defining the rate of processing in a component which has capacity to request and hold a backlog of data. It usually maps to a component capacity when no arbitrary Attr<T>.CAPACITY is set. Integer.MAX_VALUE signal unlimited capacity and therefore unbounded demand.

Note: This attribute usually resolves to a constant value.

/** * Prefetch is an {@link Integer} attribute defining the rate of processing in a * component which has capacity to request and hold a backlog of data. It * usually maps to a component capacity when no arbitrary {@link #CAPACITY} is * set. {@link Integer#MAX_VALUE} signal unlimited capacity and therefore * unbounded demand. * <p> * Note: This attribute usually resolves to a constant value. */
public static final Attr<Integer> PREFETCH = new Attr<>(0);
A Long attribute exposing the current pending demand of a downstream component. Note that Long.MAX_VALUE indicates an unbounded (push-style) demand as specified in Subscription.request(long).
/** * A {@link Long} attribute exposing the current pending demand of a downstream * component. Note that {@link Long#MAX_VALUE} indicates an unbounded (push-style) * demand as specified in {@link org.reactivestreams.Subscription#request(long)}. */
public static final Attr<Long> REQUESTED_FROM_DOWNSTREAM = new Attr<>(0L);
A Boolean attribute indicating whether or not an upstream component terminated this scanned component. e.g. a post onComplete/onError subscriber. By opposition to Attr<T>.CANCELLED which determines if a downstream component interrupted this scanned component.
/** * A {@link Boolean} attribute indicating whether or not an upstream component * terminated this scanned component. e.g. a post onComplete/onError subscriber. * By opposition to {@link #CANCELLED} which determines if a downstream * component interrupted this scanned component. */
public static final Attr<Boolean> TERMINATED = new Attr<>(false);
A Stream of Tuple2 representing key/value pairs for tagged components. Defaults to null.
/** * A {@link Stream} of {@link reactor.util.function.Tuple2} representing key/value * pairs for tagged components. Defaults to {@literal null}. */
public static final Attr<Stream<Tuple2<String, String>>> TAGS = new Attr<>(null);
Meaningful and always applicable default value for the attribute, returned instead of null when a specific value hasn't been defined for a component. null if no sensible generic default is available.
Returns:the default value applicable to all components or null if none.
/** * Meaningful and always applicable default value for the attribute, returned * instead of {@literal null} when a specific value hasn't been defined for a * component. {@literal null} if no sensible generic default is available. * * @return the default value applicable to all components or null if none. */
@Nullable public T defaultValue(){ return defaultValue; }
Checks if this attribute is capable of safely converting any Object into a T via tryConvert(Object) (potentially returning null or a Null Object for incompatible raw values).
Returns:true if the attribute can safely convert any object, false if it can throw ClassCastException
/** * Checks if this attribute is capable of safely converting any Object into * a {@code T} via {@link #tryConvert(Object)} (potentially returning {@code null} * or a Null Object for incompatible raw values). * @return true if the attribute can safely convert any object, false if it can * throw {@link ClassCastException} */
boolean isConversionSafe() { return safeConverter != null; }
Attempt to convert any Object instance o into a T. By default unsafe attributes will just try forcing a cast, which can lead to ClassCastException. However, attributes for which isConversionSafe() returns true are required to not throw an exception (but rather return null or a Null Object).
Params:
  • o – the instance to attempt conversion on
Returns:the converted instance
/** * Attempt to convert any {@link Object} instance o into a {@code T}. By default * unsafe attributes will just try forcing a cast, which can lead to {@link ClassCastException}. * However, attributes for which {@link #isConversionSafe()} returns true are * required to not throw an exception (but rather return {@code null} or a Null * Object). * * @param o the instance to attempt conversion on * @return the converted instance */
@Nullable T tryConvert(@Nullable Object o) { if (o == null) { return null; } if (safeConverter == null) { @SuppressWarnings("unchecked") T t = (T) o; return t; } return safeConverter.apply(o); } final T defaultValue; final Function<Object, ? extends T> safeConverter; protected Attr(@Nullable T defaultValue){ this(defaultValue, null); } protected Attr(@Nullable T defaultValue, @Nullable Function<Object, ? extends T> safeConverter) { this.defaultValue = defaultValue; this.safeConverter = safeConverter; }
A constant that represents Scannable returned via Scannable.from(Object) when the passed non-null reference is not a Scannable
/** * A constant that represents {@link Scannable} returned via {@link #from(Object)} * when the passed non-null reference is not a {@link Scannable} */
static final Scannable UNAVAILABLE_SCAN = new Scannable() { @Override public Object scanUnsafe(Attr key) { return null; } @Override public boolean isScanAvailable() { return false; } @Override public String toString() { return "UNAVAILABLE_SCAN"; } };
A constant that represents Scannable returned via Scannable.from(Object) when the passed reference is null
/** * A constant that represents {@link Scannable} returned via {@link #from(Object)} * when the passed reference is null */
static final Scannable NULL_SCAN = new Scannable() { @Override public Object scanUnsafe(Attr key) { return null; } @Override public boolean isScanAvailable() { return false; } @Override public String toString() { return "NULL_SCAN"; } }; static Stream<? extends Scannable> recurse(Scannable _s, Attr<Scannable> key){ Scannable s = Scannable.from(_s.scan(key)); if(!s.isScanAvailable()) { return Stream.empty(); } return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<Scannable>() { Scannable c = s; @Override public boolean hasNext() { return c != null && c.isScanAvailable(); } @Override public Scannable next() { Scannable _c = c; c = Scannable.from(c.scan(key)); return _c; } }, 0),false); } }
Attempt to cast the Object to a Scannable. Return Attr.NULL_SCAN if the value is null, or Attr.UNAVAILABLE_SCAN if the value is not a Scannable. Both are constant Scannable that return false on isScanAvailable.
Params:
  • o – a reference to cast
Returns:the casted Scannable, or one of two default Scannable instances that aren't actually scannable (one for nulls, one for non-scannable references)
/** * Attempt to cast the Object to a {@link Scannable}. Return {@link Attr#NULL_SCAN} if * the value is null, or {@link Attr#UNAVAILABLE_SCAN} if the value is not a {@link Scannable}. * Both are constant {@link Scannable} that return false on {@link Scannable#isScanAvailable}. * * @param o a reference to cast * * @return the casted {@link Scannable}, or one of two default {@link Scannable} instances * that aren't actually scannable (one for nulls, one for non-scannable references) */
static Scannable from(@Nullable Object o) { if (o == null) { return Attr.NULL_SCAN; } if (o instanceof Scannable) { return ((Scannable) o); } return Attr.UNAVAILABLE_SCAN; }
Return a Stream navigating the Subscriber chain (downward). The current Scannable is not included.
Returns:a Stream navigating the Subscriber chain (downward, current Scannable not included).
/** * Return a {@link Stream} navigating the {@link org.reactivestreams.Subscriber} * chain (downward). The current {@link Scannable} is not included. * * @return a {@link Stream} navigating the {@link org.reactivestreams.Subscriber} * chain (downward, current {@link Scannable} not included). */
default Stream<? extends Scannable> actuals() { return Attr.recurse(this, Attr.ACTUAL); }
Return a Stream of referenced inners (flatmap, multicast etc)
Returns:a Stream of referenced inners (flatmap, multicast etc)
/** * Return a {@link Stream} of referenced inners (flatmap, multicast etc) * * @return a {@link Stream} of referenced inners (flatmap, multicast etc) */
default Stream<? extends Scannable> inners() { return Stream.empty(); }
Return true whether the component is available for scan(Attr<Object>) resolution.
Returns:true whether the component is available for scan(Attr<Object>) resolution.
/** * Return true whether the component is available for {@link #scan(Attr)} resolution. * * @return true whether the component is available for {@link #scan(Attr)} resolution. */
default boolean isScanAvailable(){ return true; }
Check this Scannable and its parents() for a user-defined name and return the first one that is reachable, or default to this Scannable stepName() if none.
Returns:the name of the first parent that has one defined (including this scannable)
/** * Check this {@link Scannable} and its {@link #parents()} for a user-defined name and * return the first one that is reachable, or default to this {@link Scannable} * {@link #stepName()} if none. * * @return the name of the first parent that has one defined (including this scannable) */
default String name() { String thisName = this.scan(Attr.NAME); if (thisName != null) { return thisName; } return parents() .map(s -> s.scan(Attr.NAME)) .filter(Objects::nonNull) .findFirst() .orElse(stepName()); }
Return a meaningful String representation of this Scannable in its chain of parents() and actuals().
/** * Return a meaningful {@link String} representation of this {@link Scannable} in * its chain of {@link #parents()} and {@link #actuals()}. */
default String stepName() { // /!\ this code is duplicated in `InnerConsumer#stepName` in order to use simple class name instead of toString /* * Strip an operator name of various prefixes and suffixes. * @param name the operator name, usually simpleClassName or fully-qualified classname. * @return the stripped operator name */ String name = toString(); if (name.contains("@") && name.contains("$")) { name = name .substring(0, name.indexOf('$')) .substring(name.lastIndexOf('.') + 1); } String stripped = OPERATOR_NAME_UNRELATED_WORDS_PATTERN .matcher(name) .replaceAll(""); if (!stripped.isEmpty()) { return stripped.substring(0, 1).toLowerCase() + stripped.substring(1); } return stripped; }
List the step names in the chain of Scannable (including the current element), in their assembly order. This traverses the chain of Scannable both upstream (parents()) and downstream (actuals()).
  1. if the current Scannable is a Subscriber, the chain can reach down to the final subscriber, provided it is Scannable (eg. lambda subscriber)
  2. if it is an operator the chain can reach up to the source, if it is a Reactor source (that is Scannable).
Returns:a Stream of stepName() for each discovered step in the Scannable chain
/** * List the step names in the chain of {@link Scannable} (including the current element), * in their assembly order. This traverses the chain of {@link Scannable} both upstream * ({@link #parents()}) and downstream ({@link #actuals()}). * <ol> * <li>if the current Scannable is a {@link Subscriber}, the chain can reach down to * the final subscriber, provided it is {@link Scannable} (eg. lambda subscriber)</li> * <li>if it is an operator the chain can reach up to the source, if it is a Reactor * source (that is {@link Scannable}).</li> * </ol> * * @return a {@link Stream} of {@link #stepName()} for each discovered step in the {@link Scannable} chain */
default Stream<String> steps() { List<Scannable> chain = new ArrayList<>(); chain.addAll(parents().collect(Collectors.toList())); Collections.reverse(chain); chain.add(this); chain.addAll(actuals().collect(Collectors.toList())); List<String> chainNames = new ArrayList<>(chain.size()); for (int i = 0; i < chain.size(); i++) { Scannable step = chain.get(i); Scannable stepAfter = null; if (i < chain.size() - 1) { stepAfter = chain.get(i + 1); } //noinspection ConstantConditions if (stepAfter != null && stepAfter.scan(Attr.ACTUAL_METADATA)) { chainNames.add(stepAfter.stepName()); i++; } else { chainNames.add(step.stepName()); } } return chainNames.stream(); }
Return a Stream navigating the Subscription chain (upward). The current Scannable is not included.
Returns:a Stream navigating the Subscription chain (upward, current Scannable not included).
/** * Return a {@link Stream} navigating the {@link org.reactivestreams.Subscription} * chain (upward). The current {@link Scannable} is not included. * * @return a {@link Stream} navigating the {@link org.reactivestreams.Subscription} * chain (upward, current {@link Scannable} not included). */
default Stream<? extends Scannable> parents() { return Attr.recurse(this, Attr.PARENT); }
This method is used internally by components to define their key-value mappings in a single place. Although it is ignoring the generic type of the Attr key, implementors should take care to return values of the correct type, and return null if no specific value is available.

For public consumption of attributes, prefer using scan(Attr<Object>), which will return a typed value and fall back to the key's default if the component didn't define any mapping.

Params:
  • key – a Attr to resolve for the component.
Returns:the value associated to the key for that specific component, or null if none.
/** * This method is used internally by components to define their key-value mappings * in a single place. Although it is ignoring the generic type of the {@link Attr} key, * implementors should take care to return values of the correct type, and return * {@literal null} if no specific value is available. * <p> * For public consumption of attributes, prefer using {@link #scan(Attr)}, which will * return a typed value and fall back to the key's default if the component didn't * define any mapping. * * @param key a {@link Attr} to resolve for the component. * @return the value associated to the key for that specific component, or null if none. */
@Nullable Object scanUnsafe(Attr key);
Introspect a component's specific state attribute, returning an associated value specific to that component, or the default value associated with the key, or null if the attribute doesn't make sense for that particular component and has no sensible default.
Params:
  • key – a Attr to resolve for the component.
Returns:a value associated to the key or null if unmatched or unresolved
/** * Introspect a component's specific state {@link Attr attribute}, returning an * associated value specific to that component, or the default value associated with * the key, or null if the attribute doesn't make sense for that particular component * and has no sensible default. * * @param key a {@link Attr} to resolve for the component. * @return a value associated to the key or null if unmatched or unresolved * */
@Nullable default <T> T scan(Attr<T> key) { //note tryConvert will just plain cast most of the time //except e.g. for Attr<Scannable> T value = key.tryConvert(scanUnsafe(key)); if (value == null) return key.defaultValue(); return value; }
Introspect a component's specific state attribute. If there's no specific value in the component for that key, fall back to returning the provided non null default.
Params:
  • key – a Attr to resolve for the component.
  • defaultValue – a fallback value if key resolve to null
Returns:a value associated to the key or the provided default if unmatched or unresolved
/** * Introspect a component's specific state {@link Attr attribute}. If there's no * specific value in the component for that key, fall back to returning the * provided non null default. * * @param key a {@link Attr} to resolve for the component. * @param defaultValue a fallback value if key resolve to {@literal null} * * @return a value associated to the key or the provided default if unmatched or unresolved */
default <T> T scanOrDefault(Attr<T> key, T defaultValue) { T v; //note tryConvert will just plain cast most of the time //except e.g. for Attr<Scannable> v = key.tryConvert(scanUnsafe(key)); if (v == null) { return Objects.requireNonNull(defaultValue, "defaultValue"); } return v; }
Visit this Scannable and its parents() and stream all the observed tags
Returns:the stream of tags for this Scannable and its parents
/** * Visit this {@link Scannable} and its {@link #parents()} and stream all the * observed tags * * @return the stream of tags for this {@link Scannable} and its parents */
default Stream<Tuple2<String, String>> tags() { Stream<Tuple2<String, String>> parentTags = parents().flatMap(s -> s.scan(Attr.TAGS)); Stream<Tuple2<String, String>> thisTags = scan(Attr.TAGS); if (thisTags == null) { return parentTags; } return Stream.concat( thisTags, parentTags ); } }