package org.apache.cassandra.cache;
import java.io.IOException;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import com.googlecode.concurrentlinkedhashmap.EvictionListener;
import com.googlecode.concurrentlinkedhashmap.Weigher;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.util.MemoryInputStream;
import org.apache.cassandra.io.util.MemoryOutputStream;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
import org.apache.cassandra.utils.FBUtilities;
public class SerializingCache<K, V> implements ICache<K, V>
{
private static final Logger logger = LoggerFactory.getLogger(SerializingCache.class);
private static final int DEFAULT_CONCURENCY_LEVEL = 64;
private final ConcurrentLinkedHashMap<K, RefCountedMemory> map;
private final ISerializer<V> serializer;
private SerializingCache(long capacity, Weigher<RefCountedMemory> weigher, ISerializer<V> serializer)
{
this.serializer = serializer;
EvictionListener<K,RefCountedMemory> listener = new EvictionListener<K, RefCountedMemory>()
{
public void onEviction(K k, RefCountedMemory mem)
{
mem.unreference();
}
};
this.map = new ConcurrentLinkedHashMap.Builder<K, RefCountedMemory>()
.weigher(weigher)
.maximumWeightedCapacity(capacity)
.concurrencyLevel(DEFAULT_CONCURENCY_LEVEL)
.listener(listener)
.build();
}
public static <K, V> SerializingCache<K, V> create(long weightedCapacity, Weigher<RefCountedMemory> weigher, ISerializer<V> serializer)
{
return new SerializingCache<>(weightedCapacity, weigher, serializer);
}
public static <K, V> SerializingCache<K, V> create(long weightedCapacity, ISerializer<V> serializer)
{
return create(weightedCapacity, new Weigher<RefCountedMemory>()
{
public int weightOf(RefCountedMemory value)
{
long size = value.size();
assert size < Integer.MAX_VALUE : "Serialized size cannot be more than 2GB";
return (int) size;
}
}, serializer);
}
private V deserialize(RefCountedMemory mem)
{
try
{
return serializer.deserialize(new MemoryInputStream(mem));
}
catch (IOException e)
{
logger.trace("Cannot fetch in memory data, we will fallback to read from disk ", e);
return null;
}
}
private RefCountedMemory serialize(V value)
{
long serializedSize = serializer.serializedSize(value);
if (serializedSize > Integer.MAX_VALUE)
throw new IllegalArgumentException(String.format("Unable to allocate %s", FBUtilities.prettyPrintMemory(serializedSize)));
RefCountedMemory freeableMemory;
try
{
freeableMemory = new RefCountedMemory(serializedSize);
}
catch (OutOfMemoryError e)
{
return null;
}
try
{
serializer.serialize(value, new WrappedDataOutputStreamPlus(new MemoryOutputStream(freeableMemory)));
}
catch (IOException e)
{
freeableMemory.unreference();
throw new RuntimeException(e);
}
return freeableMemory;
}
public long capacity()
{
return map.capacity();
}
public void setCapacity(long capacity)
{
map.setCapacity(capacity);
}
public boolean isEmpty()
{
return map.isEmpty();
}
public int size()
{
return map.size();
}
public long weightedSize()
{
return map.weightedSize();
}
public void clear()
{
map.clear();
}
@SuppressWarnings("resource")
public V get(K key)
{
RefCountedMemory mem = map.get(key);
if (mem == null)
return null;
if (!mem.reference())
return null;
try
{
return deserialize(mem);
}
finally
{
mem.unreference();
}
}
@SuppressWarnings("resource")
public void put(K key, V value)
{
RefCountedMemory mem = serialize(value);
if (mem == null)
return;
RefCountedMemory old;
try
{
old = map.put(key, mem);
}
catch (Throwable t)
{
mem.unreference();
throw t;
}
if (old != null)
old.unreference();
}
@SuppressWarnings("resource")
public boolean putIfAbsent(K key, V value)
{
RefCountedMemory mem = serialize(value);
if (mem == null)
return false;
RefCountedMemory old;
try
{
old = map.putIfAbsent(key, mem);
}
catch (Throwable t)
{
mem.unreference();
throw t;
}
if (old != null)
mem.unreference();
return old == null;
}
@SuppressWarnings("resource")
public boolean replace(K key, V oldToReplace, V value)
{
RefCountedMemory old = map.get(key);
if (old == null)
return false;
V oldValue;
if (!old.reference())
return false;
oldValue = deserialize(old);
old.unreference();
if (!oldValue.equals(oldToReplace))
return false;
RefCountedMemory mem = serialize(value);
if (mem == null)
return false;
boolean success;
try
{
success = map.replace(key, old, mem);
}
catch (Throwable t)
{
mem.unreference();
throw t;
}
if (success)
old.unreference();
else
mem.unreference();
return success;
}
public void remove(K key)
{
@SuppressWarnings("resource")
RefCountedMemory mem = map.remove(key);
if (mem != null)
mem.unreference();
}
public Iterator<K> keyIterator()
{
return map.keySet().iterator();
}
public Iterator<K> hotKeyIterator(int n)
{
return map.descendingKeySetWithLimit(n).iterator();
}
public boolean containsKey(K key)
{
return map.containsKey(key);
}
}