package org.apache.cassandra.gms;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
{
/** JMX object name under which the constructor registers this gossiper. */
public static final String MBEAN_NAME = "org.apache.cassandra.net:type=Gossiper";

/** Names of system properties read by the gossiper. */
public static class Props
{
    /**
     * Boolean system property; when true, checkProperThreadForStateMutation() accepts gossip state
     * mutations from any thread.
     */
    public static final String DISABLE_THREAD_VALIDATION = "cassandra.gossip.disable_thread_validation";
}

/** Background executor named "GossipTasks"; applyStateLocally() submits schema re-announcements to it. */
private static final DebuggableScheduledThreadPoolExecutor executor = new DebuggableScheduledThreadPoolExecutor("GossipTasks");

/** Cached copy of ApplicationState.values(). */
static final ApplicationState[] STATES = ApplicationState.values();

/** Gossip STATUS values for which isDeadState() reports an endpoint as dead. */
static final List<String> DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN,
                                                      VersionedValue.STATUS_LEFT, VersionedValue.HIBERNATE);

/**
 * STATUS values recognised by isSilentShutdownState(): every dead state plus the two bootstrapping states.
 * Declared final so this shared list is only ever populated below and can never be re-pointed elsewhere.
 */
static final ArrayList<String> SILENT_SHUTDOWN_STATES = new ArrayList<>();
static
{
    SILENT_SHUTDOWN_STATES.addAll(DEAD_STATES);
    SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING);
    SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE);
}
/** Handle of the periodic gossip task, once scheduled. */
private volatile ScheduledFuture<?> scheduledGossipTask;
/** Held by GossipTask for the duration of each gossip round. */
private static final ReentrantLock taskLock = new ReentrantLock();
/** Gossip interval in milliseconds; also the unit of several deliberate sleeps in this class. */
public final static int intervalInMillis = 1000;
/** Milliseconds an endpoint stays quarantined (its gossip ignored) after removal: twice RING_DELAY. */
public final static int QUARANTINE_DELAY = StorageService.RING_DELAY * 2;
private static final Logger logger = LoggerFactory.getLogger(Gossiper.class);
private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 15L, TimeUnit.MINUTES);
// Keep declaration order: the singleton is built during class initialisation, after the static fields
// above it and before the static fields declared below it.
public static final Gossiper instance = new Gossiper();
/** System.nanoTime() at which the first GossipDigestSyn was sent, or 0 if none has been sent yet. */
volatile long firstSynSendAt = 0L;
/** Three days (259200 s) in milliseconds, computed in long arithmetic so the product cannot overflow int. */
public static final long aVeryLongTime = 259200L * 1000L;
/** One year in seconds; remote generations further than this ahead of local time are rejected. */
static final int MAX_GENERATION_DIFFERENCE = 86400 * 365;
/** Milliseconds of silence after which a gossip-only member (fat client) is evicted. */
private long fatClientTimeout;
private final Random random = new Random();
/** Orders addresses by their textual host address; used by the skip-list sets below. */
private final Comparator<InetAddress> inetcomparator = (addr1, addr2) -> addr1.getHostAddress().compareTo(addr2.getHostAddress());
private final List<IEndpointStateChangeSubscriber> subscribers = new CopyOnWriteArrayList<IEndpointStateChangeSubscriber>();
/** Endpoints considered UP: added by realMarkAlive(), removed by markDead()/removeEndpoint(). */
private final Set<InetAddress> liveEndpoints = new ConcurrentSkipListSet<InetAddress>(inetcomparator);
/** Endpoints considered DOWN, mapped to the System.nanoTime() at which they were marked down. */
private final Map<InetAddress, Long> unreachableEndpoints = new ConcurrentHashMap<InetAddress, Long>();
@VisibleForTesting
final Set<InetAddress> seeds = new ConcurrentSkipListSet<InetAddress>(inetcomparator);
/** Gossip state of every known endpoint, this node included. */
final ConcurrentMap<InetAddress, EndpointState> endpointStateMap = new ConcurrentHashMap<InetAddress, EndpointState>();
/** Quarantined endpoints mapped to the wall-clock ms from which QUARANTINE_DELAY is counted. */
private final Map<InetAddress, Long> justRemovedEndpoints = new ConcurrentHashMap<InetAddress, Long>();
/** Wall-clock ms after which a dead, non-member endpoint may be evicted by doStatusCheck(). */
private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>();
/** Whether any known endpoint reported a 3.0 release version at the last applyStateLocally(). */
private volatile boolean anyNodeOn30 = false;
private volatile boolean inShadowRound = false;
private final Set<InetAddress> seedsInShadowRound = new ConcurrentSkipListSet<>(inetcomparator);
private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>();
/** Wall-clock ms of the last processed gossip message; doStatusCheck() uses it to detect a backed-up stage. */
private volatile long lastProcessedMessageAt = System.currentTimeMillis();
/** Per-thread cache of "is the current thread a gossip-stage thread"; final so the cache is never replaced. */
private static final FastThreadLocal<Boolean> isGossipStage = new FastThreadLocal<>();
private static final boolean disableThreadValidation = Boolean.getBoolean(Props.DISABLE_THREAD_VALIDATION);
/**
 * Tells whether the current thread belongs to the gossip stage, judged by its thread name.
 * The answer is computed once per thread and cached in isGossipStage.
 */
private static boolean isInGossipStage()
{
    Boolean cached = isGossipStage.get();
    if (cached != null)
        return cached;

    boolean onGossipThread = Thread.currentThread().getName().contains(Stage.GOSSIP.getJmxName());
    isGossipStage.set(onGossipThread);
    return onGossipThread;
}
/**
 * Guards gossip state mutation: it must happen on the gossip stage unless validation is disabled via
 * Props.DISABLE_THREAD_VALIDATION. Violations throw under strict runtime checks and are otherwise logged
 * (rate-limited) with a stack trace.
 *
 * @throws IllegalStateException on a violation when DatabaseDescriptor.strictRuntimeChecks() is true
 */
private static void checkProperThreadForStateMutation()
{
    boolean permitted = disableThreadValidation || isInGossipStage();
    if (permitted)
        return;

    String threadName = Thread.currentThread().getName();
    IllegalStateException violation = new IllegalStateException("Attempting gossip state mutation from illegal thread: " + threadName);
    if (!DatabaseDescriptor.strictRuntimeChecks())
    {
        noSpamLogger.getStatement(Throwables.getStackTraceAsString(violation)).error(violation.getMessage(), violation);
        return;
    }
    throw violation;
}
/**
 * One periodic gossip round. Bumps this node's heartbeat and builds a shuffled digest of every known endpoint.
 * If the digest is non-empty, it is sent to one live member, possibly to one unreachable member and possibly
 * to a seed, and then the status check runs. Exceptions are logged and never escape to the scheduler.
 */
private class GossipTask implements Runnable
{
    public void run()
    {
        try
        {
            // do not hold taskLock while waiting for the messaging service to start listening
            MessagingService.instance().waitUntilListening();

            // Acquire outside the inner try so unlock() only runs once lock() has succeeded. Previously a
            // failure in waitUntilListening() reached the finally block, and unlock() on an unheld lock threw
            // IllegalMonitorStateException, hiding the original error.
            taskLock.lock();
            try
            {
                runGossipRound();
            }
            finally
            {
                taskLock.unlock();
            }
        }
        catch (Exception e)
        {
            JVMStabilityInspector.inspectThrowable(e);
            logger.error("Gossip error", e);
        }
    }

    /** Performs a single gossip round; the caller must hold taskLock. */
    private void runGossipRound()
    {
        InetAddress self = FBUtilities.getBroadcastAddress();
        endpointStateMap.get(self).getHeartBeatState().updateHeartBeat();
        if (logger.isTraceEnabled())
            logger.trace("My heartbeat is now {}", endpointStateMap.get(self).getHeartBeatState().getHeartBeatVersion());

        final List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
        // use the enclosing Gossiper rather than reaching for the static singleton
        makeRandomGossipDigest(gDigests);
        if (gDigests.isEmpty())
            return;

        GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
                                                               DatabaseDescriptor.getPartitionerName(),
                                                               gDigests);
        MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,
                                                                              digestSynMessage,
                                                                              GossipDigestSyn.serializer);
        boolean gossipedToSeed = doGossipToLiveMember(message);
        maybeGossipToUnreachableMember(message);
        // also try a seed if the live member picked was not one, or if fewer endpoints are live than there are seeds
        if (!gossipedToSeed || liveEndpoints.size() < seeds.size())
            maybeGossipToSeed(message);
        doStatusCheck();
    }
}
/**
 * Builds the singleton: derives the fat-client timeout (half of QUARANTINE_DELAY), subscribes to
 * failure-detector convictions and registers the JMX bean under MBEAN_NAME.
 */
private Gossiper()
{
    this.fatClientTimeout = QUARANTINE_DELAY / 2;
    // receive convict() callbacks from the failure detector
    FailureDetector.instance.registerFailureDetectionEventListener(this);
    // expose GossiperMBean operations over JMX
    MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
}
/**
 * Records when a gossip message was last processed.
 *
 * @param timeInMillis wall-clock time in milliseconds; doStatusCheck() compares it against the current time
 */
public void setLastProcessedMessageAt(long timeInMillis)
{
    lastProcessedMessageAt = timeInMillis;
}
/**
 * Returns true if any endpoint in the gossip state is a seed, either by its gossip address or by the
 * INTERNAL_IP it advertises.
 *
 * @throws RuntimeException wrapping an UnknownHostException if an advertised INTERNAL_IP cannot be resolved
 */
