/*
 * Copyright 2018 The Vert.x Community.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.vertx.cassandra.impl;

import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.cql.Row;
import io.vertx.cassandra.CassandraRowStream;
import io.vertx.cassandra.ResultSet;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.streams.impl.InboundBuffer;

Author:Pavel Drankou, Thomas Segismont
/** * @author Pavel Drankou * @author Thomas Segismont */
public class CassandraRowStreamImpl implements CassandraRowStream { private enum State { IDLE, STARTED, EXHAUSTED, STOPPED } private final Context context; private final ResultSet resultSet; private final InboundBuffer<Row> internalQueue; private State state; private int inFlight; private Handler<Row> handler; private Handler<Throwable> exceptionHandler; private Handler<Void> endHandler; public CassandraRowStreamImpl(Context context, ResultSet resultSet) { this.context = context; this.resultSet = resultSet; internalQueue = new InboundBuffer<Row>(context) .exceptionHandler(this::handleException) .drainHandler(v -> fetchRow()); state = State.IDLE; } @Override public synchronized CassandraRowStream exceptionHandler(Handler<Throwable> handler) { if (state != State.STOPPED) { exceptionHandler = handler; } return this; } @Override public synchronized CassandraRowStream handler(Handler<Row> handler) { if (state == State.STOPPED) { return this; } if (handler == null) { stop(); context.runOnContext(v -> handleEnd()); } else { this.handler = handler; internalQueue.handler(this::handleRow); if (state == State.IDLE) { state = State.STARTED; context.runOnContext(v -> fetchRow()); } } return this; } @Override public synchronized CassandraRowStream pause() { if (state != State.STOPPED) { internalQueue.pause(); } return this; } @Override public synchronized CassandraRowStream resume() { if (state != State.STOPPED) { internalQueue.resume(); } return this; } @Override public synchronized CassandraRowStream endHandler(Handler<Void> handler) { if (state != State.STOPPED) { endHandler = handler; } return this; } @Override public synchronized CassandraRowStream fetch(long l) { if (state != State.STOPPED) { internalQueue.fetch(l); } return this; } @Override public ExecutionInfo executionInfo() { return resultSet.getExecutionInfo(); } @Override public ColumnDefinitions columnDefinitions(){ return resultSet.getColumnDefinitions(); } private synchronized void fetchRow() { if (state == State.STOPPED) { return; } if (resultSet.remaining() > 0) { handleFetched(resultSet.one()); } else { if (resultSet.hasMorePages()) { resultSet.fetchNextPage().map(rs -> resultSet.one()) .onComplete(event -> { if (event.succeeded()) { handleFetched(event.result()); } else { handleException(event.cause()); } }); } else { // last row handleFetched(null); } } } private synchronized void handleFetched(Row row) { if (state == State.STOPPED) { return; } if (row != null) { inFlight++; if (internalQueue.write(row)) { context.runOnContext(v -> fetchRow()); } } else { state = State.EXHAUSTED; if (inFlight == 0) { stop(); handleEnd(); } } } private void handleRow(Row row) { synchronized (this) { if (state == State.STOPPED) { return; } inFlight--; } handler.handle(row); synchronized (this) { if (state == State.EXHAUSTED && inFlight == 0) { stop(); handleEnd(); } } } private void handleException(Throwable cause) { Handler<Throwable> h; synchronized (this) { if (state != State.STOPPED) { stop(); h = exceptionHandler; } else { h = null; } } if (h != null) { h.handle(cause); } } private synchronized void handleEnd() { Handler<Void> h; synchronized (this) { h = endHandler; } if (h != null) { h.handle(null); } } private synchronized void stop() { state = State.STOPPED; internalQueue.handler(null).drainHandler(null); } }