package io.vertx.config.impl;
import io.vertx.config.ConfigChange;
import io.vertx.config.ConfigRetriever;
import io.vertx.config.ConfigRetrieverOptions;
import io.vertx.config.ConfigStoreOptions;
import io.vertx.config.spi.ConfigProcessor;
import io.vertx.config.spi.ConfigStore;
import io.vertx.config.spi.ConfigStoreFactory;
import io.vertx.config.spi.utils.Processors;
import io.vertx.core.*;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.json.JsonObject;
import io.vertx.core.streams.ReadStream;
import java.io.File;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
public class ConfigRetrieverImpl implements ConfigRetriever {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigRetrieverImpl.class);
private static final String DEFAULT_CONFIG_PATH = "conf" + File.separator + "config.json";
private final ContextInternal context;
private final List<ConfigurationProvider> providers;
private long scan;
private final List<Handler<ConfigChange>> listeners = new ArrayList<>();
private final ConfigStreamImpl streamOfConfiguration = new ConfigStreamImpl();
private final ConfigRetrieverOptions options;
private JsonObject current = new JsonObject();
private Handler<Void> beforeScan;
private Function<JsonObject, JsonObject> processor;
public ConfigRetrieverImpl(Vertx vertx, ConfigRetrieverOptions options) {
this.context = (ContextInternal) vertx.getOrCreateContext();
this.options = options;
ServiceLoader<ConfigStoreFactory> storeImpl =
ServiceLoader.load(ConfigStoreFactory.class,
ConfigStoreFactory.class.getClassLoader());
Map<String, ConfigStoreFactory> nameToImplMap = new HashMap<>();
storeImpl.iterator().forEachRemaining(factory -> nameToImplMap.put(factory.name(), factory));
if (nameToImplMap.isEmpty()) {
throw new IllegalStateException("No configuration store implementations found on the classpath");
}
List<ConfigStoreOptions> stores = options.getStores();
if (options.isIncludeDefaultStores()) {
stores = new ArrayList<>();
stores.add(
new ConfigStoreOptions().setType("json")
.setConfig(vertx.getOrCreateContext().config()));
stores.add(new ConfigStoreOptions().setType("sys"));
stores.add(new ConfigStoreOptions().setType("env"));
String defaultConfigPath = getDefaultConfigPath();
if (defaultConfigPath != null && ! defaultConfigPath.trim().isEmpty()) {
String format = extractFormatFromFileExtension(defaultConfigPath);
LOGGER.info("Config file path: " + defaultConfigPath + ", format:" + format);
stores.add(new ConfigStoreOptions()
.setType("file").setFormat(format)
.setOptional(true)
.setConfig(new JsonObject().put("path", defaultConfigPath)));
}
stores.addAll(options.getStores());
}
providers = new ArrayList<>();
for (ConfigStoreOptions option : stores) {
String type = option.getType();
if (type == null) {
throw new IllegalArgumentException(
"the `type` entry is mandatory in a configuration store configuration");
}
ConfigStoreFactory factory = nameToImplMap.get(type);
if (factory == null) {
throw new IllegalArgumentException("unknown configuration store implementation: " +
type + " (known implementations are: " + nameToImplMap.keySet() + ")");
}
JsonObject config = option.getConfig();
if (config == null) {
config = new JsonObject();
}
ConfigStore store = factory.create(vertx, config);
String format = option.getFormat() != null ? option.getFormat() : "json";
ConfigProcessor processor = Processors.get(format);
if (processor == null) {
throw new IllegalArgumentException("unknown configuration format: " + format + " (supported formats are: " +
Processors.getSupportedFormats());
}
providers.add(new ConfigurationProvider(store, processor, option.getConfig(), option.isOptional()));
}
}
static String (String path) {
int index = path.lastIndexOf(".");
if (index == -1) {
return "json";
} else {
String ext = path.substring(index + 1);
if (ext.trim().isEmpty()) {
return "json";
}
if ("yml".equalsIgnoreCase(ext)) {
ext = "yaml";
}
return ext.toLowerCase();
}
}
private String getDefaultConfigPath() {
String value = System.getenv("VERTX_CONFIG_PATH");
if (value == null || value.trim().isEmpty()) {
value = System.getProperty("vertx-config-path");
}
if (value != null && ! value.trim().isEmpty()) {
return value.trim();
}
File file = context.owner().resolveFile(DEFAULT_CONFIG_PATH);
boolean exists = file != null && file.exists();
if (exists) {
return file.getAbsolutePath();
}
return null;
}
public synchronized void initializePeriodicScan() {
if (options.getScanPeriod() > 0) {
this.scan = context.setPeriodic(options.getScanPeriod(), l -> scan());
} else {
this.scan = -1;
}
}
@Override
public void getConfig(Handler<AsyncResult<JsonObject>> completionHandler) {
Objects.requireNonNull(completionHandler);
getConfig().onComplete(completionHandler);
}
@Override
public Future<JsonObject> getConfig() {
return compute().onSuccess(result -> {
synchronized (this) {
current = result;
}
streamOfConfiguration.handle(result);
});
}
@Override
public synchronized void close() {
if (scan != -1) {
context.owner().cancelTimer(scan);
}
streamOfConfiguration.close();
for (ConfigurationProvider provider : providers) {
provider.close();
}
}
@Override
public synchronized JsonObject getCachedConfig() {
return current.copy();
}
@Override
public synchronized void listen(Handler<ConfigChange> listener) {
Objects.requireNonNull(listener);
listeners.add(listener);
}
@Override
public synchronized ConfigRetriever setBeforeScanHandler(Handler<Void> handler) {
this.beforeScan = Objects.requireNonNull(handler, "The handler must not be `null`");
return this;
}
@Override
public synchronized ConfigRetriever setConfigurationProcessor(Function<JsonObject, JsonObject> processor) {
this.processor = Objects.requireNonNull(processor, "The processor must not be `null`");
return this;
}
@Override
public ReadStream<JsonObject> configStream() {
return streamOfConfiguration;
}
private void scan() {
Handler<Void> h;
synchronized (this) {
h = this.beforeScan;
}
if (h != null) {
h.handle(null);
}
compute().onFailure(throwable -> {
streamOfConfiguration.fail(throwable);
LOGGER.error("Error while scanning configuration", throwable);
}).onSuccess(result -> {
JsonObject prev;
List<Handler<ConfigChange>> handlers;
synchronized (this) {
if (!current.equals(result)) {
prev = current;
current = result;
handlers = !listeners.isEmpty() ? new ArrayList<>(listeners) : Collections.emptyList();
} else {
prev = null;
handlers = null;
}
}
if (handlers != null) {
handlers.forEach(changeHandler -> changeHandler.handle(new ConfigChange(prev, result)));
streamOfConfiguration.handle(result);
}
});
}
private Future<JsonObject> compute() {
List<Future> futures = providers.stream()
.map(s -> s.get(context.owner()))
.collect(Collectors.toList());
return CompositeFuture.all(futures).map(compositeFuture -> {
JsonObject json = new JsonObject();
futures.forEach(future -> json.mergeIn((JsonObject) future.result(), true));
return json;
}).map(json -> processor != null ? processor.apply(json) : json);
}
public List<ConfigurationProvider> getProviders() {
return Collections.unmodifiableList(providers);
}
private class ConfigStreamImpl implements ReadStream<JsonObject> {
private Handler<JsonObject> handler;
private Handler<Throwable> exceptionHandler;
private Handler<Void> endHandler;
private JsonObject last;
private long demand = Long.MAX_VALUE;
@Override
public synchronized ReadStream<JsonObject> exceptionHandler(Handler<Throwable> handler) {
Objects.requireNonNull(handler);
this.exceptionHandler = handler;
return this;
}
@Override
public ReadStream<JsonObject> handler(Handler<JsonObject> handler) {
Objects.requireNonNull(handler);
JsonObject conf;
synchronized (this) {
this.handler = handler;
conf = getCachedConfig();
}
if (conf != null && !conf.isEmpty()) {
context.runOnContext(v -> this.handler.handle(conf));
}
return this;
}
@Override
public synchronized ReadStream<JsonObject> pause() {
demand = 0L;
return this;
}
@Override
public synchronized ReadStream<JsonObject> resume() {
boolean check = demand == 0;
demand = Long.MAX_VALUE;
if (check) {
checkPending();
}
return this;
}
@Override
public synchronized ReadStream<JsonObject> fetch(long amount) {
boolean check = demand == 0;
demand += amount;
if (demand < 0L) {
demand = Long.MAX_VALUE;
}
if (check) {
checkPending();
}
return this;
}
private void checkPending() {
Handler<JsonObject> succ = handler;
JsonObject conf = last;
last = null;
if (conf != null) {
if (demand != Long.MAX_VALUE) {
demand--;
}
if (succ != null) {
context.runOnContext(v -> succ.handle(conf));
}
}
}
@Override
public synchronized ReadStream<JsonObject> endHandler(Handler<Void> endHandler) {
Objects.requireNonNull(endHandler);
this.endHandler = endHandler;
return this;
}
synchronized void handle(JsonObject conf) {
Handler<JsonObject> succ = handler;
boolean isPaused = demand == 0;
if (isPaused) {
last = conf;
} else if (demand < Long.MAX_VALUE) {
demand--;
}
if (!isPaused && succ != null) {
context.runOnContext(v -> succ.handle(conf));
}
}
void fail(Throwable cause) {
Handler<Throwable> err;
synchronized (this) {
err = exceptionHandler;
}
if (err != null) {
context.runOnContext(v -> err.handle(cause));
}
}
void close() {
Handler<Void> handler;
synchronized (this) {
handler = endHandler;
}
if (handler != null) {
context.runOnContext(v -> handler.handle(null));
}
}
}
}