package com.microsoft.azure.management.appservice.implementation;
import com.google.common.base.Joiner;
import com.google.common.io.ByteStreams;
import com.google.common.reflect.TypeToken;
import com.microsoft.azure.AzureResponseBuilder;
import com.microsoft.azure.management.appservice.DeployType;
import com.microsoft.azure.management.appservice.WebAppBase;
import com.microsoft.rest.RestClient;
import com.microsoft.rest.RestException;
import com.microsoft.rest.ServiceResponse;
import com.microsoft.rest.ServiceResponseBuilder;
import com.microsoft.rest.protocol.ResponseBuilder;
import okhttp3.MediaType;
import okhttp3.RequestBody;
import okhttp3.ResponseBody;
import okio.BufferedSource;
import retrofit2.Response;
import retrofit2.http.Body;
import retrofit2.http.GET;
import retrofit2.http.Headers;
import retrofit2.http.POST;
import retrofit2.http.Query;
import retrofit2.http.Streaming;
import rx.Completable;
import rx.Emitter;
import rx.Emitter.BackpressureMode;
import rx.Observable;
import rx.exceptions.Exceptions;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
 * Client for the Kudu (SCM) site of a web app.
 *
 * <p>The client talks to {@code https://<name>.scm.<rest-of-host>}, which is derived from the
 * web app's default host name, and exposes log streaming, deployment and settings calls.
 */
class KuduClient {
    /** Number of times a failed call is retried before the error is propagated. */
    private static final int MAX_RETRIES = 5;
    /** Back-off unit in seconds; the n-th retry waits {@code n * RETRY_DELAY_SECONDS}. */
    private static final int RETRY_DELAY_SECONDS = 10;

    private final RestClient restClient;
    private final KuduService service;

    /**
     * Creates a client bound to the SCM host of the given web app.
     *
     * @param webAppBase the web app whose default host name is used to derive the SCM host
     * @throws UnsupportedOperationException if the web app has no default host name yet
     * @throws IllegalArgumentException if the default host name contains no {@code '.'}
     */
    KuduClient(WebAppBase webAppBase) {
        final String defaultHostName = webAppBase.defaultHostName();
        if (defaultHostName == null) {
            throw new UnsupportedOperationException("Cannot initialize kudu client before web app is created");
        }
        // Locale.ROOT keeps the lower-casing independent of the JVM default locale.
        String host = defaultHostName.toLowerCase(Locale.ROOT)
                .replace("http://", "")
                .replace("https://", "");
        // Split into the first label and the remainder so that "scm" can be inserted between them.
        String[] parts = host.split("\\.", 2);
        if (parts.length < 2) {
            throw new IllegalArgumentException(
                    "Cannot derive kudu host name from default host name '" + defaultHostName + "'");
        }
        host = Joiner.on('.').join(parts[0], "scm", parts[1]);
        restClient = webAppBase.manager().restClient().newBuilder()
                .withBaseUrl("https://" + host)
                .withConnectionTimeout(3, TimeUnit.MINUTES)
                .withReadTimeout(3, TimeUnit.MINUTES)
                .build();
        service = restClient
                .retrofit().create(KuduService.class);
    }

    /** Retrofit description of the SCM endpoints used by this client. */
    private interface KuduService {
        @Headers({ "x-ms-logging-context: com.microsoft.azure.management.appservice.WebApps streamApplicationLogs", "x-ms-body-logging: false" })
        @GET("api/logstream/application")
        @Streaming
        Observable<ResponseBody> streamApplicationLogs();

        @Headers({ "x-ms-logging-context: com.microsoft.azure.management.appservice.WebApps streamHttpLogs", "x-ms-body-logging: false" })
        @GET("api/logstream/http")
        @Streaming
        Observable<ResponseBody> streamHttpLogs();

        @Headers({ "x-ms-logging-context: com.microsoft.azure.management.appservice.WebApps streamTraceLogs", "x-ms-body-logging: false" })
        @GET("api/logstream/kudu/trace")
        @Streaming
        Observable<ResponseBody> streamTraceLogs();

        @Headers({ "x-ms-logging-context: com.microsoft.azure.management.appservice.WebApps streamDeploymentLogs", "x-ms-body-logging: false" })
        @GET("api/logstream/kudu/deployment")
        @Streaming
        Observable<ResponseBody> streamDeploymentLogs();

        @Headers({ "x-ms-logging-context: com.microsoft.azure.management.appservice.WebApps streamAllLogs", "x-ms-body-logging: false" })
        @GET("api/logstream")
        @Streaming
        Observable<ResponseBody> streamAllLogs();

        @Headers({ "Content-Type: application/octet-stream", "x-ms-logging-context: com.microsoft.azure.management.appservice.WebApps warDeploy", "x-ms-body-logging: false" })
        @POST("api/wardeploy")
        Observable<Response<ResponseBody>> warDeploy(@Body RequestBody warFile, @Query("name") String appName);

        @Headers({ "Content-Type: application/octet-stream", "x-ms-logging-context: com.microsoft.azure.management.appservice.WebApps zipDeploy", "x-ms-body-logging: false" })
        @POST("api/zipdeploy")
        Observable<Response<ResponseBody>> zipDeploy(@Body RequestBody zipFile);

        @Headers({ "Content-Type: application/octet-stream", "x-ms-logging-context: com.microsoft.azure.management.appservice.WebApps publish", "x-ms-body-logging: false" })
        @POST("api/publish")
        Observable<Response<ResponseBody>> deploy(@Body RequestBody file, @Query("type") DeployType type, @Query("path") String path, @Query("restart") Boolean restart, @Query("clean") Boolean clean);

        @Headers({ "x-ms-logging-context: com.microsoft.azure.management.appservice.WebApps settings" })
        @GET("api/settings")
        Observable<Response<ResponseBody>> settings();
    }

    /**
     * Streams the application log.
     *
     * @return an observable emitting one item per line read from {@code api/logstream/application}
     */
    Observable<String> streamApplicationLogsAsync() {
        return streamLines(service.streamApplicationLogs());
    }

    /**
     * Streams the HTTP log.
     *
     * @return an observable emitting one item per line read from {@code api/logstream/http}
     */
    Observable<String> streamHttpLogsAsync() {
        return streamLines(service.streamHttpLogs());
    }

    /**
     * Streams the Kudu trace log.
     *
     * @return an observable emitting one item per line read from {@code api/logstream/kudu/trace}
     */
    Observable<String> streamTraceLogsAsync() {
        return streamLines(service.streamTraceLogs());
    }

    /**
     * Streams the Kudu deployment log.
     *
     * @return an observable emitting one item per line read from {@code api/logstream/kudu/deployment}
     */
    Observable<String> streamDeploymentLogsAsync() {
        return streamLines(service.streamDeploymentLogs());
    }

    /**
     * Streams all logs.
     *
     * @return an observable emitting one item per line read from {@code api/logstream}
     */
    Observable<String> streamAllLogsAsync() {
        return streamLines(service.streamAllLogs());
    }

    /** Turns every streamed response body into an observable of its text lines. */
    private Observable<String> streamLines(Observable<ResponseBody> bodies) {
        return bodies.flatMap(new Func1<ResponseBody, Observable<String>>() {
            @Override
            public Observable<String> call(ResponseBody responseBody) {
                return streamFromBufferedSource(responseBody.source());
            }
        });
    }

    /**
     * Reads the source line by line and emits each line until the source is exhausted.
     *
     * <p>The source is closed once reading finishes, whether it completed or failed, so the
     * underlying streaming response is released.
     *
     * @param source the buffered source of a streaming response body
     * @return an observable of the UTF-8 lines read from the source
     */
    private Observable<String> streamFromBufferedSource(final BufferedSource source) {
        return Observable.create(new Action1<Emitter<String>>() {
            @Override
            public void call(Emitter<String> stringEmitter) {
                try {
                    while (!source.exhausted()) {
                        stringEmitter.onNext(source.readUtf8Line());
                    }
                    stringEmitter.onCompleted();
                } catch (IOException e) {
                    stringEmitter.onError(e);
                } finally {
                    closeQuietly(source);
                }
            }
        }, BackpressureMode.BUFFER);
    }

    /** Closes the source, ignoring any failure because the terminal event was already signalled. */
    private static void closeQuietly(BufferedSource source) {
        try {
            source.close();
        } catch (IOException ignored) {
            // Nothing useful can be done here: the stream outcome has already been reported.
        }
    }

    /**
     * Deploys a WAR file through {@code api/wardeploy}.
     *
     * <p>The stream is read fully into memory before the request is built; a read failure is
     * reported through the returned completable.
     *
     * @param warFile the content of the WAR file
     * @param appName value sent as the {@code name} query parameter
     * @return a completable that finishes when the call succeeds
     */
    Completable warDeployAsync(InputStream warFile, String appName) {
        try {
            RequestBody body = toRequestBody(warFile);
            return retryOnError(handleResponse(service.warDeploy(body, appName))).toCompletable();
        } catch (IOException e) {
            return Completable.error(e);
        }
    }

    /**
     * Deploys a ZIP file through {@code api/zipdeploy}.
     *
     * <p>The stream is read fully into memory before the request is built; a read failure is
     * reported through the returned completable.
     *
     * @param zipFile the content of the ZIP file
     * @return a completable that finishes when the call succeeds
     */
    Completable zipDeployAsync(InputStream zipFile) {
        try {
            RequestBody body = toRequestBody(zipFile);
            return retryOnError(handleResponse(service.zipDeploy(body))).toCompletable();
        } catch (IOException e) {
            return Completable.error(e);
        }
    }

    /**
     * Deploys a file through {@code api/publish}.
     *
     * <p>The stream is read fully into memory before the request is built; a read failure is
     * reported through the returned completable.
     *
     * @param type value sent as the {@code type} query parameter
     * @param file the content to deploy
     * @param path value sent as the {@code path} query parameter
     * @param restart value sent as the {@code restart} query parameter
     * @param clean value sent as the {@code clean} query parameter
     * @return a completable that finishes when the call succeeds
     */
    Completable deployAsync(DeployType type, InputStream file, String path, Boolean restart, Boolean clean) {
        try {
            RequestBody body = toRequestBody(file);
            return retryOnError(handleResponse(service.deploy(body, type, path, restart, clean))).toCompletable();
        } catch (IOException e) {
            return Completable.error(e);
        }
    }

    /** Buffers the whole stream into an {@code application/octet-stream} request body. */
    private static RequestBody toRequestBody(InputStream content) throws IOException {
        return RequestBody.create(MediaType.parse("application/octet-stream"), ByteStreams.toByteArray(content));
    }

    /**
     * Fetches {@code api/settings} and deserializes the 200 response into a string map.
     *
     * @return an observable emitting the settings map; failed calls are retried as described
     *     in {@link #retryOnError(Observable)}
     */
    Observable<Map<String, String>> settings() {
        return retryOnError(service.settings().flatMap(new Func1<Response<ResponseBody>, Observable<ServiceResponse<Map<String, String>>>>() {
            @Override
            public Observable<ServiceResponse<Map<String, String>>> call(Response<ResponseBody> response) {
                try {
                    ResponseBuilder<Map<String, String>, RestException> responseBuilder = restClient
                            .responseBuilderFactory().newInstance(restClient.serializerAdapter());
                    // Both known builder types are configured to throw on a 404 to a GET.
                    if (responseBuilder instanceof AzureResponseBuilder) {
                        ((AzureResponseBuilder<Map<String, String>, RestException>) responseBuilder).withThrowOnGet404(true);
                    } else if (responseBuilder instanceof ServiceResponseBuilder) {
                        ((ServiceResponseBuilder<Map<String, String>, RestException>) responseBuilder).withThrowOnGet404(true);
                    }
                    ServiceResponse<Map<String, String>> clientResponse = responseBuilder
                            .register(200, new TypeToken<Map<String, String>>() { }.getType())
                            .registerError(RestException.class)
                            .build(response);
                    return Observable.just(clientResponse);
                } catch (Throwable t) {
                    return Observable.error(t);
                }
            }
        }).map(new Func1<ServiceResponse<Map<String, String>>, Map<String, String>>() {
            @Override
            public Map<String, String> call(ServiceResponse<Map<String, String>> response) {
                return response.body();
            }
        }));
    }

    /**
     * Maps raw HTTP responses to body-less service responses.
     *
     * <p>A successful response becomes a {@code ServiceResponse} with a {@code null} body. Any
     * other response becomes a {@link RestException} whose message is the error body when that
     * body has a {@code text} content type and is not empty, and {@code "Status code <code>"}
     * otherwise.
     */
    private Observable<ServiceResponse<Void>> handleResponse(Observable<Response<ResponseBody>> observable) {
        return observable.flatMap(new Func1<Response<ResponseBody>, Observable<ServiceResponse<Void>>>() {
            @Override
            public Observable<ServiceResponse<Void>> call(Response<ResponseBody> response) {
                try {
                    if (response.isSuccessful()) {
                        return Observable.just(new ServiceResponse<Void>(null, response));
                    }
                    String errorMessage = "Status code " + response.code();
                    ResponseBody errorBody = response.errorBody();
                    if (errorBody != null
                            && errorBody.contentType() != null
                            && Objects.equals("text", errorBody.contentType().type())) {
                        String errorText = errorBody.string();
                        if (!errorText.isEmpty()) {
                            errorMessage = errorText;
                        }
                    }
                    return Observable.error(new RestException(errorMessage, response));
                } catch (Throwable t) {
                    return Observable.error(t);
                }
            }
        });
    }

    /**
     * Resubscribes to the observable when it fails with a retriable error.
     *
     * <p>Up to {@link #MAX_RETRIES} retries are made; the n-th retry is delayed by
     * {@code n * RETRY_DELAY_SECONDS} seconds. Non-retriable errors, and the error that follows
     * the last retry, are rethrown via {@code Exceptions.propagate}.
     */
    private <T> Observable<T> retryOnError(Observable<T> observable) {
        // One more than MAX_RETRIES so the failure after the last retry is still zipped and rethrown.
        final int attempts = MAX_RETRIES + 1;
        return observable.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
            @Override
            public Observable<?> call(Observable<? extends Throwable> errors) {
                return errors.zipWith(Observable.range(1, attempts), new Func2<Throwable, Integer, Integer>() {
                    @Override
                    public Integer call(Throwable throwable, Integer attempt) {
                        if (attempt < attempts && isRetriable(throwable)) {
                            return attempt;
                        }
                        throw Exceptions.propagate(throwable);
                    }
                }).flatMap(new Func1<Integer, Observable<?>>() {
                    @Override
                    public Observable<?> call(Integer attempt) {
                        return Observable.timer(attempt * RETRY_DELAY_SECONDS, TimeUnit.SECONDS);
                    }
                });
            }
        });
    }

    /** Returns true for socket timeouts and for REST failures whose response status is 502. */
    private static boolean isRetriable(Throwable throwable) {
        if (throwable instanceof SocketTimeoutException) {
            return true;
        }
        if (throwable instanceof RestException) {
            RestException restException = (RestException) throwable;
            // A RestException without a response must not turn into a NullPointerException.
            return restException.response() != null && restException.response().code() == 502;
        }
        return false;
    }
}