package examples;
import io.vertx.amqp.*;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
public class AmqpClientExamples {
public void creation(Vertx vertx) {
AmqpClientOptions options = new AmqpClientOptions()
.setHost("localhost")
.setPort(5672)
.setUsername("user")
.setPassword("secret");
AmqpClient client1 = AmqpClient.create(options);
AmqpClient client2 = AmqpClient.create(vertx, options);
}
public void connect(AmqpClient client) {
client.connect(ar -> {
if (ar.failed()) {
System.out.println("Unable to connect to the broker");
} else {
System.out.println("Connection succeeded");
AmqpConnection connection = ar.result();
}
});
}
public void receiver1(AmqpConnection connection) {
connection.createReceiver("my-queue",
done -> {
if (done.failed()) {
System.out.println("Unable to create receiver");
} else {
AmqpReceiver receiver = done.result();
receiver.handler(msg -> {
System.out.println("Received " + msg.bodyAsString());
});
}
}
);
}
public void receiverFromClient(AmqpClient client) {
client.createReceiver("my-queue"
,
done -> {
if (done.failed()) {
System.out.println("Unable to create receiver");
} else {
AmqpReceiver receiver = done.result();
receiver.handler(msg -> {
System.out.println("Received " + msg.bodyAsString());
});
}
}
);
}
public void senderFromClient(AmqpClient client) {
client.createSender("my-queue", maybeSender -> {
});
}
public void receiver2(AmqpConnection connection) {
connection.createReceiver("my-queue",
done -> {
if (done.failed()) {
System.out.println("Unable to create receiver");
} else {
AmqpReceiver receiver = done.result();
receiver
.exceptionHandler(t -> {
})
.handler(msg -> {
});
}
}
);
}
public void sender(AmqpConnection connection) {
connection.createSender("my-queue", done -> {
if (done.failed()) {
System.out.println("Unable to create a sender");
} else {
AmqpSender result = done.result();
}
});
}
public void messages() {
AmqpMessageBuilder builder = AmqpMessage.create();
AmqpMessage m1 = builder.withBody("hello").build();
AmqpMessage m2 = builder.withBody("hello").address("another-queue").build();
AmqpMessage m3 = builder
.withJsonObjectAsBody(new JsonObject().put("message", "hello"))
.subject("subject")
.ttl(10000)
.applicationProperties(new JsonObject().put("prop1", "value1"))
.build();
}
public void send(AmqpSender sender) {
sender.send(AmqpMessage.create().withBody("hello").build());
}
public void sendWithAck(AmqpSender sender) {
sender.sendWithAck(AmqpMessage.create().withBody("hello").build(), acked -> {
if (acked.succeeded()) {
System.out.println("Message accepted");
} else {
System.out.println("Message not accepted");
}
});
}
public void requestReply(AmqpConnection connection) {
connection.createAnonymousSender(responseSender -> {
connection.createReceiver("my-queue", done -> {
if (done.failed()) {
System.out.println("Unable to create receiver");
} else {
AmqpReceiver receiver = done.result();
receiver.handler(msg -> {
responseSender.result().send(AmqpMessage.create()
.address(msg.replyTo())
.correlationId(msg.id())
.withBody("my response to your request")
.build()
);
});
}
});
});
connection.createDynamicReceiver(replyReceiver -> {
String replyToAddress = replyReceiver.result().address();
replyReceiver.result().handler(msg -> {
System.out.println("Got the reply! " + msg.bodyAsString());
});
connection.createSender("my-queue", sender -> {
sender.result().send(AmqpMessage.create()
.replyTo(replyToAddress)
.id("my-message-id")
.withBody("This is my request").build());
});
});
}
}