package io.vertx.core.eventbus.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.eventbus.*;
import io.vertx.core.http.CaseInsensitiveHeaders;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.util.List;
import java.util.Map;
public class MessageImpl<U, V> implements Message<V> {
private static final Logger log = LoggerFactory.getLogger(MessageImpl.class);
protected MessageCodec<U, V> messageCodec;
protected EventBusImpl bus;
protected String address;
protected String replyAddress;
protected MultiMap ;
protected U sentBody;
protected V receivedBody;
protected boolean send;
protected Handler<AsyncResult<Void>> writeHandler;
public MessageImpl() {
}
public MessageImpl(String address, String replyAddress, MultiMap headers, U sentBody,
MessageCodec<U, V> messageCodec,
boolean send, EventBusImpl bus,
Handler<AsyncResult<Void>> writeHandler) {
this.messageCodec = messageCodec;
this.address = address;
this.replyAddress = replyAddress;
this.headers = headers;
this.sentBody = sentBody;
this.send = send;
this.bus = bus;
this.writeHandler = writeHandler;
}
protected MessageImpl(MessageImpl<U, V> other) {
this.bus = other.bus;
this.address = other.address;
this.replyAddress = other.replyAddress;
this.messageCodec = other.messageCodec;
if (other.headers != null) {
List<Map.Entry<String, String>> entries = other.headers.entries();
this.headers = new CaseInsensitiveHeaders();
for (Map.Entry<String, String> entry: entries) {
this.headers.add(entry.getKey(), entry.getValue());
}
}
if (other.sentBody != null) {
this.sentBody = other.sentBody;
this.receivedBody = messageCodec.transform(other.sentBody);
}
this.send = other.send;
this.writeHandler = other.writeHandler;
}
public MessageImpl<U, V> copyBeforeReceive() {
return new MessageImpl<>(this);
}
@Override
public String address() {
return address;
}
@Override
public MultiMap () {
if (headers == null) {
headers = new CaseInsensitiveHeaders();
}
return headers;
}
@Override
public V body() {
if (receivedBody == null && sentBody != null) {
receivedBody = messageCodec.transform(sentBody);
}
return receivedBody;
}
@Override
public String replyAddress() {
return replyAddress;
}
@Override
public void fail(int failureCode, String message) {
if (replyAddress != null) {
sendReply(bus.createMessage(true, replyAddress, null,
new ReplyException(ReplyFailure.RECIPIENT_FAILURE, failureCode, message), null, null), null, null);
}
}
@Override
public void reply(Object message) {
reply(message, new DeliveryOptions(), null);
}
@Override
public <R> void reply(Object message, Handler<AsyncResult<Message<R>>> replyHandler) {
reply(message, new DeliveryOptions(), replyHandler);
}
@Override
public void reply(Object message, DeliveryOptions options) {
reply(message, options, null);
}
@Override
public <R> void reply(Object message, DeliveryOptions options, Handler<AsyncResult<Message<R>>> replyHandler) {
if (replyAddress != null) {
sendReply(bus.createMessage(true, replyAddress, options.getHeaders(), message, options.getCodecName(), null), options, replyHandler);
}
}
@Override
public boolean isSend() {
return send;
}
public void setReplyAddress(String replyAddress) {
this.replyAddress = replyAddress;
}
public Handler<AsyncResult<Void>> writeHandler() {
return writeHandler;
}
public MessageCodec<U, V> codec() {
return messageCodec;
}
public void setBus(EventBusImpl bus) {
this.bus = bus;
}
protected <R> void sendReply(MessageImpl msg, DeliveryOptions options, Handler<AsyncResult<Message<R>>> replyHandler) {
if (bus != null) {
bus.sendReply(msg, this, options, replyHandler);
}
}
protected boolean isLocal() {
return true;
}
}