package org.ehcache.impl.internal.store.heap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.ehcache.Cache;
import org.ehcache.config.SizedResourcePool;
import org.ehcache.core.CacheConfigurationChangeEvent;
import org.ehcache.core.CacheConfigurationChangeListener;
import org.ehcache.core.CacheConfigurationProperty;
import org.ehcache.config.EvictionAdvisor;
import org.ehcache.config.ResourcePools;
import org.ehcache.config.ResourceType;
import org.ehcache.config.units.MemoryUnit;
import org.ehcache.core.config.ExpiryUtils;
import org.ehcache.core.events.StoreEventDispatcher;
import org.ehcache.core.events.StoreEventSink;
import org.ehcache.core.spi.service.StatisticsService;
import org.ehcache.core.statistics.OperationObserver;
import org.ehcache.core.statistics.OperationStatistic;
import org.ehcache.impl.internal.concurrent.EvictingConcurrentMap;
import org.ehcache.impl.store.BaseStore;
import org.ehcache.spi.resilience.StoreAccessException;
import org.ehcache.core.spi.store.heap.LimitExceededException;
import org.ehcache.expiry.ExpiryPolicy;
import org.ehcache.impl.copy.IdentityCopier;
import org.ehcache.impl.internal.concurrent.ConcurrentHashMap;
import org.ehcache.impl.copy.SerializingCopier;
import org.ehcache.core.events.NullStoreEventDispatcher;
import org.ehcache.impl.store.DefaultStoreEventDispatcher;
import org.ehcache.impl.internal.sizeof.NoopSizeOfEngine;
import org.ehcache.impl.internal.store.heap.holders.CopiedOnHeapValueHolder;
import org.ehcache.impl.internal.store.heap.holders.OnHeapValueHolder;
import org.ehcache.impl.internal.store.heap.holders.SerializedOnHeapValueHolder;
import org.ehcache.core.spi.time.TimeSource;
import org.ehcache.core.spi.time.TimeSourceService;
import org.ehcache.impl.store.HashUtils;
import org.ehcache.impl.serialization.TransientStateRepository;
import org.ehcache.sizeof.annotations.IgnoreSizeOf;
import org.ehcache.spi.serialization.Serializer;
import org.ehcache.spi.serialization.StatefulSerializer;
import org.ehcache.core.spi.store.Store;
import org.ehcache.core.spi.store.events.StoreEventSource;
import org.ehcache.core.spi.store.tiering.CachingTier;
import org.ehcache.core.spi.store.tiering.HigherCachingTier;
import org.ehcache.impl.internal.store.BinaryValueHolder;
import org.ehcache.spi.copy.Copier;
import org.ehcache.spi.copy.CopyProvider;
import org.ehcache.spi.service.ServiceConfiguration;
import org.ehcache.spi.service.ServiceDependencies;
import org.ehcache.core.spi.store.heap.SizeOfEngine;
import org.ehcache.core.spi.store.heap.SizeOfEngineProvider;
import org.ehcache.core.statistics.CachingTierOperationOutcomes;
import org.ehcache.core.statistics.HigherCachingTierOperationOutcomes;
import org.ehcache.core.statistics.StoreOperationOutcomes;
import org.ehcache.core.collections.ConcurrentWeakIdentityHashMap;
import org.ehcache.core.statistics.TierOperationOutcomes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.terracotta.management.model.stats.StatisticType;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.ehcache.config.Eviction.noAdvice;
import static org.ehcache.core.config.ExpiryUtils.isExpiryDurationInfinite;
import static org.ehcache.core.exceptions.StorePassThroughException.handleException;
public class OnHeapStore<K, V> extends BaseStore<K, V> implements HigherCachingTier<K, V> {
// Logger shared by every instance of this store.
private static final Logger LOG = LoggerFactory.getLogger(OnHeapStore.class);
// NOTE(review): ATTEMPT_RATIO and EVICTION_RATIO are not referenced in the visible part of this file;
// presumably they tune the eviction loop — confirm against enforceCapacity().
private static final int ATTEMPT_RATIO = 4;
private static final int EVICTION_RATIO = 2;
// Eviction advisor that simply returns the advice already recorded on the value holder.
private static final EvictionAdvisor<Object, OnHeapValueHolder<?>> EVICTION_ADVISOR = (key, value) -> value.evictionAdvice();
/**
 * Orders value holders when prioritizing them for eviction.
 * <p>
 * A {@code Fault} on the left-hand side always compares as smaller and a {@code Fault} on the
 * right-hand side always compares as larger. Two regular holders are ordered by descending
 * last-access time, i.e. the more recently accessed holder compares as smaller.
 */
private static final Comparator<ValueHolder<?>> EVICTION_PRIORITIZER = (t, u) -> {
  if (t instanceof Fault) {
    return -1;
  } else if (u instanceof Fault) {
    return 1;
  } else {
    // Long.compare cannot overflow, unlike taking the sign of a subtraction of two longs.
    return Long.compare(u.lastAccessTime(), t.lastAccessTime());
  }
};
// Listener with an empty body: does nothing on invalidation. It is the initial value of the
// invalidationListener field.
private static final InvalidationListener<?, ?> NULL_INVALIDATION_LISTENER = (InvalidationListener<Object, Object>) (key, valueHolder) -> {
};
// NOTE(review): not referenced in the visible part of this file; presumably the number of entries
// sampled when choosing an eviction candidate — confirm.
static final int SAMPLE_SIZE = 8;
// Backing storage for key -> value-holder mappings; the constructor picks SimpleBackend or KeyCopyBackend.
private final Backend<K, V> map;
// Copier for values; the constructor rejects null.
private final Copier<V> valueCopier;
// Engine used to size key/value pairs; anything other than a NoopSizeOfEngine makes the store byte-sized.
private final SizeOfEngine sizeOfEngine;
// Access/expiry handling, built by OnHeapStrategy.strategy(...) in the constructor.
private final OnHeapStrategy<K, V> strategy;
// Heap pool limit: bytes when the store is byte-sized, otherwise the raw pool size.
// Volatile; rewritten by the configuration-change listener after construction.
private volatile long capacity;
// Advisor from the store configuration, or noAdvice() when none was configured.
private final EvictionAdvisor<? super K, ? super V> evictionAdvisor;
// Expiry policy taken from the store configuration.
private final ExpiryPolicy<? super K, ? super V> expiry;
// Clock used to timestamp operations; the constructor rejects null.
private final TimeSource timeSource;
// Provides the event sinks used by the mutating operations.
private final StoreEventDispatcher<K, V> storeEventDispatcher;
// Listener called on invalidation; starts as the no-op listener and is replaced by setInvalidationListener.
@SuppressWarnings("unchecked")
private volatile InvalidationListener<K, V> invalidationListener = (InvalidationListener<K, V>) NULL_INVALIDATION_LISTENER;
/**
 * Listener that keeps {@code capacity} in sync with heap pool resizes.
 * <p>
 * Only {@code UPDATE_SIZE} events whose new heap pool size differs from the old one are acted upon:
 * the new size is converted to bytes when the pool unit is a {@code MemoryUnit} and used as-is
 * otherwise.
 */
private final CacheConfigurationChangeListener cacheConfigurationChangeListener = new CacheConfigurationChangeListener() {
  @Override
  public void cacheConfigurationChange(CacheConfigurationChangeEvent event) {
    // Ignore every property change other than a resize.
    if (!event.getProperty().equals(CacheConfigurationProperty.UPDATE_SIZE)) {
      return;
    }
    ResourcePools updatedPools = (ResourcePools) event.getNewValue();
    ResourcePools configuredPools = (ResourcePools) event.getOldValue();

    SizedResourcePool resizedHeap = updatedPools.getPoolForResource(ResourceType.Core.HEAP);
    long requestedSize = resizedHeap.getSize();
    long previousSize = configuredPools.getPoolForResource(ResourceType.Core.HEAP).getSize();
    if (requestedSize == previousSize) {
      return;
    }

    LOG.info("Updating size to: {}", requestedSize);
    boolean memoryBased = resizedHeap.getUnit() instanceof MemoryUnit;
    capacity = memoryBased
      ? ((MemoryUnit) resizedHeap.getUnit()).toBytes(resizedHeap.getSize())
      : resizedHeap.getSize();
  }
};
// Operation observers, one per store / caching-tier operation. Each is created in the constructor
// through createObserver(...) and is bracketed with begin() / end(outcome) by the matching method.
private final OperationObserver<StoreOperationOutcomes.GetOutcome> getObserver;
private final OperationObserver<StoreOperationOutcomes.PutOutcome> putObserver;
private final OperationObserver<StoreOperationOutcomes.RemoveOutcome> removeObserver;
private final OperationObserver<StoreOperationOutcomes.PutIfAbsentOutcome> putIfAbsentObserver;
private final OperationObserver<StoreOperationOutcomes.ConditionalRemoveOutcome> conditionalRemoveObserver;
private final OperationObserver<StoreOperationOutcomes.ReplaceOutcome> replaceObserver;
private final OperationObserver<StoreOperationOutcomes.ConditionalReplaceOutcome> conditionalReplaceObserver;
private final OperationObserver<StoreOperationOutcomes.ComputeOutcome> computeObserver;
private final OperationObserver<StoreOperationOutcomes.ComputeIfAbsentOutcome> computeIfAbsentObserver;
private final OperationObserver<StoreOperationOutcomes.EvictionOutcome> evictionObserver;
private final OperationObserver<StoreOperationOutcomes.ExpirationOutcome> expirationObserver;
private final OperationObserver<CachingTierOperationOutcomes.GetOrComputeIfAbsentOutcome> getOrComputeIfAbsentObserver;
private final OperationObserver<CachingTierOperationOutcomes.InvalidateOutcome> invalidateObserver;
private final OperationObserver<CachingTierOperationOutcomes.InvalidateAllOutcome> invalidateAllObserver;
private final OperationObserver<CachingTierOperationOutcomes.InvalidateAllWithHashOutcome> invalidateAllWithHashObserver;
private final OperationObserver<HigherCachingTierOperationOutcomes.SilentInvalidateOutcome> silentInvalidateObserver;
private final OperationObserver<HigherCachingTierOperationOutcomes.SilentInvalidateAllOutcome> silentInvalidateAllObserver;
private final OperationObserver<HigherCachingTierOperationOutcomes.SilentInvalidateAllWithHashOutcome> silentInvalidateAllWithHashObserver;
// Supplier that always yields Boolean.TRUE.
// NOTE(review): not referenced in the visible part of this file — confirm where it is consumed.
private static final Supplier<Boolean> REPLACE_EQUALS_TRUE = () -> Boolean.TRUE;
/**
 * Creates an on-heap store whose backing map is produced by {@code ConcurrentHashMap::new}
 * (the {@code org.ehcache.impl.internal.concurrent.ConcurrentHashMap} imported by this file).
 * All arguments are forwarded unchanged to the constructor that takes a backing-map supplier.
 */
public OnHeapStore(Configuration<K, V> config, TimeSource timeSource, Copier<K> keyCopier, Copier<V> valueCopier, SizeOfEngine sizeOfEngine, StoreEventDispatcher<K, V> eventDispatcher, StatisticsService statisticsService) {
this(config, timeSource, keyCopier, valueCopier, sizeOfEngine, eventDispatcher, ConcurrentHashMap::new, statisticsService);
}
/**
 * Creates an on-heap store on top of the map produced by {@code backingMapSupplier}.
 *
 * @param config store configuration; must declare a heap resource pool
 * @param timeSource clock, must not be null
 * @param keyCopier key copier, must not be null; an {@code IdentityCopier} selects the
 *                  {@code SimpleBackend}, any other copier selects the {@code KeyCopyBackend}
 * @param valueCopier value copier, must not be null
 * @param sizeOfEngine sizing engine, must not be null; anything other than a
 *                     {@code NoopSizeOfEngine} makes the store byte-sized
 * @param eventDispatcher dispatcher providing the event sinks
 * @param backingMapSupplier supplier of the underlying concurrent map
 * @param statisticsService forwarded to the superclass constructor
 * @throws NullPointerException if keyCopier, valueCopier, timeSource or sizeOfEngine is null
 * @throws IllegalArgumentException if the configuration has no heap resource pool
 */
public OnHeapStore(Configuration<K, V> config, TimeSource timeSource, Copier<K> keyCopier, Copier<V> valueCopier,
SizeOfEngine sizeOfEngine, StoreEventDispatcher<K, V> eventDispatcher, Supplier<EvictingConcurrentMap<?, ?>> backingMapSupplier, StatisticsService statisticsService) {
super(config, statisticsService);
Objects.requireNonNull(keyCopier, "keyCopier must not be null");
this.valueCopier = Objects.requireNonNull(valueCopier, "valueCopier must not be null");
this.timeSource = Objects.requireNonNull(timeSource, "timeSource must not be null");
this.sizeOfEngine = Objects.requireNonNull(sizeOfEngine, "sizeOfEngine must not be null");
SizedResourcePool heapPool = config.getResourcePools().getPoolForResource(ResourceType.Core.HEAP);
if (heapPool == null) {
throw new IllegalArgumentException("OnHeap store must be configured with a resource of type 'heap'");
}
// A byte-sized store reads the pool size in the pool's MemoryUnit; otherwise the size is used as-is.
boolean byteSized = !(this.sizeOfEngine instanceof NoopSizeOfEngine);
this.capacity = byteSized ? ((MemoryUnit) heapPool.getUnit()).toBytes(heapPool.getSize()) : heapPool.getSize();
// Fall back to the no-advice advisor when the configuration does not provide one.
if (config.getEvictionAdvisor() == null) {
this.evictionAdvisor = noAdvice();
} else {
this.evictionAdvisor = config.getEvictionAdvisor();
}
this.expiry = config.getExpiry();
this.storeEventDispatcher = eventDispatcher;
// Keys only go through the copier when it is not the identity copier.
if (keyCopier instanceof IdentityCopier) {
this.map = new SimpleBackend<>(byteSized, castBackend(backingMapSupplier));
} else {
this.map = new KeyCopyBackend<>(byteSized, keyCopier, castBackend(backingMapSupplier));
}
strategy = OnHeapStrategy.strategy(this, expiry, timeSource);
// The boolean passed to createObserver is true for every operation except eviction and expiration.
// NOTE(review): its meaning is defined by createObserver, which is not visible here — confirm.
getObserver = createObserver("get", StoreOperationOutcomes.GetOutcome.class, true);
putObserver = createObserver("put", StoreOperationOutcomes.PutOutcome.class, true);
removeObserver = createObserver("remove", StoreOperationOutcomes.RemoveOutcome.class, true);
putIfAbsentObserver = createObserver("putIfAbsent", StoreOperationOutcomes.PutIfAbsentOutcome.class, true);
conditionalRemoveObserver = createObserver("conditionalRemove", StoreOperationOutcomes.ConditionalRemoveOutcome.class, true);
replaceObserver = createObserver("replace", StoreOperationOutcomes.ReplaceOutcome.class, true);
conditionalReplaceObserver = createObserver("conditionalReplace", StoreOperationOutcomes.ConditionalReplaceOutcome.class, true);
computeObserver = createObserver("compute", StoreOperationOutcomes.ComputeOutcome.class, true);
computeIfAbsentObserver = createObserver("computeIfAbsent", StoreOperationOutcomes.ComputeIfAbsentOutcome.class, true);
evictionObserver = createObserver("eviction", StoreOperationOutcomes.EvictionOutcome.class, false);
expirationObserver = createObserver("expiration", StoreOperationOutcomes.ExpirationOutcome.class, false);
getOrComputeIfAbsentObserver = createObserver("getOrComputeIfAbsent", CachingTierOperationOutcomes.GetOrComputeIfAbsentOutcome.class, true);
invalidateObserver = createObserver("invalidate", CachingTierOperationOutcomes.InvalidateOutcome.class, true);
invalidateAllObserver = createObserver("invalidateAll", CachingTierOperationOutcomes.InvalidateAllOutcome.class, true);
invalidateAllWithHashObserver = createObserver("invalidateAllWithHash", CachingTierOperationOutcomes.InvalidateAllWithHashOutcome.class, true);
silentInvalidateObserver = createObserver("silentInvalidate", HigherCachingTierOperationOutcomes.SilentInvalidateOutcome.class, true);
silentInvalidateAllObserver = createObserver("silentInvalidateAll", HigherCachingTierOperationOutcomes.SilentInvalidateAllOutcome.class, true);
silentInvalidateAllWithHashObserver = createObserver("silentInvalidateAllWithHash", HigherCachingTierOperationOutcomes.SilentInvalidateAllWithHashOutcome.class, true);
// Statistics are tagged with this tier's tag plus "tier"; occupiedMemory is only exposed for byte-sized stores.
Set<String> tags = new HashSet<>(Arrays.asList(getStatisticsTag(), "tier"));
registerStatistic("mappings", StatisticType.COUNTER, tags, () -> map.mappingCount());
if (byteSized) {
registerStatistic("occupiedMemory", StatisticType.GAUGE, tags, () -> map.byteSize());
}
}
/**
 * Returns the tag under which this store's statistics are registered.
 *
 * @return the constant {@code "OnHeap"}
 */
@Override
protected String getStatisticsTag() {
return "OnHeap";
}
/**
 * Re-types the wildcard-parameterized backing-map supplier so it can be handed to a backend that
 * expects concrete key and value type parameters.
 * <p>
 * The cast is unchecked: nothing verifies that the supplied maps really hold {@code L} keys and
 * {@code M} values.
 *
 * @param backingMap supplier of the underlying concurrent map
 * @param <L> key type expected by the caller
 * @param <M> value type expected by the caller
 * @return the same supplier instance, viewed with the requested type parameters
 */
@SuppressWarnings({"unchecked", "rawtypes"})
private <L, M> Supplier<EvictingConcurrentMap<L, M>> castBackend(Supplier<EvictingConcurrentMap<?, ?>> backingMap) {
  // Going through the raw type is what allows the supplier to be re-parameterized.
  return (Supplier) backingMap;
}
/**
 * Looks up the live mapping for {@code key}.
 * <p>
 * A hit refreshes the holder's access/expiry time through the strategy before it is returned.
 *
 * @param key the key, validated by {@code checkKey}
 * @return the value holder on a hit, or {@code null} when there is no non-expired mapping
 * @throws StoreAccessException produced by {@code handleException} for any {@code RuntimeException}
 */
@Override
public ValueHolder<V> get(K key) throws StoreAccessException {
  checkKey(key);
  getObserver.begin();
  try {
    OnHeapValueHolder<V> found = getQuiet(key);
    if (found != null) {
      long accessTime = timeSource.getTimeMillis();
      strategy.setAccessAndExpiryTimeWhenCallerOutsideLock(key, found, accessTime);
      getObserver.end(StoreOperationOutcomes.GetOutcome.HIT);
      return found;
    }
    getObserver.end(StoreOperationOutcomes.GetOutcome.MISS);
    return null;
  } catch (RuntimeException re) {
    throw handleException(re);
  }
}
/**
 * Reads the mapping for {@code key} without touching the get observer or the access time.
 * An expired mapping is handed to {@code expireMappingUnderLock} and reported as absent.
 *
 * @param key the key to look up
 * @return the non-expired holder, or {@code null} when the key is unmapped or its mapping expired
 * @throws StoreAccessException produced by {@code handleException} for any {@code RuntimeException}
 */
private OnHeapValueHolder<V> getQuiet(K key) throws StoreAccessException {
  try {
    OnHeapValueHolder<V> candidate = map.get(key);
    if (candidate != null && strategy.isExpired(candidate)) {
      expireMappingUnderLock(key, candidate);
      return null;
    }
    return candidate;
  } catch (RuntimeException re) {
    throw handleException(re);
  }
}
/**
 * Tells whether a non-expired mapping exists for {@code key}.
 *
 * @param key the key, validated by {@code checkKey}
 * @return {@code true} when {@code getQuiet} finds a live mapping
 * @throws StoreAccessException if the lookup fails
 */
@Override
public boolean containsKey(K key) throws StoreAccessException {
  checkKey(key);
  OnHeapValueHolder<V> liveMapping = getQuiet(key);
  return liveMapping != null;
}
/**
 * Stores {@code value} under {@code key}, creating a new mapping or updating the existing one.
 * <p>
 * The work happens inside {@code map.compute}: an existing mapping that is expired at {@code now} is
 * treated as absent (its size is subtracted and no expiration event is fired here), then a holder is
 * built with {@code newCreateValueHolder} or {@code newUpdateValueHolder}. The net size change is
 * reported through {@code updateUsageInBytesIfRequired} and capacity is enforced afterwards.
 *
 * @param key the key, validated by {@code checkKey}
 * @param value the value, validated by {@code checkValue}
 * @return {@code PutStatus.PUT} when the outcome was recorded as PUT, {@code PutStatus.NOOP} otherwise
 * @throws StoreAccessException produced by {@code handleException} for any {@code RuntimeException};
 *         the event sink is released as failed and the observer ends with FAILURE first
 */
@Override
public PutStatus put(K key, V value) throws StoreAccessException {
checkKey(key);
checkValue(value);
putObserver.begin();
long now = timeSource.getTimeMillis();
// Outcome stays NOOP unless the lambda below records a PUT.
AtomicReference<StoreOperationOutcomes.PutOutcome> statOutcome = new AtomicReference<>(StoreOperationOutcomes.PutOutcome.NOOP);
StoreEventSink<K, V> eventSink = storeEventDispatcher.eventSink();
try {
map.compute(key, (mappedKey, mappedValue) -> {
// Net change in occupied size caused by this operation.
long delta = 0;
// An expired mapping is handled as if the key were absent.
if (mappedValue != null && mappedValue.isExpired(now)) {
delta -= mappedValue.size();
mappedValue = null;
}
OnHeapValueHolder<V> newValue;
if (mappedValue == null) {
// Creation path: PUT is only recorded when a holder was actually produced.
newValue = newCreateValueHolder(key, value, now, eventSink);
if (newValue != null) {
delta += newValue.size();
statOutcome.set(StoreOperationOutcomes.PutOutcome.PUT);
}
} else {
// Update path: PUT is recorded even when the update yields no holder.
newValue = newUpdateValueHolder(key, mappedValue, value, now, eventSink);
if (newValue != null) {
delta += newValue.size() - mappedValue.size();
} else {
delta -= mappedValue.size();
}
statOutcome.set(StoreOperationOutcomes.PutOutcome.PUT);
}
updateUsageInBytesIfRequired(delta);
return newValue;
});
storeEventDispatcher.releaseEventSink(eventSink);
enforceCapacity();
StoreOperationOutcomes.PutOutcome outcome = statOutcome.get();
putObserver.end(outcome);
switch (outcome) {
case PUT:
return PutStatus.PUT;
case NOOP:
return PutStatus.NOOP;
default:
throw new AssertionError("Unknown enum value " + outcome);
}
} catch (RuntimeException re) {
storeEventDispatcher.releaseEventSinkAfterFailure(eventSink, re);
putObserver.end(StoreOperationOutcomes.PutOutcome.FAILURE);
throw handleException(re);
}
}
/**
 * Removes the mapping for {@code key}, if any.
 * <p>
 * The size of whatever was mapped is subtracted from the usage. An expired mapping fires an
 * expiration event and counts as a miss; a live mapping fires a removal event.
 *
 * @param key the key, validated by {@code checkKey}
 * @return {@code true} when a live mapping was removed, {@code false} otherwise
 * @throws StoreAccessException produced by {@code handleException} for any {@code RuntimeException}
 */
@Override
public boolean remove(K key) throws StoreAccessException {
  checkKey(key);
  removeObserver.begin();
  StoreEventSink<K, V> eventSink = storeEventDispatcher.eventSink();
  long now = timeSource.getTimeMillis();
  try {
    AtomicReference<StoreOperationOutcomes.RemoveOutcome> recorded =
      new AtomicReference<>(StoreOperationOutcomes.RemoveOutcome.MISS);

    map.computeIfPresent(key, (presentKey, presentHolder) -> {
      updateUsageInBytesIfRequired(-presentHolder.size());
      if (presentHolder.isExpired(now)) {
        fireOnExpirationEvent(presentKey, presentHolder, eventSink);
      } else {
        recorded.set(StoreOperationOutcomes.RemoveOutcome.REMOVED);
        eventSink.removed(presentKey, presentHolder);
      }
      // The mapping goes away in both cases.
      return null;
    });

    storeEventDispatcher.releaseEventSink(eventSink);
    StoreOperationOutcomes.RemoveOutcome outcome = recorded.get();
    removeObserver.end(outcome);
    if (outcome == StoreOperationOutcomes.RemoveOutcome.REMOVED) {
      return true;
    }
    if (outcome == StoreOperationOutcomes.RemoveOutcome.MISS) {
      return false;
    }
    throw new AssertionError("Unknown enum value " + outcome);
  } catch (RuntimeException re) {
    storeEventDispatcher.releaseEventSinkAfterFailure(eventSink, re);
    throw handleException(re);
  }
}
/**
 * Maps {@code key} to {@code value} only when the key has no live mapping.
 * <p>
 * An expired mapping fires an expiration event and is replaced by a holder from
 * {@code newCreateValueHolder}. A live mapping is left in place, remembered as the return value and
 * has its access/expiry refreshed through the strategy. Capacity is enforced only when an entry was
 * actually added.
 *
 * @param key the key, validated by {@code checkKey}
 * @param value the value, validated by {@code checkValue}
 * @param put not used by this implementation
 * @return the live holder that was already mapped, or {@code null} when there was none
 * @throws StoreAccessException produced by {@code handleException} for any {@code RuntimeException}
 */
@Override
public ValueHolder<V> putIfAbsent(K key, V value, Consumer<Boolean> put) throws StoreAccessException {
checkKey(key);
checkValue(value);
putIfAbsentObserver.begin();
AtomicReference<OnHeapValueHolder<V>> returnValue = new AtomicReference<>(null);
AtomicBoolean entryActuallyAdded = new AtomicBoolean();
long now = timeSource.getTimeMillis();
StoreEventSink<K, V> eventSink = storeEventDispatcher.eventSink();
try {
map.compute(key, (mappedKey, mappedValue) -> {
// Net change in occupied size caused by this operation.
long delta = 0;
OnHeapValueHolder<V> holder;
if (mappedValue == null || mappedValue.isExpired(now)) {
if (mappedValue != null) {
// Expired mapping: account for its removal and publish the expiration.
delta -= mappedValue.size();
fireOnExpirationEvent(mappedKey, mappedValue, eventSink);
}
holder = newCreateValueHolder(key, value, now, eventSink);
if (holder != null) {
delta += holder.size();
}
entryActuallyAdded.set(holder != null);
} else {
// Live mapping: keep it, but the access may make the strategy drop it (null holder).
returnValue.set(mappedValue);
holder = strategy.setAccessAndExpiryWhenCallerlUnderLock(key, mappedValue, now, eventSink);
if (holder == null) {
delta -= mappedValue.size();
}
}
updateUsageInBytesIfRequired(delta);
return holder;
});
storeEventDispatcher.releaseEventSink(eventSink);
if (entryActuallyAdded.get()) {
enforceCapacity();
putIfAbsentObserver.end(StoreOperationOutcomes.PutIfAbsentOutcome.PUT);
} else {
putIfAbsentObserver.end(StoreOperationOutcomes.PutIfAbsentOutcome.HIT);
}
} catch (RuntimeException re) {
storeEventDispatcher.releaseEventSinkAfterFailure(eventSink, re);
throw handleException(re);
}
return returnValue.get();
}
/**
 * Removes the mapping for {@code key} only when its current value equals {@code value}.
 * <p>
 * An expired mapping fires an expiration event and is reported as {@code KEY_MISSING}. A live
 * mapping with a different value is kept, reported as {@code KEY_PRESENT}, and has its
 * access/expiry refreshed through the strategy.
 *
 * @param key the key, validated by {@code checkKey}
 * @param value the expected value, validated by {@code checkValue}
 * @return {@code REMOVED}, {@code KEY_PRESENT} or {@code KEY_MISSING}
 * @throws StoreAccessException produced by {@code handleException} for any {@code RuntimeException}
 */
@Override
public RemoveStatus remove(K key, V value) throws StoreAccessException {
  checkKey(key);
  checkValue(value);
  conditionalRemoveObserver.begin();
  AtomicReference<RemoveStatus> status = new AtomicReference<>(RemoveStatus.KEY_MISSING);
  StoreEventSink<K, V> eventSink = storeEventDispatcher.eventSink();
  try {
    map.computeIfPresent(key, (currentKey, currentHolder) -> {
      long now = timeSource.getTimeMillis();
      if (currentHolder.isExpired(now)) {
        updateUsageInBytesIfRequired(-currentHolder.size());
        fireOnExpirationEvent(currentKey, currentHolder, eventSink);
        return null;
      }
      if (value.equals(currentHolder.get())) {
        updateUsageInBytesIfRequired(-currentHolder.size());
        eventSink.removed(currentKey, currentHolder);
        status.set(RemoveStatus.REMOVED);
        return null;
      }
      // Value mismatch: the mapping survives unless refreshing it makes the strategy drop it.
      status.set(RemoveStatus.KEY_PRESENT);
      OnHeapValueHolder<V> refreshed = strategy.setAccessAndExpiryWhenCallerlUnderLock(key, currentHolder, now, eventSink);
      if (refreshed == null) {
        updateUsageInBytesIfRequired(-currentHolder.size());
      }
      return refreshed;
    });
    storeEventDispatcher.releaseEventSink(eventSink);

    RemoveStatus result = status.get();
    if (result == RemoveStatus.REMOVED) {
      conditionalRemoveObserver.end(StoreOperationOutcomes.ConditionalRemoveOutcome.REMOVED);
    } else if (result == RemoveStatus.KEY_MISSING || result == RemoveStatus.KEY_PRESENT) {
      conditionalRemoveObserver.end(StoreOperationOutcomes.ConditionalRemoveOutcome.MISS);
    }
    return result;
  } catch (RuntimeException re) {
    storeEventDispatcher.releaseEventSinkAfterFailure(eventSink, re);
    throw handleException(re);
  }
}
/**
 * Replaces the value of an existing live mapping for {@code key}.
 * <p>
 * An expired mapping fires an expiration event, is dropped and counts as a miss. For a live mapping
 * the previous holder is remembered and a new holder is built with {@code newUpdateValueHolder}.
 *
 * @param key the key, validated by {@code checkKey}
 * @param value the new value, validated by {@code checkValue}
 * @return the holder that was mapped before the replacement, or {@code null} on a miss
 * @throws StoreAccessException produced by {@code handleException} for any {@code RuntimeException}
 */
@Override
public ValueHolder<V> replace(K key, V value) throws StoreAccessException {
  checkKey(key);
  checkValue(value);
  replaceObserver.begin();
  AtomicReference<OnHeapValueHolder<V>> previous = new AtomicReference<>(null);
  StoreEventSink<K, V> eventSink = storeEventDispatcher.eventSink();
  try {
    map.computeIfPresent(key, (currentKey, currentHolder) -> {
      long now = timeSource.getTimeMillis();
      if (currentHolder.isExpired(now)) {
        updateUsageInBytesIfRequired(-currentHolder.size());
        fireOnExpirationEvent(currentKey, currentHolder, eventSink);
        return null;
      }
      previous.set(currentHolder);
      OnHeapValueHolder<V> updated = newUpdateValueHolder(key, currentHolder, value, now, eventSink);
      // A null update means the mapping disappears, so only the old size is given back.
      long delta = (updated == null)
        ? -currentHolder.size()
        : updated.size() - currentHolder.size();
      updateUsageInBytesIfRequired(delta);
      return updated;
    });
    boolean replaced = previous.get() != null;
    storeEventDispatcher.releaseEventSink(eventSink);
    enforceCapacity();
    replaceObserver.end(replaced
      ? StoreOperationOutcomes.ReplaceOutcome.REPLACED
      : StoreOperationOutcomes.ReplaceOutcome.MISS);
  } catch (RuntimeException re) {
    storeEventDispatcher.releaseEventSinkAfterFailure(eventSink, re);
    throw handleException(re);
  }
  return previous.get();
}
/**
 * Replaces the value mapped to {@code key} with {@code newValue} only when the current value equals
 * {@code oldValue}.
 * <p>
 * An expired mapping fires an expiration event, is dropped and is reported as
 * {@code MISS_NOT_PRESENT}. A live mapping with a different value is kept, reported as
 * {@code MISS_PRESENT}, and has its access/expiry refreshed through the strategy.
 *
 * @param key the key, validated by {@code checkKey}
 * @param oldValue the expected current value, validated by {@code checkValue}
 * @param newValue the replacement value, validated by {@code checkValue}
 * @return {@code HIT}, {@code MISS_PRESENT} or {@code MISS_NOT_PRESENT}
 * @throws StoreAccessException produced by {@code handleException} for any {@code RuntimeException}
 */
@Override
public ReplaceStatus replace(K key, V oldValue, V newValue) throws StoreAccessException {
checkKey(key);
checkValue(oldValue);
checkValue(newValue);
conditionalReplaceObserver.begin();
StoreEventSink<K, V> eventSink = storeEventDispatcher.eventSink();
// Stays MISS_NOT_PRESENT when the key is unmapped or its mapping is expired.
AtomicReference<ReplaceStatus> outcome = new AtomicReference<>(ReplaceStatus.MISS_NOT_PRESENT);
try {
map.computeIfPresent(key, (mappedKey, mappedValue) -> {
long now = timeSource.getTimeMillis();
V existingValue = mappedValue.get();
if (mappedValue.isExpired(now)) {
fireOnExpirationEvent(mappedKey, mappedValue, eventSink);
updateUsageInBytesIfRequired(- mappedValue.size());
return null;
} else if (oldValue.equals(existingValue)) {
outcome.set(ReplaceStatus.HIT);
OnHeapValueHolder<V> holder = newUpdateValueHolder(key, mappedValue, newValue, now, eventSink);
// A null holder means the mapping disappears, so only the old size is given back.
if (holder != null) {
updateUsageInBytesIfRequired(holder.size() - mappedValue.size());
} else {
updateUsageInBytesIfRequired(- mappedValue.size());
}
return holder;
} else {
outcome.set(ReplaceStatus.MISS_PRESENT);
// Value mismatch still counts as an access; the strategy may drop the mapping (null holder).
OnHeapValueHolder<V> holder = strategy.setAccessAndExpiryWhenCallerlUnderLock(key, mappedValue, now, eventSink);
if (holder == null) {
updateUsageInBytesIfRequired(- mappedValue.size());
}
return holder;
}
});
storeEventDispatcher.releaseEventSink(eventSink);
enforceCapacity();
ReplaceStatus replaceStatus = outcome.get();
switch (replaceStatus) {
case HIT:
conditionalReplaceObserver.end(StoreOperationOutcomes.ConditionalReplaceOutcome.REPLACED);
break;
case MISS_PRESENT:
case MISS_NOT_PRESENT:
conditionalReplaceObserver.end(StoreOperationOutcomes.ConditionalReplaceOutcome.MISS);
break;
default:
throw new AssertionError("Unknown enum value " + replaceStatus);
}
return replaceStatus;
} catch (RuntimeException re) {
storeEventDispatcher.releaseEventSinkAfterFailure(eventSink, re);
throw handleException(re);
}
}
/**
 * Clears the backing map. No observer is notified and no invalidation listener is called here.
 */
@Override
public void clear() {
map.clear();
}
/**
 * Returns an iterator over the non-expired entries of the backing map.
 * <p>
 * The iterator always holds the next entry in advance. Expired entries met while advancing are
 * handed to {@code expireMappingUnderLock} and skipped. Returned entries read their key and value
 * from the underlying map entry each time they are asked.
 *
 * @return an iterator that skips expired mappings
 */
@Override
public Iterator<Cache.Entry<K, ValueHolder<V>>> iterator() {
  java.util.Iterator<Entry<K, OnHeapValueHolder<V>>> backing = map.entrySetIterator();
  return new Iterator<Cache.Entry<K, ValueHolder<V>>>() {
    // Entry that the next call to next() will return; null once the iteration is exhausted.
    private Cache.Entry<K, ValueHolder<V>> upcoming = advance();

    @Override
    public boolean hasNext() {
      return upcoming != null;
    }

    @Override
    public Cache.Entry<K, ValueHolder<V>> next() throws StoreAccessException {
      Cache.Entry<K, ValueHolder<V>> current = upcoming;
      if (current == null) {
        throw new NoSuchElementException();
      }
      upcoming = advance();
      return current;
    }

    // Moves to the next live entry, expiring stale ones on the way; null when none is left.
    private Cache.Entry<K, ValueHolder<V>> advance() {
      while (backing.hasNext()) {
        Entry<K, OnHeapValueHolder<V>> candidate = backing.next();
        if (!strategy.isExpired(candidate.getValue())) {
          return new Cache.Entry<K, ValueHolder<V>>() {
            @Override
            public K getKey() {
              return candidate.getKey();
            }

            @Override
            public ValueHolder<V> getValue() {
              return candidate.getValue();
            }
          };
        }
        expireMappingUnderLock(candidate.getKey(), candidate.getValue());
      }
      return null;
    }
  };
}
/**
 * Returns the holder cached for {@code key}, loading it through {@code source} when the key is
 * unmapped or its mapping has expired.
 * <p>
 * Loading is done by installing a {@code Fault} with {@code putIfAbsent}; the thread whose fault was
 * installed resolves it via {@code resolveFault}. When another mapping is found instead, it is
 * returned, going through the fault's own value when that mapping is itself a {@code Fault}.
 *
 * @param key the key to look up
 * @param source function producing the value holder when it has to be loaded
 * @return the cached or freshly resolved holder; see {@code resolveFault} for the loading outcomes
 * @throws StoreAccessException produced by {@code handleException} for any {@code RuntimeException},
 *         or thrown by {@code resolveFault}
 */
@Override
public ValueHolder<V> getOrComputeIfAbsent(K key, Function<K, ValueHolder<V>> source) throws StoreAccessException {
try {
getOrComputeIfAbsentObserver.begin();
Backend<K, V> backEnd = map;
// First look for the value on heap.
OnHeapValueHolder<V> cachedValue = backEnd.get(key);
long now = timeSource.getTimeMillis();
if (cachedValue == null) {
Fault<V> fault = new Fault<>(() -> source.apply(key));
cachedValue = backEnd.putIfAbsent(key, fault);
// A null result means our fault was installed, so this thread resolves it.
if (cachedValue == null) {
return resolveFault(key, backEnd, now, fault);
}
}
// Only real values (not faults) are checked for expiry.
if (!(cachedValue instanceof Fault)) {
if (cachedValue.isExpired(now)) {
expireMappingUnderLock(key, cachedValue);
// After expiring the mapping, ask the source again through a new fault.
Fault<V> fault = new Fault<>(() -> source.apply(key));
cachedValue = backEnd.putIfAbsent(key, fault);
if (cachedValue == null) {
return resolveFault(key, backEnd, now, fault);
}
}
else {
strategy.setAccessAndExpiryTimeWhenCallerOutsideLock(key, cachedValue, now);
}
}
getOrComputeIfAbsentObserver.end(CachingTierOperationOutcomes.GetOrComputeIfAbsentOutcome.HIT);
// Either the plain value, or the value obtained from the fault found in the map.
return getValue(cachedValue);
} catch (RuntimeException re) {
throw handleException(re);
}
}
/**
 * Returns the holder cached for {@code key}, or the result of {@code source} when the key is
 * unmapped. Nothing is installed in the map by this method.
 * <p>
 * A real (non-fault) mapping that has expired is handed to {@code expireMappingUnderLock} and
 * {@code null} is returned without consulting {@code source}.
 *
 * @param key the key to look up
 * @param source function used only when the key has no mapping
 * @return the cached holder, the holder from {@code source}, or {@code null} for an expired mapping
 * @throws StoreAccessException produced by {@code handleException} for any {@code RuntimeException}
 */
@Override
public ValueHolder<V> getOrDefault(K key, Function<K, ValueHolder<V>> source) throws StoreAccessException {
  try {
    OnHeapValueHolder<V> onHeap = map.get(key);
    if (onHeap == null) {
      return source.apply(key);
    }
    // Faults are never checked for expiry; only real values are.
    if (!(onHeap instanceof Fault) && onHeap.isExpired(timeSource.getTimeMillis())) {
      expireMappingUnderLock(key, onHeap);
      return null;
    }
    return getValue(onHeap);
  } catch (RuntimeException re) {
    throw handleException(re);
  }
}
/**
 * Resolves a fault previously installed for {@code key} and tries to swap the loaded value in.
 * <ul>
 *   <li>The fault yields {@code null}: the fault is removed and {@code null} is returned (MISS).</li>
 *   <li>{@code importValueFromLowerTier} yields {@code null}: the fault is removed and the loaded
 *       holder is returned without being cached (FAULT_FAILED).</li>
 *   <li>The fault is successfully replaced by the imported holder: its size is added to the usage,
 *       capacity is enforced and the imported holder is returned (FAULTED).</li>
 *   <li>The replace fails because the fault is no longer mapped: whatever is mapped now is removed,
 *       the invalidation listener is notified and the removed size is subtracted from the usage.
 *       The removed value is returned unless it is expired, in which case {@code null} is returned
 *       (FAULT_FAILED_MISS). When nothing was mapped, the imported holder is returned.</li>
 * </ul>
 *
 * @param key the key being loaded
 * @param backEnd the backing map holding the fault
 * @param now timestamp used for importing and for the expiry check
 * @param fault the fault installed by the caller
 * @return the resolved holder, or {@code null} as described above
 * @throws StoreAccessException wrapping any {@code Throwable} raised while resolving; the fault is
 *         removed from the map first
 */
private ValueHolder<V> resolveFault(K key, Backend<K, V> backEnd, long now, Fault<V> fault) throws StoreAccessException {
  try {
    ValueHolder<V> value = fault.getValueHolder();
    OnHeapValueHolder<V> newValue;
    if (value != null) {
      newValue = importValueFromLowerTier(key, value, now, backEnd, fault);
      if (newValue == null) {
        // The import produced nothing to cache: hand back the loaded value uncached.
        backEnd.remove(key, fault);
        getOrComputeIfAbsentObserver.end(CachingTierOperationOutcomes.GetOrComputeIfAbsentOutcome.FAULT_FAILED);
        return value;
      }
    } else {
      backEnd.remove(key, fault);
      getOrComputeIfAbsentObserver.end(CachingTierOperationOutcomes.GetOrComputeIfAbsentOutcome.MISS);
      return null;
    }
    if (backEnd.replace(key, fault, newValue)) {
      getOrComputeIfAbsentObserver.end(CachingTierOperationOutcomes.GetOrComputeIfAbsentOutcome.FAULTED);
      updateUsageInBytesIfRequired(newValue.size());
      enforceCapacity();
      return newValue;
    }
    // The fault is no longer mapped: drop whatever is there now and report it as invalidated.
    AtomicReference<ValueHolder<V>> invalidatedValue = new AtomicReference<>();
    backEnd.computeIfPresent(key, (mappedKey, mappedValue) -> {
      notifyInvalidation(key, mappedValue);
      invalidatedValue.set(mappedValue);
      // The mapping is being removed, so its size must be subtracted, as every other removal does.
      updateUsageInBytesIfRequired(- mappedValue.size());
      return null;
    });
    ValueHolder<V> p = getValue(invalidatedValue.get());
    if (p != null) {
      if (p.isExpired(now)) {
        getOrComputeIfAbsentObserver.end(CachingTierOperationOutcomes.GetOrComputeIfAbsentOutcome.FAULT_FAILED_MISS);
        return null;
      }
      getOrComputeIfAbsentObserver.end(CachingTierOperationOutcomes.GetOrComputeIfAbsentOutcome.FAULT_FAILED);
      return p;
    }
    getOrComputeIfAbsentObserver.end(CachingTierOperationOutcomes.GetOrComputeIfAbsentOutcome.FAULT_FAILED);
    return newValue;
  } catch (Throwable e) {
    // Never leave a fault behind when resolution fails.
    backEnd.remove(key, fault);
    throw new StoreAccessException(e);
  }
}
/**
 * Removes the mapping for {@code key} when it is still the given {@code fault}, after notifying the
 * invalidation listener with a clone of {@code value}. Any other mapping is left untouched.
 *
 * @param map the backing map to operate on
 * @param key the key whose fault should be invalidated
 * @param value the holder to clone for the listener
 * @param fault the fault expected to be mapped
 * @param now timestamp passed to {@code cloneValueHolder}
 * @param expiration expiration passed to {@code cloneValueHolder}
 * @throws AssertionError if cloning unexpectedly raises a {@code LimitExceededException}
 */
private void invalidateInGetOrComputeIfAbsent(Backend<K, V> map, K key, ValueHolder<V> value, Fault<V> fault, long now, Duration expiration) {
  map.computeIfPresent(key, (mappedKey, mappedValue) -> {
    if (mappedValue.equals(fault)) {
      try {
        invalidationListener.onInvalidation(key, cloneValueHolder(key, value, now, expiration, false));
      } catch (LimitExceededException ex) {
        // Keep the original exception as the cause so the failure can be diagnosed.
        throw new AssertionError("Sizing is not expected to happen.", ex);
      }
      return null;
    }
    return mappedValue;
  });
}
/**
 * Removes the mapping for {@code key}, notifying the invalidation listener when the mapping is a
 * real value rather than a {@code Fault}. The removed size is subtracted from the usage either way.
 *
 * @param key the key, validated by {@code checkKey}
 * @throws StoreAccessException produced by {@code handleException} for any {@code RuntimeException}
 */
@Override
public void invalidate(K key) throws StoreAccessException {
  checkKey(key);
  invalidateObserver.begin();
  try {
    AtomicReference<CachingTierOperationOutcomes.InvalidateOutcome> result =
      new AtomicReference<>(CachingTierOperationOutcomes.InvalidateOutcome.MISS);
    map.computeIfPresent(key, (k, present) -> {
      boolean realValue = !(present instanceof Fault);
      if (realValue) {
        notifyInvalidation(key, present);
        result.set(CachingTierOperationOutcomes.InvalidateOutcome.REMOVED);
      }
      updateUsageInBytesIfRequired(-present.size());
      return null;
    });
    invalidateObserver.end(result.get());
  } catch (RuntimeException re) {
    throw handleException(re);
  }
}
/**
 * Removes the mapping for {@code key} without calling the invalidation listener.
 * <p>
 * {@code function} is always invoked inside the map computation: with the removed holder when a
 * real value was mapped, and with {@code null} when the key was unmapped or mapped to a
 * {@code Fault}.
 *
 * @param key the key, validated by {@code checkKey}
 * @param function callback receiving the removed holder or {@code null}
 * @throws StoreAccessException produced by {@code handleException} for any {@code RuntimeException}
 */
@Override
public void silentInvalidate(K key, Function<Store.ValueHolder<V>, Void> function) throws StoreAccessException {
  checkKey(key);
  silentInvalidateObserver.begin();
  try {
    AtomicReference<HigherCachingTierOperationOutcomes.SilentInvalidateOutcome> result =
      new AtomicReference<>(HigherCachingTierOperationOutcomes.SilentInvalidateOutcome.MISS);
    map.compute(key, (mappedKey, mappedValue) -> {
      boolean mapped = mappedValue != null;
      long released = mapped ? mappedValue.size() : 0L;
      boolean realValue = mapped && !(mappedValue instanceof Fault);
      if (realValue) {
        result.set(HigherCachingTierOperationOutcomes.SilentInvalidateOutcome.REMOVED);
      }
      OnHeapValueHolder<V> exposed = realValue ? mappedValue : null;
      function.apply(exposed);
      updateUsageInBytesIfRequired(-released);
      return null;
    });
    silentInvalidateObserver.end(result.get());
  } catch (RuntimeException re) {
    throw handleException(re);
  }
}
/**
 * Invalidates every key currently in the map, one by one, then clears the map.
 * <p>
 * Failures do not stop the loop: they are counted and the first one is kept. When at least one key
 * failed, the map is not cleared and an exception is thrown.
 *
 * @throws StoreAccessException carrying the failure count and the first failure as its cause
 */
@Override
public void invalidateAll() throws StoreAccessException {
  invalidateAllObserver.begin();
  StoreAccessException firstFailure = null;
  long failures = 0;
  for (K key : map.keySet()) {
    try {
      invalidate(key);
    } catch (StoreAccessException cae) {
      failures++;
      firstFailure = (firstFailure == null) ? cae : firstFailure;
    }
  }
  if (firstFailure == null) {
    clear();
    invalidateAllObserver.end(CachingTierOperationOutcomes.InvalidateAllOutcome.SUCCESS);
    return;
  }
  invalidateAllObserver.end(CachingTierOperationOutcomes.InvalidateAllOutcome.FAILURE);
  throw new StoreAccessException("Error(s) during invalidation - count is " + failures, firstFailure);
}
/**
 * Silently invalidates every key currently in the map, passing each key and its removed holder
 * (or {@code null}, see {@code silentInvalidate}) to {@code biFunction}.
 * <p>
 * Failures do not stop the loop: they are counted and the first one is kept.
 *
 * @param biFunction callback receiving each key with the holder handed over by {@code silentInvalidate}
 * @throws StoreAccessException carrying the failure count and the first failure as its cause
 */
@Override
public void silentInvalidateAll(BiFunction<K, ValueHolder<V>, Void> biFunction) throws StoreAccessException {
  silentInvalidateAllObserver.begin();
  long failures = 0;
  StoreAccessException firstFailure = null;
  for (K k : map.keySet()) {
    try {
      silentInvalidate(k, removedHolder -> {
        biFunction.apply(k, removedHolder);
        return null;
      });
    } catch (StoreAccessException e) {
      failures++;
      firstFailure = (firstFailure == null) ? e : firstFailure;
    }
  }
  if (firstFailure == null) {
    silentInvalidateAllObserver.end(HigherCachingTierOperationOutcomes.SilentInvalidateAllOutcome.SUCCESS);
    return;
  }
  silentInvalidateAllObserver.end(HigherCachingTierOperationOutcomes.SilentInvalidateAllOutcome.FAILURE);
  throw new StoreAccessException("silentInvalidateAll failed - error count: " + failures, firstFailure);
}
/**
 * Removes every mapping matching the int form of {@code hash} and passes each removed key and
 * holder to {@code biFunction}. The invalidation listener is not called.
 *
 * @param hash long hash, converted with {@code HashUtils.longHashToInt}
 * @param biFunction callback receiving each removed key and holder
 */
@Override
public void silentInvalidateAllWithHash(long hash, BiFunction<K, ValueHolder<V>, Void> biFunction) {
  silentInvalidateAllWithHashObserver.begin();
  int hashBucket = HashUtils.longHashToInt(hash);
  for (Entry<K, OnHeapValueHolder<V>> gone : map.removeAllWithHash(hashBucket)) {
    biFunction.apply(gone.getKey(), gone.getValue());
  }
  silentInvalidateAllWithHashObserver.end(HigherCachingTierOperationOutcomes.SilentInvalidateAllWithHashOutcome.SUCCESS);
}
/**
 * Forwards an invalidation to the current listener, if there is one.
 * The volatile field is read once into a local so the null check and the call use the same listener.
 *
 * @param key the invalidated key
 * @param p the invalidated holder
 */
private void notifyInvalidation(K key, ValueHolder<V> p) {
  InvalidationListener<K, V> listener = this.invalidationListener;
  if (listener == null) {
    return;
  }
  listener.onInvalidation(key, p);
}
/**
 * Installs the invalidation listener, wrapped so that {@code Fault} holders are never forwarded
 * to it.
 *
 * @param providedInvalidationListener listener to call for real (non-fault) holders
 */
@Override
public void setInvalidationListener(InvalidationListener<K, V> providedInvalidationListener) {
  this.invalidationListener = (key, valueHolder) -> {
    if (valueHolder instanceof Fault) {
      return;
    }
    providedInvalidationListener.onInvalidation(key, valueHolder);
  };
}
/**
 * Removes every mapping matching the int form of {@code hash} and notifies the invalidation
 * listener for each removed entry.
 *
 * @param hash long hash, converted with {@code HashUtils.longHashToInt}
 */
@Override
public void invalidateAllWithHash(long hash) {
  invalidateAllWithHashObserver.begin();
  int intHash = HashUtils.longHashToInt(hash);
  for (Entry<K, OnHeapValueHolder<V>> gone : map.removeAllWithHash(intHash)) {
    notifyInvalidation(gone.getKey(), gone.getValue());
  }
  LOG.debug("CLIENT: onheap store removed all with hash {}", intHash);
  invalidateAllWithHashObserver.end(CachingTierOperationOutcomes.InvalidateAllWithHashOutcome.SUCCESS);
}
/**
 * Unwraps a cached holder: a {@code Fault} is asked for its value holder, anything else (including
 * {@code null}) is returned as-is.
 *
 * @param cachedValue the holder found in the map, possibly a fault or {@code null}
 * @return the holder to expose to callers
 */
private ValueHolder<V> getValue(ValueHolder<V> cachedValue) {
  if (!(cachedValue instanceof Fault)) {
    return cachedValue;
  }
  Fault<V> pending = (Fault<V>) cachedValue;
  return pending.getValueHolder();
}
/**
 * Asks the configured size-of engine for the size of a key together with its holder.
 *
 * @param key the key to size
 * @param holder the holder to size
 * @return the value reported by {@code sizeOfEngine.sizeof}
 * @throws LimitExceededException propagated from the size-of engine
 */
private long getSizeOfKeyValuePairs(K key, OnHeapValueHolder<V> holder) throws LimitExceededException {
return sizeOfEngine.sizeof(key, holder);
}
/**
 * Holder that wraps a {@link Supplier} of the real {@link ValueHolder} and resolves it at most
 * once, on the first call to {@code getValueHolder()}. The outcome (value or failure) is
 * remembered and replayed to later callers. Most {@link ValueHolder} operations are
 * unsupported on a fault.
 */
private static class Fault<V> extends OnHeapValueHolder<V> {

