package com.datastax.dse.driver.internal.core.cql.continuous;
import com.datastax.dse.driver.api.core.DseProtocolVersion;
import com.datastax.dse.driver.api.core.cql.continuous.ContinuousAsyncResultSet;
import com.datastax.dse.driver.internal.core.DseProtocolFeature;
import com.datastax.dse.driver.internal.core.cql.DseConversions;
import com.datastax.dse.protocol.internal.request.Revise;
import com.datastax.dse.protocol.internal.response.result.DseRowsMetadata;
import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.RequestThrottlingException;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.connection.FrameTooLongException;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric;
import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric;
import com.datastax.oss.driver.api.core.metrics.NodeMetric;
import com.datastax.oss.driver.api.core.metrics.SessionMetric;
import com.datastax.oss.driver.api.core.retry.RetryPolicy;
import com.datastax.oss.driver.api.core.retry.RetryVerdict;
import com.datastax.oss.driver.api.core.servererrors.BootstrappingException;
import com.datastax.oss.driver.api.core.servererrors.CoordinatorException;
import com.datastax.oss.driver.api.core.servererrors.FunctionFailureException;
import com.datastax.oss.driver.api.core.servererrors.ProtocolError;
import com.datastax.oss.driver.api.core.servererrors.QueryValidationException;
import com.datastax.oss.driver.api.core.servererrors.ReadTimeoutException;
import com.datastax.oss.driver.api.core.servererrors.UnavailableException;
import com.datastax.oss.driver.api.core.servererrors.WriteTimeoutException;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.session.throttling.RequestThrottler;
import com.datastax.oss.driver.api.core.session.throttling.Throttled;
import com.datastax.oss.driver.internal.core.adminrequest.ThrottledAdminRequestHandler;
import com.datastax.oss.driver.internal.core.adminrequest.UnexpectedResponseException;
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
import com.datastax.oss.driver.internal.core.channel.ResponseCallback;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.cql.Conversions;
import com.datastax.oss.driver.internal.core.cql.DefaultExecutionInfo;
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
import com.datastax.oss.driver.internal.core.session.DefaultSession;
import com.datastax.oss.driver.internal.core.session.RepreparePayload;
import com.datastax.oss.driver.internal.core.util.Loggers;
import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import com.datastax.oss.protocol.internal.Frame;
import com.datastax.oss.protocol.internal.Message;
import com.datastax.oss.protocol.internal.ProtocolConstants;
import com.datastax.oss.protocol.internal.request.Prepare;
import com.datastax.oss.protocol.internal.response.Error;
import com.datastax.oss.protocol.internal.response.Result;
import com.datastax.oss.protocol.internal.response.error.Unprepared;
import com.datastax.oss.protocol.internal.response.result.Rows;
import com.datastax.oss.protocol.internal.response.result.Void;
import com.datastax.oss.protocol.internal.util.Bytes;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.netty.handler.codec.EncoderException;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ThreadSafe
public abstract class ContinuousRequestHandlerBase<StatementT extends Request, ResultSetT>
implements Throttled {
/** Logger used by the handler and by its per-node callbacks. */
private static final Logger LOG = LoggerFactory.getLogger(ContinuousRequestHandlerBase.class);
/** Session log prefix followed by {@code |} and this handler's hash code. */
protected final String logPrefix;
/** The statement this handler was created for. */
protected final StatementT initialStatement;
protected final DefaultSession session;
/** Session keyspace captured at construction time, or {@code null} if the session had none. */
private final CqlIdentifier keyspace;
protected final InternalDriverContext context;
/**
 * Nodes still to be tried: a single-node plan when the statement targets a node, otherwise the
 * plan produced by the load balancing policy wrapper.
 */
private final Queue<Node> queryPlan;
protected final RequestThrottler throttler;
/** True when the protocol version code is at least that of {@code DSE_V2}. */
private final boolean protocolBackpressureAvailable;
/** Timer used for the global timeout, page timeouts and speculative executions. */
private final Timer timer;
private final SessionMetricUpdater sessionMetricUpdater;
private final boolean specExecEnabled;
// The three metrics below are supplied by the subclass; their use is outside this excerpt.
private final SessionMetric clientTimeoutsMetric;
private final SessionMetric continuousRequestsMetric;
private final NodeMetric messagesMetric;
/** Pending speculative executions; {@code null} when speculative executions are disabled. */
private final List<Timeout> scheduledExecutions;
/** Errors recorded per node; used to build the {@code AllNodesFailedException}. */
protected final List<Map.Entry<Node, Throwable>> errors = new CopyOnWriteArrayList<>();
/** Every callback for which a request has been written to a channel. */
private final List<NodeResponseCallback> inFlightCallbacks = new CopyOnWriteArrayList<>();
/**
 * Completed with the first callback that enqueues a page or error, or completed exceptionally
 * when the request is aborted before any callback was chosen.
 */
private final CompletableFuture<NodeResponseCallback> chosenCallback = new CompletableFuture<>();
/** Incremented for the initial and each speculative execution; decremented when no channel is found. */
private final AtomicInteger activeExecutionsCount = new AtomicInteger(0);
protected final AtomicInteger startedSpeculativeExecutionsCount = new AtomicInteger(0);
/** {@code System.nanoTime()} at construction; used to measure the throttling delay. */
private final long startTimeNanos;
/** Set by {@code handle()}; {@code null} when the global timeout is not positive. */
private volatile Timeout globalTimeout;
private final Class<ResultSetT> resultSetClass;
/**
 * Builds a handler for a single continuous paging request.
 *
 * <p>Fails fast if the negotiated protocol version does not support continuous paging, then
 * captures the session keyspace, computes the query plan and records the start time.
 *
 * @param statement the statement to execute.
 * @param session the session executing the request.
 * @param context the driver context.
 * @param sessionLogPrefix prefix prepended to this handler's own log prefix.
 * @param resultSetClass runtime class of the result set type.
 * @param specExecEnabled whether speculative executions may be scheduled.
 * @param clientTimeoutsMetric session metric supplied by the subclass.
 * @param continuousRequestsMetric session metric supplied by the subclass.
 * @param messagesMetric node metric supplied by the subclass.
 * @throws IllegalStateException if the protocol version does not support continuous paging.
 */
public ContinuousRequestHandlerBase(
    @NonNull StatementT statement,
    @NonNull DefaultSession session,
    @NonNull InternalDriverContext context,
    @NonNull String sessionLogPrefix,
    @NonNull Class<ResultSetT> resultSetClass,
    boolean specExecEnabled,
    SessionMetric clientTimeoutsMetric,
    SessionMetric continuousRequestsMetric,
    NodeMetric messagesMetric) {
  this.resultSetClass = resultSetClass;

  ProtocolVersion negotiatedVersion = context.getProtocolVersion();
  boolean continuousPagingSupported =
      context
          .getProtocolVersionRegistry()
          .supports(negotiatedVersion, DseProtocolFeature.CONTINUOUS_PAGING);
  if (!continuousPagingSupported) {
    throw new IllegalStateException(
        "Cannot execute continuous paging requests with protocol version " + negotiatedVersion);
  }

  this.clientTimeoutsMetric = clientTimeoutsMetric;
  this.continuousRequestsMetric = continuousRequestsMetric;
  this.messagesMetric = messagesMetric;

  this.logPrefix = sessionLogPrefix + "|" + this.hashCode();
  LOG.trace("[{}] Creating new continuous handler for request {}", logPrefix, statement);

  this.initialStatement = statement;
  this.session = session;
  this.keyspace = session.getKeyspace().orElse(null);
  this.context = context;

  DriverExecutionProfile profile = Conversions.resolveExecutionProfile(statement, context);
  Node targetNode = statement.getNode();
  if (targetNode != null) {
    // The statement is pinned to a node: the plan contains that node only.
    this.queryPlan = new SimpleQueryPlan(targetNode);
  } else {
    this.queryPlan =
        context.getLoadBalancingPolicyWrapper().newQueryPlan(statement, profile.getName(), session);
  }

  this.timer = context.getNettyOptions().getTimer();
  this.protocolBackpressureAvailable =
      protocolVersionCode(negotiatedVersion) >= protocolVersionCode(DseProtocolVersion.DSE_V2);
  this.throttler = context.getRequestThrottler();
  this.sessionMetricUpdater = session.getMetricUpdater();
  this.startTimeNanos = System.nanoTime();
  this.specExecEnabled = specExecEnabled;
  if (specExecEnabled) {
    this.scheduledExecutions = new CopyOnWriteArrayList<>();
  } else {
    // No speculative execution will ever be scheduled.
    this.scheduledExecutions = null;
  }
}

/** Returns the numeric code of the given protocol version. */
private static int protocolVersionCode(ProtocolVersion version) {
  return version.getCode();
}
/**
 * Returns the timeout for the whole request. A zero or negative duration means no global
 * timeout is scheduled.
 */
@NonNull
protected abstract Duration getGlobalTimeout();
/**
 * Returns how long to wait for the given page. A zero or negative duration means no timeout is
 * scheduled for that page.
 */
@NonNull
protected abstract Duration getPageTimeout(@NonNull StatementT statement, int pageNumber);
/** Returns the timeout applied to the request that asks the server for more pages. */
@NonNull
protected abstract Duration getReviseRequestTimeout(@NonNull StatementT statement);
/**
 * Returns the capacity of the local page queue; also the threshold used when toggling channel
 * auto-read and when deciding how many more pages to request.
 */
protected abstract int getMaxEnqueuedPages(@NonNull StatementT statement);
/**
 * Returns the maximum number of pages to request; once a positive limit has been reached no
 * further pages are requested. Zero or negative means no limit is enforced by this handler.
 */
protected abstract int getMaxPages(@NonNull StatementT statement);
/** Builds the protocol message written to the channel for the given statement. */
@NonNull
protected abstract Message getMessage(@NonNull StatementT statement);
/** Returns the tracing flag passed to the channel when the message is written. */
protected abstract boolean isTracingEnabled(@NonNull StatementT statement);
/** Returns the custom payload passed to the channel when the message is written. */
@NonNull
protected abstract Map<String, ByteBuffer> createPayload(@NonNull StatementT statement);
/** Creates the result set returned when a non-{@code Rows} (void) result ends the paging. */
@NonNull
protected abstract ResultSetT createEmptyResultSet(@NonNull ExecutionInfo executionInfo);
// NOTE(review): presumably returns the page number carried by the result set; the callers are
// not visible in this part of the file.
protected abstract int pageNumber(@NonNull ResultSetT resultSet);
/**
 * Creates the result set for a page of rows.
 *
 * @param statement the statement that produced the page.
 * @param rows the decoded rows message.
 * @param executionInfo execution info built for this page.
 * @param columnDefinitions column definitions cached from the first page.
 * @throws IOException declared by the contract; any failure aborts the request.
 */
@NonNull
protected abstract ResultSetT createResultSet(
@NonNull StatementT statement,
@NonNull Rows rows,
@NonNull ExecutionInfo executionInfo,
@NonNull ColumnDefinitions columnDefinitions)
throws IOException;
/**
 * Called by the throttler when the request may proceed: records the throttling delay (if the
 * request was delayed and the metric is enabled) and sends the initial request.
 *
 * @param wasDelayed whether the throttler held the request back before letting it through.
 */
@Override
public void onThrottleReady(boolean wasDelayed) {
  DriverExecutionProfile profile = Conversions.resolveExecutionProfile(initialStatement, context);
  if (wasDelayed) {
    boolean delayMetricEnabled =
        sessionMetricUpdater.isEnabled(DefaultSessionMetric.THROTTLING_DELAY, profile.getName());
    if (delayMetricEnabled) {
      session
          .getMetricUpdater()
          .updateTimer(
              DefaultSessionMetric.THROTTLING_DELAY,
              profile.getName(),
              System.nanoTime() - startTimeNanos,
              TimeUnit.NANOSECONDS);
    }
  }
  // The initial execution counts as one active execution.
  activeExecutionsCount.incrementAndGet();
  sendRequest(initialStatement, null, 0, 0, specExecEnabled);
}
/**
 * Called by the throttler when the request is rejected: counts the throttling error and aborts
 * the request with the given error.
 *
 * @param error the rejection reported by the throttler.
 */
@Override
public void onThrottleFailure(@NonNull RequestThrottlingException error) {
  DriverExecutionProfile profile = Conversions.resolveExecutionProfile(initialStatement, context);
  SessionMetricUpdater updater = session.getMetricUpdater();
  updater.incrementCounter(DefaultSessionMetric.THROTTLING_ERRORS, profile.getName());
  abortGlobalRequestOrChosenCallback(error);
}
/**
 * Fails the request with the given error. If no callback has been chosen yet, the choice itself
 * is failed; otherwise the already chosen callback is aborted.
 *
 * @param error the error to propagate.
 */
private void abortGlobalRequestOrChosenCallback(@NonNull Throwable error) {
  boolean failedBeforeChoice = chosenCallback.completeExceptionally(error);
  if (failedBeforeChoice) {
    return;
  }
  // A callback was already chosen (or the future already failed, in which case this is a no-op).
  chosenCallback.thenAccept(chosen -> chosen.abort(error, false));
}
/**
 * Starts the global timeout and returns a stage for the first page.
 *
 * @return a stage completed with the first page, or failed if the request fails.
 */
public CompletionStage<ResultSetT> handle() {
  this.globalTimeout = this.scheduleGlobalTimeout();
  CompletionStage<ResultSetT> firstPage = this.fetchNextPage();
  return firstPage;
}
/**
 * Returns a stage for the next page. Once a callback has been chosen, the page is taken from
 * that callback; if choosing a callback failed, the returned stage fails with the same error.
 * Cancelling the returned future cancels the whole request.
 *
 * @return a stage completed with the next page.
 */
public CompletionStage<ResultSetT> fetchNextPage() {
  CompletableFuture<ResultSetT> nextPage = new CompletableFuture<>();
  chosenCallback.whenComplete(
      (chosen, choiceFailure) -> {
        if (choiceFailure == null) {
          chosen
              .dequeueOrCreatePending()
              .whenComplete(
                  (page, pageFailure) -> {
                    if (pageFailure == null) {
                      nextPage.complete(page);
                    } else {
                      nextPage.completeExceptionally(pageFailure);
                    }
                  });
          return;
        }
        nextPage.completeExceptionally(choiceFailure);
      });
  // Propagate a client-side cancellation of the returned future to the handler.
  nextPage.whenComplete(
      (ignoredPage, failure) -> {
        if (failure instanceof CancellationException) {
          cancel();
        }
      });
  return nextPage;
}
/**
 * Writes the statement to a node. When {@code node} is given and has a channel it is used;
 * otherwise nodes are polled from the query plan until one yields a channel. If no channel can
 * be found, this execution ends and, if it was the last active one, the request fails with an
 * {@code AllNodesFailedException}. Nothing is written if a callback has already been chosen.
 *
 * @param statement the statement to send.
 * @param node the node to try first, or {@code null} to take the next one from the query plan.
 * @param currentExecutionIndex index of the (speculative) execution this send belongs to.
 * @param retryCount number of retries performed so far.
 * @param scheduleSpeculativeExecution whether a speculative execution may follow a successful write.
 */
private void sendRequest(
    StatementT statement,
    @Nullable Node node,
    int currentExecutionIndex,
    int retryCount,
    boolean scheduleSpeculativeExecution) {
  Node target = node;
  DriverChannel channel = (target == null) ? null : session.getChannel(target, logPrefix);
  // Fall back to the query plan until a node with an available channel is found.
  while (channel == null && (target = queryPlan.poll()) != null) {
    channel = session.getChannel(target, logPrefix);
  }

  if (channel == null) {
    int remaining = activeExecutionsCount.decrementAndGet();
    if (remaining == 0) {
      abortGlobalRequestOrChosenCallback(AllNodesFailedException.fromErrors(errors));
    }
    return;
  }
  if (chosenCallback.isDone()) {
    return;
  }

  NodeResponseCallback callback =
      new NodeResponseCallback(
          statement,
          target,
          channel,
          currentExecutionIndex,
          retryCount,
          scheduleSpeculativeExecution,
          logPrefix);
  inFlightCallbacks.add(callback);
  channel
      .write(
          getMessage(statement), isTracingEnabled(statement), createPayload(statement), callback)
      .addListener(callback);
}
/**
 * Schedules the timeout that bounds the whole request.
 *
 * @return the scheduled timeout, or {@code null} if the configured duration is not positive.
 */
private Timeout scheduleGlobalTimeout() {
  Duration limit = getGlobalTimeout();
  long limitNanos = limit.toNanos();
  if (limitNanos <= 0) {
    return null;
  }
  LOG.trace("[{}] Scheduling global timeout for pages in {}", logPrefix, limit);
  return timer.newTimeout(
      expired ->
          abortGlobalRequestOrChosenCallback(
              new DriverTimeoutException("Query timed out after " + limit)),
      limitNanos,
      TimeUnit.NANOSECONDS);
}
/**
 * Cancels the request: cancels the callback choice, every scheduled speculative execution and
 * in-flight callback, and finally the global timeout.
 */
public void cancel() {
  this.chosenCallback.cancel(true);
  // No callback is exempted: everything in flight is cancelled.
  this.cancelScheduledTasks(null);
  this.cancelGlobalTimeout();
}
/** Cancels the global timeout, if one was scheduled. */
private void cancelGlobalTimeout() {
  Timeout scheduled = globalTimeout;
  if (scheduled == null) {
    return;
  }
  scheduled.cancel();
}
/**
 * Cancels every scheduled speculative execution and every in-flight callback except the given
 * one.
 *
 * @param toIgnore the callback to leave untouched, or {@code null} to cancel all of them.
 */
private void cancelScheduledTasks(@Nullable NodeResponseCallback toIgnore) {
  if (scheduledExecutions != null) {
    scheduledExecutions.forEach(Timeout::cancel);
  }
  for (NodeResponseCallback inFlight : inFlightCallbacks) {
    if (inFlight == toIgnore) {
      continue;
    }
    inFlight.cancel();
  }
}
/**
 * Test hook: returns the state of the chosen callback, blocking until one is chosen.
 *
 * @return the callback's state, or {@code STATE_FAILED} if the choice was cancelled.
 */
@VisibleForTesting
int getState() {
  final NodeResponseCallback chosen;
  try {
    chosen = chosenCallback.get();
  } catch (CancellationException e) {
    return NodeResponseCallback.STATE_FAILED;
  } catch (InterruptedException | ExecutionException e) {
    throw new AssertionError("Unexpected error", e);
  }
  return chosen.getState();
}
/**
 * Test hook: returns the pending result future of the chosen callback.
 *
 * @return the future currently installed on the chosen callback (may be {@code null} if the
 *     client is not waiting for a page).
 * @throws AssertionError if no callback could be obtained; the underlying failure is attached as
 *     the cause so that the reason is not lost.
 */
@VisibleForTesting
CompletableFuture<ResultSetT> getPendingResult() {
  try {
    return chosenCallback.get().getPendingResult();
  } catch (Exception e) {
    // Keep the original failure: without it the assertion gives no hint about what went wrong.
    throw new AssertionError("Expected callback to be chosen at this point", e);
  }
}
private class NodeResponseCallback
implements ResponseCallback, GenericFutureListener<Future<java.lang.Void>> {
/** {@code System.nanoTime()} captured when this callback was created. */
private final long messageStartTimeNanos = System.nanoTime();
private final StatementT statement;
private final Node node;
private final DriverChannel channel;
private final int executionIndex;
/** Handler log prefix followed by {@code |} and the execution index. */
private final String logPrefix;
private final boolean scheduleSpeculativeExecution;
private final DriverExecutionProfile executionProfile;
/** Guards the mutable paging state below. */
private final ReentrantLock lock = new ReentrantLock();
/**
 * Received pages (or errors) not yet handed to the client. Created lazily when this callback
 * becomes the chosen one; {@code null} until then.
 */
@GuardedBy("lock")
private Queue<Object> queue;
/** Future installed when the client asks for a page while the queue is empty. */
@GuardedBy("lock")
private CompletableFuture<ResultSetT> pendingResult;
/** Number of pages requested from the server so far (only maintained with protocol backpressure). */
@GuardedBy("lock")
private int numPagesRequested;
/**
 * Positive values are the number of the next expected page (starting at 1); negative values are
 * the terminal states {@code STATE_FINISHED} and {@code STATE_FAILED}.
 */
@GuardedBy("lock")
private int state = 1;
/** Set once {@code isLastResponse} has reported the final frame. */
@GuardedBy("lock")
private boolean sawLastResponse;
// NOTE(review): read in isLastResponse; the code that sets it is outside this excerpt.
@GuardedBy("lock")
private boolean sentCancelRequest;
private static final int STATE_FINISHED = -1;
private static final int STATE_FAILED = -2;
/** Stream id assigned by the channel, or -1 while none has been assigned. */
@GuardedBy("lock")
private int streamId = -1;
/** Column definitions cached from the first {@code Rows} response. */
private volatile ColumnDefinitions columnDefinitions;
/** Timeout for the page currently awaited; may be {@code null}. */
private volatile Timeout pageTimeout;
private final int retryCount;
// NOTE(review): the three flags below look like once-only guards for timer/tracker reporting;
// their use is outside this excerpt.
private final AtomicBoolean stopNodeMessageTimerReported = new AtomicBoolean(false);
private final AtomicBoolean nodeErrorReported = new AtomicBoolean(false);
private final AtomicBoolean nodeSuccessReported = new AtomicBoolean(false);
/**
 * Creates the callback for one request written to one node.
 *
 * @param statement the statement being sent.
 * @param node the target node.
 * @param channel the channel the request is written to.
 * @param executionIndex index of the (speculative) execution.
 * @param retryCount number of retries performed so far.
 * @param scheduleSpeculativeExecution whether a speculative execution may be scheduled after the write.
 * @param logPrefix the handler's log prefix; the execution index is appended to it.
 */
public NodeResponseCallback(
    StatementT statement,
    Node node,
    DriverChannel channel,
    int executionIndex,
    int retryCount,
    boolean scheduleSpeculativeExecution,
    String logPrefix) {
  // Target of the request.
  this.node = node;
  this.channel = channel;
  // What is sent, and under which profile.
  this.statement = statement;
  this.executionProfile = Conversions.resolveExecutionProfile(statement, context);
  // Bookkeeping for retries and speculative executions.
  this.retryCount = retryCount;
  this.executionIndex = executionIndex;
  this.scheduleSpeculativeExecution = scheduleSpeculativeExecution;
  this.logPrefix = logPrefix + "|" + executionIndex;
}
/**
 * Records the stream id assigned to this request. If the callback already reached a terminal
 * state, the stream id is released right away.
 *
 * @param streamId the stream id assigned by the channel.
 */
@Override
public void onStreamIdAssigned(int streamId) {
  LOG.trace("[{}] Assigned streamId {} on node {}", logPrefix, streamId, node);
  lock.lock();
  try {
    this.streamId = streamId;
    boolean alreadyTerminated = state < 0;
    if (alreadyTerminated) {
      releaseStreamId();
    }
  } finally {
    lock.unlock();
  }
}
/**
 * Tells the channel whether the given frame is the last one expected on this stream.
 *
 * <p>After a cancel request was sent, only the server error acknowledging the cancellation is
 * final. Otherwise a {@code Rows} frame is final when it is flagged as the last continuous page,
 * and any error frame is final.
 *
 * @param responseFrame the frame just received.
 * @return whether no further frame is expected.
 */
@Override
public boolean isLastResponse(@NonNull Frame responseFrame) {
  lock.lock();
  try {
    final Message payload = responseFrame.message;
    final boolean last;
    if (sentCancelRequest) {
      last =
          payload instanceof Error
              && ((Error) payload).code == ProtocolConstants.ErrorCode.SERVER_ERROR
              && ((Error) payload).message.contains("Session cancelled by the user");
    } else if (payload instanceof Rows) {
      DseRowsMetadata pageMetadata = (DseRowsMetadata) ((Rows) payload).getMetadata();
      last = pageMetadata.isLastContinuousPage;
    } else {
      last = payload instanceof Error;
    }
    if (last) {
      sawLastResponse = true;
    }
    return last;
  } finally {
    lock.unlock();
  }
}
/**
 * Invoked when the write of the request to the channel completes.
 *
 * <p>On success, optionally schedules the next speculative execution (idempotent statements
 * only) and starts the timeout for the first page. On failure, a frame that was too long to
 * encode aborts the request; any other write failure is recorded against the node and the
 * request is sent to the next node of the query plan.
 *
 * @param future the completed write future.
 */
@Override
public void operationComplete(@NonNull Future<java.lang.Void> future) {
  if (!future.isSuccess()) {
    Throwable error = future.cause();
    if (error instanceof EncoderException
        && error.getCause() instanceof FrameTooLongException) {
      // Unwrap the encoder exception: the frame-too-long error is what the client should see.
      trackNodeError(node, error.getCause());
      lock.lock();
      try {
        abort(error.getCause(), false);
      } finally {
        lock.unlock();
      }
    } else {
      LOG.trace(
          "[{}] Failed to send request on {}, trying next node (cause: {})",
          logPrefix,
          channel,
          error);
      ((DefaultNode) node)
          .getMetricUpdater()
          .incrementCounter(DefaultNodeMetric.UNSENT_REQUESTS, executionProfile.getName());
      recordError(node, error);
      // Report the same throwable that was just recorded. A generic write failure often has no
      // cause, so passing error.getCause() here could hand null to the node error tracking.
      trackNodeError(node, error);
      sendRequest(statement, null, executionIndex, retryCount, scheduleSpeculativeExecution);
    }
  } else {
    LOG.trace("[{}] Request sent on {}", logPrefix, channel);
    if (scheduleSpeculativeExecution && Conversions.resolveIdempotence(statement, context)) {
      int nextExecution = executionIndex + 1;
      long nextDelay =
          Conversions.resolveSpeculativeExecutionPolicy(statement, context)
              .nextExecution(node, keyspace, statement, nextExecution);
      if (nextDelay >= 0) {
        scheduleSpeculativeExecution(nextExecution, nextDelay);
      } else {
        // A negative delay means the policy does not want another execution.
        LOG.trace(
            "[{}] Speculative execution policy returned {}, no next execution",
            logPrefix,
            nextDelay);
      }
    }
    // The first page is now awaited.
    pageTimeout = schedulePageTimeout(1);
  }
}
/**
 * Schedules a speculative execution after the given delay. When the timer fires and no callback
 * has been chosen yet, the execution counters and node metric are updated and the statement is
 * sent to the next node of the query plan.
 *
 * @param nextExecutionIndex index of the execution to start.
 * @param delay delay before starting it, in milliseconds.
 */
private void scheduleSpeculativeExecution(int nextExecutionIndex, long delay) {
  LOG.trace(
      "[{}] Scheduling speculative execution {} in {} ms",
      logPrefix,
      nextExecutionIndex,
      delay);
  try {
    Timeout scheduled =
        timer.newTimeout(
            (Timeout fired) -> {
              if (chosenCallback.isDone()) {
                // A page or an error already settled the request.
                return;
              }
              LOG.trace(
                  "[{}] Starting speculative execution {}", logPrefix, nextExecutionIndex);
              activeExecutionsCount.incrementAndGet();
              startedSpeculativeExecutionsCount.incrementAndGet();
              NodeMetricUpdater updater = ((DefaultNode) node).getMetricUpdater();
              boolean metricEnabled =
                  updater.isEnabled(
                      DefaultNodeMetric.SPECULATIVE_EXECUTIONS, executionProfile.getName());
              if (metricEnabled) {
                updater.incrementCounter(
                    DefaultNodeMetric.SPECULATIVE_EXECUTIONS, executionProfile.getName());
              }
              sendRequest(statement, null, nextExecutionIndex, 0, true);
            },
            delay,
            TimeUnit.MILLISECONDS);
    scheduledExecutions.add(scheduled);
  } catch (IllegalStateException e) {
    logTimeoutSchedulingError(e);
  }
}
/**
 * Schedules the timeout for the given page.
 *
 * @param expectedPage the page number being awaited.
 * @return the scheduled timeout, or {@code null} if the page number is negative or the
 *     configured page timeout is not positive.
 */
private Timeout schedulePageTimeout(int expectedPage) {
  if (expectedPage < 0) {
    return null;
  }
  Duration limit = getPageTimeout(statement, expectedPage);
  long limitNanos = limit.toNanos();
  if (limitNanos <= 0) {
    return null;
  }
  LOG.trace("[{}] Scheduling timeout for page {} in {}", logPrefix, expectedPage, limit);
  return timer.newTimeout(
      fired -> onPageTimeout(expectedPage), limitNanos, TimeUnit.NANOSECONDS);
}
/**
 * Invoked when a page timeout fires. Aborts the request only if the callback is still waiting
 * for that very page; otherwise the timeout is stale and ignored.
 *
 * @param expectedPage the page the timeout was scheduled for.
 */
private void onPageTimeout(int expectedPage) {
  lock.lock();
  try {
    if (state != expectedPage) {
      LOG.trace(
          "[{}] Timeout fired for page {} but query already at state {}, skipping",
          logPrefix,
          expectedPage,
          state);
      return;
    }
    DriverTimeoutException timedOut =
        new DriverTimeoutException(
            String.format("Timed out waiting for page %d", expectedPage));
    abort(timedOut, false);
  } finally {
    lock.unlock();
  }
}
/**
 * Handles a frame received for this request: stops the node message timer and the page timeout,
 * then dispatches the message to the result or error processing. Responses arriving after the
 * callback reached a terminal state are ignored.
 *
 * @param response the received frame.
 */
@Override
public void onResponse(@NonNull Frame response) {
  stopNodeMessageTimer();
  cancelTimeout(pageTimeout);
  lock.lock();
  try {
    if (state < 0) {
      LOG.trace("[{}] Got result but the request has been cancelled, ignoring", logPrefix);
      return;
    }
    try {
      Message payload = response.message;
      if (payload instanceof Error) {
        LOG.trace("[{}] Got error response", logPrefix);
        processErrorResponse((Error) payload);
      } else if (payload instanceof Result) {
        LOG.trace("[{}] Got result", logPrefix);
        processResultResponse((Result) payload, response);
      } else {
        IllegalStateException unexpected =
            new IllegalStateException("Unexpected response " + payload);
        trackNodeError(node, unexpected);
        abort(unexpected, false);
      }
    } catch (Throwable failure) {
      // Anything thrown while processing is treated as a node error and ends the request.
      trackNodeError(node, failure);
      abort(failure, false);
    }
  } finally {
    lock.unlock();
  }
}
/**
 * Handles a request that was aborted before a response was received.
 *
 * <p>Non-idempotent statements and frame-too-long errors are always rethrown; otherwise the
 * retry policy decides. The error metrics are updated according to the verdict, which is then
 * processed with the lock held.
 *
 * @param error the failure reported by the channel.
 */
@Override
public void onFailure(@NonNull Throwable error) {
  cancelTimeout(pageTimeout);
  LOG.trace(String.format("[%s] Request failure", logPrefix), error);
  RetryVerdict verdict;
  if (!Conversions.resolveIdempotence(statement, context)
      || error instanceof FrameTooLongException) {
    verdict = RetryVerdict.RETHROW;
  } else {
    try {
      RetryPolicy retryPolicy = Conversions.resolveRetryPolicy(statement, context);
      verdict = retryPolicy.onRequestAbortedVerdict(statement, error, retryCount);
    } catch (Throwable cause) {
      // Every other abort() call in this callback runs with the lock held, and the paging state
      // is @GuardedBy("lock"); take the (reentrant) lock here too.
      lock.lock();
      try {
        abort(
            new IllegalStateException("Unexpected error while invoking the retry policy", cause),
            false);
      } finally {
        lock.unlock();
      }
      return;
    }
  }
  updateErrorMetrics(
      ((DefaultNode) node).getMetricUpdater(),
      verdict,
      DefaultNodeMetric.ABORTED_REQUESTS,
      DefaultNodeMetric.RETRIES_ON_ABORTED,
      DefaultNodeMetric.IGNORES_ON_ABORTED);
  lock.lock();
  try {
    processRetryVerdict(verdict, error);
  } finally {
    lock.unlock();
  }
}
/**
 * Processes a successful result. Must be called with the lock held.
 *
 * <p>A {@code Rows} result is checked against the expected page number and then enqueued (or
 * handed to a waiting client); the last page also finishes the request. Any other result is
 * treated as a void result produced by an ignored error: an empty result set is delivered and
 * the request finishes. Any exception aborts the request.
 *
 * @param result the decoded result message.
 * @param frame the frame that carried it, or {@code null} for a synthetic void result.
 */
@SuppressWarnings("GuardedBy")
private void processResultResponse(@NonNull Result result, @Nullable Frame frame) {
  assert lock.isHeldByCurrentThread();
  try {
    ExecutionInfo info = createExecutionInfo(result, frame);
    if (!(result instanceof Rows)) {
      assert result instanceof Void;
      ResultSetT emptyPage = createEmptyResultSet(info);
      LOG.trace(
          "[{}] Continuous paging interrupted by retry policy decision to ignore error",
          logPrefix);
      state = STATE_FINISHED;
      reenableAutoReadIfNeeded();
      enqueueOrCompletePending(emptyPage);
      stopGlobalRequestTimer();
      cancelTimeout(globalTimeout);
      return;
    }

    Rows rows = (Rows) result;
    DseRowsMetadata pageMetadata = (DseRowsMetadata) rows.getMetadata();
    if (columnDefinitions == null) {
      // Cache the definitions from the first page for all the following ones.
      columnDefinitions = Conversions.toColumnDefinitions(pageMetadata, context);
    }
    int receivedPage = pageMetadata.continuousPageNumber;
    int expectedPage = state;
    if (receivedPage != expectedPage) {
      abort(
          new IllegalStateException(
              String.format(
                  "Received page %d but was expecting %d", receivedPage, expectedPage)),
          false);
      return;
    }

    int rowCount = rows.getData().size();
    ResultSetT page = createResultSet(statement, rows, info, columnDefinitions);
    if (pageMetadata.isLastContinuousPage) {
      LOG.trace("[{}] Received last page ({} - {} rows)", logPrefix, receivedPage, rowCount);
      state = STATE_FINISHED;
      reenableAutoReadIfNeeded();
      enqueueOrCompletePending(page);
      stopGlobalRequestTimer();
      cancelTimeout(globalTimeout);
    } else {
      LOG.trace("[{}] Received page {} ({} rows)", logPrefix, receivedPage, rowCount);
      if (expectedPage > 0) {
        state = expectedPage + 1;
      }
      enqueueOrCompletePending(page);
    }
  } catch (Throwable failure) {
    abort(failure, false);
  }
}
/**
 * Processes an error response. Must be called with the lock held.
 *
 * <p>Unprepared errors trigger a re-prepare; a bootstrapping node is skipped in favor of the
 * next one; validation, function, protocol errors and any error received after the first page
 * are rethrown; everything else goes through the retry policy.
 *
 * @param errorMessage the decoded error message.
 */
@SuppressWarnings("GuardedBy")
private void processErrorResponse(@NonNull Error errorMessage) {
  assert lock.isHeldByCurrentThread();
  if (errorMessage instanceof Unprepared) {
    processUnprepared((Unprepared) errorMessage);
    return;
  }

  CoordinatorException error = DseConversions.toThrowable(node, errorMessage, context);
  if (error instanceof BootstrappingException) {
    LOG.trace("[{}] {} is bootstrapping, trying next node", logPrefix, node);
    recordError(node, error);
    trackNodeError(node, error);
    sendRequest(statement, null, executionIndex, retryCount, false);
    return;
  }

  boolean unrecoverable =
      error instanceof QueryValidationException
          || error instanceof FunctionFailureException
          || error instanceof ProtocolError
          || state > 1;
  if (unrecoverable) {
    LOG.trace("[{}] Unrecoverable error, rethrowing", logPrefix);
    NodeMetricUpdater updater = ((DefaultNode) node).getMetricUpdater();
    updater.incrementCounter(DefaultNodeMetric.OTHER_ERRORS, executionProfile.getName());
    trackNodeError(node, error);
    abort(error, true);
    return;
  }

  try {
    processRecoverableError(error);
  } catch (Throwable cause) {
    abort(cause, false);
  }
}
/**
 * Asks the retry policy what to do with a recoverable coordinator error, updates the matching
 * error metrics and applies the verdict. Must be called with the lock held.
 *
 * <p>Write timeouts and generic errors are only submitted to the retry policy when the statement
 * is idempotent; otherwise they are rethrown.
 *
 * @param error the coordinator error to handle.
 */
private void processRecoverableError(@NonNull CoordinatorException error) {
  assert lock.isHeldByCurrentThread();
  NodeMetricUpdater updater = ((DefaultNode) node).getMetricUpdater();
  RetryPolicy policy = Conversions.resolveRetryPolicy(statement, context);

  final RetryVerdict verdict;
  // Metrics to update once the verdict is known: the error itself, retries and ignores.
  final DefaultNodeMetric errorMetric;
  final DefaultNodeMetric retryMetric;
  final DefaultNodeMetric ignoreMetric;

  if (error instanceof ReadTimeoutException) {
    ReadTimeoutException timeout = (ReadTimeoutException) error;
    verdict =
        policy.onReadTimeoutVerdict(
            statement,
            timeout.getConsistencyLevel(),
            timeout.getBlockFor(),
            timeout.getReceived(),
            timeout.wasDataPresent(),
            retryCount);
    errorMetric = DefaultNodeMetric.READ_TIMEOUTS;
    retryMetric = DefaultNodeMetric.RETRIES_ON_READ_TIMEOUT;
    ignoreMetric = DefaultNodeMetric.IGNORES_ON_READ_TIMEOUT;
  } else if (error instanceof WriteTimeoutException) {
    WriteTimeoutException timeout = (WriteTimeoutException) error;
    if (!Conversions.resolveIdempotence(statement, context)) {
      verdict = RetryVerdict.RETHROW;
    } else {
      verdict =
          policy.onWriteTimeoutVerdict(
              statement,
              timeout.getConsistencyLevel(),
              timeout.getWriteType(),
              timeout.getBlockFor(),
              timeout.getReceived(),
              retryCount);
    }
    errorMetric = DefaultNodeMetric.WRITE_TIMEOUTS;
    retryMetric = DefaultNodeMetric.RETRIES_ON_WRITE_TIMEOUT;
    ignoreMetric = DefaultNodeMetric.IGNORES_ON_WRITE_TIMEOUT;
  } else if (error instanceof UnavailableException) {
    UnavailableException unavailable = (UnavailableException) error;
    verdict =
        policy.onUnavailableVerdict(
            statement,
            unavailable.getConsistencyLevel(),
            unavailable.getRequired(),
            unavailable.getAlive(),
            retryCount);
    errorMetric = DefaultNodeMetric.UNAVAILABLES;
    retryMetric = DefaultNodeMetric.RETRIES_ON_UNAVAILABLE;
    ignoreMetric = DefaultNodeMetric.IGNORES_ON_UNAVAILABLE;
  } else {
    if (Conversions.resolveIdempotence(statement, context)) {
      verdict = policy.onErrorResponseVerdict(statement, error, retryCount);
    } else {
      verdict = RetryVerdict.RETHROW;
    }
    errorMetric = DefaultNodeMetric.OTHER_ERRORS;
    retryMetric = DefaultNodeMetric.RETRIES_ON_OTHER_ERROR;
    ignoreMetric = DefaultNodeMetric.IGNORES_ON_OTHER_ERROR;
  }

  updateErrorMetrics(updater, verdict, errorMetric, retryMetric, ignoreMetric);
  processRetryVerdict(verdict, error);
}
/**
 * Handles an UNPREPARED error by re-preparing the statement on the same channel. Must be called
 * with the lock held.
 *
 * <p>When the re-prepare succeeds with the expected id, the statement is sent again to the same
 * node. An id mismatch, an unrecoverable error returned by the prepare request, or a throttling
 * rejection aborts the request. Any other re-prepare failure is recorded against the node and
 * the statement is sent to the next node.
 *
 * @param errorMessage the UNPREPARED error received from the node.
 * @throws IllegalStateException if the session holds no payload to re-prepare the statement.
 */
@SuppressWarnings("GuardedBy")
private void processUnprepared(@NonNull Unprepared errorMessage) {
assert lock.isHeldByCurrentThread();
ByteBuffer idToReprepare = ByteBuffer.wrap(errorMessage.id);
LOG.trace(
"[{}] Statement {} is not prepared on {}, re-preparing",
logPrefix,
Bytes.toHexString(idToReprepare),
node);
RepreparePayload repreparePayload = session.getRepreparePayloads().get(idToReprepare);
if (repreparePayload == null) {
throw new IllegalStateException(
String.format(
"Tried to execute unprepared query %s but we don't have the data to re-prepare it",
Bytes.toHexString(idToReprepare)));
}
Prepare prepare = repreparePayload.toMessage();
// The re-prepare uses the profile's regular request timeout.
Duration timeout = executionProfile.getDuration(DefaultDriverOption.REQUEST_TIMEOUT);
ThrottledAdminRequestHandler.prepare(
channel,
true,
prepare,
repreparePayload.customPayload,
timeout,
throttler,
sessionMetricUpdater,
logPrefix)
.start()
.whenComplete(
(repreparedId, exception) -> {
// Set when the outcome must abort the request instead of moving to another node.
Throwable fatalError = null;
if (exception == null) {
if (!repreparedId.equals(idToReprepare)) {
IllegalStateException illegalStateException =
new IllegalStateException(
String.format(
"ID mismatch while trying to reprepare (expected %s, got %s). "
+ "This prepared statement won't work anymore. "
+ "This usually happens when you run a 'USE...' query after "
+ "the statement was prepared.",
Bytes.toHexString(idToReprepare), Bytes.toHexString(repreparedId)));
trackNodeError(node, illegalStateException);
fatalError = illegalStateException;
} else {
LOG.trace(
"[{}] Re-prepare successful, retrying on the same node ({})",
logPrefix,
node);
sendRequest(statement, node, executionIndex, retryCount, false);
}
} else {
if (exception instanceof UnexpectedResponseException) {
Message prepareErrorMessage = ((UnexpectedResponseException) exception).message;
if (prepareErrorMessage instanceof Error) {
CoordinatorException prepareError =
DseConversions.toThrowable(node, (Error) prepareErrorMessage, context);
// Same set of errors that is rethrown without retry for regular responses.
if (prepareError instanceof QueryValidationException
|| prepareError instanceof FunctionFailureException
|| prepareError instanceof ProtocolError) {
LOG.trace("[{}] Unrecoverable error on re-prepare, rethrowing", logPrefix);
trackNodeError(node, prepareError);
fatalError = prepareError;
}
}
} else if (exception instanceof RequestThrottlingException) {
trackNodeError(node, exception);
fatalError = exception;
}
if (fatalError == null) {
LOG.trace("[{}] Re-prepare failed, trying next node", logPrefix);
recordError(node, exception);
trackNodeError(node, exception);
sendRequest(statement, null, executionIndex, retryCount, false);
}
}
if (fatalError != null) {
// This callback runs when the prepare stage completes, so the lock is taken again here
// before aborting.
lock.lock();
try {
abort(fatalError, true);
} finally {
lock.unlock();
}
}
});
}
/**
 * Applies a retry verdict. Must be called with the lock held.
 *
 * <p>Retries record the error and resend the (possibly modified) statement to the same node or
 * to the next one; a rethrow aborts the request; an ignore completes it with a void result.
 *
 * @param verdict the decision taken by the retry policy.
 * @param error the error that led to the verdict.
 */
private void processRetryVerdict(@NonNull RetryVerdict verdict, @NonNull Throwable error) {
  assert lock.isHeldByCurrentThread();
  LOG.trace("[{}] Processing retry decision {}", logPrefix, verdict);
  boolean retry = false;
  Node retryTarget = null;
  switch (verdict.getRetryDecision()) {
    case RETRY_SAME:
      retry = true;
      retryTarget = node;
      break;
    case RETRY_NEXT:
      // A null target makes sendRequest pick the next node of the query plan.
      retry = true;
      break;
    case RETHROW:
      trackNodeError(node, error);
      abort(error, true);
      break;
    case IGNORE:
      processResultResponse(Void.INSTANCE, null);
      break;
  }
  if (retry) {
    recordError(node, error);
    trackNodeError(node, error);
    sendRequest(
        verdict.getRetryRequest(statement), retryTarget, executionIndex, retryCount + 1, false);
  }
}
/**
 * Delivers a page or an error to the client. Must be called with the lock held.
 *
 * <p>The first delivery tries to make this callback the chosen one; if another callback won,
 * the item is dropped. Otherwise the item completes the future the client is waiting on, or is
 * queued. Without protocol backpressure, channel auto-read is disabled when the queue becomes
 * full while pages are still expected.
 *
 * @param pageOrError a result set, or a {@code Throwable}.
 */
@SuppressWarnings("GuardedBy")
private void enqueueOrCompletePending(@NonNull Object pageOrError) {
  assert lock.isHeldByCurrentThread();

  if (queue == null) {
    boolean wonTheRace = chosenCallback.complete(this);
    if (!wonTheRace) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(
            "[{}] Trying to enqueue {} but another callback was already chosen, aborting",
            logPrefix,
            asTraceString(pageOrError));
      }
      return;
    }
    queue = new ArrayDeque<>(getMaxEnqueuedPages(statement));
    if (protocolBackpressureAvailable) {
      numPagesRequested = getMaxEnqueuedPages(statement);
    } else {
      numPagesRequested = 0;
    }
    // This callback is now the chosen one: stop every other execution.
    cancelScheduledTasks(this);
  }

  CompletableFuture<ResultSetT> waiting = pendingResult;
  if (waiting != null) {
    if (LOG.isTraceEnabled()) {
      LOG.trace(
          "[{}] Client was waiting on empty queue, completing with {}",
          logPrefix,
          asTraceString(pageOrError));
    }
    pendingResult = null;
    completeResultSetFuture(waiting, pageOrError);
    return;
  }

  if (LOG.isTraceEnabled()) {
    LOG.trace("[{}] Enqueuing {}", logPrefix, asTraceString(pageOrError));
  }
  queue.add(pageOrError);
  boolean queueJustFilled =
      !protocolBackpressureAvailable
          && queue.size() == getMaxEnqueuedPages(statement)
          && state > 0;
  if (queueJustFilled) {
    LOG.trace(
        "[{}] Exceeded {} queued response pages, disabling auto-read",
        logPrefix,
        queue.size());
    channel.config().setAutoRead(false);
  }
}
/**
 * Returns a future for the next page.
 *
 * <p>If an item is queued it is returned immediately (re-enabling auto-read and requesting more
 * pages when appropriate); after a cancellation, queued pages are discarded and a cancelled
 * future is returned instead. If nothing is queued, a pending future is installed and, past the
 * first page, a page timeout is scheduled.
 *
 * @return a future for the next page; never {@code null}.
 */
