/*
 * Copyright 2002-2020 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.http.server.reactive;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import org.springframework.core.log.LogDelegateFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

Abstract base class for Processor implementations that bridge between event-listener write APIs and Reactive Streams.

Specifically a base class for writing to the HTTP response body with Servlet 3.1 non-blocking I/O and Undertow XNIO as well for writing WebSocket messages through the Java WebSocket API (JSR-356), Jetty, and Undertow.

Author:Arjen Poutsma, Violeta Georgieva, Rossen Stoyanchev
Type parameters:
  • <T> – the type of element signaled to the Subscriber
Since:5.0
/** * Abstract base class for {@code Processor} implementations that bridge between * event-listener write APIs and Reactive Streams. * * <p>Specifically a base class for writing to the HTTP response body with * Servlet 3.1 non-blocking I/O and Undertow XNIO as well for writing WebSocket * messages through the Java WebSocket API (JSR-356), Jetty, and Undertow. * * @author Arjen Poutsma * @author Violeta Georgieva * @author Rossen Stoyanchev * @since 5.0 * @param <T> the type of element signaled to the {@link Subscriber} */
public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, Void> {
Special logger for debugging Reactive Streams signals.
See Also:
/** * Special logger for debugging Reactive Streams signals. * @see LogDelegateFactory#getHiddenLog(Class) * @see AbstractListenerReadPublisher#rsReadLogger * @see AbstractListenerWriteFlushProcessor#rsWriteFlushLogger * @see WriteResultPublisher#rsWriteResultLogger */
protected static final Log rsWriteLogger = LogDelegateFactory.getHiddenLog(AbstractListenerWriteProcessor.class); private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED); @Nullable private Subscription subscription; @Nullable private volatile T currentData; /* Indicates "onComplete" was received during the (last) write. */ private volatile boolean subscriberCompleted;
Indicates we're waiting for one last isReady-onWritePossible cycle after "onComplete" because some Servlet containers expect this to take place prior to calling AsyncContext.complete(). See https://github.com/eclipse-ee4j/servlet-api/issues/273
/** * Indicates we're waiting for one last isReady-onWritePossible cycle * after "onComplete" because some Servlet containers expect this to take * place prior to calling AsyncContext.complete(). * See https://github.com/eclipse-ee4j/servlet-api/issues/273 */
private volatile boolean readyToCompleteAfterLastWrite; private final WriteResultPublisher resultPublisher; private final String logPrefix; public AbstractListenerWriteProcessor() { this(""); }
Create an instance with the given log prefix.
Since:5.1
/** * Create an instance with the given log prefix. * @since 5.1 */
public AbstractListenerWriteProcessor(String logPrefix) { this.logPrefix = (StringUtils.hasText(logPrefix) ? logPrefix : ""); this.resultPublisher = new WriteResultPublisher(logPrefix); }
Get the configured log prefix.
Since:5.1
/** * Get the configured log prefix. * @since 5.1 */
public String getLogPrefix() { return this.logPrefix; } // Subscriber methods and async I/O notification methods... @Override public final void onSubscribe(Subscription subscription) { this.state.get().onSubscribe(this, subscription); } @Override public final void onNext(T data) { if (rsWriteLogger.isTraceEnabled()) { rsWriteLogger.trace(getLogPrefix() + "Item to write"); } this.state.get().onNext(this, data); }
Error signal from the upstream, write Publisher. This is also used by sub-classes to delegate error notifications from the container.
/** * Error signal from the upstream, write Publisher. This is also used by * sub-classes to delegate error notifications from the container. */
@Override public final void onError(Throwable ex) { if (rsWriteLogger.isTraceEnabled()) { rsWriteLogger.trace(getLogPrefix() + "Write source error: " + ex); } this.state.get().onError(this, ex); }
Completion signal from the upstream, write Publisher. This is also used by sub-classes to delegate completion notifications from the container.
/** * Completion signal from the upstream, write Publisher. This is also used * by sub-classes to delegate completion notifications from the container. */
@Override public final void onComplete() { if (rsWriteLogger.isTraceEnabled()) { rsWriteLogger.trace(getLogPrefix() + "No more items to write"); } this.state.get().onComplete(this); }
Invoked when writing is possible, either in the same thread after a check via isWritePossible(), or as a callback from the underlying container.
/** * Invoked when writing is possible, either in the same thread after a check * via {@link #isWritePossible()}, or as a callback from the underlying * container. */
public final void onWritePossible() { if (rsWriteLogger.isTraceEnabled()) { rsWriteLogger.trace(getLogPrefix() + "onWritePossible"); } this.state.get().onWritePossible(this); }
Invoked during an error or completion callback from the underlying container to cancel the upstream subscription.
/** * Invoked during an error or completion callback from the underlying * container to cancel the upstream subscription. */
public void cancel() { rsWriteLogger.trace(getLogPrefix() + "Cancellation"); if (this.subscription != null) { this.subscription.cancel(); } } // Publisher implementation for result notifications... @Override public final void subscribe(Subscriber<? super Void> subscriber) { // Technically, cancellation from the result subscriber should be propagated // to the upstream subscription. In practice, HttpHandler server adapters // don't have a reason to cancel the result subscription. this.resultPublisher.subscribe(subscriber); } // Write API methods to be implemented or template methods to override...
Whether the given data item has any content to write. If false the item is not written.
/** * Whether the given data item has any content to write. * If false the item is not written. */
protected abstract boolean isDataEmpty(T data);
Template method invoked after a data item to write is received via onNext.onNext(Object). The default implementation saves the data item for writing once that is possible.
/** * Template method invoked after a data item to write is received via * {@link Subscriber#onNext(Object)}. The default implementation saves the * data item for writing once that is possible. */
protected void dataReceived(T data) { T prev = this.currentData; if (prev != null) { // This shouldn't happen: // 1. dataReceived can only be called from REQUESTED state // 2. currentData is cleared before requesting discardData(data); cancel(); onError(new IllegalStateException("Received new data while current not processed yet.")); } this.currentData = data; }
Whether writing is possible.
/** * Whether writing is possible. */
protected abstract boolean isWritePossible();
Write the given item.

Note: Sub-classes are responsible for releasing any data buffer associated with the item, once fully written, if pooled buffers apply to the underlying container.

Params:
  • data – the item to write
Returns:true if the current data item was written completely and a new item requested, or false if it was written partially and we'll need more write callbacks before it is fully written
/** * Write the given item. * <p><strong>Note:</strong> Sub-classes are responsible for releasing any * data buffer associated with the item, once fully written, if pooled * buffers apply to the underlying container. * @param data the item to write * @return {@code true} if the current data item was written completely and * a new item requested, or {@code false} if it was written partially and * we'll need more write callbacks before it is fully written */
protected abstract boolean write(T data) throws IOException;
Invoked after the current data has been written and before requesting the next item from the upstream, write Publisher.

The default implementation is a no-op.

Deprecated:originally introduced for Undertow to stop write notifications when no data is available, but deprecated as of as of 5.0.6 since constant switching on every requested item causes a significant slowdown.
/** * Invoked after the current data has been written and before requesting * the next item from the upstream, write Publisher. * <p>The default implementation is a no-op. * @deprecated originally introduced for Undertow to stop write notifications * when no data is available, but deprecated as of as of 5.0.6 since constant * switching on every requested item causes a significant slowdown. */
@Deprecated protected void writingPaused() { }
Invoked after onComplete or onError notification.

The default implementation is a no-op.

/** * Invoked after onComplete or onError notification. * <p>The default implementation is a no-op. */
protected void writingComplete() { }
Invoked when an I/O error occurs during a write. Sub-classes may choose to ignore this if they know the underlying API will provide an error notification in a container thread.

Defaults to no-op.

/** * Invoked when an I/O error occurs during a write. Sub-classes may choose * to ignore this if they know the underlying API will provide an error * notification in a container thread. * <p>Defaults to no-op. */
protected void writingFailed(Throwable ex) { }
Invoked after any error (either from the upstream write Publisher, or from I/O operations to the underlying server) and cancellation to discard in-flight data that was in the process of being written when the error took place.
Params:
  • data – the data to be released
Since:5.0.11
/** * Invoked after any error (either from the upstream write Publisher, or * from I/O operations to the underlying server) and cancellation * to discard in-flight data that was in * the process of being written when the error took place. * @param data the data to be released * @since 5.0.11 */
protected abstract void discardData(T data); // Private methods for use from State's... private boolean changeState(State oldState, State newState) { boolean result = this.state.compareAndSet(oldState, newState); if (result && rsWriteLogger.isTraceEnabled()) { rsWriteLogger.trace(getLogPrefix() + oldState + " -> " + newState); } return result; } private void changeStateToReceived(State oldState) { if (changeState(oldState, State.RECEIVED)) { writeIfPossible(); } } private void changeStateToComplete(State oldState) { if (changeState(oldState, State.COMPLETED)) { discardCurrentData(); writingComplete(); this.resultPublisher.publishComplete(); } else { this.state.get().onComplete(this); } } private void writeIfPossible() { boolean result = isWritePossible(); if (!result && rsWriteLogger.isTraceEnabled()) { rsWriteLogger.trace(getLogPrefix() + "isWritePossible: false"); } if (result) { onWritePossible(); } } private void discardCurrentData() { T data = this.currentData; this.currentData = null; if (data != null) { discardData(data); } }
Represents a state for the Processor to be in.

       UNSUBSCRIBED
            |
            v
  +--- REQUESTED -------------> RECEIVED ---+
  |        ^                       ^        |
  |        |                       |        |
  |        + ------ WRITING <------+        |
  |                    |                    |
  |                    v                    |
  +--------------> COMPLETED <--------------+
/** * Represents a state for the {@link Processor} to be in. * * <p><pre> * UNSUBSCRIBED * | * v * +--- REQUESTED -------------> RECEIVED ---+ * | ^ ^ | * | | | | * | + ------ WRITING <------+ | * | | | * | v | * +--------------> COMPLETED <--------------+ * </pre> */
private enum State { UNSUBSCRIBED { @Override public <T> void onSubscribe(AbstractListenerWriteProcessor<T> processor, Subscription subscription) { Assert.notNull(subscription, "Subscription must not be null"); if (processor.changeState(this, REQUESTED)) { processor.subscription = subscription; subscription.request(1); } else { super.onSubscribe(processor, subscription); } } @Override public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) { // This can happen on (very early) completion notification from container.. processor.changeStateToComplete(this); } }, REQUESTED { @Override public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) { if (processor.isDataEmpty(data)) { Assert.state(processor.subscription != null, "No subscription"); processor.subscription.request(1); } else { processor.dataReceived(data); processor.changeStateToReceived(this); } } @Override public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) { processor.readyToCompleteAfterLastWrite = true; processor.changeStateToReceived(this); } }, RECEIVED { @SuppressWarnings("deprecation") @Override public <T> void onWritePossible(AbstractListenerWriteProcessor<T> processor) { if (processor.readyToCompleteAfterLastWrite) { processor.changeStateToComplete(RECEIVED); } else if (processor.changeState(this, WRITING)) { T data = processor.currentData; Assert.state(data != null, "No data"); try { if (processor.write(data)) { if (processor.changeState(WRITING, REQUESTED)) { processor.currentData = null; if (processor.subscriberCompleted) { processor.readyToCompleteAfterLastWrite = true; processor.changeStateToReceived(REQUESTED); } else { processor.writingPaused(); Assert.state(processor.subscription != null, "No subscription"); processor.subscription.request(1); } } } else { processor.changeStateToReceived(WRITING); } } catch (IOException ex) { processor.writingFailed(ex); } } } @Override public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) { processor.subscriberCompleted = true; // A competing write might have completed very quickly if (processor.state.get().equals(State.REQUESTED)) { processor.changeStateToComplete(State.REQUESTED); } } }, WRITING { @Override public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) { processor.subscriberCompleted = true; // A competing write might have completed very quickly if (processor.state.get().equals(State.REQUESTED)) { processor.changeStateToComplete(State.REQUESTED); } } }, COMPLETED { @Override public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) { // ignore } @Override public <T> void onError(AbstractListenerWriteProcessor<T> processor, Throwable ex) { // ignore } @Override public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) { // ignore } }; public <T> void onSubscribe(AbstractListenerWriteProcessor<T> processor, Subscription subscription) { subscription.cancel(); } public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) { processor.discardData(data); processor.cancel(); processor.onError(new IllegalStateException("Illegal onNext without demand")); } public <T> void onError(AbstractListenerWriteProcessor<T> processor, Throwable ex) { if (processor.changeState(this, COMPLETED)) { processor.discardCurrentData(); processor.writingComplete(); processor.resultPublisher.publishError(ex); } else { processor.state.get().onError(processor, ex); } } public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) { throw new IllegalStateException(toString()); } public <T> void onWritePossible(AbstractListenerWriteProcessor<T> processor) { // ignore } } }