  private static final int FAULT_ID = -1;

  @IgnoreSizeOf
  private final Supplier<ValueHolder<V>> source;
  private ValueHolder<V> value;
  private Throwable throwable;
  private boolean complete;

  public Fault(Supplier<ValueHolder<V>> source) {
    super(FAULT_ID, 0, true);
    this.source = source;
  }

  /** Records a successful resolution and wakes any thread waiting on this monitor. */
  private synchronized void complete(ValueHolder<V> value) {
    this.value = value;
    this.complete = true;
    notifyAll();
  }

  /**
   * Resolves the fault if that has not happened yet, then returns the resolved holder or
   * rethrows the recorded failure.
   */
  private ValueHolder<V> getValueHolder() {
    synchronized (this) {
      if (!complete) {
        try {
          complete(source.get());
        } catch (Throwable e) {
          // fail() records the failure and rethrows it from inside the monitor.
          fail(e);
        }
      }
    }
    return throwOrReturn();
  }

  @Override
  public long getId() {
    throw new UnsupportedOperationException("You should NOT call that?!");
  }

  /**
   * Returns the resolved holder, or throws the recorded failure: runtime exceptions as-is,
   * anything else wrapped in a {@link RuntimeException}.
   */
  private ValueHolder<V> throwOrReturn() {
    if (throwable == null) {
      return value;
    }
    if (throwable instanceof RuntimeException) {
      throw (RuntimeException) throwable;
    }
    throw new RuntimeException("Faulting from repository failed", throwable);
  }

