package org.terracotta.offheapstore.concurrent;
import java.util.AbstractCollection;
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.terracotta.offheapstore.HashingMap;
import org.terracotta.offheapstore.OffHeapHashMap;
import org.terracotta.offheapstore.Segment;
import org.terracotta.offheapstore.MapInternals;
import org.terracotta.offheapstore.MetadataTuple;
import org.terracotta.offheapstore.exceptions.OversizeMappingException;
import org.terracotta.offheapstore.util.Factory;
public abstract class AbstractConcurrentOffHeapMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, ConcurrentMapInternals, HashingMap<K, V> {
private static final int MAX_SEGMENTS = 1 << 16;
private static final int DEFAULT_CONCURRENCY = 16;
protected final Segment<K, V>[] segments;
private final int segmentShift;
private final int segmentMask;
private Set<K> keySet;
private Set<Entry<K, V>> entrySet;
private Collection<V> values;
public AbstractConcurrentOffHeapMap(Factory<? extends Segment<K, V>> segmentFactory) {
this(segmentFactory, DEFAULT_CONCURRENCY);
}
@SuppressWarnings("unchecked")
public AbstractConcurrentOffHeapMap(Factory<? extends Segment<K, V>> segmentFactory, int concurrency) {
if (concurrency <= 0) {
throw new IllegalArgumentException("Concurrency must be positive [was: " + concurrency + "]");
}
if (concurrency > MAX_SEGMENTS) {
concurrency = MAX_SEGMENTS;
}
int sshift = 0;
int ssize = 1;
while (ssize < concurrency) {
++sshift;
ssize <<= 1;
}
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
this.segments = new Segment[ssize];
try {
for (int i = 0; i < this.segments.length; ++i) {
this.segments[i] = segmentFactory.newInstance();
}
} catch (RuntimeException e) {
for (Segment<?, ?> s : this.segments) {
if (s != null) {
s.destroy();
}
}
throw e;
}
}
protected Segment<K, V> segmentFor(Object key) {
return segmentFor(key.hashCode());
}
protected Segment<K, V> segmentFor(int hash) {
return segments[getIndexFor(hash)];
}
public int getIndexFor(int hash) {
return (spread(hash) >>> segmentShift) & segmentMask;
}
public List<Segment<K, V>> getSegments() {
return Collections.unmodifiableList(Arrays.asList(segments));
}
protected int getConcurrency() {
return segments.length;
}
private static int spread(int hash) {
int h = hash;
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
@Override
public int size() {
long sum = 0;
readLockAll();
try {
for (Map<?, ?> m : segments) {
sum += m.size();
}
} finally {
readUnlockAll();
}
if (sum > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
} else {
return (int) sum;
}
}
@Override
public boolean containsKey(Object key) {
return segmentFor(key).containsKey(key);
}
@Override
public boolean containsValue(Object value) {
readLockAll();
try {
for (Map<?, ?> m : segments) {
if (m.containsValue(value)) {
return true;
}
}
} finally {
readUnlockAll();
}
return false;
}
@Override
public V get(Object key) {
return segmentFor(key).get(key);
}
@Override
public V put(K key, V value) {
try {
return segmentFor(key).put(key, value);
} catch (OversizeMappingException e) {
if (handleOversizeMappingException(key.hashCode())) {
try {
return segmentFor(key).put(key, value);
} catch (OversizeMappingException ex) {
}
}
writeLockAll();
try {
do {
try {
return segmentFor(key).put(key, value);
} catch (OversizeMappingException ex) {
e = ex;
}
} while (handleOversizeMappingException(key.hashCode()));
throw e;
} finally {
writeUnlockAll();
}
}
}
public V put(K key, V value, int metadata) {
try {
return segmentFor(key).put(key, value, metadata);
} catch (OversizeMappingException e) {
if (handleOversizeMappingException(key.hashCode())) {
try {
return segmentFor(key).put(key, value, metadata);
} catch (OversizeMappingException ex) {
}
}
writeLockAll();
try {
do {
try {
return segmentFor(key).put(key, value, metadata);
} catch (OversizeMappingException ex) {
e = ex;
}
} while (handleOversizeMappingException(key.hashCode()));
throw e;
} finally {
writeUnlockAll();
}
}
}
public V fill(K key, V value) {
return segmentFor(key).fill(key, value);
}
public V fill(K key, V value, int metadata) {
return segmentFor(key).fill(key, value, metadata);
}
@Override
public V remove(Object key) {
return segmentFor(key).remove(key);
}
public boolean removeNoReturn(Object key) {
return segmentFor(key).removeNoReturn(key);
}
public Integer getMetadata(K key, int mask) throws IllegalArgumentException {
return segmentFor(key).getMetadata(key, mask);
}
public Integer getAndSetMetadata(K key, int mask, int values) throws IllegalArgumentException {
return segmentFor(key).getAndSetMetadata(key, mask, values);
}
public V getValueAndSetMetadata(K key, int mask, int values) {
return segmentFor(key).getValueAndSetMetadata(key, mask, values);
}
@Override
public void clear() {
writeLockAll();
try {
for (Map<?, ?> m : segments) {
m.clear();
}
} finally {
writeUnlockAll();
}
}
public void destroy() {
writeLockAll();
try {
for (Segment<?, ?> m : segments) {
m.destroy();
}
} finally {
writeUnlockAll();
}
}
@Override
public V putIfAbsent(K key, V value) {
try {
return segmentFor(key).putIfAbsent(key, value);
} catch (OversizeMappingException e) {
if (handleOversizeMappingException(key.hashCode())) {
try {
return segmentFor(key).putIfAbsent(key, value);
} catch (OversizeMappingException ex) {
e = ex;
}
}
writeLockAll();
try {
do {
try {
return segmentFor(key).putIfAbsent(key, value);
} catch (OversizeMappingException ex) {
e = ex;
}
} while (handleOversizeMappingException(key.hashCode()));
throw e;
} finally {
writeUnlockAll();
}
}
}
@Override
public boolean remove(Object key, Object value) {
return segmentFor(key).remove(key, value);
}
@Override
public boolean replace(K key, V oldValue, V newValue) {
try {
return segmentFor(key).replace(key, oldValue, newValue);
} catch (OversizeMappingException e) {
if (handleOversizeMappingException(key.hashCode())) {
try {
return segmentFor(key).replace(key, oldValue, newValue);
} catch (OversizeMappingException ex) {
e = ex;
}
}
writeLockAll();
try {
do {
try {
return segmentFor(key).replace(key, oldValue, newValue);
} catch (OversizeMappingException ex) {
e = ex;
}
} while (handleOversizeMappingException(key.hashCode()));
throw e;
} finally {
writeUnlockAll();
}
}
}
@Override
public V replace(K key, V value) {
try {
return segmentFor(key).replace(key, value);
} catch (OversizeMappingException e) {
if (handleOversizeMappingException(key.hashCode())) {
try {
return segmentFor(key).replace(key, value);
} catch (OversizeMappingException ex) {
e = ex;
}
}
writeLockAll();
try {
do {
try {
return segmentFor(key).replace(key, value);
} catch (OversizeMappingException ex) {
e = ex;
}
} while (handleOversizeMappingException(key.hashCode()));
throw e;
} finally {
writeUnlockAll();
}
}
}
@Override
public Set<K> keySet() {
Set<K> ks = keySet;
return ks != null ? ks : (keySet = new AggregateKeySet());
}
@Override
public Collection<V> values() {
Collection<V> vc = values;
return vc != null ? vc : (values = new AggregatedValuesCollection());
}
@Override
public Set<Entry<K, V>> entrySet() {
Set<Entry<K, V>> es = entrySet;
return es != null ? es : (entrySet = new AggregateEntrySet());
}
class AggregateKeySet extends BaseAggregateSet<K> {
@Override
public boolean contains(final Object o) {
return AbstractConcurrentOffHeapMap.this.containsKey(o);
}
@Override
public boolean remove(final Object o) {
return AbstractConcurrentOffHeapMap.this.remove(o) != null;
}
@Override
public Iterator<K> iterator() {
return new AggregateIterator<K>() {
@Override
protected Iterator<K> getNextIterator() {
return listIterator.next().keySet().iterator();
}
};
}
}
class AggregateEntrySet extends BaseAggregateSet<Map.Entry<K, V>> {
@Override
public Iterator<Map.Entry<K, V>> iterator() {
return new AggregateIterator<Map.Entry<K, V>>() {
@Override
protected Iterator<java.util.Map.Entry<K, V>> getNextIterator() {
return listIterator.next().entrySet().iterator();
}
};
}
@Override
public boolean contains(final Object o) {
if (!(o instanceof Entry<?, ?>)) { return false; }
final Map.Entry<?, ?> e = (Map.Entry<?, ?>) o;
final V value = AbstractConcurrentOffHeapMap.this.get(e.getKey());
return value != null && value.equals(e.getValue());
}
@Override
public boolean remove(final Object o) {
if (!(o instanceof Entry<?, ?>)) { return false; }
final Map.Entry<?, ?> e = (Map.Entry<?, ?>) o;
final V value = AbstractConcurrentOffHeapMap.this.get(e.getKey());
return value != null && value.equals(e.getValue()) && AbstractConcurrentOffHeapMap.this.remove(e.getKey()) != null;
}
}
class AggregatedValuesCollection extends AbstractCollection<V> {
@Override
public Iterator<V> iterator() {
return new AggregateIterator<V>() {
@Override
protected Iterator<V> getNextIterator() {
return listIterator.next().values().iterator();
}
};
}
@Override
public int size() {
return AbstractConcurrentOffHeapMap.this.size();
}
}
private abstract class BaseAggregateSet<T> extends AbstractSet<T> {
@Override
public int size() {
return AbstractConcurrentOffHeapMap.this.size();
}
@Override
public void clear() {
AbstractConcurrentOffHeapMap.this.clear();
}
}
protected abstract class AggregateIterator<T> implements Iterator<T> {
protected final Iterator<Map<K, V>> listIterator;
protected Iterator<T> currentIterator;
protected abstract Iterator<T> getNextIterator();
public AggregateIterator() {
listIterator = Arrays.<Map<K, V>>asList(segments).iterator();
while (listIterator.hasNext()) {
currentIterator = getNextIterator();
if (currentIterator.hasNext()) {
return;
}
}
}
@Override
public boolean hasNext() {
if (currentIterator == null) {
return false;
}
if (currentIterator.hasNext()) {
return true;
} else {
while (listIterator.hasNext()) {
currentIterator = getNextIterator();
if (currentIterator.hasNext()) {
return true;
}
}
return false;
}
}
@Override
public T next() {
if (currentIterator == null) {
throw new NoSuchElementException();
}
if (currentIterator.hasNext()) {
return currentIterator.next();
} else {
while (listIterator.hasNext()) {
currentIterator = getNextIterator();
if (currentIterator.hasNext()) {
return currentIterator.next();
}
}
}
throw new NoSuchElementException();
}
@Override
public void remove() {
currentIterator.remove();
}
}
protected void readLockAll() {
for (Segment<?, ?> m : segments) {
m.readLock().lock();
}
}
protected void readUnlockAll() {
for (Segment<?, ?> m : segments) {
m.readLock().unlock();
}
}
public final void writeLockAll() {
for (Segment<?, ?> m : segments) {
m.writeLock().lock();
}
}
public final void writeUnlockAll() {
for (Segment<?, ?> m : segments) {
m.writeLock().unlock();
}
}
@Override
public List<MapInternals> getSegmentInternals() {
return Collections.unmodifiableList(Arrays.<MapInternals>asList(segments));
}
@Override
public long getSize() {
long sum = 0;
for (MapInternals m : segments) {
sum += m.getSize();
}
return sum;
}
@Override
public long getTableCapacity() {
long sum = 0;
for (MapInternals m : segments) {
sum += m.getTableCapacity();
}
return sum;
}
@Override
public long getUsedSlotCount() {
long sum = 0;
for (MapInternals m : segments) {
sum += m.getUsedSlotCount();
}
return sum;
}
@Override
public long getRemovedSlotCount() {
long sum = 0;
for (MapInternals m : segments) {
sum += m.getRemovedSlotCount();
}
return sum;
}
@Override
public int getReprobeLength() {
throw new UnsupportedOperationException("Segmented maps do not have a reprobe length");
}
@Override
public long getAllocatedMemory() {
long sum = 0;
for (MapInternals m : segments) {
sum += m.getAllocatedMemory();
}
return sum;
}
@Override
public long getOccupiedMemory() {
long sum = 0;
for (MapInternals m : segments) {
sum += m.getOccupiedMemory();
}
return sum;
}
@Override
public long getVitalMemory() {
long sum = 0;
for (MapInternals m : segments) {
sum += m.getVitalMemory();
}
return sum;
}
@Override
public long getDataAllocatedMemory() {
long sum = 0;
for (MapInternals m : segments) {
sum += m.getDataAllocatedMemory();
}
return sum;
}
@Override
public long getDataOccupiedMemory() {
long sum = 0;
for (MapInternals m : segments) {
sum += m.getDataOccupiedMemory();
}
return sum;
}
@Override
public long getDataVitalMemory() {
long sum = 0;
for (MapInternals m : segments) {
sum += m.getDataVitalMemory();
}
return sum;
}
@Override
public long getDataSize() {
long sum = 0;
for (MapInternals m : segments) {
sum += m.getDataSize();
}
return sum;
}
public final boolean handleOversizeMappingException(int hash) {
boolean evicted = false;
Segment<?, ?> target = segmentFor(hash);
for (Segment<?, ?> s : segments) {
if (s != target) {
evicted |= s.shrink();
}
}
return evicted;
}
public MetadataTuple<V> computeWithMetadata(K key, BiFunction<? super K, ? super MetadataTuple<V>, ? extends MetadataTuple<V>> remappingFunction) {
try {
return segmentFor(key).computeWithMetadata(key, remappingFunction);
} catch (OversizeMappingException e) {
if (handleOversizeMappingException(key.hashCode())) {
try {
return segmentFor(key).computeWithMetadata(key, remappingFunction);
} catch (OversizeMappingException ex) {
}
}
writeLockAll();
try {
do {
try {
return segmentFor(key).computeWithMetadata(key, remappingFunction);
} catch (OversizeMappingException ex) {
e = ex;
}
} while (handleOversizeMappingException(key.hashCode()));
throw e;
} finally {
writeUnlockAll();
}
}
}
public MetadataTuple<V> computeIfAbsentWithMetadata(K key, Function<? super K,? extends MetadataTuple<V>> mappingFunction) {
try {
return segmentFor(key).computeIfAbsentWithMetadata(key, mappingFunction);
} catch (OversizeMappingException e) {
if (handleOversizeMappingException(key.hashCode())) {
try {
return segmentFor(key).computeIfAbsentWithMetadata(key, mappingFunction);
} catch (OversizeMappingException ex) {
e = ex;
}
}
writeLockAll();
try {
do {
try {
return segmentFor(key).computeIfAbsentWithMetadata(key, mappingFunction);
} catch (OversizeMappingException ex) {
e = ex;
}
} while (handleOversizeMappingException(key.hashCode()));
throw e;
} finally {
writeUnlockAll();
}
}
}
public MetadataTuple<V> computeIfPresentWithMetadata(K key, BiFunction<? super K,? super MetadataTuple<V>,? extends MetadataTuple<V>> remappingFunction) {
try {
return segmentFor(key).computeIfPresentWithMetadata(key, remappingFunction);
} catch (OversizeMappingException e) {
if (handleOversizeMappingException(key.hashCode())) {
try {
return segmentFor(key).computeIfPresentWithMetadata(key, remappingFunction);
} catch (OversizeMappingException ex) {
e = ex;
}
}
writeLockAll();
try {
do {
try {
return segmentFor(key).computeIfPresentWithMetadata(key, remappingFunction);
} catch (OversizeMappingException ex) {
e = ex;
}
} while (handleOversizeMappingException(key.hashCode()));
throw e;
} finally {
writeUnlockAll();
}
}
}
@Override
public Map<K, V> removeAllWithHash(int keyHash) {
return segmentFor(keyHash).removeAllWithHash(keyHash);
}
}