package com.datastax.oss.driver.internal.core;
import com.datastax.dse.driver.api.core.DseProtocolVersion;
import com.datastax.dse.driver.api.core.metadata.DseNodeProperties;
import com.datastax.dse.driver.internal.core.DseProtocolFeature;
import com.datastax.oss.driver.api.core.DefaultProtocolVersion;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException;
import com.datastax.oss.driver.api.core.Version;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Default {@link ProtocolVersionRegistry}, built from the versions declared by {@link
 * DefaultProtocolVersion} and {@link DseProtocolVersion}.
 *
 * <p>The only per-instance state is the immutable log prefix; everything else is held in static
 * immutable constants.
 */
@ThreadSafe
public class DefaultProtocolVersionRegistry implements ProtocolVersionRegistry {

  private static final Logger LOG = LoggerFactory.getLogger(DefaultProtocolVersionRegistry.class);

  /**
   * Every version known to this registry: all {@link DefaultProtocolVersion} constants followed by
   * all {@link DseProtocolVersion} constants, each in declaration order.
   *
   * <p>{@link #highestNonBeta()} and {@link #downgrade(ProtocolVersion)} navigate this list by
   * position, so its order is what defines "highest" and "previous" for those methods.
   */
  private static final List<ProtocolVersion> ALL_VERSIONS =
      ImmutableList.<ProtocolVersion>builder()
          .add(DefaultProtocolVersion.values())
          .add(DseProtocolVersion.values())
          .build();

  // DSE release thresholds used by highestCommon() to pick the protocol versions to keep.
  @VisibleForTesting
  static final Version DSE_4_7_0 = Objects.requireNonNull(Version.parse("4.7.0"));

  @VisibleForTesting
  static final Version DSE_5_0_0 = Objects.requireNonNull(Version.parse("5.0.0"));

  @VisibleForTesting
  static final Version DSE_5_1_0 = Objects.requireNonNull(Version.parse("5.1.0"));

  @VisibleForTesting
  static final Version DSE_6_0_0 = Objects.requireNonNull(Version.parse("6.0.0"));

  /** Prefix inserted at the start of every log message emitted by this instance. */
  private final String logPrefix;

  /**
   * Creates a registry.
   *
   * @param logPrefix the prefix to put in front of log messages.
   */
  public DefaultProtocolVersionRegistry(String logPrefix) {
    this.logPrefix = logPrefix;
  }

  /**
   * Resolves a version from its enum constant name, trying {@link DefaultProtocolVersion} first and
   * {@link DseProtocolVersion} second.
   *
   * @param name the exact name of the enum constant.
   * @return the matching version.
   * @throws IllegalArgumentException if neither enum declares a constant with that name; the
   *     failure of the second lookup is attached as the cause.
   */
  @Override
  public ProtocolVersion fromName(String name) {
    try {
      return DefaultProtocolVersion.valueOf(name);
    } catch (IllegalArgumentException noOssVersion) {
      // Not an OSS version name, fall back to the DSE versions.
      try {
        return DseProtocolVersion.valueOf(name);
      } catch (IllegalArgumentException noDseVersion) {
        // Keep the original failure attached so the stack trace shows where the lookup failed.
        throw new IllegalArgumentException("Unknown protocol version name: " + name, noDseVersion);
      }
    }
  }

  /**
   * Returns the last version of the known list if it is not a beta, otherwise the closest non-beta
   * version before it.
   *
   * @throws AssertionError if the known list contains no non-beta version at all.
   */
  @Override
  public ProtocolVersion highestNonBeta() {
    ProtocolVersion highest = ALL_VERSIONS.get(ALL_VERSIONS.size() - 1);
    if (!highest.isBeta()) {
      return highest;
    }
    return downgrade(highest)
        .orElseThrow(() -> new AssertionError("There should be at least one non-beta version"));
  }

  /**
   * Finds the closest non-beta version located before the given one in the known list.
   *
   * @param version the version to downgrade from.
   * @return the closest preceding non-beta version, or empty if there is none.
   * @throws AssertionError if {@code version} is not part of the known list.
   */
  @Override
  public Optional<ProtocolVersion> downgrade(ProtocolVersion version) {
    int index = ALL_VERSIONS.indexOf(version);
    if (index < 0) {
      throw new AssertionError(version + " is not a known version");
    }
    // Walk backwards, skipping beta versions.
    for (int i = index - 1; i >= 0; i--) {
      ProtocolVersion previous = ALL_VERSIONS.get(i);
      if (!previous.isBeta()) {
        return Optional.of(previous);
      }
    }
    return Optional.empty();
  }

