package io.vertx.proton.streams.impl;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.impl.ContextInternal;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.streams.Delivery;
public class DeliveryImpl implements Delivery {
private static final Accepted ACCEPTED = Accepted.getInstance();
private final Message message;
private final ProtonDelivery delivery;
private ContextInternal ctx;
DeliveryImpl(Message message, ProtonDelivery delivery, ContextInternal ctx) {
this.message = message;
this.delivery = delivery;
this.ctx = ctx;
}
@Override
public Message message() {
return message;
}
public ProtonDelivery delivery() {
return delivery;
}
@Override
public Delivery accept() {
ackOnContext(v -> {
delivery.disposition(ACCEPTED, true);
});
return this;
}
@Override
public Delivery disposition(final DeliveryState state, final boolean settle) {
ackOnContext(v -> {
delivery.disposition(state, settle);
});
return this;
}
private void ackOnContext(Handler<Void> action) {
if (onContextEventLoop()) {
action.handle(null);
} else {
ctx.runOnContext(action);
}
}
public boolean onContextEventLoop() {
return ctx.nettyEventLoop().inEventLoop();
}
public Context getCtx() {
return ctx;
}
}