package io.vertx.rxjava.ext.sql.impl;
import io.vertx.rxjava.ext.sql.SQLConnection;
import rx.Single;
import rx.Single.Transformer;
public class InTransactionSingle<T> implements Transformer<T, T> {
private final SQLConnection sqlConnection;
public InTransactionSingle(SQLConnection sqlConnection) {
this.sqlConnection = sqlConnection;
}
@Override
public Single<T> call(Single<T> upstream) {
return sqlConnection.rxSetAutoCommit(false).toCompletable()
.andThen(upstream)
.flatMap(item -> sqlConnection.rxCommit().toCompletable().andThen(Single.just(item)))
.onErrorResumeNext(throwable -> {
return sqlConnection.rxRollback().toCompletable().onErrorComplete()
.andThen(sqlConnection.rxSetAutoCommit(true).toCompletable().onErrorComplete())
.andThen(Single.error(throwable));
}).flatMap(item -> sqlConnection.rxSetAutoCommit(true).toCompletable().andThen(Single.just(item)));
}
}