package io.vertx.ext.web.client.impl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpConstants;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder;
import io.netty.handler.codec.http.multipart.MemoryFileUpload;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.impl.headers.HeadersAdaptor;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.impl.InboundBuffer;
import io.vertx.ext.web.multipart.FormDataPart;
import io.vertx.ext.web.multipart.MultipartForm;
import java.io.File;
public class MultipartFormUpload implements ReadStream<Buffer> {
private static final UnpooledByteBufAllocator ALLOC = new UnpooledByteBufAllocator(false);
private DefaultFullHttpRequest request;
private HttpPostRequestEncoder encoder;
private Handler<Throwable> exceptionHandler;
private Handler<Buffer> dataHandler;
private Handler<Void> endHandler;
private InboundBuffer<Object> pending;
private boolean ended;
private final Context context;
public MultipartFormUpload(Context context,
MultipartForm parts,
boolean multipart,
HttpPostRequestEncoder.EncoderMode encoderMode) throws Exception {
this.context = context;
this.pending = new InboundBuffer<>(context)
.handler(this::handleChunk)
.drainHandler(v -> run()).pause();
this.request = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1,
io.netty.handler.codec.http.HttpMethod.POST,
"/");
this.encoder = new HttpPostRequestEncoder(
new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE),
request,
multipart,
HttpConstants.DEFAULT_CHARSET,
encoderMode);
for (FormDataPart formDataPart : parts) {
if (formDataPart.isAttribute()) {
encoder.addBodyAttribute(formDataPart.name(), formDataPart.value());
} else {
String pathname = formDataPart.pathname();
if (pathname != null) {
encoder.addBodyFileUpload(formDataPart.name(),
formDataPart.filename(), new File(formDataPart.pathname()),
formDataPart.mediaType(), formDataPart.isText());
} else {
String contentType = formDataPart.mediaType();
if (formDataPart.mediaType() == null) {
if (formDataPart.isText()) {
contentType = "text/plain";
} else {
contentType = "application/octet-stream";
}
}
String transferEncoding = formDataPart.isText() ? null : "binary";
MemoryFileUpload fileUpload = new MemoryFileUpload(
formDataPart.name(),
formDataPart.filename(),
contentType, transferEncoding, null, formDataPart.content().length());
fileUpload.setContent(formDataPart.content().getByteBuf());
encoder.addBodyHttpData(fileUpload);
}
}
}
encoder.finalizeRequest();
}
private void handleChunk(Object item) {
Handler handler;
synchronized (MultipartFormUpload.this) {
if (item instanceof Buffer) {
handler = dataHandler;
} else if (item instanceof Throwable) {
handler = exceptionHandler;
} else if (item == InboundBuffer.END_SENTINEL) {
handler = endHandler;
item = null;
} else {
return;
}
}
handler.handle(item);
}
public void run() {
if (Vertx.currentContext() != context) {
context.runOnContext(v -> {
run();
});
return;
}
while (!ended) {
if (encoder.isChunked()) {
try {
HttpContent chunk = encoder.readChunk(ALLOC);
ByteBuf content = chunk.content();
Buffer buff = Buffer.buffer(content);
boolean writable = pending.write(buff);
if (encoder.isEndOfInput()) {
ended = true;
request = null;
encoder = null;
pending.write(InboundBuffer.END_SENTINEL);
} else if (!writable) {
break;
}
} catch (Exception e) {
ended = true;
request = null;
encoder = null;
pending.write(e);
break;
}
} else {
ByteBuf content = request.content();
Buffer buffer = Buffer.buffer(content);
request = null;
encoder = null;
pending.write(buffer);
ended = true;
pending.write(InboundBuffer.END_SENTINEL);
}
}
}
public MultiMap () {
return new HeadersAdaptor(request.headers());
}
@Override
public synchronized MultipartFormUpload exceptionHandler(Handler<Throwable> handler) {
exceptionHandler = handler;
return this;
}
@Override
public synchronized MultipartFormUpload handler(Handler<Buffer> handler) {
dataHandler = handler;
return this;
}
@Override
public synchronized MultipartFormUpload pause() {
pending.pause();
return this;
}
@Override
public ReadStream<Buffer> fetch(long amount) {
pending.fetch(amount);
return this;
}
@Override
public synchronized MultipartFormUpload resume() {
pending.resume();
return this;
}
@Override
public synchronized MultipartFormUpload endHandler(Handler<Void> handler) {
endHandler = handler;
return this;
}
}