/*
 * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package reactor.core.publisher;

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;


An unbounded Java Lambda adapter to Subscriber
Type parameters:
  • <T> – the value type
/** * An unbounded Java Lambda adapter to {@link Subscriber} * * @param <T> the value type */
final class LambdaSubscriber<T> implements InnerConsumer<T>, Disposable { final Consumer<? super T> consumer; final Consumer<? super Throwable> errorConsumer; final Runnable completeConsumer; final Consumer<? super Subscription> subscriptionConsumer; volatile Subscription subscription; static final AtomicReferenceFieldUpdater<LambdaSubscriber, Subscription> S = AtomicReferenceFieldUpdater.newUpdater(LambdaSubscriber.class, Subscription.class, "subscription");
Create a Subscriber reacting onNext, onError and onComplete. The subscriber will automatically request Long.MAX_VALUE onSubscribe.

The argument subscriptionHandler is executed once by new subscriber to generate a context shared by every request calls.

Params:
  • consumer – A Consumer with argument onNext data
  • errorConsumer – A Consumer called onError
  • completeConsumer – A Runnable called onComplete with the actual context if any
  • subscriptionConsumer – A Consumer called with the Subscription to perform initial request, or null to request max
/** * Create a {@link Subscriber} reacting onNext, onError and onComplete. The subscriber * will automatically request Long.MAX_VALUE onSubscribe. * <p> * The argument {@code subscriptionHandler} is executed once by new subscriber to * generate a context shared by every request calls. * * @param consumer A {@link Consumer} with argument onNext data * @param errorConsumer A {@link Consumer} called onError * @param completeConsumer A {@link Runnable} called onComplete with the actual * context if any * @param subscriptionConsumer A {@link Consumer} called with the {@link Subscription} * to perform initial request, or null to request max */
LambdaSubscriber( @Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer) { this.consumer = consumer; this.errorConsumer = errorConsumer; this.completeConsumer = completeConsumer; this.subscriptionConsumer = subscriptionConsumer; } @Override public final void onSubscribe(Subscription s) { if (Operators.validate(subscription, s)) { this.subscription = s; if (subscriptionConsumer != null) { try { subscriptionConsumer.accept(s); } catch (Throwable t) { Exceptions.throwIfFatal(t); s.cancel(); onError(t); } } else { s.request(Long.MAX_VALUE); } } } @Override public final void onComplete() { Subscription s = S.getAndSet(this, Operators.cancelledSubscription()); if (s == Operators.cancelledSubscription()) { return; } if (completeConsumer != null) { try { completeConsumer.run(); } catch (Throwable t) { Exceptions.throwIfFatal(t); onError(t); } } } @Override public final void onError(Throwable t) { Subscription s = S.getAndSet(this, Operators.cancelledSubscription()); if (s == Operators.cancelledSubscription()) { Operators.onErrorDropped(t, Context.empty()); return; } if (errorConsumer != null) { errorConsumer.accept(t); } else { throw Exceptions.errorCallbackNotImplemented(t); } } @Override public final void onNext(T x) { try { if (consumer != null) { consumer.accept(x); } } catch (Throwable t) { Exceptions.throwIfFatal(t); this.subscription.cancel(); onError(t); } } @Override @Nullable public Object scanUnsafe(Attr key) { if (key == Attr.PARENT) return subscription; if (key == Attr.PREFETCH) return Integer.MAX_VALUE; if (key == Attr.TERMINATED || key == Attr.CANCELLED) return isDisposed(); return null; } @Override public boolean isDisposed() { return subscription == Operators.cancelledSubscription(); } @Override public void dispose() { Subscription s = S.getAndSet(this, Operators.cancelledSubscription()); if (s != null && s != Operators.cancelledSubscription()) { s.cancel(); } } }