/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.io.sstable;

import java.lang.ref.WeakReference;
import java.util.*;

import com.google.common.annotations.VisibleForTesting;

import org.apache.cassandra.cache.InstrumentingCache;
import org.apache.cassandra.cache.KeyCacheKey;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.utils.NativeLibrary;
import org.apache.cassandra.utils.concurrent.Transactional;

/**
 * Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb
 * of written output we look in the summary we're collecting for the latest writer for the penultimate key that we know
 * to have been fully flushed to the index file, and then double check that the key is fully present in the flushed data
 * file. Then we move the starts of each reader forwards to that point, replace them in the Tracker, and attach a
 * runnable for on-close (i.e. when all references expire) that drops the page cache prior to that key position.
 *
 * Hard-links are created for each partially written sstable so that readers opened against them continue to work past
 * the rename of the temporary file, which is deleted once all readers against the hard-link have been closed.
 * If for any reason the writer is rolled over, we immediately rename and fully expose the completed file in the Tracker.
 *
 * On abort we restore the original lower bounds to the existing readers and delete any temporary files we had in
 * progress, but leave any hard-links in place for the readers we opened to clean up when they're finished, as we would
 * had we finished successfully.
 */
public class SSTableRewriter extends Transactional.AbstractTransactional implements Transactional
{
    @VisibleForTesting
    public static boolean disableEarlyOpeningForTests = false;

    private final long preemptiveOpenInterval;
    private final long maxAge;
    private long repairedAt = -1;
    private final ILifecycleTransaction transaction; // the readers we are rewriting (updated as they are replaced)
    private final List<SSTableReader> preparedForCommit = new ArrayList<>(); // the set of final readers we will expose on commit
    private long currentlyOpenedEarlyAt; // the position (in bytes) in the target file we last (re)opened at
    private final List<SSTableWriter> writers = new ArrayList<>();
    private final boolean keepOriginals; // true if we do not want to obsolete the originals

    private SSTableWriter writer;
    private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();

    // for testing (TODO: remove when have byteman setup)
    private boolean throwEarly, throwLate;

    @Deprecated
    public SSTableRewriter(ILifecycleTransaction transaction, long maxAge, boolean isOffline)
    {
        this(transaction, maxAge, isOffline, true);
    }

    @Deprecated
    public SSTableRewriter(ILifecycleTransaction transaction, long maxAge, boolean isOffline, boolean shouldOpenEarly)
    {
        this(transaction, maxAge, calculateOpenInterval(shouldOpenEarly), false);
    }

    @VisibleForTesting
    public SSTableRewriter(ILifecycleTransaction transaction, long maxAge, long preemptiveOpenInterval, boolean keepOriginals)
    {
        this.transaction = transaction;
        this.maxAge = maxAge;
        this.keepOriginals = keepOriginals;
        this.preemptiveOpenInterval = preemptiveOpenInterval;
    }

    @Deprecated
    public static SSTableRewriter constructKeepingOriginals(ILifecycleTransaction transaction, boolean keepOriginals, long maxAge, boolean isOffline)
    {
        return constructKeepingOriginals(transaction, keepOriginals, maxAge);
    }

    public static SSTableRewriter constructKeepingOriginals(ILifecycleTransaction transaction, boolean keepOriginals, long maxAge)
    {
        return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(true), keepOriginals);
    }

    public static SSTableRewriter constructWithoutEarlyOpening(ILifecycleTransaction transaction, boolean keepOriginals, long maxAge)
    {
        return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(false), keepOriginals);
    }

    public static SSTableRewriter construct(ColumnFamilyStore cfs, ILifecycleTransaction transaction, boolean keepOriginals, long maxAge)
    {
        return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(cfs.supportsEarlyOpen()), keepOriginals);
    }

    private static long calculateOpenInterval(boolean shouldOpenEarly)
    {
        long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
        if (disableEarlyOpeningForTests || !shouldOpenEarly || interval < 0)
            interval = Long.MAX_VALUE;
        return interval;
    }

    public SSTableWriter currentWriter()
    {
        return writer;
    }

    public RowIndexEntry append(UnfilteredRowIterator partition)
    {
        // we do this before appending to ensure we can resetAndTruncate() safely if the append fails
        DecoratedKey key = partition.partitionKey();
        maybeReopenEarly(key);
        RowIndexEntry index = writer.append(partition);
        if (!transaction.isOffline() && index != null)
        {
            for (SSTableReader reader : transaction.originals())
            {
                if (reader.getCachedPosition(key, false) != null)
                {
                    cachedKeys.put(key, index);
                    break;
                }
            }
        }
        return index;
    }

    // attempts to append the row; if it fails, resets the writer position
    public RowIndexEntry tryAppend(UnfilteredRowIterator partition)
    {
        writer.mark();
        try
        {
            return append(partition);
        }
        catch (Throwable t)
        {
            writer.resetAndTruncate();
            throw t;
        }
    }

    private void maybeReopenEarly(DecoratedKey key)
    {
        if (writer.getFilePointer() - currentlyOpenedEarlyAt > preemptiveOpenInterval)
        {
            if (transaction.isOffline())
            {
                for (SSTableReader reader : transaction.originals())
                {
                    RowIndexEntry index = reader.getPosition(key, SSTableReader.Operator.GE);
                    NativeLibrary.trySkipCache(reader.getFilename(), 0, index == null ? 0 : index.position);
                }
            }
            else
            {
                SSTableReader reader = writer.setMaxDataAge(maxAge).openEarly();
                if (reader != null)
                {
                    transaction.update(reader, false);
                    currentlyOpenedEarlyAt = writer.getFilePointer();
                    moveStarts(reader, reader.last);
                    transaction.checkpoint();
                }
            }
        }
    }

    protected Throwable doAbort(Throwable accumulate)
    {
        // abort the writers
        for (SSTableWriter writer : writers)
            accumulate = writer.abort(accumulate);
        // abort the lifecycle transaction
        accumulate = transaction.abort(accumulate);
        return accumulate;
    }

    protected Throwable doCommit(Throwable accumulate)
    {
        for (SSTableWriter writer : writers)
            accumulate = writer.commit(accumulate);
        accumulate = transaction.commit(accumulate);
        return accumulate;
    }
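    /*
     * Illustrative lifecycle (a sketch, not part of this class's API): callers normally drive the
     * rewriter roughly as below. The names "cfs", "sstables", "maxAge", "newWriter" and "partitions",
     * and the use of Tracker.tryModify/OperationType, are assumptions for illustration only.
     *
     *     try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
     *          SSTableRewriter rewriter = SSTableRewriter.construct(cfs, txn, false, maxAge))
     *     {
     *         rewriter.switchWriter(newWriter);        // install the first output writer
     *         while (partitions.hasNext())
     *             rewriter.append(partitions.next());  // may preemptively open the in-progress file
     *         rewriter.finish();                       // prepare + commit; otherwise close() aborts
     *     }
     */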
    /**
     * Replace the readers we are rewriting with cloneWithNewStart, reclaiming any page cache that is no longer
     * needed, and transferring any key cache entries over to the new reader, expiring them from the old.
     *
     * Note that we replace an existing sstable with a new *instance* of the same sstable; the replacement
     * sstable .equals() the old one, BUT it is a new instance, so, for example, since we releaseReference() on the
     * old one, the old *instance* will have reference count == 0 and if we were to start a new compaction with that
     * old instance, we would get exceptions.
     *
     * @param newReader the rewritten reader that replaces the originals for this region
     * @param lowerbound must be non-null; marks the exclusive lowerbound of the start for each sstable
     */
    private void moveStarts(SSTableReader newReader, DecoratedKey lowerbound)
    {
        if (transaction.isOffline())
            return;

        if (preemptiveOpenInterval == Long.MAX_VALUE)
            return;

        newReader.setupOnline();
        List<DecoratedKey> invalidateKeys = null;
        if (!cachedKeys.isEmpty())
        {
            invalidateKeys = new ArrayList<>(cachedKeys.size());
            for (Map.Entry<DecoratedKey, RowIndexEntry> cacheKey : cachedKeys.entrySet())
            {
                invalidateKeys.add(cacheKey.getKey());
                newReader.cacheKey(cacheKey.getKey(), cacheKey.getValue());
            }
        }

        cachedKeys.clear();
        for (SSTableReader sstable : transaction.originals())
        {
            // we call transaction.current() to support multiple rewriters operating over the same source readers at once.
            // note: only one such writer should be written to at any moment
            final SSTableReader latest = transaction.current(sstable);

            // skip any sstables that we know to already be shadowed
            if (latest.first.compareTo(lowerbound) > 0)
                continue;

            Runnable runOnClose = invalidateKeys != null ? new InvalidateKeys(latest, invalidateKeys) : null;
            if (lowerbound.compareTo(latest.last) >= 0)
            {
                if (!transaction.isObsolete(latest))
                {
                    if (runOnClose != null)
                        latest.runOnClose(runOnClose);
                    transaction.obsolete(latest);
                }
                continue;
            }

            DecoratedKey newStart = latest.firstKeyBeyond(lowerbound);
            assert newStart != null;
            SSTableReader replacement = latest.cloneWithNewStart(newStart, runOnClose);
            transaction.update(replacement, true);
        }
    }

    private static final class InvalidateKeys implements Runnable
    {
        final List<KeyCacheKey> cacheKeys = new ArrayList<>();
        final WeakReference<InstrumentingCache<KeyCacheKey, ?>> cacheRef;

        private InvalidateKeys(SSTableReader reader, Collection<DecoratedKey> invalidate)
        {
            this.cacheRef = new WeakReference<>(reader.getKeyCache());
            if (cacheRef.get() != null)
            {
                for (DecoratedKey key : invalidate)
                    cacheKeys.add(reader.getCacheKey(key));
            }
        }

        public void run()
        {
            for (KeyCacheKey key : cacheKeys)
            {
                InstrumentingCache<KeyCacheKey, ?> cache = cacheRef.get();
                if (cache != null)
                    cache.remove(key);
            }
        }
    }

    public void switchWriter(SSTableWriter newWriter)
    {
        if (newWriter != null)
            writers.add(newWriter.setMaxDataAge(maxAge));

        if (writer == null || writer.getFilePointer() == 0)
        {
            if (writer != null)
            {
                writer.abort();
                transaction.untrackNew(writer);
                writers.remove(writer);
            }
            writer = newWriter;
            return;
        }

        if (preemptiveOpenInterval != Long.MAX_VALUE)
        {
            // we leave it as a tmp file, but we open it and add it to the Tracker
            SSTableReader reader = writer.setMaxDataAge(maxAge).openFinalEarly();
            transaction.update(reader, false);
            moveStarts(reader, reader.last);
            transaction.checkpoint();
        }

        currentlyOpenedEarlyAt = 0;
        writer = newWriter;
    }
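    /*
     * A sketch of the instance-identity caveat documented on moveStarts() above ("old", "newStart"
     * and "runOnClose" are illustrative placeholders):
     *
     *     SSTableReader replacement = old.cloneWithNewStart(newStart, runOnClose);
     *     assert replacement.equals(old);  // the same logical sstable...
     *     assert replacement != old;       // ...but a distinct instance; the old instance's
     *                                      // reference is released, so handing it to a new
     *                                      // compaction would raise exceptions
     */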
    /**
     * @param repairedAt the repair time, -1 if we should use the time we supplied when we created
     *                   the SSTableWriter (and called rewriter.switchWriter(..)), actual time if we want
     *                   to override the repair time.
     */
    public SSTableRewriter setRepairedAt(long repairedAt)
    {
        this.repairedAt = repairedAt;
        return this;
    }
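    // Example (illustrative): rewriter.setRepairedAt(System.currentTimeMillis()) overrides the
    // repair time for the output, while the default of -1 keeps the time supplied to the
    // SSTableWriter when it was created.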
    /**
     * Finishes the new file(s).
     *
     * Creates final files and adds the new files to the Tracker (via replaceReader).
     *
     * We add them to the tracker to be able to get rid of the tmpfiles.
     *
     * It is up to the caller to do the compacted sstables replacement
     * gymnastics (ie, call Tracker#markCompactedSSTablesReplaced(..)).
     */
    public List<SSTableReader> finish()
    {
        super.finish();
        return finished();
    }

    // returns, in list form, the set of final readers we will expose on commit
    public List<SSTableReader> finished()
    {
        assert state() == State.COMMITTED || state() == State.READY_TO_COMMIT;
        return preparedForCommit;
    }

    protected void doPrepare()
    {
        switchWriter(null);

        if (throwEarly)
            throw new RuntimeException("exception thrown early in finish, for testing");

        // No early open to finalize and replace
        for (SSTableWriter writer : writers)
        {
            assert writer.getFilePointer() > 0;
            writer.setRepairedAt(repairedAt).setOpenResult(true).prepareToCommit();
            SSTableReader reader = writer.finished();
            transaction.update(reader, false);
            preparedForCommit.add(reader);
        }
        transaction.checkpoint();

        if (throwLate)
            throw new RuntimeException("exception thrown after all sstables finished, for testing");

        if (!keepOriginals)
            transaction.obsoleteOriginals();

        transaction.prepareToCommit();
    }

    public void throwDuringPrepare(boolean earlyException)
    {
        if (earlyException)
            throwEarly = true;
        else
            throwLate = true;
    }
}