package jdk.incubator.http;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow;
import java.util.function.Consumer;
import java.util.function.Function;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Utils;
import jdk.incubator.http.internal.common.Log;
class ResponseProcessors {
/**
 * Common base for the body processors in this file. It only carries a
 * reference to an {@code HttpClientImpl}; both accessors synchronize on
 * this instance.
 *
 * @param <T> the type of the body produced by the processor
 */
abstract static class AbstractProcessor<T>
        implements HttpResponse.BodyProcessor<T>
{
    HttpClientImpl client;

    /** Stores the given client reference while holding this object's monitor. */
    void setClient(HttpClientImpl client) {
        synchronized (this) {
            this.client = client;
        }
    }

    /** Returns the stored client reference while holding this object's monitor. */
    HttpClientImpl getClient() {
        synchronized (this) {
            return this.client;
        }
    }
}
/**
 * Body processor that hands every received chunk to a user supplied
 * consumer as a freshly allocated byte array. The end of the body is
 * signalled to the consumer with an empty {@code Optional}.
 */
static class ConsumerProcessor extends AbstractProcessor<Void> {
    private final Consumer<Optional<byte[]>> consumer;
    private final CompletableFuture<Void> result = new MinimalFuture<>();
    private Flow.Subscription subscription;

    ConsumerProcessor(Consumer<Optional<byte[]>> consumer) {
        this.consumer = consumer;
    }

    /** Remembers the subscription and asks for the first buffer. */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Assign before requesting: request() may deliver onNext re-entrantly.
        this.subscription = subscription;
        this.subscription.request(1);
    }

    /**
     * Copies the remaining bytes of {@code item} into a new array, passes
     * the array to the consumer and then requests one more buffer.
     */
    @Override
    public void onNext(ByteBuffer item) {
        final int length = item.remaining();
        final byte[] chunk = new byte[length];
        item.get(chunk, 0, length);
        consumer.accept(Optional.of(chunk));
        subscription.request(1);
    }

    /** Fails the body future with the reported error. */
    @Override
    public void onError(Throwable throwable) {
        result.completeExceptionally(throwable);
    }

    /** Signals end-of-body to the consumer, then completes the body future. */
    @Override
    public void onComplete() {
        final Optional<byte[]> endOfBody = Optional.empty();
        consumer.accept(endOfBody);
        result.complete(null);
    }

    /** Returns the future that completes once the whole body was consumed. */
    @Override
    public CompletionStage<Void> getBody() {
        return result;
    }
}
/**
 * Body processor that writes the response body to a file and yields the
 * file's {@code Path} once the body has been fully received.
 */
static class PathProcessor extends AbstractProcessor<Path> {
    private final Path file;
    private final CompletableFuture<Path> result = new MinimalFuture<>();
    private Flow.Subscription subscription;
    // Remains null if opening the file failed in onSubscribe.
    private FileChannel out;
    private final OpenOption[] options;

    /**
     * @param file    the file the body is written to
     * @param options the options passed to {@code FileChannel.open}
     */
    PathProcessor(Path file, OpenOption... options) {
        this.file = file;
        this.options = options;
    }

    /**
     * Opens the target file and requests the first buffer. If the file
     * cannot be opened, the body future is failed and the subscription
     * is cancelled without requesting anything.
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        try {
            out = FileChannel.open(file, options);
        } catch (IOException e) {
            result.completeExceptionally(e);
            subscription.cancel();
            return;
        }
        subscription.request(1);
    }

    /**
     * Writes the buffer to the file and requests the next one. On a write
     * failure the channel is closed, the subscription is cancelled and the
     * body future is failed; no further data is requested.
     */
    @Override
    public void onNext(ByteBuffer item) {
        try {
            out.write(item);
        } catch (IOException ex) {
            closeOutput();
            subscription.cancel();
            result.completeExceptionally(ex);
            // Do not ask for more data on a subscription we just cancelled.
            return;
        }
        subscription.request(1);
    }

    /** Fails the body future with the reported error and closes the file. */
    @Override
    public void onError(Throwable e) {
        result.completeExceptionally(e);
        closeOutput();
    }

    /**
     * Closes the file and completes the body future with its path. If the
     * future was already failed earlier, the completion has no effect.
     */
    @Override
    public void onComplete() {
        closeOutput();
        result.complete(file);
    }