  /** Records a failed resolution, wakes waiters, then rethrows the failure. */
  private void fail(Throwable t) {
    synchronized (this) {
      this.throwable = t;
      this.complete = true;
      notifyAll();
    }
    throwOrReturn();
  }

  @Override
  public V get() {
    throw new UnsupportedOperationException();
  }

  @Override
  public long creationTime() {
    throw new UnsupportedOperationException();
  }

  @Override
  public void setExpirationTime(long expirationTime) {
    throw new UnsupportedOperationException();
  }

  @Override
  public long expirationTime() {
    throw new UnsupportedOperationException();
  }

  @Override
  public boolean isExpired(long expirationTime) {
    throw new UnsupportedOperationException();
  }

  @Override
  public long lastAccessTime() {
    return Long.MAX_VALUE;
  }

  @Override
  public void setLastAccessTime(long lastAccessTime) {
    throw new UnsupportedOperationException();
  }

  @Override
  public void setSize(long size) {
    throw new UnsupportedOperationException("Faults should not be sized");
  }

  @Override
  public long size() {
    return 0L;
  }

  @Override
  public String toString() {
    final String state;
    if (!complete) {
      state = "???";
    } else if (throwable == null) {
      state = String.valueOf(value);
    } else {
      state = throwable.getMessage();
    }
    return "[Fault : " + state + "]";
  }

