package io.vertx.ext.web.client.impl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpConstants;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.impl.HeadersAdaptor;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.impl.InboundBuffer;
import io.vertx.ext.web.multipart.FormDataPart;
import io.vertx.ext.web.multipart.MultipartForm;
import java.io.File;
public class MultipartFormUpload implements ReadStream<Buffer> {
private static final UnpooledByteBufAllocator ALLOC = new UnpooledByteBufAllocator(false);
private DefaultFullHttpRequest request;
private HttpPostRequestEncoder encoder;
private Handler<Throwable> exceptionHandler;
private Handler<Void> endHandler;
private InboundBuffer<Buffer> pending;
private boolean ended;
private final Context context;
public MultipartFormUpload(Context context,
MultipartForm parts,
boolean multipart,
HttpPostRequestEncoder.EncoderMode encoderMode) throws Exception {
this.context = context;
this.pending = new InboundBuffer<Buffer>(context).emptyHandler(v -> checkEnd()).drainHandler(v -> run()).pause();
this.request = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1,
io.netty.handler.codec.http.HttpMethod.POST,
"/");
this.encoder = new HttpPostRequestEncoder(
new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE),
request,
multipart,
HttpConstants.DEFAULT_CHARSET,
encoderMode);
for (FormDataPart formDataPart : parts) {
if (formDataPart.isAttribute()) {
encoder.addBodyAttribute(formDataPart.name(), formDataPart.value());
} else {
encoder.addBodyFileUpload(formDataPart.name(),
formDataPart.filename(), new File(formDataPart.pathname()),
formDataPart.mediaType(), formDataPart.isText());
}
}
encoder.finalizeRequest();
}
private void checkEnd() {
Handler<Void> handler;
synchronized (MultipartFormUpload.this) {
handler = ended ? endHandler : null;
}
if (handler != null) {
handler.handle(null);
}
}
public void run() {
if (Vertx.currentContext() != context) {
context.runOnContext(v -> {
run();
});
return;
}
while (!ended) {
if (encoder.isChunked()) {
try {
HttpContent chunk = encoder.readChunk(ALLOC);
ByteBuf content = chunk.content();
Buffer buff = Buffer.buffer(content);
if (!pending.write(buff)) {
break;
} else if (encoder.isEndOfInput()) {
ended = true;
request = null;
encoder = null;
if (pending.isEmpty()) {
endHandler.handle(null);
}
}
} catch (Exception e) {
ended = true;
request = null;
encoder = null;
if (exceptionHandler != null) {
exceptionHandler.handle(e);
}
break;
}
} else {
ByteBuf content = request.content();
Buffer buffer = Buffer.buffer(content);
request = null;
encoder = null;
pending.write(buffer);
ended = true;
if (pending.isEmpty() && endHandler != null) {
endHandler.handle(null);
}
}
}
}
public MultiMap () {
return new HeadersAdaptor(request.headers());
}
@Override
public synchronized MultipartFormUpload exceptionHandler(Handler<Throwable> handler) {
exceptionHandler = handler;
return this;
}
@Override
public synchronized MultipartFormUpload handler(Handler<Buffer> handler) {
pending.handler(handler);
return this;
}
@Override
public synchronized MultipartFormUpload pause() {
pending.pause();
return this;
}
@Override
public ReadStream<Buffer> fetch(long amount) {
pending.fetch(amount);
return this;
}
@Override
public synchronized MultipartFormUpload resume() {
pending.resume();
return this;
}
@Override
public synchronized MultipartFormUpload endHandler(Handler<Void> handler) {
endHandler = handler;
return this;
}
}