    /** Returns the future that completes with the file's path. */
    @Override
    public CompletionStage<Path> getBody() {
        return result;
    }

    /**
     * Closes the output channel if it was opened. Terminal signals may
     * still arrive after a failed open cancelled the subscription, in
     * which case there is no channel to close.
     */
    private void closeOutput() {
        if (out != null) {
            Utils.close(out);
        }
    }
}
/**
 * Body processor that accumulates all received buffers and, once the body
 * is complete, joins them into a single byte array that is converted to
 * the result type by the supplied finisher function.
 *
 * @param <T> the type produced by the finisher
 */
static class ByteArrayProcessor<T> extends AbstractProcessor<T> {
    private final Function<byte[], T> finisher;
    private final CompletableFuture<T> result = new MinimalFuture<>();
    private final List<ByteBuffer> received = new ArrayList<>();
    private Flow.Subscription subscription;

    /**
     * @param finisher converts the joined body bytes into the result value
     */
    ByteArrayProcessor(Function<byte[],T> finisher) {
        this.finisher = finisher;
    }

    /**
     * Accepts the first subscription and requests an unbounded number of
     * buffers; any later subscription is cancelled immediately.
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        if (this.subscription != null) {
            subscription.cancel();
            return;
        }
        this.subscription = subscription;
        subscription.request(Long.MAX_VALUE);
    }

    /** Keeps the buffer for later joining; empty buffers are dropped. */
    @Override
    public void onNext(ByteBuffer item) {
        if (item.hasRemaining()) {
            received.add(item);
        }
    }

    /** Discards everything received so far and fails the body future. */
    @Override
    public void onError(Throwable throwable) {
        received.clear();
        result.completeExceptionally(throwable);
    }

    /**
     * Copies the remaining content of every buffer, in order, into one
     * newly allocated array. The buffers are consumed by this call.
     */
    private static byte[] join(List<ByteBuffer> bytes) {
        int size = Utils.remaining(bytes);
        byte[] res = new byte[size];
        int from = 0;
        for (ByteBuffer b : bytes) {
            int l = b.remaining();
            b.get(res, from, l);
            from += l;
        }
        return res;
    }

    /**
     * Joins the received buffers, applies the finisher and completes the
     * body future with its result. Any runtime failure of the finisher is
     * reported through the body future rather than thrown to the caller,
     * so the future is always completed. The accumulated buffers are
     * released in every case.
     */
    @Override
    public void onComplete() {
        try {
            result.complete(finisher.apply(join(received)));
        } catch (RuntimeException e) {
            result.completeExceptionally(e);
        } finally {
            received.clear();
        }
    }

    /** Returns the future that completes with the finisher's result. */
    @Override
    public CompletionStage<T> getBody() {
        return result;
    }
}
/**
 * Multi processor that records the outcome of every request in a
 * {@code MultiMapResult} backed by a {@code ConcurrentHashMap}, and
 * delegates the choice of body handler to a supplied function.
 *
 * @param <V> the response body type
 */
static class MultiProcessorImpl<V> implements HttpResponse.MultiProcessor<MultiMapResult<V>,V> {
    private final MultiMapResult<V> results;
    private final Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler;
    private final boolean completion;

    /**
     * @param pushHandler maps a request to an optional body handler
     * @param completion  when {@code true} the aggregate result is tied to
     *                    the {@code onComplete} future, otherwise to the
     *                    {@code onFinalPushPromise} future
     */
    MultiProcessorImpl(Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler, boolean completion) {
        this.pushHandler = pushHandler;
        this.completion = completion;
        this.results = new MultiMapResult<V>(new ConcurrentHashMap<>());
    }

    /** Asks the push handler function for the body handler of {@code request}. */
    @Override
    public Optional<HttpResponse.BodyHandler<V>> onRequest(HttpRequest request) {
        return pushHandler.apply(request);
    }

    /** Records the response as an already completed future under its request. */
    @Override
    public void onResponse(HttpResponse<V> response) {
        final CompletableFuture<HttpResponse<V>> done =
                CompletableFuture.completedFuture(response);
        results.put(response.request(), done);
    }

