package org.glassfish.grizzly.memory;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
import org.glassfish.grizzly.Cacheable;
import org.glassfish.grizzly.ThreadCache;
import org.glassfish.grizzly.monitoring.MonitoringConfig;
import org.glassfish.grizzly.monitoring.MonitoringUtils;
/**
 * Memory manager that hands out {@link ByteBufferWrapper}s backed by {@link ByteBuffer}s.
 *
 * <p>Allocation strategy, as implemented below:
 * <ul>
 * <li>requests of at most {@link #getMaxSmallBufferSize()} bytes are served by
 * {@link SmallByteBufferWrapper}s, which are recycled through {@link ThreadCache};</li>
 * <li>larger requests of at most {@code maxBufferSize} bytes are sliced from the
 * {@code ByteBufferThreadLocalPool} returned by {@code getThreadLocalPool()}, when there is one;</li>
 * <li>everything else is allocated directly (heap or direct memory, depending on {@link #isDirect()}).</li>
 * </ul>
 */
public class ByteBufferManager extends AbstractMemoryManager<ByteBufferWrapper> implements WrapperAware, ByteBufferAware {

    /** Default value of {@link #getMaxSmallBufferSize()}, in bytes. */
    public static final int DEFAULT_SMALL_BUFFER_SIZE = 32;

    /**
     * {@link ThreadCache} index used to recycle {@link TrimAwareWrapper} instances. The cache size is read
     * from the integer system property {@code <ByteBufferManager class name>.taw-cache-size} (default 2).
     */
    private static final ThreadCache.CachedTypeIndex<TrimAwareWrapper> CACHE_IDX = ThreadCache.obtainIndex(TrimAwareWrapper.class,
            Integer.getInteger(ByteBufferManager.class.getName() + ".taw-cache-size", 2));

    /**
     * {@link ThreadCache} index used to recycle {@link SmallByteBufferWrapper} instances. This is an
     * instance field: the index name embeds the identity hash code of this manager. The cache size is read
     * from the integer system property {@code <ByteBufferManager class name>.sbbw-cache-size} (default 16).
     */
    private final ThreadCache.CachedTypeIndex<SmallByteBufferWrapper> SMALL_BUFFER_CACHE_IDX = ThreadCache.obtainIndex(
            SmallByteBufferWrapper.class.getName() + '.' + System.identityHashCode(this), SmallByteBufferWrapper.class,
            Integer.getInteger(ByteBufferManager.class.getName() + ".sbbw-cache-size", 16));

    /** Whether newly allocated {@link ByteBuffer}s are direct ({@code true}) or heap-based ({@code false}). */
    protected boolean isDirect;

    /** Largest request size (in bytes) that is served by a {@link SmallByteBufferWrapper}. */
    protected final int maxSmallBufferSize;

    /**
     * Creates a heap-based manager using the inherited {@code DEFAULT_MAX_BUFFER_SIZE} and
     * {@link #DEFAULT_SMALL_BUFFER_SIZE}.
     */
    public ByteBufferManager() {
        this(false, DEFAULT_MAX_BUFFER_SIZE, DEFAULT_SMALL_BUFFER_SIZE);
    }

    /**
     * Creates a manager using the inherited {@code DEFAULT_MAX_BUFFER_SIZE} and
     * {@link #DEFAULT_SMALL_BUFFER_SIZE}.
     *
     * @param isDirect {@code true} to allocate direct buffers, {@code false} to allocate heap buffers
     */
    public ByteBufferManager(final boolean isDirect) {
        this(isDirect, DEFAULT_MAX_BUFFER_SIZE, DEFAULT_SMALL_BUFFER_SIZE);
    }

    /**
     * Creates a fully configured manager.
     *
     * @param isDirect {@code true} to allocate direct buffers, {@code false} to allocate heap buffers
     * @param maxBufferSize value passed to the superclass constructor; requests larger than this bypass the pool
     * @param maxSmallBufferSize largest request size served by a {@link SmallByteBufferWrapper}
     */
    public ByteBufferManager(final boolean isDirect, final int maxBufferSize, final int maxSmallBufferSize) {
        super(maxBufferSize);
        this.maxSmallBufferSize = maxSmallBufferSize;
        this.isDirect = isDirect;
    }

