/*
 * Copyright (c) 2012, 2017 Oracle and/or its affiliates. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0, which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * This Source Code may also be made available under the following Secondary
 * Licenses when the conditions for such availability set forth in the
 * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
 * version 2 with the GNU Classpath Exception, which is available at
 * https://www.gnu.org/software/classpath/license.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
 */

package javax.ws.rs.sse;

import javax.ws.rs.client.WebTarget;
import java.net.URL;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

Client for reading and processing incoming Server-Sent Events.

SSE event source instances of this class are thread safe. To build a new instance, you can use the SseEventSource.target(endpoint) factory method to get a new event source builder that can be further customised and eventually used to create a new SSE event source.

Once a SseEventSource is created, it can be used to open a connection to the associated web target. After establishing the connection, the event source starts processing any incoming inbound events. Whenever a new event is received, an Consumer#accept(InboundSseEvent) method is invoked on any registered event consumers.

Reconnect support

The SseEventSource supports automated recuperation from a connection loss, including negotiation of delivery of any missed events based on the last received SSE event id field value, provided this field is set by the server and the negotiation facility is supported by the server. In case of a connection loss, the last received SSE event id field value is send in the "Last-Event-ID" HTTP request header as part of a new connection request sent to the SSE endpoint. Upon a receipt of such reconnect request, the SSE endpoint that supports this negotiation facility is expected to replay all missed events. Note however, that this is a best-effort mechanism which does not provide any guaranty that all events would be delivered without a loss. You should therefore not rely on receiving every single event and design your client application code accordingly.

By default, when a connection the the SSE endpoint is lost, the event source will wait 500 ms before attempting to reconnect to the SSE endpoint. The SSE endpoint can however control the client-side retry delay by including a special retry field value in the any send event. SseEventSource tracks any received SSE event retry field values set by the endpoint and adjusts the reconnect delay accordingly, using the last received retry field value as the reconnect delay.

In addition to handling the standard connection loss failures, SseEventSource automatically deals with any HTTP 503 Service Unavailable responses from an SSE endpoint, that contain a "Retry-After" HTTP header with a valid value. The HTTP 503 + "Retry-After" technique is often used by HTTP endpoints as a means of connection and traffic throttling. In case a HTTP 503 + "Retry-After" response is received in return to a connection request, SSE event source will automatically schedule a new reconnect attempt and use the received "Retry-After" HTTP header value as a one-time override of the reconnect delay.