  /** Faults are only ever equal to themselves. */
  @Override
  public boolean equals(Object obj) {
    return this == obj;
  }
}
/**
 * Remaps the value for {@code key} with {@code mappingFunction} and returns the holder that was
 * mapped <em>before</em> the remapping ran.
 * <p>
 * An already expired mapping is reported as expired and then treated as absent, so the function
 * sees {@code null} and the method returns {@code null} for it.
 *
 * @param key the key to remap; validated with {@code checkKey}
 * @param mappingFunction receives the key and the current value ({@code null} when absent or
 *        expired); a {@code null} result means no mapping is installed
 * @return the previous, non-expired holder, or {@code null} if there was none
 * @throws StoreAccessException declared by the signature; runtime failures are routed through
 *         {@code handleException}
 */
@Override
public ValueHolder<V> getAndCompute(K key, BiFunction<? super K, ? super V, ? extends V> mappingFunction) throws StoreAccessException {
checkKey(key);
computeObserver.begin();
long now = timeSource.getTimeMillis();
StoreEventSink<K, V> eventSink = storeEventDispatcher.eventSink();
try {
// Side channels used to get the previous holder and the outcome out of the lambda.
AtomicReference<OnHeapValueHolder<V>> oldValue = new AtomicReference<>();
AtomicReference<StoreOperationOutcomes.ComputeOutcome> outcome =
new AtomicReference<>(StoreOperationOutcomes.ComputeOutcome.MISS);
map.compute(key, (mappedKey, mappedValue) -> {
// Net change in accounted size caused by this invocation.
long delta = 0L;
if (mappedValue != null && mappedValue.isExpired(now)) {
// Expired mapping: fire the event, release its size, and carry on as if absent.
fireOnExpirationEvent(mappedKey, mappedValue, eventSink);
delta -= mappedValue.size();
mappedValue = null;
}
OnHeapValueHolder<V> holder;
V existingValue = mappedValue == null ? null : mappedValue.get();
if (mappedValue != null) {
oldValue.set(mappedValue);
}
V computedValue = mappingFunction.apply(mappedKey, existingValue);
if (computedValue == null) {
// Function asked for no mapping; only a removal if there was a value to begin with.
if (existingValue != null) {
eventSink.removed(mappedKey, mappedValue);
outcome.set(StoreOperationOutcomes.ComputeOutcome.REMOVED);
delta -= mappedValue.size();
}
holder = null;
} else {
checkValue(computedValue);
if (mappedValue != null) {
// Update path: the old size is always released; the new one is added only if a
// holder was actually produced (newUpdateValueHolder may return null).
outcome.set(StoreOperationOutcomes.ComputeOutcome.PUT);
holder = newUpdateValueHolder(key, mappedValue, computedValue, now, eventSink);
delta -= mappedValue.size();
if (holder != null) {
delta += holder.size();
}
} else {
// Create path: counted as a PUT only if a holder was actually produced.
holder = newCreateValueHolder(key, computedValue, now, eventSink);
if (holder != null) {
outcome.set(StoreOperationOutcomes.ComputeOutcome.PUT);
delta += holder.size();
}
}
}
updateUsageInBytesIfRequired(delta);
return holder;
});
storeEventDispatcher.releaseEventSink(eventSink);
enforceCapacity();
computeObserver.end(outcome.get());
return oldValue.get();
} catch (RuntimeException re) {
storeEventDispatcher.releaseEventSinkAfterFailure(eventSink, re);
throw handleException(re);
}
}
/**
 * Remaps the value for {@code key} with {@code mappingFunction} and returns the holder that
 * results from the remapping.
 * <p>
 * When the remapping leaves no holder in the map but a value was still produced or read (see
 * the {@code valueHeld} side channel below), that holder is returned instead of {@code null}.
 *
 * @param key the key to remap; validated with {@code checkKey}
 * @param mappingFunction receives the key and the current value ({@code null} when absent or
 *        expired); a {@code null} result means no mapping is installed
 * @param replaceEqual consulted only when the computed value equals the existing one; when it
 *        yields {@code false} the existing mapping is kept and only its access/expiry is updated
 * @param invokeWriter not referenced anywhere in this method body
 * @return the resulting holder, the fallback holder described above, or {@code null}
 * @throws StoreAccessException declared by the signature; runtime failures are routed through
 *         {@code handleException}
 */
@Override
public ValueHolder<V> computeAndGet(K key, BiFunction<? super K, ? super V, ? extends V> mappingFunction, Supplier<Boolean> replaceEqual, Supplier<Boolean> invokeWriter) throws StoreAccessException {
checkKey(key);
computeObserver.begin();
long now = timeSource.getTimeMillis();
StoreEventSink<K, V> eventSink = storeEventDispatcher.eventSink();
try {
// Holder to hand back when the lambda returns null although a value exists for the caller.
AtomicReference<OnHeapValueHolder<V>> valueHeld = new AtomicReference<>();
AtomicReference<StoreOperationOutcomes.ComputeOutcome> outcome =
new AtomicReference<>(StoreOperationOutcomes.ComputeOutcome.MISS);
OnHeapValueHolder<V> computeResult = map.compute(key, (mappedKey, mappedValue) -> {
// Net change in accounted size caused by this invocation.
long delta = 0L;
if (mappedValue != null && mappedValue.isExpired(now)) {
// Expired mapping: fire the event, release its size, and carry on as if absent.
fireOnExpirationEvent(mappedKey, mappedValue, eventSink);
delta -= mappedValue.size();
mappedValue = null;
}
OnHeapValueHolder<V> holder;
V existingValue = mappedValue == null ? null : mappedValue.get();
V computedValue = mappingFunction.apply(mappedKey, existingValue);
if (computedValue == null) {
// Function asked for no mapping; only a removal if there was a value to begin with.
if (existingValue != null) {
eventSink.removed(mappedKey, mappedValue);
outcome.set(StoreOperationOutcomes.ComputeOutcome.REMOVED);
delta -= mappedValue.size();
}
holder = null;
} else if (Objects.equals(existingValue, computedValue) && !replaceEqual.get() && mappedValue != null) {
// Same value and replacement not requested: treat as a read of the existing mapping.
holder = strategy.setAccessAndExpiryWhenCallerlUnderLock(key, mappedValue, now, eventSink);
outcome.set(StoreOperationOutcomes.ComputeOutcome.HIT);
if (holder == null) {
// The access update dropped the mapping; still return the holder that was read.
valueHeld.set(mappedValue);
delta -= mappedValue.size();
}
} else {
checkValue(computedValue);
if (mappedValue != null) {
outcome.set(StoreOperationOutcomes.ComputeOutcome.PUT);
// Read before newUpdateValueHolder runs — presumably because the update may change it; confirm.
long expirationTime = mappedValue.expirationTime();
holder = newUpdateValueHolder(key, mappedValue, computedValue, now, eventSink);
delta -= mappedValue.size();
if (holder == null) {
try {
// No holder goes into the map; build an unsized one just to return the computed value.
valueHeld.set(makeValue(key, computedValue, now, expirationTime, valueCopier, false));
} catch (LimitExceededException e) {
// Sizing is disabled for this call (last argument is false), so the sizing step that
// declares LimitExceededException is skipped; this catch is not expected to fire.
}
} else {
delta += holder.size();
}
} else {
// Create path: counted as a PUT only if a holder was actually produced.
holder = newCreateValueHolder(key, computedValue, now, eventSink);
if (holder != null) {
outcome.set(StoreOperationOutcomes.ComputeOutcome.PUT);
delta += holder.size();
}
}
}
updateUsageInBytesIfRequired(delta);
return holder;
});
// Nothing ended up in the map, but a holder was set aside for the caller.
if (computeResult == null && valueHeld.get() != null) {
computeResult = valueHeld.get();
}
storeEventDispatcher.releaseEventSink(eventSink);
enforceCapacity();
computeObserver.end(outcome.get());
return computeResult;
} catch (RuntimeException re) {
storeEventDispatcher.releaseEventSinkAfterFailure(eventSink, re);
throw handleException(re);
}
}
/**
 * Installs a value for {@code key} only when there is no live mapping: the mapping function is
 * invoked when the key is absent or its mapping has expired, otherwise the existing mapping is
 * treated as a hit and its access/expiry metadata is updated.
 *
 * @param key the key to look up; validated with {@code checkKey}
 * @param mappingFunction produces the value for an absent or expired key; a {@code null} result
 *        means no mapping is installed
 * @return the holder now mapped to {@code key}; the holder that was read if the hit dropped the
 *         mapping; or {@code null} if nothing is mapped
 * @throws StoreAccessException declared by the signature; runtime failures are routed through
 *         {@code handleException}
 */
@Override
public ValueHolder<V> computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) throws StoreAccessException {
checkKey(key);
computeIfAbsentObserver.begin();
StoreEventSink<K, V> eventSink = storeEventDispatcher.eventSink();
try {
long now = timeSource.getTimeMillis();
// Side channels used to get the hit holder and the outcome out of the lambda.
AtomicReference<OnHeapValueHolder<V>> previousValue = new AtomicReference<>();
AtomicReference<StoreOperationOutcomes.ComputeIfAbsentOutcome> outcome =
new AtomicReference<>(StoreOperationOutcomes.ComputeIfAbsentOutcome.NOOP);
OnHeapValueHolder<V> computeResult = map.compute(key, (mappedKey, mappedValue) -> {
// Net change in accounted size caused by this invocation.
long delta = 0;
OnHeapValueHolder<V> holder;
if (mappedValue == null || mappedValue.isExpired(now)) {
if (mappedValue != null) {
// Expired mapping: release its size and report the expiration before recomputing.
delta -= mappedValue.size();
fireOnExpirationEvent(mappedKey, mappedValue, eventSink);
}
V computedValue = mappingFunction.apply(mappedKey);
if (computedValue == null) {
holder = null;
} else {
checkValue(computedValue);
holder = newCreateValueHolder(key, computedValue, now, eventSink);
// Counted as a PUT only if a holder was actually produced.
if (holder != null) {
outcome.set(StoreOperationOutcomes.ComputeIfAbsentOutcome.PUT);
delta += holder.size();
}
}
} else {
// Live mapping: record it as a hit and update access/expiry.
previousValue.set(mappedValue);
outcome.set(StoreOperationOutcomes.ComputeIfAbsentOutcome.HIT);
holder = strategy.setAccessAndExpiryWhenCallerlUnderLock(key, mappedValue, now, eventSink);
if (holder == null) {
// The access update dropped the mapping, so its size is released.
delta -= mappedValue.size();
}
}
updateUsageInBytesIfRequired(delta);
return holder;
});
OnHeapValueHolder<V> previousValueHolder = previousValue.get();
storeEventDispatcher.releaseEventSink(eventSink);
// Capacity is only enforced when a mapping is present after the call.
if (computeResult != null) {
enforceCapacity();
}
computeIfAbsentObserver.end(outcome.get());
if (computeResult == null && previousValueHolder != null) {
// The hit mapping was dropped by the access update; still return the holder that was read.
return previousValueHolder;
}
return computeResult;
} catch (RuntimeException re) {
storeEventDispatcher.releaseEventSinkAfterFailure(eventSink, re);
throw handleException(re);
}
}
/**
 * Runs {@link #computeIfAbsent} once per key, adapting the bulk mapping function so that it is
 * called with a singleton key set each time. Only the first entry returned by the function is
 * used.
 *
 * @param keys the keys to process
 * @param mappingFunction bulk function; invoked with one key at a time
 * @return a map from each processed key to the holder returned by {@code computeIfAbsent}
 * @throws StoreAccessException propagated from {@code computeIfAbsent}
 */