    /** Records a failed future for {@code request}. */
    @Override
    public void onError(HttpRequest request, Throwable t) {
        results.put(request, MinimalFuture.failedFuture(t));
    }

    /**
     * Returns a future yielding the collected results once the future
     * selected by the {@code completion} flag has completed.
     */
    @Override
    public CompletableFuture<MultiMapResult<V>> completion(
            CompletableFuture<Void> onComplete, CompletableFuture<Void> onFinalPushPromise) {
        final CompletableFuture<Void> trigger = completion ? onComplete : onFinalPushPromise;
        return trigger.thenApply(unused -> results);
    }
}
/**
 * Maps request URIs to files below a destination directory and supplies
 * file body handlers for them.
 */
static class MultiFile {
    final Path pathRoot;

    /**
     * @param destination the directory below which files are created
     * @throws UncheckedIOException if {@code destination} is not a directory
     */
    MultiFile(Path destination) {
        if (!Files.isDirectory(destination))
            throw new UncheckedIOException(new IOException("destination is not a directory"));
        pathRoot = destination;
    }

    /**
     * Derives a file location below {@code pathRoot} from the path of the
     * request URI, creates any missing parent directories and returns a
     * body handler writing to that file.
     *
     * <p>The URI path is not trusted: if, after normalization, the target
     * would lie outside {@code pathRoot} (for example through {@code ..}
     * segments), nothing is created and an empty {@code Optional} is
     * returned.
     * NOTE(review): callers presumably treat an empty result as declining
     * the request - confirm against the MultiProcessor contract.
     *
     * @param request the request whose URI path selects the file
     * @return a handler writing to the derived file, or empty if the path
     *         escapes the destination directory
     * @throws UncheckedIOException if the parent directories cannot be created
     */
    Optional<HttpResponse.BodyHandler<Path>> handlePush(HttpRequest request) {
        final URI uri = request.uri();
        String path = uri.getPath();
        // Strip leading slashes so the path resolves relative to pathRoot.
        while (path.startsWith("/"))
            path = path.substring(1);
        Path p = pathRoot.resolve(path);

        // Compare absolute, normalized forms so that ".." segments (in the
        // URI or in a relative pathRoot) cannot hide an escape.
        final Path root = pathRoot.toAbsolutePath().normalize();
        final Path target = p.toAbsolutePath().normalize();
        if (!target.startsWith(root)) {
            if (Log.trace()) {
                Log.logTrace("Rejecting URI={0}: path={1} is outside the destination directory",
                             uri, p);
            }
            return Optional.empty();
        }

        if (Log.trace()) {
            Log.logTrace("Creating file body processor for URI={0}, path={1}",
                         uri, p);
        }
        try {
            // getParent() is null for a path without a parent component.
            final Path parent = p.getParent();
            if (parent != null) {
                Files.createDirectories(parent);
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        final HttpResponse.BodyHandler<Path> proc =
                HttpResponse.BodyHandler.asFile(p);
        return Optional.of(proc);
    }
}
/**
 * Body processor that discards every received byte and, when the body
 * ends, completes with a value fixed at construction time (or with
 * {@code null} if none was given).
 *
 * @param <T> the type of the fixed result value
 */
static class NullProcessor<T> extends AbstractProcessor<T> {
    Flow.Subscription subscription;
    final CompletableFuture<T> cf = new MinimalFuture<>();
    final Optional<T> result;

    /**
     * @param result the value to complete with; empty means {@code null}
     */
    NullProcessor(Optional<T> result) {
        this.result = result;
    }

    /** Remembers the subscription and requests an unbounded number of buffers. */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(Long.MAX_VALUE);
    }

    /** Marks the buffer as fully consumed without reading its content. */
    @Override
    public void onNext(ByteBuffer item) {
        final int end = item.limit();
        item.position(end);
    }

    /** Fails the body future with the reported error. */
    @Override
    public void onError(Throwable throwable) {
        cf.completeExceptionally(throwable);
    }

    /** Completes the body future with the fixed value, or {@code null} if absent. */
    @Override
    public void onComplete() {
        cf.complete(result.orElse(null));
    }

    /** Returns the future that completes when the body has been discarded. */
    @Override
    public CompletionStage<T> getBody() {
        return cf;
    }
}
}