/*
* Copyright (c) 2011-2015 The original author or authors
* ------------------------------------------------------
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/
package io.vertx.ext.stomp;
import io.vertx.core.Handler;
import io.vertx.ext.stomp.impl.Transaction;
import io.vertx.ext.stomp.impl.Transactions;
import io.vertx.ext.stomp.utils.Headers;
STOMP compliant actions executed when receiving a SEND
sf.frame(). If the SEND
frame specifies a transaction, the message delivery is postponed until the transaction commit. The handler computes the MESSAGE
frame from the SEND
sf.frame(). It computes a message-id
and ack
id if needed. If requested the RECEIPT
frame is sent once the MESSAGE
frame has been sent to all matching subscriptions. If the SEND
frame requires an acknowledgment, the message-id
is added to the list of messages waiting for acknowledgment.
This handler is thread safe.
Author: Clement Escoffier
/**
* STOMP compliant actions executed when receiving a {@code SEND} sf.frame().
* <p/>
* If the {@code SEND} frame specifies a transaction, the message delivery is postponed until the transaction commit.
* <p/>
* The handler computes the {@code MESSAGE} frame from the {@code SEND} sf.frame(). It computes a {@code message-id} and
* {@code ack} id if needed. If requested the {@code RECEIPT} frame is sent once the {@code MESSAGE} frame has been
* sent to all matching subscriptions.
* <p/>
* If the {@code SEND} frame requires an acknowledgment, the {@code message-id} is added to the list of messages
* waiting for acknowledgment.
* <p/>
* This handler is thread safe.
*
* @author <a href="http://escoffier.me">Clement Escoffier</a>
*/
public class DefaultSendHandler implements Handler<ServerFrame> {
@Override
public void handle(ServerFrame sf) {
String destination = sf.frame().getHeader(Frame.DESTINATION);
if (destination == null) {
sf.connection().write(Frames.createErrorFrame(
"Destination header missing",
Headers.create(sf.frame().getHeaders()), "Invalid send frame - the " +
"'destination' must be set"));
sf.connection().close();
return;
}
// Handle transaction
String txId = sf.frame().getHeader(Frame.TRANSACTION);
if (txId != null) {
Transaction transaction = Transactions.instance().getTransaction(sf.connection(), txId);
if (transaction == null) {
// No transaction.
Frame errorFrame = Frames.createErrorFrame(
"No transaction",
Headers.create(Frame.DESTINATION, destination, Frame.TRANSACTION, txId),
"Message delivery failed - unknown transaction id");
sf.connection().write(errorFrame);
sf.connection().close();
return;
} else {
if (!transaction.addFrameToTransaction(sf.frame())) {
// Frame not added to transaction
Frame errorFrame = Frames.createErrorFrame("Frame not added to transaction",
Headers.create(Frame.DESTINATION, destination, Frame.TRANSACTION, txId),
"Message delivery failed - the frame cannot be added to the transaction - the number of allowed thread " +
"may have been reached");
Transactions.instance().unregisterTransactionsFromConnection(sf.connection());
sf.connection().write(errorFrame);
sf.connection().close();
return;
}
Frames.handleReceipt(sf.frame(), sf.connection());
// No delivery in transactions.
return;
}
}
final Destination dest = sf.connection().handler().getDestination(destination);
if (dest == null && sf.connection().server().options().isSendErrorOnNoSubscriptions()) {
Frame errorFrame = Frames.createErrorFrame(
"No subscriptions",
Headers.create(Frame.DESTINATION, destination),
"Message delivery failed - no subscriptions on this destination");
sf.connection().write(errorFrame);
sf.connection().close();
return;
}
if (dest != null) {
if (dest.dispatch(sf.connection(), sf.frame()) == null) {
// Error managed by the destination.
return;
}
}
Frames.handleReceipt(sf.frame(), sf.connection());
}
}