/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.db.commitlog;

import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.util.FileUtils;

A very simple Bytebuffer pool with a fixed allocation size and a cached max allocation count. Will allow you to go past the "max", freeing all buffers allocated beyond the max buffer count on release. Has a reusable thread local ByteBuffer that users can make use of.
/** * A very simple Bytebuffer pool with a fixed allocation size and a cached max allocation count. Will allow * you to go past the "max", freeing all buffers allocated beyond the max buffer count on release. * * Has a reusable thread local ByteBuffer that users can make use of. */
public class SimpleCachedBufferPool { protected static final FastThreadLocal<ByteBuffer> reusableBufferHolder = new FastThreadLocal<ByteBuffer>() { protected ByteBuffer initialValue() { return ByteBuffer.allocate(0); } }; private Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>(); private AtomicInteger usedBuffers = new AtomicInteger(0);
Maximum number of buffers in the compression pool. Any buffers above this count that are allocated will be cleaned upon release rather than held and re-used.
/** * Maximum number of buffers in the compression pool. Any buffers above this count that are allocated will be cleaned * upon release rather than held and re-used. */
private final int maxBufferPoolSize;
Size of individual buffer segments on allocation.
/** * Size of individual buffer segments on allocation. */
private final int bufferSize; private BufferType preferredReusableBufferType = BufferType.ON_HEAP; public SimpleCachedBufferPool(int maxBufferPoolSize, int bufferSize) { this.maxBufferPoolSize = maxBufferPoolSize; this.bufferSize = bufferSize; } public ByteBuffer createBuffer(BufferType bufferType) { usedBuffers.incrementAndGet(); ByteBuffer buf = bufferPool.poll(); if (buf != null) { buf.clear(); return buf; } return bufferType.allocate(bufferSize); } public ByteBuffer getThreadLocalReusableBuffer(int size) { ByteBuffer result = reusableBufferHolder.get(); if (result.capacity() < size || BufferType.typeOf(result) != preferredReusableBufferType) { FileUtils.clean(result); result = preferredReusableBufferType.allocate(size); reusableBufferHolder.set(result); } return result; } public void setPreferredReusableBufferType(BufferType type) { preferredReusableBufferType = type; } public void releaseBuffer(ByteBuffer buffer) { usedBuffers.decrementAndGet(); if (bufferPool.size() < maxBufferPoolSize) bufferPool.add(buffer); else FileUtils.clean(buffer); } public void shutdown() { bufferPool.clear(); } public boolean atLimit() { return usedBuffers.get() >= maxBufferPoolSize; } @Override public String toString() { return new StringBuilder() .append("SimpleBufferPool:") .append(" bufferCount:").append(usedBuffers.get()) .append(", bufferSize:").append(maxBufferPoolSize) .append(", buffer size:").append(bufferSize) .toString(); } }