/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.commitlog;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Timer.Context;

import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.WaitQueue;

public abstract class AbstractCommitLogService
{
    
When in CommitLogSync.periodic mode, the default number of milliseconds to wait between updating the commit log chained markers.
/** * When in {@link Config.CommitLogSync#periodic} mode, the default number of milliseconds to wait between updating * the commit log chained markers. */
static final long DEFAULT_MARKER_INTERVAL_MILLIS = 100; private volatile Thread thread; private volatile boolean shutdown = false; // all Allocations written before this time will be synced protected volatile long lastSyncedAt = System.currentTimeMillis(); // counts of total written, and pending, log messages private final AtomicLong written = new AtomicLong(0); protected final AtomicLong pending = new AtomicLong(0); // signal that writers can wait on to be notified of a completed sync protected final WaitQueue syncComplete = new WaitQueue(); final CommitLog commitLog; private final String name;
The duration between syncs to disk.
/** * The duration between syncs to disk. */
final long syncIntervalNanos;
The duration between updating the chained markers in the the commit log file. This value should be 0 < markerIntervalNanos <= syncIntervalNanos.
/** * The duration between updating the chained markers in the the commit log file. This value should be * 0 < {@link #markerIntervalNanos} <= {@link #syncIntervalNanos}. */
final long markerIntervalNanos;
A flag that callers outside of the sync thread can use to signal they want the commitlog segments to be flushed to disk. Note: this flag is primarily to support commit log's batch mode, which requires an immediate flush to disk on every mutation; see BatchCommitLogService.maybeWaitForSync(Allocation).
/** * A flag that callers outside of the sync thread can use to signal they want the commitlog segments * to be flushed to disk. Note: this flag is primarily to support commit log's batch mode, which requires * an immediate flush to disk on every mutation; see {@link BatchCommitLogService#maybeWaitForSync(Allocation)}. */
private volatile boolean syncRequested; private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);
CommitLogService provides a fsync service for Allocations, fulfilling either the Batch or Periodic contract. Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
/** * CommitLogService provides a fsync service for Allocations, fulfilling either the * Batch or Periodic contract. * * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue. */
AbstractCommitLogService(final CommitLog commitLog, final String name, long syncIntervalMillis) { this (commitLog, name, syncIntervalMillis, false); }
CommitLogService provides a fsync service for Allocations, fulfilling either the Batch or Periodic contract. Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
Params:
  • markHeadersFaster – true if the chained markers should be updated more frequently than on the disk sync bounds.