public boolean seenAnySeed()
{
    for (Map.Entry<InetAddress, EndpointState> known : endpointStateMap.entrySet())
    {
        if (seeds.contains(known.getKey()))
            return true;

        VersionedValue internalIp = known.getValue().getApplicationState(ApplicationState.INTERNAL_IP);
        if (internalIp == null)
            continue;

        InetAddress internalAddress;
        try
        {
            internalAddress = InetAddress.getByName(internalIp.value);
        }
        catch (UnknownHostException e)
        {
            throw new RuntimeException(e);
        }
        if (seeds.contains(internalAddress))
            return true;
    }
    return false;
}
/** Adds {@code subscriber} to the listeners notified of endpoint state changes. */
public void register(IEndpointStateChangeSubscriber subscriber)
{
    this.subscribers.add(subscriber);
}

/** Removes {@code subscriber} from the listeners notified of endpoint state changes. */
public void unregister(IEndpointStateChangeSubscriber subscriber)
{
    this.subscribers.remove(subscriber);
}
/** Returns a fresh, mutable copy of the live endpoints that always includes this node's broadcast address. */
public Set<InetAddress> getLiveMembers()
{
    Set<InetAddress> members = new HashSet<>(liveEndpoints);
    // Set.add is a no-op when the address is already present, so no contains() check is needed
    members.add(FBUtilities.getBroadcastAddress());
    return members;
}
/** Returns the live ring members as reported by StorageService.getLiveRingMembers(true). */
public Set<InetAddress> getLiveTokenOwners()
{
    final boolean flag = true;
    return StorageService.instance.getLiveRingMembers(flag);
}

/**
 * Returns the endpoints currently considered unreachable. This is the live key set of the internal map,
 * not a copy.
 */
public Set<InetAddress> getUnreachableMembers()
{
    Set<InetAddress> view = unreachableEndpoints.keySet();
    return view;
}
/** Returns a new set of the unreachable endpoints that are members of the token metadata. */
public Set<InetAddress> getUnreachableTokenOwners()
{
    return unreachableEndpoints.keySet()
                               .stream()
                               .filter(endpoint -> StorageService.instance.getTokenMetadata().isMember(endpoint))
                               .collect(Collectors.toCollection(HashSet::new));
}
/** Milliseconds since {@code ep} was marked down, or 0 if it is not currently unreachable. */
public long getEndpointDowntime(InetAddress ep)
{
    Long markedDownAtNanos = unreachableEndpoints.get(ep);
    if (markedDownAtNanos == null)
        return 0L;
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - markedDownAtNanos);
}
/**
 * Returns true when the endpoint's gossiped STATUS begins with {@link VersionedValue#SHUTDOWN}.
 * Unknown endpoints and endpoints without a STATUS are reported as not shut down.
 */
private boolean isShutdown(InetAddress endpoint)
{
    // Reuse getGossipStatus() instead of re-implementing the STATUS parsing here. It already handles a
    // missing state or STATUS, and this avoids looking STATUS up twice (null check, then read).
    return VersionedValue.SHUTDOWN.equals(getGossipStatus(endpointStateMap.get(endpoint)));
}
/**
 * Runs {@code runnable} on the gossip stage and waits for it to complete. When the caller is already on a
 * gossip-stage thread, the runnable executes inline instead of being queued behind the caller's own task.
 *
 * @throws AssertionError wrapping the failure if the queued task throws or the wait is interrupted;
 *                        exceptions from an inline run propagate unwrapped
 */
public static void runInGossipStageBlocking(Runnable runnable)
{
    if (isInGossipStage())
    {
        runnable.run();
        return;
    }

    ListenableFutureTask<Void> task = ListenableFutureTask.create(runnable, null);
    StageManager.getStage(Stage.GOSSIP).execute(task);
    try
    {
        task.get();
    }
    catch (InterruptedException e)
    {
        // restore the interrupt status so code further up the stack can still observe it
        Thread.currentThread().interrupt();
        throw new AssertionError(e);
    }
    catch (ExecutionException e)
    {
        throw new AssertionError(e);
    }
}
/**
 * Failure-detector callback. On the gossip stage, a known and currently-alive endpoint is marked as shut down
 * if its STATUS says so, and marked dead otherwise. {@code phi} is not used here.
 */
public void convict(InetAddress endpoint, double phi)
{
    runInGossipStageBlocking(() -> {
        EndpointState state = endpointStateMap.get(endpoint);
        if (state == null || !state.isAlive())
            return;

        logger.debug("Convicting {} with status {} - alive {}", endpoint, getGossipStatus(state), state.isAlive());
        if (!isShutdown(endpoint))
            markDead(endpoint, state);
        else
            markAsShutdown(endpoint);
    });
}
/**
 * Records a known endpoint as shut down: sets STATUS=shutdown and RPC_READY=false, forces its heartbeat to
 * the highest possible version, marks it dead and forces a failure-detector conviction. Unknown endpoints
 * are ignored. Must run on the gossip stage.
 */
protected void markAsShutdown(InetAddress endpoint)
{
    checkProperThreadForStateMutation();
    EndpointState state = endpointStateMap.get(endpoint);
    if (state == null)
        return;

    state.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.shutdown(true));
    state.addApplicationState(ApplicationState.RPC_READY, StorageService.instance.valueFactory.rpcReady(false));
    // presumably so that this shutdown state is not superseded by older gossip about the endpoint
    state.getHeartBeatState().forceHighestPossibleVersionUnsafe();
    markDead(endpoint, state);
    FailureDetector.instance.forceConviction(endpoint);
}
/** Returns the largest version among the heartbeat and all application states of {@code epState}. */
int getMaxEndpointStateVersion(EndpointState epState)
{
    int heartBeatVersion = epState.getHeartBeatState().getHeartBeatVersion();
    return epState.states()
                  .stream()
                  .mapToInt(entry -> entry.getValue().version)
                  .reduce(heartBeatVersion, Math::max);
}
/**
 * Forgets {@code endpoint}. It is dropped from the unreachable set, the state map, the expire-time map and
 * the failure detector, then quarantined. Must run on the gossip stage.
 */
private void evictFromMembership(InetAddress endpoint)
{
    checkProperThreadForStateMutation();
    unreachableEndpoints.remove(endpoint);
    endpointStateMap.remove(endpoint);
    expireTimeEndpointMap.remove(endpoint);
    FailureDetector.instance.remove(endpoint);
    quarantineEndpoint(endpoint);
    logger.debug("evicting {} from gossip", endpoint);
}
/**
 * Removes {@code endpoint} from the cluster view. Steps, in order:
 * notify subscribers via onRemove; drop it from the seed list; remove it from the live and unreachable sets;
 * reset its messaging version; quarantine it; destroy its connection pool.
 * Its endpointStateMap entry is left in place. Must run on the gossip stage.
 */
public void removeEndpoint(InetAddress endpoint)
{
    checkProperThreadForStateMutation();

    subscribers.forEach(subscriber -> subscriber.onRemove(endpoint));

    if (seeds.contains(endpoint))
    {
        // rebuild the seed list (buildSeedsList, defined elsewhere), then make sure this endpoint is not in it
        buildSeedsList();
        seeds.remove(endpoint);
        logger.info("removed {} from seeds, updated seeds list = {}", endpoint, seeds);
    }

    liveEndpoints.remove(endpoint);
    unreachableEndpoints.remove(endpoint);
    MessagingService.instance().resetVersion(endpoint);
    quarantineEndpoint(endpoint);
    MessagingService.instance().destroyConnectionPool(endpoint);
    logger.debug("removing endpoint {}", endpoint);
}
/** Quarantines {@code endpoint} starting from the current wall-clock time. */
private void quarantineEndpoint(InetAddress endpoint)
{
    long startMillis = System.currentTimeMillis();
    quarantineEndpoint(endpoint, startMillis);
}

/**
 * Stores {@code quarantineExpiration} as the wall-clock ms from which doStatusCheck() counts QUARANTINE_DELAY.
 * Any earlier quarantine entry for the endpoint is replaced.
 */
private void quarantineEndpoint(InetAddress endpoint, long quarantineExpiration)
{
    this.justRemovedEndpoints.put(endpoint, quarantineExpiration);
}
/**
 * Quarantines an endpoint that has been replaced by another node. The quarantine start is set QUARANTINE_DELAY
 * into the future; since doStatusCheck() ends a quarantine only after a further QUARANTINE_DELAY, the endpoint
 * stays quarantined for roughly twice the usual period.
 */
public void replacementQuarantine(InetAddress endpoint)
{
    // the previous implementation logged an empty debug line, which told operators nothing
    logger.debug("Quarantining replaced endpoint {}", endpoint);
    quarantineEndpoint(endpoint, System.currentTimeMillis() + QUARANTINE_DELAY);
}
/**
 * Fully removes an endpoint that was replaced by another node and quarantines it for the extended replacement
 * period. Must run on the gossip stage.
 */
public void replacedEndpoint(InetAddress endpoint)
{
    checkProperThreadForStateMutation();
    // removeEndpoint and evictFromMembership both (re)write the quarantine start, so the extended
    // replacement quarantine has to be applied last
    removeEndpoint(endpoint);
    evictFromMembership(endpoint);
    replacementQuarantine(endpoint);
}
/**
 * Appends one digest (endpoint, generation, max version) per known endpoint to {@code gDigests}, in random
 * order. An endpoint whose state disappears between the key snapshot and the lookup is advertised as (0, 0).
 */
