package org.springframework.http.client.reactive;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.hc.client5.http.cookie.BasicCookieStore;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStreamResetException;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpMethod;
import org.springframework.util.Assert;
public class HttpComponentsClientHttpConnector implements ClientHttpConnector {
private final CloseableHttpAsyncClient client;
private final BiFunction<HttpMethod, URI, ? extends HttpClientContext> contextProvider;
private DataBufferFactory dataBufferFactory = DefaultDataBufferFactory.sharedInstance;
public HttpComponentsClientHttpConnector() {
this(HttpAsyncClients.createDefault());
}
public HttpComponentsClientHttpConnector(CloseableHttpAsyncClient client) {
this(client, (method, uri) -> HttpClientContext.create());
}
public HttpComponentsClientHttpConnector(CloseableHttpAsyncClient client,
BiFunction<HttpMethod, URI, ? extends HttpClientContext> contextProvider) {
Assert.notNull(client, "Client must not be null");
Assert.notNull(contextProvider, "ContextProvider must not be null");
this.contextProvider = contextProvider;
this.client = client;
this.client.start();
}
public void setBufferFactory(DataBufferFactory bufferFactory) {
this.dataBufferFactory = bufferFactory;
}
@Override
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
HttpClientContext context = this.contextProvider.apply(method, uri);
if (context.getCookieStore() == null) {
context.setCookieStore(new BasicCookieStore());
}
HttpComponentsClientHttpRequest request = new HttpComponentsClientHttpRequest(method, uri,
context, this.dataBufferFactory);
return requestCallback.apply(request).then(Mono.defer(() -> execute(request, context)));
}
private Mono<ClientHttpResponse> execute(HttpComponentsClientHttpRequest request, HttpClientContext context) {
AsyncRequestProducer requestProducer = request.toRequestProducer();
return Mono.create(sink -> {
ReactiveResponseConsumer reactiveResponseConsumer =
new ReactiveResponseConsumer(new MonoFutureCallbackAdapter(sink, this.dataBufferFactory, context));
this.client.execute(requestProducer, reactiveResponseConsumer, context, null);
});
}
private static class MonoFutureCallbackAdapter
implements FutureCallback<Message<HttpResponse, Publisher<ByteBuffer>>> {
private final MonoSink<ClientHttpResponse> sink;
private final DataBufferFactory dataBufferFactory;
private final HttpClientContext context;
public MonoFutureCallbackAdapter(MonoSink<ClientHttpResponse> sink,
DataBufferFactory dataBufferFactory, HttpClientContext context) {
this.sink = sink;
this.dataBufferFactory = dataBufferFactory;
this.context = context;
}
@Override
public void completed(Message<HttpResponse, Publisher<ByteBuffer>> result) {
HttpComponentsClientHttpResponse response =
new HttpComponentsClientHttpResponse(this.dataBufferFactory, result, this.context);
this.sink.success(response);
}
@Override
public void failed(Exception ex) {
Throwable t = ex;
if (t instanceof HttpStreamResetException) {
HttpStreamResetException httpStreamResetException = (HttpStreamResetException) ex;
t = httpStreamResetException.getCause();
}
this.sink.error(t);
}
@Override
public void cancelled() {
}
}
}