package io.vertx.proton.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonQoS;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.amqp.transport.Source;
import org.apache.qpid.proton.amqp.transport.Target;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Record;
import java.util.Map;
abstract class ProtonLinkImpl<T extends ProtonLink<T>> implements ProtonLink<T> {
protected final Link link;
private Handler<AsyncResult<T>> openHandler;
private Handler<AsyncResult<T>> closeHandler;
private Handler<AsyncResult<T>> detachHandler;
ProtonLinkImpl(Link link) {
this.link = link;
this.link.setContext(this);
setQoS(getRemoteQoS());
}
protected abstract T self();
@Override
public ProtonSessionImpl getSession() {
return (ProtonSessionImpl) this.link.getSession().getContext();
}
@Override
public Record attachments() {
return link.attachments();
}
@Override
public ErrorCondition getCondition() {
return link.getCondition();
}
@Override
public int getCredit() {
return link.getCredit();
}
@Override
public boolean getDrain() {
return link.getDrain();
}
public EndpointState getLocalState() {
return link.getLocalState();
}
@Override
public String getName() {
return link.getName();
}
@Override
public ErrorCondition getRemoteCondition() {
return link.getRemoteCondition();
}
public int getRemoteCredit() {
return link.getRemoteCredit();
}
public EndpointState getRemoteState() {
return link.getRemoteState();
}
@Override
public Target getRemoteTarget() {
return link.getRemoteTarget();
}
@Override
public Target getTarget() {
return link.getTarget();
}
@Override
public T setTarget(Target target) {
link.setTarget(target);
return self();
}
@Override
public Source getRemoteSource() {
return link.getRemoteSource();
}
@Override
public Source getSource() {
return link.getSource();
}
@Override
public T setSource(Source source) {
link.setSource(source);
return self();
}
public int getUnsettled() {
return link.getUnsettled();
}
@Override
public int getQueued() {
return link.getQueued();
}
public boolean advance() {
return link.advance();
}
public int drained() {
int drained = link.drained();
getSession().getConnectionImpl().flush();
return drained;
}
public boolean detached() {
return link.detached();
}
public Delivery delivery(byte[] tag, int offset, int length) {
return link.delivery(tag, offset, length);
}
public Delivery current() {
return link.current();
}
@Override
public T setCondition(ErrorCondition condition) {
link.setCondition(condition);
return self();
}
public Delivery delivery(byte[] tag) {
return link.delivery(tag);
}
@Override
public T open() {
link.open();
getSession().getConnectionImpl().flush();
return self();
}
@Override
public T close() {
link.close();
getSession().getConnectionImpl().flush();
return self();
}
@Override
public T detach() {
link.detach();
getSession().getConnectionImpl().flush();
return self();
}
@Override
public T openHandler(Handler<AsyncResult<T>> openHandler) {
this.openHandler = openHandler;
return self();
}
@Override
public T closeHandler(Handler<AsyncResult<T>> closeHandler) {
this.closeHandler = closeHandler;
return self();
}
@Override
public T detachHandler(Handler<AsyncResult<T>> detachHandler) {
this.detachHandler = detachHandler;
return self();
}
@Override
public boolean isOpen() {
return getLocalState() == EndpointState.ACTIVE;
}
@Override
public ProtonQoS getQoS() {
if (link.getSenderSettleMode() == SenderSettleMode.SETTLED) {
return ProtonQoS.AT_MOST_ONCE;
}
return ProtonQoS.AT_LEAST_ONCE;
}
@Override
public ProtonQoS getRemoteQoS() {
if (link.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED) {
return ProtonQoS.AT_MOST_ONCE;
}
return ProtonQoS.AT_LEAST_ONCE;
}
@Override
public T setQoS(ProtonQoS qos) {
if (qos == null) {
throw new IllegalArgumentException("Value must be specified");
}
switch (qos) {
case AT_MOST_ONCE:
link.setSenderSettleMode(SenderSettleMode.SETTLED);
link.setReceiverSettleMode(ReceiverSettleMode.FIRST);
break;
case AT_LEAST_ONCE:
link.setSenderSettleMode(SenderSettleMode.UNSETTLED);
link.setReceiverSettleMode(ReceiverSettleMode.FIRST);
break;
}
return self();
}
@Override
public UnsignedLong getMaxMessageSize() {
return link.getMaxMessageSize();
}
@Override
public void setMaxMessageSize(UnsignedLong maxMessageSize) {
link.setMaxMessageSize(maxMessageSize);
}
@Override
public UnsignedLong getRemoteMaxMessageSize() {
return link.getRemoteMaxMessageSize();
}
@Override
public Map<Symbol, Object> getRemoteProperties() {
return link.getRemoteProperties();
}
@Override
public void setProperties(Map<Symbol, Object> properties) {
link.setProperties(properties);
}
@Override
public void setOfferedCapabilities(final Symbol[] capabilities) {
link.setOfferedCapabilities(capabilities);
}
@Override
public Symbol[] getRemoteOfferedCapabilities() {
return link.getRemoteOfferedCapabilities();
}
@Override
public void setDesiredCapabilities(final Symbol[] capabilities) {
link.setDesiredCapabilities(capabilities);
}
@Override
public Symbol[] getRemoteDesiredCapabilities() {
return link.getRemoteDesiredCapabilities();
}
@Override
public void free() {
link.free();
getSession().getConnectionImpl().flush();
}
void fireRemoteOpen() {
if (openHandler != null) {
openHandler.handle(ProtonHelper.future(self(), getRemoteCondition()));
}
}
void fireRemoteDetach() {
if (detachHandler != null) {
detachHandler.handle(ProtonHelper.future(self(), getRemoteCondition()));
}
}
void fireRemoteClose() {
if (closeHandler != null) {
closeHandler.handle(ProtonHelper.future(self(), getRemoteCondition()));
}
}
abstract void handleLinkFlow();
}