private void makeRandomGossipDigest(List<GossipDigest> gDigests)
{
    List<InetAddress> endpoints = new ArrayList<InetAddress>(endpointStateMap.keySet());
    Collections.shuffle(endpoints, random);
    for (InetAddress endpoint : endpoints)
    {
        // Reset per endpoint. These used to be hoisted out of the loop, so an endpoint removed concurrently
        // was advertised with the previous endpoint's generation and version.
        int generation = 0;
        int maxVersion = 0;
        EndpointState epState = endpointStateMap.get(endpoint);
        if (epState != null)
        {
            generation = epState.getHeartBeatState().getGeneration();
            maxVersion = getMaxEndpointStateVersion(epState);
        }
        gDigests.add(new GossipDigest(endpoint, generation, maxVersion));
    }

    if (logger.isTraceEnabled())
    {
        StringBuilder sb = new StringBuilder();
        for (GossipDigest gDigest : gDigests)
            sb.append(gDigest).append(' ');
        logger.trace("Gossip Digests are : {}", sb);
    }
}
/**
 * Advertises that {@code endpoint} (host {@code hostId}) is being removed, with {@code localHostId} as removal
 * coordinator. First waits RING_DELAY and aborts if the endpoint's generation changed in the meantime.
 *
 * @throws RuntimeException if the endpoint is unknown, disappears while waiting, or changes generation
 */
public void advertiseRemoving(InetAddress endpoint, UUID hostId, UUID localHostId)
{
    EndpointState epState = endpointStateMap.get(endpoint);
    // fail with a descriptive message instead of a bare NullPointerException
    if (epState == null)
        throw new RuntimeException("Endpoint " + endpoint + " is not known to gossip; cannot remove it");
    int generation = epState.getHeartBeatState().getGeneration();

    logger.info("Removing host: {}", hostId);
    logger.info("Sleeping for {}ms to ensure {} does not change", StorageService.RING_DELAY, endpoint);
    Uninterruptibles.sleepUninterruptibly(StorageService.RING_DELAY, TimeUnit.MILLISECONDS);

    epState = endpointStateMap.get(endpoint);
    if (epState == null)
        throw new RuntimeException("Endpoint " + endpoint + " disappeared while trying to remove it");
    if (epState.getHeartBeatState().getGeneration() != generation)
        throw new RuntimeException("Endpoint " + endpoint + " generation changed while trying to remove it");

    logger.info("Advertising removal for {}", endpoint);
    epState.updateTimestamp();
    epState.getHeartBeatState().forceNewerGenerationUnsafe();
    Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class);
    states.put(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(hostId));
    states.put(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(localHostId));
    epState.addApplicationStates(states);
    endpointStateMap.put(endpoint, epState);
}
/**
 * Completes a removal. The endpoint gets a newer generation and STATUS=removed (with an expire time), the
 * expire time is recorded, and the call then sleeps for two gossip intervals.
 */
public void advertiseTokenRemoved(InetAddress endpoint, UUID hostId)
{
    EndpointState state = endpointStateMap.get(endpoint);
    state.updateTimestamp();
    state.getHeartBeatState().forceNewerGenerationUnsafe();

    long expireTime = computeExpireTime();
    state.addApplicationState(ApplicationState.STATUS,
                              StorageService.instance.valueFactory.removedNonlocal(hostId, expireTime));
    logger.info("Completing removal of {}", endpoint);
    addExpireTimeForEndpoint(endpoint, expireTime);
    endpointStateMap.put(endpoint, state);

    // presumably gives the new STATUS a chance to be gossiped before returning
    Uninterruptibles.sleepUninterruptibly(intervalInMillis * 2, TimeUnit.MILLISECONDS);
}
/**
 * Deprecated alias of assassinateEndpoint(String): logs a deprecation warning and delegates.
 *
 * @throws UnknownHostException if {@code address} cannot be resolved
 */
public void unsafeAssassinateEndpoint(String address) throws UnknownHostException
{
    final String deprecationWarning = "Gossiper.unsafeAssassinateEndpoint is deprecated and will be removed in the next release; use assassinateEndpoint instead";
    logger.warn(deprecationWarning);
    assassinateEndpoint(address);
}
/**
 * Forcibly advertises {@code address} as having LEFT the ring via gossip, whether or not this node knows it.
 * All of the work runs on the gossip stage through runInGossipStageBlocking().
 *
 * If the endpoint is unknown, a synthetic state is fabricated with generation = (now + 60s) in seconds and
 * heartbeat version 9999. Otherwise the current generation and heartbeat are recorded and the call sleeps
 * RING_DELAY. It aborts if either value changed; the endpoint vanishing in the meantime is tolerated.
 *
 * NOTE(review): the RING_DELAY sleep and the final 4-interval sleep both happen inside the gossip-stage task,
 * so that stage is blocked for their duration. Because applyStateLocally() is expected to run on the same
 * stage, the "did it change?" check may be unable to observe changes. Confirm this is intended.
 *
 * @param address host name or IP literal of the endpoint to assassinate
 * @throws UnknownHostException if {@code address} cannot be resolved
 * @throws RuntimeException     if the endpoint's generation or heartbeat changed while waiting; when called
 *                              from outside the gossip stage this surfaces wrapped in an AssertionError
 */
public void assassinateEndpoint(String address) throws UnknownHostException
{
InetAddress endpoint = InetAddress.getByName(address);
runInGossipStageBlocking(() -> {
EndpointState epState = endpointStateMap.get(endpoint);
Collection<Token> tokens = null;
logger.warn("Assassinating {} via gossip", endpoint);
if (epState == null)
{
// unknown endpoint: fabricate a state with a generation 60s in the future so it supersedes what peers hold
epState = new EndpointState(new HeartBeatState((int) ((System.currentTimeMillis() + 60000) / 1000), 9999));
}
else
{
int generation = epState.getHeartBeatState().getGeneration();
int heartbeat = epState.getHeartBeatState().getHeartBeatVersion();
logger.info("Sleeping for {}ms to ensure {} does not change", StorageService.RING_DELAY, endpoint);
Uninterruptibles.sleepUninterruptibly(StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
// a changed generation or heartbeat means the endpoint is still alive, so refuse to assassinate it
EndpointState newState = endpointStateMap.get(endpoint);
if (newState == null)
logger.warn("Endpoint {} disappeared while trying to assassinate, continuing anyway", endpoint);
else if (newState.getHeartBeatState().getGeneration() != generation)
throw new RuntimeException("Endpoint still alive: " + endpoint + " generation changed while trying to assassinate it");
else if (newState.getHeartBeatState().getHeartBeatVersion() != heartbeat)
throw new RuntimeException("Endpoint still alive: " + endpoint + " heartbeat changed while trying to assassinate it");
// bump the (originally read) state's generation so the LEFT status below overrides existing gossip
epState.updateTimestamp();
epState.getHeartBeatState().forceNewerGenerationUnsafe();
}
try
{
tokens = StorageService.instance.getTokenMetadata().getTokens(endpoint);
}
catch (Throwable th)
{
// any failure looking up tokens falls back to a single random token
JVMStabilityInspector.inspectThrowable(th);
logger.warn("Unable to calculate tokens for {}. Will use a random one", address);
tokens = Collections.singletonList(StorageService.instance.getTokenMetadata().partitioner.getRandomToken());
}
// advertise STATUS=LEFT with an expire time and apply it locally as a major state change
epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.left(tokens, computeExpireTime()));
handleMajorStateChange(endpoint, epState);
// wait four gossip intervals, presumably so the LEFT state propagates before returning
Uninterruptibles.sleepUninterruptibly(intervalInMillis * 4, TimeUnit.MILLISECONDS);
logger.warn("Finished assassinating {}", endpoint);
});
}
/** Returns true if gossip holds state for {@code endpoint}. */
public boolean isKnownEndpoint(InetAddress endpoint)
{
    boolean known = endpointStateMap.containsKey(endpoint);
    return known;
}

/**
 * Returns the current heartbeat generation of {@code endpoint}.
 * Throws NullPointerException if the endpoint is unknown.
 */
public int getCurrentGenerationNumber(InetAddress endpoint)
{
    EndpointState state = endpointStateMap.get(endpoint);
    return state.getHeartBeatState().getGeneration();
}
/**
 * Sends {@code message} one-way to a single endpoint chosen uniformly at random from {@code epSet}.
 *
 * @return true if the chosen endpoint is a seed; false otherwise, including when {@code epSet} is empty
 */
private boolean sendGossip(MessageOut<GossipDigestSyn> message, Set<InetAddress> epSet)
{
    // snapshot the possibly concurrently-modified set so the size and the chosen index agree
    List<InetAddress> candidates = ImmutableList.copyOf(epSet);
    int count = candidates.size();
    if (count == 0)
        return false;

    InetAddress target = candidates.get(count == 1 ? 0 : random.nextInt(count));
    if (logger.isTraceEnabled())
        logger.trace("Sending a GossipDigestSyn to {} ...", target);

    if (firstSynSendAt == 0)
        firstSynSendAt = System.nanoTime();

    MessagingService.instance().sendOneWay(message, target);
    return seeds.contains(target);
}
/**
 * Gossips to one random live endpoint.
 *
 * @return true if that endpoint is a seed; false if it is not or nothing is live
 */
private boolean doGossipToLiveMember(MessageOut<GossipDigestSyn> message)
{
    if (liveEndpoints.isEmpty())
        return false;
    return sendGossip(message, liveEndpoints);
}
/**
 * With probability unreachable / (live + 1), gossips to one random unreachable endpoint. Does nothing when
 * no endpoint is unreachable.
 */
