/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.locator;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.ExponentiallyDecayingReservoir;

import com.codahale.metrics.Snapshot;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MBeanWrapper;


/**
 * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector
 */
public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean { private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values private static final int WINDOW_SIZE = 100; private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval(); private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval(); private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold(); // the score for a merged set of endpoints must be this much worse than the score for separate endpoints to // warrant not merging two ranges into a single range private static final double RANGE_MERGING_PREFERENCE = 1.5; private String mbeanName; private boolean registered = false; private volatile HashMap<InetAddress, Double> scores = new HashMap<>(); private final ConcurrentHashMap<InetAddress, ExponentiallyDecayingReservoir> samples = new ConcurrentHashMap<>(); public final IEndpointSnitch subsnitch; private volatile ScheduledFuture<?> updateSchedular; private volatile ScheduledFuture<?> resetSchedular; private final Runnable update; private final Runnable reset; public DynamicEndpointSnitch(IEndpointSnitch snitch) { this(snitch, null); } public DynamicEndpointSnitch(IEndpointSnitch snitch, String instance) { mbeanName = "org.apache.cassandra.db:type=DynamicEndpointSnitch"; if (instance != null) mbeanName += ",instance=" + instance; subsnitch = snitch; update = new Runnable() { public void run() { updateScores(); } }; reset = new Runnable() { public void run() { // we do this so that a host considered bad has a chance to recover, otherwise would we never try // to read from it, which would cause its score to never change reset(); } }; if (DatabaseDescriptor.isDaemonInitialized()) { updateSchedular = 
ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update, dynamicUpdateInterval, dynamicUpdateInterval, TimeUnit.MILLISECONDS); resetSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(reset, dynamicResetInterval, dynamicResetInterval, TimeUnit.MILLISECONDS); registerMBean(); } }
Update configuration from DatabaseDescriptor and estart the update-scheduler and reset-scheduler tasks if the configured rates for these tasks have changed.
/** * Update configuration from {@link DatabaseDescriptor} and estart the update-scheduler and reset-scheduler tasks * if the configured rates for these tasks have changed. */
public void applyConfigChanges() { if (dynamicUpdateInterval != DatabaseDescriptor.getDynamicUpdateInterval()) { dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval(); if (DatabaseDescriptor.isDaemonInitialized()) { updateSchedular.cancel(false); updateSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update, dynamicUpdateInterval, dynamicUpdateInterval, TimeUnit.MILLISECONDS); } } if (dynamicResetInterval != DatabaseDescriptor.getDynamicResetInterval()) { dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval(); if (DatabaseDescriptor.isDaemonInitialized()) { resetSchedular.cancel(false); resetSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(reset, dynamicResetInterval, dynamicResetInterval, TimeUnit.MILLISECONDS); } } dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold(); } private void registerMBean() { MBeanWrapper.instance.registerMBean(this, mbeanName); } public void close() { updateSchedular.cancel(false); resetSchedular.cancel(false); MBeanWrapper.instance.unregisterMBean(mbeanName); } @Override public void gossiperStarting() { subsnitch.gossiperStarting(); } public String getRack(InetAddress endpoint) { return subsnitch.getRack(endpoint); } public String getDatacenter(InetAddress endpoint) { return subsnitch.getDatacenter(endpoint); } public List<InetAddress> getSortedListByProximity(final InetAddress address, Collection<InetAddress> addresses) { List<InetAddress> list = new ArrayList<InetAddress>(addresses); sortByProximity(address, list); return list; } @Override public void sortByProximity(final InetAddress address, List<InetAddress> addresses) { assert address.equals(FBUtilities.getBroadcastAddress()); // we only know about ourself if (dynamicBadnessThreshold == 0) { sortByProximityWithScore(address, addresses); } else { sortByProximityWithBadness(address, addresses); } } private void sortByProximityWithScore(final InetAddress address, List<InetAddress> addresses) { 
// Scores can change concurrently from a call to this method. But Collections.sort() expects // its comparator to be "stable", that is 2 endpoint should compare the same way for the duration // of the sort() call. As we copy the scores map on write, it is thus enough to alias the current // version of it during this call. final HashMap<InetAddress, Double> scores = this.scores; Collections.sort(addresses, new Comparator<InetAddress>() { public int compare(InetAddress a1, InetAddress a2) { return compareEndpoints(address, a1, a2, scores); } }); } private void sortByProximityWithBadness(final InetAddress address, List<InetAddress> addresses) { if (addresses.size() < 2) return; subsnitch.sortByProximity(address, addresses); HashMap<InetAddress, Double> scores = this.scores; // Make sure the score don't change in the middle of the loop below // (which wouldn't really matter here but its cleaner that way). ArrayList<Double> subsnitchOrderedScores = new ArrayList<>(addresses.size()); for (InetAddress inet : addresses) { Double score = scores.get(inet); if (score == null) continue; subsnitchOrderedScores.add(score); } // Sort the scores and then compare them (positionally) to the scores in the subsnitch order. // If any of the subsnitch-ordered scores exceed the optimal/sorted score by dynamicBadnessThreshold, use // the score-sorted ordering instead of the subsnitch ordering. 
ArrayList<Double> sortedScores = new ArrayList<>(subsnitchOrderedScores); Collections.sort(sortedScores); Iterator<Double> sortedScoreIterator = sortedScores.iterator(); for (Double subsnitchScore : subsnitchOrderedScores) { if (subsnitchScore > (sortedScoreIterator.next() * (1.0 + dynamicBadnessThreshold))) { sortByProximityWithScore(address, addresses); return; } } } // Compare endpoints given an immutable snapshot of the scores private int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2, Map<InetAddress, Double> scores) { Double scored1 = scores.get(a1); Double scored2 = scores.get(a2); if (scored1 == null) { scored1 = 0.0; } if (scored2 == null) { scored2 = 0.0; } if (scored1.equals(scored2)) return subsnitch.compareEndpoints(target, a1, a2); if (scored1 < scored2) return -1; else return 1; } public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2) { // That function is fundamentally unsafe because the scores can change at any time and so the result of that // method is not stable for identical arguments. This is why we don't rely on super.sortByProximity() in // sortByProximityWithScore(). 
throw new UnsupportedOperationException("You shouldn't wrap the DynamicEndpointSnitch (within itself or otherwise)"); } public void receiveTiming(InetAddress host, long latency) // this is cheap { ExponentiallyDecayingReservoir sample = samples.get(host); if (sample == null) { ExponentiallyDecayingReservoir maybeNewSample = new ExponentiallyDecayingReservoir(WINDOW_SIZE, ALPHA); sample = samples.putIfAbsent(host, maybeNewSample); if (sample == null) sample = maybeNewSample; } sample.update(latency); } private void updateScores() // this is expensive { if (!StorageService.instance.isGossipActive()) return; if (!registered) { if (MessagingService.instance() != null) { MessagingService.instance().register(this); registered = true; } } double maxLatency = 1; Map<InetAddress, Snapshot> snapshots = new HashMap<>(samples.size()); for (Map.Entry<InetAddress, ExponentiallyDecayingReservoir> entry : samples.entrySet()) { snapshots.put(entry.getKey(), entry.getValue().getSnapshot()); } // We're going to weight the latency for each host against the worst one we see, to // arrive at sort of a 'badness percentage' for them. First, find the worst for each: HashMap<InetAddress, Double> newScores = new HashMap<>(); for (Map.Entry<InetAddress, Snapshot> entry : snapshots.entrySet()) { double mean = entry.getValue().getMedian(); if (mean > maxLatency) maxLatency = mean; } // now make another pass to do the weighting based on the maximums we found before for (Map.Entry<InetAddress, Snapshot> entry : snapshots.entrySet()) { double score = entry.getValue().getMedian() / maxLatency; // finally, add the severity without any weighting, since hosts scale this relative to their own load and the size of the task causing the severity. // "Severity" is basically a measure of compaction activity (CASSANDRA-3722). if (USE_SEVERITY) score += getSeverity(entry.getKey()); // lowest score (least amount of badness) wins. 
newScores.put(entry.getKey(), score); } scores = newScores; } private void reset() { samples.clear(); } public Map<InetAddress, Double> getScores() { return scores; } public int getUpdateInterval() { return dynamicUpdateInterval; } public int getResetInterval() { return dynamicResetInterval; } public double getBadnessThreshold() { return dynamicBadnessThreshold; } public String getSubsnitchClassName() { return subsnitch.getClass().getName(); } public List<Double> dumpTimings(String hostname) throws UnknownHostException { InetAddress host = InetAddress.getByName(hostname); ArrayList<Double> timings = new ArrayList<Double>(); ExponentiallyDecayingReservoir sample = samples.get(host); if (sample != null) { for (double time: sample.getSnapshot().getValues()) timings.add(time); } return timings; } public void setSeverity(double severity) { Gossiper.instance.addLocalApplicationState(ApplicationState.SEVERITY, StorageService.instance.valueFactory.severity(severity)); } private double getSeverity(InetAddress endpoint) { EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); if (state == null) return 0.0; VersionedValue event = state.getApplicationState(ApplicationState.SEVERITY); if (event == null) return 0.0; return Double.parseDouble(event.value); } public double getSeverity() { return getSeverity(FBUtilities.getBroadcastAddress()); } public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1, List<InetAddress> l2) { if (!subsnitch.isWorthMergingForRangeQuery(merged, l1, l2)) return false; // skip checking scores in the single-node case if (l1.size() == 1 && l2.size() == 1 && l1.get(0).equals(l2.get(0))) return true; // Make sure we return the subsnitch decision (i.e true if we're here) if we lack too much scores double maxMerged = maxScore(merged); double maxL1 = maxScore(l1); double maxL2 = maxScore(l2); if (maxMerged < 0 || maxL1 < 0 || maxL2 < 0) return true; return maxMerged <= (maxL1 + maxL2) * 
RANGE_MERGING_PREFERENCE; } // Return the max score for the endpoint in the provided list, or -1.0 if no node have a score. private double maxScore(List<InetAddress> endpoints) { double maxScore = -1.0; for (InetAddress endpoint : endpoints) { Double score = scores.get(endpoint); if (score == null) continue; if (score > maxScore) maxScore = score; } return maxScore; } }