  /**
   * Computes the version with the highest code among the non-beta versions that remain after
   * applying, for each node, the limits associated with the DSE or Cassandra release it reports.
   *
   * <p>Nodes that report neither a DSE version nor a Cassandra version are logged and skipped.
   *
   * @param nodes the nodes to consider; must contain at least one element.
   * @return the remaining version with the highest code.
   * @throws IllegalArgumentException if {@code nodes} is null or empty.
   * @throws UnsupportedProtocolVersionException if a node reports a release older than DSE 4.7.0 or
   *     Cassandra 2.1.0, or if no candidate version remains once all nodes have been processed.
   */
  @Override
  public ProtocolVersion highestCommon(Collection<Node> nodes) {
    if (nodes == null || nodes.isEmpty()) {
      throw new IllegalArgumentException("Expected at least one node");
    }

    // Start from every non-beta version; each node then narrows the set down.
    Set<ProtocolVersion> candidates = new LinkedHashSet<>();
    for (ProtocolVersion version : ALL_VERSIONS) {
      if (!version.isBeta()) {
        candidates.add(version);
      }
    }
    // Unfiltered snapshot, only used to populate the exceptions thrown below.
    ImmutableList<ProtocolVersion> initialCandidates = ImmutableList.copyOf(candidates);

    for (Node node : nodes) {
      // A node carrying a DSE version in its extras is handled with the DSE rules, whatever
      // Cassandra version it also reports.
      Version dseVersion = (Version) node.getExtras().get(DseNodeProperties.DSE_VERSION);
      if (dseVersion != null) {
        applyDseLimits(node, dseVersion, candidates, initialCandidates);
      } else {
        applyCassandraLimits(node, candidates);
      }
    }

    // Pick the remaining candidate with the highest code, if any.
    ProtocolVersion max = null;
    for (ProtocolVersion candidate : candidates) {
      if (max == null || max.getCode() < candidate.getCode()) {
        max = candidate;
      }
    }
    if (max == null) {
      throw new UnsupportedProtocolVersionException(
          null,
          String.format(
              "Could not determine a common protocol version, "
                  + "enable DEBUG logs for '%s' for more details",
              LOG.getName()),
          initialCandidates);
    }
    return max;
  }

  /**
   * Narrows {@code candidates} according to the DSE version reported by a node.
   *
   * @throws UnsupportedProtocolVersionException if the version is lower than 4.7.0.
   */
  private void applyDseLimits(
      Node node,
      Version reportedVersion,
      Set<ProtocolVersion> candidates,
      ImmutableList<ProtocolVersion> initialCandidates) {
    LOG.debug(
        "[{}] Node {} reports DSE version {}", logPrefix, node.getEndPoint(), reportedVersion);
    // All comparisons (and the error message) use the result of nextStable(), not the raw value.
    // NOTE(review): presumably this maps a pre-release to the release it leads to -- confirm
    // against Version#nextStable.
    Version dseVersion = reportedVersion.nextStable();
    if (dseVersion.compareTo(DSE_4_7_0) < 0) {
      throw new UnsupportedProtocolVersionException(
          node.getEndPoint(),
          String.format(
              "Node %s reports DSE version %s, " + "but the driver only supports 4.7.0 and above",
              node.getEndPoint(), dseVersion),
          initialCandidates);
    } else if (dseVersion.compareTo(DSE_5_0_0) < 0) {
      // [4.7.0, 5.0.0): OSS up to V3, no DSE-specific version
      removeHigherThan(DefaultProtocolVersion.V3, null, candidates);
    } else if (dseVersion.compareTo(DSE_5_1_0) < 0) {
      // [5.0.0, 5.1.0): OSS up to V4, no DSE-specific version
      removeHigherThan(DefaultProtocolVersion.V4, null, candidates);
    } else if (dseVersion.compareTo(DSE_6_0_0) < 0) {
      // [5.1.0, 6.0.0): OSS up to V4, DSE up to DSE_V1
      removeHigherThan(DefaultProtocolVersion.V4, DseProtocolVersion.DSE_V1, candidates);
    } else {
      // 6.0.0 and above: OSS up to V4, DSE up to DSE_V2
      removeHigherThan(DefaultProtocolVersion.V4, DseProtocolVersion.DSE_V2, candidates);
    }
  }

