/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.hints;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.zip.CRC32;

import com.google.common.annotations.VisibleForTesting;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputBufferFixed;
import org.apache.cassandra.utils.NativeLibrary;
import org.apache.cassandra.utils.SyncUtil;
import org.apache.cassandra.utils.Throwables;

import static org.apache.cassandra.utils.FBUtilities.updateChecksum;
import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
import static org.apache.cassandra.utils.Throwables.perform;

class HintsWriter implements AutoCloseable
{
    static final int PAGE_SIZE = 4096;

    private final File directory;
    private final HintsDescriptor descriptor;
    private final File file;
    protected final FileChannel channel;
    private final int fd;
    protected final CRC32 globalCRC;

    private volatile long lastSyncPosition = 0L;

    protected HintsWriter(File directory, HintsDescriptor descriptor, File file, FileChannel channel, int fd, CRC32 globalCRC)
    {
        this.directory = directory;
        this.descriptor = descriptor;
        this.file = file;
        this.channel = channel;
        this.fd = fd;
        this.globalCRC = globalCRC;
    }

    @SuppressWarnings("resource") // HintsWriter owns channel
    static HintsWriter create(File directory, HintsDescriptor descriptor) throws IOException
    {
        File file = new File(directory, descriptor.fileName());

        FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
        int fd = NativeLibrary.getfd(channel);

        CRC32 crc = new CRC32();

        try (DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get())
        {
            // write the descriptor
            descriptor.serialize(dob);
            ByteBuffer descriptorBytes = dob.buffer();
            updateChecksum(crc, descriptorBytes);
            channel.write(descriptorBytes);
        }
        catch (Throwable e)
        {
            channel.close();
            throw e;
        }

        if (descriptor.isEncrypted())
            return new EncryptedHintsWriter(directory, descriptor, file, channel, fd, crc);
        if (descriptor.isCompressed())
            return new CompressedHintsWriter(directory, descriptor, file, channel, fd, crc);
        return new HintsWriter(directory, descriptor, file, channel, fd, crc);
    }

    HintsDescriptor descriptor()
    {
        return descriptor;
    }

    private void writeChecksum()
    {
        File checksumFile = new File(directory, descriptor.checksumFileName());
        try (OutputStream out = Files.newOutputStream(checksumFile.toPath()))
        {
            out.write(Integer.toHexString((int) globalCRC.getValue()).getBytes(StandardCharsets.UTF_8));
        }
        catch (IOException e)
        {
            throw new FSWriteError(e, checksumFile);
        }
    }

    public void close()
    {
        perform(file, Throwables.FileOpType.WRITE, this::doFsync, channel::close);

        writeChecksum();
    }

    public void fsync()
    {
        perform(file, Throwables.FileOpType.WRITE, this::doFsync);
    }

    private void doFsync() throws IOException
    {
        SyncUtil.force(channel, true);
        lastSyncPosition = channel.position();
    }

    Session newSession(ByteBuffer buffer)
    {
        try
        {
            return new Session(buffer, channel.size());
        }
        catch (IOException e)
        {
            throw new FSWriteError(e, file);
        }
    }

    
Writes byte buffer into the file channel. Buffer should be flipped before calling this
/** * Writes byte buffer into the file channel. Buffer should be flipped before calling this */
protected void writeBuffer(ByteBuffer bb) throws IOException { updateChecksum(globalCRC, bb); channel.write(bb); }
The primary goal of the Session class is to be able to share the same buffers among potentially dozens or hundreds of hints writers, and ensure that their contents are always written to the underlying channels in the end.
/** * The primary goal of the Session class is to be able to share the same buffers among potentially dozens or hundreds * of hints writers, and ensure that their contents are always written to the underlying channels in the end. */
final class Session implements AutoCloseable { private final ByteBuffer buffer; private final long initialSize; private long bytesWritten; Session(ByteBuffer buffer, long initialSize) { buffer.clear(); bytesWritten = 0L; this.buffer = buffer; this.initialSize = initialSize; } @VisibleForTesting long getBytesWritten() { return bytesWritten; } long position() { return initialSize + bytesWritten; }
Appends the serialized hint (with CRC included) to this session's aggregation buffer, writes to the underlying channel when the buffer is overflown.
Params:
  • hint – the serialized hint (with CRC included)
Throws:
/** * Appends the serialized hint (with CRC included) to this session's aggregation buffer, * writes to the underlying channel when the buffer is overflown. * * @param hint the serialized hint (with CRC included) * @throws IOException */
void append(ByteBuffer hint) throws IOException { bytesWritten += hint.remaining(); // if the hint to write won't fit in the aggregation buffer, flush it if (hint.remaining() > buffer.remaining()) { buffer.flip(); writeBuffer(buffer); buffer.clear(); } // if the hint fits in the aggregation buffer, then update the aggregation buffer, // otherwise write the hint buffer to the channel if (hint.remaining() <= buffer.remaining()) { buffer.put(hint); } else { writeBuffer(hint); } }
Serializes and appends the hint (with CRC included) to this session's aggregation buffer, writes to the underlying channel when the buffer is overflown. Used mainly by tests and LegacyHintsMigrator
Params:
  • hint – the unserialized hint
Throws:
/** * Serializes and appends the hint (with CRC included) to this session's aggregation buffer, * writes to the underlying channel when the buffer is overflown. * * Used mainly by tests and {@link LegacyHintsMigrator} * * @param hint the unserialized hint * @throws IOException */
void append(Hint hint) throws IOException { int hintSize = (int) Hint.serializer.serializedSize(hint, descriptor.messagingVersion()); int totalSize = hintSize + HintsBuffer.ENTRY_OVERHEAD_SIZE; if (totalSize > buffer.remaining()) flushBuffer(); ByteBuffer hintBuffer = totalSize <= buffer.remaining() ? buffer : ByteBuffer.allocate(totalSize); CRC32 crc = new CRC32(); try (DataOutputBufferFixed out = new DataOutputBufferFixed(hintBuffer)) { out.writeInt(hintSize); updateChecksumInt(crc, hintSize); out.writeInt((int) crc.getValue()); Hint.serializer.serialize(hint, out, descriptor.messagingVersion()); updateChecksum(crc, hintBuffer, hintBuffer.position() - hintSize, hintSize); out.writeInt((int) crc.getValue()); } if (hintBuffer == buffer) bytesWritten += totalSize; else append((ByteBuffer) hintBuffer.flip()); }
Closes the session - flushes the aggregation buffer (if not empty), does page aligning, and potentially fsyncs.
Throws:
  • IOException –
/** * Closes the session - flushes the aggregation buffer (if not empty), does page aligning, and potentially fsyncs. * @throws IOException */
public void close() throws IOException { flushBuffer(); maybeFsync(); maybeSkipCache(); } private void flushBuffer() throws IOException { buffer.flip(); if (buffer.remaining() > 0) { writeBuffer(buffer); } buffer.clear(); } private void maybeFsync() { if (position() >= lastSyncPosition + DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024L) fsync(); } private void maybeSkipCache() { long position = position(); // don't skip page cache for tiny files, on the assumption that if they are tiny, the target node is probably // alive, and if so, the file will be closed and dispatched shortly (within a minute), and the file will be dropped. if (position >= DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024L) NativeLibrary.trySkipCache(fd, 0, position - (position % PAGE_SIZE), file.getPath()); } } }