/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.driver.internal.core.loadbalancing;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeState;
import com.datastax.oss.driver.api.core.metadata.TokenMap;
import com.datastax.oss.driver.api.core.metadata.token.Token;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.loadbalancing.helper.DefaultNodeFilterHelper;
import com.datastax.oss.driver.internal.core.loadbalancing.helper.OptionalLocalDcHelper;
import com.datastax.oss.driver.internal.core.util.ArrayUtils;
import com.datastax.oss.driver.internal.core.util.collection.QueryPlan;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntUnaryOperator;
import java.util.function.Predicate;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
A basic implementation of LoadBalancingPolicy
that can serve as a building block for more advanced use cases. To activate this policy, modify the basic.load-balancing-policy
section in the driver configuration, for example:
datastax-java-driver {
basic.load-balancing-policy {
class = BasicLoadBalancingPolicy
local-datacenter = datacenter1 # optional
}
}
See reference.conf
(in the manual or core driver JAR) for more details. Local datacenter: This implementation will only define a local datacenter if it is
explicitly set either through configuration or programmatically; if the local datacenter is
unspecified, this implementation will effectively act as a datacenter-agnostic load balancing
policy and will consider all nodes in the cluster when creating query plans, regardless of their
datacenter.
Query plan: This implementation prioritizes replica nodes over non-replica ones; if
more than one replica is available, the replicas will be shuffled. Non-replica nodes will be
included in a round-robin fashion. If the local datacenter is defined (see above), query plans
will only include local nodes, never remote ones; if it is unspecified however, query plans may
contain nodes from different datacenters.
This class is not recommended for normal users who should always prefer DefaultLoadBalancingPolicy
.
/**
* A basic implementation of {@link LoadBalancingPolicy} that can serve as a building block for more
* advanced use cases.
*
* <p>To activate this policy, modify the {@code basic.load-balancing-policy} section in the driver
* configuration, for example:
*
* <pre>
* datastax-java-driver {
* basic.load-balancing-policy {
* class = BasicLoadBalancingPolicy
* local-datacenter = datacenter1 # optional
* }
* }
* </pre>
*
* See {@code reference.conf} (in the manual or core driver JAR) for more details.
*
* <p><b>Local datacenter</b>: This implementation will only define a local datacenter if it is
* explicitly set either through configuration or programmatically; if the local datacenter is
* unspecified, this implementation will effectively act as a datacenter-agnostic load balancing
* policy and will consider all nodes in the cluster when creating query plans, regardless of their
* datacenter.
*
* <p><b>Query plan</b>: This implementation prioritizes replica nodes over non-replica ones; if
* more than one replica is available, the replicas will be shuffled. Non-replica nodes will be
* included in a round-robin fashion. If the local datacenter is defined (see above), query plans
* will only include local nodes, never remote ones; if it is unspecified however, query plans may
* contain nodes from different datacenters.
*
* <p><b>This class is not recommended for normal users who should always prefer {@link
* DefaultLoadBalancingPolicy}</b>.
*/
@ThreadSafe
public class BasicLoadBalancingPolicy implements LoadBalancingPolicy {
private static final Logger LOG = LoggerFactory.getLogger(BasicLoadBalancingPolicy.class);
protected static final IntUnaryOperator INCREMENT = i -> (i == Integer.MAX_VALUE) ? 0 : i + 1;
@NonNull protected final InternalDriverContext context;
@NonNull protected final DriverExecutionProfile profile;
@NonNull protected final String logPrefix;
protected final AtomicInteger roundRobinAmount = new AtomicInteger();
protected final CopyOnWriteArraySet<Node> liveNodes = new CopyOnWriteArraySet<>();
// private because they should be set in init() and never be modified after
private volatile DistanceReporter distanceReporter;
private volatile Predicate<Node> filter;
private volatile String localDc;
public BasicLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) {
this.context = (InternalDriverContext) context;
profile = context.getConfig().getProfile(profileName);
logPrefix = context.getSessionName() + "|" + profileName;
}
Returns: The local datacenter, if known; empty otherwise.
/** @return The local datacenter, if known; empty otherwise. */
public Optional<String> getLocalDatacenter() {
return Optional.ofNullable(localDc);
}
Returns: An immutable copy of the nodes currently considered as live; if the local datacenter is
known, this set will contain only nodes belonging to that datacenter.
/**
* @return An immutable copy of the nodes currently considered as live; if the local datacenter is
* known, this set will contain only nodes belonging to that datacenter.
*/
public Set<Node> getLiveNodes() {
return ImmutableSet.copyOf(liveNodes);
}
@Override
public void init(@NonNull Map<UUID, Node> nodes, @NonNull DistanceReporter distanceReporter) {
this.distanceReporter = distanceReporter;
localDc = discoverLocalDc(nodes).orElse(null);
filter = createNodeFilter(localDc, nodes);
for (Node node : nodes.values()) {
if (filter.test(node)) {
distanceReporter.setDistance(node, NodeDistance.LOCAL);
if (node.getState() != NodeState.DOWN) {
// This includes state == UNKNOWN. If the node turns out to be unreachable, this will be
// detected when we try to open a pool to it, it will get marked down and this will be
// signaled back to this policy
liveNodes.add(node);
}
} else {
distanceReporter.setDistance(node, NodeDistance.IGNORED);
}
}
}
Returns the local datacenter, if it can be discovered, or returns empty
otherwise. This method is called only once, during initialization.
Implementors may choose to throw IllegalStateException
instead of returning empty
, if they require a local datacenter to be defined in order to operate properly.
Params: - nodes – All the nodes that were known to exist in the cluster (regardless of their state)
when the load balancing policy was initialized. This argument is provided in case
implementors need to inspect the cluster topology to discover the local datacenter.
Throws: - IllegalStateException – if the local datacenter could not be discovered, and this policy
cannot operate without it.
Returns: The local datacenter, or empty
if none found.
/**
* Returns the local datacenter, if it can be discovered, or returns {@link Optional#empty empty}
* otherwise.
*
* <p>This method is called only once, during {@linkplain LoadBalancingPolicy#init(Map,
* LoadBalancingPolicy.DistanceReporter) initialization}.
*
* <p>Implementors may choose to throw {@link IllegalStateException} instead of returning {@link
* Optional#empty empty}, if they require a local datacenter to be defined in order to operate
* properly.
*
* @param nodes All the nodes that were known to exist in the cluster (regardless of their state)
* when the load balancing policy was initialized. This argument is provided in case
* implementors need to inspect the cluster topology to discover the local datacenter.
* @return The local datacenter, or {@link Optional#empty empty} if none found.
* @throws IllegalStateException if the local datacenter could not be discovered, and this policy
* cannot operate without it.
*/
@NonNull
protected Optional<String> discoverLocalDc(@NonNull Map<UUID, Node> nodes) {
return new OptionalLocalDcHelper(context, profile, logPrefix).discoverLocalDc(nodes);
}
Creates a new node filter to use with this policy.
This method is called only once, during initialization, and only after local datacenter discovery has been attempted.
Params: - localDc – The local datacenter that was just discovered, or null if none found.
- nodes – All the nodes that were known to exist in the cluster (regardless of their state)
when the load balancing policy was initialized. This argument is provided in case
implementors need to inspect the cluster topology to create the node filter.
Returns: the node filter to use.
/**
* Creates a new node filter to use with this policy.
*
* <p>This method is called only once, during {@linkplain LoadBalancingPolicy#init(Map,
* LoadBalancingPolicy.DistanceReporter) initialization}, and only after local datacenter
* discovery has been attempted.
*
* @param localDc The local datacenter that was just discovered, or null if none found.
* @param nodes All the nodes that were known to exist in the cluster (regardless of their state)
* when the load balancing policy was initialized. This argument is provided in case
* implementors need to inspect the cluster topology to create the node filter.
* @return the node filter to use.
*/
@NonNull
protected Predicate<Node> createNodeFilter(
@Nullable String localDc, @NonNull Map<UUID, Node> nodes) {
return new DefaultNodeFilterHelper(context, profile, logPrefix)
.createNodeFilter(localDc, nodes);
}
@NonNull
@Override
public Queue<Node> newQueryPlan(@Nullable Request request, @Nullable Session session) {
// Take a snapshot since the set is concurrent:
Object[] currentNodes = liveNodes.toArray();
Set<Node> allReplicas = getReplicas(request, session);
int replicaCount = 0; // in currentNodes
if (!allReplicas.isEmpty()) {
// Move replicas to the beginning
for (int i = 0; i < currentNodes.length; i++) {
Node node = (Node) currentNodes[i];
if (allReplicas.contains(node)) {
ArrayUtils.bubbleUp(currentNodes, i, replicaCount);
replicaCount += 1;
}
}
if (replicaCount > 1) {
shuffleHead(currentNodes, replicaCount);
}
}
LOG.trace("[{}] Prioritizing {} local replicas", logPrefix, replicaCount);
// Round-robin the remaining nodes
ArrayUtils.rotate(
currentNodes,
replicaCount,
currentNodes.length - replicaCount,
roundRobinAmount.getAndUpdate(INCREMENT));
return new QueryPlan(currentNodes);
}
@NonNull
protected Set<Node> getReplicas(@Nullable Request request, @Nullable Session session) {
if (request == null || session == null) {
return Collections.emptySet();
}
Optional<TokenMap> maybeTokenMap = context.getMetadataManager().getMetadata().getTokenMap();
if (!maybeTokenMap.isPresent()) {
return Collections.emptySet();
}
// Note: we're on the hot path and the getXxx methods are potentially more than simple getters,
// so we only call each method when strictly necessary (which is why the code below looks a bit
// weird).
CqlIdentifier keyspace = null;
Token token = null;
ByteBuffer key = null;
try {
keyspace = request.getKeyspace();
if (keyspace == null) {
keyspace = request.getRoutingKeyspace();
}
if (keyspace == null && session.getKeyspace().isPresent()) {
keyspace = session.getKeyspace().get();
}
if (keyspace == null) {
return Collections.emptySet();
}
token = request.getRoutingToken();
key = (token == null) ? request.getRoutingKey() : null;
if (token == null && key == null) {
return Collections.emptySet();
}
} catch (Exception e) {
// Protect against poorly-implemented Request instances
LOG.error("Unexpected error while trying to compute query plan", e);
return Collections.emptySet();
}
TokenMap tokenMap = maybeTokenMap.get();
return token != null
? tokenMap.getReplicas(keyspace, token)
: tokenMap.getReplicas(keyspace, key);
}
Exposed as a protected method so that it can be accessed by tests /** Exposed as a protected method so that it can be accessed by tests */
protected void shuffleHead(Object[] currentNodes, int replicaCount) {
ArrayUtils.shuffleHead(currentNodes, replicaCount);
}
@Override
public void onAdd(@NonNull Node node) {
if (filter.test(node)) {
LOG.debug("[{}] {} was added, setting distance to LOCAL", logPrefix, node);
// Setting to a non-ignored distance triggers the session to open a pool, which will in turn
// set the node UP when the first channel gets opened.
distanceReporter.setDistance(node, NodeDistance.LOCAL);
} else {
distanceReporter.setDistance(node, NodeDistance.IGNORED);
}
}
@Override
public void onUp(@NonNull Node node) {
if (filter.test(node)) {
// Normally this is already the case, but the filter could be dynamic and have ignored the
// node previously.
distanceReporter.setDistance(node, NodeDistance.LOCAL);
if (liveNodes.add(node)) {
LOG.debug("[{}] {} came back UP, added to live set", logPrefix, node);
}
} else {
distanceReporter.setDistance(node, NodeDistance.IGNORED);
}
}
@Override
public void onDown(@NonNull Node node) {
if (liveNodes.remove(node)) {
LOG.debug("[{}] {} went DOWN, removed from live set", logPrefix, node);
}
}
@Override
public void onRemove(@NonNull Node node) {
if (liveNodes.remove(node)) {
LOG.debug("[{}] {} was removed, removed from live set", logPrefix, node);
}
}
@Override
public void close() {
// nothing to do
}
}