  /**
   * Narrows {@code candidates} according to the Cassandra version reported by a node that has no
   * DSE version. Does nothing (besides logging a warning) if the node reports no Cassandra version
   * either.
   *
   * @throws UnsupportedProtocolVersionException if the version is lower than 2.1.0.
   */
  private void applyCassandraLimits(Node node, Set<ProtocolVersion> candidates) {
    Version cassandraVersion = node.getCassandraVersion();
    if (cassandraVersion == null) {
      LOG.warn(
          "[{}] Node {} reports neither DSE version nor Cassandra version, "
              + "ignoring it from optimal protocol version computation",
          logPrefix,
          node.getEndPoint());
      return;
    }
    // From here on (including in the log and error messages) the result of nextStable() is used.
    cassandraVersion = cassandraVersion.nextStable();
    LOG.debug(
        "[{}] Node {} reports Cassandra version {}",
        logPrefix,
        node.getEndPoint(),
        cassandraVersion);
    if (cassandraVersion.compareTo(Version.V2_1_0) < 0) {
      // Unlike the DSE case, the versions listed in this exception are hard-coded.
      throw new UnsupportedProtocolVersionException(
          node.getEndPoint(),
          String.format(
              "Node %s reports Cassandra version %s, "
                  + "but the driver only supports 2.1.0 and above",
              node.getEndPoint(), cassandraVersion),
          ImmutableList.of(DefaultProtocolVersion.V3, DefaultProtocolVersion.V4));
    } else if (cassandraVersion.compareTo(Version.V2_2_0) < 0) {
      // [2.1.0, 2.2.0): OSS up to V3
      removeHigherThan(DefaultProtocolVersion.V3, null, candidates);
    } else {
      // 2.2.0 and above: OSS up to V4
      removeHigherThan(DefaultProtocolVersion.V4, null, candidates);
    }
  }

  /**
   * Removes from {@code candidates} every OSS version declared after {@code maxOssVersion} and
   * every DSE version declared after {@code maxDseVersion}, logging each version actually removed.
   *
   * @param maxOssVersion the highest OSS version to keep.
   * @param maxDseVersion the highest DSE version to keep, or {@code null} to remove all DSE
   *     versions.
   * @param candidates the set to narrow down; modified in place.
   */
  private void removeHigherThan(
      DefaultProtocolVersion maxOssVersion,
      DseProtocolVersion maxDseVersion,
      Set<ProtocolVersion> candidates) {
    // compareTo() on enum constants compares their declaration order.
    for (DefaultProtocolVersion ossVersion : DefaultProtocolVersion.values()) {
      if (ossVersion.compareTo(maxOssVersion) > 0 && candidates.remove(ossVersion)) {
        LOG.debug("[{}] Excluding protocol {}", logPrefix, ossVersion);
      }
    }
    for (DseProtocolVersion dseVersion : DseProtocolVersion.values()) {
      if ((maxDseVersion == null || dseVersion.compareTo(maxDseVersion) > 0)
          && candidates.remove(dseVersion)) {
        LOG.debug("[{}] Excluding protocol {}", logPrefix, dseVersion);
      }
    }
  }

  /**
   * Tells whether a feature is available in a protocol version, based on the version's numeric
   * code.
   *
   * @param version the protocol version to check.
   * @param feature the feature to look for.
   * @return whether {@code version} supports {@code feature}.
   * @throws IllegalArgumentException if the feature is not one of those handled by this method.
   */
  @Override
  public boolean supports(ProtocolVersion version, ProtocolFeature feature) {
    int code = version.getCode();
    if (DefaultProtocolFeature.SMALLINT_AND_TINYINT_TYPES.equals(feature)
        || DefaultProtocolFeature.DATE_TYPE.equals(feature)
        || DefaultProtocolFeature.UNSET_BOUND_VALUES.equals(feature)) {
      // Any code from V4 upwards
      return DefaultProtocolVersion.V4.getCode() <= code;
    } else if (DefaultProtocolFeature.PER_REQUEST_KEYSPACE.equals(feature)) {
      // Codes in [V5, DSE_V1), or from DSE_V2 upwards; DSE_V1 itself is excluded
      return (DefaultProtocolVersion.V5.getCode() <= code
              && code < DseProtocolVersion.DSE_V1.getCode())
          || DseProtocolVersion.DSE_V2.getCode() <= code;
    } else if (DefaultProtocolFeature.NOW_IN_SECONDS.equals(feature)
        || DefaultProtocolFeature.MODERN_FRAMING.equals(feature)) {
      // Codes in [V5, DSE_V1) only
      return DefaultProtocolVersion.V5.getCode() <= code
          && code < DseProtocolVersion.DSE_V1.getCode();
    } else if (DseProtocolFeature.CONTINUOUS_PAGING.equals(feature)) {
      // Any code from DSE_V1 upwards
      return DseProtocolVersion.DSE_V1.getCode() <= code;
    } else {
      throw new IllegalArgumentException("Unhandled protocol feature: " + feature);
    }
  }
}