package io.vertx.ext.mongo.impl;
import com.mongodb.client.gridfs.model.GridFSDownloadOptions;
import com.mongodb.client.gridfs.model.GridFSUploadOptions;
import com.mongodb.reactivestreams.client.gridfs.GridFSBucket;
import com.mongodb.reactivestreams.client.gridfs.GridFSDownloadPublisher;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.json.JsonObject;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
import io.vertx.ext.mongo.*;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Function;
import static io.netty.buffer.Unpooled.copiedBuffer;
import static io.vertx.ext.mongo.impl.Utils.setHandler;
import static java.util.Objects.requireNonNull;
public class MongoGridFsClientImpl implements MongoGridFsClient {
private final GridFSBucket bucket;
private final MongoClientImpl clientImpl;
private final VertxInternal vertx;
public MongoGridFsClientImpl(VertxInternal vertx, MongoClientImpl mongoClient, GridFSBucket gridFSBucket) {
this.vertx = vertx;
this.clientImpl = mongoClient;
this.bucket = gridFSBucket;
}
@Override
public MongoGridFsClient uploadByFileName(ReadStream<Buffer> stream, String fileName, Handler<AsyncResult<String>> resultHandler) {
Future<String> future = uploadByFileName(stream, fileName);
setHandler(future, resultHandler);
return this;
}
@Override
public Future<String> uploadByFileName(ReadStream<Buffer> stream, String fileName) {
GridFSReadStreamPublisher publisher = new GridFSReadStreamPublisher(stream);
Promise<ObjectId> promise = vertx.promise();
bucket.uploadFromPublisher(fileName, publisher).subscribe(new SingleResultSubscriber<>(promise));
return promise.future().map(ObjectId::toHexString);
}
@Override
public MongoGridFsClient uploadByFileNameWithOptions(ReadStream<Buffer> stream, String fileName, GridFsUploadOptions options, Handler<AsyncResult<String>> resultHandler) {
Future<String> future = uploadByFileNameWithOptions(stream, fileName, options);
setHandler(future, resultHandler);
return this;
}
@Override
public Future<String> uploadByFileNameWithOptions(ReadStream<Buffer> stream, String fileName, GridFsUploadOptions options) {
GridFSUploadOptions uploadOptions = new GridFSUploadOptions();
uploadOptions.chunkSizeBytes(options.getChunkSizeBytes());
if (options.getMetadata() != null) uploadOptions.metadata(new Document(options.getMetadata().getMap()));
GridFSReadStreamPublisher publisher = new GridFSReadStreamPublisher(stream);
Promise<ObjectId> promise = vertx.promise();
bucket.uploadFromPublisher(fileName, publisher, uploadOptions).subscribe(new SingleResultSubscriber<>(promise));
return promise.future().map(ObjectId::toHexString);
}
@Override
public MongoGridFsClient uploadFile(String fileName, Handler<AsyncResult<String>> resultHandler) {
Future<String> future = uploadFile(fileName);
setHandler(future, resultHandler);
return this;
}
@Override
public Future<String> uploadFile(String fileName) {
requireNonNull(fileName, "fileName cannot be null");
return uploadFileWithOptions(fileName, null);
}
@Override
public MongoGridFsClient uploadFileWithOptions(String fileName, GridFsUploadOptions options, Handler<AsyncResult<String>> resultHandler) {
Future<String> future = uploadFileWithOptions(fileName, options);
setHandler(future, resultHandler);
return this;
}
@Override
public Future<String> uploadFileWithOptions(String fileName, GridFsUploadOptions options) {
requireNonNull(fileName, "fileName cannot be null");
OpenOptions openOptions = new OpenOptions().setRead(true);
return vertx.fileSystem().open(fileName, openOptions)
.flatMap(file -> {
GridFSReadStreamPublisher publisher = new GridFSReadStreamPublisher(file);
Promise<ObjectId> promise = vertx.promise();
if (options == null) {
bucket.uploadFromPublisher(fileName, publisher).subscribe(new SingleResultSubscriber<>(promise));
} else {
GridFSUploadOptions uploadOptions = new GridFSUploadOptions();
uploadOptions.chunkSizeBytes(options.getChunkSizeBytes());
if (options.getMetadata() != null) {
uploadOptions.metadata(new Document(options.getMetadata().getMap()));
}
bucket.uploadFromPublisher(fileName, publisher, uploadOptions).subscribe(new SingleResultSubscriber<>(promise));
}
return promise.future().map(ObjectId::toHexString);
});
}
@Override
public void close() {
}
@Override
public MongoGridFsClient delete(String id, Handler<AsyncResult<Void>> resultHandler) {
Future<Void> future = delete(id);
setHandler(future, resultHandler);
return this;
}
@Override
public Future<Void> delete(String id) {
requireNonNull(id, "id cannot be null");
ObjectId objectId = new ObjectId(id);
Promise<Void> promise = vertx.promise();
bucket.delete(objectId).subscribe(new CompletionSubscriber<>(promise));
return promise.future();
}
@Override
public MongoGridFsClient downloadByFileName(WriteStream<Buffer> stream, String fileName, Handler<AsyncResult<Long>> resultHandler) {
Future<Long> future = downloadByFileName(stream, fileName);
setHandler(future, resultHandler);
return this;
}
@Override
public Future<Long> downloadByFileName(WriteStream<Buffer> stream, String fileName) {
GridFSDownloadPublisher publisher = bucket.downloadToPublisher(fileName);
return handleDownload(publisher, stream);
}
@Override
public MongoGridFsClient downloadByFileNameWithOptions(WriteStream<Buffer> stream, String fileName, GridFsDownloadOptions options, Handler<AsyncResult<Long>> resultHandler) {
Future<Long> future = downloadByFileNameWithOptions(stream, fileName, options);
setHandler(future, resultHandler);
return this;
}
@Override
public Future<Long> downloadByFileNameWithOptions(WriteStream<Buffer> stream, String fileName, GridFsDownloadOptions options) {
GridFSDownloadOptions downloadOptions = new GridFSDownloadOptions();
GridFSDownloadPublisher publisher = bucket.downloadToPublisher(fileName, downloadOptions);
return handleDownload(publisher, stream);
}
@Override
public MongoGridFsClient downloadById(WriteStream<Buffer> stream, String id, Handler<AsyncResult<Long>> resultHandler) {
Future<Long> future = downloadById(stream, id);
setHandler(future, resultHandler);
return this;
}
@Override
public Future<Long> downloadById(WriteStream<Buffer> stream, String id) {
ObjectId objectId = new ObjectId(id);
GridFSDownloadPublisher publisher = bucket.downloadToPublisher(objectId);
return handleDownload(publisher, stream);
}
@Override
public MongoGridFsClient downloadFile(String fileName, Handler<AsyncResult<Long>> resultHandler) {
Future<Long> future = downloadFile(fileName);
setHandler(future, resultHandler);
return this;
}
@Override
public Future<Long> downloadFile(String fileName) {
requireNonNull(fileName, "fileName cannot be null");
return downloadFileAs(fileName, fileName);
}
@Override
public MongoGridFsClient downloadFileAs(String fileName, String newFileName, Handler<AsyncResult<Long>> resultHandler) {
Future<Long> future = downloadFileAs(fileName, newFileName);
setHandler(future, resultHandler);
return this;
}
@Override
public Future<Long> downloadFileAs(String fileName, String newFileName) {
requireNonNull(fileName, "fileName cannot be null");
requireNonNull(newFileName, "newFileName cannot be null");
OpenOptions options = new OpenOptions().setWrite(true);
return vertx.fileSystem().open(newFileName, options)
.flatMap(file -> {
GridFSDownloadPublisher publisher = bucket.downloadToPublisher(fileName);
return handleDownload(publisher, file);
});
}
@Override
public MongoGridFsClient downloadFileByID(String id, String fileName, Handler<AsyncResult<Long>> resultHandler) {
Future<Long> future = downloadFileByID(id, fileName);
setHandler(future, resultHandler);
return this;
}
@Override
public Future<Long> downloadFileByID(String id, String fileName) {
requireNonNull(fileName, "fileName cannot be null");
OpenOptions options = new OpenOptions().setWrite(true);
return vertx.fileSystem().open(fileName, options)
.flatMap(file -> {
ObjectId objectId = new ObjectId(id);
GridFSDownloadPublisher publisher = bucket.downloadToPublisher(objectId);
return handleDownload(publisher, file);
});
}
@Override
public MongoGridFsClient drop(Handler<AsyncResult<Void>> resultHandler) {
Future<Void> future = drop();
setHandler(future, resultHandler);
return this;
}
@Override
public Future<Void> drop() {
Promise<Void> promise = vertx.promise();
bucket.drop().subscribe(new CompletionSubscriber<>(promise));
return promise.future();
}
@Override
public MongoGridFsClient findAllIds(Handler<AsyncResult<List<String>>> resultHandler) {
Future<List<String>> future = findAllIds();
setHandler(future, resultHandler);
return this;
}
@Override
public Future<List<String>> findAllIds() {
Promise<List<String>> promise = vertx.promise();
bucket.find().subscribe(new MappingAndBufferingSubscriber<>(gridFSFile -> gridFSFile.getObjectId().toHexString(), promise));
return promise.future();
}
@Override
public MongoGridFsClient findIds(JsonObject query, Handler<AsyncResult<List<String>>> resultHandler) {
Future<List<String>> future = findIds(query);
setHandler(future, resultHandler);
return this;
}
@Override
public Future<List<String>> findIds(JsonObject query) {
requireNonNull(query, "query cannot be null");
JsonObject encodedQuery = clientImpl.encodeKeyWhenUseObjectId(query);
Bson bquery = clientImpl.wrap(encodedQuery);
Promise<List<String>> promise = vertx.promise();
bucket.find(bquery).subscribe(new MappingAndBufferingSubscriber<>(gridFSFile -> gridFSFile.getObjectId().toHexString(), promise));
return promise.future();
}
private Future<Long> handleDownload(GridFSDownloadPublisher publisher, WriteStream<Buffer> stream) {
ReadStream<ByteBuffer> adapter = new PublisherAdapter<>(vertx.getOrCreateContext(), publisher, 16);
MapAndCountBuffer mapper = new MapAndCountBuffer();
MappingStream<ByteBuffer, Buffer> rs = new MappingStream<>(adapter, mapper);
return rs.pipeTo(stream).map(v -> mapper.count);
}
private static class MapAndCountBuffer implements Function<ByteBuffer, Buffer> {
private long count = 0;
@Override
public Buffer apply(ByteBuffer bb) {
Buffer buffer = Buffer.buffer(copiedBuffer(bb));
count += buffer.length();
return buffer;
}
}
}