private void maybeGossipToUnreachableMember(MessageOut<GossipDigestSyn> message)
{
    double liveCount = liveEndpoints.size();
    double unreachableCount = unreachableEndpoints.size();
    if (unreachableCount == 0)
        return;

    double probability = unreachableCount / (liveCount + 1);
    if (random.nextDouble() < probability)
        sendGossip(message, unreachableEndpoints.keySet());
}
/**
 * Possibly gossips to a random seed. This is skipped when there are no seeds or this node is the only seed.
 * When nothing is live, a seed is always contacted. Otherwise one is contacted with probability
 * seeds / (live + unreachable).
 */
private void maybeGossipToSeed(MessageOut<GossipDigestSyn> prod)
{
    int seedCount = seeds.size();
    if (seedCount <= 0)
        return;

    boolean onlySeedIsSelf = seedCount == 1 && seeds.contains(FBUtilities.getBroadcastAddress());
    if (onlySeedIsSelf)
        return;

    if (liveEndpoints.size() == 0)
    {
        sendGossip(prod, seeds);
        return;
    }

    double probability = seeds.size() / (double) (liveEndpoints.size() + unreachableEndpoints.size());
    if (random.nextDouble() <= probability)
        sendGossip(prod, seeds);
}
/**
 * Returns true for an endpoint that is known to gossip, not in a dead state and not a token-metadata member
 * (a "fat client").
 */
public boolean isGossipOnlyMember(InetAddress endpoint)
{
    EndpointState state = endpointStateMap.get(endpoint);
    return state != null
           && !isDeadState(state)
           && !StorageService.instance.getTokenMetadata().isMember(endpoint);
}
/**
 * Decides, from the gossip states in {@code epStates}, whether it is safe to start given the state
 * advertised for {@code endpoint}.
 * An absent or dead state is always safe. When bootstrapping, it is unsafe if the STATUS is empty, NORMAL or
 * SHUTDOWN. Otherwise it is safe only if the advertised HOST_ID equals {@code localHostUUID}.
 */
public boolean isSafeForStartup(InetAddress endpoint, UUID localHostUUID, boolean isBootstrapping,
                                Map<InetAddress, EndpointState> epStates)
{
    EndpointState epState = epStates.get(endpoint);
    if (epState == null || isDeadState(epState))
        return true;

    if (isBootstrapping)
    {
        String status = getGossipStatus(epState);
        // Plain fixed-size list instead of the former double-brace initialisation, which defined an
        // anonymous ArrayList subclass (capturing this Gossiper) and instantiated it on every call.
        final List<String> unsafeStatuses = Arrays.asList("", VersionedValue.STATUS_NORMAL, VersionedValue.SHUTDOWN);
        return !unsafeStatuses.contains(status);
    }

    VersionedValue previous = epState.getApplicationState(ApplicationState.HOST_ID);
    return UUID.fromString(previous.value).equals(localHostUUID);
}
/**
 * Per-round housekeeping, called from GossipTask:
 * - lets the failure detector interpret every remote endpoint;
 * - evicts silent fat clients and expired dead non-members;
 * - ends quarantines older than QUARANTINE_DELAY.
 * The check is skipped entirely while the gossip stage looks backed up, so nodes are not marked down because
 * of our own processing lag.
 */
private void doStatusCheck()
{
    if (logger.isTraceEnabled())
        logger.trace("Performing status check ...");

    long now = System.currentTimeMillis();
    long nowNano = System.nanoTime();

    long pending = ((JMXEnabledThreadPoolExecutor) StageManager.getStage(Stage.GOSSIP)).metrics.pendingTasks.getValue();
    if (pending > 0 && lastProcessedMessageAt < now - 1000)
    {
        // give the gossip stage a moment to catch up before concluding it is stuck
        Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
        if (lastProcessedMessageAt < now - 1000)
        {
            logger.warn("Gossip stage has {} pending tasks; skipping status check (no nodes will be marked down)", pending);
            return;
        }
    }

    Set<InetAddress> eps = endpointStateMap.keySet();
    for (InetAddress endpoint : eps)
    {
        if (endpoint.equals(FBUtilities.getBroadcastAddress()))
            continue;

        FailureDetector.instance.interpret(endpoint);
        EndpointState epState = endpointStateMap.get(endpoint);
        if (epState == null)
            continue;

        // fat clients (gossip-only members) are removed after fatClientTimeout of silence
        if (isGossipOnlyMember(endpoint)
            && !justRemovedEndpoints.containsKey(endpoint)
            && TimeUnit.NANOSECONDS.toMillis(nowNano - epState.getUpdateTimestamp()) > fatClientTimeout)
        {
            logger.info("FatClient {} has been silent for {}ms, removing from gossip", endpoint, fatClientTimeout);
            runInGossipStageBlocking(() -> {
                // The check above may have run outside the gossip stage, and the endpoint may have joined the
                // ring (or been removed) before this task ran. Re-validate before acting on a stale read.
                if (!isGossipOnlyMember(endpoint))
                {
                    logger.info("Race condition marking {} as a FatClient; ignoring", endpoint);
                    return;
                }
                removeEndpoint(endpoint);      // also quarantines the endpoint
                evictFromMembership(endpoint); // drops its state immediately
            });
        }

        // dead, non-member endpoints are evicted once their expire time has passed
        long expireTime = getExpireTimeForEndpoint(endpoint);
        if (!epState.isAlive() && (now > expireTime)
            && (!StorageService.instance.getTokenMetadata().isMember(endpoint)))
        {
            if (logger.isDebugEnabled())
                logger.debug("time is expiring for endpoint : {} ({})", endpoint, expireTime);
            // evictFromMembership() mutates gossip state and checks that it runs on the gossip stage; calling
            // it directly from a non-gossip thread failed checkProperThreadForStateMutation()
            runInGossipStageBlocking(() -> evictFromMembership(endpoint));
        }
    }

    for (Entry<InetAddress, Long> entry : justRemovedEndpoints.entrySet())
    {
        if ((now - entry.getValue()) > QUARANTINE_DELAY)
        {
            if (logger.isDebugEnabled())
                logger.debug("{} elapsed, {} gossip quarantine over", QUARANTINE_DELAY, entry.getKey());
            // conditional remove: keep the entry if the endpoint was re-quarantined concurrently
            justRemovedEndpoints.remove(entry.getKey(), entry.getValue());
        }
    }
}
/** Returns the stored expire time (wall-clock ms) for {@code endpoint}, or a freshly computed one if none is stored. */
protected long getExpireTimeForEndpoint(InetAddress endpoint)
{
    Long stored = expireTimeEndpointMap.get(endpoint);
    if (stored != null)
        return stored;
    return computeExpireTime();
}
/** Returns the gossip state held for {@code ep}, or null if it is unknown. */
public EndpointState getEndpointStateForEndpoint(InetAddress ep)
{
    EndpointState state = endpointStateMap.get(ep);
    return state;
}

/** Returns the live entry-set view of every endpoint's gossip state (not a copy). */
public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
{
    Set<Entry<InetAddress, EndpointState>> view = endpointStateMap.entrySet();
    return view;
}
/** Returns the HOST_ID gossiped by {@code endpoint}, looked up in this node's gossip state. */
public UUID getHostId(InetAddress endpoint)
{
    return getHostId(endpoint, endpointStateMap);
}

/**
 * Returns the HOST_ID recorded for {@code endpoint} in {@code epStates}.
 * Throws NullPointerException if the endpoint or its HOST_ID is absent.
 */
public UUID getHostId(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
{
    EndpointState state = epStates.get(endpoint);
    String hostIdText = state.getApplicationState(ApplicationState.HOST_ID).value;
    return UUID.fromString(hostIdText);
}
/**
 * Builds a delta for {@code forEndpoint}: a new EndpointState carrying the current heartbeat plus every
 * application state whose version exceeds {@code version}.
 *
 * @return the delta, or null if the endpoint is unknown or neither the heartbeat nor any state is newer
 */
EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version)
{
    EndpointState localState = endpointStateMap.get(forEndpoint);
    if (localState == null)
        return null;

    HeartBeatState localHeartBeat = localState.getHeartBeatState();
    int generation = localHeartBeat.getGeneration();
    int heartBeatVersion = localHeartBeat.getHeartBeatVersion();

    boolean heartBeatIsNewer = heartBeatVersion > version;
    if (heartBeatIsNewer && logger.isTraceEnabled())
        logger.trace("local heartbeat version {} greater than {} for {}", heartBeatVersion, version, forEndpoint);

    Map<ApplicationState, VersionedValue> newerStates = new EnumMap<>(ApplicationState.class);
    for (Entry<ApplicationState, VersionedValue> entry : localState.states())
    {
        VersionedValue candidate = entry.getValue();
        if (candidate.version <= version)
            continue;
        if (logger.isTraceEnabled())
            logger.trace("Adding state {}: {}" , entry.getKey(), candidate.value);
        newerStates.put(entry.getKey(), candidate);
    }

    if (!heartBeatIsNewer && newerStates.isEmpty())
        return null;

    EndpointState delta = new EndpointState(new HeartBeatState(generation, heartBeatVersion));
    delta.addApplicationStates(newerStates);
    return delta;
}
/**
 * Compares the heartbeat generations of two known endpoints.
 *
 * @return a negative number, zero or a positive number as addr1's generation is lower than, equal to or
 *         higher than addr2's
 */
