//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.io;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

A ByteBuffer pool where ByteBuffers are held in queues that are held in a Map.

Given a capacity factor of 1024, the Map entry with key 1 holds a queue of ByteBuffers each of capacity 1024, the Map entry with key 2 holds a queue of ByteBuffers each of capacity 2048, and so on.

/** * <p>A ByteBuffer pool where ByteBuffers are held in queues that are held in a Map.</p> * <p>Given a capacity {@code factor} of 1024, the Map entry with key {@code 1} holds a * queue of ByteBuffers each of capacity 1024, the Map entry with key {@code 2} holds a * queue of ByteBuffers each of capacity 2048, and so on.</p> */
@ManagedObject public class MappedByteBufferPool extends AbstractByteBufferPool { private static final Logger LOG = LoggerFactory.getLogger(MappedByteBufferPool.class); private final ConcurrentMap<Integer, Bucket> _directBuffers = new ConcurrentHashMap<>(); private final ConcurrentMap<Integer, Bucket> _heapBuffers = new ConcurrentHashMap<>(); private final Function<Integer, Bucket> _newBucket;
Creates a new MappedByteBufferPool with a default configuration.
/** * Creates a new MappedByteBufferPool with a default configuration. */
public MappedByteBufferPool() { this(-1); }
Creates a new MappedByteBufferPool with the given capacity factor.
Params:
  • factor – the capacity factor
/** * Creates a new MappedByteBufferPool with the given capacity factor. * * @param factor the capacity factor */
public MappedByteBufferPool(int factor) { this(factor, -1); }
Creates a new MappedByteBufferPool with the given configuration.
Params:
  • factor – the capacity factor
  • maxQueueLength – the maximum ByteBuffer queue length
/** * Creates a new MappedByteBufferPool with the given configuration. * * @param factor the capacity factor * @param maxQueueLength the maximum ByteBuffer queue length */
public MappedByteBufferPool(int factor, int maxQueueLength) { this(factor, maxQueueLength, null); }
Creates a new MappedByteBufferPool with the given configuration.
Params:
  • factor – the capacity factor
  • maxQueueLength – the maximum ByteBuffer queue length
  • newBucket – the function that creates a Bucket
/** * Creates a new MappedByteBufferPool with the given configuration. * * @param factor the capacity factor * @param maxQueueLength the maximum ByteBuffer queue length * @param newBucket the function that creates a Bucket */
public MappedByteBufferPool(int factor, int maxQueueLength, Function<Integer, Bucket> newBucket) { this(factor, maxQueueLength, newBucket, -1, -1); }
Creates a new MappedByteBufferPool with the given configuration.
Params:
  • factor – the capacity factor
  • maxQueueLength – the maximum ByteBuffer queue length
  • newBucket – the function that creates a Bucket
  • maxHeapMemory – the max heap memory in bytes
  • maxDirectMemory – the max direct memory in bytes
/** * Creates a new MappedByteBufferPool with the given configuration. * * @param factor the capacity factor * @param maxQueueLength the maximum ByteBuffer queue length * @param newBucket the function that creates a Bucket * @param maxHeapMemory the max heap memory in bytes * @param maxDirectMemory the max direct memory in bytes */
public MappedByteBufferPool(int factor, int maxQueueLength, Function<Integer, Bucket> newBucket, long maxHeapMemory, long maxDirectMemory) { super(factor, maxQueueLength, maxHeapMemory, maxDirectMemory); _newBucket = newBucket != null ? newBucket : this::newBucket; } private Bucket newBucket(int key) { return new Bucket(key * getCapacityFactor(), getMaxQueueLength()); } @Override public ByteBuffer acquire(int size, boolean direct) { int b = bucketFor(size); int capacity = b * getCapacityFactor(); ConcurrentMap<Integer, Bucket> buffers = bucketsFor(direct); Bucket bucket = buffers.get(b); if (bucket == null) return newByteBuffer(capacity, direct); ByteBuffer buffer = bucket.acquire(); if (buffer == null) return newByteBuffer(capacity, direct); decrementMemory(buffer); return buffer; } @Override public void release(ByteBuffer buffer) { if (buffer == null) return; // nothing to do int capacity = buffer.capacity(); // Validate that this buffer is from this pool. if ((capacity % getCapacityFactor()) != 0) { if (LOG.isDebugEnabled()) LOG.debug("ByteBuffer {} does not belong to this pool, discarding it", BufferUtil.toDetailString(buffer)); return; } int b = bucketFor(capacity); boolean direct = buffer.isDirect(); ConcurrentMap<Integer, Bucket> buckets = bucketsFor(direct); Bucket bucket = buckets.computeIfAbsent(b, _newBucket); bucket.release(buffer); incrementMemory(buffer); releaseExcessMemory(direct, this::clearOldestBucket); } @Override public void clear() { super.clear(); _directBuffers.values().forEach(Bucket::clear); _directBuffers.clear(); _heapBuffers.values().forEach(Bucket::clear); _heapBuffers.clear(); } private void clearOldestBucket(boolean direct) { long oldest = Long.MAX_VALUE; int index = -1; ConcurrentMap<Integer, Bucket> buckets = bucketsFor(direct); for (Map.Entry<Integer, Bucket> entry : buckets.entrySet()) { Bucket bucket = entry.getValue(); long lastUpdate = bucket.getLastUpdate(); if (lastUpdate < oldest) { oldest = lastUpdate; index = entry.getKey(); } } if (index >= 0) { Bucket bucket = buckets.remove(index); // The same bucket may be concurrently // removed, so we need this null guard. if (bucket != null) bucket.clear(this::decrementMemory); } } private int bucketFor(int size) { int factor = getCapacityFactor(); int bucket = size / factor; if (bucket * factor != size) ++bucket; return bucket; } @ManagedAttribute("The number of pooled direct ByteBuffers") public long getDirectByteBufferCount() { return getByteBufferCount(true); } @ManagedAttribute("The number of pooled heap ByteBuffers") public long getHeapByteBufferCount() { return getByteBufferCount(false); } private long getByteBufferCount(boolean direct) { return bucketsFor(direct).values().stream() .mapToLong(Bucket::size) .sum(); } // Package local for testing ConcurrentMap<Integer, Bucket> bucketsFor(boolean direct) { return direct ? _directBuffers : _heapBuffers; } public static class Tagged extends MappedByteBufferPool { private final AtomicInteger tag = new AtomicInteger(); @Override public ByteBuffer newByteBuffer(int capacity, boolean direct) { ByteBuffer buffer = super.newByteBuffer(capacity + 4, direct); buffer.limit(buffer.capacity()); buffer.putInt(tag.incrementAndGet()); ByteBuffer slice = buffer.slice(); BufferUtil.clear(slice); return slice; } } }