package io.vertx.ext.mongo.impl;
import com.mongodb.async.AsyncBatchCursor;
import com.mongodb.async.SingleResultCallback;
import com.mongodb.async.client.MongoIterable;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.impl.InboundBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
class MongoIterableStream implements ReadStream<JsonObject> {
private final Context context;
private final MongoIterable<JsonObject> mongoIterable;
private final int batchSize;
private AsyncBatchCursor<JsonObject> batchCursor;
private InboundBuffer<JsonObject> queue;
private Handler<Throwable> exceptionHandler;
private Handler<Void> endHandler;
private boolean closed;
MongoIterableStream(Context context, MongoIterable<JsonObject> mongoIterable, int batchSize) {
this.context = context;
this.mongoIterable = mongoIterable;
this.batchSize = batchSize;
this.queue = new InboundBuffer<>(context);
queue.drainHandler(v -> doRead());
}
@Override
public synchronized MongoIterableStream exceptionHandler(Handler<Throwable> handler) {
this.exceptionHandler = handler;
return this;
}
@Override
public synchronized MongoIterableStream handler(Handler<JsonObject> handler) {
queue.handler(handler);
if (handler == null) {
close();
} else {
SingleResultCallback<AsyncBatchCursor<JsonObject>> callback = (result, t) -> {
context.runOnContext(v -> {
synchronized (this) {
if (t != null) {
close();
handleException(t);
} else {
batchCursor = result;
batchCursor.setBatchSize(batchSize);
if (!closed) {
doRead();
}
}
}
});
};
try {
mongoIterable.batchCursor(callback);
} catch (Exception e) {
close();
handleException(e);
}
}
return this;
}
@Override
public MongoIterableStream pause() {
synchronized (this) {
if (closed) {
return this;
}
}
queue.pause();
return this;
}
@Override
public MongoIterableStream resume() {
synchronized (this) {
if (closed) {
return this;
}
}
queue.resume();
return this;
}
@Override
public ReadStream<JsonObject> fetch(long amount) {
synchronized (this) {
if (closed) {
return this;
}
}
queue.fetch(amount);
return this;
}
private synchronized void doRead() {
context.<List<JsonObject>>executeBlocking(fut -> {
batchCursor.next((result, t) -> {
if (t != null) {
fut.fail(t);
} else {
fut.complete(result);
}
});
}, true, ar -> {
synchronized (this) {
if (ar.succeeded()) {
List<JsonObject> list = ar.result();
if (list != null) {
if (queue.write(list)) {
doRead();
}
} else {
close();
if (endHandler != null) {
endHandler.handle(null);
}
}
} else {
close();
handleException(ar.cause());
}
}
});
}
private void handleException(Throwable cause) {
if (exceptionHandler != null) {
exceptionHandler.handle(cause);
}
}
@Override
public synchronized MongoIterableStream endHandler(Handler<Void> handler) {
endHandler = handler;
return this;
}
private void close() {
if (closed) {
return;
}
closed = true;
AtomicReference<AsyncBatchCursor> cursorRef = new AtomicReference<>();
context.executeBlocking(fut -> {
synchronized (this) {
cursorRef.set(batchCursor);
}
AsyncBatchCursor cursor = cursorRef.get();
if (cursor != null) {
cursor.close();
}
fut.complete();
}, false, null);
}
}