/*
 * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package reactor.core.publisher;

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Stream;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.scheduler.Scheduler;
import reactor.util.concurrent.Queues;

/**
 * @author David Karnok
 */
final class FluxWindowTimeout<T> extends InternalFluxOperator<T, Flux<T>> { final int maxSize; final long timespan; final TimeUnit unit; final Scheduler timer; FluxWindowTimeout(Flux<T> source, int maxSize, long timespan, TimeUnit unit, Scheduler timer) { super(source); if (timespan <= 0) { throw new IllegalArgumentException("Timeout period must be strictly positive"); } if (maxSize <= 0) { throw new IllegalArgumentException("maxSize must be strictly positive"); } this.timer = Objects.requireNonNull(timer, "Timer"); this.timespan = timespan; this.unit = Objects.requireNonNull(unit, "unit"); this.maxSize = maxSize; } @Override public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super Flux<T>> actual) { return new WindowTimeoutSubscriber<>(actual, maxSize, timespan, unit, timer); } @Override public Object scanUnsafe(Attr key) { if (key == Attr.RUN_ON) return timer; if (key == Attr.RUN_STYLE) return Attr.RunStyle.ASYNC; return super.scanUnsafe(key); } static final class WindowTimeoutSubscriber<T> implements InnerOperator<T, Flux<T>> { final CoreSubscriber<? 
super Flux<T>> actual; final long timespan; final TimeUnit unit; final Scheduler scheduler; final int maxSize; final Scheduler.Worker worker; final Queue<Object> queue; Throwable error; volatile boolean done; volatile boolean cancelled; volatile long requested; @SuppressWarnings("rawtypes") static final AtomicLongFieldUpdater<WindowTimeoutSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(WindowTimeoutSubscriber.class, "requested"); volatile int wip; @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater<WindowTimeoutSubscriber> WIP = AtomicIntegerFieldUpdater.newUpdater(WindowTimeoutSubscriber.class, "wip"); int count; long producerIndex; Subscription s; Sinks.Many<T> window; volatile boolean terminated; volatile Disposable timer; @SuppressWarnings("rawtypes") static final AtomicReferenceFieldUpdater<WindowTimeoutSubscriber, Disposable> TIMER = AtomicReferenceFieldUpdater.newUpdater(WindowTimeoutSubscriber.class, Disposable.class, "timer"); WindowTimeoutSubscriber(CoreSubscriber<? super Flux<T>> actual, int maxSize, long timespan, TimeUnit unit, Scheduler scheduler) { this.actual = actual; this.queue = Queues.unboundedMultiproducer().get(); this.timespan = timespan; this.unit = unit; this.scheduler = scheduler; this.maxSize = maxSize; this.worker = scheduler.createWorker(); } @Override public CoreSubscriber<? super Flux<T>> actual() { return actual; } @Override public Stream<? extends Scannable> inners() { Sinks.Many<T> w = window; return w == null ? 
Stream.empty() : Stream.of(Scannable.from(w)); } @Override public Object scanUnsafe(Attr key) { if (key == Attr.PARENT) return s; if (key == Attr.CANCELLED) return cancelled; if (key == Attr.TERMINATED) return done; if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return requested; if (key == Attr.CAPACITY) return maxSize; if (key == Attr.BUFFERED) return queue.size(); if (key == Attr.RUN_ON) return worker; if (key == Attr.RUN_STYLE) return Attr.RunStyle.ASYNC; return InnerOperator.super.scanUnsafe(key); } @Override public void onSubscribe(Subscription s) { if (Operators.validate(this.s, s)) { this.s = s; Subscriber<? super Flux<T>> a = actual; a.onSubscribe(this); if (cancelled) { return; } Sinks.Many<T> w = Sinks.unsafe().many().unicast().onBackpressureBuffer(); window = w; long r = requested; if (r != 0L) { a.onNext(w.asFlux()); if (r != Long.MAX_VALUE) { REQUESTED.decrementAndGet(this); } } else { a.onError(Operators.onOperatorError(s, Exceptions.failWithOverflow(), actual.currentContext())); return; } if (OperatorDisposables.replace(TIMER, this, newPeriod())) { s.request(Long.MAX_VALUE); } } } Disposable newPeriod() { try { return worker.schedulePeriodically(new ConsumerIndexHolder(producerIndex, this), timespan, timespan, unit); } catch (Exception e) { actual.onError(Operators.onRejectedExecution(e, s, null, null, actual.currentContext())); return Disposables.disposed(); } } @Override public void onNext(T t) { if (terminated) { return; } if (WIP.get(this) == 0 && WIP.compareAndSet(this, 0, 1)) { Sinks.Many<T> w = window; w.emitNext(t, Sinks.EmitFailureHandler.FAIL_FAST); int c = count + 1; if (c >= maxSize) { producerIndex++; count = 0; w.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST); long r = requested; if (r != 0L) { w = Sinks.unsafe().many().unicast().onBackpressureBuffer(); window = w; actual.onNext(w.asFlux()); if (r != Long.MAX_VALUE) { REQUESTED.decrementAndGet(this); } Disposable tm = timer; tm.dispose(); Disposable task = newPeriod(); if 
(!TIMER.compareAndSet(this, tm, task)) { task.dispose(); } } else { window = null; actual.onError(Operators.onOperatorError(s, Exceptions.failWithOverflow(), t, actual .currentContext())); timer.dispose(); worker.dispose(); return; } } else { count = c; } if (WIP.decrementAndGet(this) == 0) { return; } } else { queue.offer(t); if (!enter()) { return; } } drainLoop(); } @Override public void onError(Throwable t) { error = t; done = true; if (enter()) { drainLoop(); } actual.onError(t); timer.dispose(); worker.dispose(); } @Override public void onComplete() { done = true; if (enter()) { drainLoop(); } actual.onComplete(); timer.dispose(); worker.dispose(); } @Override public void request(long n) { if(Operators.validate(n)) { Operators.addCap(REQUESTED, this, n); } } @Override public void cancel() { cancelled = true; } @SuppressWarnings("unchecked") void drainLoop() { final Queue<Object> q = queue; final Subscriber<? super Flux<T>> a = actual; Sinks.Many<T> w = window; int missed = 1; for (; ; ) { for (; ; ) { if (terminated) { s.cancel(); q.clear(); timer.dispose(); worker.dispose(); return; } boolean d = done; Object o = q.poll(); boolean empty = o == null; boolean isHolder = o instanceof ConsumerIndexHolder; if (d && (empty || isHolder)) { window = null; q.clear(); Throwable err = error; if (err != null) { w.emitError(err, Sinks.EmitFailureHandler.FAIL_FAST); } else { w.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST); } timer.dispose(); worker.dispose(); return; } if (empty) { break; } if (isHolder) { w.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST); count = 0; w = Sinks.unsafe().many().unicast().onBackpressureBuffer(); window = w; long r = requested; if (r != 0L) { a.onNext(w.asFlux()); if (r != Long.MAX_VALUE) { REQUESTED.decrementAndGet(this); } } else { window = null; queue.clear(); a.onError(Operators.onOperatorError(s, Exceptions.failWithOverflow(), actual.currentContext())); timer.dispose(); worker.dispose(); return; } continue; } w.emitNext((T) o, 
Sinks.EmitFailureHandler.FAIL_FAST); int c = count + 1; if (c >= maxSize) { producerIndex++; count = 0; w.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST); long r = requested; if (r != 0L) { w = Sinks.unsafe().many().unicast().onBackpressureBuffer(); window = w; actual.onNext(w.asFlux()); if (r != Long.MAX_VALUE) { REQUESTED.decrementAndGet(this); } Disposable tm = timer; tm.dispose(); Disposable task = newPeriod(); if (!TIMER.compareAndSet(this, tm, task)) { task.dispose(); } } else { window = null; a.onError(Operators.onOperatorError(s, Exceptions.failWithOverflow(), o, actual .currentContext())); timer.dispose(); worker.dispose(); return; } } else { count = c; } } missed = WIP.addAndGet(this, -missed); if (missed == 0) { break; } } } boolean enter() { return WIP.getAndIncrement(this) == 0; } static final class ConsumerIndexHolder implements Runnable { final long index; final WindowTimeoutSubscriber<?> parent; ConsumerIndexHolder(long index, WindowTimeoutSubscriber<?> parent) { this.index = index; this.parent = parent; } @Override public void run() { WindowTimeoutSubscriber<?> p = parent; if (!p.cancelled) { p.queue.offer(this); } else { p.terminated = true; p.timer.dispose(); p.worker.dispose(); } if (p.enter()) { p.drainLoop(); } } } } }