Author:Marek Potociar
Since:2.1
/** * Client for reading and processing {@link InboundSseEvent incoming Server-Sent Events}. * <p> * SSE event source instances of this class are thread safe. To build a new instance, you can use the * {@link #target(javax.ws.rs.client.WebTarget) SseEventSource.target(endpoint)} factory method to get * a new event source builder that can be further customised and eventually used to create a new SSE * event source. * <p> * Once a {@link SseEventSource} is created, it can be used to {@link #open open a connection} * to the associated {@link WebTarget web target}. After establishing the connection, the event source starts * processing any incoming inbound events. Whenever a new event is received, an * {@link Consumer#accept(Object) Consumer<InboundSseEvent>#accept(InboundSseEvent)} method is invoked on any registered event consumers. * <h3>Reconnect support</h3> * <p> * The {@code SseEventSource} supports automated recuperation from a connection loss, including * negotiation of delivery of any missed events based on the last received SSE event {@code id} field value, provided * this field is set by the server and the negotiation facility is supported by the server. In case of a connection loss, * the last received SSE event {@code id} field value is send in the * <code>{@value javax.ws.rs.core.HttpHeaders#LAST_EVENT_ID_HEADER}</code> HTTP * request header as part of a new connection request sent to the SSE endpoint. Upon a receipt of such reconnect request, the SSE * endpoint that supports this negotiation facility is expected to replay all missed events. Note however, that this is a * best-effort mechanism which does not provide any guaranty that all events would be delivered without a loss. You should * therefore not rely on receiving every single event and design your client application code accordingly. * <p> * By default, when a connection the the SSE endpoint is lost, the event source will wait 500&nbsp;ms * before attempting to reconnect to the SSE endpoint. The SSE endpoint can however control the client-side retry delay * by including a special {@code retry} field value in the any send event. {@code SseEventSource} tracks any * received SSE event {@code retry} field values set by the endpoint and adjusts the reconnect delay accordingly, * using the last received {@code retry} field value as the reconnect delay. * <p> * In addition to handling the standard connection loss failures, {@code SseEventSource} automatically deals with any * {@code HTTP 503 Service Unavailable} responses from an SSE endpoint, that contain a * <code>{@value javax.ws.rs.core.HttpHeaders#RETRY_AFTER}</code> HTTP header with a valid value. The * <code>HTTP 503 + {@value javax.ws.rs.core.HttpHeaders#RETRY_AFTER}</code> technique is often used by HTTP endpoints * as a means of connection and traffic throttling. * In case a <code>HTTP 503 + {@value javax.ws.rs.core.HttpHeaders#RETRY_AFTER}</code> response is received in return to a connection * request, SSE event source will automatically schedule a new reconnect attempt and use the received * <code>{@value javax.ws.rs.core.HttpHeaders#RETRY_AFTER}</code> HTTP header value as a one-time override of the reconnect delay. * * @author Marek Potociar * @since 2.1 */
public interface SseEventSource extends AutoCloseable {
SseEventSource builder class.

Event source builder provides methods that let you conveniently configure and subsequently build a new SseEventSource instance. You can obtain a new event source builder instance using a static SseEventSource.target(endpoint) factory method.

For example:

SseEventSource es = SseEventSource.target(endpoint)
                            .reconnectingEvery(5, SECONDS)
                            .build();
es.register(System.out::println);
es.open();
/** * {@link SseEventSource} builder class. * <p> * Event source builder provides methods that let you conveniently configure and subsequently build * a new {@code SseEventSource} instance. You can obtain a new event source builder instance using * a static {@link SseEventSource#target(javax.ws.rs.client.WebTarget) SseEventSource.target(endpoint)} factory method. * <p> * For example: * <pre> * SseEventSource es = SseEventSource.target(endpoint) * .reconnectingEvery(5, SECONDS) * .build(); * es.register(System.out::println); * es.open(); * </pre> */
abstract class Builder {
Name of the property identifying the Builder implementation to be returned from newBuilder().
/** * Name of the property identifying the {@link SseEventSource.Builder} implementation * to be returned from {@link SseEventSource.Builder#newBuilder()}. */
public static final String JAXRS_DEFAULT_SSE_BUILDER_PROPERTY = "javax.ws.rs.sse.SseEventSource.Builder";
Default SSE event source builder implementation class name.
/** * Default SSE event source builder implementation class name. */
private static final String JAXRS_DEFAULT_SSE_BUILDER = "org.glassfish.jersey.media.sse.internal.JerseySseEventSource$Builder";
Allows custom implementations to extend the SSE event source builder class.
/** * Allows custom implementations to extend the SSE event source builder class. */
protected Builder() { }
Create a new SSE event source instance using the default implementation class provided by the implementation provider.
Returns:new SSE event source builder instance.
/** * Create a new SSE event source instance using the default implementation class provided by the * implementation provider. * * @return new SSE event source builder instance. */
static Builder newBuilder() { try { Object delegate = FactoryFinder.find(JAXRS_DEFAULT_SSE_BUILDER_PROPERTY, JAXRS_DEFAULT_SSE_BUILDER, SseEventSource.Builder.class); if (!(delegate instanceof Builder)) { Class pClass = Builder.class; String classnameAsResource = pClass.getName().replace('.', '/') + ".class"; ClassLoader loader = pClass.getClassLoader(); if (loader == null) { loader = ClassLoader.getSystemClassLoader(); } URL targetTypeURL = loader.getResource(classnameAsResource); throw new LinkageError("ClassCastException: attempting to cast" + delegate.getClass().getClassLoader().getResource(classnameAsResource) + " to " + targetTypeURL); } return (Builder) delegate; } catch (Exception ex) { throw new RuntimeException(ex); } } protected abstract Builder target(WebTarget endpoint);
Set the initial reconnect delay to be used by the event source.

Note that this value may be later overridden by the SSE endpoint using either a retry SSE event field or HTTP 503 + "Retry-After" mechanism as described in the SseEventSource javadoc.

Params:
  • delay – the default time to wait before attempting to recover from a connection loss.
  • unit – time unit of the reconnect delay parameter.
Returns:updated event source builder instance.
/** * Set the initial reconnect delay to be used by the event source. * <p> * Note that this value may be later overridden by the SSE endpoint using either a {@code retry} SSE event field * or <code>HTTP 503 + {@value javax.ws.rs.core.HttpHeaders#RETRY_AFTER}</code> mechanism as described * in the {@link SseEventSource} javadoc. * * @param delay the default time to wait before attempting to recover from a connection loss. * @param unit time unit of the reconnect delay parameter. * @return updated event source builder instance. */
public abstract Builder reconnectingEvery(long delay, TimeUnit unit);
Build new SSE event source pointing at a SSE streaming web target.

The returned event source is ready, but not connected to the SSE endpoint. It is expected that you will manually invoke its SseEventSource.open() method once you are ready to start receiving SSE events. In case you want to build an event source instance that is already connected to the SSE endpoint, use the event source builder SseEventSource.open() method instead.

Once the event source is open, the incoming events are processed by the event source in an asynchronous task that runs in an internal single-threaded scheduled executor service.

See Also:
Returns:new event source instance, ready to be connected to the SSE endpoint.
/** * Build new SSE event source pointing at a SSE streaming {@link WebTarget web target}. * <p> * The returned event source is ready, but not {@link SseEventSource#open() connected} to the SSE endpoint. * It is expected that you will manually invoke its {@link #open()} method once you are ready to start * receiving SSE events. In case you want to build an event source instance that is already connected * to the SSE endpoint, use the event source builder {@link #open()} method instead. * <p> * Once the event source is open, the incoming events are processed by the event source in an * asynchronous task that runs in an internal single-threaded {@link ScheduledExecutorService * scheduled executor service}. * * @return new event source instance, ready to be connected to the SSE endpoint. * @see #open() */
public abstract SseEventSource build(); }
Register a InboundSseEvent consumer.

Given consumer is invoked once per each received event.

Params:
  • onEvent – event consumer.
Throws:
/** * Register a {@link InboundSseEvent} consumer. * <p> * Given consumer is invoked once per each received event. * * @param onEvent event consumer. * @throws IllegalArgumentException when the provided parameter is {@code null}. */
void register(Consumer<InboundSseEvent> onEvent);
Register InboundSseEvent and Throwable consumers.

Event consumer is invoked once per each received event, Throwable consumer is invoked invoked upon a unrecoverable error encountered by a SseEventSource.

Params:
  • onEvent – event consumer.
  • onError – error consumer.
Throws:
/** * Register {@link InboundSseEvent} and {@link Throwable} consumers. * <p> * Event consumer is invoked once per each received event, {@code Throwable} consumer is invoked invoked upon a * unrecoverable error encountered by a {@link SseEventSource}. * * @param onEvent event consumer. * @param onError error consumer. * @throws IllegalArgumentException when the any of the provided parameters is {@code null}. */
void register(Consumer<InboundSseEvent> onEvent, Consumer<Throwable> onError);
Register InboundSseEvent and Throwable consumers and onComplete callback.

Event consumer is invoked once per each received event, Throwable consumer is invoked invoked upon a unrecoverable error encountered by a SseEventSource, onComplete callback is invoked when there are no further events to be received.

Params:
  • onEvent – event consumer.
  • onError – error consumer.
  • onComplete – onComplete handler.
Throws:
/** * Register {@link InboundSseEvent} and {@link Throwable} consumers and onComplete callback. * <p> * Event consumer is invoked once per each received event, {@code Throwable} consumer is invoked invoked upon a * unrecoverable error encountered by a {@link SseEventSource}, onComplete callback is invoked when there are no * further events to be received. * * @param onEvent event consumer. * @param onError error consumer. * @param onComplete onComplete handler. * @throws IllegalArgumentException when the any of the provided parameters is {@code null}. */
void register(Consumer<InboundSseEvent> onEvent, Consumer<Throwable> onError, Runnable onComplete);
Create a new event source builder that provides convenient way how to configure and fine-tune various aspects of a newly prepared event source instance.
Params:
  • endpoint – SSE streaming endpoint. Must not be null.
Throws:
Returns:a builder of a new event source instance pointing at the specified SSE streaming endpoint.
/** * Create a new {@link SseEventSource.Builder event source builder} that provides convenient way how to * configure and fine-tune various aspects of a newly prepared event source instance. * * @param endpoint SSE streaming endpoint. Must not be {@code null}. * @return a builder of a new event source instance pointing at the specified SSE streaming endpoint. * @throws NullPointerException in case the supplied web target is {@code null}. */
static Builder target(WebTarget endpoint) { return Builder.newBuilder().target(endpoint); }
Open the connection to the supplied SSE underlying web target and start processing incoming events.
Throws:
/** * Open the connection to the supplied SSE underlying {@link WebTarget web target} and start processing incoming * {@link InboundSseEvent events}. * * @throws IllegalStateException in case the event source has already been opened earlier. */
void open();
Check if this event source instance has already been opened.
Returns:true if this event source is open, false otherwise.
/** * Check if this event source instance has already been {@link #open() opened}. * * @return {@code true} if this event source is open, {@code false} otherwise. */
boolean isOpen();
Close this event source.

The method will wait up to 5 seconds for the internal event processing tasks to complete.

/** * Close this event source. * <p> * The method will wait up to 5 seconds for the internal event processing tasks to complete. */
@Override default void close() { close(5, TimeUnit.SECONDS); }
Close this event source and wait for the internal event processing task to complete for up to the specified amount of wait time.

The method blocks until the event processing task has completed execution after a shutdown request, or until the timeout occurs, or the current thread is interrupted, whichever happens first.

In case the waiting for the event processing task has been interrupted, this method restores the interrupt flag on the thread before returning false.

Params:
  • timeout – the maximum time to wait.
  • unit – the time unit of the timeout argument.
Returns:true if this executor terminated and false if the timeout elapsed before termination or the termination was interrupted.
/** * Close this event source and wait for the internal event processing task to complete * for up to the specified amount of wait time. * <p> * The method blocks until the event processing task has completed execution after a shutdown * request, or until the timeout occurs, or the current thread is interrupted, whichever happens * first. * <p> * In case the waiting for the event processing task has been interrupted, this method restores * the {@link Thread#interrupted() interrupt} flag on the thread before returning {@code false}. * * @param timeout the maximum time to wait. * @param unit the time unit of the timeout argument. * @return {@code true} if this executor terminated and {@code false} if the timeout elapsed * before termination or the termination was interrupted. */
boolean close(final long timeout, final TimeUnit unit); }