    /**
     * @return the largest request size (in bytes) that is served by a {@link SmallByteBufferWrapper}
     */
    public int getMaxSmallBufferSize() {
        return maxSmallBufferSize;
    }

    /**
     * Allocates a buffer of the requested size.
     *
     * @param size number of bytes requested
     * @return a small buffer whose limit is set to {@code size} when {@code size <= maxSmallBufferSize},
     *         otherwise a wrapper around the result of {@link #allocateByteBuffer(int)}
     */
    @Override
    public ByteBufferWrapper allocate(final int size) {
        if (size <= maxSmallBufferSize) {
            final SmallByteBufferWrapper buffer = createSmallBuffer();
            buffer.limit(size);
            return buffer;
        }
        return wrap(allocateByteBuffer(size));
    }

    /**
     * Allocates a buffer of at least the requested size.
     *
     * @param size minimum number of bytes requested
     * @return a small buffer whose limit is set to {@code size} when {@code size <= maxSmallBufferSize},
     *         otherwise a wrapper around the result of {@link #allocateByteBufferAtLeast(int)}
     */
    @Override
    public ByteBufferWrapper allocateAtLeast(int size) {
        if (size <= maxSmallBufferSize) {
            final SmallByteBufferWrapper buffer = createSmallBuffer();
            buffer.limit(size);
            return buffer;
        }
        return wrap(allocateByteBufferAtLeast(size));
    }

    /**
     * Reallocates the {@link ByteBuffer} underlying {@code oldBuffer} and wraps the result.
     *
     * @param oldBuffer buffer whose underlying {@link ByteBuffer} is passed to
     *        {@link #reallocateByteBuffer(ByteBuffer, int)}
     * @param newSize requested new size in bytes
     * @return a new wrapper around the reallocated {@link ByteBuffer}
     */
    @Override
    public ByteBufferWrapper reallocate(ByteBufferWrapper oldBuffer, int newSize) {
        return wrap(reallocateByteBuffer(oldBuffer.underlying(), newSize));
    }

    /**
     * Releases the {@link ByteBuffer} underlying the given wrapper via {@link #releaseByteBuffer(ByteBuffer)}.
     *
     * @param buffer wrapper whose underlying buffer is released
     */
    @Override
    public void release(ByteBufferWrapper buffer) {
        releaseByteBuffer(buffer.underlying());
    }

    /**
     * @return {@code true} if this manager allocates direct buffers
     */
    public boolean isDirect() {
        return isDirect;
    }

    /**
     * @param isDirect {@code true} to allocate direct buffers from now on, {@code false} for heap buffers
     */
    public void setDirect(boolean isDirect) {
        this.isDirect = isDirect;
    }

    /**
     * @param size ignored by this implementation
     * @return the current {@link #isDirect()} flag, regardless of {@code size}
     */
    @Override
    public boolean willAllocateDirect(int size) {
        return isDirect;
    }

    /**
     * Wraps the whole byte array.
     *
     * @param data array to wrap
     * @return a wrapper over {@code data[0 .. data.length)}
     */
    @Override
    public ByteBufferWrapper wrap(byte[] data) {
        return wrap(data, 0, data.length);
    }

    /**
     * Wraps a region of a byte array using {@link ByteBuffer#wrap(byte[], int, int)}.
     *
     * @param data array to wrap
     * @param offset start offset of the region
     * @param length length of the region
     * @return a wrapper over the given region
     */
    @Override
    public ByteBufferWrapper wrap(byte[] data, int offset, int length) {
        return wrap(ByteBuffer.wrap(data, offset, length));
    }

    /**
     * Wraps the bytes of a string encoded with {@link Charset#defaultCharset()}.
     *
     * @param s string to encode and wrap
     * @return a wrapper over the encoded bytes
     */
    @Override
    public ByteBufferWrapper wrap(String s) {
        return wrap(s, Charset.defaultCharset());
    }

