package org.glassfish.jersey.client;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MultivaluedMap;
import javax.inject.Provider;
import org.glassfish.jersey.client.internal.LocalizationMessages;
import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
import org.glassfish.jersey.client.spi.Connector;
import org.glassfish.jersey.internal.BootstrapBag;
import org.glassfish.jersey.internal.Version;
import org.glassfish.jersey.internal.inject.InjectionManager;
import org.glassfish.jersey.internal.inject.Providers;
import org.glassfish.jersey.internal.util.collection.LazyValue;
import org.glassfish.jersey.internal.util.collection.Ref;
import org.glassfish.jersey.internal.util.collection.Value;
import org.glassfish.jersey.internal.util.collection.Values;
import org.glassfish.jersey.message.MessageBodyWorkers;
import org.glassfish.jersey.model.internal.ManagedObjectsFinalizer;
import org.glassfish.jersey.process.internal.ChainableStage;
import org.glassfish.jersey.process.internal.RequestContext;
import org.glassfish.jersey.process.internal.RequestScope;
import org.glassfish.jersey.process.internal.Stage;
import org.glassfish.jersey.process.internal.Stages;
class ClientRuntime implements JerseyClient.ShutdownHook, ClientExecutor {
private static final Logger LOG = Logger.getLogger(ClientRuntime.class.getName());
private final Stage<ClientRequest> requestProcessingRoot;
private final Stage<ClientResponse> responseProcessingRoot;
private final Connector connector;
private final ClientConfig config;
private final RequestScope requestScope;
private final LazyValue<ExecutorService> asyncRequestExecutor;
private final LazyValue<ScheduledExecutorService> backgroundScheduler;
private final Iterable<ClientLifecycleListener> lifecycleListeners;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final ManagedObjectsFinalizer managedObjectsFinalizer;
private final InjectionManager injectionManager;
private final InvocationInterceptorStages.PreInvocationInterceptorStage preInvocationInterceptorStage;
private final InvocationInterceptorStages.PostInvocationInterceptorStage postInvocationInterceptorStage;
public ClientRuntime(final ClientConfig config, final Connector connector, final InjectionManager injectionManager,
final BootstrapBag bootstrapBag) {
Provider<Ref<ClientRequest>> clientRequest =
() -> injectionManager.getInstance(new GenericType<Ref<ClientRequest>>() {}.getType());
RequestProcessingInitializationStage requestProcessingInitializationStage =
new RequestProcessingInitializationStage(clientRequest, bootstrapBag.getMessageBodyWorkers(), injectionManager);
Stage.Builder<ClientRequest> requestingChainBuilder = Stages.chain(requestProcessingInitializationStage);
preInvocationInterceptorStage = InvocationInterceptorStages.createPreInvocationInterceptorStage(injectionManager);
postInvocationInterceptorStage = InvocationInterceptorStages.createPostInvocationInterceptorStage(injectionManager);
ChainableStage<ClientRequest> requestFilteringStage = preInvocationInterceptorStage.hasPreInvocationInterceptors()
? ClientFilteringStages.createRequestFilteringStage(
preInvocationInterceptorStage.createPreInvocationInterceptorFilter(), injectionManager)
: ClientFilteringStages.createRequestFilteringStage(injectionManager);
this.requestProcessingRoot = requestFilteringStage != null
? requestingChainBuilder.build(requestFilteringStage) : requestingChainBuilder.build();
ChainableStage<ClientResponse> responseFilteringStage = ClientFilteringStages.createResponseFilteringStage(
injectionManager);
this.responseProcessingRoot = responseFilteringStage != null ? responseFilteringStage : Stages.identity();
this.managedObjectsFinalizer = bootstrapBag.getManagedObjectsFinalizer();
this.config = config;
this.connector = connector;
this.requestScope = bootstrapBag.getRequestScope();
this.asyncRequestExecutor = Values.lazy((Value<ExecutorService>) () ->
config.getExecutorService() == null
? injectionManager.getInstance(ExecutorService.class, ClientAsyncExecutorLiteral.INSTANCE)
: config.getExecutorService());
this.backgroundScheduler = Values.lazy((Value<ScheduledExecutorService>) () ->
config.getScheduledExecutorService() == null
? injectionManager.getInstance(ScheduledExecutorService.class, ClientBackgroundSchedulerLiteral.INSTANCE)
: config.getScheduledExecutorService());
this.injectionManager = injectionManager;
this.lifecycleListeners = Providers.getAllProviders(injectionManager, ClientLifecycleListener.class);
for (final ClientLifecycleListener listener : lifecycleListeners) {
try {
listener.onInit();
} catch (final Throwable t) {
LOG.log(Level.WARNING, LocalizationMessages.ERROR_LISTENER_INIT(listener.getClass().getName()), t);
}
}
}
Runnable createRunnableForAsyncProcessing(ClientRequest request, final ResponseCallback callback) {
try {
requestScope.runInScope(() -> preInvocationInterceptorStage.beforeRequest(request));
} catch (Throwable throwable) {
return () -> requestScope.runInScope(() -> processFailure(request, throwable, callback));
}
return () -> requestScope.runInScope(() -> {
RuntimeException runtimeException = null;
try {
ClientRequest processedRequest;
try {
processedRequest = Stages.process(request, requestProcessingRoot);
processedRequest = addUserAgent(processedRequest, connector.getName());
} catch (final AbortException aborted) {
processResponse(request, aborted.getAbortResponse(), callback);
return;
}
final AsyncConnectorCallback connectorCallback = new AsyncConnectorCallback() {
@Override
public void response(final ClientResponse response) {
requestScope.runInScope(() -> processResponse(request, response, callback));
}
@Override
public void failure(final Throwable failure) {
requestScope.runInScope(() -> processFailure(request, failure, callback));
}
};
connector.apply(processedRequest, connectorCallback);
} catch (final Throwable throwable) {
processFailure(request, throwable, callback);
}
});
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return asyncRequestExecutor.get().submit(task);
}
@Override
public Future<?> submit(Runnable task) {
return asyncRequestExecutor.get().submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return asyncRequestExecutor.get().submit(task, result);
}
@Override
public <T> ScheduledFuture<T> schedule(Callable<T> callable, long delay, TimeUnit unit) {
return backgroundScheduler.get().schedule(callable, delay, unit);
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return backgroundScheduler.get().schedule(command, delay, unit);
}
private void processResponse(final ClientRequest request, final ClientResponse response, final ResponseCallback callback) {
ClientResponse processedResponse = null;
Throwable caught = null;
try {
processedResponse = Stages.process(response, responseProcessingRoot);
} catch (final Throwable throwable) {
caught = throwable;
}
try {
processedResponse = postInvocationInterceptorStage.afterRequest(request, processedResponse, caught);
} catch (Throwable throwable) {
processFailure(throwable, callback);
return;
}
callback.completed(processedResponse, requestScope);
}
private void processFailure(final ClientRequest request, final Throwable failure, final ResponseCallback callback) {
if (postInvocationInterceptorStage.hasPostInvocationInterceptor()) {
try {
final ClientResponse clientResponse = postInvocationInterceptorStage.afterRequest(request, null, failure);
callback.completed(clientResponse, requestScope);
} catch (RuntimeException e) {
final Throwable t = e.getSuppressed().length == 1 && e.getSuppressed()[0] == failure ? failure : e;
processFailure(t, callback);
}
} else {
processFailure(failure, callback);
}
}
private void processFailure(final Throwable failure, final ResponseCallback callback) {
callback.failed(failure instanceof ProcessingException
? (ProcessingException) failure : new ProcessingException(failure));
}
private Future<?> submit(final ExecutorService executor, final Runnable task) {
return executor.submit(() -> requestScope.runInScope(task));
}
private ClientRequest addUserAgent(final ClientRequest clientRequest, final String connectorName) {
final MultivaluedMap<String, Object> headers = clientRequest.getHeaders();
if (headers.containsKey(HttpHeaders.USER_AGENT)) {
if (clientRequest.getHeaderString(HttpHeaders.USER_AGENT) == null) {
headers.remove(HttpHeaders.USER_AGENT);
}
} else if (!clientRequest.ignoreUserAgent()) {
if (connectorName != null && !connectorName.isEmpty()) {
headers.put(HttpHeaders.USER_AGENT,
Collections.singletonList(String.format("Jersey/%s (%s)", Version.getVersion(), connectorName)));
} else {
headers.put(HttpHeaders.USER_AGENT,
Collections.singletonList(String.format("Jersey/%s", Version.getVersion())));
}
}
return clientRequest;
}
public ClientResponse invoke(final ClientRequest request) {
ProcessingException processingException = null;
ClientResponse response = null;
try {
preInvocationInterceptorStage.beforeRequest(request);
try {
response = connector.apply(addUserAgent(Stages.process(request, requestProcessingRoot), connector.getName()));
} catch (final AbortException aborted) {
response = aborted.getAbortResponse();
}
response = Stages.process(response, responseProcessingRoot);
} catch (final ProcessingException pe) {
processingException = pe;
} catch (final Throwable t) {
processingException = new ProcessingException(t.getMessage(), t);
} finally {
response = postInvocationInterceptorStage.afterRequest(request, response, processingException);
return response;
}
}
public RequestScope getRequestScope() {
return requestScope;
}
public ClientConfig getConfig() {
return config;
}
@Override
protected void finalize() throws Throwable {
try {
close();
} finally {
super.finalize();
}
}
@Override
public void onShutdown() {
close();
}
private void close() {
if (closed.compareAndSet(false, true)) {
try {
for (final ClientLifecycleListener listener : lifecycleListeners) {
try {
listener.onClose();
} catch (final Throwable t) {
LOG.log(Level.WARNING, LocalizationMessages.ERROR_LISTENER_CLOSE(listener.getClass().getName()), t);
}
}
} finally {
try {
connector.close();
} finally {
managedObjectsFinalizer.preDestroy();
injectionManager.shutdown();
}
}
}
}
public void preInitialize() {
injectionManager.getInstance(MessageBodyWorkers.class);
}
public Connector getConnector() {
return connector;
}
InjectionManager getInjectionManager() {
return injectionManager;
}
}