/*
 * Copyright 2014 Red Hat, Inc.
 *
 *  All rights reserved. This program and the accompanying materials
 *  are made available under the terms of the Eclipse Public License v1.0
 *  and Apache License v2.0 which accompanies this distribution.
 *
 *  The Eclipse Public License is available at
 *  http://www.eclipse.org/legal/epl-v10.html
 *
 *  The Apache License v2.0 is available at
 *  http://www.opensource.org/licenses/apache2.0.php
 *
 *  You may elect to redistribute this code under either of these licenses.
 */
package io.vertx.ext.web.codec.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.WriteStream;
import io.vertx.ext.web.codec.BodyCodec;
import io.vertx.ext.web.codec.spi.BodyStream;

Author:Julien Viet
/** * @author <a href="mailto:julien@julienviet.com">Julien Viet</a> */
public class StreamingBodyCodec implements BodyCodec<Void> { private final WriteStream<Buffer> stream; private final boolean close; private Throwable error; public StreamingBodyCodec(WriteStream<Buffer> stream) { this(stream, true); } public StreamingBodyCodec(WriteStream<Buffer> stream, boolean close) { this.stream = stream; this.close = close; } public void init() { stream.exceptionHandler(err -> { synchronized (StreamingBodyCodec.this) { error = err; } }); } @Override public void create(Handler<AsyncResult<BodyStream<Void>>> handler) { AsyncResult<BodyStream<Void>> result; synchronized (this) { if (error != null) { result = Future.failedFuture(error); } else { result = Future.succeededFuture(new BodyStream<Void>() { Promise<Void> promise = Promise.promise(); @Override public Future<Void> result() { return promise.future(); } @Override public void handle(Throwable cause) { promise.tryFail(cause); } @Override public WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) { stream.exceptionHandler(handler); return this; } @Override public void write(Buffer data, Handler<AsyncResult<Void>> handler) { stream.write(data, handler); } @Override public Future<Void> write(Buffer data) { Promise<Void> promise = Promise.promise(); write(data, promise); return promise.future(); } @Override public void end(Handler<AsyncResult<Void>> handler) { if (close) { stream.end(ar -> { if (ar.succeeded()) { promise.tryComplete(); } else { promise.tryFail(ar.cause()); } if (handler != null) { handler.handle(ar); } }); } else { promise.tryComplete(); if (handler != null) { handler.handle(Future.succeededFuture()); } } } @Override public WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) { stream.setWriteQueueMaxSize(maxSize); return this; } @Override public boolean writeQueueFull() { return stream.writeQueueFull(); } @Override public WriteStream<Buffer> drainHandler(Handler<Void> handler) { stream.drainHandler(handler); return this; } }); } } handler.handle(result); } }