package io.vertx.proton.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.proton.ProtonMessageHandler;
import io.vertx.proton.ProtonReceiver;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.LinkError;
import org.apache.qpid.proton.amqp.transport.Source;
import org.apache.qpid.proton.codec.CompositeReadableBuffer;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.impl.MessageImpl;
import static io.vertx.proton.ProtonHelper.accepted;
public class ProtonReceiverImpl extends ProtonLinkImpl<ProtonReceiver> implements ProtonReceiver {
private static final Logger LOG = LoggerFactory.getLogger(ProtonReceiverImpl.class);
private ProtonMessageHandler handler;
private int prefetch = 1000;
private Handler<AsyncResult<Void>> drainCompleteHandler;
private Long drainTimeoutTaskId = null;
private Session session;
private int maxFrameSize;
private long sessionIncomingCapacity;
private long windowFullThreshhold;
private Handler<ProtonReceiver> maxMessageSizeExceededHandler;
private boolean maxMessageSizeExceeded;
ProtonReceiverImpl(Receiver receiver) {
super(receiver);
session = receiver.getSession();
sessionIncomingCapacity = session.getIncomingCapacity();
maxFrameSize = session.getConnection().getTransport().getMaxFrameSize();
windowFullThreshhold = sessionIncomingCapacity - maxFrameSize;
}
@Override
protected ProtonReceiverImpl self() {
return this;
}
private Receiver getReceiver() {
return (Receiver) link;
}
public int recv(byte[] bytes, int offset, int size) {
return getReceiver().recv(bytes, offset, size);
}
@Override
public String getRemoteAddress() {
Source remoteSource = getRemoteSource();
return remoteSource == null ? null : remoteSource.getAddress();
}
@Override
public ProtonReceiver drain(long timeout, Handler<AsyncResult<Void>> completionHandler) {
if (prefetch > 0) {
throw new IllegalStateException("Manual credit management not available while prefetch is non-zero");
}
if (completionHandler == null) {
throw new IllegalArgumentException("A completion handler must be provided");
}
if (drainCompleteHandler != null) {
throw new IllegalStateException("A previous drain operation has not yet completed");
}
if ((getCredit() - getQueued()) <= 0) {
if (getQueued() == 0) {
completionHandler.handle(Future.succeededFuture());
} else {
setDrainHandlerAndTimeoutTask(timeout, completionHandler);
}
} else {
setDrainHandlerAndTimeoutTask(timeout, completionHandler);
getReceiver().drain(0);
flushConnection();
}
return this;
}
private void setDrainHandlerAndTimeoutTask(long delay, Handler<AsyncResult<Void>> completionHandler) {
drainCompleteHandler = completionHandler;
if(delay > 0) {
Vertx vertx = Vertx.currentContext().owner();
drainTimeoutTaskId = vertx.setTimer(delay, x -> {
drainTimeoutTaskId = null;
drainCompleteHandler = null;
completionHandler.handle(Future.failedFuture("Drain attempt timed out"));
});
}
}
@Override
public ProtonReceiver flow(int credits) throws IllegalStateException {
flow(credits, true);
return this;
}
private void flow(int credits, boolean checkPrefetch) throws IllegalStateException {
if (checkPrefetch && prefetch > 0) {
throw new IllegalStateException("Manual credit management not available while prefetch is non-zero");
}
if (drainCompleteHandler != null) {
throw new IllegalStateException("A previous drain operation has not yet completed");
}
getReceiver().flow(credits);
flushConnection();
}
public boolean draining() {
return getReceiver().draining();
}
public ProtonReceiver setDrain(boolean drain) {
getReceiver().setDrain(drain);
return this;
}
@Override
public ProtonReceiver handler(ProtonMessageHandler handler) {
this.handler = handler;
onDelivery();
return this;
}
@Override
public ProtonReceiver maxMessageSizeExceededHandler(Handler<ProtonReceiver> handler) {
this.maxMessageSizeExceededHandler = handler;
return this;
}
private void flushConnection() {
getSession().getConnectionImpl().flush();
}
private boolean autoAccept = true;
private CompositeReadableBuffer splitContent;
void onDelivery() {
if (this.handler == null) {
return;
}
Receiver receiver = getReceiver();
Delivery delivery = receiver.current();
if (delivery != null) {
if(delivery.isAborted()) {
handleAborted(receiver, delivery);
return;
}
UnsignedLong maxMessageSize = getMaxMessageSize();
if(maxMessageSize != null) {
if(checkMaxMessageSize(maxMessageSize, delivery, receiver)) {
return;
};
}
if (delivery.isPartial()) {
handlePartial(receiver, delivery);
return;
}
ReadableBuffer data = receiver.recv();
if(splitContent != null) {
data = completePartial(data);
}
receiver.advance();
MessageImpl msg = (MessageImpl) Proton.message();
ProtonDeliveryImpl delImpl = new ProtonDeliveryImpl(delivery);
try {
msg.decode(data);
} catch (Throwable t) {
LOG.debug("Unable to decode message, undeliverable", t);
handleDecodeFailure(receiver, delImpl);
return;
}
handler.handle(delImpl, msg);
if (autoAccept && delivery.getLocalState() == null) {
accepted(delImpl, true);
}
if (prefetch > 0) {
flow(1, false);
} else {
processForDrainCompletion();
}
}
}
private boolean checkMaxMessageSize(final UnsignedLong maxMessageSize, final Delivery delivery, final Receiver receiver) {
if(maxMessageSizeExceeded) {
receiver.recv();
return true;
}
long payloadLength = delivery.available();
if(splitContent != null) {
payloadLength += splitContent.remaining();
}
long max = maxMessageSize.longValue();
if(max > 0 && payloadLength > max) {
maxMessageSizeExceeded = true;
splitContent = null;
receiver.recv();
handleMaxMessageSizeExceeded(maxMessageSize, receiver);
return true;
}
return false;
}
private void handleMaxMessageSizeExceeded(final UnsignedLong maxMessageSize, final Receiver receiver) {
try {
LOG.debug("delivery received exceeding max-message-size of " + maxMessageSize + " bytes");
if (maxMessageSizeExceededHandler != null) {
maxMessageSizeExceededHandler.handle(this);
}
} finally {
if (!receiver.detached() && isOpen()) {
LOG.debug("detaching link with error condition " + LinkError.MESSAGE_SIZE_EXCEEDED);
setCondition(new ErrorCondition(LinkError.MESSAGE_SIZE_EXCEEDED, "exceeded max-message-size of " + maxMessageSize + " bytes "));
detach();
}
}
}
private void handleDecodeFailure(Receiver receiver, ProtonDeliveryImpl delImpl) {
Modified modified = new Modified();
modified.setDeliveryFailed(true);
modified.setUndeliverableHere(true);
delImpl.disposition(modified, true);
if(!receiver.getDrain()) {
flow(1, false);
} else {
processForDrainCompletion();
}
}
private void handleAborted(Receiver receiver, Delivery delivery) {
splitContent = null;
receiver.advance();
delivery.settle();
if(!receiver.getDrain()) {
flow(1, false);
} else {
processForDrainCompletion();
}
}
private void handlePartial(final Receiver receiver, final Delivery delivery) {
if (sessionIncomingCapacity <= 0 || maxFrameSize <= 0 || session.getIncomingBytes() < windowFullThreshhold) {
} else {
if(delivery.available() > 0) {
ReadableBuffer buff = receiver.recv();
if(splitContent == null && buff instanceof CompositeReadableBuffer) {
splitContent = (CompositeReadableBuffer) buff;
} else {
int remaining = buff.remaining();
if(remaining > 0) {
if (splitContent == null) {
splitContent = new CompositeReadableBuffer();
}
byte[] chunk = new byte[remaining];
buff.get(chunk);
splitContent.append(chunk);
}
}
}
}
}
private ReadableBuffer completePartial(final ReadableBuffer finalContent) {
int pending = finalContent.remaining();
if(pending > 0) {
byte[] chunk = new byte[pending];
finalContent.get(chunk);
splitContent.append(chunk);
}
ReadableBuffer data = splitContent;
splitContent = null;
return data;
}
@Override
public boolean isAutoAccept() {
return autoAccept;
}
@Override
public ProtonReceiver setAutoAccept(boolean autoAccept) {
this.autoAccept = autoAccept;
return this;
}
@Override
public ProtonReceiver setPrefetch(int messages) {
if (messages < 0) {
throw new IllegalArgumentException("Value must not be negative");
}
prefetch = messages;
return this;
}
@Override
public int getPrefetch() {
return prefetch;
}
@Override
public ProtonReceiver open() {
super.open();
if (prefetch > 0) {
flow(prefetch, false);
}
return this;
}
@Override
void handleLinkFlow(){
processForDrainCompletion();
}
private void processForDrainCompletion() {
Handler<AsyncResult<Void>> h = drainCompleteHandler;
if(h != null && getCredit() <= 0 && getQueued() <= 0) {
boolean timeoutTaskCleared = false;
Long timerId = drainTimeoutTaskId;
if(timerId != null) {
Vertx vertx = Vertx.currentContext().owner();
timeoutTaskCleared = vertx.cancelTimer(timerId);
} else {
timeoutTaskCleared = true;
}
drainTimeoutTaskId = null;
drainCompleteHandler = null;
if(timeoutTaskCleared) {
h.handle(Future.succeededFuture());
}
}
}
}