@NonNull
public CompletableFuture<ResultSetT> dequeueOrCreatePending() {
  lock.lock();
  try {
    assert pendingResult == null;

    Object next = null;
    if (queue != null) {
      next = queue.poll();
      boolean queueNoLongerFull =
          !protocolBackpressureAvailable
              && next != null
              && queue.size() == getMaxEnqueuedPages(statement) - 1;
      if (queueNoLongerFull) {
        LOG.trace(
            "[{}] Back to {} queued response pages, re-enabling auto-read",
            logPrefix,
            queue.size());
        channel.config().setAutoRead(true);
      }
      maybeRequestMore();
    }

    boolean failed = state == STATE_FAILED;

    if (next == null) {
      if (failed) {
        LOG.trace(
            "[{}] Client requested next page on cancelled empty queue, returning cancelled future",
            logPrefix);
        return cancelledResultSetFuture();
      }
      LOG.trace(
          "[{}] Client requested next page but queue is empty, installing future", logPrefix);
      pendingResult = new CompletableFuture<>();
      if (state > 1) {
        // The first page's timeout is scheduled when the request is written.
        pageTimeout = schedulePageTimeout(state);
      }
      return pendingResult;
    }

    if (failed && !(next instanceof Throwable)) {
      LOG.trace(
          "[{}] Client requested next page on cancelled queue, discarding page and returning cancelled future",
          logPrefix);
      return cancelledResultSetFuture();
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace(
          "[{}] Client requested next page on non-empty queue, returning immediate future of {}",
          logPrefix,
          asTraceString(next));
    }
    return immediateResultSetFuture(next);
  } finally {
    lock.unlock();
  }
}
/**
 * Asks the server for more pages when enough room has opened up in the queue.
 *
 * <p>Does nothing unless {@code state >= 2}, a stream id is known and protocol backpressure is
 * available, nor when a positive max-pages limit has already been reached by the number of pages
 * requested. Otherwise the number of pages to request is the free queue space minus the pages
 * still in flight, and a request is only sent when that number is positive and at least half of
 * the maximum queue size.
 *
 * <p>Must be called with the lock held.
 */