@Override
public Map<K, ValueHolder<V>> bulkComputeIfAbsent(Set<? extends K> keys, Function<Iterable<? extends K>, Iterable<? extends Map.Entry<? extends K, ? extends V>>> mappingFunction) throws StoreAccessException {
  Map<K, ValueHolder<V>> holdersByKey = new HashMap<>(keys.size());
  for (K key : keys) {
    ValueHolder<V> holder = computeIfAbsent(key, k -> {
      Entry<? extends K, ? extends V> computed = mappingFunction.apply(Collections.singleton(k)).iterator().next();
      checkKey(computed.getKey());
      V computedValue = computed.getValue();
      if (computedValue != null) {
        checkValue(computedValue);
      }
      return computedValue;
    });
    holdersByKey.put(key, holder);
  }
  return holdersByKey;
}
/**
 * Returns a fresh, mutable list containing this store's single configuration change listener.
 *
 * @return a new list holding {@code cacheConfigurationChangeListener}
 */
@Override
public List<CacheConfigurationChangeListener> getConfigurationChangeListeners() {
  return new ArrayList<>(Collections.singletonList(this.cacheConfigurationChangeListener));
}
/**
 * Bulk remapping that delegates to the three-argument overload with
 * {@code REPLACE_EQUALS_TRUE} as the replace-equal supplier.
 *
 * @param keys the keys to remap
 * @param remappingFunction bulk remapping function
 * @return a map from each key to the holder resulting from its remapping
 * @throws StoreAccessException propagated from the delegate
 */
