/*
 * Copyright 2008-present MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.mongodb.async.client.gridfs.helpers;

import com.mongodb.MongoGridFSException;
import com.mongodb.async.SingleResultCallback;
import com.mongodb.async.client.gridfs.AsyncInputStream;
import com.mongodb.async.client.gridfs.AsyncOutputStream;

import java.io.InputStream;
import java.io.OutputStream;
import java.nio.Buffer;
import java.nio.ByteBuffer;

import static com.mongodb.assertions.Assertions.notNull;

A general helper class that creates AsyncInputStream or AsyncOutputStream instances. Provides support for:
  • byte[] - Converts byte arrays into Async Streams
  • ByteBuffer - Converts ByteBuffers into Async Streams
  • InputStream - Converts InputStreams into Async Streams (Note: InputStream implementations are blocking)
  • OutputStream - Converts OutputStreams into Async Streams (Note: OutputStream implementations are blocking)
Since:3.3
Deprecated:Prefer the Reactive Streams-based asynchronous driver (mongodb-driver-reactivestreams artifactId)
/** * A general helper class that creates {@link AsyncInputStream} or {@link AsyncOutputStream} instances. * * Provides support for: * <ul> * <li>{@code byte[]} - Converts byte arrays into Async Streams</li> * <li>{@link ByteBuffer} - Converts ByteBuffers into Async Streams</li> * <li>{@link InputStream} - Converts InputStreams into Async Streams (Note: InputStream implementations are blocking)</li> * <li>{@link OutputStream} - Converts OutputStreams into Async Streams (Note: OutputStream implementations are blocking)</li> * </ul> * * @since 3.3 * @deprecated Prefer the Reactive Streams-based asynchronous driver (mongodb-driver-reactivestreams artifactId) */
@Deprecated public final class AsyncStreamHelper {
Converts a byte[] into a AsyncInputStream
Params:
  • srcBytes – the data source
Returns:the AsyncInputStream
/** * Converts a {@code byte[]} into a {@link AsyncInputStream} * * @param srcBytes the data source * @return the AsyncInputStream */
public static AsyncInputStream toAsyncInputStream(final byte[] srcBytes) { return toAsyncInputStream(ByteBuffer.wrap(srcBytes)); }
Converts a byte[] into a AsyncOutputStream
Params:
  • dstBytes – the data destination
Returns:the AsyncOutputStream
/** * Converts a {@code byte[]} into a {@link AsyncOutputStream} * * @param dstBytes the data destination * @return the AsyncOutputStream */
public static AsyncOutputStream toAsyncOutputStream(final byte[] dstBytes) { return toAsyncOutputStream(ByteBuffer.wrap(dstBytes)); }
Converts a ByteBuffer into a AsyncInputStream
Params:
  • srcByteBuffer – the data source
Returns:the AsyncInputStream
/** * Converts a {@link ByteBuffer} into a {@link AsyncInputStream} * * @param srcByteBuffer the data source * @return the AsyncInputStream */
public static AsyncInputStream toAsyncInputStream(final ByteBuffer srcByteBuffer) { notNull("srcByteBuffer", srcByteBuffer); return new AsyncInputStream() { @Override public void read(final ByteBuffer dstByteBuffer, final SingleResultCallback<Integer> callback) { transferDataFromByteBuffers(srcByteBuffer, dstByteBuffer, callback); } @Override public void skip(final long bytesToSkip, final SingleResultCallback<Long> callback) { notNull("callback", callback); callback.onResult(0L, null); } @Override public void close(final SingleResultCallback<Void> callback) { callback.onResult(null, null); } }; }
Converts a ByteBuffer into a AsyncOutputStream
Params:
  • dstByteBuffer – the data destination
Returns:the AsyncOutputStream
/** * Converts a {@link ByteBuffer} into a {@link AsyncOutputStream} * * @param dstByteBuffer the data destination * @return the AsyncOutputStream */
public static AsyncOutputStream toAsyncOutputStream(final ByteBuffer dstByteBuffer) { notNull("dstByteBuffer", dstByteBuffer); return new AsyncOutputStream() { @Override public void write(final ByteBuffer srcByteBuffer, final SingleResultCallback<Integer> callback) { transferDataFromByteBuffers(srcByteBuffer, dstByteBuffer, callback); } @Override public void close(final SingleResultCallback<Void> callback) { callback.onResult(null, null); } }; }
Converts a InputStream into a AsyncInputStream
Params:
  • inputStream – the InputStream
Returns:the AsyncInputStream
/** * Converts a {@link InputStream} into a {@link AsyncInputStream} * * @param inputStream the InputStream * @return the AsyncInputStream */
public static AsyncInputStream toAsyncInputStream(final InputStream inputStream) { notNull("inputStream", inputStream); return new AsyncInputStream() { @Override public void read(final ByteBuffer dstByteBuffer, final SingleResultCallback<Integer> callback) { notNull("dst", dstByteBuffer); notNull("callback", callback); if (!dstByteBuffer.hasRemaining()) { callback.onResult(-1, null); return; } int maxAmount = dstByteBuffer.remaining(); byte[] bytes = new byte[maxAmount]; int amountRead; try { amountRead = inputStream.read(bytes); } catch (Throwable t) { callback.onResult(null, new MongoGridFSException("Error reading from input stream", t)); return; } if (amountRead > 0) { if (amountRead < maxAmount) { byte[] dataRead = new byte[amountRead]; System.arraycopy(bytes, 0, dataRead, 0, amountRead); dstByteBuffer.put(dataRead); } else { dstByteBuffer.put(bytes); } } callback.onResult(amountRead, null); } @Override public void skip(final long bytesToSkip, final SingleResultCallback<Long> callback) { notNull("callback", callback); try { long skipped = inputStream.skip(bytesToSkip); callback.onResult(skipped, null); } catch (Throwable t) { callback.onResult(null, t); } } @Override public void close(final SingleResultCallback<Void> callback) { try { inputStream.close(); } catch (Throwable t) { callback.onResult(null, new MongoGridFSException("Error closing input stream", t)); return; } callback.onResult(null, null); } }; }
Converts a OutputStream into a AsyncOutputStream
Params:
  • outputStream – the OutputStream
Returns:the AsyncOutputStream
/** * Converts a {@link OutputStream} into a {@link AsyncOutputStream} * * @param outputStream the OutputStream * @return the AsyncOutputStream */
public static AsyncOutputStream toAsyncOutputStream(final OutputStream outputStream) { notNull("outputStream", outputStream); return new AsyncOutputStream() { @Override public void write(final ByteBuffer srcByteBuffer, final SingleResultCallback<Integer> callback) { notNull("src", srcByteBuffer); notNull("callback", callback); if (!srcByteBuffer.hasRemaining()) { callback.onResult(-1, null); return; } int amount = srcByteBuffer.remaining(); byte[] bytes = new byte[amount]; try { srcByteBuffer.get(bytes); outputStream.write(bytes); } catch (Throwable t) { callback.onResult(null, new MongoGridFSException("Error reading from input stream", t)); return; } callback.onResult(amount, null); } @Override public void close(final SingleResultCallback<Void> callback) { try { outputStream.close(); } catch (Throwable t) { callback.onResult(null, new MongoGridFSException("Error closing from output stream", t)); return; } callback.onResult(null, null); } }; } private static void transferDataFromByteBuffers(final ByteBuffer srcByteBuffer, final ByteBuffer dstByteBuffer, final SingleResultCallback<Integer> callback) { if (!srcByteBuffer.hasRemaining()) { callback.onResult(-1, null); return; } int transferAmount = Math.min(dstByteBuffer.remaining(), srcByteBuffer.remaining()); if (transferAmount > 0) { ByteBuffer temp = srcByteBuffer.duplicate(); ((Buffer) temp).limit(temp.position() + transferAmount); dstByteBuffer.put(temp); ((Buffer) srcByteBuffer).position(srcByteBuffer.position() + transferAmount); } callback.onResult(transferAmount, null); } private AsyncStreamHelper() { } }