/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.streaming;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ning.compress.lzf.LZFOutputStream;

import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataIntegrityMetadata;
import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;

StreamWriter writes given section of the SSTable to given channel.
/** * StreamWriter writes given section of the SSTable to given channel. */
public class StreamWriter { private static final int DEFAULT_CHUNK_SIZE = 64 * 1024; private static final Logger logger = LoggerFactory.getLogger(StreamWriter.class); protected final SSTableReader sstable; protected final Collection<Pair<Long, Long>> sections; protected final StreamRateLimiter limiter; protected final StreamSession session; private OutputStream compressedOutput; // allocate buffer to use for transfers only once private byte[] transferBuffer; public StreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, StreamSession session) { this.session = session; this.sstable = sstable; this.sections = sections; this.limiter = StreamManager.getRateLimiter(session.peer); }
Stream file of specified sections to given channel. StreamWriter uses LZF compression on wire to decrease size to transfer.
Params:
  • output – where this writes data to
Throws:
/** * Stream file of specified sections to given channel. * * StreamWriter uses LZF compression on wire to decrease size to transfer. * * @param output where this writes data to * @throws IOException on any I/O error */
public void write(DataOutputStreamPlus output) throws IOException { long totalSize = totalSize(); logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(), sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); try(RandomAccessReader file = sstable.openDataReader(); ChecksumValidator validator = new File(sstable.descriptor.filenameFor(Component.CRC)).exists() ? DataIntegrityMetadata.checksumValidator(sstable.descriptor) : null;) { transferBuffer = validator == null ? new byte[DEFAULT_CHUNK_SIZE] : new byte[validator.chunkSize]; // setting up data compression stream compressedOutput = new LZFOutputStream(output); long progress = 0L; // stream each of the required sections of the file for (Pair<Long, Long> section : sections) { long start = validator == null ? section.left : validator.chunkStart(section.left); int readOffset = (int) (section.left - start); // seek to the beginning of the section file.seek(start); if (validator != null) validator.seek(start); // length of the section to read long length = section.right - start; // tracks write progress long bytesRead = 0; while (bytesRead < length) { long lastBytesRead = write(file, validator, readOffset, length, bytesRead); bytesRead += lastBytesRead; progress += (lastBytesRead - readOffset); session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize); readOffset = 0; } // make sure that current section is sent compressedOutput.flush(); } logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}", session.planId(), sstable.getFilename(), session.peer, FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(totalSize)); } } protected long totalSize() { long size = 0; for (Pair<Long, Long> section : sections) size += section.right - section.left; return size; }
Sequentially read bytes from the file and write them to the output stream
Params:
  • reader – The file reader to read from
  • validator – validator to verify data integrity
  • start – number of bytes to skip transfer, but include for validation.
  • length – The full length that should be read from reader
  • bytesTransferred – Number of bytes already read out of length
Throws:
Returns:Number of bytes read
/** * Sequentially read bytes from the file and write them to the output stream * * @param reader The file reader to read from * @param validator validator to verify data integrity * @param start number of bytes to skip transfer, but include for validation. * @param length The full length that should be read from {@code reader} * @param bytesTransferred Number of bytes already read out of {@code length} * * @return Number of bytes read * * @throws java.io.IOException on any I/O error */
protected long write(RandomAccessReader reader, ChecksumValidator validator, int start, long length, long bytesTransferred) throws IOException { int toTransfer = (int) Math.min(transferBuffer.length, length - bytesTransferred); int minReadable = (int) Math.min(transferBuffer.length, reader.length() - reader.getFilePointer()); reader.readFully(transferBuffer, 0, minReadable); if (validator != null) validator.validate(transferBuffer, 0, minReadable); limiter.acquire(toTransfer - start); compressedOutput.write(transferBuffer, start, (toTransfer - start)); return toTransfer; } }