@SuppressWarnings("GuardedBy")
private void maybeRequestMore() {
  assert lock.isHeldByCurrentThread();
  boolean eligible = state >= 2 && streamId != -1 && protocolBackpressureAvailable;
  if (!eligible) {
    return;
  }
  int pageLimit = getMaxPages(statement);
  if (pageLimit > 0 && numPagesRequested >= pageLimit) {
    return;
  }
  // The original code treats (state - 1) as the count of pages received so far.
  int pagesReceived = state - 1;
  int alreadyRequested = numPagesRequested;
  int roomInQueue = getMaxEnqueuedPages(statement) - queue.size();
  int stillInFlight = alreadyRequested - pagesReceived;
  int pagesToRequest = roomInQueue - stillInFlight;
  if (pagesToRequest <= 0) {
    return;
  }
  if (pagesToRequest < getMaxEnqueuedPages(statement) / 2) {
    return;
  }
  LOG.trace("[{}] Requesting more {} pages", logPrefix, pagesToRequest);
  numPagesRequested = alreadyRequested + pagesToRequest;
  sendMorePagesRequest(pagesToRequest);
}
/**
 * Sends a {@code Revise} request on the current channel asking for {@code nextPages} additional
 * pages on the current stream id. If that request fails, a warning is logged and the continuous
 * query is aborted with the error (as a non-server-originated failure).
 *
 * <p>Must be called with the lock held.
 *
 * @param nextPages the number of additional pages to request.
 */
