//
//  ========================================================================
//  Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.io;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
import java.util.function.IntFunction;

import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;

A ByteBuffer pool where ByteBuffers are held in queues that are held in array elements.

Given a capacity factor of 1024, the first array element holds a queue of ByteBuffers each of capacity 1024, the second array element holds a queue of ByteBuffers each of capacity 2048, and so on.

/** * <p>A ByteBuffer pool where ByteBuffers are held in queues that are held in array elements.</p> * <p>Given a capacity {@code factor} of 1024, the first array element holds a queue of ByteBuffers * each of capacity 1024, the second array element holds a queue of ByteBuffers each of capacity * 2048, and so on.</p> */
@ManagedObject public class ArrayByteBufferPool extends AbstractByteBufferPool { private final int _minCapacity; private final ByteBufferPool.Bucket[] _direct; private final ByteBufferPool.Bucket[] _indirect;
Creates a new ArrayByteBufferPool with a default configuration.
/** * Creates a new ArrayByteBufferPool with a default configuration. */
public ArrayByteBufferPool() { this(-1, -1, -1); }
Creates a new ArrayByteBufferPool with the given configuration.
Params:
  • minCapacity – the minimum ByteBuffer capacity
  • factor – the capacity factor
  • maxCapacity – the maximum ByteBuffer capacity
/** * Creates a new ArrayByteBufferPool with the given configuration. * * @param minCapacity the minimum ByteBuffer capacity * @param factor the capacity factor * @param maxCapacity the maximum ByteBuffer capacity */
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity) { this(minCapacity, factor, maxCapacity, -1, -1, -1); }
Creates a new ArrayByteBufferPool with the given configuration.
Params:
  • minCapacity – the minimum ByteBuffer capacity
  • factor – the capacity factor
  • maxCapacity – the maximum ByteBuffer capacity
  • maxQueueLength – the maximum ByteBuffer queue length
/** * Creates a new ArrayByteBufferPool with the given configuration. * * @param minCapacity the minimum ByteBuffer capacity * @param factor the capacity factor * @param maxCapacity the maximum ByteBuffer capacity * @param maxQueueLength the maximum ByteBuffer queue length */
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength) { this(minCapacity, factor, maxCapacity, maxQueueLength, -1, -1); }
Creates a new ArrayByteBufferPool with the given configuration.
Params:
  • minCapacity – the minimum ByteBuffer capacity
  • factor – the capacity factor
  • maxCapacity – the maximum ByteBuffer capacity
  • maxQueueLength – the maximum ByteBuffer queue length
  • maxHeapMemory – the max heap memory in bytes
  • maxDirectMemory – the max direct memory in bytes
/** * Creates a new ArrayByteBufferPool with the given configuration. * * @param minCapacity the minimum ByteBuffer capacity * @param factor the capacity factor * @param maxCapacity the maximum ByteBuffer capacity * @param maxQueueLength the maximum ByteBuffer queue length * @param maxHeapMemory the max heap memory in bytes * @param maxDirectMemory the max direct memory in bytes */
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory) { super(factor, maxQueueLength, maxHeapMemory, maxDirectMemory); factor = getCapacityFactor(); if (minCapacity <= 0) minCapacity = 0; if (maxCapacity <= 0) maxCapacity = 64 * 1024; if ((maxCapacity % factor) != 0 || factor >= maxCapacity) throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity"); _minCapacity = minCapacity; int length = maxCapacity / factor; _direct = new ByteBufferPool.Bucket[length]; _indirect = new ByteBufferPool.Bucket[length]; } @Override public ByteBuffer acquire(int size, boolean direct) { int capacity = size < _minCapacity ? size : (bucketFor(size) + 1) * getCapacityFactor(); ByteBufferPool.Bucket bucket = bucketFor(size, direct, null); if (bucket == null) return newByteBuffer(capacity, direct); ByteBuffer buffer = bucket.acquire(); if (buffer == null) return newByteBuffer(capacity, direct); decrementMemory(buffer); return buffer; } @Override public void release(ByteBuffer buffer) { if (buffer == null) return; boolean direct = buffer.isDirect(); ByteBufferPool.Bucket bucket = bucketFor(buffer.capacity(), direct, this::newBucket); if (bucket != null) { bucket.release(buffer); incrementMemory(buffer); releaseExcessMemory(direct, this::clearOldestBucket); } } private Bucket newBucket(int key) { return new Bucket(key * getCapacityFactor(), getMaxQueueLength()); } @Override public void clear() { super.clear(); for (int i = 0; i < _direct.length; ++i) { Bucket bucket = _direct[i]; if (bucket != null) bucket.clear(); _direct[i] = null; bucket = _indirect[i]; if (bucket != null) bucket.clear(); _indirect[i] = null; } } private void clearOldestBucket(boolean direct) { long oldest = Long.MAX_VALUE; int index = -1; Bucket[] buckets = bucketsFor(direct); for (int i = 0; i < buckets.length; ++i) { Bucket bucket = buckets[i]; if (bucket == null) continue; long lastUpdate = bucket.getLastUpdate(); if (lastUpdate < oldest) { oldest = lastUpdate; index = i; } } if (index >= 0) { Bucket bucket = buckets[index]; buckets[index] = null; // The same bucket may be concurrently // removed, so we need this null guard. if (bucket != null) bucket.clear(this::decrementMemory); } } private int bucketFor(int capacity) { return (capacity - 1) / getCapacityFactor(); } private ByteBufferPool.Bucket bucketFor(int capacity, boolean direct, IntFunction<Bucket> newBucket) { if (capacity < _minCapacity) return null; int b = bucketFor(capacity); if (b >= _direct.length) return null; Bucket[] buckets = bucketsFor(direct); Bucket bucket = buckets[b]; if (bucket == null && newBucket != null) buckets[b] = bucket = newBucket.apply(b + 1); return bucket; } @ManagedAttribute("The number of pooled direct ByteBuffers") public long getDirectByteBufferCount() { return getByteBufferCount(true); } @ManagedAttribute("The number of pooled heap ByteBuffers") public long getHeapByteBufferCount() { return getByteBufferCount(false); } private long getByteBufferCount(boolean direct) { return Arrays.stream(bucketsFor(direct)) .filter(Objects::nonNull) .mapToLong(Bucket::size) .sum(); } // Package local for testing ByteBufferPool.Bucket[] bucketsFor(boolean direct) { return direct ? _direct : _indirect; } }