package io.vertx.rxjava.ext.sql.impl;
import io.vertx.rxjava.ext.sql.SQLConnection;
import rx.Observable;
import rx.Observable.Transformer;
public class InTransactionObservable<T> implements Transformer<T, T> {
private final SQLConnection sqlConnection;
public InTransactionObservable(SQLConnection sqlConnection) {
this.sqlConnection = sqlConnection;
}
@Override
public Observable<T> call(Observable<T> upstream) {
return sqlConnection.rxSetAutoCommit(false).toCompletable()
.andThen(upstream)
.concatWith(sqlConnection.rxCommit().toCompletable().toObservable())
.onErrorResumeNext(throwable -> {
return sqlConnection.rxRollback().toCompletable().onErrorComplete()
.andThen(sqlConnection.rxSetAutoCommit(true).toCompletable().onErrorComplete())
.andThen(Observable.error(throwable));
}).concatWith(sqlConnection.rxSetAutoCommit(true).toCompletable().toObservable());
}
}