public int compareEndpointStartup(InetAddress addr1, InetAddress addr2)
{
    EndpointState ep1 = getEndpointStateForEndpoint(addr1);
    EndpointState ep2 = getEndpointStateForEndpoint(addr2);
    assert ep1 != null && ep2 != null;
    // Integer.compare rather than subtraction: subtracting two ints can overflow and flip the sign
    return Integer.compare(ep1.getHeartBeatState().getGeneration(), ep2.getHeartBeatState().getGeneration());
}
/** Feeds every (endpoint, remote state) pair in {@code remoteEpStateMap} to the single-endpoint overload. */
void notifyFailureDetector(Map<InetAddress, EndpointState> remoteEpStateMap)
{
    remoteEpStateMap.forEach((endpoint, remoteState) -> notifyFailureDetector(endpoint, remoteState));
}
/**
 * Reports a heartbeat to the failure detector when {@code remoteEndpointState} is newer than our state for
 * a known endpoint. A newer generation always counts, and first clears the detector's history if the
 * endpoint is currently down. At the same generation, a remote heartbeat version above our max version counts.
 * Unknown endpoints are ignored.
 */
void notifyFailureDetector(InetAddress endpoint, EndpointState remoteEndpointState)
{
    EndpointState localState = endpointStateMap.get(endpoint);
    if (localState == null)
        return;

    IFailureDetector detector = FailureDetector.instance;
    int localGeneration = localState.getHeartBeatState().getGeneration();
    int remoteGeneration = remoteEndpointState.getHeartBeatState().getGeneration();

    if (remoteGeneration > localGeneration)
    {
        localState.updateTimestamp();
        if (!localState.isAlive())
        {
            logger.debug("Clearing interval times for {} due to generation change", endpoint);
            detector.remove(endpoint);
        }
        detector.report(endpoint);
        return;
    }

    if (remoteGeneration != localGeneration)
        return;

    int localVersion = getMaxEndpointStateVersion(localState);
    int remoteVersion = remoteEndpointState.getHeartBeatState().getHeartBeatVersion();
    if (remoteVersion > localVersion)
    {
        localState.updateTimestamp();
        detector.report(endpoint);
    }
}
/**
 * Starts marking {@code addr} alive. Peers older than MessagingService.VERSION_20 are marked alive at once.
 * For other peers the local state is first marked dead and an ECHO request is sent; realMarkAlive() then runs
 * on the gossip stage when the reply arrives.
 */
private void markAlive(final InetAddress addr, final EndpointState localState)
{
    boolean isPre20Peer = MessagingService.instance().getVersion(addr) < MessagingService.VERSION_20;
    if (isPre20Peer)
    {
        realMarkAlive(addr, localState);
        return;
    }

    localState.markDead();

    MessageOut<EchoMessage> echo = new MessageOut<EchoMessage>(MessagingService.Verb.ECHO, EchoMessage.instance, EchoMessage.serializer);
    logger.trace("Sending a EchoMessage to {}", addr);
    MessagingService.instance().sendRR(echo, addr, new IAsyncCallback()
    {
        public boolean isLatencyForSnitch()
        {
            return false;
        }

        public void response(MessageIn msg)
        {
            runInGossipStageBlocking(() -> realMarkAlive(addr, localState));
        }
    });
}
/**
 * Marks {@code addr} UP. The state is marked alive and its timestamp refreshed. The address moves from the
 * unreachable set to the live set, its expire time is dropped, and subscribers are notified via onAlive.
 * Must run on the gossip stage.
 */
@VisibleForTesting
public void realMarkAlive(final InetAddress addr, final EndpointState localState)
{
    checkProperThreadForStateMutation();
    logger.trace("marking as alive {}", addr);

    localState.markAlive();
    localState.updateTimestamp();
    liveEndpoints.add(addr);
    unreachableEndpoints.remove(addr);
    expireTimeEndpointMap.remove(addr);
    logger.debug("removing expire time for endpoint : {}", addr);
    logger.info("InetAddress {} is now UP", addr);

    subscribers.forEach(subscriber -> subscriber.onAlive(addr, localState));
    logger.trace("Notified {}", subscribers);
}
/**
 * Marks {@code addr} DOWN. The state is marked dead and the address moves from the live set to the
 * unreachable set, stamped with System.nanoTime(). Subscribers are notified via onDead.
 * Must run on the gossip stage.
 */
@VisibleForTesting
public void markDead(InetAddress addr, EndpointState localState)
{
    checkProperThreadForStateMutation();
    logger.trace("marking as down {}", addr);

    localState.markDead();
    liveEndpoints.remove(addr);
    unreachableEndpoints.put(addr, System.nanoTime());
    logger.info("InetAddress {} is now DOWN", addr);

    subscribers.forEach(subscriber -> subscriber.onDead(addr, localState));
    logger.trace("Notified {}", subscribers);
}
/**
 * Installs {@code epState} as the state of {@code ep}. It is called for endpoints that are new to us or that
 * gossiped a newer generation, and it notifies subscribers. Must run on the gossip stage (checked first).
 *
 * Order of effects:
 *  1. replace the endpointStateMap entry;
 *  2. if the endpoint was already known, call onRestart with its PREVIOUS state;
 *  3. call markAlive() (echo-based for newer peers) or markDead(), depending on isDeadState(epState);
 *  4. call onJoin with the new state for every subscriber;
 *  5. call markAsShutdown() if the STATUS says shutdown.
 */
private void handleMajorStateChange(InetAddress ep, EndpointState epState)
{
checkProperThreadForStateMutation();
EndpointState localEpState = endpointStateMap.get(ep);
if (!isDeadState(epState))
{
if (localEpState != null)
logger.info("Node {} has restarted, now UP", ep);
else
logger.info("Node {} is now part of the cluster", ep);
}
if (logger.isTraceEnabled())
logger.trace("Adding endpoint state for {}", ep);
endpointStateMap.put(ep, epState);
// subscribers see the old state on restart
if (localEpState != null)
{
for (IEndpointStateChangeSubscriber subscriber : subscribers)
subscriber.onRestart(ep, localEpState);
}
if (!isDeadState(epState))
markAlive(ep, epState);
else
{
logger.debug("Not marking {} alive due to dead state", ep);
markDead(ep, epState);
}
for (IEndpointStateChangeSubscriber subscriber : subscribers)
subscriber.onJoin(ep, epState);
// a node that restarted only to announce its shutdown must end up marked as shut down
if (isShutdown(ep))
markAsShutdown(ep);
}
/** Returns true if {@code endpoint} is known, marked alive and not in a dead gossip STATUS. */
public boolean isAlive(InetAddress endpoint)
{
    EndpointState state = getEndpointStateForEndpoint(endpoint);
    return state != null && state.isAlive() && !isDeadState(state);
}
/** Returns true if the gossip STATUS of {@code epState} is one of DEAD_STATES. */
public boolean isDeadState(EndpointState epState)
{
    String status = getGossipStatus(epState);
    return !status.isEmpty() && DEAD_STATES.contains(status);
}

/** Returns true if the gossip STATUS of {@code epState} is one of SILENT_SHUTDOWN_STATES. */
public boolean isSilentShutdownState(EndpointState epState)
{
    String status = getGossipStatus(epState);
    return !status.isEmpty() && SILENT_SHUTDOWN_STATES.contains(status);
}
/**
 * Returns the first delimiter-separated piece of the STATUS application state, or "" when {@code epState}
 * is null or has no STATUS.
 */
private static String getGossipStatus(EndpointState epState)
{
    if (epState == null)
        return "";
    // Look STATUS up once. Previously it was read twice (null check, then value), and the two reads
    // could observe different values if it was updated concurrently.
    VersionedValue status = epState.getApplicationState(ApplicationState.STATUS);
    if (status == null)
        return "";
    String[] pieces = status.value.split(VersionedValue.DELIMITER_STR, -1);
    assert (pieces.length > 0);
    return pieces[0];
}
/**
 * Applies gossip state received from a peer to the local view. Must run on the gossip stage (checked first).
 *
 * For each endpoint in {@code epStateMap}:
 *  - this node's own entry is skipped unless a shadow round is in progress;
 *  - quarantined endpoints (justRemovedEndpoints) are ignored;
 *  - unknown endpoints are reported to the failure detector and installed via handleMajorStateChange();
 *  - for known endpoints:
 *      - a remote generation more than MAX_GENERATION_DIFFERENCE seconds ahead of local wall-clock time is
 *        rejected with a warning;
 *      - a newer generation goes through handleMajorStateChange();
 *      - an equal generation merges newer versions via applyNewStates(), then calls markAlive() if the local
 *        state is not alive and not in a dead STATUS;
 *      - an older generation is ignored.
 * Afterwards, if the "is any known endpoint on 3.0" answer changed, it is logged and the schema version is
 * re-announced asynchronously on the GossipTasks executor.
 */