@SuppressWarnings("GuardedBy")
private void sendMorePagesRequest(int nextPages) {
  assert lock.isHeldByCurrentThread();
  assert channel != null : "expected valid connection in order to request more pages";
  assert protocolBackpressureAvailable;
  assert streamId != -1;
  LOG.trace("[{}] Sending request for more pages", logPrefix);
  String description = "request " + nextPages + " more pages for id " + streamId;
  ThrottledAdminRequestHandler.query(
          channel,
          true,
          Revise.requestMoreContinuousPages(streamId, nextPages),
          statement.getCustomPayload(),
          getReviseRequestTimeout(statement),
          throttler,
          session.getMetricUpdater(),
          logPrefix,
          description)
      .start()
      .handle(
          (result, error) -> {
            if (error == null) {
              // Success needs no follow-up.
              return null;
            }
            Loggers.warnWithException(
                LOG, "[{}] Error requesting more pages, aborting.", logPrefix, error);
            // abort() asserts that the lock is held, so take it before calling.
            lock.lock();
            try {
              abort(error, false);
            } finally {
              lock.unlock();
            }
            return null;
          });
}
/**
 * Cancels the given timeout, logging a trace message first; a {@code null} timeout is ignored.
 *
 * @param timeout the timeout to cancel, possibly {@code null}.
 */