    /**
     * Wraps the bytes of a string encoded with the given charset.
     *
     * @param s string to encode and wrap
     * @param charset charset used to encode {@code s}
     * @return a wrapper over the encoded bytes
     */
    @Override
    public ByteBufferWrapper wrap(String s, Charset charset) {
        // Encode with the Charset object directly: no by-name lookup and no checked
        // UnsupportedEncodingException, which cannot occur for an already-resolved Charset.
        return wrap(ByteBuffer.wrap(s.getBytes(charset)));
    }

    /**
     * @return a new, empty {@code ByteBufferThreadLocalPool}
     */
    @Override
    public ThreadLocalPool createThreadLocalPool() {
        return new ByteBufferThreadLocalPool();
    }

    /**
     * Wraps the given {@link ByteBuffer} in a {@link TrimAwareWrapper}, reusing a cached wrapper if available.
     *
     * @param byteBuffer buffer to wrap
     * @return a wrapper whose visible buffer is {@code byteBuffer}
     */
    @Override
    public ByteBufferWrapper wrap(final ByteBuffer byteBuffer) {
        return createTrimAwareBuffer(byteBuffer);
    }

    /**
     * Allocates a {@link ByteBuffer} of the requested size.
     *
     * <p>Requests larger than {@code maxBufferSize}, or made when no {@code ByteBufferThreadLocalPool} is
     * available, are allocated directly. Otherwise the buffer is taken from the pool, which is first
     * replaced by a fresh {@code maxBufferSize} buffer if it cannot satisfy the request.
     *
     * @param size number of bytes requested
     * @return the allocated buffer
     */
    @Override
    @SuppressWarnings("unchecked")
    public ByteBuffer allocateByteBuffer(final int size) {
        if (size > maxBufferSize) {
            // Too big for the pool.
            return allocateByteBuffer0(size);
        }
        final ThreadLocalPool<ByteBuffer> threadLocalCache = getByteBufferThreadLocalPool();
        if (threadLocalCache != null) {
            final int remaining = threadLocalCache.remaining();
            if (remaining == 0 || remaining < size) {
                reallocatePoolBuffer();
            }
            return (ByteBuffer) allocateFromPool(threadLocalCache, size);
        } else {
            return allocateByteBuffer0(size);
        }
    }

    /**
     * Allocates a {@link ByteBuffer} of at least the requested size.
     *
     * <p>Requests larger than {@code maxBufferSize}, or made when no {@code ByteBufferThreadLocalPool} is
     * available, are allocated directly with exactly {@code size} bytes. Otherwise everything that remains
     * in the pool is requested from it; the pool is first replaced by a fresh {@code maxBufferSize} buffer
     * if it holds fewer than {@code size} bytes.
     *
     * @param size minimum number of bytes requested
     * @return the allocated buffer
     */
    @Override
    @SuppressWarnings("unchecked")
    public ByteBuffer allocateByteBufferAtLeast(final int size) {
        if (size > maxBufferSize) {
            // Too big for the pool.
            return allocateByteBuffer0(size);
        }
        final ThreadLocalPool<ByteBuffer> threadLocalCache = getByteBufferThreadLocalPool();
        if (threadLocalCache != null) {
            int remaining = threadLocalCache.remaining();
            if (remaining == 0 || remaining < size) {
                reallocatePoolBuffer();
                // Re-read: the pool now holds the freshly allocated buffer.
                remaining = threadLocalCache.remaining();
            }
            return (ByteBuffer) allocateFromPool(threadLocalCache, remaining);
        } else {
            return allocateByteBuffer0(size);
        }
    }

