/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.commitlog;

import java.nio.ByteBuffer;

import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.ICompressor;

Compressed commit log segment. Provides an in-memory buffer for the mutation threads. On sync compresses the written section of the buffer and writes it to the destination channel. The format of the compressed commit log is as follows: - standard commit log header (as written by CommitLogDescriptor.writeHeader(ByteBuffer, CommitLogDescriptor)) - a series of 'sync segments' that are written every time the commit log is sync()'ed -- a sync section header, see CommitLogSegment.writeSyncMarker(long, ByteBuffer, int, int, int) -- total plain text length for this section -- a block of compressed data
/** * Compressed commit log segment. Provides an in-memory buffer for the mutation threads. On sync compresses the written * section of the buffer and writes it to the destination channel. * * The format of the compressed commit log is as follows: * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)}) * - a series of 'sync segments' that are written every time the commit log is sync()'ed * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(long, ByteBuffer, int, int, int)} * -- total plain text length for this section * -- a block of compressed data */
public class CompressedSegment extends FileDirectSegment { static final int COMPRESSED_MARKER_SIZE = SYNC_MARKER_SIZE + 4; final ICompressor compressor;
Constructs a new segment file.
/** * Constructs a new segment file. */
CompressedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager) { super(commitLog, manager); this.compressor = commitLog.configuration.getCompressor(); manager.getBufferPool().setPreferredReusableBufferType(compressor.preferredBufferType()); } ByteBuffer createBuffer(CommitLog commitLog) { return manager.getBufferPool().createBuffer(commitLog.configuration.getCompressor().preferredBufferType()); } @Override void write(int startMarker, int nextMarker) { int contentStart = startMarker + SYNC_MARKER_SIZE; int length = nextMarker - contentStart; // The length may be 0 when the segment is being closed. assert length > 0 || length == 0 && !isStillAllocating(); try { int neededBufferSize = compressor.initialCompressedBufferLength(length) + COMPRESSED_MARKER_SIZE; ByteBuffer compressedBuffer = manager.getBufferPool().getThreadLocalReusableBuffer(neededBufferSize); ByteBuffer inputBuffer = buffer.duplicate(); inputBuffer.limit(contentStart + length).position(contentStart); compressedBuffer.limit(compressedBuffer.capacity()).position(COMPRESSED_MARKER_SIZE); compressor.compress(inputBuffer, compressedBuffer); compressedBuffer.flip(); compressedBuffer.putInt(SYNC_MARKER_SIZE, length); // Only one thread can be here at a given time. // Protected by synchronization on CommitLogSegment.sync(). writeSyncMarker(id, compressedBuffer, 0, (int) channel.position(), (int) channel.position() + compressedBuffer.remaining()); manager.addSize(compressedBuffer.limit()); channel.write(compressedBuffer); assert channel.position() - lastWrittenPos == compressedBuffer.limit(); lastWrittenPos = channel.position(); } catch (Exception e) { throw new FSWriteError(e, getPath()); } } @Override public long onDiskSize() { return lastWrittenPos; } }