@Override
public Map<K, ValueHolder<V>> bulkCompute(Set<? extends K> keys, Function<Iterable<? extends Entry<? extends K, ? extends V>>, Iterable<? extends Entry<? extends K, ? extends V>>> remappingFunction) throws StoreAccessException {
  return this.bulkCompute(keys, remappingFunction, REPLACE_EQUALS_TRUE);
}
/**
 * Runs {@link #computeAndGet} once per key, adapting the bulk remapping function so that it is
 * called with a single-entry set each time. Only the first entry returned by the function is
 * used, and the writer-invocation supplier passed down always yields {@code false}.
 *
 * @param keys the keys to remap; each is validated with {@code checkKey}
 * @param remappingFunction bulk remapping function; invoked with one entry at a time
 * @param replaceEqual passed through to {@code computeAndGet}
 * @return a map from each key to the holder returned by {@code computeAndGet}
 * @throws StoreAccessException propagated from {@code computeAndGet}
 */
@Override
public Map<K, ValueHolder<V>> bulkCompute(Set<? extends K> keys, Function<Iterable<? extends Map.Entry<? extends K, ? extends V>>, Iterable<? extends Map.Entry<? extends K,? extends V>>> remappingFunction, Supplier<Boolean> replaceEqual) throws StoreAccessException {
  Map<K, ValueHolder<V>> holdersByKey = new HashMap<>();
  for (K key : keys) {
    checkKey(key);
    ValueHolder<V> holder = computeAndGet(key, (k, oldValue) -> {
      Entry<? extends K, ? extends V> remapped =
        remappingFunction.apply(Collections.singletonMap(k, oldValue).entrySet()).iterator().next();
      K remappedKey = remapped.getKey();
      V remappedValue = remapped.getValue();
      checkKey(remappedKey);
      if (remappedValue != null) {
        checkValue(remappedValue);
      }
      return remappedValue;
    }, replaceEqual, () -> false);
    holdersByKey.put(key, holder);
  }
  return holdersByKey;
}
/**
 * Exposes this store's event dispatcher as its event source.
 *
 * @return the store event dispatcher
 */
