/*
 * Copyright (c) 2011-2018 Contributors to the Eclipse Foundation
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
 * which is available at https://www.apache.org/licenses/LICENSE-2.0.
 *
 * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
 */

package io.vertx.reactivex.ext.sql;

import io.reactivex.*;
import io.vertx.reactivex.ext.sql.impl.*;

import java.util.function.Function;

Utilities for generating observables with a SQLClient.
Author:Thomas Segismont
/** * Utilities for generating observables with a {@link SQLClient}. * * @author Thomas Segismont */
public class SQLClientHelper {
Creates a FlowableTransformer decorating a Flowable with transaction management for a given SQLConnection.

If the upstream Flowable completes (onComplete), the transaction is committed. If the upstream Flowable emits an error (onError), the transaction is rollbacked.

Eventually, the given SQLConnection is put back in autocommit mode.

Params:
  • sqlConnection – the SQLConnection used for database operations and transaction management
Type parameters:
  • <T> – the type of the items emitted by the upstream Flowable
Returns:a FlowableTransformer decorating a Flowable with transaction management
/** * Creates a {@link FlowableTransformer} decorating a {@link Flowable} with transaction management for a given {@link SQLConnection}. * <p> * If the upstream {@link Flowable} completes (<em>onComplete</em>), the transaction is committed. * If the upstream {@link Flowable} emits an error (<em>onError</em>), the transaction is rollbacked. * <p> * Eventually, the given {@link SQLConnection} is put back in <em>autocommit</em> mode. * * @param sqlConnection the {@link SQLConnection} used for database operations and transaction management * @param <T> the type of the items emitted by the upstream {@link Flowable} * @return a {@link FlowableTransformer} decorating a {@link Flowable} with transaction management */
public static <T> FlowableTransformer<T, T> txFlowableTransformer(SQLConnection sqlConnection) { return new InTransactionFlowable<>(sqlConnection); }
Generates a Flowable from SQLConnection operations executed inside a transaction.
Params:
Type parameters:
  • <T> – the type of the items emitted by the Flowable
Returns:a Flowable generated from SQLConnection operations executed inside a transaction
/** * Generates a {@link Flowable} from {@link SQLConnection} operations executed inside a transaction. * * @param client the {@link SQLClient} * @param sourceSupplier a user-provided function returning a {@link Flowable} generated by interacting with the given {@link SQLConnection} * @param <T> the type of the items emitted by the {@link Flowable} * @return a {@link Flowable} generated from {@link SQLConnection} operations executed inside a transaction */
public static <T> Flowable<T> inTransactionFlowable(SQLClient client, Function<SQLConnection, Flowable<T>> sourceSupplier) { return usingConnectionFlowable(client, conn -> sourceSupplier.apply(conn).compose(txFlowableTransformer(conn))); }
Creates a ObservableTransformer decorating an Observable with transaction management for a given SQLConnection.

If the upstream Observable completes (onComplete), the transaction is committed. If the upstream Observable emits an error (onError), the transaction is rollbacked.

Eventually, the given SQLConnection is put back in autocommit mode.

Params:
  • sqlConnection – the SQLConnection used for database operations and transaction management
Type parameters:
  • <T> – the type of the items emitted by the upstream Observable
Returns:a ObservableTransformer decorating an Observable with transaction management
/** * Creates a {@link ObservableTransformer} decorating an {@link Observable} with transaction management for a given {@link SQLConnection}. * <p> * If the upstream {@link Observable} completes (<em>onComplete</em>), the transaction is committed. * If the upstream {@link Observable} emits an error (<em>onError</em>), the transaction is rollbacked. * <p> * Eventually, the given {@link SQLConnection} is put back in <em>autocommit</em> mode. * * @param sqlConnection the {@link SQLConnection} used for database operations and transaction management * @param <T> the type of the items emitted by the upstream {@link Observable} * @return a {@link ObservableTransformer} decorating an {@link Observable} with transaction management */
public static <T> ObservableTransformer<T, T> txObservableTransformer(SQLConnection sqlConnection) { return new InTransactionObservable<>(sqlConnection); }
Generates a Observable from SQLConnection operations executed inside a transaction.
Params:
Type parameters:
  • <T> – the type of the items emitted by the Observable
Returns:an Observable generated from SQLConnection operations executed inside a transaction
/** * Generates a {@link Observable} from {@link SQLConnection} operations executed inside a transaction. * * @param client the {@link SQLClient} * @param sourceSupplier a user-provided function returning a {@link Observable} generated by interacting with the given {@link SQLConnection} * @param <T> the type of the items emitted by the {@link Observable} * @return an {@link Observable} generated from {@link SQLConnection} operations executed inside a transaction */
public static <T> Observable<T> inTransactionObservable(SQLClient client, Function<SQLConnection, Observable<T>> sourceSupplier) { return usingConnectionObservable(client, conn -> sourceSupplier.apply(conn).compose(txObservableTransformer(conn))); }
Creates a SingleTransformer decorating a Single with transaction management for a given SQLConnection.

If the upstream Single emits a value (onSuccess), the transaction is committed. If the upstream Single emits an error (onError), the transaction is rollbacked.

Eventually, the given SQLConnection is put back in autocommit mode.

Params:
  • sqlConnection – the SQLConnection used for database operations and transaction management
Type parameters:
  • <T> – the type of the item emitted by the upstream Single
Returns:a SingleTransformer decorating a Single with transaction management
/** * Creates a {@link SingleTransformer} decorating a {@link Single} with transaction management for a given {@link SQLConnection}. * <p> * If the upstream {@link Single} emits a value (<em>onSuccess</em>), the transaction is committed. * If the upstream {@link Single} emits an error (<em>onError</em>), the transaction is rollbacked. * <p> * Eventually, the given {@link SQLConnection} is put back in <em>autocommit</em> mode. * * @param sqlConnection the {@link SQLConnection} used for database operations and transaction management * @param <T> the type of the item emitted by the upstream {@link Single} * @return a {@link SingleTransformer} decorating a {@link Single} with transaction management */
public static <T> SingleTransformer<T, T> txSingleTransformer(SQLConnection sqlConnection) { return new InTransactionSingle<>(sqlConnection); }
Generates a Single from SQLConnection operations executed inside a transaction.
Params:
  • client – the SQLClient
  • sourceSupplier – a user-provided function returning a Single generated by interacting with the given SQLConnection
Type parameters:
  • <T> – the type of the item emitted by the Single
Returns:a Single generated from SQLConnection operations executed inside a transaction
/** * Generates a {@link Single} from {@link SQLConnection} operations executed inside a transaction. * * @param client the {@link SQLClient} * @param sourceSupplier a user-provided function returning a {@link Single} generated by interacting with the given {@link SQLConnection} * @param <T> the type of the item emitted by the {@link Single} * @return a {@link Single} generated from {@link SQLConnection} operations executed inside a transaction */
public static <T> Single<T> inTransactionSingle(SQLClient client, Function<SQLConnection, Single<T>> sourceSupplier) { return usingConnectionSingle(client, conn -> sourceSupplier.apply(conn).compose(txSingleTransformer(conn))); }
Creates a MaybeTransformer decorating a Maybe with transaction management for a given SQLConnection.

If the upstream Maybe emits a value (onSuccess) or completes (onComplete), the transaction is committed. If the upstream Maybe emits an error (onError), the transaction is rollbacked.

Eventually, the given SQLConnection is put back in autocommit mode.

Params:
  • sqlConnection – the SQLConnection used for database operations and transaction management
Type parameters:
  • <T> – the type of the item emitted by the upstream Maybe
Returns:a MaybeTransformer decorating a Maybe with transaction management
/** * Creates a {@link MaybeTransformer} decorating a {@link Maybe} with transaction management for a given {@link SQLConnection}. * <p> * If the upstream {@link Maybe} emits a value (<em>onSuccess</em>) or completes (<em>onComplete</em>), the transaction is committed. * If the upstream {@link Maybe} emits an error (<em>onError</em>), the transaction is rollbacked. * <p> * Eventually, the given {@link SQLConnection} is put back in <em>autocommit</em> mode. * * @param sqlConnection the {@link SQLConnection} used for database operations and transaction management * @param <T> the type of the item emitted by the upstream {@link Maybe} * @return a {@link MaybeTransformer} decorating a {@link Maybe} with transaction management */
public static <T> MaybeTransformer<T, T> txMaybeTransformer(SQLConnection sqlConnection) { return new InTransactionMaybe<>(sqlConnection); }
Generates a Maybe from SQLConnection operations executed inside a transaction.
Params:
  • client – the SQLClient
  • sourceSupplier – a user-provided function returning a Maybe generated by interacting with the given SQLConnection
Type parameters:
  • <T> – the type of the item emitted by the Maybe
Returns:a Maybe generated from SQLConnection operations executed inside a transaction
/** * Generates a {@link Maybe} from {@link SQLConnection} operations executed inside a transaction. * * @param client the {@link SQLClient} * @param sourceSupplier a user-provided function returning a {@link Maybe} generated by interacting with the given {@link SQLConnection} * @param <T> the type of the item emitted by the {@link Maybe} * @return a {@link Maybe} generated from {@link SQLConnection} operations executed inside a transaction */
public static <T> Maybe<T> inTransactionMaybe(SQLClient client, Function<SQLConnection, Maybe<T>> sourceSupplier) { return usingConnectionMaybe(client, conn -> sourceSupplier.apply(conn).compose(txMaybeTransformer(conn))); }
Creates a CompletableTransformer decorating a Completable with transaction management for a given SQLConnection.

If the upstream Completable completes (onComplete), the transaction is committed. If the upstream Completable emits an error (onError), the transaction is rollbacked.

Eventually, the given SQLConnection is put back in autocommit mode.

Params:
  • sqlConnection – the SQLConnection used for database operations and transaction management
Returns:a CompletableTransformer decorating a Completable with transaction management
/** * Creates a {@link CompletableTransformer} decorating a {@link Completable} with transaction management for a given {@link SQLConnection}. * <p> * If the upstream {@link Completable} completes (<em>onComplete</em>), the transaction is committed. * If the upstream {@link Completable} emits an error (<em>onError</em>), the transaction is rollbacked. * <p> * Eventually, the given {@link SQLConnection} is put back in <em>autocommit</em> mode. * * @param sqlConnection the {@link SQLConnection} used for database operations and transaction management * @return a {@link CompletableTransformer} decorating a {@link Completable} with transaction management */
public static CompletableTransformer txCompletableTransformer(SQLConnection sqlConnection) { return new InTransactionCompletable(sqlConnection); }
Generates a Completable from SQLConnection operations executed inside a transaction.
Params:
Returns:a Completable generated from SQLConnection operations executed inside a transaction
/** * Generates a {@link Completable} from {@link SQLConnection} operations executed inside a transaction. * * @param client the {@link SQLClient} * @param sourceSupplier a user-provided function returning a {@link Completable} generated by interacting with the given {@link SQLConnection} * @return a {@link Completable} generated from {@link SQLConnection} operations executed inside a transaction */
public static Completable inTransactionCompletable(SQLClient client, Function<SQLConnection, Completable> sourceSupplier) { return usingConnectionCompletable(client, conn -> sourceSupplier.apply(conn).compose(txCompletableTransformer(conn))); }
Generates a Flowable from SQLConnection operations.
Params:
Type parameters:
  • <T> – the type of the items emitted by the Flowable
Returns:a Flowable generated from SQLConnection operations
/** * Generates a {@link Flowable} from {@link SQLConnection} operations. * * @param client the {@link SQLClient} * @param sourceSupplier a user-provided function returning a {@link Flowable} generated by interacting with the given {@link SQLConnection} * @param <T> the type of the items emitted by the {@link Flowable} * @return a {@link Flowable} generated from {@link SQLConnection} operations */
public static <T> Flowable<T> usingConnectionFlowable(SQLClient client, Function<SQLConnection, Flowable<T>> sourceSupplier) { return client.rxGetConnection().flatMapPublisher(conn -> { return sourceSupplier.apply(conn).doFinally(conn::close); }); }
Generates a Observable from SQLConnection operations.
Params:
Type parameters:
  • <T> – the type of the items emitted by the Observable
Returns:an Observable generated from SQLConnection operations
/** * Generates a {@link Observable} from {@link SQLConnection} operations. * * @param client the {@link SQLClient} * @param sourceSupplier a user-provided function returning a {@link Observable} generated by interacting with the given {@link SQLConnection} * @param <T> the type of the items emitted by the {@link Observable} * @return an {@link Observable} generated from {@link SQLConnection} operations */
public static <T> Observable<T> usingConnectionObservable(SQLClient client, Function<SQLConnection, Observable<T>> sourceSupplier) { return client.rxGetConnection().flatMapObservable(conn -> { return sourceSupplier.apply(conn).doFinally(conn::close); }); }
Generates a Single from SQLConnection operations.
Params:
  • client – the SQLClient
  • sourceSupplier – a user-provided function returning a Single generated by interacting with the given SQLConnection
Type parameters:
  • <T> – the type of the item emitted by the Single
Returns:a Single generated from SQLConnection operations
/** * Generates a {@link Single} from {@link SQLConnection} operations. * * @param client the {@link SQLClient} * @param sourceSupplier a user-provided function returning a {@link Single} generated by interacting with the given {@link SQLConnection} * @param <T> the type of the item emitted by the {@link Single} * @return a {@link Single} generated from {@link SQLConnection} operations */
public static <T> Single<T> usingConnectionSingle(SQLClient client, Function<SQLConnection, Single<T>> sourceSupplier) { return client.rxGetConnection().flatMap(conn -> { return sourceSupplier.apply(conn).doFinally(conn::close); }); }
Generates a Maybe from SQLConnection operations.
Params:
  • client – the SQLClient
  • sourceSupplier – a user-provided function returning a Maybe generated by interacting with the given SQLConnection
Type parameters:
  • <T> – the type of the item emitted by the Maybe
Returns:a Maybe generated from SQLConnection operations
/** * Generates a {@link Maybe} from {@link SQLConnection} operations. * * @param client the {@link SQLClient} * @param sourceSupplier a user-provided function returning a {@link Maybe} generated by interacting with the given {@link SQLConnection} * @param <T> the type of the item emitted by the {@link Maybe} * @return a {@link Maybe} generated from {@link SQLConnection} operations */
public static <T> Maybe<T> usingConnectionMaybe(SQLClient client, Function<SQLConnection, Maybe<T>> sourceSupplier) { return client.rxGetConnection().flatMapMaybe(conn -> { return sourceSupplier.apply(conn).doFinally(conn::close); }); }
Generates a Completable from SQLConnection operations.
Params:
Returns:a Completable generated from SQLConnection operations
/** * Generates a {@link Completable} from {@link SQLConnection} operations. * * @param client the {@link SQLClient} * @param sourceSupplier a user-provided function returning a {@link Completable} generated by interacting with the given {@link SQLConnection} * @return a {@link Completable} generated from {@link SQLConnection} operations */
public static Completable usingConnectionCompletable(SQLClient client, Function<SQLConnection, Completable> sourceSupplier) { return client.rxGetConnection().flatMapCompletable(conn -> { return sourceSupplier.apply(conn).doFinally(conn::close); }); } private SQLClientHelper() { // Utility } }