void applyStateLocally(Map<InetAddress, EndpointState> epStateMap)
{
checkProperThreadForStateMutation();
for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet())
{
InetAddress ep = entry.getKey();
if ( ep.equals(FBUtilities.getBroadcastAddress()) && !isInShadowRound())
continue;
if (justRemovedEndpoints.containsKey(ep))
{
if (logger.isTraceEnabled())
logger.trace("Ignoring gossip for {} because it is quarantined", ep);
continue;
}
EndpointState localEpStatePtr = endpointStateMap.get(ep);
EndpointState remoteState = entry.getValue();
if (localEpStatePtr != null)
{
int localGeneration = localEpStatePtr.getHeartBeatState().getGeneration();
int remoteGeneration = remoteState.getHeartBeatState().getGeneration();
// generations are compared against wall-clock seconds
long localTime = System.currentTimeMillis()/1000;
if (logger.isTraceEnabled())
logger.trace("{} local generation {}, remote generation {}", ep, localGeneration, remoteGeneration);
// reject implausibly far-future generations instead of letting them win every comparison
if (remoteGeneration > localTime + MAX_GENERATION_DIFFERENCE)
{
logger.warn("received an invalid gossip generation for peer {}; local time = {}, received generation = {}", ep, localTime, remoteGeneration);
}
else if (remoteGeneration > localGeneration)
{
if (logger.isTraceEnabled())
logger.trace("Updating heartbeat state generation to {} from {} for {}", remoteGeneration, localGeneration, ep);
handleMajorStateChange(ep, remoteState);
}
else if (remoteGeneration == localGeneration)
{
int localMaxVersion = getMaxEndpointStateVersion(localEpStatePtr);
int remoteMaxVersion = getMaxEndpointStateVersion(remoteState);
if (remoteMaxVersion > localMaxVersion)
{
applyNewStates(ep, localEpStatePtr, remoteState);
}
else if (logger.isTraceEnabled())
logger.trace("Ignoring remote version {} <= {} for {}", remoteMaxVersion, localMaxVersion, ep);
// same generation and not in a dead STATUS: (re)mark it alive if we currently consider it down
if (!localEpStatePtr.isAlive() && !isDeadState(localEpStatePtr))
markAlive(ep, localEpStatePtr);
}
else
{
if (logger.isTraceEnabled())
logger.trace("Ignoring remote generation {} < {}", remoteGeneration, localGeneration);
}
}
else
{
// first time we hear of this endpoint
FailureDetector.instance.report(ep);
handleMajorStateChange(ep, remoteState);
}
}
boolean any30 = anyEndpointOn30();
if (any30 != anyNodeOn30)
{
logger.info(any30
? "There is at least one 3.0 node in the cluster - will store and announce compatible schema version"
: "There are no 3.0 nodes in the cluster - will store and announce real schema version");
anyNodeOn30 = any30;
executor.submit(Schema.instance::updateVersionAndAnnounce);
}
}
/** Returns true if at least one known endpoint advertises a 3.0 release version; endpoints with none are skipped. */
private boolean anyEndpointOn30()
{
    boolean noneOn30 = endpointStateMap.values()
                                       .stream()
                                       .map(EndpointState::getReleaseVersion)
                                       .filter(Objects::nonNull)
                                       .noneMatch(CassandraVersion::is30);
    return !noneOn30;
}
/**
 * Merges remote state into the local state of an endpoint whose generation is unchanged. Steps:
 * adopt the remote heartbeat; apply every application state that is absent locally or has a newer remote
 * version; fire onChange for each applied state.
 */
private void applyNewStates(InetAddress addr, EndpointState localState, EndpointState remoteState)
{
    // Check the generations BEFORE overwriting the local heartbeat. The assert used to run afterwards,
    // when it compared the remote generation with itself and could never fail.
    assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration();

    int oldVersion = localState.getHeartBeatState().getHeartBeatVersion();
    localState.setHeartBeatState(remoteState.getHeartBeatState());
    if (logger.isTraceEnabled())
        logger.trace("Updating heartbeat state version to {} from {} for {} ...", localState.getHeartBeatState().getHeartBeatVersion(), oldVersion, addr);

    Set<Entry<ApplicationState, VersionedValue>> remoteStates = remoteState.states();
    // LinkedHashSet keeps remoteState.states() iteration order, so onChange notifications go out in a
    // deterministic order instead of hash order
    Set<Entry<ApplicationState, VersionedValue>> updatedStates = remoteStates.stream().filter(entry -> {
        VersionedValue local = localState.getApplicationState(entry.getKey());
        return local == null || local.version < entry.getValue().version;
    }).collect(Collectors.toCollection(LinkedHashSet::new));

    if (logger.isTraceEnabled() && !updatedStates.isEmpty())
    {
        for (Entry<ApplicationState, VersionedValue> entry : updatedStates)
            logger.trace("Updating {} state version to {} for {}", entry.getKey().toString(), entry.getValue().version, addr);
    }

    localState.addApplicationStates(updatedStates);

    for (Entry<ApplicationState, VersionedValue> updatedEntry : updatedStates)
        doOnChangeNotifications(addr, updatedEntry.getKey(), updatedEntry.getValue());
}
/**
 * Invokes {@code beforeChange} on every registered subscriber, in iteration order,
 * before an application state of {@code addr} is updated.
 *
 * @param addr     endpoint whose state is about to change
 * @param epState  current endpoint state (before the change)
 * @param apState  application state key being changed
 * @param newValue value that is about to be applied
 */
private void doBeforeChangeNotifications(InetAddress addr, EndpointState epState, ApplicationState apState, VersionedValue newValue)
{
    subscribers.forEach(listener -> listener.beforeChange(addr, epState, apState, newValue));
}
/**
 * Invokes {@code onChange} on every registered subscriber, in iteration order,
 * after an application state of {@code addr} has been updated.
 *
 * @param addr  endpoint whose state changed
 * @param state application state key that changed
 * @param value the new value
 */
private void doOnChangeNotifications(InetAddress addr, ApplicationState state, VersionedValue value)
{
    subscribers.forEach(listener -> listener.onChange(addr, state, value));
}
/**
 * Queues a request for the complete state of the digest's endpoint at {@code remoteGeneration}.
 * A max version of 0 asks for every state newer than version 0 (the same convention
 * {@code sendAll(..., 0)} uses when sending everything).
 *
 * @param gDigest               digest received from the peer
 * @param deltaGossipDigestList output list of digests we want the peer to fill in
 * @param remoteGeneration      generation advertised by the peer
 */
private void requestAll(GossipDigest gDigest, List<GossipDigest> deltaGossipDigestList, int remoteGeneration)
{
    InetAddress endpoint = gDigest.getEndpoint();
    GossipDigest fullRequest = new GossipDigest(endpoint, remoteGeneration, 0);
    deltaGossipDigestList.add(fullRequest);
    if (logger.isTraceEnabled())
        logger.trace("requestAll for {}", endpoint);
}
/**
 * Adds to {@code deltaEpStateMap} our state for the digest's endpoint, restricted to entries
 * with a version greater than {@code maxRemoteVersion}. Nothing is added when
 * {@code getStateForVersionBiggerThan} yields null.
 *
 * @param gDigest          digest received from the peer
 * @param deltaEpStateMap  output map of endpoint states to send back
 * @param maxRemoteVersion highest version the peer already has
 */
private void sendAll(GossipDigest gDigest, Map<InetAddress, EndpointState> deltaEpStateMap, int maxRemoteVersion)
{
    InetAddress endpoint = gDigest.getEndpoint();
    EndpointState newerState = getStateForVersionBiggerThan(endpoint, maxRemoteVersion);
    if (newerState == null)
        return;
    deltaEpStateMap.put(endpoint, newerState);
}
/**
 * Compares the digests received from a peer with our local endpoint states and fills two
 * output collections: digests we want the peer to send us ({@code deltaGossipDigestList})
 * and endpoint states we should send to the peer ({@code deltaEpStateMap}).
 * An empty {@code gDigestList} is treated as a shadow request. In that case it is populated,
 * in place, with a (generation 0, version 0) digest for every endpoint we know.
 *
 * @param gDigestList           digests from the peer; may be appended to (shadow request)
 * @param deltaGossipDigestList output: digests to request from the peer
 * @param deltaEpStateMap       output: endpoint states to send to the peer
 */
void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<InetAddress, EndpointState> deltaEpStateMap)
{
    if (gDigestList.isEmpty())
    {
        logger.debug("Shadow request received, adding all states");
        for (InetAddress knownEndpoint : endpointStateMap.keySet())
            gDigestList.add(new GossipDigest(knownEndpoint, 0, 0));
    }

    for (GossipDigest digest : gDigestList)
    {
        int remoteGeneration = digest.getGeneration();
        int maxRemoteVersion = digest.getMaxVersion();
        EndpointState localEpState = endpointStateMap.get(digest.getEndpoint());

        // unknown endpoint: ask for everything the peer has
        if (localEpState == null)
        {
            requestAll(digest, deltaGossipDigestList, remoteGeneration);
            continue;
        }

        int localGeneration = localEpState.getHeartBeatState().getGeneration();
        int maxLocalVersion = getMaxEndpointStateVersion(localEpState);

        if (remoteGeneration > localGeneration)
            requestAll(digest, deltaGossipDigestList, remoteGeneration);
        else if (remoteGeneration < localGeneration)
            sendAll(digest, deltaEpStateMap, 0);
        else if (maxRemoteVersion > maxLocalVersion)
            deltaGossipDigestList.add(new GossipDigest(digest.getEndpoint(), remoteGeneration, maxLocalVersion));
        else if (maxRemoteVersion < maxLocalVersion)
            sendAll(digest, deltaEpStateMap, maxRemoteVersion);
        // same generation and same max version: already in sync, nothing to exchange
    }
}
/**
 * Starts gossip with the given generation number and no preloaded local application states.
 *
 * @param generationNumber generation to use for the local heartbeat
 */
public void start(int generationNumber)
{
    Map<ApplicationState, VersionedValue> noPreloadedStates = new EnumMap<>(ApplicationState.class);
    start(generationNumber, noPreloadedStates);
}
/**
 * Starts gossip. The method:
 * <ul>
 *   <li>builds the seed list;</li>
 *   <li>makes sure a local endpoint state exists;</li>
 *   <li>adds {@code preloadLocalStates} to it;</li>
 *   <li>notifies the endpoint snitch;</li>
 *   <li>schedules {@code GossipTask} at a fixed delay of {@code Gossiper.intervalInMillis}.</li>
 * </ul>
 *
 * @param generationNbr      generation for the local heartbeat (used only if no local state exists yet)
 * @param preloadLocalStates application states to add to the local endpoint state
 */
