package io.vertx.rxjava.ext.sql.impl;
import io.vertx.rxjava.ext.sql.SQLConnection;
import rx.Completable;
import rx.Completable.Transformer;
public class InTransactionCompletable implements Transformer {
private final SQLConnection sqlConnection;
public InTransactionCompletable(SQLConnection sqlConnection) {
this.sqlConnection = sqlConnection;
}
@Override
public Completable call(Completable upstream) {
return sqlConnection.rxSetAutoCommit(false).toCompletable()
.andThen(upstream)
.andThen(sqlConnection.rxCommit().toCompletable())
.onErrorResumeNext(throwable -> {
return sqlConnection.rxRollback().toCompletable().onErrorComplete()
.andThen(sqlConnection.rxSetAutoCommit(true).toCompletable().onErrorComplete())
.andThen(Completable.error(throwable));
}).andThen(sqlConnection.rxSetAutoCommit(true).toCompletable());
}
}