    /**
     * Returns a {@link ByteBuffer} with capacity of at least {@code newSize}.
     *
     * <p>If the old buffer is already big enough it is returned unchanged. Otherwise the pool (if any) is
     * asked to grow it in place; if that is not possible, a new buffer is allocated, the old buffer is
     * flipped and its content is copied into the new one.
     *
     * @param oldByteBuffer buffer to grow
     * @param newSize requested capacity in bytes
     * @return {@code oldByteBuffer} itself, a buffer produced by the pool, or a newly allocated buffer
     *         holding the old content
     */
    @Override
    @SuppressWarnings("unchecked")
    public ByteBuffer reallocateByteBuffer(ByteBuffer oldByteBuffer, int newSize) {
        if (oldByteBuffer.capacity() >= newSize) {
            return oldByteBuffer;
        }
        final ThreadLocalPool<ByteBuffer> memoryPool = getByteBufferThreadLocalPool();
        if (memoryPool != null) {
            final ByteBuffer newBuffer = memoryPool.reallocate(oldByteBuffer, newSize);
            if (newBuffer != null) {
                // Only the additional bytes are reported as taken from the pool.
                ProbeNotifier.notifyBufferAllocatedFromPool(monitoringConfig, newSize - oldByteBuffer.capacity());
                return newBuffer;
            }
        }
        ByteBuffer newByteBuffer = allocateByteBuffer(newSize);
        // Flip so that put() copies the bytes written so far: [0, old position).
        oldByteBuffer.flip();
        return newByteBuffer.put(oldByteBuffer);
    }

    /**
     * Offers the buffer back to the {@code ByteBufferThreadLocalPool}, if there is one. The buffer is
     * {@linkplain ByteBuffer#clear() cleared} before being offered; nothing happens when no pool is available.
     *
     * @param byteBuffer buffer to release
     */
    @Override
    @SuppressWarnings("unchecked")
    public void releaseByteBuffer(ByteBuffer byteBuffer) {
        ThreadLocalPool<ByteBuffer> memoryPool = getByteBufferThreadLocalPool();
        if (memoryPool != null) {
            if (memoryPool.release((ByteBuffer) byteBuffer.clear())) {
                ProbeNotifier.notifyBufferReleasedToPool(monitoringConfig, byteBuffer.capacity());
            }
        }
    }

    /**
     * Obtains a {@link SmallByteBufferWrapper}, preferring a cached instance over a new allocation.
     *
     * @return a cached wrapper, or a new one backed by a {@code maxSmallBufferSize}-byte buffer
     */
    protected SmallByteBufferWrapper createSmallBuffer() {
        final SmallByteBufferWrapper buffer = ThreadCache.takeFromCache(SMALL_BUFFER_CACHE_IDX);
        if (buffer != null) {
            ProbeNotifier.notifyBufferAllocatedFromPool(monitoringConfig, maxSmallBufferSize);
            return buffer;
        }
        return new SmallByteBufferWrapper(allocateByteBuffer0(maxSmallBufferSize));
    }

    /**
     * @return the monitoring configuration held in the inherited {@code monitoringConfig} field
     */
    @Override
    public MonitoringConfig<MemoryProbe> getMonitoringConfig() {
        return monitoringConfig;
    }

    /**
     * @return the object loaded by {@link MonitoringUtils#loadJmxObject} for the class name
     *         {@code org.glassfish.grizzly.memory.jmx.ByteBufferManager}
     */
    @Override
    protected Object createJmxManagementObject() {
        return MonitoringUtils.loadJmxObject("org.glassfish.grizzly.memory.jmx.ByteBufferManager", this, ByteBufferManager.class);
    }

    /**
     * Allocates a brand-new {@link ByteBuffer}, bypassing any pool, and notifies the memory probes.
     *
     * @param size capacity of the new buffer in bytes
     * @return a direct buffer if {@link #isDirect} is set, otherwise a heap buffer
     */
    protected final ByteBuffer allocateByteBuffer0(final int size) {
        ProbeNotifier.notifyBufferAllocated(monitoringConfig, size);
        if (isDirect) {
            return ByteBuffer.allocateDirect(size);
        } else {
            return ByteBuffer.allocate(size);
        }
    }