private void cancelTimeout(Timeout timeout) {
  if (timeout == null) {
    return;
  }
  LOG.trace("[{}] Cancelling timeout", logPrefix);
  timeout.cancel();
}
/**
 * Cancels the continuous paging session.
 *
 * <p>If the state is already negative this is a no-op. Otherwise, under the lock, the state is
 * set to {@code STATE_FAILED}, any pending result future is cancelled and {@link
 * #releaseStreamId()} is invoked; once the lock is released, {@link #reenableAutoReadIfNeeded()}
 * is called.
 */
public void cancel() {
  lock.lock();
  try {
    if (state < 0) {
      // Returning here also skips the auto-read step below.
      return;
    }
    LOG.trace(
        "[{}] Cancelling continuous paging session with state {} on node {}",
        logPrefix,
        state,
        node);
    state = STATE_FAILED;
    if (pendingResult != null) {
      pendingResult.cancel(true);
    }
    releaseStreamId();
  } finally {
    lock.unlock();
  }
  reenableAutoReadIfNeeded();
}
/**
 * Cancels this callback on the channel and sends a cancel request to the server, unless there is
 * no stream id yet, the last response was already seen, or the channel's close future is done.
 *
 * <p>Must be called with the lock held.
 */
@SuppressWarnings("GuardedBy")
private void releaseStreamId() {
  assert lock.isHeldByCurrentThread();
  if (streamId < 0 || sawLastResponse || channel.closeFuture().isDone()) {
    return;
  }
  channel.cancel(this);
  sendCancelRequest();
}
/**
 * Sends a {@code Revise} request asking the server to cancel the continuous paging session for
 * the current stream id, then records that a cancel request was sent. The outcome of the request
 * is only logged: a failure is reported as a non-critical warning.
 *
 * <p>Must be called with the lock held.
 */
