package io.vertx.config.kubernetes;
import io.vertx.config.spi.ConfigStore;
import io.vertx.config.spi.utils.JsonObjectHelper;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static java.nio.charset.StandardCharsets.UTF_8;
public class ConfigMapStore implements ConfigStore {
private static final String KUBERNETES_NAMESPACE = System.getenv("KUBERNETES_NAMESPACE");
private static final Base64.Decoder DECODER = Base64.getDecoder();
private final Vertx vertx;
private final JsonObject configuration;
private final String namespace;
private final String name;
private final String key;
private final boolean secret;
private final boolean optional;
private final Context ctx;
private final WebClient client;
private String token;
public ConfigMapStore(Vertx vertx, JsonObject configuration) {
this.vertx = vertx;
this.configuration = configuration;
this.ctx = vertx.getOrCreateContext();
String ns = configuration.getString("namespace");
if (ns == null) {
if (KUBERNETES_NAMESPACE != null) {
ns = KUBERNETES_NAMESPACE;
} else {
ns = "default";
}
}
this.optional = configuration.getBoolean("optional", true);
this.namespace = ns;
this.name = configuration.getString("name");
this.key = configuration.getString("key");
this.secret = configuration.getBoolean("secret", false);
int port = configuration.getInteger("port", 0);
if (port == 0) {
if (configuration.getBoolean("ssl", true)) {
port = 443;
} else {
port = 80;
}
}
String p = System.getenv("KUBERNETES_SERVICE_PORT");
if (p != null) {
port = Integer.valueOf(p);
}
String host = configuration.getString("host");
String h = System.getenv("KUBERNETES_SERVICE_HOST");
if (h != null) {
host = h;
}
client = WebClient.create(vertx,
new WebClientOptions()
.setTrustAll(true)
.setSsl(configuration.getBoolean("ssl", true))
.setDefaultHost(host)
.setDefaultPort(port)
.setFollowRedirects(true)
);
Objects.requireNonNull(this.name);
}
@Override
public synchronized void close(Handler<Void> completionHandler) {
runOnContext(v -> closeOnContext(completionHandler));
}
private synchronized void closeOnContext(Handler<Void> completionHandler) {
if (client != null) {
client.close();
}
if (completionHandler != null) {
completionHandler.handle(null);
}
}
private void runOnContext(Handler<Void> action) {
if (Vertx.currentContext() == this.ctx) {
action.handle(null);
}
else {
ctx.runOnContext(action);
}
}
private Future<String> getToken() {
Promise<String> result = Promise.promise();
String token = configuration.getString("token");
if (token != null && !token.trim().isEmpty()) {
this.token = token;
result.complete(token);
return result.future();
}
vertx.fileSystem().readFile(KubernetesUtils.OPENSHIFT_KUBERNETES_TOKEN_FILE, ar -> {
if (ar.failed()) {
if (optional) {
this.token = "";
result.tryComplete(this.token);
} else {
result.tryFail(ar.cause());
}
} else {
this.token = ar.result().toString();
result.tryComplete(ar.result().toString());
}
});
return result.future();
}
@Override
public void get(Handler<AsyncResult<Buffer>> completionHandler) {
runOnContext(v -> getOnContext(completionHandler));
}
private synchronized void getOnContext(Handler<AsyncResult<Buffer>> completionHandler) {
Future<String> retrieveToken;
if (token == null) {
retrieveToken = getToken();
} else {
retrieveToken = Future.succeededFuture(token);
}
retrieveToken
.compose(token -> {
Promise<Buffer> promise = Promise.promise();
if (token.isEmpty()) {
promise.complete(Buffer.buffer("{}"));
return promise.future();
}
String path = "/api/v1/namespaces/" + namespace;
if (secret) {
path += "/secrets/" + name;
} else {
path += "/configmaps/" + name;
}
client.get(path)
.putHeader("Authorization", "Bearer " + token)
.send(ar -> {
if (ar.failed()) {
completionHandler.handle(ar.mapEmpty());
return;
}
HttpResponse<Buffer> response = ar.result();
if (response.statusCode() == 404) {
if (optional) {
promise.complete(Buffer.buffer("{}"));
} else {
promise.fail("Cannot find the config map '" + name + "' in '" + namespace + "'");
}
} else if (response.statusCode() == 403) {
completionHandler.handle(Future.failedFuture("Access denied to configmap or secret in namespace "
+ namespace + ": " + name));
} else if (response.statusCode() != 200) {
if (optional) {
promise.complete(Buffer.buffer("{}"));
} else {
completionHandler.handle(Future.failedFuture("Cannot retrieve the configmap or secret in namespace "
+ namespace + ": " + name + ", status code: " + response.statusCode() + ", error: "
+ response.bodyAsString()));
}
} else {
JsonObject data = response.bodyAsJsonObject().getJsonObject("data");
if (data == null) {
promise.fail("Invalid secret of configmap in namespace " + namespace + " " + name + ", the data " +
"entry is empty");
return;
}
if (this.key == null) {
if (secret) {
promise.complete(new JsonObject(asSecretObjectMap(data.getMap())).toBuffer());
}
else {
promise.complete(new JsonObject(asObjectMap(data.getMap())).toBuffer());
}
} else {
String string = data.getString(this.key);
if (string == null) {
promise.fail("Cannot find key '" + this.key + "' in the configmap or secret '" + this.name + "'");
} else {
if (secret) {
promise.complete(Buffer.buffer(DECODER.decode(string)));
}
else {
promise.complete(Buffer.buffer(string));
}
}
}
}
});
return promise.future();
}).setHandler(completionHandler);
}
private static Map<String, Object> asObjectMap(Map<String, Object> source) {
if (source == null) {
return new HashMap<>();
}
return source.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
entry -> JsonObjectHelper.convert(entry.getValue().toString())));
}
private static Map<String, Object> asSecretObjectMap(Map<String, Object> source) {
if (source == null) {
return new HashMap<>();
}
return source.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey,
entry -> {
String encodedString = entry.getValue().toString();
String decodedString = new String(DECODER.decode(encodedString), UTF_8);
return JsonObjectHelper.convert(decodedString);
}));
}
}