    /**
     * Returns a {@link TrimAwareWrapper} around the given buffer, reusing a cached wrapper when possible.
     *
     * @param underlyingByteBuffer buffer to expose through the wrapper
     * @return a cached wrapper re-pointed at the buffer, or a new wrapper
     */
    private TrimAwareWrapper createTrimAwareBuffer(final ByteBuffer underlyingByteBuffer) {
        final TrimAwareWrapper buffer = ThreadCache.takeFromCache(CACHE_IDX);
        if (buffer != null) {
            buffer.visible = underlyingByteBuffer;
            return buffer;
        }
        return new TrimAwareWrapper(underlyingByteBuffer);
    }

    /**
     * Allocates a fresh {@code maxBufferSize} buffer and, if a {@code ByteBufferThreadLocalPool} is
     * available, installs it as the pool's new backing buffer.
     */
    @SuppressWarnings({ "unchecked" })
    private void reallocatePoolBuffer() {
        final ByteBuffer byteBuffer = allocateByteBuffer0(maxBufferSize);
        final ThreadLocalPool<ByteBuffer> threadLocalCache = getByteBufferThreadLocalPool();
        if (threadLocalCache != null) {
            threadLocalCache.reset(byteBuffer);
        }
    }

    /**
     * @return the pool returned by {@code getThreadLocalPool()} if it is a {@code ByteBufferThreadLocalPool},
     *         otherwise {@code null}
     */
    @SuppressWarnings("unchecked")
    private static ByteBufferThreadLocalPool getByteBufferThreadLocalPool() {
        final ThreadLocalPool pool = getThreadLocalPool();
        return pool instanceof ByteBufferThreadLocalPool ? (ByteBufferThreadLocalPool) pool : null;
    }

    /**
     * Pool that carves slices out of a single backing {@link ByteBuffer} and remembers the slices it handed
     * out, so that the most recently allocated slice can be returned, grown or shrunk in place.
     */
    private static final class ByteBufferThreadLocalPool implements ThreadLocalPool<ByteBuffer> {

        /** Backing buffer; slices are cut starting at its current position. {@code null} until first reset. */
        private ByteBuffer pool;

        /** Slices handed out since the last reset, in allocation order; grown on demand. */
        private ByteBuffer[] allocationHistory;

        /** Number of valid entries in {@link #allocationHistory}. */
        private int lastAllocatedIndex;

        public ByteBufferThreadLocalPool() {
            allocationHistory = new ByteBuffer[8];
        }

        /**
         * Replaces the backing buffer and forgets all previously recorded allocations.
         *
         * @param pool new backing buffer
         */
        @Override
        public void reset(ByteBuffer pool) {
            // Null out the used history slots so the old slices are no longer referenced from here.
            Arrays.fill(allocationHistory, 0, lastAllocatedIndex, null);
            lastAllocatedIndex = 0;
            this.pool = pool;
        }

        /**
         * Cuts a slice of {@code size} bytes from the backing buffer via {@code Buffers.slice} and records it.
         *
         * @param size number of bytes to slice
         * @return the new slice
         */
        @Override
        public ByteBuffer allocate(int size) {
            final ByteBuffer allocated = Buffers.slice(pool, size);
            return addHistory(allocated);
        }

        /**
         * Tries to grow the most recently allocated slice in place.
         *
         * @param oldByteBuffer slice to grow
         * @param newSize requested size in bytes
         * @return a new slice of {@code newSize} bytes starting where the old one started, positioned at
         *         the old slice's position; or {@code null} if {@code oldByteBuffer} is not the last
         *         allocation or the pool cannot provide {@code newSize} bytes
         */
        @Override
        public ByteBuffer reallocate(ByteBuffer oldByteBuffer, int newSize) {
            if (isLastAllocated(oldByteBuffer) && remaining() + oldByteBuffer.capacity() >= newSize) {
                // Undo the previous allocation, then slice again with the larger size.
                lastAllocatedIndex--;
                pool.position(pool.position() - oldByteBuffer.capacity());
                final ByteBuffer newByteBuffer = Buffers.slice(pool, newSize);
                newByteBuffer.position(oldByteBuffer.position());
                return addHistory(newByteBuffer);
            }
            return null;
        }