public void start(int generationNbr, Map<ApplicationState, VersionedValue> preloadLocalStates)
{
    buildSeedsList();
    maybeInitializeLocalState(generationNbr);

    EndpointState selfState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
    selfState.addApplicationStates(preloadLocalStates);

    DatabaseDescriptor.getEndpointSnitch().gossiperStarting();

    if (logger.isTraceEnabled())
        logger.trace("gossip started with generation {}", selfState.getHeartBeatState().getGeneration());

    final long periodMillis = Gossiper.intervalInMillis;
    scheduledGossipTask = executor.scheduleWithFixedDelay(new GossipTask(), periodMillis, periodMillis, TimeUnit.MILLISECONDS);
}
/**
 * Runs a shadow round that contacts only the configured seeds (no extra peers).
 *
 * @return the endpoint states collected during the shadow round
 */
public synchronized Map<InetAddress, EndpointState> doShadowRound()
{
    // type-safe empty set instead of the raw Collections.EMPTY_SET (unchecked conversion)
    return doShadowRound(Collections.<InetAddress>emptySet());
}
/**
 * Performs a gossip shadow round. An empty GOSSIP_DIGEST_SYN is sent to every seed, and from
 * the second attempt onward also to {@code peers}. Sends are retried every 5 seconds.
 * The method returns once {@code inShadowRound} is cleared (see {@code maybeFinishShadowRound})
 * or once the shadow round delay expires.
 *
 * @param peers additional endpoints to contact after the first attempt; must not be null
 * @return an immutable snapshot of the endpoint states gathered during the round
 * @throws RuntimeException if this node is not a seed and nobody answered before the delay
 *                          expired, or if the calling thread was interrupted (the interrupt
 *                          flag is restored before throwing)
 */
public synchronized Map<InetAddress, EndpointState> doShadowRound(Set<InetAddress> peers)
{
    buildSeedsList();
    // The local address may be the only entry in seeds + peers, in which case a shadow round is
    // pointless. Return a snapshot, like the normal path, rather than the live internal map.
    if (seeds.isEmpty() && peers.isEmpty())
        return ImmutableMap.copyOf(endpointShadowStateMap);

    boolean isSeed = DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress());
    // Non-seeds wait twice RING_DELAY before giving up; seeds wait RING_DELAY.
    int shadowRoundDelay = isSeed ? StorageService.RING_DELAY : StorageService.RING_DELAY * 2;
    seedsInShadowRound.clear();
    endpointShadowStateMap.clear();

    // An empty digest list is what the receiver (examineGossiper) treats as a shadow request.
    List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
    GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
                                                           DatabaseDescriptor.getPartitionerName(),
                                                           gDigests);
    MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,
                                                                          digestSynMessage,
                                                                          GossipDigestSyn.serializer);

    inShadowRound = true;
    boolean includePeers = false;
    int slept = 0;
    try
    {
        while (true)
        {
            // (re)send at the start and then every 5 seconds
            if (slept % 5000 == 0)
            {
                logger.trace("Sending shadow round GOSSIP DIGEST SYN to seeds {}", seeds);
                for (InetAddress seed : seeds)
                    MessagingService.instance().sendOneWay(message, seed);

                if (includePeers)
                {
                    logger.trace("Sending shadow round GOSSIP DIGEST SYN to known peers {}", peers);
                    for (InetAddress peer : peers)
                        MessagingService.instance().sendOneWay(message, peer);
                }
                includePeers = true;
            }

            Thread.sleep(1000);
            if (!inShadowRound)
                break;

            slept += 1000;
            if (slept > shadowRoundDelay)
            {
                // Nobody could be gossiped with: acceptable for a seed, fatal otherwise.
                if (!isSeed)
                    throw new RuntimeException("Unable to gossip with any peers");

                inShadowRound = false;
                break;
            }
        }
    }
    catch (InterruptedException wtf)
    {
        // preserve the interrupt status for callers further up the stack
        Thread.currentThread().interrupt();
        throw new RuntimeException(wtf);
    }

    return ImmutableMap.copyOf(endpointShadowStateMap);
}
/**
 * Adds every configured seed to {@code seeds}, excluding this node's own broadcast address.
 * Existing entries in {@code seeds} are not removed.
 */
@VisibleForTesting
void buildSeedsList()
{
    for (InetAddress candidate : DatabaseDescriptor.getSeeds())
    {
        boolean isSelf = candidate.equals(FBUtilities.getBroadcastAddress());
        if (!isSelf)
            seeds.add(candidate);
    }
}
/**
 * Registers an alive local endpoint state with the given generation, unless a state for
 * this node's broadcast address is already present (in which case nothing changes).
 *
 * @param generationNbr heartbeat generation for the new local state
 */
public void maybeInitializeLocalState(int generationNbr)
{
    EndpointState candidate = new EndpointState(new HeartBeatState(generationNbr));
    candidate.markAlive();
    endpointStateMap.putIfAbsent(FBUtilities.getBroadcastAddress(), candidate);
}
/**
 * Forces the local heartbeat onto a newer generation via
 * {@code HeartBeatState.forceNewerGenerationUnsafe()}.
 * Assumes the local endpoint state has already been initialized; otherwise a
 * NullPointerException is thrown.
 */
public void forceNewerGeneration()
{
    endpointStateMap.get(FBUtilities.getBroadcastAddress())
                    .getHeartBeatState()
                    .forceNewerGenerationUnsafe();
}
/**
 * Registers a previously known endpoint as dead, with a generation-0 heartbeat, and records
 * it as unreachable as of now. An existing endpoint state is reused; only its heartbeat is reset.
 * Attempts to add this node itself are logged and ignored.
 *
 * @param ep endpoint to register
 */
public void addSavedEndpoint(InetAddress ep)
{
    checkProperThreadForStateMutation();

    if (ep.equals(FBUtilities.getBroadcastAddress()))
    {
        logger.debug("Attempt to add self as saved endpoint");
        return;
    }

    EndpointState epState = endpointStateMap.get(ep);
    if (epState == null)
    {
        epState = new EndpointState(new HeartBeatState(0));
    }
    else
    {
        // logged before the heartbeat reset, so the message shows the previous state
        logger.debug("not replacing a previous epState for {}, but reusing it: {}", ep, epState);
        epState.setHeartBeatState(new HeartBeatState(0));
    }

    epState.markDead();
    endpointStateMap.put(ep, epState);
    unreachableEndpoints.put(ep, System.nanoTime());

    if (logger.isTraceEnabled())
        logger.trace("Adding saved endpoint {} {}", ep, epState.getHeartBeatState().getGeneration());
}
/**
 * Applies one application state to the local endpoint. The caller must hold {@code taskLock}.
 * Subscribers get {@code beforeChange} with the original value. The value is then cloned
 * with a higher version, stored, and announced through {@code onChange}.
 *
 * @param state application state key
 * @param value value to apply (re-versioned before storing)
 */
private void addLocalApplicationStateInternal(ApplicationState state, VersionedValue value)
{
    assert taskLock.isHeldByCurrentThread();
    InetAddress self = FBUtilities.getBroadcastAddress();
    EndpointState selfState = endpointStateMap.get(self);
    assert selfState != null;

    doBeforeChangeNotifications(self, selfState, state, value);

    VersionedValue bumped = StorageService.instance.valueFactory.cloneWithHigherVersion(value);
    selfState.addApplicationState(state, bumped);

    doOnChangeNotifications(self, state, bumped);
}
/**
 * Applies a single application state to the local endpoint; see
 * {@link #addLocalApplicationStates(List)}.
 *
 * @param applicationState application state key
 * @param value            value to apply
 */
public void addLocalApplicationState(ApplicationState applicationState, VersionedValue value)
{
    Pair<ApplicationState, VersionedValue> single = Pair.create(applicationState, value);
    addLocalApplicationStates(Collections.singletonList(single));
}
/**
 * Applies the given application states to the local endpoint, in list order, while holding
 * {@code taskLock}. The lock is released even if an update throws.
 *
 * @param states (state, value) pairs to apply
 */
public void addLocalApplicationStates(List<Pair<ApplicationState, VersionedValue>> states)
{
    taskLock.lock();
    try
    {
        states.forEach(entry -> addLocalApplicationStateInternal(entry.left, entry.right));
    }
    finally
    {
        taskLock.unlock();
    }
}
/**
 * Stops gossiping. The shutdown is announced only when a local state exists, that state is
 * not a silent shutdown state, and the node has joined. Announcing means: set STATUS to
 * shutdown, send GOSSIP_SHUTDOWN to every live endpoint, then sleep
 * {@code cassandra.shutdown_announce_in_ms} (default 2000 ms). In all cases the scheduled
 * gossip task, if any, is cancelled without interrupting it.
 */
public void stop()
{
    EndpointState mystate = endpointStateMap.get(FBUtilities.getBroadcastAddress());
    boolean shouldAnnounce = mystate != null
                             && !isSilentShutdownState(mystate)
                             && StorageService.instance.isJoined();

    if (!shouldAnnounce)
    {
        logger.warn("No local state, state is in silent shutdown, or node hasn't joined, not announcing shutdown");
    }
    else
    {
        logger.info("Announcing shutdown");
        addLocalApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.shutdown(true));
        MessageOut message = new MessageOut(MessagingService.Verb.GOSSIP_SHUTDOWN);
        for (InetAddress liveEndpoint : liveEndpoints)
            MessagingService.instance().sendOneWay(message, liveEndpoint);
        Uninterruptibles.sleepUninterruptibly(Integer.getInteger("cassandra.shutdown_announce_in_ms", 2000), TimeUnit.MILLISECONDS);
    }

    if (scheduledGossipTask != null)
        scheduledGossipTask.cancel(false);
}
/**
 * Reports whether gossip is running, i.e. a gossip task has been scheduled and not cancelled.
 *
 * @return {@code true} while the scheduled gossip task is active
 */
