package io.vertx.core.eventbus.impl;
import io.vertx.core.Future;
import io.vertx.core.MultiMap;
import io.vertx.core.eventbus.*;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import java.util.List;
import java.util.Map;
public class MessageImpl<U, V> implements Message<V> {
private static final Logger log = LoggerFactory.getLogger(MessageImpl.class);
protected MessageCodec<U, V> messageCodec;
protected final EventBusImpl bus;
protected String address;
protected String replyAddress;
protected MultiMap ;
protected U sentBody;
protected V receivedBody;
protected boolean send;
protected Object trace;
public MessageImpl(EventBusImpl bus) {
this.bus = bus;
}
public MessageImpl(String address, MultiMap headers, U sentBody,
MessageCodec<U, V> messageCodec,
boolean send, EventBusImpl bus) {
this.messageCodec = messageCodec;
this.address = address;
this.headers = headers;
this.sentBody = sentBody;
this.send = send;
this.bus = bus;
}
protected MessageImpl(MessageImpl<U, V> other) {
this.bus = other.bus;
this.address = other.address;
this.replyAddress = other.replyAddress;
this.messageCodec = other.messageCodec;
if (other.headers != null) {
List<Map.Entry<String, String>> entries = other.headers.entries();
this.headers = MultiMap.caseInsensitiveMultiMap();
for (Map.Entry<String, String> entry: entries) {
this.headers.add(entry.getKey(), entry.getValue());
}
}
if (other.sentBody != null) {
this.sentBody = other.sentBody;
this.receivedBody = messageCodec.transform(other.sentBody);
}
this.send = other.send;
}
public MessageImpl<U, V> copyBeforeReceive() {
return new MessageImpl<>(this);
}
@Override
public String address() {
return address;
}
@Override
public MultiMap () {
if (headers == null) {
headers = MultiMap.caseInsensitiveMultiMap();
}
return headers;
}
@Override
public V body() {
if (receivedBody == null && sentBody != null) {
receivedBody = messageCodec.transform(sentBody);
}
return receivedBody;
}
@Override
public String replyAddress() {
return replyAddress;
}
@Override
public void reply(Object message, DeliveryOptions options) {
if (replyAddress != null) {
MessageImpl reply = createReply(message, options);
bus.sendReply(reply, options, null);
}
}
@Override
public <R> Future<Message<R>> replyAndRequest(Object message, DeliveryOptions options) {
if (replyAddress != null) {
MessageImpl reply = createReply(message, options);
ReplyHandler<R> handler = bus.createReplyHandler(reply, false, options);
bus.sendReply(reply, options, handler);
return handler.result();
} else {
throw new IllegalStateException();
}
}
protected MessageImpl createReply(Object message, DeliveryOptions options) {
MessageImpl reply = bus.createMessage(true, replyAddress, options.getHeaders(), message, options.getCodecName());
reply.trace = trace;
return reply;
}
@Override
public boolean isSend() {
return send;
}
public void setReplyAddress(String replyAddress) {
this.replyAddress = replyAddress;
}
public MessageCodec<U, V> codec() {
return messageCodec;
}
protected boolean isLocal() {
return true;
}
}