        /**
         * Takes a buffer back into the pool if possible.
         *
         * @param underlyingBuffer buffer being released
         * @return {@code true} if the buffer was the last allocation (its space is reclaimed) or was
         *         adopted as the new backing buffer; {@code false} if it was not taken back
         */
        @Override
        public boolean release(ByteBuffer underlyingBuffer) {
            if (isLastAllocated(underlyingBuffer)) {
                pool.position(pool.position() - underlyingBuffer.capacity());
                allocationHistory[--lastAllocatedIndex] = null;
                return true;
            } else if (wantReset(underlyingBuffer.capacity())) {
                reset(underlyingBuffer);
                return true;
            }
            return false;
        }

        /**
         * @param size size in bytes of a buffer that could become the new backing buffer
         * @return {@code true} if the pool has nothing remaining, or if it has no recorded allocations and
         *         fewer than {@code size} bytes remaining
         */
        @Override
        public boolean wantReset(int size) {
            // When pool is null, hasRemaining() is false and the expression short-circuits before pool.remaining().
            return !hasRemaining() || lastAllocatedIndex == 0 && pool.remaining() < size;
        }

        /**
         * @param oldByteBuffer buffer to test
         * @return {@code true} if {@code oldByteBuffer} is the very same instance as the most recently
         *         recorded allocation
         */
        @Override
        public boolean isLastAllocated(ByteBuffer oldByteBuffer) {
            // Identity comparison on purpose: ByteBuffer.equals() compares content.
            return lastAllocatedIndex > 0 && allocationHistory[lastAllocatedIndex - 1] == oldByteBuffer;
        }

        /**
         * Replaces the most recently recorded allocation with a smaller buffer and returns the capacity
         * difference to the pool. The code does not check that an allocation is recorded;
         * NOTE(review): callers presumably verify this with {@link #isLastAllocated(ByteBuffer)} first.
         *
         * @param byteBuffer the reduced buffer that replaces the last allocation
         * @return the buffer that was previously recorded as the last allocation
         */
        @Override
        public ByteBuffer reduceLastAllocated(ByteBuffer byteBuffer) {
            final ByteBuffer oldLastAllocated = allocationHistory[lastAllocatedIndex - 1];
            pool.position(pool.position() - (oldLastAllocated.capacity() - byteBuffer.capacity()));
            allocationHistory[lastAllocatedIndex - 1] = byteBuffer;
            return oldLastAllocated;
        }

        /**
         * @return number of bytes remaining in the backing buffer, or 0 if there is no backing buffer yet
         */
        @Override
        public int remaining() {
            return pool != null ? pool.remaining() : 0;
        }

        /**
         * @return {@code true} if {@link #remaining()} is greater than zero
         */
        @Override
        public boolean hasRemaining() {
            return remaining() > 0;
        }

        /**
         * Appends a slice to the allocation history, growing the history array by roughly 1.5x when full.
         *
         * @param allocated slice to record
         * @return {@code allocated}, for call chaining
         */
        private ByteBuffer addHistory(ByteBuffer allocated) {
            if (lastAllocatedIndex >= allocationHistory.length) {
                allocationHistory = Arrays.copyOf(allocationHistory, allocationHistory.length * 3 / 2 + 1);
            }
            allocationHistory[lastAllocatedIndex++] = allocated;
            return allocated;
        }

        @Override
        public String toString() {
            return "(pool=" + pool + " last-allocated-index=" + (lastAllocatedIndex - 1) + " allocation-history=" + Arrays.toString(allocationHistory) + ')';
        }
    }

    /**
     * Wrapper whose {@link #trim()} tries to give the unused tail of the buffer back to the
     * {@code ByteBufferThreadLocalPool}, and which recycles itself through {@link ThreadCache} on disposal.
     */
    private final class TrimAwareWrapper extends ByteBufferWrapper implements TrimAware {