@SuppressWarnings("GuardedBy")
private void sendCancelRequest() {
  assert lock.isHeldByCurrentThread();
  LOG.trace("[{}] Sending cancel request", logPrefix);
  ThrottledAdminRequestHandler.query(
          channel,
          true,
          Revise.cancelContinuousPaging(streamId),
          statement.getCustomPayload(),
          getReviseRequestTimeout(statement),
          throttler,
          session.getMetricUpdater(),
          logPrefix,
          "cancel request")
      .start()
      .handle(
          (result, error) -> {
            if (error == null) {
              LOG.trace("[{}] Continuous paging session cancelled successfully", logPrefix);
            } else {
              Loggers.warnWithException(
                  LOG,
                  "[{}] Error sending cancel request. "
                      + "This is not critical (the request will eventually time out server-side).",
                  logPrefix,
                  error);
            }
            return null;
          });
  // Set as soon as the request has been started, regardless of its eventual outcome.
  sentCancelRequest = true;
}
/**
 * Turns channel auto-read back on when protocol backpressure is not available.
 *
 * <p>Note that the trace message is emitted unconditionally, even when protocol backpressure is
 * available and the channel configuration is left untouched.
 */
private void reenableAutoReadIfNeeded() {
  LOG.trace("[{}] Re-enabling auto-read", logPrefix);
  if (protocolBackpressureAvailable) {
    return;
  }
  channel.config().setAutoRead(true);
}
/**
 * Appends a (node, error) pair to the list of errors collected for this request.
 *
 * @param node the node on which the error occurred.
 * @param error the error to record.
 */
