package org.jruby.javasupport.util;
import org.jruby.util.log.Logger;
import org.jruby.util.log.LoggerFactory;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.util.concurrent.locks.ReentrantLock;
Maps Java objects to their proxies. Combines elements of WeakHashMap and
ConcurrentHashMap to permit unsynchronized reads. May be configured to
use either Weak (the default) or Soft references.
Note that both Java objects and their proxies are held by weak/soft
references; because proxies (currently) keep strong references to their
Java objects, if we kept strong references to them the Java objects would
never be gc'ed. This presents a problem in the case where a user passes
a Rubified Java object out to Java but keeps no reference in Ruby to the
proxy; if the object is returned to Ruby after its proxy has been gc'ed,
a new (and possibly very wrong, in the case of JRuby-defined subclasses)
proxy will be created. Use of soft references may help reduce the
likelihood of this occurring; users may be advised to keep Ruby-side
references to prevent it occurring altogether.
Author: Bill Dortch
/**
* Maps Java objects to their proxies. Combines elements of WeakHashMap and
* ConcurrentHashMap to permit unsynchronized reads. May be configured to
* use either Weak (the default) or Soft references.<p>
*
* Note that both Java objects and their proxies are held by weak/soft
* references; because proxies (currently) keep strong references to their
* Java objects, if we kept strong references to them the Java objects would
* never be gc'ed. This presents a problem in the case where a user passes
* a Rubified Java object out to Java but keeps no reference in Ruby to the
* proxy; if the object is returned to Ruby after its proxy has been gc'ed,
* a new (and possibly very wrong, in the case of JRuby-defined subclasses)
* proxy will be created. Use of soft references may help reduce the
* likelihood of this occurring; users may be advised to keep Ruby-side
* references to prevent it occurring altogether.
*
* @author <a href="mailto:bill.dortch@gmail.com">Bill Dortch</a>
*
*/
public abstract class ObjectProxyCache<T,A> {
/** Logger used by the housekeeping ("vulture") thread to report cache statistics. */
private static final Logger LOG = LoggerFactory.getLogger(ObjectProxyCache.class);

/** Strength of the references that hold both the Java objects and their proxies. */
public enum ReferenceType { WEAK, SOFT }

// Default geometry used by the convenience constructors.
private static final int DEFAULT_SEGMENTS = 16; // must be power of 2
private static final int DEFAULT_SEGMENT_SIZE = 8; // must be power of 2
// A segment's table is doubled once its element count exceeds capacity * load factor.
private static final float DEFAULT_LOAD_FACTOR = 0.75f;
// Upper bounds applied to the per-segment table length and to the segment count.
private static final int MAX_CAPACITY = 1 << 30;
private static final int MAX_SEGMENTS = 1 << 16;
// Pause between two expunge passes of the vulture thread.
private static final int VULTURE_RUN_FREQ_SECONDS = 5;

// Source of the per-instance ids; only touched through nextId().
private static int idSequence = 0;

/** Hands out the next cache id (1, 2, 3, ...); synchronized on the class. */
private static synchronized int nextId() {
    idSequence += 1;
    return idSequence;
}

/** Reference strength chosen at construction time, used for every new entry. */
private final ReferenceType referenceType;
/** The independently locked partitions of the cache. */
private final Segment<T,A>[] segments;
/** Right shift applied to a hash before masking it to select a segment. */
private final int segmentShift;
/** Mask (segment count - 1) applied to the shifted hash to select a segment. */
private final int segmentMask;
/**
 * Housekeeping thread built by the constructor; {@code null} when creating it
 * raised a SecurityException. The constructor does not start it.
 */
private Thread vulture;
/** Id of this cache instance, used in the vulture thread's name and log output. */
private final int id;
/**
 * Creates a cache with the default geometry ({@code DEFAULT_SEGMENTS} segments
 * of initial capacity {@code DEFAULT_SEGMENT_SIZE}) that holds its entries
 * through weak references.
 */
public ObjectProxyCache() {
this(DEFAULT_SEGMENTS, DEFAULT_SEGMENT_SIZE, ReferenceType.WEAK);
}
/**
 * Creates a cache with the default geometry ({@code DEFAULT_SEGMENTS} segments
 * of initial capacity {@code DEFAULT_SEGMENT_SIZE}) and the given reference
 * strength.
 *
 * @param refType whether entries are held through weak or soft references;
 *                {@code null} is rejected by the delegated constructor with an
 *                IllegalArgumentException
 */
public ObjectProxyCache(ReferenceType refType) {
this(DEFAULT_SEGMENTS, DEFAULT_SEGMENT_SIZE, refType);
}
/**
 * Creates a cache with the requested geometry and reference strength.
 * The segment count is capped at {@code MAX_SEGMENTS} and the per-segment
 * capacity at {@code MAX_CAPACITY}; both are then rounded up to the next
 * power of two.
 * <p>
 * A daemon "vulture" thread that periodically expunges dead entries is built
 * here but deliberately not started (see the FIXME notes in the body).
 *
 * @param numSegments        requested number of segments, must be positive
 * @param initialSegCapacity requested initial table length of each segment,
 *                           must be positive
 * @param refType            whether entries are held through weak or soft
 *                           references, must not be {@code null}
 * @throws IllegalArgumentException if a size is not positive or
 *                                  {@code refType} is {@code null}
 */
public ObjectProxyCache(int numSegments, int initialSegCapacity, ReferenceType refType) {
    if (numSegments <= 0 || initialSegCapacity <= 0 || refType == null) {
        throw new IllegalArgumentException();
    }
    this.id = nextId();
    this.referenceType = refType;
    if (numSegments > MAX_SEGMENTS) numSegments = MAX_SEGMENTS;

    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    while (ssize < numSegments) {
        ++sshift;
        ssize <<= 1;
    }
    // note segmentShift differs from ConcurrentHashMap's calculation due to
    // issues with System.identityHashCode (upper n bits always 0, at least
    // under Java 1.6 / WinXP)
    this.segmentShift = 24 - sshift;
    this.segmentMask = ssize - 1;
    this.segments = Segment.newArray(ssize);

    if (initialSegCapacity > MAX_CAPACITY) {
        initialSegCapacity = MAX_CAPACITY;
    }
    int cap = 1;
    while (cap < initialSegCapacity) cap <<= 1;
    for (int i = ssize; --i >= 0; ) {
        segments[i] = new Segment<T,A>(cap, this);
    }

    // vulture thread will periodically expunge dead
    // entries.  entries are also expunged during 'put'
    // operations; this is designed to cover the case where
    // many objects are created initially, followed by limited
    // put activity.
    //
    // FIXME: DISABLED (below) pending resolution of finalization issue
    //
    try {
        this.vulture = new Thread("ObjectProxyCache "+id+" vulture") {
            @Override
            public void run() {
                for ( ;; ) {
                    try {
                        Thread.sleep(VULTURE_RUN_FREQ_SECONDS * 1000);
                    } catch (InterruptedException e) {
                        // An interrupt is the only way to ask this endless loop
                        // to stop: keep the flag set for any observer and let
                        // the thread terminate instead of silently looping on.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    // only log statistics for caches that hold a noticeable number of entries
                    boolean dump = size() > 200;
                    if (dump) {
                        LOG.debug("***Vulture {} waking, stats:", id);
                        LOG.debug(stats());
                    }
                    for (int i = segments.length; --i >= 0; ) {
                        Segment<T,A> seg = segments[i];
                        seg.lock();
                        try {
                            seg.expunge();
                        } finally {
                            seg.unlock();
                        }
                        // must be qualified: a bare yield() call is rejected by
                        // compilers that treat 'yield' as a restricted identifier
                        Thread.yield();
                    }
                    if (dump) {
                        LOG.debug("***Vulture {} sleeping, stats:", id);
                        LOG.debug(stats());
                    }
                }
            }
        };
        vulture.setDaemon(true);
    } catch (SecurityException e) {
        // not allowed to set up the thread: run without a vulture
        this.vulture = null;
    }
    // FIXME: vulture daemon thread prevents finalization,
    // find alternative approach.
    // vulture.start();
    // System.err.println("***ObjectProxyCache " + id + " started at "+ new java.util.Date());
}
// protected void finalize() throws Throwable {
// System.err.println("***ObjectProxyCache " + id + " finalized at "+ new java.util.Date());
// }
/**
 * Creates the proxy for a Java object that has no live proxy in the cache.
 * Invoked by {@code Segment#getOrCreate} while the owning segment's lock is
 * held; the returned value is stored in the cache and handed to the caller.
 *
 * @param javaObject the Java object that needs a proxy
 * @param allocator  the allocator argument that was passed to {@code getOrCreate}
 * @return the proxy to cache for {@code javaObject}
 */
public abstract T allocateProxy(Object javaObject, A allocator);
/**
 * Looks up the cached proxy of a Java object without taking any lock.
 *
 * @param javaObject the Java object to look up (compared by identity)
 * @return the cached proxy, or {@code null} if {@code javaObject} is
 *         {@code null}, has no entry, or its proxy reference has been cleared
 */
public T get(Object javaObject) {
    if (javaObject == null) {
        return null;
    }
    final int h = hash(javaObject);
    final Segment<T,A> segment = segmentFor(h);
    return segment.get(javaObject, h);
}
/**
 * Returns the cached proxy of a Java object, creating and caching one through
 * {@link #allocateProxy(Object, Object)} when no live proxy is found.
 *
 * @param javaObject the Java object to look up (compared by identity)
 * @param allocator  passed through to {@code allocateProxy} when a proxy has
 *                   to be created
 * @return the existing or newly created proxy, or {@code null} if either
 *         argument is {@code null}
 */
public T getOrCreate(Object javaObject, A allocator) {
    if (javaObject == null) {
        return null;
    }
    if (allocator == null) {
        return null;
    }
    final int h = hash(javaObject);
    final Segment<T,A> segment = segmentFor(h);
    return segment.getOrCreate(javaObject, h, allocator);
}
/**
 * Associates a proxy with a Java object, replacing any entry that currently
 * maps the object to a different proxy. Does nothing if either argument is
 * {@code null}.
 *
 * @param javaObject the Java object (compared by identity)
 * @param proxy      the proxy to cache for it
 */
public void put(Object javaObject, T proxy) {
    final boolean storable = javaObject != null && proxy != null;
    if (!storable) {
        return;
    }
    final int h = hash(javaObject);
    final Segment<T,A> segment = segmentFor(h);
    segment.put(javaObject, h, proxy);
}
/**
 * Computes the cache's hash of an object: its identity hash code with the
 * higher bits folded into the lower ones by two rounds of shift-and-xor.
 *
 * @param javaObject the object to hash ({@code null} yields the hash of 0)
 * @return the spread identity hash
 */
private static int hash(Object javaObject) {
    final int identity = System.identityHashCode(javaObject);
    // first round: mix bits 12.. and 20.. into the value
    final int mixed = identity ^ (identity >>> 20) ^ (identity >>> 12);
    // second round: mix bits 4.. and 7.. of the intermediate value
    return mixed ^ (mixed >>> 7) ^ (mixed >>> 4);
}
/**
 * Selects the segment responsible for a hash: the hash is shifted right by
 * {@code segmentShift} and masked with {@code segmentMask}.
 *
 * @param hash a value produced by {@code hash(Object)}
 * @return the segment that stores entries with this hash
 */
private Segment<T,A> segmentFor(int hash) {
    final int segmentIndex = (hash >>> segmentShift) & segmentMask;
    return segments[segmentIndex];
}
Returns the approximate size (elements in use) of the cache. The
sizes of the segments are summed. No effort is made to synchronize
across segments, so the value returned may differ from the actual
size at any point in time.
Returns: the approximate number of elements in the cache (the sum of the per-segment sizes).
/**
 * Returns the approximate size (elements in use) of the cache. The
 * sizes of the segments are summed. No effort is made to synchronize
 * across segments, so the value returned may differ from the actual
 * size at any point in time.
 *
 * @return the sum of the segments' element counters, read without locking
 */
public int size() {
    int total = 0;
    for (int i = 0; i < segments.length; i++) {
        total += segments[i].tableSize;
    }
    return total;
}
/**
 * Builds a multi-line report of the cache layout: the number of segments,
 * then for each segment its recounted number of entries ("size") and its
 * table length ("alloc"), and finally the totals. Each segment is locked
 * while its two figures are read.
 *
 * @return the report, every line terminated by a newline
 */
public String stats() {
    final StringBuilder report = new StringBuilder();
    report.append("Segments: ").append(segments.length).append("\n");

    int totalSize = 0;
    int totalAlloc = 0;
    for (int i = 0; i < segments.length; i++) {
        final Segment<T,A> seg = segments[i];
        int segSize;
        int segAlloc;
        seg.lock();
        try {
            segSize = seg.count();
            segAlloc = seg.entryTable.length;
        } finally {
            seg.unlock();
        }
        totalSize += segSize;
        totalAlloc += segAlloc;
        report.append("seg[").append(i).append("]: size: ").append(segSize)
              .append(" alloc: ").append(segAlloc).append("\n");
    }

    report.append("Total: size: ").append(totalSize)
          .append(" alloc: ").append(totalAlloc).append("\n");
    return report.toString();
}
// EntryRefs include hash with key to facilitate lookup by Segment#expunge
// after ref is removed from ReferenceQueue
private static interface EntryRef<T> {
T get();
int hash();
}
/**
 * Weak-reference flavour of {@code EntryRef}; {@code get()} is inherited from
 * {@link WeakReference}.
 */
private static final class WeakEntryRef<T> extends WeakReference<T> implements EntryRef<T> {
    /** Hash of the entry this reference belongs to. */
    final int hash;

    WeakEntryRef(int hash, T rawObject, ReferenceQueue<Object> queue) {
        // register with the queue so the owning segment can find the dead entry later
        super(rawObject, queue);
        this.hash = hash;
    }

    public int hash() {
        return this.hash;
    }
}
/**
 * Soft-reference flavour of {@code EntryRef}; {@code get()} is inherited from
 * {@link SoftReference}.
 */
private static final class SoftEntryRef<T> extends SoftReference<T> implements EntryRef<T> {
    /** Hash of the entry this reference belongs to. */
    final int hash;

    SoftEntryRef(int hash, T rawObject, ReferenceQueue<Object> queue) {
        // register with the queue so the owning segment can find the dead entry later
        super(rawObject, queue);
        this.hash = hash;
    }

    public int hash() {
        return this.hash;
    }
}
// Unlike WeakHashMap, our Entry does not subclass WeakReference, but rather
// makes it a final field. The theory is that doing so should force a happens-before
// relationship WRT the WeakReference constructor, guaranteeing that the key will be
// visible to other threads (unless it's been GC'ed). See JLS 17.5 (final fields) and
// 17.4.5 (Happens-before order) to confirm or refute my reasoning here.
/**
 * Immutable node of a hash chain: references to a Java object and its proxy,
 * the object's hash, and the next node of the chain.
 */
static class Entry<T> {
    final EntryRef<Object> objectRef;
    final int hash;
    final EntryRef<T> proxyRef;
    final Entry<T> next;

    /**
     * Creates an entry with fresh references to {@code object} and
     * {@code proxy}, both registered with {@code queue}.
     */
    Entry(Object object, int hash, T proxy, ReferenceType type, Entry<T> next, ReferenceQueue<Object> queue) {
        this.hash = hash;
        this.next = next;
        // references to the Java object and its proxy will either both be
        // weak or both be soft, since the proxy contains a strong reference
        // to the object, so it wouldn't make sense for the reference types
        // to differ.
        final boolean weak = (type == ReferenceType.WEAK);
        this.objectRef = weak
                ? new WeakEntryRef<Object>(hash, object, queue)
                : new SoftEntryRef<Object>(hash, object, queue);
        this.proxyRef = weak
                ? new WeakEntryRef<T>(hash, proxy, queue)
                : new SoftEntryRef<T>(hash, proxy, queue);
    }

    // ctor used by remove/rehash: copies a node, sharing its existing references
    Entry(EntryRef<Object> objectRef, int hash, EntryRef<T> proxyRef, Entry<T> next) {
        this.next = next;
        this.hash = hash;
        this.objectRef = objectRef;
        this.proxyRef = proxyRef;
    }

    /** Creates an empty, generically typed bucket array of the given length. */
    @SuppressWarnings("unchecked")
    static final <T> Entry<T>[] newArray(int size) {
        final Entry<T>[] buckets = new Entry[size];
        return buckets;
    }
}
// lame generics issues: making Segment class static and manually
// inserting cache reference to work around various problems generically
// referencing methods/vars across classes.
static class Segment<T,A> extends ReentrantLock {
/** The cache this segment belongs to; supplies the reference type and the proxy factory. */
final ObjectProxyCache<T,A> cache;
/** Receives the references of this segment's entries once they have been cleared. */
final ReferenceQueue<Object> referenceQueue = new ReferenceQueue<Object>();
/** Bucket array; re-assigned after every modification so the write is a volatile write. */
volatile Entry<T>[] entryTable;
/** Number of entries in this segment; modified only under the segment lock. */
int tableSize;
/** Element count above which the table is grown. */
int threshold;

/**
 * Creates an empty segment.
 *
 * @param capacity initial length of the bucket array
 * @param cache    the owning cache
 */
Segment(int capacity, ObjectProxyCache<T,A> cache) {
    this.cache = cache;
    this.threshold = (int) (capacity * DEFAULT_LOAD_FACTOR);
    this.entryTable = Entry.newArray(capacity);
}
// must be called under lock
//
// Drains this segment's reference queue and removes the entry belonging to
// each reference found there.
private void expunge() {
    final Entry<T>[] table = entryTable;
    Object polled;
    // note that we'll potentially see the refs for both the java object and
    // proxy -- whichever we see first will cause the entry to be removed;
    // the other will not match an entry and will be ignored.
    while ((polled = referenceQueue.poll()) != null) {
        final EntryRef<?> ref = (EntryRef<?>) polled;
        final int hash = ref.hash();
        Entry<T> e = table[hash & (table.length - 1)];
        while (e != null) {
            if (e.hash == hash && (e.objectRef == ref || e.proxyRef == ref)) {
                remove(table, hash, e);
                break;
            }
            e = e.next;
        }
    }
}
// must be called under lock
//
// Unlinks entry e from its chain in the given table. Entries are immutable,
// so the nodes in front of e are copied (sharing their references) onto the
// untouched tail that follows e. Nothing happens if e is not in the chain.
private void remove(Entry<T>[] table, int hash, Entry<T> e) {
    final int slot = hash & (table.length - 1);
    final Entry<T> head = table[slot];

    // make sure e really is part of this chain
    Entry<T> cursor = head;
    while (cursor != null && cursor != e) {
        cursor = cursor.next;
    }
    if (cursor == null) {
        return;
    }

    // rebuild the part of the chain that preceded e in front of e's successor
    Entry<T> rebuilt = e.next;
    for (Entry<T> p = head; p != e; p = p.next) {
        rebuilt = new Entry<T>(p.objectRef, p.hash, p.proxyRef, rebuilt);
    }
    table[slot] = rebuilt;
    tableSize--;
    entryTable = table; // write-volatile
}
// temp method to verify tableSize value
// must be called under lock
//
// Walks every chain of the current table and returns the number of entries found.
private int count() {
    final Entry<T>[] table = entryTable;
    int total = 0;
    for (int i = 0; i < table.length; i++) {
        for (Entry<T> e = table[i]; e != null; e = e.next) {
            total++;
        }
    }
    return total;
}
// must be called under lock
//
// Grows this segment's table to twice its current length (unless it has
// already reached MAX_CAPACITY, in which case the current table is returned
// untouched), recomputes the resize threshold, redistributes every chain into
// the new table, publishes the new table through the volatile entryTable
// field and returns it. Existing Entry objects are never modified (all of
// their fields are final); entries that cannot be reused in place are copied
// into new Entry objects that share the same reference objects.
private Entry<T>[] rehash() {
// sanity check, only evaluated when assertions are enabled: the maintained
// counter must agree with a full recount
assert tableSize == count() : "tableSize "+tableSize+" != count() "+count();
Entry<T>[] oldTable = entryTable; // read-volatile
int oldCapacity;
if ((oldCapacity = oldTable.length) >= MAX_CAPACITY) {
// cannot grow any further; the threshold is left as it was
return oldTable;
}
int newCapacity = oldCapacity << 1;
// the new slot of an entry is (hash & sizeMask)
int sizeMask = newCapacity - 1;
threshold = (int)(newCapacity * DEFAULT_LOAD_FACTOR);
Entry<T>[] newTable = Entry.newArray(newCapacity);
Entry<T> e;
// visit every bucket of the old table, last to first
for (int i = oldCapacity; --i >= 0; ) {
if ((e = oldTable[i]) != null) {
int idx = e.hash & sizeMask;
Entry<T> next;
if ((next = e.next) == null) {
// Single node in list: it can be moved as is
newTable[idx] = e;
} else {
// Reuse trailing consecutive sequence at same slot:
// after this scan, lastRun is the first node of the final run of
// entries that all map to the same new slot (lastIdx), so that
// whole tail can be linked into the new table without copying
int lastIdx = idx;
Entry<T> lastRun = e;
for (Entry<T> last = next; last != null; last = last.next) {
int k;
if ((k = last.hash & sizeMask) != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone all remaining nodes: everything in front of lastRun is
// copied and pushed onto the head of its new chain
for (Entry<T> p = e; p != lastRun; p = p.next) {
int k = p.hash & sizeMask;
Entry<T> m = new Entry<T>(p.objectRef, p.hash, p.proxyRef, newTable[k]);
newTable[k] = m;
}
}
}
}
entryTable = newTable; // write-volatile
return newTable;
}
/**
 * Stores {@code proxy} as the proxy of {@code object} in this segment.
 * Runs under the segment lock: dead entries are expunged first and the
 * table is grown when adding one entry would exceed the threshold. If the
 * object is already mapped to the same proxy nothing is changed; if it is
 * mapped to another (or a cleared) proxy, that entry is replaced.
 *
 * @param object the Java object (compared by identity)
 * @param hash   the cache hash of {@code object}
 * @param proxy  the proxy to associate with it
 */
void put(Object object, int hash, T proxy) {
    lock();
    try {
        expunge();
        int newSize = tableSize + 1;
        // rehash() is an indirect read-/write-volatile, the field read a read-volatile
        final Entry<T>[] table = (newSize > threshold) ? rehash() : entryTable;
        final int slot = hash & (table.length - 1);
        for (Entry<T> cur = table[slot]; cur != null; cur = cur.next) {
            if (cur.hash != hash || cur.objectRef.get() != object) {
                continue;
            }
            if (cur.proxyRef.get() == proxy) {
                return;
            }
            // entry exists, proxy doesn't match. replace.
            // this could happen if old proxy was gc'ed
            // TODO: raise exception if stored proxy is non-null? (not gc'ed)
            remove(table, hash, cur);
            newSize--;
            break;
        }
        // the new entry becomes the head of its chain
        table[slot] = new Entry<T>(object, hash, proxy, cache.referenceType, table[slot], referenceQueue);
        tableSize = newSize;
        entryTable = table; // write-volatile
    } finally {
        unlock();
    }
}
/**
 * Returns the proxy cached for {@code object}, creating one through the
 * owning cache's {@code allocateProxy} when none is found. A first lookup
 * is done without locking; if it finds no live proxy, the lookup is repeated
 * under the segment lock (after expunging dead entries and growing the table
 * if needed) before the new proxy is allocated and stored.
 *
 * @param object    the Java object (compared by identity)
 * @param hash      the cache hash of {@code object}
 * @param allocator passed through to {@code allocateProxy}
 * @return the existing or newly allocated proxy
 */
T getOrCreate(Object object, int hash, A allocator) {
    // optimistic probe without the lock
    Entry<T>[] table = entryTable;
    for (Entry<T> cur = table[hash & (table.length - 1)]; cur != null; cur = cur.next) {
        if (cur.hash == hash && cur.objectRef.get() == object) {
            final T cached = cur.proxyRef.get();
            if (cached != null) {
                return cached;
            }
            break;
        }
    }

    lock();
    try {
        expunge();
        int newSize = tableSize + 1;
        // rehash() is an indirect read-/write-volatile, the field read a read-volatile
        table = (newSize > threshold) ? rehash() : entryTable;
        final int slot = hash & (table.length - 1);
        for (Entry<T> cur = table[slot]; cur != null; cur = cur.next) {
            if (cur.hash != hash || cur.objectRef.get() != object) {
                continue;
            }
            final T cached = cur.proxyRef.get();
            if (cached != null) {
                return cached;
            }
            // entry exists, proxy has been gc'ed. replace entry.
            remove(table, hash, cur);
            newSize--;
            break;
        }
        final T created = cache.allocateProxy(object, allocator);
        // the new entry becomes the head of its chain
        table[slot] = new Entry<T>(object, hash, created, cache.referenceType, table[slot], referenceQueue);
        tableSize = newSize;
        entryTable = table; // write-volatile
        return created;
    } finally {
        unlock();
    }
}
/**
 * Looks up the proxy cached for {@code object} without taking the lock.
 *
 * @param object the Java object (compared by identity)
 * @param hash   the cache hash of {@code object}
 * @return the proxy held by the matching entry (which may be {@code null}
 *         if that reference has been cleared), or {@code null} when no
 *         entry matches
 */
T get(Object object, int hash) {
    final Entry<T>[] table = entryTable; // read-volatile
    Entry<T> cur = table[hash & (table.length - 1)];
    while (cur != null) {
        if (cur.hash == hash && cur.objectRef.get() == object) {
            return cur.proxyRef.get();
        }
        cur = cur.next;
    }
    return null;
}
/** Creates an empty, generically typed segment array of the given length. */
@SuppressWarnings("unchecked")
static final <T,A> Segment<T,A>[] newArray(int size) {
    final Segment<T,A>[] array = new Segment[size];
    return array;
}
}
}