/*
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.core.publisher;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
Author: Simon Baslé, David Karnok
/**
* @author Simon Baslé
* @author David Karnok
*/
final class FluxLimitRequest<T> extends InternalFluxOperator<T, T> {
final long cap;
FluxLimitRequest(Flux<T> flux, long cap) {
super(flux);
this.cap = cap;
}
@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
return new FluxLimitRequestSubscriber<>(actual, this.cap);
}
@Override
public int getPrefetch() {
return 0;
}
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return cap;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
//FluxOperator defines PREFETCH and PARENT
return super.scanUnsafe(key);
}
static class FluxLimitRequestSubscriber<T> implements InnerOperator<T, T> {
final CoreSubscriber<? super T> actual;
Subscription parent;
long toProduce;
volatile long requestRemaining;
static final AtomicLongFieldUpdater<FluxLimitRequestSubscriber> REQUEST_REMAINING =
AtomicLongFieldUpdater.newUpdater(FluxLimitRequestSubscriber.class, "requestRemaining");
FluxLimitRequestSubscriber(CoreSubscriber<? super T> actual, long cap) {
this.actual = actual;
this.toProduce = cap;
this.requestRemaining = cap;
}
@Override
public CoreSubscriber<? super T> actual() {
return this.actual;
}
@Override
public void onNext(T t) {
long r = toProduce;
if (r > 0L) {
toProduce = --r;
actual.onNext(t);
if (r == 0) {
parent.cancel();
actual.onComplete();
}
}
}
@Override
public void onError(Throwable throwable) {
if (toProduce != 0L) {
toProduce = 0L;
actual.onError(throwable);
}
}
@Override
public void onComplete() {
if (toProduce != 0L) {
toProduce = 0L;
actual.onComplete();
}
}
@Override
public void onSubscribe(Subscription s) {
parent = s;
actual.onSubscribe(this);
}
@Override
public void request(long l) {
for (;;) {
long r = requestRemaining;
long newRequest;
if (r <= l) {
newRequest = r;
} else {
newRequest = l;
}
long u = r - newRequest;
if (REQUEST_REMAINING.compareAndSet(this, r, u)) {
if (newRequest != 0) {
parent.request(newRequest);
}
break;
}
}
}
@Override
public void cancel() {
parent.cancel();
}
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) return parent;
if (key == Attr.TERMINATED) return toProduce == 0L;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
//InnerOperator defines ACTUAL
return InnerOperator.super.scanUnsafe(key);
}
}
}