private void recordError(@NonNull Node node, @NonNull Throwable error) {
  AbstractMap.SimpleEntry<Node, Throwable> failure = new AbstractMap.SimpleEntry<>(node, error);
  errors.add(failure);
}
/**
 * Notifies the request tracker of a node-level error, at most once: the {@code
 * nodeErrorReported} flag is flipped atomically and later calls are ignored.
 *
 * @param node the node that produced the error.
 * @param error the error to report.
 */
private void trackNodeError(@NonNull Node node, @NonNull Throwable error) {
  if (!nodeErrorReported.compareAndSet(false, true)) {
    return;
  }
  // Latency is measured from the start of the current message to now.
  long latencyNanos = System.nanoTime() - this.messageStartTimeNanos;
  context
      .getRequestTracker()
      .onNodeError(this.statement, error, latencyNanos, executionProfile, node, logPrefix);
}
/**
 * Aborts the continuous query with the given error.
 *
 * <ul>
 *   <li>With no channel: the error is enqueued (or completes the pending future) and the state
 *       becomes {@code STATE_FAILED}.
 *   <li>With a channel and a positive state: the error is enqueued (or completes the pending
 *       future); then, if the error came from the server, the state becomes {@code STATE_FAILED}
 *       and auto-read is re-enabled if needed, otherwise {@link #cancel()} is invoked.
 *   <li>With a channel and a non-positive state: the error is not propagated.
 * </ul>
 *
 * In every case the global request timer is stopped and the global timeout is cancelled.
 *
 * <p>Must be called with the lock held.
 *
 * @param error the cause of the abort.
 * @param fromServer whether the error originated from the server.
 */
@SuppressWarnings("GuardedBy")
private void abort(@NonNull Throwable error, boolean fromServer) {
  assert lock.isHeldByCurrentThread();
  LOG.trace(
      "[{}] Aborting due to {} ({})",
      logPrefix,
      error.getClass().getSimpleName(),
      error.getMessage());
  if (channel != null) {
    if (state > 0) {
      enqueueOrCompletePending(error);
      if (!fromServer) {
        // cancel() sets the failed state itself and may send a cancel request to the server.
        cancel();
      } else {
        state = STATE_FAILED;
        reenableAutoReadIfNeeded();
      }
    }
  } else {
    enqueueOrCompletePending(error);
    state = STATE_FAILED;
  }
  stopGlobalRequestTimer();
  cancelTimeout(globalTimeout);
}
/**
 * Records the time elapsed since the current message started in the node-level messages timer,
 * at most once: the {@code stopNodeMessageTimerReported} flag is flipped atomically and later
 * calls are ignored.
 */