@Override
public StoreEventSource<K, V> getStoreEventSource() {
  return this.storeEventDispatcher;
}
/**
 * Removes the mapping for {@code key} as expired, but only if it is still mapped to
 * {@code value}. When it is, the expiration event is fired and the holder's size is released.
 *
 * @param key the key whose mapping should be expired
 * @param value the holder expected to be currently mapped to {@code key}
 */
void expireMappingUnderLock(K key, ValueHolder<V> value) {
  StoreEventSink<K, V> eventSink = storeEventDispatcher.eventSink();
  try {
    map.computeIfPresent(key, (mappedKey, mappedValue) -> {
      if (!mappedValue.equals(value)) {
        // Mapping changed in the meantime: leave it alone.
        return mappedValue;
      }
      fireOnExpirationEvent(key, value, eventSink);
      updateUsageInBytesIfRequired(-mappedValue.size());
      return null;
    });
    storeEventDispatcher.releaseEventSink(eventSink);
  } catch (RuntimeException failure) {
    storeEventDispatcher.releaseEventSinkAfterFailure(eventSink, failure);
    throw failure;
  }
}
/**
 * Builds the holder that replaces {@code oldValue} on an update and emits the matching events.
 * <ul>
 *   <li>A zero update duration emits {@code updated} followed by {@code expired} and yields no
 *       holder.</li>
 *   <li>A {@code null} update duration keeps the old holder's expiration time.</li>
 *   <li>If building the holder throws {@link LimitExceededException}, the message is logged,
 *       a {@code removed} event is emitted for the old value, and no holder is returned.</li>
 * </ul>
 *
 * @param key the mapping key
 * @param oldValue the holder being replaced; must not be {@code null}
 * @param newValue the new value; must not be {@code null}
 * @param now the current time in milliseconds
 * @param eventSink sink receiving the update-related events
 * @return the new holder, or {@code null} when none should be installed
 */
private OnHeapValueHolder<V> newUpdateValueHolder(K key, OnHeapValueHolder<V> oldValue, V newValue, long now, StoreEventSink<K, V> eventSink) {
  Objects.requireNonNull(oldValue);
  Objects.requireNonNull(newValue);

  Duration duration = strategy.getUpdateDuration(key, oldValue, newValue);
  if (Duration.ZERO.equals(duration)) {
    eventSink.updated(key, oldValue, newValue);
    eventSink.expired(key, () -> newValue);
    return null;
  }

  final long expirationTime;
  if (duration == null) {
    expirationTime = oldValue.expirationTime();
  } else if (isExpiryDurationInfinite(duration)) {
    expirationTime = ValueHolder.NO_EXPIRE;
  } else {
    expirationTime = ExpiryUtils.getExpirationMillis(now, duration);
  }

  try {
    OnHeapValueHolder<V> replacement = makeValue(key, newValue, now, expirationTime, this.valueCopier);
    eventSink.updated(key, oldValue, newValue);
    return replacement;
  } catch (LimitExceededException e) {
    LOG.warn(e.getMessage());
    eventSink.removed(key, oldValue);
    return null;
  }
}
/**
 * Builds the holder for a newly created mapping and emits a {@code created} event for it.
 * No holder is produced when the creation expiry is zero, or when building the holder throws
 * {@link LimitExceededException} (the message is logged in that case).
 *
 * @param key the mapping key
 * @param value the value to hold; must not be {@code null}
 * @param now the current time in milliseconds
 * @param eventSink sink receiving the {@code created} event
 * @return the new holder, or {@code null} when none should be installed
 */
private OnHeapValueHolder<V> newCreateValueHolder(K key, V value, long now, StoreEventSink<K, V> eventSink) {
  Objects.requireNonNull(value);

  Duration duration = ExpiryUtils.getExpiryForCreation(key, value, expiry);
  if (duration.isZero()) {
    return null;
  }

  final long expirationTime;
  if (isExpiryDurationInfinite(duration)) {
    expirationTime = ValueHolder.NO_EXPIRE;
  } else {
    expirationTime = ExpiryUtils.getExpirationMillis(now, duration);
  }

  try {
    OnHeapValueHolder<V> created = makeValue(key, value, now, expirationTime, this.valueCopier);
    eventSink.created(key, value);
    return created;
  } catch (LimitExceededException e) {
    LOG.warn(e.getMessage());
    return null;
  }
}
/**
 * Turns a holder obtained from a lower tier into an on-heap holder with sizing enabled.
 * If the access duration is zero, or the clone exceeds the size limit, the mapping is
 * invalidated through {@code invalidateInGetOrComputeIfAbsent}, the observer is ended with
 * {@code FAULT_FAILED}, and {@code null} is returned.
 *
 * @param key the mapping key
 * @param valueHolder the holder to import
 * @param now the current time in milliseconds
 * @param backEnd the backend passed on to the invalidation helper
 * @param fault the fault passed on to the invalidation helper
 * @return the cloned on-heap holder, or {@code null} when the import failed
 */
private OnHeapValueHolder<V> importValueFromLowerTier(K key, ValueHolder<V> valueHolder, long now, Backend<K, V> backEnd, Fault<V> fault) {
  Duration expiration = strategy.getAccessDuration(key, valueHolder);
  OnHeapValueHolder<V> imported = null;
  if (Duration.ZERO.equals(expiration)) {
    invalidateInGetOrComputeIfAbsent(backEnd, key, valueHolder, fault, now, Duration.ZERO);
    getOrComputeIfAbsentObserver.end(CachingTierOperationOutcomes.GetOrComputeIfAbsentOutcome.FAULT_FAILED);
  } else {
    try {
      imported = cloneValueHolder(key, valueHolder, now, expiration, true);
    } catch (LimitExceededException e) {
      LOG.warn(e.getMessage());
      invalidateInGetOrComputeIfAbsent(backEnd, key, valueHolder, fault, now, expiration);
      getOrComputeIfAbsentObserver.end(CachingTierOperationOutcomes.GetOrComputeIfAbsentOutcome.FAULT_FAILED);
    }
  }
  return imported;
}
/**
 * Creates an on-heap copy of {@code valueHolder}. With a {@link SerializingCopier} the copy is
 * a {@link SerializedOnHeapValueHolder}, built from the binary form when the source holder
 * offers one; otherwise it is a {@link CopiedOnHeapValueHolder}.
 *
 * @param key the mapping key
 * @param valueHolder the holder to copy
 * @param now the current time in milliseconds
 * @param expiration the expiration duration handed to the new holder
 * @param sizingEnabled whether to compute and store the size of the copy
 * @return the on-heap copy
 * @throws LimitExceededException propagated from the sizing step
 */
private OnHeapValueHolder<V> cloneValueHolder(K key, ValueHolder<V> valueHolder, long now, Duration expiration, boolean sizingEnabled) throws LimitExceededException {
  V realValue = valueHolder.get();
  boolean evictionAdvice = checkEvictionAdvice(key, realValue);

  final OnHeapValueHolder<V> copy;
  if (!(valueCopier instanceof SerializingCopier)) {
    copy = new CopiedOnHeapValueHolder<>(valueHolder, realValue, evictionAdvice, valueCopier, now, expiration);
  } else if (valueHolder instanceof BinaryValueHolder && ((BinaryValueHolder) valueHolder).isBinaryValueAvailable()) {
    copy = new SerializedOnHeapValueHolder<>(valueHolder, ((BinaryValueHolder) valueHolder).getBinaryValue(),
      evictionAdvice, ((SerializingCopier<V>) valueCopier).getSerializer(), now, expiration);
  } else {
    copy = new SerializedOnHeapValueHolder<>(valueHolder, realValue, evictionAdvice,
      ((SerializingCopier<V>) valueCopier).getSerializer(), now, expiration);
  }

  if (sizingEnabled) {
    copy.setSize(getSizeOfKeyValuePairs(key, copy));
  }
  return copy;
}
/**
 * Builds a sized on-heap holder; shorthand for the six-argument overload with sizing enabled.
 *
 * @throws LimitExceededException propagated from the sizing step
 */
private OnHeapValueHolder<V> makeValue(K key, V value, long creationTime, long expirationTime, Copier<V> valueCopier) throws LimitExceededException {
  final boolean sizingEnabled = true;
  return makeValue(key, value, creationTime, expirationTime, valueCopier, sizingEnabled);
}
/**
 * Builds an on-heap holder for {@code value}: a {@link SerializedOnHeapValueHolder} when the
 * copier is a {@link SerializingCopier}, a {@link CopiedOnHeapValueHolder} otherwise.
 *
 * @param key the mapping key, used for eviction advice and sizing
 * @param value the value to hold
 * @param creationTime creation time handed to the holder
 * @param expirationTime expiration time handed to the holder
 * @param valueCopier the copier that decides which holder type is built
 * @param size whether to compute and store the holder's size
 * @return the new holder
 * @throws LimitExceededException propagated from the sizing step
 */
private OnHeapValueHolder<V> makeValue(K key, V value, long creationTime, long expirationTime, Copier<V> valueCopier, boolean size) throws LimitExceededException {
  boolean evictionAdvice = checkEvictionAdvice(key, value);

  final OnHeapValueHolder<V> created;
  if (!(valueCopier instanceof SerializingCopier)) {
    created = new CopiedOnHeapValueHolder<>(value, creationTime, expirationTime, evictionAdvice, valueCopier);
  } else {
    created = new SerializedOnHeapValueHolder<>(value, creationTime, expirationTime, evictionAdvice,
      ((SerializingCopier<V>) valueCopier).getSerializer());
  }

  if (!size) {
    return created;
  }
  created.setSize(getSizeOfKeyValuePairs(key, created));
  return created;
}
/**
 * Asks the eviction advisor whether the mapping should be protected from eviction. If the
 * advisor throws, the failure is logged and the mapping is treated as not advised against
 * eviction.
 *
 * @param key the mapping key
 * @param value the mapping value
 * @return the advisor's answer, or {@code false} if it threw
 */
private boolean checkEvictionAdvice(K key, V value) {
  boolean advisedAgainstEviction = false;
  try {
    advisedAgainstEviction = evictionAdvisor.adviseAgainstEviction(key, value);
  } catch (Exception e) {
    LOG.error("Exception raised while running eviction advisor " +
      "- Eviction will assume entry is NOT advised against eviction", e);
  }
  return advisedAgainstEviction;
}
/**
 * Passes a size change on to the backing map.
 *
 * @param delta the signed size change to apply
 */
private void updateUsageInBytesIfRequired(long delta) {
  this.map.updateUsageInBytesIfRequired(delta);
}
/**
 * Reports the byte size tracked by the backing map.
 *
 * @return the value of {@code map.byteSize()}
 */
protected long byteSized() {
  return this.map.byteSize();
}
/**
 * Evicts mappings while the map's natural size exceeds {@code capacity}, stopping after
 * {@code ATTEMPT_RATIO} attempts or {@code EVICTION_RATIO} successful evictions, whichever
 * comes first. Eviction events go through a dedicated event sink that is released on success
 * or failure.
 */
@SuppressFBWarnings("QF_QUESTIONABLE_FOR_LOOP")
protected void enforceCapacity() {
  StoreEventSink<K, V> eventSink = storeEventDispatcher.eventSink();
  try {
    int attempts = 0;
    int evicted = 0;
    while (attempts < ATTEMPT_RATIO && evicted < EVICTION_RATIO && capacity < map.naturalSize()) {
      if (evict(eventSink)) {
        evicted++;
      }
      attempts++;
    }
    storeEventDispatcher.releaseEventSink(eventSink);
  } catch (RuntimeException failure) {
    storeEventDispatcher.releaseEventSinkAfterFailure(eventSink, failure);
    throw failure;
  }
}
/**
 * Tries to evict one mapping.
 * <p>
 * A candidate is sampled from the map, first honouring {@code EVICTION_ADVISOR} and, if that
 * yields nothing, again with no advice at all. The candidate is removed only if the key is
 * still mapped to the sampled holder. For holders other than {@link Fault}, an {@code evicted}
 * event is emitted and the invalidation listener is notified.
 *
 * @param eventSink sink receiving the {@code evicted} event
 * @return {@code true} if a mapping was removed, {@code false} if no candidate was found or the
 *         candidate had changed by the time removal was attempted
 */
