/*
* Copyright (c) 2011-2018 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.reactivex.ext.sql;
import io.reactivex.*;
import io.vertx.reactivex.ext.sql.impl.*;
import java.util.function.Function;
Utilities for generating observables with a SQLClient.
Author: Thomas Segismont
/**
* Utilities for generating observables with a {@link SQLClient}.
*
* @author Thomas Segismont
*/
public class SQLClientHelper {
Creates a FlowableTransformer decorating a Flowable with transaction management for a given SQLConnection.
If the upstream Flowable completes (onComplete), the transaction is committed. If the upstream Flowable emits an error (onError), the transaction is rolled back.
Eventually, the given SQLConnection is put back in autocommit mode.
Params: - sqlConnection – the SQLConnection used for database operations and transaction management
Type parameters: - <T> – the type of the items emitted by the upstream Flowable
Returns: a FlowableTransformer decorating a Flowable with transaction management
/**
 * Builds a {@link FlowableTransformer} that wraps a {@link Flowable} with transaction handling on the supplied {@link SQLConnection}.
 * <p>
 * When the upstream {@link Flowable} completes (<em>onComplete</em>), the transaction is committed.
 * When the upstream {@link Flowable} signals an error (<em>onError</em>), the transaction is rolled back.
 * <p>
 * In the end, the supplied {@link SQLConnection} is switched back to <em>autocommit</em> mode.
 *
 * @param sqlConnection the {@link SQLConnection} on which database operations and transaction management are performed
 * @param <T> the type of the items emitted by the upstream {@link Flowable}
 * @return a {@link FlowableTransformer} adding transaction management to a {@link Flowable}
 */
public static <T> FlowableTransformer<T, T> txFlowableTransformer(SQLConnection sqlConnection) {
  FlowableTransformer<T, T> transactional = new InTransactionFlowable<>(sqlConnection);
  return transactional;
}
Generates a Flowable from SQLConnection operations executed inside a transaction.
Params: - client – the SQLClient
- sourceSupplier – a user-provided function returning a Flowable generated by interacting with the given SQLConnection
Type parameters: - <T> – the type of the items emitted by the Flowable
Returns: a Flowable generated from SQLConnection operations executed inside a transaction
/**
 * Produces a {@link Flowable} whose {@link SQLConnection} operations all run within a single transaction.
 * <p>
 * The connection is obtained through {@link #usingConnectionFlowable(SQLClient, Function)} and the user-supplied
 * flow is composed with {@link #txFlowableTransformer(SQLConnection)}.
 *
 * @param client the {@link SQLClient}
 * @param sourceSupplier a user-provided function that builds a {@link Flowable} by working with the {@link SQLConnection} it receives
 * @param <T> the type of the items emitted by the {@link Flowable}
 * @return a {@link Flowable} built from {@link SQLConnection} operations executed inside a transaction
 */
public static <T> Flowable<T> inTransactionFlowable(SQLClient client, Function<SQLConnection, Flowable<T>> sourceSupplier) {
  return usingConnectionFlowable(client, conn -> {
    Flowable<T> upstream = sourceSupplier.apply(conn);
    FlowableTransformer<T, T> transactional = txFlowableTransformer(conn);
    return upstream.compose(transactional);
  });
}
Creates an ObservableTransformer decorating an Observable with transaction management for a given SQLConnection.
If the upstream Observable completes (onComplete), the transaction is committed. If the upstream Observable emits an error (onError), the transaction is rolled back.
Eventually, the given SQLConnection is put back in autocommit mode.
Params: - sqlConnection – the SQLConnection used for database operations and transaction management
Type parameters: - <T> – the type of the items emitted by the upstream Observable
Returns: an ObservableTransformer decorating an Observable with transaction management
/**
 * Builds an {@link ObservableTransformer} that wraps an {@link Observable} with transaction handling on the supplied {@link SQLConnection}.
 * <p>
 * When the upstream {@link Observable} completes (<em>onComplete</em>), the transaction is committed.
 * When the upstream {@link Observable} signals an error (<em>onError</em>), the transaction is rolled back.
 * <p>
 * In the end, the supplied {@link SQLConnection} is switched back to <em>autocommit</em> mode.
 *
 * @param sqlConnection the {@link SQLConnection} on which database operations and transaction management are performed
 * @param <T> the type of the items emitted by the upstream {@link Observable}
 * @return an {@link ObservableTransformer} adding transaction management to an {@link Observable}
 */
public static <T> ObservableTransformer<T, T> txObservableTransformer(SQLConnection sqlConnection) {
  ObservableTransformer<T, T> transactional = new InTransactionObservable<>(sqlConnection);
  return transactional;
}
Generates an Observable from SQLConnection operations executed inside a transaction.
Params: - client – the SQLClient
- sourceSupplier – a user-provided function returning an Observable generated by interacting with the given SQLConnection
Type parameters: - <T> – the type of the items emitted by the Observable
Returns: an Observable generated from SQLConnection operations executed inside a transaction
/**
 * Produces an {@link Observable} whose {@link SQLConnection} operations all run within a single transaction.
 * <p>
 * The connection is obtained through {@link #usingConnectionObservable(SQLClient, Function)} and the user-supplied
 * flow is composed with {@link #txObservableTransformer(SQLConnection)}.
 *
 * @param client the {@link SQLClient}
 * @param sourceSupplier a user-provided function that builds an {@link Observable} by working with the {@link SQLConnection} it receives
 * @param <T> the type of the items emitted by the {@link Observable}
 * @return an {@link Observable} built from {@link SQLConnection} operations executed inside a transaction
 */
public static <T> Observable<T> inTransactionObservable(SQLClient client, Function<SQLConnection, Observable<T>> sourceSupplier) {
  return usingConnectionObservable(client, conn -> {
    Observable<T> upstream = sourceSupplier.apply(conn);
    ObservableTransformer<T, T> transactional = txObservableTransformer(conn);
    return upstream.compose(transactional);
  });
}
Creates a SingleTransformer decorating a Single with transaction management for a given SQLConnection.
If the upstream Single emits a value (onSuccess), the transaction is committed. If the upstream Single emits an error (onError), the transaction is rolled back.
Eventually, the given SQLConnection is put back in autocommit mode.
Params: - sqlConnection – the SQLConnection used for database operations and transaction management
Type parameters: - <T> – the type of the item emitted by the upstream Single
Returns: a SingleTransformer decorating a Single with transaction management
/**
 * Builds a {@link SingleTransformer} that wraps a {@link Single} with transaction handling on the supplied {@link SQLConnection}.
 * <p>
 * When the upstream {@link Single} emits a value (<em>onSuccess</em>), the transaction is committed.
 * When the upstream {@link Single} signals an error (<em>onError</em>), the transaction is rolled back.
 * <p>
 * In the end, the supplied {@link SQLConnection} is switched back to <em>autocommit</em> mode.
 *
 * @param sqlConnection the {@link SQLConnection} on which database operations and transaction management are performed
 * @param <T> the type of the item emitted by the upstream {@link Single}
 * @return a {@link SingleTransformer} adding transaction management to a {@link Single}
 */
public static <T> SingleTransformer<T, T> txSingleTransformer(SQLConnection sqlConnection) {
  SingleTransformer<T, T> transactional = new InTransactionSingle<>(sqlConnection);
  return transactional;
}
Generates a Single from SQLConnection operations executed inside a transaction.
Params: - client – the SQLClient
- sourceSupplier – a user-provided function returning a Single generated by interacting with the given SQLConnection
Type parameters: - <T> – the type of the item emitted by the Single
Returns: a Single generated from SQLConnection operations executed inside a transaction
/**
 * Produces a {@link Single} whose {@link SQLConnection} operations all run within a single transaction.
 * <p>
 * The connection is obtained through {@link #usingConnectionSingle(SQLClient, Function)} and the user-supplied
 * flow is composed with {@link #txSingleTransformer(SQLConnection)}.
 *
 * @param client the {@link SQLClient}
 * @param sourceSupplier a user-provided function that builds a {@link Single} by working with the {@link SQLConnection} it receives
 * @param <T> the type of the item emitted by the {@link Single}
 * @return a {@link Single} built from {@link SQLConnection} operations executed inside a transaction
 */
public static <T> Single<T> inTransactionSingle(SQLClient client, Function<SQLConnection, Single<T>> sourceSupplier) {
  return usingConnectionSingle(client, conn -> {
    Single<T> upstream = sourceSupplier.apply(conn);
    SingleTransformer<T, T> transactional = txSingleTransformer(conn);
    return upstream.compose(transactional);
  });
}
Creates a MaybeTransformer decorating a Maybe with transaction management for a given SQLConnection.
If the upstream Maybe emits a value (onSuccess) or completes (onComplete), the transaction is committed. If the upstream Maybe emits an error (onError), the transaction is rolled back.
Eventually, the given SQLConnection is put back in autocommit mode.
Params: - sqlConnection – the SQLConnection used for database operations and transaction management
Type parameters: - <T> – the type of the item emitted by the upstream Maybe
Returns: a MaybeTransformer decorating a Maybe with transaction management
/**
 * Builds a {@link MaybeTransformer} that wraps a {@link Maybe} with transaction handling on the supplied {@link SQLConnection}.
 * <p>
 * When the upstream {@link Maybe} emits a value (<em>onSuccess</em>) or completes (<em>onComplete</em>), the transaction is committed.
 * When the upstream {@link Maybe} signals an error (<em>onError</em>), the transaction is rolled back.
 * <p>
 * In the end, the supplied {@link SQLConnection} is switched back to <em>autocommit</em> mode.
 *
 * @param sqlConnection the {@link SQLConnection} on which database operations and transaction management are performed
 * @param <T> the type of the item emitted by the upstream {@link Maybe}
 * @return a {@link MaybeTransformer} adding transaction management to a {@link Maybe}
 */
public static <T> MaybeTransformer<T, T> txMaybeTransformer(SQLConnection sqlConnection) {
  MaybeTransformer<T, T> transactional = new InTransactionMaybe<>(sqlConnection);
  return transactional;
}
Generates a Maybe from SQLConnection operations executed inside a transaction.
Params: - client – the SQLClient
- sourceSupplier – a user-provided function returning a Maybe generated by interacting with the given SQLConnection
Type parameters: - <T> – the type of the item emitted by the Maybe
Returns: a Maybe generated from SQLConnection operations executed inside a transaction
/**
 * Produces a {@link Maybe} whose {@link SQLConnection} operations all run within a single transaction.
 * <p>
 * The connection is obtained through {@link #usingConnectionMaybe(SQLClient, Function)} and the user-supplied
 * flow is composed with {@link #txMaybeTransformer(SQLConnection)}.
 *
 * @param client the {@link SQLClient}
 * @param sourceSupplier a user-provided function that builds a {@link Maybe} by working with the {@link SQLConnection} it receives
 * @param <T> the type of the item emitted by the {@link Maybe}
 * @return a {@link Maybe} built from {@link SQLConnection} operations executed inside a transaction
 */
public static <T> Maybe<T> inTransactionMaybe(SQLClient client, Function<SQLConnection, Maybe<T>> sourceSupplier) {
  return usingConnectionMaybe(client, conn -> {
    Maybe<T> upstream = sourceSupplier.apply(conn);
    MaybeTransformer<T, T> transactional = txMaybeTransformer(conn);
    return upstream.compose(transactional);
  });
}
Creates a CompletableTransformer decorating a Completable with transaction management for a given SQLConnection.
If the upstream Completable completes (onComplete), the transaction is committed. If the upstream Completable emits an error (onError), the transaction is rolled back.
Eventually, the given SQLConnection is put back in autocommit mode.
Params: - sqlConnection – the SQLConnection used for database operations and transaction management
Returns: a CompletableTransformer decorating a Completable with transaction management
/**
 * Builds a {@link CompletableTransformer} that wraps a {@link Completable} with transaction handling on the supplied {@link SQLConnection}.
 * <p>
 * When the upstream {@link Completable} completes (<em>onComplete</em>), the transaction is committed.
 * When the upstream {@link Completable} signals an error (<em>onError</em>), the transaction is rolled back.
 * <p>
 * In the end, the supplied {@link SQLConnection} is switched back to <em>autocommit</em> mode.
 *
 * @param sqlConnection the {@link SQLConnection} on which database operations and transaction management are performed
 * @return a {@link CompletableTransformer} adding transaction management to a {@link Completable}
 */
public static CompletableTransformer txCompletableTransformer(SQLConnection sqlConnection) {
  CompletableTransformer transactional = new InTransactionCompletable(sqlConnection);
  return transactional;
}
Generates a Completable from SQLConnection operations executed inside a transaction.
Params: - client – the SQLClient
- sourceSupplier – a user-provided function returning a Completable generated by interacting with the given SQLConnection
Returns: a Completable generated from SQLConnection operations executed inside a transaction
/**
 * Produces a {@link Completable} whose {@link SQLConnection} operations all run within a single transaction.
 * <p>
 * The connection is obtained through {@link #usingConnectionCompletable(SQLClient, Function)} and the user-supplied
 * flow is composed with {@link #txCompletableTransformer(SQLConnection)}.
 *
 * @param client the {@link SQLClient}
 * @param sourceSupplier a user-provided function that builds a {@link Completable} by working with the {@link SQLConnection} it receives
 * @return a {@link Completable} built from {@link SQLConnection} operations executed inside a transaction
 */
public static Completable inTransactionCompletable(SQLClient client, Function<SQLConnection, Completable> sourceSupplier) {
  return usingConnectionCompletable(client, conn -> {
    Completable upstream = sourceSupplier.apply(conn);
    CompletableTransformer transactional = txCompletableTransformer(conn);
    return upstream.compose(transactional);
  });
}
Generates a Flowable from SQLConnection operations.
Params: - client – the SQLClient
- sourceSupplier – a user-provided function returning a Flowable generated by interacting with the given SQLConnection
Type parameters: - <T> – the type of the items emitted by the Flowable
Returns: a Flowable generated from SQLConnection operations
/**
 * Generates a {@link Flowable} from {@link SQLConnection} operations.
 * <p>
 * A connection is obtained from the {@link SQLClient} and handed to {@code sourceSupplier}.
 * The connection is closed via {@code doFinally} on the returned {@link Flowable}.
 * If {@code sourceSupplier} throws or returns {@code null}, the connection is closed immediately and the failure is propagated.
 *
 * @param client the {@link SQLClient}
 * @param sourceSupplier a user-provided function returning a {@link Flowable} generated by interacting with the given {@link SQLConnection}
 * @param <T> the type of the items emitted by the {@link Flowable}
 * @return a {@link Flowable} generated from {@link SQLConnection} operations
 */
public static <T> Flowable<T> usingConnectionFlowable(SQLClient client, Function<SQLConnection, Flowable<T>> sourceSupplier) {
  return client.rxGetConnection().flatMapPublisher(conn -> {
    try {
      return sourceSupplier.apply(conn).doFinally(conn::close);
    } catch (RuntimeException | Error e) {
      // The source was never built, so doFinally cannot release the connection: close it here to avoid a leak.
      conn.close();
      throw e;
    }
  });
}
Generates an Observable from SQLConnection operations.
Params: - client – the SQLClient
- sourceSupplier – a user-provided function returning an Observable generated by interacting with the given SQLConnection
Type parameters: - <T> – the type of the items emitted by the Observable
Returns: an Observable generated from SQLConnection operations
/**
 * Generates an {@link Observable} from {@link SQLConnection} operations.
 * <p>
 * A connection is obtained from the {@link SQLClient} and handed to {@code sourceSupplier}.
 * The connection is closed via {@code doFinally} on the returned {@link Observable}.
 * If {@code sourceSupplier} throws or returns {@code null}, the connection is closed immediately and the failure is propagated.
 *
 * @param client the {@link SQLClient}
 * @param sourceSupplier a user-provided function returning an {@link Observable} generated by interacting with the given {@link SQLConnection}
 * @param <T> the type of the items emitted by the {@link Observable}
 * @return an {@link Observable} generated from {@link SQLConnection} operations
 */
public static <T> Observable<T> usingConnectionObservable(SQLClient client, Function<SQLConnection, Observable<T>> sourceSupplier) {
  return client.rxGetConnection().flatMapObservable(conn -> {
    try {
      return sourceSupplier.apply(conn).doFinally(conn::close);
    } catch (RuntimeException | Error e) {
      // The source was never built, so doFinally cannot release the connection: close it here to avoid a leak.
      conn.close();
      throw e;
    }
  });
}
Generates a Single from SQLConnection operations.
Params: - client – the SQLClient
- sourceSupplier – a user-provided function returning a Single generated by interacting with the given SQLConnection
Type parameters: - <T> – the type of the item emitted by the Single
Returns: a Single generated from SQLConnection operations
/**
 * Generates a {@link Single} from {@link SQLConnection} operations.
 * <p>
 * A connection is obtained from the {@link SQLClient} and handed to {@code sourceSupplier}.
 * The connection is closed via {@code doFinally} on the returned {@link Single}.
 * If {@code sourceSupplier} throws or returns {@code null}, the connection is closed immediately and the failure is propagated.
 *
 * @param client the {@link SQLClient}
 * @param sourceSupplier a user-provided function returning a {@link Single} generated by interacting with the given {@link SQLConnection}
 * @param <T> the type of the item emitted by the {@link Single}
 * @return a {@link Single} generated from {@link SQLConnection} operations
 */
public static <T> Single<T> usingConnectionSingle(SQLClient client, Function<SQLConnection, Single<T>> sourceSupplier) {
  return client.rxGetConnection().flatMap(conn -> {
    try {
      return sourceSupplier.apply(conn).doFinally(conn::close);
    } catch (RuntimeException | Error e) {
      // The source was never built, so doFinally cannot release the connection: close it here to avoid a leak.
      conn.close();
      throw e;
    }
  });
}
Generates a Maybe from SQLConnection operations.
Params: - client – the SQLClient
- sourceSupplier – a user-provided function returning a Maybe generated by interacting with the given SQLConnection
Type parameters: - <T> – the type of the item emitted by the Maybe
Returns: a Maybe generated from SQLConnection operations
/**
 * Generates a {@link Maybe} from {@link SQLConnection} operations.
 * <p>
 * A connection is obtained from the {@link SQLClient} and handed to {@code sourceSupplier}.
 * The connection is closed via {@code doFinally} on the returned {@link Maybe}.
 * If {@code sourceSupplier} throws or returns {@code null}, the connection is closed immediately and the failure is propagated.
 *
 * @param client the {@link SQLClient}
 * @param sourceSupplier a user-provided function returning a {@link Maybe} generated by interacting with the given {@link SQLConnection}
 * @param <T> the type of the item emitted by the {@link Maybe}
 * @return a {@link Maybe} generated from {@link SQLConnection} operations
 */
public static <T> Maybe<T> usingConnectionMaybe(SQLClient client, Function<SQLConnection, Maybe<T>> sourceSupplier) {
  return client.rxGetConnection().flatMapMaybe(conn -> {
    try {
      return sourceSupplier.apply(conn).doFinally(conn::close);
    } catch (RuntimeException | Error e) {
      // The source was never built, so doFinally cannot release the connection: close it here to avoid a leak.
      conn.close();
      throw e;
    }
  });
}
Generates a Completable from SQLConnection operations.
Params: - client – the SQLClient
- sourceSupplier – a user-provided function returning a Completable generated by interacting with the given SQLConnection
Returns: a Completable generated from SQLConnection operations
/**
 * Generates a {@link Completable} from {@link SQLConnection} operations.
 * <p>
 * A connection is obtained from the {@link SQLClient} and handed to {@code sourceSupplier}.
 * The connection is closed via {@code doFinally} on the returned {@link Completable}.
 * If {@code sourceSupplier} throws or returns {@code null}, the connection is closed immediately and the failure is propagated.
 *
 * @param client the {@link SQLClient}
 * @param sourceSupplier a user-provided function returning a {@link Completable} generated by interacting with the given {@link SQLConnection}
 * @return a {@link Completable} generated from {@link SQLConnection} operations
 */
public static Completable usingConnectionCompletable(SQLClient client, Function<SQLConnection, Completable> sourceSupplier) {
  return client.rxGetConnection().flatMapCompletable(conn -> {
    try {
      return sourceSupplier.apply(conn).doFinally(conn::close);
    } catch (RuntimeException | Error e) {
      // The source was never built, so doFinally cannot release the connection: close it here to avoid a leak.
      conn.close();
      throw e;
    }
  });
}
/**
 * Private constructor: prevents instantiation from outside this class, which only exposes static helper methods.
 */
private SQLClientHelper() {
// Utility class, no instances
}
}