/*
 * Copyright 2014 Red Hat, Inc.
 *
 * Red Hat licenses this file to you under the Apache License, version 2.0
 * (the "License"); you may not use this file except in compliance with the
 * License.  You may obtain a copy of the License at:
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package io.vertx.rxjava.rabbitmq;

import rx.Observable;
import rx.Single;
import io.vertx.rx.java.RxHelper;
import io.vertx.rx.java.WriteStreamSubscriber;
import io.vertx.rx.java.SingleOnSubscribeAdapter;
import java.util.Map;
import java.util.Set;
import java.util.List;
import java.util.Iterator;
import java.util.function.Function;
import java.util.stream.Collectors;
import io.vertx.core.Handler;
import io.vertx.core.AsyncResult;
import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;
import io.vertx.lang.rx.RxGen;
import io.vertx.lang.rx.TypeArg;
import io.vertx.lang.rx.MappingIterator;

A reliable publisher that
  • Queues up messages internally until it can successfully call basicPublish.
  • Notifies the caller using a robust ID (not delivery tag) when the message is confirmed by rabbit.
This is a layer above the RabbitMQClient that provides a lot of standard implementation when guaranteed at least once delivery is required. If confirmations are not required do not use this publisher as it does have overhead.

NOTE: This class has been automatically generated from the original non RX-ified interface using Vert.x codegen.
/** * A reliable publisher that * <ul> * <li>Queues up messages internally until it can successfully call basicPublish. * <li>Notifies the caller using a robust ID (not delivery tag) when the message is confirmed by rabbit. * </ul> * * This is a layer above the RabbitMQClient that provides a lot of standard implementation when guaranteed at least once delivery is required. * If confirmations are not required do not use this publisher as it does have overhead. * * <p/> * NOTE: This class has been automatically generated from the {@link io.vertx.rabbitmq.RabbitMQPublisher original} non RX-ified interface using Vert.x codegen. */
@RxGen(io.vertx.rabbitmq.RabbitMQPublisher.class) public class RabbitMQPublisher { @Override public String toString() { return delegate.toString(); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; RabbitMQPublisher that = (RabbitMQPublisher) o; return delegate.equals(that.delegate); } @Override public int hashCode() { return delegate.hashCode(); } public static final TypeArg<RabbitMQPublisher> __TYPE_ARG = new TypeArg<>( obj -> new RabbitMQPublisher((io.vertx.rabbitmq.RabbitMQPublisher) obj), RabbitMQPublisher::getDelegate ); private final io.vertx.rabbitmq.RabbitMQPublisher delegate; public RabbitMQPublisher(io.vertx.rabbitmq.RabbitMQPublisher delegate) { this.delegate = delegate; } public RabbitMQPublisher(Object delegate) { this.delegate = (io.vertx.rabbitmq.RabbitMQPublisher)delegate; } public io.vertx.rabbitmq.RabbitMQPublisher getDelegate() { return delegate; }
Create and return a publisher using the specified client.
Params:
  • vertx – the vertx instance.
  • client – the RabbitMQClient.
  • options – options for the publisher.
Returns:the publisher
/** * Create and return a publisher using the specified client. * @param vertx the vertx instance. * @param client the RabbitMQClient. * @param options options for the publisher. * @return the publisher */
public static io.vertx.rxjava.rabbitmq.RabbitMQPublisher create(io.vertx.rxjava.core.Vertx vertx, io.vertx.rxjava.rabbitmq.RabbitMQClient client, io.vertx.rabbitmq.RabbitMQPublisherOptions options) { io.vertx.rxjava.rabbitmq.RabbitMQPublisher ret = io.vertx.rxjava.rabbitmq.RabbitMQPublisher.newInstance((io.vertx.rabbitmq.RabbitMQPublisher)io.vertx.rabbitmq.RabbitMQPublisher.create(vertx.getDelegate(), client.getDelegate(), options)); return ret; }
Start the rabbitMQ publisher. The RabbitMQClient should have been started before this.
Params:
  • resultHandler –
/** * Start the rabbitMQ publisher. * The RabbitMQClient should have been started before this. * * @param resultHandler */
public void start(Handler<AsyncResult<Void>> resultHandler) { delegate.start(resultHandler); }
Start the rabbitMQ publisher. The RabbitMQClient should have been started before this.
/** * Start the rabbitMQ publisher. * The RabbitMQClient should have been started before this. * */
public void start() { start(ar -> { }); }
Start the rabbitMQ publisher. The RabbitMQClient should have been started before this.
Returns:
/** * Start the rabbitMQ publisher. * The RabbitMQClient should have been started before this. * * @return */
public Single<Void> rxStart() { return Single.create(new SingleOnSubscribeAdapter<>(fut -> { start(fut); })); }
Stop the rabbitMQ publisher. Calling this is optional, but it gives the opportunity to drain the send queue without losing messages. Future calls to publish will be ignored.
Params:
  • resultHandler –
/** * Stop the rabbitMQ publisher. * Calling this is optional, but it gives the opportunity to drain the send queue without losing messages. * Future calls to publish will be ignored. * * @param resultHandler */
public void stop(Handler<AsyncResult<Void>> resultHandler) { delegate.stop(resultHandler); }
Stop the rabbitMQ publisher. Calling this is optional, but it gives the opportunity to drain the send queue without losing messages. Future calls to publish will be ignored.
/** * Stop the rabbitMQ publisher. * Calling this is optional, but it gives the opportunity to drain the send queue without losing messages. * Future calls to publish will be ignored. * */
public void stop() { stop(ar -> { }); }
Stop the rabbitMQ publisher. Calling this is optional, but it gives the opportunity to drain the send queue without losing messages. Future calls to publish will be ignored.
Returns:
/** * Stop the rabbitMQ publisher. * Calling this is optional, but it gives the opportunity to drain the send queue without losing messages. * Future calls to publish will be ignored. * * @return */
public Single<Void> rxStop() { return Single.create(new SingleOnSubscribeAdapter<>(fut -> { stop(fut); })); }
Undo the effects of calling stop so that publish may be called again. It is harmless to call restart() when stop has not been called, however if restart() is called whilst stop is being processed the stop will never complete.
/** * Undo the effects of calling {@link io.vertx.rxjava.rabbitmq.RabbitMQPublisher#stop} so that publish may be called again. * It is harmless to call restart() when {@link io.vertx.rxjava.rabbitmq.RabbitMQPublisher#stop} has not been called, however if restart() is called * whilst {@link io.vertx.rxjava.rabbitmq.RabbitMQPublisher#stop} is being processed the {@link io.vertx.rxjava.rabbitmq.RabbitMQPublisher#stop} will never complete. * */
public void restart() { delegate.restart(); }
Get the ReadStream that contains the message IDs for confirmed messages. The message IDs in this ReadStream are taken from the message properties, if these message IDs are not set then this ReadStream will contain nulls and using this publisher will be pointless.
Returns:the ReadStream that contains the message IDs for confirmed messages.
/** * Get the ReadStream that contains the message IDs for confirmed messages. * The message IDs in this ReadStream are taken from the message properties, * if these message IDs are not set then this ReadStream will contain nulls and using this publisher will be pointless. * @return the ReadStream that contains the message IDs for confirmed messages. */
public io.vertx.rxjava.core.streams.ReadStream<io.vertx.rabbitmq.RabbitMQPublisherConfirmation> getConfirmationStream() { io.vertx.rxjava.core.streams.ReadStream<io.vertx.rabbitmq.RabbitMQPublisherConfirmation> ret = io.vertx.rxjava.core.streams.ReadStream.newInstance((io.vertx.core.streams.ReadStream)delegate.getConfirmationStream(), TypeArg.unknown()); return ret; }
Get the number of published, but not sent, messages.
Returns:the number of published, but not sent, messages.
/** * Get the number of published, but not sent, messages. * @return the number of published, but not sent, messages. */
public int queueSize() { int ret = delegate.queueSize(); return ret; }
Publish a message.
Params:
  • exchange –
  • routingKey –
  • properties –
  • body –
  • resultHandler –
/** * Publish a message. * @param exchange * @param routingKey * @param properties * @param body * @param resultHandler */
public void publish(String exchange, String routingKey, com.rabbitmq.client.BasicProperties properties, io.vertx.rxjava.core.buffer.Buffer body, Handler<AsyncResult<Void>> resultHandler) { delegate.publish(exchange, routingKey, properties, body.getDelegate(), resultHandler); }
Publish a message.
Params:
  • exchange –
  • routingKey –
  • properties –
  • body –
/** * Publish a message. * @param exchange * @param routingKey * @param properties * @param body */
public void publish(String exchange, String routingKey, com.rabbitmq.client.BasicProperties properties, io.vertx.rxjava.core.buffer.Buffer body) { publish(exchange, routingKey, properties, body, ar -> { }); }
Publish a message.
Params:
  • exchange –
  • routingKey –
  • properties –
  • body –
Returns:
/** * Publish a message. * @param exchange * @param routingKey * @param properties * @param body * @return */
public Single<Void> rxPublish(String exchange, String routingKey, com.rabbitmq.client.BasicProperties properties, io.vertx.rxjava.core.buffer.Buffer body) { return Single.create(new SingleOnSubscribeAdapter<>(fut -> { publish(exchange, routingKey, properties, body, fut); })); } public static RabbitMQPublisher newInstance(io.vertx.rabbitmq.RabbitMQPublisher arg) { return arg != null ? new RabbitMQPublisher(arg) : null; } }