/*
* Copyright (c) 2011-2013 The original author or authors
* ------------------------------------------------------
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/
package io.vertx.ext.web.codec.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.streams.WriteStream;
import io.vertx.ext.web.codec.BodyCodec;
import io.vertx.ext.web.codec.spi.BodyStream;
import java.util.function.Function;
// Author: Julien Viet
/**
 * {@link BodyCodec} implementation that accumulates every written chunk into a single
 * in-memory {@link Buffer} and, when the stream is ended, converts that buffer into the
 * final value with a decoder {@link Function}.
 *
 * <p>The decoder is only invoked when at least one byte was written; an empty body
 * completes the result with {@code null} without calling the decoder.
 *
 * @param <T> the type produced by the decoder
 * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
 */
public class BodyCodecImpl<T> implements BodyCodec<T> {

  /** Decoder that ignores the body and always yields {@code null}. */
  public static final Function<Buffer, Void> VOID_DECODER = buff -> null;

  /** Decoder that converts the body to a string via {@link Buffer#toString()}. */
  public static final Function<Buffer, String> UTF8_DECODER = Buffer::toString;

  /**
   * Decoder that parses the body as JSON and expects a {@link JsonObject}.
   * A JSON {@code null} yields {@code null}; any other decoded type raises a
   * {@link DecodeException}.
   */
  public static final Function<Buffer, JsonObject> JSON_OBJECT_DECODER = buff -> {
    Object val = Json.decodeValue(buff);
    if (val == null) {
      return null;
    }
    if (val instanceof JsonObject) {
      return (JsonObject) val;
    }
    throw new DecodeException("Invalid Json Object decoded as " + val.getClass().getName());
  };

  /**
   * Decoder that parses the body as JSON and expects a {@link JsonArray}.
   * A JSON {@code null} yields {@code null}; any other decoded type raises a
   * {@link DecodeException}.
   */
  public static final Function<Buffer, JsonArray> JSON_ARRAY_DECODER = buff -> {
    Object val = Json.decodeValue(buff);
    if (val == null) {
      return null;
    }
    if (val instanceof JsonArray) {
      return (JsonArray) val;
    }
    // The message names the type this decoder expects (an array, not an object).
    throw new DecodeException("Invalid Json Array decoded as " + val.getClass().getName());
  };

  // The shared codec instances below must be declared after the decoders they reference,
  // because static initializers run in textual order.

  /** Codec built on {@link #UTF8_DECODER}. */
  public static final BodyCodec<String> STRING = new BodyCodecImpl<>(UTF8_DECODER);

  /** Codec built on {@link #VOID_DECODER}; the body is discarded. */
  public static final BodyCodec<Void> NONE = new BodyCodecImpl<>(VOID_DECODER);

  /** Codec that returns the accumulated {@link Buffer} unchanged. */
  public static final BodyCodec<Buffer> BUFFER = new BodyCodecImpl<>(Function.identity());

  /** Codec built on {@link #JSON_OBJECT_DECODER}. */
  public static final BodyCodec<JsonObject> JSON_OBJECT = new BodyCodecImpl<>(JSON_OBJECT_DECODER);

  /** Codec built on {@link #JSON_ARRAY_DECODER}. */
  public static final BodyCodec<JsonArray> JSON_ARRAY = new BodyCodecImpl<>(JSON_ARRAY_DECODER);

  /**
   * Creates a codec that converts the body to a string using the given encoding.
   *
   * @param encoding the charset name passed to {@link Buffer#toString(String)}
   * @return a new string codec
   */
  public static BodyCodecImpl<String> string(String encoding) {
    return new BodyCodecImpl<>(buff -> buff.toString(encoding));
  }

  /**
   * Creates a codec that decodes the body as JSON into an instance of {@code type}.
   *
   * @param type the target class
   * @param <T> the target type
   * @return a new codec using {@link #jsonDecoder(Class)}
   */
  public static <T> BodyCodec<T> json(Class<T> type) {
    return new BodyCodecImpl<>(jsonDecoder(type));
  }

