package com.datastax.dse.driver.internal.core.insights;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.AUTH_PROVIDER_CLASS;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.HEARTBEAT_INTERVAL;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.PROTOCOL_COMPRESSION;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.SSL_ENGINE_FACTORY_CLASS;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.SSL_HOSTNAME_VALIDATION;
import com.datastax.dse.driver.api.core.DseProtocolVersion;
import com.datastax.dse.driver.internal.core.insights.PackageUtil.ClassSettingDetails;
import com.datastax.dse.driver.internal.core.insights.configuration.InsightsConfiguration;
import com.datastax.dse.driver.internal.core.insights.exceptions.InsightEventFormatException;
import com.datastax.dse.driver.internal.core.insights.schema.AuthProviderType;
import com.datastax.dse.driver.internal.core.insights.schema.Insight;
import com.datastax.dse.driver.internal.core.insights.schema.InsightMetadata;
import com.datastax.dse.driver.internal.core.insights.schema.InsightType;
import com.datastax.dse.driver.internal.core.insights.schema.InsightsStartupData;
import com.datastax.dse.driver.internal.core.insights.schema.InsightsStatusData;
import com.datastax.dse.driver.internal.core.insights.schema.PoolSizeByHostDistance;
import com.datastax.dse.driver.internal.core.insights.schema.SSL;
import com.datastax.dse.driver.internal.core.insights.schema.SessionStateForNode;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.session.SessionBuilder;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.datastax.oss.driver.internal.core.adminrequest.AdminRequestHandler;
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.context.StartupOptionsBuilder;
import com.datastax.oss.driver.internal.core.control.ControlConnection;
import com.datastax.oss.driver.internal.core.pool.ChannelPool;
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
import com.datastax.oss.protocol.internal.request.Query;
import com.datastax.oss.protocol.internal.request.query.QueryOptions;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class InsightsClient {
private static final Logger LOGGER = LoggerFactory.getLogger(InsightsClient.class);
private static final String STARTUP_MESSAGE_NAME = "driver.startup";
private static final String STATUS_MESSAGE_NAME = "driver.status";
private static final String REPORT_INSIGHT_RPC = "CALL InsightsRpc.reportInsight(?)";
private static final Map<String, String> TAGS = ImmutableMap.of("language", "java");
private static final String STARTUP_VERSION_1_ID = "v1";
private static final String STATUS_VERSION_1_ID = "v1";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final int MAX_NUMBER_OF_STATUS_ERROR_LOGS = 5;
static final String DEFAULT_JAVA_APPLICATION = "Default Java Application";
private final ControlConnection controlConnection;
private final String id = Uuids.random().toString();
private final InsightsConfiguration insightsConfiguration;
private final AtomicInteger numberOfStatusEventErrors = new AtomicInteger();
private final InternalDriverContext driverContext;
private final Supplier<Long> timestampSupplier;
private final PlatformInfoFinder platformInfoFinder;
private final ReconnectionPolicyInfoFinder reconnectionPolicyInfoInfoFinder;
private final ExecutionProfilesInfoFinder executionProfilesInfoFinder;
private final ConfigAntiPatternsFinder configAntiPatternsFinder;
private final DataCentersFinder dataCentersFinder;
private final StackTraceElement[] initCallStackTrace;
private volatile ScheduledFuture<?> scheduleInsightsTask;
public static InsightsClient createInsightsClient(
InsightsConfiguration insightsConfiguration,
InternalDriverContext driverContext,
StackTraceElement[] initCallStackTrace) {
DataCentersFinder dataCentersFinder = new DataCentersFinder();
return new InsightsClient(
driverContext,
() -> new Date().getTime(),
insightsConfiguration,
new PlatformInfoFinder(),
new ReconnectionPolicyInfoFinder(),
new ExecutionProfilesInfoFinder(),
new ConfigAntiPatternsFinder(),
dataCentersFinder,
initCallStackTrace);
}
InsightsClient(
InternalDriverContext driverContext,
Supplier<Long> timestampSupplier,
InsightsConfiguration insightsConfiguration,
PlatformInfoFinder platformInfoFinder,
ReconnectionPolicyInfoFinder reconnectionPolicyInfoInfoFinder,
ExecutionProfilesInfoFinder executionProfilesInfoFinder,
ConfigAntiPatternsFinder configAntiPatternsFinder,
DataCentersFinder dataCentersFinder,
StackTraceElement[] initCallStackTrace) {
this.driverContext = driverContext;
this.controlConnection = driverContext.getControlConnection();
this.timestampSupplier = timestampSupplier;
this.insightsConfiguration = insightsConfiguration;
this.platformInfoFinder = platformInfoFinder;
this.reconnectionPolicyInfoInfoFinder = reconnectionPolicyInfoInfoFinder;
this.executionProfilesInfoFinder = executionProfilesInfoFinder;
this.configAntiPatternsFinder = configAntiPatternsFinder;
this.dataCentersFinder = dataCentersFinder;
this.initCallStackTrace = initCallStackTrace;
}
public CompletionStage<Void> sendStartupMessage() {
try {
if (!shouldSendEvent()) {
return CompletableFuture.completedFuture(null);
} else {
String startupMessage = createStartupMessage();
return sendJsonMessage(startupMessage)
.whenComplete(
(aVoid, throwable) -> {
if (throwable != null) {
LOGGER.debug(
"Error while sending startup message to Insights. Message was: "
+ trimToFirst500characters(startupMessage),
throwable);
}
});
}
} catch (Exception e) {
LOGGER.debug("Unexpected error while sending startup message to Insights.", e);
return CompletableFutures.failedFuture(e);
}
}
private static String trimToFirst500characters(String startupMessage) {
return startupMessage.substring(0, Math.min(startupMessage.length(), 500));
}
public void scheduleStatusMessageSend() {
if (!shouldSendEvent()) {
return;
}
scheduleInsightsTask =
scheduleInsightsTask(
insightsConfiguration.getStatusEventDelayMillis(),
insightsConfiguration.getExecutor(),
this::sendStatusMessage);
}
public void shutdown() {
if (scheduleInsightsTask != null) {
scheduleInsightsTask.cancel(false);
}
}
@VisibleForTesting
public CompletionStage<Void> sendStatusMessage() {
try {
String statusMessage = createStatusMessage();
CompletionStage<Void> result = sendJsonMessage(statusMessage);
return result.whenComplete(
(aVoid, throwable) -> {
if (throwable != null) {
if (numberOfStatusEventErrors.getAndIncrement() < MAX_NUMBER_OF_STATUS_ERROR_LOGS) {
LOGGER.debug(
"Error while sending status message to Insights. Message was: "
+ trimToFirst500characters(statusMessage),
throwable);
}
}
});
} catch (Exception e) {
LOGGER.debug("Unexpected error while sending status message to Insights.", e);
return CompletableFutures.failedFuture(e);
}
}
private CompletionStage<Void> sendJsonMessage(String jsonMessage) {
QueryOptions queryOptions = createQueryOptionsWithJson(jsonMessage);
String logPrefix = driverContext.getSessionName();
Duration timeout =
driverContext
.getConfig()
.getDefaultProfile()
.getDuration(DefaultDriverOption.CONTROL_CONNECTION_TIMEOUT);
LOGGER.debug("sending JSON message: {}", jsonMessage);
Query query = new Query(REPORT_INSIGHT_RPC, queryOptions);
return AdminRequestHandler.call(controlConnection.channel(), query, timeout, logPrefix).start();
}
private QueryOptions createQueryOptionsWithJson(String json) {
TypeCodec<String> codec =
driverContext.getCodecRegistry().codecFor(DataTypes.TEXT, String.class);
ByteBuffer startupMessageSerialized = codec.encode(json, DseProtocolVersion.DSE_V2);
return new QueryOptions(
QueryOptions.DEFAULT.consistency,
Collections.singletonList(startupMessageSerialized),
QueryOptions.DEFAULT.namedValues,
QueryOptions.DEFAULT.skipMetadata,
QueryOptions.DEFAULT.pageSize,
QueryOptions.DEFAULT.pagingState,
QueryOptions.DEFAULT.serialConsistency,
QueryOptions.DEFAULT.defaultTimestamp,
QueryOptions.DEFAULT.keyspace,
QueryOptions.DEFAULT.nowInSeconds);
}
private boolean shouldSendEvent() {
try {
return insightsConfiguration.isMonitorReportingEnabled()
&& InsightsSupportVerifier.supportsInsights(
driverContext.getMetadataManager().getMetadata().getNodes().values());
} catch (Exception e) {
LOGGER.debug("Unexpected error while checking Insights support.", e);
return false;
}
}
@VisibleForTesting
String createStartupMessage() {
InsightMetadata insightMetadata = createMetadata(STARTUP_MESSAGE_NAME, STARTUP_VERSION_1_ID);
InsightsStartupData data = createStartupData();
try {
return OBJECT_MAPPER.writeValueAsString(new Insight<>(insightMetadata, data));
} catch (JsonProcessingException e) {
throw new InsightEventFormatException("Problem when creating: " + STARTUP_MESSAGE_NAME, e);
}
}
@VisibleForTesting
String createStatusMessage() {
InsightMetadata insightMetadata = createMetadata(STATUS_MESSAGE_NAME, STATUS_VERSION_1_ID);
InsightsStatusData data = createStatusData();
try {
return OBJECT_MAPPER.writeValueAsString(new Insight<>(insightMetadata, data));
} catch (JsonProcessingException e) {
throw new InsightEventFormatException("Problem when creating: " + STATUS_MESSAGE_NAME, e);
}
}
private InsightsStatusData createStatusData() {
Map<String, String> startupOptions = driverContext.getStartupOptions();
return InsightsStatusData.builder()
.withClientId(getClientId(startupOptions))
.withSessionId(id)
.withControlConnection(getControlConnectionSocketAddress())
.withConnectedNodes(getConnectedNodes())
.build();
}
private Map<String, SessionStateForNode> getConnectedNodes() {
Map<Node, ChannelPool> pools = driverContext.getPoolManager().getPools();
return pools.entrySet().stream()
.collect(
Collectors.toMap(
entry -> AddressFormatter.nullSafeToString(entry.getKey().getEndPoint().resolve()),
this::constructSessionStateForNode));
}
private SessionStateForNode constructSessionStateForNode(Map.Entry<Node, ChannelPool> entry) {
return new SessionStateForNode(
entry.getKey().getOpenConnections(), entry.getValue().getInFlight());
}
private InsightsStartupData createStartupData() {
Map<String, String> startupOptions = driverContext.getStartupOptions();
return InsightsStartupData.builder()
.withClientId(getClientId(startupOptions))
.withSessionId(id)
.withApplicationName(getApplicationName(startupOptions))
.withApplicationVersion(getApplicationVersion(startupOptions))
.withDriverName(getDriverName(startupOptions))
.withDriverVersion(getDriverVersion(startupOptions))
.withContactPoints(
getResolvedContactPoints(
driverContext.getMetadataManager().getContactPoints().stream()
.map(n -> n.getEndPoint().resolve())
.filter(InetSocketAddress.class::isInstance)
.map(InetSocketAddress.class::cast)
.collect(Collectors.toSet())))
.withInitialControlConnection(getControlConnectionSocketAddress())
.withProtocolVersion(driverContext.getProtocolVersion().getCode())
.withLocalAddress(getLocalAddress())
.withExecutionProfiles(executionProfilesInfoFinder.getExecutionProfilesInfo(driverContext))
.withPoolSizeByHostDistance(getPoolSizeByHostDistance())
.withHeartbeatInterval(
driverContext
.getConfig()
.getDefaultProfile()
.getDuration(HEARTBEAT_INTERVAL)
.toMillis())
.withCompression(
driverContext.getConfig().getDefaultProfile().getString(PROTOCOL_COMPRESSION, "none"))
.withReconnectionPolicy(
reconnectionPolicyInfoInfoFinder.getReconnectionPolicyInfo(
driverContext.getReconnectionPolicy(),
driverContext.getConfig().getDefaultProfile()))
.withSsl(getSsl())
.withAuthProvider(getAuthProvider())
.withOtherOptions(getOtherOptions())
.withPlatformInfo(platformInfoFinder.getInsightsPlatformInfo())
.withConfigAntiPatterns(configAntiPatternsFinder.findAntiPatterns(driverContext))
.withPeriodicStatusInterval(getPeriodicStatusInterval())
.withHostName(getLocalHostName())
.withApplicationNameWasGenerated(isApplicationNameGenerated(startupOptions))
.withDataCenters(dataCentersFinder.getDataCenters(driverContext))
.build();
}
private AuthProviderType getAuthProvider() {
String authProviderClassName =
driverContext
.getConfig()
.getDefaultProfile()
.getString(AUTH_PROVIDER_CLASS, "NoAuthProvider");
ClassSettingDetails authProviderDetails =
PackageUtil.getAuthProviderDetails(authProviderClassName);
return new AuthProviderType(
authProviderDetails.getClassName(), authProviderDetails.getFullPackage());
}
private long getPeriodicStatusInterval() {
return TimeUnit.MILLISECONDS.toSeconds(insightsConfiguration.getStatusEventDelayMillis());
}
@VisibleForTesting
static Map<String, List<String>> getResolvedContactPoints(Set<InetSocketAddress> contactPoints) {
if (contactPoints == null) {
return Collections.emptyMap();
}
return contactPoints.stream()
.collect(
Collectors.groupingBy(
InetSocketAddress::getHostName,
Collectors.mapping(AddressFormatter::nullSafeToString, Collectors.toList())));
}
private String getDriverVersion(Map<String, String> startupOptions) {
return startupOptions.get(StartupOptionsBuilder.DRIVER_VERSION_KEY);
}
private String getDriverName(Map<String, String> startupOptions) {
return startupOptions.get(StartupOptionsBuilder.DRIVER_NAME_KEY);
}
private String getClientId(Map<String, String> startupOptions) {
return startupOptions.get(StartupOptionsBuilder.CLIENT_ID_KEY);
}
private boolean isApplicationNameGenerated(Map<String, String> startupOptions) {
return startupOptions.get(StartupOptionsBuilder.APPLICATION_NAME_KEY) == null;
}
private String getApplicationVersion(Map<String, String> startupOptions) {
String applicationVersion = startupOptions.get(StartupOptionsBuilder.APPLICATION_VERSION_KEY);
if (applicationVersion == null) {
return "";
}
return applicationVersion;
}
private String getApplicationName(Map<String, String> startupOptions) {
String applicationName = startupOptions.get(StartupOptionsBuilder.APPLICATION_NAME_KEY);
if (applicationName == null || applicationName.isEmpty()) {
return getClusterCreateCaller(initCallStackTrace);
}
return applicationName;
}
@VisibleForTesting
static String getClusterCreateCaller(StackTraceElement[] stackTrace) {
for (int i = 0; i < stackTrace.length - 1; i++) {
if (isClusterStackTrace(stackTrace[i])) {
int nextElement = i + 1;
if (!isClusterStackTrace(stackTrace[nextElement])) {
return stackTrace[nextElement].getClassName();
}
}
}
return DEFAULT_JAVA_APPLICATION;
}
private static boolean isClusterStackTrace(StackTraceElement stackTraceElement) {
return stackTraceElement.getClassName().equals(DefaultDriverContext.class.getName())
|| stackTraceElement.getClassName().equals(SessionBuilder.class.getName());
}
private String getLocalHostName() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
LOGGER.warn("Can not resolve the name of a host, returning null", e);
return null;
}
}
private Map<String, Object> getOtherOptions() {
return Collections.emptyMap();
}
private SSL getSsl() {
boolean isSslDefined =
driverContext.getConfig().getDefaultProfile().isDefined(SSL_ENGINE_FACTORY_CLASS);
boolean certValidation =
driverContext.getConfig().getDefaultProfile().getBoolean(SSL_HOSTNAME_VALIDATION, false);
return new SSL(isSslDefined, certValidation);
}
private PoolSizeByHostDistance getPoolSizeByHostDistance() {
return new PoolSizeByHostDistance(
driverContext.getConfig().getDefaultProfile().getInt(CONNECTION_POOL_LOCAL_SIZE),
driverContext.getConfig().getDefaultProfile().getInt(CONNECTION_POOL_REMOTE_SIZE),
0);
}
private String getControlConnectionSocketAddress() {
SocketAddress controlConnectionAddress = controlConnection.channel().getEndPoint().resolve();
return AddressFormatter.nullSafeToString(controlConnectionAddress);
}
private String getLocalAddress() {
SocketAddress controlConnectionLocalAddress = controlConnection.channel().localAddress();
if (controlConnectionLocalAddress instanceof InetSocketAddress) {
return AddressFormatter.nullSafeToString(
((InetSocketAddress) controlConnectionLocalAddress).getAddress());
}
return null;
}
private InsightMetadata createMetadata(String messageName, String messageVersion) {
return new InsightMetadata(
messageName, timestampSupplier.get(), TAGS, InsightType.EVENT, messageVersion);
}
@VisibleForTesting
static ScheduledFuture<?> scheduleInsightsTask(
long statusEventDelayMillis,
ScheduledExecutorService scheduledTasksExecutor,
Runnable runnable) {
long initialDelay =
(long) Math.floor(statusEventDelayMillis - zeroToTenPercentRandom(statusEventDelayMillis));
return scheduledTasksExecutor.scheduleWithFixedDelay(
runnable, initialDelay, statusEventDelayMillis, TimeUnit.MILLISECONDS);
}
private static double zeroToTenPercentRandom(long statusEventDelayMillis) {
return 0.1 * statusEventDelayMillis * Math.random();
}
}