/** * CommitLogService provides a fsync service for Allocations, fulfilling either the * Batch or Periodic contract. * * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue. * * @param markHeadersFaster true if the chained markers should be updated more frequently than on the disk sync bounds. */
AbstractCommitLogService(final CommitLog commitLog, final String name, long syncIntervalMillis, boolean markHeadersFaster) { this.commitLog = commitLog; this.name = name; final long markerIntervalMillis; if (markHeadersFaster && syncIntervalMillis > DEFAULT_MARKER_INTERVAL_MILLIS) { markerIntervalMillis = DEFAULT_MARKER_INTERVAL_MILLIS; long modulo = syncIntervalMillis % markerIntervalMillis; if (modulo != 0) { // quantize syncIntervalMillis to a multiple of markerIntervalMillis syncIntervalMillis -= modulo; if (modulo >= markerIntervalMillis / 2) syncIntervalMillis += markerIntervalMillis; } logger.debug("Will update the commitlog markers every {}ms and flush every {}ms", markerIntervalMillis, syncIntervalMillis); } else { markerIntervalMillis = syncIntervalMillis; } assert syncIntervalMillis % markerIntervalMillis == 0; this.markerIntervalNanos = TimeUnit.NANOSECONDS.convert(markerIntervalMillis, TimeUnit.MILLISECONDS); this.syncIntervalNanos = TimeUnit.NANOSECONDS.convert(syncIntervalMillis, TimeUnit.MILLISECONDS); } // Separated into individual method to ensure relevant objects are constructed before this is started. void start() { if (syncIntervalNanos < 1) throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms", syncIntervalNanos * 1e-6)); shutdown = false; Runnable runnable = new SyncRunnable(new Clock()); thread = NamedThreadFactory.createThread(runnable, name); thread.start(); } class SyncRunnable implements Runnable { private final Clock clock; private long firstLagAt = 0; private long totalSyncDuration = 0; // total time spent syncing since firstLagAt private long syncExceededIntervalBy = 0; // time that syncs exceeded pollInterval since firstLagAt private int lagCount = 0; private int syncCount = 0; SyncRunnable(Clock clock) { this.clock = clock; } public void run() { while (true) { if (!sync()) break; } } boolean sync() { // always run once after shutdown signalled boolean shutdownRequested = shutdown; try { // sync and signal long pollStarted = clock.nanoTime(); boolean flushToDisk = lastSyncedAt + syncIntervalNanos <= pollStarted || shutdownRequested || syncRequested; if (flushToDisk) { // in this branch, we want to flush the commit log to disk syncRequested = false; commitLog.sync(true); lastSyncedAt = pollStarted; syncComplete.signalAll(); syncCount++; } else { // in this branch, just update the commit log sync headers commitLog.sync(false); } long now = clock.nanoTime(); if (flushToDisk) maybeLogFlushLag(pollStarted, now); if (shutdownRequested) return false; long wakeUpAt = pollStarted + markerIntervalNanos; if (wakeUpAt > now) LockSupport.parkNanos(wakeUpAt - now); } catch (Throwable t) { if (!CommitLog.handleCommitError("Failed to persist commits to disk", t)) return false; // sleep for full poll-interval after an error, so we don't spam the log file LockSupport.parkNanos(markerIntervalNanos); } return true; }
Add a log entry whenever the time to flush the commit log to disk exceeds AbstractCommitLogService.syncIntervalNanos.
/** * Add a log entry whenever the time to flush the commit log to disk exceeds {@link #syncIntervalNanos}. */
@VisibleForTesting boolean maybeLogFlushLag(long pollStarted, long now) { long flushDuration = now - pollStarted; totalSyncDuration += flushDuration; // this is the timestamp by which we should have completed the flush long maxFlushTimestamp = pollStarted + syncIntervalNanos; if (maxFlushTimestamp > now) return false; // if we have lagged noticeably, update our lag counter if (firstLagAt == 0) { firstLagAt = now; syncExceededIntervalBy = lagCount = 0; syncCount = 1; totalSyncDuration = flushDuration; } syncExceededIntervalBy += now - maxFlushTimestamp; lagCount++; if (firstLagAt > 0) { //Only reset the lag tracking if it actually logged this time boolean logged = NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 5, TimeUnit.MINUTES, "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms", syncCount, String.format("%.2f", (now - firstLagAt) * 1e-9d), String.format("%.2f", totalSyncDuration * 1e-6d / syncCount), lagCount, String.format("%.2f", syncExceededIntervalBy * 1e-6d / lagCount)); if (logged) firstLagAt = 0; } return true; } @VisibleForTesting long getTotalSyncDuration() { return totalSyncDuration; } }
Block for @param alloc to be sync'd as necessary, and handle bookkeeping
/** * Block for @param alloc to be sync'd as necessary, and handle bookkeeping */
public void finishWriteFor(Allocation alloc) { maybeWaitForSync(alloc); written.incrementAndGet(); } protected abstract void maybeWaitForSync(Allocation alloc);
Request an additional sync cycle without blocking.
/** * Request an additional sync cycle without blocking. */
void requestExtraSync() { syncRequested = true; LockSupport.unpark(thread); } public void shutdown() { shutdown = true; requestExtraSync(); }
Request sync and wait until the current state is synced. Note: If a sync is in progress at the time of this request, the call will return after both it and a cycle initiated immediately afterwards complete.
/** * Request sync and wait until the current state is synced. * * Note: If a sync is in progress at the time of this request, the call will return after both it and a cycle * initiated immediately afterwards complete. */
public void syncBlocking() { long requestTime = System.nanoTime(); requestExtraSync(); awaitSyncAt(requestTime, null); } void awaitSyncAt(long syncTime, Context context) { do { WaitQueue.Signal signal = context != null ? syncComplete.register(context) : syncComplete.register(); if (lastSyncedAt < syncTime) signal.awaitUninterruptibly(); else signal.cancel(); } while (lastSyncedAt < syncTime); } public void awaitTermination() throws InterruptedException { thread.join(); } public long getCompletedTasks() { return written.get(); } public long getPendingTasks() { return pending.get(); } }