  /**
   * Creates a decoder that turns the body into a string and decodes it as JSON into
   * an instance of {@code type}.
   *
   * @param type the target class
   * @param <T> the target type
   * @return the decoder function
   */
  public static <T> Function<Buffer, T> jsonDecoder(Class<T> type) {
    return buff -> Json.decodeValue(buff.toString(), type);
  }

  private final Function<Buffer, T> decoder;

  /**
   * @param decoder the function applied to the accumulated body when the stream ends
   */
  public BodyCodecImpl(Function<Buffer, T> decoder) {
    this.decoder = decoder;
  }

  /**
   * Creates a fresh {@link BodyStream} and passes it to {@code handler} synchronously as a
   * succeeded result. Each call produces an independent stream with its own buffer and
   * result.
   *
   * @param handler receives the newly created stream
   */
  @Override
  public void create(Handler<AsyncResult<BodyStream<T>>> handler) {
    handler.handle(Future.succeededFuture(new BodyStream<T>() {

      // Accumulates every chunk written to this stream.
      private final Buffer buffer = Buffer.buffer();
      // Settled once: by end() with the decoded value / decode failure, or by handle(Throwable).
      private final Promise<T> state = Promise.promise();

      /** Fails the result with {@code cause} unless it has already been settled. */
      @Override
      public void handle(Throwable cause) {
        state.tryFail(cause);
      }

      /** @return the future carrying the decoded body or the failure */
      @Override
      public Future<T> result() {
        return state.future();
      }

      /** No-op: the supplied handler is not retained. */
      @Override
      public WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
        return this;
      }

      /**
       * Appends {@code data} to the in-memory buffer and, when a handler is supplied,
       * notifies it of success.
       */
      @Override
      public WriteStream<Buffer> write(Buffer data, Handler<AsyncResult<Void>> handler) {
        buffer.appendBuffer(data);
        // Tolerate a null handler, consistently with end(Handler).
        if (handler != null) {
          handler.handle(Future.succeededFuture());
        }
        return this;
      }

      /** Appends {@code data} to the in-memory buffer. */
      @Override
      public WriteStream<Buffer> write(Buffer data) {
        buffer.appendBuffer(data);
        return this;
      }

      /** Ends the stream without a completion handler. */
      @Override
      public void end() {
        end((Handler<AsyncResult<Void>>) null);
      }

      /**
       * Ends the stream: decodes the accumulated buffer (or uses {@code null} when nothing
       * was written) and settles the result. When the result was already settled - for
       * instance by {@link #handle(Throwable)} or an earlier {@code end} - nothing is
       * decoded again and the existing outcome is relayed to {@code handler}.
       *
       * @param handler notified of the outcome; may be {@code null}
       */
      @Override
      public void end(Handler<AsyncResult<Void>> handler) {
        Future<T> outcome = state.future();
        if (outcome.isComplete()) {
          // Already settled: still notify the caller so it is not left waiting forever.
          if (handler != null) {
            AsyncResult<Void> ack;
            if (outcome.succeeded()) {
              ack = Future.succeededFuture();
            } else {
              ack = Future.failedFuture(outcome.cause());
            }
            handler.handle(ack);
          }
          return;
        }
        T result;
        if (buffer.length() > 0) {
          try {
            result = decoder.apply(buffer);
          } catch (Throwable t) {
            // Decoder failures are reported through the result and the handler, not thrown.
            state.fail(t);
            if (handler != null) {
              handler.handle(Future.failedFuture(t));
            }
            return;
          }
        } else {
          // Empty body: the decoder is not invoked.
          result = null;
        }
        state.complete(result);
        if (handler != null) {
          handler.handle(Future.succeededFuture());
        }
      }

      /** No-op: the given maximum size is ignored. */
      @Override
      public WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {
        return this;
      }

      /** @return always {@code false} */
      @Override
      public boolean writeQueueFull() {
        return false;
      }

      /** No-op: the supplied handler is not retained. */
      @Override
      public WriteStream<Buffer> drainHandler(Handler<Void> handler) {
        return this;
      }
    }));
  }
}