package org.ehcache.management.registry;
import org.ehcache.Cache;
import org.ehcache.Status;
import org.ehcache.core.events.CacheManagerListener;
import org.ehcache.core.spi.service.CacheManagerProviderService;
import org.ehcache.core.spi.service.ExecutionService;
import org.ehcache.core.spi.store.InternalCacheManager;
import org.ehcache.core.spi.time.TimeSource;
import org.ehcache.core.spi.time.TimeSourceService;
import org.ehcache.impl.internal.statistics.StatsUtils;
import org.ehcache.management.CollectorService;
import org.ehcache.management.ManagementRegistryService;
import org.ehcache.management.ManagementRegistryServiceConfiguration;
import org.ehcache.spi.service.Service;
import org.ehcache.spi.service.ServiceDependencies;
import org.ehcache.spi.service.ServiceProvider;
import org.terracotta.management.model.notification.ContextualNotification;
import org.terracotta.management.registry.collect.DefaultStatisticCollector;
import org.ehcache.core.statistics.CacheOperationOutcomes.ClearOutcome;
import org.terracotta.statistics.OperationStatistic;
import org.terracotta.statistics.derived.OperationResultFilter;
import java.util.EnumSet;
import java.util.concurrent.ScheduledExecutorService;
import static org.ehcache.impl.internal.executor.ExecutorUtil.shutdownNow;
@ServiceDependencies({CacheManagerProviderService.class, ManagementRegistryService.class, ExecutionService.class, TimeSourceService.class})
public class DefaultCollectorService implements CollectorService, CacheManagerListener {
private enum EhcacheNotification {
CACHE_ADDED,
CACHE_REMOVED,
CACHE_CLEARED,
CACHE_MANAGER_AVAILABLE,
CACHE_MANAGER_MAINTENANCE,
CACHE_MANAGER_CLOSED,
}
private final Collector collector;
private volatile ManagementRegistryService managementRegistry;
private volatile ScheduledExecutorService scheduledExecutorService;
private volatile InternalCacheManager cacheManager;
private volatile ManagementRegistryServiceConfiguration configuration;
private volatile DefaultStatisticCollector statisticCollector;
public DefaultCollectorService() {
this(Collector.EMPTY);
}
public DefaultCollectorService(Collector collector) {
this.collector = collector;
}
@Override
public synchronized void start(ServiceProvider<Service> serviceProvider) {
managementRegistry = serviceProvider.getService(ManagementRegistryService.class);
configuration = managementRegistry.getConfiguration();
cacheManager = serviceProvider.getService(CacheManagerProviderService.class).getCacheManager();
scheduledExecutorService = serviceProvider.getService(ExecutionService.class).getScheduledExecutor(configuration.getCollectorExecutorAlias());
TimeSource timeSource = serviceProvider.getService(TimeSourceService.class).getTimeSource();
statisticCollector = new DefaultStatisticCollector(
managementRegistry,
scheduledExecutorService,
collector::onStatistics,
timeSource::getTimeMillis);
cacheManager.registerListener(this);
}
@Override
public synchronized void stop() {
collector.onNotification(
new ContextualNotification(
configuration.getContext(),
EhcacheNotification.CACHE_MANAGER_CLOSED.name()));
statisticCollector.stopStatisticCollector();
shutdownNow(scheduledExecutorService);
}
@Override
public void cacheAdded(String alias, Cache<?, ?> cache) {
registerClearNotification(alias, cache);
collector.onNotification(
new ContextualNotification(
configuration.getContext().with("cacheName", alias),
EhcacheNotification.CACHE_ADDED.name()));
}
@Override
public void cacheRemoved(String alias, Cache<?, ?> cache) {
collector.onNotification(
new ContextualNotification(
configuration.getContext().with("cacheName", alias),
EhcacheNotification.CACHE_REMOVED.name()));
}
private void cacheCleared(String alias) {
collector.onNotification(
new ContextualNotification(
configuration.getContext().with("cacheName", alias),
EhcacheNotification.CACHE_CLEARED.name()));
}
private void registerClearNotification(String alias, Cache<?, ?> cache) {
OperationStatistic<ClearOutcome> clear = StatsUtils.findOperationStatisticOnChildren(cache,
ClearOutcome.class, "clear");
clear.addDerivedStatistic(new OperationResultFilter<>(EnumSet.of(ClearOutcome.SUCCESS),
(time, latency) -> cacheCleared(alias)));
}
@Override
public void stateTransition(Status from, Status to) {
switch (to) {
case AVAILABLE:
managementRegistry.register(statisticCollector);
collector.onNotification(
new ContextualNotification(
configuration.getContext(),
EhcacheNotification.CACHE_MANAGER_AVAILABLE.name()));
break;
case MAINTENANCE:
collector.onNotification(
new ContextualNotification(
configuration.getContext(),
EhcacheNotification.CACHE_MANAGER_MAINTENANCE.name()));
break;
case UNINITIALIZED:
cacheManager.deregisterListener(this);
break;
default:
throw new AssertionError("Unsupported state: " + to);
}
}
}