public boolean isEnabled()
{
    if (scheduledGossipTask == null)
        return false;
    return !scheduledGossipTask.isCancelled();
}
/**
 * Returns the last recorded result of the "any endpoint on 3.0" check.
 *
 * @return the cached {@code anyNodeOn30} flag
 */
public boolean isAnyNodeOn30()
{
    return this.anyNodeOn30;
}
/**
 * Handles an ack received while this node may be in a shadow round. Does nothing if we
 * are not in one.
 * <ul>
 *   <li>Regular ack (respondent not in a shadow round): copy its endpoint states and leave
 *       the shadow round. A warning is logged if the respondent is not a seed.</li>
 *   <li>Shadow-round ack: remember the respondent, and leave the shadow round only once
 *       every seed has answered that it is also in one.</li>
 * </ul>
 *
 * @param respondent      endpoint that sent the ack
 * @param isInShadowRound whether the respondent reports being in a shadow round itself
 * @param epStateMap      endpoint states carried by the ack
 */
protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound, Map<InetAddress, EndpointState> epStateMap)
{
    if (!inShadowRound)
        return;

    if (isInShadowRound)
    {
        logger.debug("Received an ack from {} indicating it is also in shadow round", respondent);
        seedsInShadowRound.add(respondent);
        // keep waiting until every seed is known to be in a shadow round too
        if (!seedsInShadowRound.containsAll(seeds))
            return;
        logger.debug("All seeds are in a shadow round, clearing this node to exit its own");
    }
    else
    {
        if (!seeds.contains(respondent))
            logger.warn("Received an ack from {}, who isn't a seed. Ensure your seed list includes a live node. Exiting shadow round",
                        respondent);
        logger.debug("Received a regular ack from {}, can now exit shadow round", respondent);
        // states must be stored before the flag is cleared; doShadowRound copies them once it sees the flag drop
        endpointShadowStateMap.putAll(epStateMap);
    }

    inShadowRound = false;
    seedsInShadowRound.clear();
}
/**
 * Reports whether this node is currently performing a shadow round.
 *
 * @return the current {@code inShadowRound} flag
 */
protected boolean isInShadowRound()
{
    return this.inShadowRound;
}
/**
 * Test helper. Ensures {@code addr} has an endpoint state and adds NET_VERSION and HOST_ID to it.
 * The state is created as alive with the given generation only if none is present yet; an
 * existing state is reused.
 *
 * @param addr          endpoint to initialize
 * @param uuid          host id to advertise
 * @param generationNbr generation for a newly created heartbeat
 */
@VisibleForTesting
public void initializeNodeUnsafe(InetAddress addr, UUID uuid, int generationNbr)
{
    EndpointState fresh = new EndpointState(new HeartBeatState(generationNbr));
    fresh.markAlive();
    EndpointState existing = endpointStateMap.putIfAbsent(addr, fresh);
    EndpointState target = (existing != null) ? existing : fresh;

    Map<ApplicationState, VersionedValue> initialStates = new EnumMap<>(ApplicationState.class);
    initialStates.put(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion());
    initialStates.put(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(uuid));
    target.addApplicationStates(initialStates);
}
/**
 * Test helper: stores {@code value} under {@code state} directly in the endpoint's state.
 * Subscribers are not notified. Assumes the endpoint already has a state; otherwise a
 * NullPointerException is thrown.
 *
 * @param endpoint target endpoint
 * @param state    application state key
 * @param value    value to store as-is
 */
@VisibleForTesting
public void injectApplicationState(InetAddress endpoint, ApplicationState state, VersionedValue value)
{
    endpointStateMap.get(endpoint).addApplicationState(state, value);
}
/**
 * Resolves {@code address} and delegates to the {@link InetAddress} overload.
 *
 * @param address host name or textual IP address
 * @return the downtime reported by the {@code InetAddress} overload
 * @throws UnknownHostException if {@code address} cannot be resolved
 */
public long getEndpointDowntime(String address) throws UnknownHostException
{
    InetAddress resolved = InetAddress.getByName(address);
    return getEndpointDowntime(resolved);
}
/**
 * Resolves {@code address} and delegates to the {@link InetAddress} overload.
 *
 * @param address host name or textual IP address
 * @return the generation number reported by the {@code InetAddress} overload
 * @throws UnknownHostException if {@code address} cannot be resolved
 */
public int getCurrentGenerationNumber(String address) throws UnknownHostException
{
    InetAddress resolved = InetAddress.getByName(address);
    return getCurrentGenerationNumber(resolved);
}
/**
 * Records an expire time for {@code endpoint} in {@code expireTimeEndpointMap}, replacing
 * any previous value.
 *
 * @param endpoint   endpoint the expire time applies to
 * @param expireTime expire time to store (same units as produced by {@code computeExpireTime()})
 */
public void addExpireTimeForEndpoint(InetAddress endpoint, long expireTime)
{
    // guarded to avoid boxing expireTime when debug logging is off
    if (logger.isDebugEnabled())
        logger.debug("adding expire time for endpoint : {} ({})", endpoint, expireTime);
    expireTimeEndpointMap.put(endpoint, expireTime);
}
/**
 * Computes an expire timestamp {@code Gossiper.aVeryLongTime} after the current wall-clock time.
 *
 * @return {@code System.currentTimeMillis() + Gossiper.aVeryLongTime}
 */
public static long computeExpireTime()
{
    long nowMillis = System.currentTimeMillis();
    return nowMillis + Gossiper.aVeryLongTime;
}
/**
 * Looks up the release version advertised by {@code ep}.
 *
 * @param ep endpoint to query
 * @return the endpoint's release version, or {@code null} if the endpoint has no known state
 *         (or its state has no release version)
 */
@Nullable
public CassandraVersion getReleaseVersion(InetAddress ep)
{
    EndpointState state = getEndpointStateForEndpoint(ep);
    if (state == null)
        return null;
    return state.getReleaseVersion();
}
/**
 * Looks up the schema version advertised by {@code ep}.
 *
 * @param ep endpoint to query
 * @return the endpoint's schema version, or {@code null} if the endpoint has no known state
 *         (or its state has no schema version)
 */
@Nullable
public UUID getSchemaVersion(InetAddress ep)
{
    EndpointState state = getEndpointStateForEndpoint(ep);
    if (state == null)
        return null;
    return state.getSchemaVersion();
}
/**
 * Blocks until the number of known endpoint states stays the same for three consecutive polls.
 * It first waits 5 s, then polls once per second.
 * <p>
 * The system property {@code cassandra.skip_wait_for_gossip_to_settle} controls forcing:
 * <ul>
 *   <li>{@code 0}: return immediately;</li>
 *   <li>a positive value N: stop waiting once more than N polls have been made;</li>
 *   <li>the default {@code -1}: wait indefinitely.</li>
 * </ul>
 * When startup is forced, only the forced-startup warning is logged. The "settled" summary
 * is skipped, since gossip did not actually settle.
 */
public static void waitToSettle()
{
    int forceAfter = Integer.getInteger("cassandra.skip_wait_for_gossip_to_settle", -1);
    if (forceAfter == 0)
    {
        return;
    }
    final int GOSSIP_SETTLE_MIN_WAIT_MS = 5000;
    final int GOSSIP_SETTLE_POLL_INTERVAL_MS = 1000;
    final int GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED = 3;

    logger.info("Waiting for gossip to settle...");
    Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_MIN_WAIT_MS, TimeUnit.MILLISECONDS);
    int totalPolls = 0;
    int numOkay = 0;
    int epSize = Gossiper.instance.getEndpointStates().size();
    while (numOkay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
    {
        Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
        int currentSize = Gossiper.instance.getEndpointStates().size();
        totalPolls++;
        if (currentSize == epSize)
        {
            logger.debug("Gossip looks settled.");
            numOkay++;
        }
        else
        {
            logger.info("Gossip not settled after {} polls.", totalPolls);
            // any change resets the run of consecutive stable polls
            numOkay = 0;
        }
        epSize = currentSize;
        if (forceAfter > 0 && totalPolls > forceAfter)
        {
            logger.warn("Gossip not settled but startup forced by cassandra.skip_wait_for_gossip_to_settle. Gossip total polls: {}",
                        totalPolls);
            // Return here: the summary below would wrongly claim gossip settled.
            return;
        }
    }
    if (totalPolls > GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
        logger.info("Gossip settled after {} extra polls; proceeding", totalPolls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED);
    else
        logger.info("No gossip backlog; proceeding");
}
/**
 * Test helper. Calls {@link #stop()} (which may announce shutdown and cancels the gossip task),
 * then shuts down {@code executor} and waits for it to terminate.
 *
 * @param timeout maximum time to wait for the executor
 * @param unit    unit of {@code timeout}
 * @throws InterruptedException if interrupted while waiting
 * @throws TimeoutException     if the executor does not terminate within the timeout
 */
@VisibleForTesting
public void stopShutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
{
    this.stop();
    ExecutorUtils.shutdownAndWait(timeout, unit, executor);
}
}