/*
 * Copyright DataStax, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.datastax.oss.driver.internal.core.context;

import com.datastax.dse.driver.api.core.config.DseDriverOption;
import com.datastax.dse.driver.api.core.type.codec.DseTypeCodecs;
import com.datastax.dse.driver.internal.core.InsightsClientLifecycleListener;
import com.datastax.dse.driver.internal.core.cql.continuous.ContinuousCqlRequestAsyncProcessor;
import com.datastax.dse.driver.internal.core.cql.continuous.ContinuousCqlRequestSyncProcessor;
import com.datastax.dse.driver.internal.core.cql.continuous.reactive.ContinuousCqlRequestReactiveProcessor;
import com.datastax.dse.driver.internal.core.cql.reactive.CqlRequestReactiveProcessor;
import com.datastax.dse.driver.internal.core.graph.GraphRequestAsyncProcessor;
import com.datastax.dse.driver.internal.core.graph.GraphRequestSyncProcessor;
import com.datastax.dse.driver.internal.core.graph.GraphSupportChecker;
import com.datastax.dse.driver.internal.core.graph.reactive.ReactiveGraphRequestProcessor;
import com.datastax.dse.driver.internal.core.tracker.MultiplexingRequestTracker;
import com.datastax.dse.protocol.internal.DseProtocolV1ClientCodecs;
import com.datastax.dse.protocol.internal.DseProtocolV2ClientCodecs;
import com.datastax.dse.protocol.internal.ProtocolV4ClientCodecsForDse;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.addresstranslation.AddressTranslator;
import com.datastax.oss.driver.api.core.auth.AuthProvider;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfig;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.connection.ReconnectionPolicy;
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeStateListener;
import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener;
import com.datastax.oss.driver.api.core.retry.RetryPolicy;
import com.datastax.oss.driver.api.core.session.ProgrammaticArguments;
import com.datastax.oss.driver.api.core.session.throttling.RequestThrottler;
import com.datastax.oss.driver.api.core.specex.SpeculativeExecutionPolicy;
import com.datastax.oss.driver.api.core.ssl.SslEngineFactory;
import com.datastax.oss.driver.api.core.time.TimestampGenerator;
import com.datastax.oss.driver.api.core.tracker.RequestTracker;
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry;
import com.datastax.oss.driver.internal.core.ConsistencyLevelRegistry;
import com.datastax.oss.driver.internal.core.DefaultConsistencyLevelRegistry;
import com.datastax.oss.driver.internal.core.DefaultProtocolVersionRegistry;
import com.datastax.oss.driver.internal.core.ProtocolVersionRegistry;
import com.datastax.oss.driver.internal.core.channel.ChannelFactory;
import com.datastax.oss.driver.internal.core.channel.DefaultWriteCoalescer;
import com.datastax.oss.driver.internal.core.channel.WriteCoalescer;
import com.datastax.oss.driver.internal.core.control.ControlConnection;
import com.datastax.oss.driver.internal.core.cql.CqlPrepareAsyncProcessor;
import com.datastax.oss.driver.internal.core.cql.CqlPrepareSyncProcessor;
import com.datastax.oss.driver.internal.core.cql.CqlRequestAsyncProcessor;
import com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor;
import com.datastax.oss.driver.internal.core.metadata.CloudTopologyMonitor;
import com.datastax.oss.driver.internal.core.metadata.DefaultTopologyMonitor;
import com.datastax.oss.driver.internal.core.metadata.LoadBalancingPolicyWrapper;
import com.datastax.oss.driver.internal.core.metadata.MetadataManager;
import com.datastax.oss.driver.internal.core.metadata.TopologyMonitor;
import com.datastax.oss.driver.internal.core.metadata.schema.parsing.DefaultSchemaParserFactory;
import com.datastax.oss.driver.internal.core.metadata.schema.parsing.SchemaParserFactory;
import com.datastax.oss.driver.internal.core.metadata.schema.queries.DefaultSchemaQueriesFactory;
import com.datastax.oss.driver.internal.core.metadata.schema.queries.SchemaQueriesFactory;
import com.datastax.oss.driver.internal.core.metadata.token.DefaultReplicationStrategyFactory;
import com.datastax.oss.driver.internal.core.metadata.token.DefaultTokenFactoryRegistry;
import com.datastax.oss.driver.internal.core.metadata.token.ReplicationStrategyFactory;
import com.datastax.oss.driver.internal.core.metadata.token.TokenFactoryRegistry;
import com.datastax.oss.driver.internal.core.metrics.MetricsFactory;
import com.datastax.oss.driver.internal.core.pool.ChannelPoolFactory;
import com.datastax.oss.driver.internal.core.protocol.BuiltInCompressors;
import com.datastax.oss.driver.internal.core.protocol.ByteBufPrimitiveCodec;
import com.datastax.oss.driver.internal.core.servererrors.DefaultWriteTypeRegistry;
import com.datastax.oss.driver.internal.core.servererrors.WriteTypeRegistry;
import com.datastax.oss.driver.internal.core.session.PoolManager;
import com.datastax.oss.driver.internal.core.session.RequestProcessor;
import com.datastax.oss.driver.internal.core.session.RequestProcessorRegistry;
import com.datastax.oss.driver.internal.core.ssl.JdkSslHandlerFactory;
import com.datastax.oss.driver.internal.core.ssl.SslHandlerFactory;
import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker;
import com.datastax.oss.driver.internal.core.tracker.RequestLogFormatter;
import com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry;
import com.datastax.oss.driver.internal.core.util.DependencyCheck;
import com.datastax.oss.driver.internal.core.util.Reflection;
import com.datastax.oss.driver.internal.core.util.concurrent.CycleDetector;
import com.datastax.oss.driver.internal.core.util.concurrent.LazyReference;
import com.datastax.oss.protocol.internal.Compressor;
import com.datastax.oss.protocol.internal.FrameCodec;
import com.datastax.oss.protocol.internal.PrimitiveCodec;
import com.datastax.oss.protocol.internal.ProtocolV3ClientCodecs;
import com.datastax.oss.protocol.internal.ProtocolV5ClientCodecs;
import com.datastax.oss.protocol.internal.SegmentCodec;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.netty.buffer.ByteBuf;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Default implementation of the driver context.

All non-constant components are initialized lazily. Some components depend on others, so there might be deadlocks or stack overflows if the dependency graph is badly designed. This can be checked automatically with the system property -Dcom.datastax.oss.driver.DETECT_CYCLES=true (this might have a slight impact on startup time, so the check is disabled by default).

This is DIY dependency injection. We stayed away from DI frameworks for simplicity, to avoid an extra dependency, and because end users might want to access some of these components in their own implementations (which wouldn't work well with compile-time approaches like Dagger).

This also provides extension points for stuff that is too low-level for the driver configuration: the intent is that someone can extend this class, override one (or more) of the buildXxx methods, and initialize the cluster with this new implementation.

/** * Default implementation of the driver context. * * <p>All non-constant components are initialized lazily. Some components depend on others, so there * might be deadlocks or stack overflows if the dependency graph is badly designed. This can be * checked automatically with the system property {@code * -Dcom.datastax.oss.driver.DETECT_CYCLES=true} (this might have a slight impact on startup time, * so the check is disabled by default). * * <p>This is DIY dependency injection. We stayed away from DI frameworks for simplicity, to avoid * an extra dependency, and because end users might want to access some of these components in their * own implementations (which wouldn't work well with compile-time approaches like Dagger). * * <p>This also provides extension points for stuff that is too low-level for the driver * configuration: the intent is that someone can extend this class, override one (or more) of the * buildXxx methods, and initialize the cluster with this new implementation. */
@ThreadSafe public class DefaultDriverContext implements InternalDriverContext { private static final Logger LOG = LoggerFactory.getLogger(InternalDriverContext.class); private static final AtomicInteger SESSION_NAME_COUNTER = new AtomicInteger(); protected final CycleDetector cycleDetector = new CycleDetector("Detected cycle in context initialization"); private final LazyReference<Map<String, LoadBalancingPolicy>> loadBalancingPoliciesRef = new LazyReference<>("loadBalancingPolicies", this::buildLoadBalancingPolicies, cycleDetector); private final LazyReference<ReconnectionPolicy> reconnectionPolicyRef = new LazyReference<>("reconnectionPolicy", this::buildReconnectionPolicy, cycleDetector); private final LazyReference<Map<String, RetryPolicy>> retryPoliciesRef = new LazyReference<>("retryPolicies", this::buildRetryPolicies, cycleDetector); private final LazyReference<Map<String, SpeculativeExecutionPolicy>> speculativeExecutionPoliciesRef = new LazyReference<>( "speculativeExecutionPolicies", this::buildSpeculativeExecutionPolicies, cycleDetector); private final LazyReference<TimestampGenerator> timestampGeneratorRef = new LazyReference<>("timestampGenerator", this::buildTimestampGenerator, cycleDetector); private final LazyReference<AddressTranslator> addressTranslatorRef = new LazyReference<>("addressTranslator", this::buildAddressTranslator, cycleDetector); private final LazyReference<Optional<SslEngineFactory>> sslEngineFactoryRef; private final LazyReference<EventBus> eventBusRef = new LazyReference<>("eventBus", this::buildEventBus, cycleDetector); private final LazyReference<Compressor<ByteBuf>> compressorRef = new LazyReference<>("compressor", this::buildCompressor, cycleDetector); private final LazyReference<PrimitiveCodec<ByteBuf>> primitiveCodecRef = new LazyReference<>("primitiveCodec", this::buildPrimitiveCodec, cycleDetector); private final LazyReference<FrameCodec<ByteBuf>> frameCodecRef = new LazyReference<>("frameCodec", this::buildFrameCodec, cycleDetector); private final LazyReference<SegmentCodec<ByteBuf>> segmentCodecRef = new LazyReference<>("segmentCodec", this::buildSegmentCodec, cycleDetector); private final LazyReference<ProtocolVersionRegistry> protocolVersionRegistryRef = new LazyReference<>( "protocolVersionRegistry", this::buildProtocolVersionRegistry, cycleDetector); private final LazyReference<ConsistencyLevelRegistry> consistencyLevelRegistryRef = new LazyReference<>( "consistencyLevelRegistry", this::buildConsistencyLevelRegistry, cycleDetector); private final LazyReference<WriteTypeRegistry> writeTypeRegistryRef = new LazyReference<>("writeTypeRegistry", this::buildWriteTypeRegistry, cycleDetector); private final LazyReference<NettyOptions> nettyOptionsRef = new LazyReference<>("nettyOptions", this::buildNettyOptions, cycleDetector); private final LazyReference<WriteCoalescer> writeCoalescerRef = new LazyReference<>("writeCoalescer", this::buildWriteCoalescer, cycleDetector); private final LazyReference<Optional<SslHandlerFactory>> sslHandlerFactoryRef = new LazyReference<>("sslHandlerFactory", this::buildSslHandlerFactory, cycleDetector); private final LazyReference<ChannelFactory> channelFactoryRef = new LazyReference<>("channelFactory", this::buildChannelFactory, cycleDetector); private final LazyReference<TopologyMonitor> topologyMonitorRef = new LazyReference<>("topologyMonitor", this::buildTopologyMonitor, cycleDetector); private final LazyReference<MetadataManager> metadataManagerRef = new LazyReference<>("metadataManager", this::buildMetadataManager, cycleDetector); private final LazyReference<LoadBalancingPolicyWrapper> loadBalancingPolicyWrapperRef = new LazyReference<>( "loadBalancingPolicyWrapper", this::buildLoadBalancingPolicyWrapper, cycleDetector); private final LazyReference<ControlConnection> controlConnectionRef = new LazyReference<>("controlConnection", this::buildControlConnection, cycleDetector); private final LazyReference<RequestProcessorRegistry> requestProcessorRegistryRef = new LazyReference<>( "requestProcessorRegistry", this::buildRequestProcessorRegistry, cycleDetector); private final LazyReference<SchemaQueriesFactory> schemaQueriesFactoryRef = new LazyReference<>("schemaQueriesFactory", this::buildSchemaQueriesFactory, cycleDetector); private final LazyReference<SchemaParserFactory> schemaParserFactoryRef = new LazyReference<>("schemaParserFactory", this::buildSchemaParserFactory, cycleDetector); private final LazyReference<TokenFactoryRegistry> tokenFactoryRegistryRef = new LazyReference<>("tokenFactoryRegistry", this::buildTokenFactoryRegistry, cycleDetector); private final LazyReference<ReplicationStrategyFactory> replicationStrategyFactoryRef = new LazyReference<>( "replicationStrategyFactory", this::buildReplicationStrategyFactory, cycleDetector); private final LazyReference<PoolManager> poolManagerRef = new LazyReference<>("poolManager", this::buildPoolManager, cycleDetector); private final LazyReference<MetricsFactory> metricsFactoryRef = new LazyReference<>("metricsFactory", this::buildMetricsFactory, cycleDetector); private final LazyReference<RequestThrottler> requestThrottlerRef = new LazyReference<>("requestThrottler", this::buildRequestThrottler, cycleDetector); private final LazyReference<Map<String, String>> startupOptionsRef = new LazyReference<>("startupOptions", this::buildStartupOptions, cycleDetector); private final LazyReference<NodeStateListener> nodeStateListenerRef; private final LazyReference<SchemaChangeListener> schemaChangeListenerRef; private final LazyReference<RequestTracker> requestTrackerRef; private final LazyReference<Optional<AuthProvider>> authProviderRef; private final LazyReference<List<LifecycleListener>> lifecycleListenersRef = new LazyReference<>("lifecycleListeners", this::buildLifecycleListeners, cycleDetector); private final DriverConfig config; private final DriverConfigLoader configLoader; private final ChannelPoolFactory channelPoolFactory = new ChannelPoolFactory(); private final CodecRegistry codecRegistry; private final String sessionName; private final NodeStateListener nodeStateListenerFromBuilder; private final SchemaChangeListener schemaChangeListenerFromBuilder; private final RequestTracker requestTrackerFromBuilder; private final Map<String, String> localDatacentersFromBuilder; private final Map<String, Predicate<Node>> nodeFiltersFromBuilder; private final ClassLoader classLoader; private final InetSocketAddress cloudProxyAddress; private final LazyReference<RequestLogFormatter> requestLogFormatterRef = new LazyReference<>("requestLogFormatter", this::buildRequestLogFormatter, cycleDetector); private final UUID startupClientId; private final String startupApplicationName; private final String startupApplicationVersion; private final Object metricRegistry; // A stack trace captured in the constructor. Used to extract information about the client // application. private final StackTraceElement[] initStackTrace; public DefaultDriverContext( DriverConfigLoader configLoader, ProgrammaticArguments programmaticArguments) { this.config = configLoader.getInitialConfig(); this.configLoader = configLoader; DriverExecutionProfile defaultProfile = config.getDefaultProfile(); if (defaultProfile.isDefined(DefaultDriverOption.SESSION_NAME)) { this.sessionName = defaultProfile.getString(DefaultDriverOption.SESSION_NAME); } else { this.sessionName = "s" + SESSION_NAME_COUNTER.getAndIncrement(); } this.localDatacentersFromBuilder = programmaticArguments.getLocalDatacenters(); this.codecRegistry = buildCodecRegistry(programmaticArguments); this.nodeStateListenerFromBuilder = programmaticArguments.getNodeStateListener(); this.nodeStateListenerRef = new LazyReference<>( "nodeStateListener", () -> buildNodeStateListener(nodeStateListenerFromBuilder), cycleDetector); this.schemaChangeListenerFromBuilder = programmaticArguments.getSchemaChangeListener(); this.schemaChangeListenerRef = new LazyReference<>( "schemaChangeListener", () -> buildSchemaChangeListener(schemaChangeListenerFromBuilder), cycleDetector); this.requestTrackerFromBuilder = programmaticArguments.getRequestTracker(); this.authProviderRef = new LazyReference<>( "authProvider", () -> buildAuthProvider(programmaticArguments.getAuthProvider()), cycleDetector); this.requestTrackerRef = new LazyReference<>( "requestTracker", () -> buildRequestTracker(requestTrackerFromBuilder), cycleDetector); this.sslEngineFactoryRef = new LazyReference<>( "sslEngineFactory", () -> buildSslEngineFactory(programmaticArguments.getSslEngineFactory()), cycleDetector); this.nodeFiltersFromBuilder = programmaticArguments.getNodeFilters(); this.classLoader = programmaticArguments.getClassLoader(); this.cloudProxyAddress = programmaticArguments.getCloudProxyAddress(); this.startupClientId = programmaticArguments.getStartupClientId(); this.startupApplicationName = programmaticArguments.getStartupApplicationName(); this.startupApplicationVersion = programmaticArguments.getStartupApplicationVersion(); StackTraceElement[] stackTrace; try { stackTrace = Thread.currentThread().getStackTrace(); } catch (Exception ex) { // ignore and use empty stackTrace = new StackTraceElement[] {}; } this.initStackTrace = stackTrace; this.metricRegistry = programmaticArguments.getMetricRegistry(); }
Deprecated:this constructor only exists for backward compatibility. Please use DefaultDriverContext(DriverConfigLoader, ProgrammaticArguments) instead.
/** * @deprecated this constructor only exists for backward compatibility. Please use {@link * #DefaultDriverContext(DriverConfigLoader, ProgrammaticArguments)} instead. */
@Deprecated public DefaultDriverContext( DriverConfigLoader configLoader, List<TypeCodec<?>> typeCodecs, NodeStateListener nodeStateListener, SchemaChangeListener schemaChangeListener, RequestTracker requestTracker, Map<String, String> localDatacenters, Map<String, Predicate<Node>> nodeFilters, ClassLoader classLoader) { this( configLoader, ProgrammaticArguments.builder() .addTypeCodecs(typeCodecs.toArray(new TypeCodec<?>[0])) .withNodeStateListener(nodeStateListener) .withSchemaChangeListener(schemaChangeListener) .withRequestTracker(requestTracker) .withLocalDatacenters(localDatacenters) .withNodeFilters(nodeFilters) .withClassLoader(classLoader) .build()); }
Builds a map of options to send in a Startup message.
See Also:
  • getStartupOptions()
/** * Builds a map of options to send in a Startup message. * * @see #getStartupOptions() */
protected Map<String, String> buildStartupOptions() { return new StartupOptionsBuilder(this) .withClientId(startupClientId) .withApplicationName(startupApplicationName) .withApplicationVersion(startupApplicationVersion) .build(); } protected Map<String, LoadBalancingPolicy> buildLoadBalancingPolicies() { return Reflection.buildFromConfigProfiles( this, DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS, DefaultDriverOption.LOAD_BALANCING_POLICY, LoadBalancingPolicy.class, "com.datastax.oss.driver.internal.core.loadbalancing", "com.datastax.dse.driver.internal.core.loadbalancing"); } protected Map<String, RetryPolicy> buildRetryPolicies() { return Reflection.buildFromConfigProfiles( this, DefaultDriverOption.RETRY_POLICY_CLASS, DefaultDriverOption.RETRY_POLICY, RetryPolicy.class, "com.datastax.oss.driver.internal.core.retry"); } protected Map<String, SpeculativeExecutionPolicy> buildSpeculativeExecutionPolicies() { return Reflection.buildFromConfigProfiles( this, DefaultDriverOption.SPECULATIVE_EXECUTION_POLICY_CLASS, DefaultDriverOption.SPECULATIVE_EXECUTION_POLICY, SpeculativeExecutionPolicy.class, "com.datastax.oss.driver.internal.core.specex"); } protected TimestampGenerator buildTimestampGenerator() { return Reflection.buildFromConfig( this, DefaultDriverOption.TIMESTAMP_GENERATOR_CLASS, TimestampGenerator.class, "com.datastax.oss.driver.internal.core.time") .orElseThrow( () -> new IllegalArgumentException( String.format( "Missing timestamp generator, check your configuration (%s)", DefaultDriverOption.TIMESTAMP_GENERATOR_CLASS))); } protected ReconnectionPolicy buildReconnectionPolicy() { return Reflection.buildFromConfig( this, DefaultDriverOption.RECONNECTION_POLICY_CLASS, ReconnectionPolicy.class, "com.datastax.oss.driver.internal.core.connection") .orElseThrow( () -> new IllegalArgumentException( String.format( "Missing reconnection policy, check your configuration (%s)", DefaultDriverOption.RECONNECTION_POLICY_CLASS))); } protected AddressTranslator buildAddressTranslator() { return Reflection.buildFromConfig( this, DefaultDriverOption.ADDRESS_TRANSLATOR_CLASS, AddressTranslator.class, "com.datastax.oss.driver.internal.core.addresstranslation") .orElseThrow( () -> new IllegalArgumentException( String.format( "Missing address translator, check your configuration (%s)", DefaultDriverOption.ADDRESS_TRANSLATOR_CLASS))); } protected Optional<SslEngineFactory> buildSslEngineFactory(SslEngineFactory factoryFromBuilder) { return (factoryFromBuilder != null) ? Optional.of(factoryFromBuilder) : Reflection.buildFromConfig( this, DefaultDriverOption.SSL_ENGINE_FACTORY_CLASS, SslEngineFactory.class, "com.datastax.oss.driver.internal.core.ssl"); } protected EventBus buildEventBus() { return new EventBus(getSessionName()); } protected Compressor<ByteBuf> buildCompressor() { DriverExecutionProfile defaultProfile = getConfig().getDefaultProfile(); String name = defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none"); assert name != null : "should use default value"; return BuiltInCompressors.newInstance(name, this); } protected PrimitiveCodec<ByteBuf> buildPrimitiveCodec() { return new ByteBufPrimitiveCodec(getNettyOptions().allocator()); } protected FrameCodec<ByteBuf> buildFrameCodec() { return new FrameCodec<>( getPrimitiveCodec(), getCompressor(), new ProtocolV3ClientCodecs(), new ProtocolV4ClientCodecsForDse(), new ProtocolV5ClientCodecs(), new DseProtocolV1ClientCodecs(), new DseProtocolV2ClientCodecs()); } protected SegmentCodec<ByteBuf> buildSegmentCodec() { return new SegmentCodec<>(getPrimitiveCodec(), getCompressor()); } protected ProtocolVersionRegistry buildProtocolVersionRegistry() { return new DefaultProtocolVersionRegistry(getSessionName()); } protected ConsistencyLevelRegistry buildConsistencyLevelRegistry() { return new DefaultConsistencyLevelRegistry(); } protected WriteTypeRegistry buildWriteTypeRegistry() { return new DefaultWriteTypeRegistry(); } protected NettyOptions buildNettyOptions() { return new DefaultNettyOptions(this); } protected Optional<SslHandlerFactory> buildSslHandlerFactory() { // If a JDK-based factory was provided through the public API, wrap it return getSslEngineFactory().map(JdkSslHandlerFactory::new); // For more advanced options (like using Netty's native OpenSSL support instead of the JDK), // extend DefaultDriverContext and override this method } protected WriteCoalescer buildWriteCoalescer() { return new DefaultWriteCoalescer(this); } protected ChannelFactory buildChannelFactory() { return new ChannelFactory(this); } protected TopologyMonitor buildTopologyMonitor() { if (cloudProxyAddress == null) { return new DefaultTopologyMonitor(this); } return new CloudTopologyMonitor(this, cloudProxyAddress); } protected MetadataManager buildMetadataManager() { return new MetadataManager(this); } protected LoadBalancingPolicyWrapper buildLoadBalancingPolicyWrapper() { return new LoadBalancingPolicyWrapper(this, getLoadBalancingPolicies()); } protected ControlConnection buildControlConnection() { return new ControlConnection(this); } protected RequestProcessorRegistry buildRequestProcessorRegistry() { String logPrefix = getSessionName(); List<RequestProcessor<?, ?>> processors = new ArrayList<>(); // regular requests (sync and async) CqlRequestAsyncProcessor cqlRequestAsyncProcessor = new CqlRequestAsyncProcessor(); CqlRequestSyncProcessor cqlRequestSyncProcessor = new CqlRequestSyncProcessor(cqlRequestAsyncProcessor); processors.add(cqlRequestAsyncProcessor); processors.add(cqlRequestSyncProcessor); // prepare requests (sync and async) CqlPrepareAsyncProcessor cqlPrepareAsyncProcessor = new CqlPrepareAsyncProcessor(); CqlPrepareSyncProcessor cqlPrepareSyncProcessor = new CqlPrepareSyncProcessor(cqlPrepareAsyncProcessor); processors.add(cqlPrepareAsyncProcessor); processors.add(cqlPrepareSyncProcessor); // continuous requests (sync and async) ContinuousCqlRequestAsyncProcessor continuousCqlRequestAsyncProcessor = new ContinuousCqlRequestAsyncProcessor(); ContinuousCqlRequestSyncProcessor continuousCqlRequestSyncProcessor = new ContinuousCqlRequestSyncProcessor(continuousCqlRequestAsyncProcessor); processors.add(continuousCqlRequestAsyncProcessor); processors.add(continuousCqlRequestSyncProcessor); // graph requests (sync and async) GraphRequestAsyncProcessor graphRequestAsyncProcessor = null; if (DependencyCheck.TINKERPOP.isPresent()) { graphRequestAsyncProcessor = new GraphRequestAsyncProcessor(this, new GraphSupportChecker()); GraphRequestSyncProcessor graphRequestSyncProcessor = new GraphRequestSyncProcessor(graphRequestAsyncProcessor); processors.add(graphRequestAsyncProcessor); processors.add(graphRequestSyncProcessor); } else { LOG.info( "Could not register Graph extensions; " + "this is normal if Tinkerpop was explicitly excluded from classpath"); } // reactive requests (regular, continuous and graph) if (DependencyCheck.REACTIVE_STREAMS.isPresent()) { CqlRequestReactiveProcessor cqlRequestReactiveProcessor = new CqlRequestReactiveProcessor(cqlRequestAsyncProcessor); ContinuousCqlRequestReactiveProcessor continuousCqlRequestReactiveProcessor = new ContinuousCqlRequestReactiveProcessor(continuousCqlRequestAsyncProcessor); processors.add(cqlRequestReactiveProcessor); processors.add(continuousCqlRequestReactiveProcessor); if (graphRequestAsyncProcessor != null) { ReactiveGraphRequestProcessor reactiveGraphRequestProcessor = new ReactiveGraphRequestProcessor(graphRequestAsyncProcessor); processors.add(reactiveGraphRequestProcessor); } } else { LOG.info( "Could not register Reactive extensions; " + "this is normal if Reactive Streams was explicitly excluded from classpath"); } return new RequestProcessorRegistry(logPrefix, processors.toArray(new RequestProcessor[0])); } protected CodecRegistry buildCodecRegistry(ProgrammaticArguments arguments) { MutableCodecRegistry registry = arguments.getCodecRegistry(); if (registry == null) { registry = new DefaultCodecRegistry(this.sessionName); } registry.register(arguments.getTypeCodecs()); registry.register(DseTypeCodecs.DATE_RANGE); if (DependencyCheck.ESRI.isPresent()) { registry.register(DseTypeCodecs.LINE_STRING, DseTypeCodecs.POINT, DseTypeCodecs.POLYGON); } else { LOG.info( "Could not register Geo codecs; " + "this is normal if ESRI was explicitly excluded from classpath"); } return registry; } protected SchemaQueriesFactory buildSchemaQueriesFactory() { return new DefaultSchemaQueriesFactory(this); } protected SchemaParserFactory buildSchemaParserFactory() { return new DefaultSchemaParserFactory(this); } protected TokenFactoryRegistry buildTokenFactoryRegistry() { return new DefaultTokenFactoryRegistry(this); } protected ReplicationStrategyFactory buildReplicationStrategyFactory() { return new DefaultReplicationStrategyFactory(this); } protected PoolManager buildPoolManager() { return new PoolManager(this); } protected MetricsFactory buildMetricsFactory() { return Reflection.buildFromConfig( this, DefaultDriverOption.METRICS_FACTORY_CLASS, MetricsFactory.class, "com.datastax.oss.driver.internal.core.metrics", "com.datastax.oss.driver.internal.metrics.microprofile", "com.datastax.oss.driver.internal.metrics.micrometer") .orElseThrow( () -> new IllegalArgumentException( String.format( "Missing metrics factory, check your config (%s)", DefaultDriverOption.METRICS_FACTORY_CLASS))); } protected RequestThrottler buildRequestThrottler() { return Reflection.buildFromConfig( this, DefaultDriverOption.REQUEST_THROTTLER_CLASS, RequestThrottler.class, "com.datastax.oss.driver.internal.core.session.throttling") .orElseThrow( () -> new IllegalArgumentException( String.format( "Missing request throttler, check your configuration (%s)", DefaultDriverOption.REQUEST_THROTTLER_CLASS))); } protected NodeStateListener buildNodeStateListener( NodeStateListener nodeStateListenerFromBuilder) { return (nodeStateListenerFromBuilder != null) ? nodeStateListenerFromBuilder : Reflection.buildFromConfig( this, DefaultDriverOption.METADATA_NODE_STATE_LISTENER_CLASS, NodeStateListener.class, "com.datastax.oss.driver.internal.core.metadata") .orElseThrow( () -> new IllegalArgumentException( String.format( "Missing node state listener, check your configuration (%s)", DefaultDriverOption.METADATA_NODE_STATE_LISTENER_CLASS))); } protected SchemaChangeListener buildSchemaChangeListener( SchemaChangeListener schemaChangeListenerFromBuilder) { return (schemaChangeListenerFromBuilder != null) ? schemaChangeListenerFromBuilder : Reflection.buildFromConfig( this, DefaultDriverOption.METADATA_SCHEMA_CHANGE_LISTENER_CLASS, SchemaChangeListener.class, "com.datastax.oss.driver.internal.core.metadata.schema") .orElseThrow( () -> new IllegalArgumentException( String.format( "Missing schema change listener, check your configuration (%s)", DefaultDriverOption.METADATA_SCHEMA_CHANGE_LISTENER_CLASS))); } protected RequestTracker buildRequestTracker(RequestTracker requestTrackerFromBuilder) { RequestTracker requestTrackerFromConfig = (requestTrackerFromBuilder != null) ? requestTrackerFromBuilder : Reflection.buildFromConfig( this, DefaultDriverOption.REQUEST_TRACKER_CLASS, RequestTracker.class, "com.datastax.oss.driver.internal.core.tracker") .orElseThrow( () -> new IllegalArgumentException( String.format( "Missing request tracker, check your configuration (%s)", DefaultDriverOption.REQUEST_TRACKER_CLASS))); // The default LBP needs to add its own tracker if (requestTrackerFromConfig instanceof MultiplexingRequestTracker) { return requestTrackerFromConfig; } else { MultiplexingRequestTracker multiplexingRequestTracker = new MultiplexingRequestTracker(); if (!(requestTrackerFromConfig instanceof NoopRequestTracker)) { multiplexingRequestTracker.register(requestTrackerFromConfig); } return multiplexingRequestTracker; } } protected Optional<AuthProvider> buildAuthProvider(AuthProvider authProviderFromBuilder) { return (authProviderFromBuilder != null) ? Optional.of(authProviderFromBuilder) : Reflection.buildFromConfig( this, DefaultDriverOption.AUTH_PROVIDER_CLASS, AuthProvider.class, "com.datastax.oss.driver.internal.core.auth", "com.datastax.dse.driver.internal.core.auth"); } protected List<LifecycleListener> buildLifecycleListeners() { if (DependencyCheck.JACKSON.isPresent()) { return Collections.singletonList(new InsightsClientLifecycleListener(this, initStackTrace)); } else { if (config.getDefaultProfile().getBoolean(DseDriverOption.MONITOR_REPORTING_ENABLED)) { LOG.info( "Could not initialize Insights monitoring; " + "this is normal if Jackson was explicitly excluded from classpath"); } return Collections.emptyList(); } } @NonNull @Override public String getSessionName() { return sessionName; } @NonNull @Override public DriverConfig getConfig() { return config; } @NonNull @Override public DriverConfigLoader getConfigLoader() { return configLoader; } @NonNull @Override public Map<String, LoadBalancingPolicy> getLoadBalancingPolicies() { return loadBalancingPoliciesRef.get(); } @NonNull @Override public Map<String, RetryPolicy> getRetryPolicies() { return retryPoliciesRef.get(); } @NonNull @Override public Map<String, SpeculativeExecutionPolicy> getSpeculativeExecutionPolicies() { return speculativeExecutionPoliciesRef.get(); } @NonNull @Override public TimestampGenerator getTimestampGenerator() { return timestampGeneratorRef.get(); } @NonNull @Override public ReconnectionPolicy getReconnectionPolicy() { return reconnectionPolicyRef.get(); } @NonNull @Override public AddressTranslator getAddressTranslator() { return addressTranslatorRef.get(); } @NonNull @Override public Optional<AuthProvider> getAuthProvider() { return authProviderRef.get(); } @NonNull @Override public Optional<SslEngineFactory> getSslEngineFactory() { return sslEngineFactoryRef.get(); } @NonNull @Override public EventBus getEventBus() { return eventBusRef.get(); } @NonNull @Override public Compressor<ByteBuf> getCompressor() { return compressorRef.get(); } @NonNull @Override public PrimitiveCodec<ByteBuf> getPrimitiveCodec() { return primitiveCodecRef.get(); } @NonNull @Override public FrameCodec<ByteBuf> getFrameCodec() { return frameCodecRef.get(); } @NonNull @Override public SegmentCodec<ByteBuf> getSegmentCodec() { return segmentCodecRef.get(); } @NonNull @Override public ProtocolVersionRegistry getProtocolVersionRegistry() { return protocolVersionRegistryRef.get(); } @NonNull @Override public ConsistencyLevelRegistry getConsistencyLevelRegistry() { return consistencyLevelRegistryRef.get(); } @NonNull @Override public WriteTypeRegistry getWriteTypeRegistry() { return writeTypeRegistryRef.get(); } @NonNull @Override public NettyOptions getNettyOptions() { return nettyOptionsRef.get(); } @NonNull @Override public WriteCoalescer getWriteCoalescer() { return writeCoalescerRef.get(); } @NonNull @Override public Optional<SslHandlerFactory> getSslHandlerFactory() { return sslHandlerFactoryRef.get(); } @NonNull @Override public ChannelFactory getChannelFactory() { return channelFactoryRef.get(); } @NonNull @Override public ChannelPoolFactory getChannelPoolFactory() { return channelPoolFactory; } @NonNull @Override public TopologyMonitor getTopologyMonitor() { return topologyMonitorRef.get(); } @NonNull @Override public MetadataManager getMetadataManager() { return metadataManagerRef.get(); } @NonNull @Override public LoadBalancingPolicyWrapper getLoadBalancingPolicyWrapper() { return loadBalancingPolicyWrapperRef.get(); } @NonNull @Override public ControlConnection getControlConnection() { return controlConnectionRef.get(); } @NonNull @Override public RequestProcessorRegistry getRequestProcessorRegistry() { return requestProcessorRegistryRef.get(); } @NonNull @Override public SchemaQueriesFactory getSchemaQueriesFactory() { return schemaQueriesFactoryRef.get(); } @NonNull @Override public SchemaParserFactory getSchemaParserFactory() { return schemaParserFactoryRef.get(); } @NonNull @Override public TokenFactoryRegistry getTokenFactoryRegistry() { return tokenFactoryRegistryRef.get(); } @NonNull @Override public ReplicationStrategyFactory getReplicationStrategyFactory() { return replicationStrategyFactoryRef.get(); } @NonNull @Override public PoolManager getPoolManager() { return poolManagerRef.get(); } @NonNull @Override public MetricsFactory getMetricsFactory() { return metricsFactoryRef.get(); } @NonNull @Override public RequestThrottler getRequestThrottler() { return requestThrottlerRef.get(); } @NonNull @Override public NodeStateListener getNodeStateListener() { return nodeStateListenerRef.get(); } @NonNull @Override public SchemaChangeListener getSchemaChangeListener() { return schemaChangeListenerRef.get(); } @NonNull @Override public RequestTracker getRequestTracker() { return requestTrackerRef.get(); } @Nullable @Override public String getLocalDatacenter(@NonNull String profileName) { return localDatacentersFromBuilder.get(profileName); } @Nullable @Override public Predicate<Node> getNodeFilter(@NonNull String profileName) { return nodeFiltersFromBuilder.get(profileName); } @Nullable @Override public ClassLoader getClassLoader() { return classLoader; } @NonNull @Override public CodecRegistry getCodecRegistry() { return codecRegistry; } @NonNull @Override public ProtocolVersion getProtocolVersion() { return getChannelFactory().getProtocolVersion(); } @NonNull @Override public Map<String, String> getStartupOptions() { return startupOptionsRef.get(); } protected RequestLogFormatter buildRequestLogFormatter() { return new RequestLogFormatter(this); } @NonNull @Override public RequestLogFormatter getRequestLogFormatter() { return requestLogFormatterRef.get(); } @NonNull @Override public List<LifecycleListener> getLifecycleListeners() { return lifecycleListenersRef.get(); } @Nullable @Override public Object getMetricRegistry() { return metricRegistry; } }