/*
 * Copyright 2014 Red Hat, Inc.
 *
 * Red Hat licenses this file to you under the Apache License, version 2.0
 * (the "License"); you may not use this file except in compliance with the
 * License.  You may obtain a copy of the License at:
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package io.vertx.reactivex.ext.stomp;

import io.vertx.reactivex.RxHelper;
import io.vertx.reactivex.ObservableHelper;
import io.vertx.reactivex.FlowableHelper;
import io.vertx.reactivex.impl.AsyncResultMaybe;
import io.vertx.reactivex.impl.AsyncResultSingle;
import io.vertx.reactivex.impl.AsyncResultCompletable;
import io.vertx.reactivex.WriteStreamObserver;
import io.vertx.reactivex.WriteStreamSubscriber;
import java.util.Map;
import java.util.Set;
import java.util.List;
import java.util.Iterator;
import java.util.function.Function;
import java.util.stream.Collectors;
import io.vertx.core.Handler;
import io.vertx.core.AsyncResult;
import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;
import io.vertx.lang.rx.RxGen;
import io.vertx.lang.rx.TypeArg;
import io.vertx.lang.rx.MappingIterator;

Defines a STOMP server. A STOMP server delegates to a StompServerHandler that lets you customize the behavior of the server. By default, it uses a handler compliant with the STOMP specification, but lets you change anything.

NOTE: This class has been automatically generated from the original non RX-ified interface using Vert.x codegen.
/**
 * Defines a STOMP server. A STOMP server delegates to a {@link io.vertx.reactivex.ext.stomp.StompServerHandler}
 * that lets you customize the behavior of the server. By default, it uses a handler compliant with the STOMP
 * specification, but lets you change anything.
 *
 * <p>
 * NOTE: This class has been automatically generated from the {@link io.vertx.ext.stomp.StompServer original} non RX-ified interface using Vert.x codegen.
 */
@RxGen(io.vertx.ext.stomp.StompServer.class)
public class StompServer {

  /** Type descriptor used to wrap a core server into this class and to unwrap it again. */
  public static final TypeArg<StompServer> __TYPE_ARG = new TypeArg<>(
    raw -> new StompServer((io.vertx.ext.stomp.StompServer) raw),
    StompServer::getDelegate
  );

  /** Type descriptor converting between the core and the RX-ified {@code ServerWebSocket}. */
  private static final TypeArg<io.vertx.reactivex.core.http.ServerWebSocket> TYPE_ARG_0 =
    new TypeArg<io.vertx.reactivex.core.http.ServerWebSocket>(
      raw -> io.vertx.reactivex.core.http.ServerWebSocket.newInstance((io.vertx.core.http.ServerWebSocket) raw),
      socket -> socket.getDelegate());

  /** The wrapped core server; every call on this class is forwarded to it. */
  private final io.vertx.ext.stomp.StompServer delegate;

  /**
   * Wraps the given core server.
   * @param delegate the core server to forward calls to
   */
  public StompServer(io.vertx.ext.stomp.StompServer delegate) {
    this.delegate = delegate;
  }

  /**
   * Wraps the given object, which is cast to the core server type.
   * @param delegate the core server to forward calls to
   */
  public StompServer(Object delegate) {
    this.delegate = (io.vertx.ext.stomp.StompServer) delegate;
  }

  /**
   * @return the wrapped core server
   */
  public io.vertx.ext.stomp.StompServer getDelegate() {
    return delegate;
  }

  @Override
  public String toString() {
    return delegate.toString();
  }

  /**
   * Two wrappers are equal when they are of the same class and their delegates are equal.
   */
  @Override
  public boolean equals(Object o) {
    return this == o
      || (o != null
        && getClass() == o.getClass()
        && delegate.equals(((StompServer) o).delegate));
  }

  @Override
  public int hashCode() {
    return delegate.hashCode();
  }
Creates a StompServer based on the default Stomp Server implementation.
Params:
  • vertx – the vert.x instance to use
  • options – the server options
Returns:the created StompServer
/**
 * Creates a {@link io.vertx.reactivex.ext.stomp.StompServer} based on the default Stomp Server implementation.
 * @param vertx the vert.x instance to use
 * @param options the server options
 * @return the created {@link io.vertx.reactivex.ext.stomp.StompServer}
 */
public static io.vertx.reactivex.ext.stomp.StompServer create(io.vertx.reactivex.core.Vertx vertx, io.vertx.ext.stomp.StompServerOptions options) {
  io.vertx.ext.stomp.StompServer coreServer = io.vertx.ext.stomp.StompServer.create(vertx.getDelegate(), options);
  return io.vertx.reactivex.ext.stomp.StompServer.newInstance(coreServer);
}
Creates a StompServer based on the default Stomp Server implementation.
Params:
  • vertx – the vert.x instance to use
  • netServer – the Net server used by the STOMP server
Returns:the created StompServer
/**
 * Creates a {@link io.vertx.reactivex.ext.stomp.StompServer} based on the default Stomp Server implementation.
 * @param vertx the vert.x instance to use
 * @param netServer the Net server used by the STOMP server
 * @return the created {@link io.vertx.reactivex.ext.stomp.StompServer}
 */
public static io.vertx.reactivex.ext.stomp.StompServer create(io.vertx.reactivex.core.Vertx vertx, io.vertx.reactivex.core.net.NetServer netServer) {
  io.vertx.ext.stomp.StompServer coreServer =
    io.vertx.ext.stomp.StompServer.create(vertx.getDelegate(), netServer.getDelegate());
  return io.vertx.reactivex.ext.stomp.StompServer.newInstance(coreServer);
}
Creates a StompServer based on the default Stomp Server implementation.
Params:
  • vertx – the vert.x instance to use
  • net – the Net server used by the STOMP server
  • options – the server options
Returns:the created StompServer
/**
 * Creates a {@link io.vertx.reactivex.ext.stomp.StompServer} based on the default Stomp Server implementation.
 * @param vertx the vert.x instance to use
 * @param net the Net server used by the STOMP server
 * @param options the server options
 * @return the created {@link io.vertx.reactivex.ext.stomp.StompServer}
 */
public static io.vertx.reactivex.ext.stomp.StompServer create(io.vertx.reactivex.core.Vertx vertx, io.vertx.reactivex.core.net.NetServer net, io.vertx.ext.stomp.StompServerOptions options) {
  io.vertx.ext.stomp.StompServer coreServer =
    io.vertx.ext.stomp.StompServer.create(vertx.getDelegate(), net.getDelegate(), options);
  return io.vertx.reactivex.ext.stomp.StompServer.newInstance(coreServer);
}
Creates a StompServer based on the default Stomp Server implementation, and use the default options.
Params:
  • vertx – the vert.x instance to use
Returns:the created StompServer
/**
 * Creates a {@link io.vertx.reactivex.ext.stomp.StompServer} based on the default Stomp Server implementation,
 * using the default options.
 * @param vertx the vert.x instance to use
 * @return the created {@link io.vertx.reactivex.ext.stomp.StompServer}
 */
public static io.vertx.reactivex.ext.stomp.StompServer create(io.vertx.reactivex.core.Vertx vertx) {
  io.vertx.ext.stomp.StompServer coreServer = io.vertx.ext.stomp.StompServer.create(vertx.getDelegate());
  return io.vertx.reactivex.ext.stomp.StompServer.newInstance(coreServer);
}
Configures the StompServerHandler. You must call this method before calling the listen method.
Params:
  • handler – the handler
Returns:the current StompServer
/**
 * Configures the {@link io.vertx.reactivex.ext.stomp.StompServerHandler}. You must call this method before
 * calling the {@link io.vertx.reactivex.ext.stomp.StompServer#listen} method.
 * @param handler the handler
 * @return the current {@link io.vertx.reactivex.ext.stomp.StompServer}
 */
public io.vertx.reactivex.ext.stomp.StompServer handler(io.vertx.reactivex.ext.stomp.StompServerHandler handler) {
  // The core server only understands the core handler type, so unwrap first.
  io.vertx.ext.stomp.StompServerHandler coreHandler = handler.getDelegate();
  delegate.handler(coreHandler);
  return this;
}
Connects the STOMP server to the default port (61613) and network interface (0.0.0.0). Once the socket is bound, it calls the given handler with the result. The result may be a failure if the socket is already used.
Params:
  • handler – the handler to call with the result
Returns:the current StompServer
/**
 * Connects the STOMP server to the default port (61613) and network interface (<code>0.0.0.0</code>). Once the
 * socket is bound, the given handler is called with the result. The result may be a failure if the socket is
 * already used.
 * @param handler the handler to call with the result
 * @return the current {@link io.vertx.reactivex.ext.stomp.StompServer}
 */
public io.vertx.reactivex.ext.stomp.StompServer listen(Handler<AsyncResult<io.vertx.reactivex.ext.stomp.StompServer>> handler) {
  delegate.listen(outcome -> {
    if (!outcome.succeeded()) {
      // Forward the failure cause untouched.
      handler.handle(io.vertx.core.Future.failedFuture(outcome.cause()));
      return;
    }
    // Re-wrap the core server so the caller receives the RX-ified type.
    handler.handle(io.vertx.core.Future.succeededFuture(
      io.vertx.reactivex.ext.stomp.StompServer.newInstance(outcome.result())));
  });
  return this;
}
Connects the STOMP server to the default port (61613) and network interface (0.0.0.0). This variant takes no handler: the result of the bind, including a failure if the socket is already used, is ignored.
Returns:the current StompServer
/**
 * Connects the STOMP server to the default port (61613) and network interface (<code>0.0.0.0</code>).
 * The outcome of the bind is passed to a no-op handler, so a failure (for instance when the socket is already
 * used) is not reported to the caller.
 * @return the current {@link io.vertx.reactivex.ext.stomp.StompServer}
 */
public io.vertx.reactivex.ext.stomp.StompServer listen() {
  Handler<AsyncResult<io.vertx.reactivex.ext.stomp.StompServer>> ignoreOutcome = outcome -> { };
  return listen(ignoreOutcome);
}
Connects the STOMP server to the default port (61613) and network interface (0.0.0.0). The result is reported through the returned Single. The result may be a failure if the socket is already used.
Returns: a Single that emits the StompServer once the socket is bound, or fails with the cause of the bind failure
/**
 * Connects the STOMP server to the default port (61613) and network interface (<code>0.0.0.0</code>).
 * The result may be a failure if the socket is already used.
 * @return a single that emits the {@link io.vertx.reactivex.ext.stomp.StompServer} when the bind succeeds,
 *         or signals the failure cause otherwise
 */
public io.reactivex.Single<io.vertx.reactivex.ext.stomp.StompServer> rxListen() {
  return AsyncResultSingle.toSingle(subscriber -> listen(subscriber));
}
Connects the STOMP server to the given port. This method uses the default host (0.0.0.0). Once the socket is bound, it calls the given handler with the result. The result may be a failure if the socket is already used.
Params:
  • port – the port
  • handler – the handler to call with the result
Returns:the current StompServer
/**
 * Connects the STOMP server to the given port. This method uses the default host (<code>0.0.0.0</code>). Once
 * the socket is bound, the given handler is called with the result. The result may be a failure if the socket
 * is already used.
 * @param port the port
 * @param handler the handler to call with the result
 * @return the current {@link io.vertx.reactivex.ext.stomp.StompServer}
 */
public io.vertx.reactivex.ext.stomp.StompServer listen(int port, Handler<AsyncResult<io.vertx.reactivex.ext.stomp.StompServer>> handler) {
  delegate.listen(port, outcome -> {
    if (!outcome.succeeded()) {
      // Forward the failure cause untouched.
      handler.handle(io.vertx.core.Future.failedFuture(outcome.cause()));
      return;
    }
    // Re-wrap the core server so the caller receives the RX-ified type.
    handler.handle(io.vertx.core.Future.succeededFuture(
      io.vertx.reactivex.ext.stomp.StompServer.newInstance(outcome.result())));
  });
  return this;
}
Connects the STOMP server to the given port. This method uses the default host (0.0.0.0). This variant takes no handler: the result of the bind, including a failure if the socket is already used, is ignored.
Params:
  • port – the port
Returns:the current StompServer
/**
 * Connects the STOMP server to the given port. This method uses the default host (<code>0.0.0.0</code>).
 * The outcome of the bind is passed to a no-op handler, so a failure (for instance when the socket is already
 * used) is not reported to the caller.
 * @param port the port
 * @return the current {@link io.vertx.reactivex.ext.stomp.StompServer}
 */
public io.vertx.reactivex.ext.stomp.StompServer listen(int port) {
  Handler<AsyncResult<io.vertx.reactivex.ext.stomp.StompServer>> ignoreOutcome = outcome -> { };
  return listen(port, ignoreOutcome);
}
Connects the STOMP server to the given port. This method uses the default host (0.0.0.0). The result is reported through the returned Single. The result may be a failure if the socket is already used.
Params:
  • port – the port
Returns: a Single that emits the StompServer once the socket is bound, or fails with the cause of the bind failure
/**
 * Connects the STOMP server to the given port. This method uses the default host (<code>0.0.0.0</code>).
 * The result may be a failure if the socket is already used.
 * @param port the port
 * @return a single that emits the {@link io.vertx.reactivex.ext.stomp.StompServer} when the bind succeeds,
 *         or signals the failure cause otherwise
 */
public io.reactivex.Single<io.vertx.reactivex.ext.stomp.StompServer> rxListen(int port) {
  return AsyncResultSingle.toSingle(subscriber -> listen(port, subscriber));
}
Connects the STOMP server to the given port / interface. Once the socket is bound, it calls the given handler with the result. The result may be a failure if the socket is already used.
Params:
  • port – the port
  • host – the host / interface
  • handler – the handler to call with the result
Returns:the current StompServer
/**
 * Connects the STOMP server to the given port / interface. Once the socket is bound, the given handler is
 * called with the result. The result may be a failure if the socket is already used.
 * @param port the port
 * @param host the host / interface
 * @param handler the handler to call with the result
 * @return the current {@link io.vertx.reactivex.ext.stomp.StompServer}
 */
public io.vertx.reactivex.ext.stomp.StompServer listen(int port, String host, Handler<AsyncResult<io.vertx.reactivex.ext.stomp.StompServer>> handler) {
  delegate.listen(port, host, outcome -> {
    if (!outcome.succeeded()) {
      // Forward the failure cause untouched.
      handler.handle(io.vertx.core.Future.failedFuture(outcome.cause()));
      return;
    }
    // Re-wrap the core server so the caller receives the RX-ified type.
    handler.handle(io.vertx.core.Future.succeededFuture(
      io.vertx.reactivex.ext.stomp.StompServer.newInstance(outcome.result())));
  });
  return this;
}
Connects the STOMP server to the given port / interface. This variant takes no handler: the result of the bind, including a failure if the socket is already used, is ignored.
Params:
  • port – the port
  • host – the host / interface
Returns:the current StompServer
/**
 * Connects the STOMP server to the given port / interface. The outcome of the bind is passed to a no-op
 * handler, so a failure (for instance when the socket is already used) is not reported to the caller.
 * @param port the port
 * @param host the host / interface
 * @return the current {@link io.vertx.reactivex.ext.stomp.StompServer}
 */
public io.vertx.reactivex.ext.stomp.StompServer listen(int port, String host) {
  Handler<AsyncResult<io.vertx.reactivex.ext.stomp.StompServer>> ignoreOutcome = outcome -> { };
  return listen(port, host, ignoreOutcome);
}
Connects the STOMP server to the given port / interface. The result is reported through the returned Single. The result may be a failure if the socket is already used.
Params:
  • port – the port
  • host – the host / interface
Returns: a Single that emits the StompServer once the socket is bound, or fails with the cause of the bind failure
/**
 * Connects the STOMP server to the given port / interface. The result may be a failure if the socket is
 * already used.
 * @param port the port
 * @param host the host / interface
 * @return a single that emits the {@link io.vertx.reactivex.ext.stomp.StompServer} when the bind succeeds,
 *         or signals the failure cause otherwise
 */
public io.reactivex.Single<io.vertx.reactivex.ext.stomp.StompServer> rxListen(int port, String host) {
  return AsyncResultSingle.toSingle(subscriber -> listen(port, host, subscriber));
}
Closes the server.
Params:
  • completionHandler – handler called once the server has been stopped
/**
 * Closes the server by forwarding the request to the wrapped core server.
 * @param completionHandler handler called once the server has been stopped
 */
public void close(Handler<AsyncResult<Void>> completionHandler) {
  // No result conversion is needed: the outcome carries no value.
  delegate.close(completionHandler);
}
Closes the server.
/**
 * Closes the server. The outcome is passed to a no-op handler and therefore not reported to the caller.
 */
public void close() {
  Handler<AsyncResult<Void>> ignoreOutcome = outcome -> { };
  close(ignoreOutcome);
}
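Closes the server.
Returns: a Completable that completes once the server has been stopped, or fails with the cause if closing fails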
/**
 * Closes the server.
 * @return a completable that completes once the server has been stopped, or signals the failure cause
 */
public io.reactivex.Completable rxClose() {
  return AsyncResultCompletable.toCompletable(this::close);
}
Checks whether or not the server is listening.
Returns:true if the server is listening, false otherwise
/**
 * Checks whether or not the server is listening.
 * @return <code>true</code> if the server is listening, <code>false</code> otherwise
 */
public boolean isListening() {
  return delegate.isListening();
}
Gets the port on which the server is listening.

This is useful if you bound the server specifying 0 as port number signifying an ephemeral port.
Returns:the port
/**
 * Gets the port on which the server is listening.
 * <p>
 * This is useful if you bound the server specifying 0 as port number signifying an ephemeral port.
 * @return the port
 */
public int actualPort() {
  return delegate.actualPort();
}
Returns:the server options
/**
 * Returns the options of the wrapped server; the options type is not RX-ified, so it is returned as is.
 * @return the server options
 */
public io.vertx.ext.stomp.StompServerOptions options() {
  return delegate.options();
}
Returns:the instance of vert.x used by the server.
/**
 * Returns the vert.x instance of the wrapped server, wrapped into its RX-ified counterpart.
 * @return the instance of vert.x used by the server.
 */
public io.vertx.reactivex.core.Vertx vertx() {
  io.vertx.core.Vertx coreVertx = delegate.vertx();
  return io.vertx.reactivex.core.Vertx.newInstance(coreVertx);
}
Returns:the StompServerHandler used by this server.
/**
 * Returns the handler of the wrapped server, wrapped into its RX-ified counterpart.
 * @return the {@link io.vertx.reactivex.ext.stomp.StompServerHandler} used by this server.
 */
public io.vertx.reactivex.ext.stomp.StompServerHandler stompHandler() {
  io.vertx.ext.stomp.StompServerHandler coreHandler = delegate.stompHandler();
  return io.vertx.reactivex.ext.stomp.StompServerHandler.newInstance(coreHandler);
}
Gets the handler able to manage web socket connections. If the web socket bridge is disabled, it returns null.
Returns:the handler that can be passed to HttpServer.webSocketHandler.
/**
 * Gets the handler able to manage web socket connections. If the web socket bridge is disabled, it returns
 * <code>null</code>.
 * <p>
 * The handler of the wrapped server is resolved once, when this method is called. A <code>null</code> from the
 * wrapped server is propagated as <code>null</code> rather than hidden behind a wrapper that would fail with a
 * {@link NullPointerException} on the first incoming web socket.
 * @return the handler that can be passed to {@link io.vertx.reactivex.core.http.HttpServer#webSocketHandler},
 *         or <code>null</code> when the wrapped server provides none
 */
public Handler<io.vertx.reactivex.core.http.ServerWebSocket> webSocketHandler() {
  Handler<io.vertx.core.http.ServerWebSocket> bridge = delegate.webSocketHandler();
  if (bridge == null) {
    // Honour the documented contract: no bridge means no handler.
    return null;
  }
  // Unwrap each RX-ified socket before handing it to the core bridge handler.
  return socket -> bridge.handle(socket.getDelegate());
}
Configures the handler that is invoked every time a frame is going to be written to the "wire". It lets you log the frames, but also adapt the frame if needed.
Params:
  • handler – the handler, must not be null
Returns:the current StompServer
/**
 * Configures the handler that is invoked every time a frame is going to be written to the "wire". It lets you
 * log the frames, but also adapt the frame if needed.
 * @param handler the handler, must not be <code>null</code>
 * @return the current {@link io.vertx.reactivex.ext.stomp.StompServer}
 */
public io.vertx.reactivex.ext.stomp.StompServer writingFrameHandler(Handler<io.vertx.reactivex.ext.stomp.ServerFrame> handler) {
  // Wrap each core frame into its RX-ified counterpart before passing it on.
  delegate.writingFrameHandler(
    frame -> handler.handle(io.vertx.reactivex.ext.stomp.ServerFrame.newInstance(frame)));
  return this;
}

/**
 * Wraps a core server into its RX-ified counterpart.
 * @param arg the core server, may be <code>null</code>
 * @return a new wrapper around <code>arg</code>, or <code>null</code> if <code>arg</code> is <code>null</code>
 */
public static StompServer newInstance(io.vertx.ext.stomp.StompServer arg) {
  if (arg == null) {
    return null;
  }
  return new StompServer(arg);
}
}