private void stopNodeMessageTimer() {
  if (!stopNodeMessageTimerReported.compareAndSet(false, true)) {
    return;
  }
  NodeMetricUpdater nodeUpdater = ((DefaultNode) node).getMetricUpdater();
  nodeUpdater.updateTimer(
      messagesMetric,
      executionProfile.getName(),
      System.nanoTime() - messageStartTimeNanos,
      TimeUnit.NANOSECONDS);
}
/**
 * Records the time elapsed since the request started in the session-level continuous requests
 * timer. The profile name passed to the metric updater is {@code null}.
 */
private void stopGlobalRequestTimer() {
  long elapsedNanos = System.nanoTime() - startTimeNanos;
  session
      .getMetricUpdater()
      .updateTimer(continuousRequestsMetric, null, elapsedNanos, TimeUnit.NANOSECONDS);
}
/**
 * Updates node-level error counters according to a retry verdict.
 *
 * <p>The {@code error} counter is always incremented. A retry decision (same or next node) also
 * increments the generic retries counter and {@code retriesOnError}; an ignore decision
 * increments the generic ignores counter and {@code ignoresOnError}; a rethrow decision
 * increments nothing further.
 *
 * @param metricUpdater the updater of the node that produced the error.
 * @param verdict the retry verdict that was reached for the error.
 * @param error the counter tracking occurrences of this kind of error.
 * @param retriesOnError the counter tracking retries caused by this kind of error.
 * @param ignoresOnError the counter tracking ignores caused by this kind of error.
 */
private void updateErrorMetrics(
    @NonNull NodeMetricUpdater metricUpdater,
    @NonNull RetryVerdict verdict,
    @NonNull DefaultNodeMetric error,
    @NonNull DefaultNodeMetric retriesOnError,
    @NonNull DefaultNodeMetric ignoresOnError) {
  String profileName = executionProfile.getName();
  metricUpdater.incrementCounter(error, profileName);
  switch (verdict.getRetryDecision()) {
    case IGNORE:
      metricUpdater.incrementCounter(DefaultNodeMetric.IGNORES, profileName);
      metricUpdater.incrementCounter(ignoresOnError, profileName);
      break;
    case RETRY_SAME:
    case RETRY_NEXT:
      metricUpdater.incrementCounter(DefaultNodeMetric.RETRIES, profileName);
      metricUpdater.incrementCounter(retriesOnError, profileName);
      break;
    case RETHROW:
      // Only the error counter above applies.
      break;
  }
}
/**
 * Creates a new future and completes it right away with the given page or error, going through
 * {@link #completeResultSetFuture} so that the same notifications are emitted.
 *
 * @param pageOrError a result set page, or a {@link Throwable}.
 * @return the already-completed future.
 */
@NonNull
private CompletableFuture<ResultSetT> immediateResultSetFuture(@NonNull Object pageOrError) {
  CompletableFuture<ResultSetT> completed = new CompletableFuture<>();
  completeResultSetFuture(completed, pageOrError);
  return completed;
}
/**
 * Creates a future that is already failed with a {@link CancellationException} explaining that
 * the continuous query has failed and no more results can be obtained.
 *
 * @return the already-failed future.
 */
@NonNull
private CompletableFuture<ResultSetT> cancelledResultSetFuture() {
  CancellationException cancellation =
      new CancellationException(
          "Can't get more results because the continuous query has failed already. "
              + "Most likely this is because the query was cancelled");
  return immediateResultSetFuture(cancellation);
}
/**
 * Completes the given future with a page or an error, and emits the matching notifications.
 *
 * <p>Notifications are only emitted when this call is the one that actually completes the
 * future. For a page: the throttler is signalled success, the request tracker is notified of the
 * node success (at most once) and of the overall success. For an error: the request tracker is
 * notified of the error; a {@link DriverTimeoutException} signals a timeout to the throttler and
 * increments the client timeouts counter; any other error except {@link
 * RequestThrottlingException} is signalled to the throttler as an error.
 *
 * @param future the future to complete.
 * @param pageOrError an instance of the result set class, or a {@link Throwable}.
 */
private void completeResultSetFuture(
    @NonNull CompletableFuture<ResultSetT> future, @NonNull Object pageOrError) {
  // Both latencies are derived from a single clock reading.
  long completedAt = System.nanoTime();
  long totalLatencyNanos = completedAt - startTimeNanos;
  long nodeLatencyNanos = completedAt - messageStartTimeNanos;

  if (!resultSetClass.isInstance(pageOrError)) {
    Throwable error = (Throwable) pageOrError;
    if (!future.completeExceptionally(error)) {
      return;
    }
    context
        .getRequestTracker()
        .onError(statement, error, totalLatencyNanos, executionProfile, node, logPrefix);
    if (error instanceof DriverTimeoutException) {
      throttler.signalTimeout(ContinuousRequestHandlerBase.this);
      session
          .getMetricUpdater()
          .incrementCounter(clientTimeoutsMetric, executionProfile.getName());
    } else if (!(error instanceof RequestThrottlingException)) {
      throttler.signalError(ContinuousRequestHandlerBase.this, error);
    }
    return;
  }

  if (!future.complete(resultSetClass.cast(pageOrError))) {
    return;
  }
  throttler.signalSuccess(ContinuousRequestHandlerBase.this);
  if (nodeSuccessReported.compareAndSet(false, true)) {
    context
        .getRequestTracker()
        .onNodeSuccess(statement, nodeLatencyNanos, executionProfile, node, logPrefix);
  }
  context
      .getRequestTracker()
      .onSuccess(statement, totalLatencyNanos, executionProfile, node, logPrefix);
}
/**
 * Builds the execution info for a response.
 *
 * <p>The paging state is taken from the result's metadata when the result is a {@code Rows}
 * message, and is {@code null} otherwise.
 *
 * @param result the result message that was received.
 * @param response the frame that carried the result, possibly {@code null}.
 * @return the execution info describing this execution.
 */
@NonNull
private ExecutionInfo createExecutionInfo(@NonNull Result result, @Nullable Frame response) {
  ByteBuffer pagingState = null;
  if (result instanceof Rows) {
    pagingState = ((Rows) result).getMetadata().pagingState;
  }
  return new DefaultExecutionInfo(
      statement,
      node,
      startedSpeculativeExecutionsCount.get(),
      executionIndex,
      errors,
      pagingState,
      response,
      true,
      session,
      context,
      executionProfile);
}
/**
 * Logs a warning for an error raised while scheduling a timeout, except when the error message
 * is exactly {@code "cannot be started once stopped"}, which is silently ignored.
 *
 * @param timeoutError the error raised while scheduling the timeout.
 */
private void logTimeoutSchedulingError(IllegalStateException timeoutError) {
  // NOTE(review): presumably this message is what the timer throws once it has been stopped
  // (e.g. during shutdown) -- confirm against the timer implementation.
  if ("cannot be started once stopped".equals(timeoutError.getMessage())) {
    return;
  }
  Loggers.warnWithException(LOG, "[{}] Error while scheduling timeout", logPrefix, timeoutError);
}
/**
 * Renders a queue item for trace logging.
 *
 * @param pageOrError a result set page, or an error.
 * @return {@code "page N"} (N being the page number) for an instance of the result set class,
 *     otherwise the simple name of the object's class.
 */
@NonNull
private String asTraceString(@NonNull Object pageOrError) {
  if (resultSetClass.isInstance(pageOrError)) {
    return "page " + pageNumber(resultSetClass.cast(pageOrError));
  }
  // No cast: errors are handled as Throwable elsewhere in this class, so a cast to Exception
  // could make trace logging throw ClassCastException for an Error. getClass() needs no cast.
  return pageOrError.getClass().getSimpleName();
}
/**
 * Reads the current state while holding the lock.
 *
 * @return the value of {@code state} at the time of the call.
 */
private int getState() {
  int snapshot;
  lock.lock();
  try {
    snapshot = state;
  } finally {
    lock.unlock();
  }
  return snapshot;
}
/**
 * Reads the currently installed pending future while holding the lock.
 *
 * @return the value of {@code pendingResult} at the time of the call, possibly {@code null}.
 */
private CompletableFuture<ResultSetT> getPendingResult() {
  CompletableFuture<ResultSetT> snapshot;
  lock.lock();
  try {
    snapshot = pendingResult;
  } finally {
    lock.unlock();
  }
  return snapshot;
}
}
}