/*
* Copyright 2018 The Vert.x Community.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.vertx.cassandra.impl;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.cql.Row;
import io.vertx.cassandra.ResultSet;
import io.vertx.core.*;
import io.vertx.core.impl.ContextInternal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
Author: Pavel Drankou, Thomas Segismont
/**
* @author Pavel Drankou
* @author Thomas Segismont
*/
public class ResultSetImpl implements ResultSet {
private final Vertx vertx;
private final AtomicReference<com.datastax.oss.driver.api.core.cql.AsyncResultSet> resultSetRef;
public ResultSetImpl(com.datastax.oss.driver.api.core.cql.AsyncResultSet resultSet, Vertx vertx) {
this.resultSetRef = new AtomicReference<>(resultSet);
this.vertx = vertx;
}
@Override
public ResultSet all(Handler<AsyncResult<List<Row>>> handler) {
loadMore(vertx.getOrCreateContext(), Collections.emptyList(), handler);
return this;
}
@Override
public Future<List<Row>> all() {
Promise<List<Row>> promise = Promise.promise();
all(promise);
return promise.future();
}
@Override
public ColumnDefinitions getColumnDefinitions() {
return resultSetRef.get().getColumnDefinitions();
}
@Override
public ExecutionInfo getExecutionInfo() {
return resultSetRef.get().getExecutionInfo();
}
@Override
public int remaining() {
return resultSetRef.get().remaining();
}
@Override
public Iterable<Row> currentPage() {
return resultSetRef.get().currentPage();
}
@Override
public Row one() {
return resultSetRef.get().one();
}
@Override
public boolean hasMorePages() {
return resultSetRef.get().hasMorePages();
}
@Override
public void fetchNextPage(Handler<AsyncResult<ResultSet>> handler) {
Future<ResultSet> fut = fetchNextPage();
if (handler != null) {
fut.onComplete(handler);
}
}
@Override
public Future<ResultSet> fetchNextPage() throws IllegalStateException {
return Future.fromCompletionStage(
resultSetRef.get().fetchNextPage(),
vertx.getOrCreateContext())
.map(datastaxRS -> {
resultSetRef.set(datastaxRS);
return this;
});
}
@Override
public boolean wasApplied() {
return resultSetRef.get().wasApplied();
}
private void loadMore(Context context, List<Row> loaded, Handler<AsyncResult<List<Row>>> handler) {
int availableWithoutFetching = resultSetRef.get().remaining();
List<Row> rows = new ArrayList<>(loaded.size() + availableWithoutFetching);
rows.addAll(loaded);
for (int i = 0; i < availableWithoutFetching; i++) {
rows.add(resultSetRef.get().one());
}
if (resultSetRef.get().hasMorePages()) {
Future.fromCompletionStage(resultSetRef.get().fetchNextPage(), context).onComplete(ar -> {
if (ar.succeeded()) {
resultSetRef.set(ar.result());
loadMore(context, rows, handler);
} else {
if (handler != null) {
handler.handle(Future.failedFuture(ar.cause()));
}
}
});
} else {
if (handler != null) {
handler.handle(Future.succeededFuture(rows));
}
}
}
}