package org.springframework.transaction.reactive;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.lang.Nullable;
import org.springframework.transaction.ReactiveTransaction;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionSystemException;
import org.springframework.util.Assert;
final class TransactionalOperatorImpl implements TransactionalOperator {
private static final Log logger = LogFactory.getLog(TransactionalOperatorImpl.class);
private final ReactiveTransactionManager transactionManager;
private final TransactionDefinition transactionDefinition;
TransactionalOperatorImpl(ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition) {
Assert.notNull(transactionManager, "ReactiveTransactionManager must not be null");
Assert.notNull(transactionManager, "TransactionDefinition must not be null");
this.transactionManager = transactionManager;
this.transactionDefinition = transactionDefinition;
}
public ReactiveTransactionManager getTransactionManager() {
return this.transactionManager;
}
@Override
public <T> Mono<T> transactional(Mono<T> mono) {
return TransactionContextManager.currentContext().flatMap(context -> {
Mono<ReactiveTransaction> status = this.transactionManager.getReactiveTransaction(this.transactionDefinition);
return status.flatMap(it -> Mono.usingWhen(Mono.just(it), ignore -> mono,
this.transactionManager::commit, (res, err) -> Mono.empty(), this.transactionManager::rollback)
.onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex))));
})
.contextWrite(TransactionContextManager.getOrCreateContext())
.contextWrite(TransactionContextManager.getOrCreateContextHolder());
}
@Override
public <T> Flux<T> execute(TransactionCallback<T> action) throws TransactionException {
return TransactionContextManager.currentContext().flatMapMany(context -> {
Mono<ReactiveTransaction> status = this.transactionManager.getReactiveTransaction(this.transactionDefinition);
return status.flatMapMany(it -> Flux
.usingWhen(
Mono.just(it),
action::doInTransaction,
this.transactionManager::commit,
(tx, ex) -> Mono.empty(),
this.transactionManager::rollback)
.onErrorResume(ex ->
rollbackOnException(it, ex).then(Mono.error(ex))));
})
.contextWrite(TransactionContextManager.getOrCreateContext())
.contextWrite(TransactionContextManager.getOrCreateContextHolder());
}
private Mono<Void> rollbackOnException(ReactiveTransaction status, Throwable ex) throws TransactionException {
logger.debug("Initiating transaction rollback on application exception", ex);
return this.transactionManager.rollback(status).onErrorMap(ex2 -> {
logger.error("Application exception overridden by rollback exception", ex);
if (ex2 instanceof TransactionSystemException) {
((TransactionSystemException) ex2).initApplicationException(ex);
}
return ex2;
}
);
}
@Override
public boolean equals(@Nullable Object other) {
return (this == other || (super.equals(other) && (!(other instanceof TransactionalOperatorImpl) ||
getTransactionManager() == ((TransactionalOperatorImpl) other).getTransactionManager())));
}
@Override
public int hashCode() {
return getTransactionManager().hashCode();
}
}