        private TrimAwareWrapper(ByteBuffer underlyingByteBuffer) {
            super(underlyingByteBuffer);
        }

        /**
         * Trims the buffer, returning the unused space ({@code capacity - position}) to the pool when possible.
         *
         * <p>If the visible buffer is the pool's last allocation, it is flipped and replaced by a slice of
         * itself, and the pool's last allocation is reduced accordingly. Otherwise, if the pool wants a
         * reset for that amount of space, the unused tail of the buffer becomes the pool's new backing
         * buffer. In every other case the call is delegated to {@code super.trim()}.
         */
        @Override
        @SuppressWarnings("unchecked")
        public void trim() {
            final int sizeToReturn = visible.capacity() - visible.position();
            if (sizeToReturn > 0) {
                final ThreadLocalPool<ByteBuffer> threadLocalCache = getByteBufferThreadLocalPool();
                if (threadLocalCache != null) {
                    if (threadLocalCache.isLastAllocated(visible)) {
                        visible.flip();
                        visible = visible.slice();
                        threadLocalCache.reduceLastAllocated(visible);
                        return;
                    } else if (threadLocalCache.wantReset(sizeToReturn)) {
                        visible.flip();
                        final ByteBuffer originalByteBuffer = visible;
                        visible = visible.slice();
                        // Re-window the original buffer onto its unused tail: [old position, capacity).
                        originalByteBuffer.position(originalByteBuffer.limit());
                        originalByteBuffer.limit(originalByteBuffer.capacity());
                        threadLocalCache.reset(originalByteBuffer);
                        return;
                    }
                }
            }
            super.trim();
        }

        /** Clears the {@code allowBufferDispose} flag and offers this wrapper to the {@link ThreadCache}. */
        @Override
        public void recycle() {
            allowBufferDispose = false;
            ThreadCache.putToCache(CACHE_IDX, this);
        }

        /**
         * Releases the underlying buffer through the enclosing manager, drops the reference to it and
         * recycles this wrapper.
         */
        @Override
        public void dispose() {
            prepareDispose();
            ByteBufferManager.this.release(this);
            visible = null;
            recycle();
        }

        /**
         * @param byteBuffer buffer to wrap
         * @return the result of wrapping {@code byteBuffer} with the enclosing manager
         */
        @Override
        protected ByteBufferWrapper wrapByteBuffer(ByteBuffer byteBuffer) {
            return ByteBufferManager.this.wrap(byteBuffer);
        }
    }

    /**
     * Wrapper used for requests of at most {@code maxSmallBufferSize} bytes; on disposal it is cleared and
     * offered to the enclosing manager's small-buffer {@link ThreadCache} slot.
     */
    protected final class SmallByteBufferWrapper extends ByteBufferWrapper implements Cacheable {

        private SmallByteBufferWrapper(ByteBuffer underlyingByteBuffer) {
            super(underlyingByteBuffer);
        }

        /** Clears the visible buffer (position 0, limit = capacity) and recycles this wrapper. */
        @Override
        public void dispose() {
            super.prepareDispose();
            visible.clear();
            recycle();
        }

        /**
         * Offers this wrapper to the cache, but only if its visible buffer currently has exactly
         * {@code maxSmallBufferSize} bytes remaining; otherwise nothing happens.
         */
        @Override
        public void recycle() {
            if (visible.remaining() == maxSmallBufferSize) {
                allowBufferDispose = false;
                disposeStackTrace = null;
                if (ThreadCache.putToCache(SMALL_BUFFER_CACHE_IDX, this)) {
                    ProbeNotifier.notifyBufferReleasedToPool(monitoringConfig, maxSmallBufferSize);
                }
            }
        }

        /**
         * @param byteBuffer buffer to wrap
         * @return the result of wrapping {@code byteBuffer} with the enclosing manager
         */
        @Override
        protected ByteBufferWrapper wrapByteBuffer(final ByteBuffer byteBuffer) {
            return ByteBufferManager.this.wrap(byteBuffer);
        }
    }
}