boolean evict(StoreEventSink<K, V> eventSink) {
  evictionObserver.begin();
  // Per-thread generator: avoids allocating and seeding a new Random on every eviction attempt.
  // It is only used synchronously within this call, so it never crosses threads.
  Random random = java.util.concurrent.ThreadLocalRandom.current();

  @SuppressWarnings("unchecked")
  Map.Entry<K, OnHeapValueHolder<V>> candidate = map.getEvictionCandidate(random, SAMPLE_SIZE, EVICTION_PRIORITIZER, EVICTION_ADVISOR);
  if (candidate == null) {
    // Second attempt, this time ignoring eviction advice.
    candidate = map.getEvictionCandidate(random, SAMPLE_SIZE, EVICTION_PRIORITIZER, noAdvice());
  }
  if (candidate == null) {
    // NOTE(review): the observer is not ended on this path (unchanged from the original) — confirm intended.
    return false;
  }

  // Effectively-final alias so the lambda below can capture it.
  Map.Entry<K, OnHeapValueHolder<V>> evictionCandidate = candidate;
  AtomicBoolean removed = new AtomicBoolean(false);
  map.computeIfPresent(evictionCandidate.getKey(), (mappedKey, mappedValue) -> {
    if (!mappedValue.equals(evictionCandidate.getValue())) {
      // Mapping changed since sampling: keep it.
      return mappedValue;
    }
    removed.set(true);
    if (!(evictionCandidate.getValue() instanceof Fault)) {
      eventSink.evicted(evictionCandidate.getKey(), evictionCandidate.getValue());
      invalidationListener.onInvalidation(mappedKey, evictionCandidate.getValue());
    }
    updateUsageInBytesIfRequired(-mappedValue.size());
    return null;
  });

  if (removed.get()) {
    evictionObserver.end(StoreOperationOutcomes.EvictionOutcome.SUCCESS);
    return true;
  }
  evictionObserver.end(StoreOperationOutcomes.EvictionOutcome.FAILURE);
  return false;
}
/**
 * Reports the expiration of a mapping: records a successful expiration on the observer, emits
 * an {@code expired} event on the sink, then notifies the invalidation listener.
 *
 * @param mappedKey the key of the expired mapping
 * @param mappedValue the expired holder
 * @param eventSink sink receiving the {@code expired} event
 */
void fireOnExpirationEvent(K mappedKey, ValueHolder<V> mappedValue, StoreEventSink<K, V> eventSink) {
// The observer is begun and ended back to back: nothing is done in between.
expirationObserver.begin();
expirationObserver.end(StoreOperationOutcomes.ExpirationOutcome.SUCCESS);
eventSink.expired(mappedKey, mappedValue);
// Called on the field directly, without the null check that notifyInvalidation performs.
invalidationListener.onInvalidation(mappedKey, mappedValue);
}
/**
 * Service provider that builds {@link OnHeapStore} instances, either as stores or as (higher)
 * caching tiers, and remembers the copiers created for each instance so they can be released
 * with it.
 */
@ServiceDependencies({TimeSourceService.class, CopyProvider.class, SizeOfEngineProvider.class})
public static class Provider extends BaseStoreProvider implements CachingTier.Provider, HigherCachingTier.Provider {

  /** Copiers created for each store handed out by this provider. */
  private final Map<Store<?, ?>, List<Copier<?>>> createdStores = new ConcurrentWeakIdentityHashMap<>();
  /** Translated tier statistics registered for each store. */
  private final Map<OnHeapStore<?, ?>, OperationStatistic<?>[]> tierOperationStatistics = new ConcurrentWeakIdentityHashMap<>();

  @Override
  protected ResourceType<SizedResourcePool> getResourceType() {
    return ResourceType.Core.HEAP;
  }

  /** Ranks 1 when the heap is the only requested resource type, 0 otherwise. */
  @Override
  public int rank(Set<ResourceType<?>> resourceTypes, Collection<ServiceConfiguration<?, ?>> serviceConfigs) {
    if (resourceTypes.equals(Collections.singleton(getResourceType()))) {
      return 1;
    }
    return 0;
  }

  /** Same ranking as {@link #rank}. */
  @Override
  public int rankCachingTier(Set<ResourceType<?>> resourceTypes, Collection<ServiceConfiguration<?, ?>> serviceConfigs) {
    return rank(resourceTypes, serviceConfigs);
  }

  /** Creates a store with its own event dispatcher and registers its "get" and "eviction" statistics. */
  @Override
  public <K, V> OnHeapStore<K, V> createStore(Configuration<K, V> storeConfig, ServiceConfiguration<?, ?>... serviceConfigs) {
    StoreEventDispatcher<K, V> dispatcher = new DefaultStoreEventDispatcher<>(storeConfig.getDispatcherConcurrency());
    OnHeapStore<K, V> store = createStoreInternal(storeConfig, dispatcher, serviceConfigs);
    OperationStatistic<?>[] statistics = new OperationStatistic<?>[] {
      createTranslatedStatistic(store, "get", TierOperationOutcomes.GET_TRANSLATION, "get"),
      createTranslatedStatistic(store, "eviction", TierOperationOutcomes.EVICTION_TRANSLATION, "eviction")
    };
    tierOperationStatistics.put(store, statistics);
    return store;
  }

  /**
   * Looks up the required services, creates the key and value copiers and the size-of engine,
   * builds the store and records its copiers.
   */
  public <K, V> OnHeapStore<K, V> createStoreInternal(Configuration<K, V> storeConfig, StoreEventDispatcher<K, V> eventDispatcher,
                                                      ServiceConfiguration<?, ?>... serviceConfigs) {
    TimeSource timeSource = getServiceProvider().getService(TimeSourceService.class).getTimeSource();

    CopyProvider copyProvider = getServiceProvider().getService(CopyProvider.class);
    Copier<K> keyCopier = copyProvider.createKeyCopier(storeConfig.getKeyType(), storeConfig.getKeySerializer(), serviceConfigs);
    Copier<V> valueCopier = copyProvider.createValueCopier(storeConfig.getValueType(), storeConfig.getValueSerializer(), serviceConfigs);
    List<Copier<?>> copiers = Arrays.asList(keyCopier, valueCopier);

    SizeOfEngineProvider sizeOfEngineProvider = getServiceProvider().getService(SizeOfEngineProvider.class);
    SizeOfEngine sizeOfEngine = sizeOfEngineProvider.createSizeOfEngine(
      storeConfig.getResourcePools().getPoolForResource(ResourceType.Core.HEAP).getUnit(), serviceConfigs);

    StatisticsService statisticsService = getServiceProvider().getService(StatisticsService.class);
    OnHeapStore<K, V> onHeapStore = new OnHeapStore<>(storeConfig, timeSource, keyCopier, valueCopier, sizeOfEngine,
      eventDispatcher, ConcurrentHashMap::new, statisticsService);
    createdStores.put(onHeapStore, copiers);
    return onHeapStore;
  }

  /**
   * Forgets the store, clears it, cleans up its statistics and releases its copiers.
   *
   * @throws IllegalArgumentException if the store was not created by this provider
   * @throws IllegalStateException if releasing a copier fails
   */
  @Override
  public void releaseStore(Store<?, ?> resource) {
    List<Copier<?>> copiers = createdStores.remove(resource);
    if (copiers == null) {
      throw new IllegalArgumentException("Given store is not managed by this provider : " + resource);
    }

    OnHeapStore<?, ?> onHeapStore = (OnHeapStore<?, ?>) resource;
    close(onHeapStore);
    getServiceProvider().getService(StatisticsService.class).cleanForNode(onHeapStore);
    tierOperationStatistics.remove(onHeapStore);

    CopyProvider copyProvider = getServiceProvider().getService(CopyProvider.class);
    for (Copier<?> copier : copiers) {
      try {
        copyProvider.releaseCopier(copier);
      } catch (Exception e) {
        throw new IllegalStateException("Exception while releasing Copier instance.", e);
      }
    }
  }

  /** Clears the given store. */
  static void close(OnHeapStore<?, ?> onHeapStore) {
    onHeapStore.clear();
  }

  /**
   * Initialises the stateful serializers, if any, behind the store's serializing copiers with a
   * fresh {@link TransientStateRepository}.
   *
   * @throws IllegalArgumentException if the store was not created by this provider
   */
  @Override
  public void initStore(Store<?, ?> resource) {
    checkResource(resource);
    for (Copier<?> copier : createdStores.get(resource)) {
      if (!(copier instanceof SerializingCopier)) {
        continue;
      }
      Serializer<?> serializer = ((SerializingCopier<?>) copier).getSerializer();
      if (serializer instanceof StatefulSerializer) {
        ((StatefulSerializer<?>) serializer).init(new TransientStateRepository());
      }
    }
  }

  /** Rejects resources that are not tracked in {@code createdStores}. */
  private void checkResource(Object resource) {
    if (createdStores.containsKey(resource)) {
      return;
    }
    throw new IllegalArgumentException("Given store is not managed by this provider : " + resource);
  }

  /** Drops all tracked stores, then stops the parent provider even if that fails. */
  @Override
  public void stop() {
    try {
      createdStores.clear();
    } finally {
      super.stop();
    }
  }

  /** Creates a caching tier that uses the null event dispatcher and registers its statistics. */
  @Override
  public <K, V> CachingTier<K, V> createCachingTier(Configuration<K, V> storeConfig, ServiceConfiguration<?, ?>... serviceConfigs) {
    OnHeapStore<K, V> cachingTier = createStoreInternal(storeConfig, NullStoreEventDispatcher.nullStoreEventDispatcher(), serviceConfigs);
    OperationStatistic<?>[] statistics = new OperationStatistic<?>[] {
      createTranslatedStatistic(cachingTier, "get", TierOperationOutcomes.GET_OR_COMPUTEIFABSENT_TRANSLATION, "getOrComputeIfAbsent"),
      createTranslatedStatistic(cachingTier, "eviction", TierOperationOutcomes.EVICTION_TRANSLATION, "eviction")
    };
    this.tierOperationStatistics.put(cachingTier, statistics);
    return cachingTier;
  }

  /**
   * Invalidates everything in the tier (a failure is only logged) and then releases it as a
   * store.
   *
   * @throws IllegalArgumentException if the tier was not created by this provider
   */
  @Override
  public void releaseCachingTier(CachingTier<?, ?> resource) {
    checkResource(resource);
    try {
      resource.invalidateAll();
    } catch (StoreAccessException e) {
      LOG.warn("Invalidation failure while releasing caching tier", e);
    }
    releaseStore((Store<?, ?>) resource);
  }

  /** Only verifies that the tier is managed by this provider. */
  @Override
  public void initCachingTier(CachingTier<?, ?> resource) {
    checkResource(resource);
  }

  /** Creates a higher caching tier with its own event dispatcher and registers its statistics. */
  @Override
  public <K, V> HigherCachingTier<K, V> createHigherCachingTier(Configuration<K, V> storeConfig, ServiceConfiguration<?, ?>... serviceConfigs) {
    StoreEventDispatcher<K, V> dispatcher = new DefaultStoreEventDispatcher<>(storeConfig.getDispatcherConcurrency());
    OnHeapStore<K, V> higherCachingTier = createStoreInternal(storeConfig, dispatcher, serviceConfigs);
    OperationStatistic<?>[] statistics = new OperationStatistic<?>[] {
      createTranslatedStatistic(higherCachingTier, "get", TierOperationOutcomes.GET_OR_COMPUTEIFABSENT_TRANSLATION, "getOrComputeIfAbsent"),
      createTranslatedStatistic(higherCachingTier, "eviction", TierOperationOutcomes.EVICTION_TRANSLATION, "eviction")
    };
    this.tierOperationStatistics.put(higherCachingTier, statistics);
    return higherCachingTier;
  }

  /** Released exactly like a plain caching tier. */
  @Override
  public void releaseHigherCachingTier(HigherCachingTier<?, ?> resource) {
    releaseCachingTier(resource);
  }

  /** Only verifies that the tier is managed by this provider. */
  @Override
  public void initHigherCachingTier(HigherCachingTier<?, ?> resource) {
    checkResource(resource);
  }
}
}