package com.mongodb.operation;
import com.mongodb.Function;
import com.mongodb.MongoClientException;
import com.mongodb.MongoCommandException;
import com.mongodb.MongoException;
import com.mongodb.MongoNodeIsRecoveringException;
import com.mongodb.MongoNotPrimaryException;
import com.mongodb.MongoSocketException;
import com.mongodb.MongoWriteConcernException;
import com.mongodb.ReadPreference;
import com.mongodb.async.SingleResultCallback;
import com.mongodb.binding.AsyncConnectionSource;
import com.mongodb.binding.AsyncReadBinding;
import com.mongodb.binding.AsyncWriteBinding;
import com.mongodb.binding.ConnectionSource;
import com.mongodb.binding.ReadBinding;
import com.mongodb.binding.WriteBinding;
import com.mongodb.connection.AsyncConnection;
import com.mongodb.connection.Connection;
import com.mongodb.connection.ConnectionDescription;
import com.mongodb.connection.ServerDescription;
import com.mongodb.internal.operation.WriteConcernHelper;
import com.mongodb.internal.validator.NoOpFieldNameValidator;
import com.mongodb.lang.Nullable;
import com.mongodb.session.SessionContext;
import org.bson.BsonDocument;
import org.bson.FieldNameValidator;
import org.bson.codecs.BsonDocumentCodec;
import org.bson.codecs.Decoder;
import java.util.List;
import static com.mongodb.ReadPreference.primary;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.operation.OperationHelper.AsyncCallableWithConnectionAndSource;
import static com.mongodb.operation.OperationHelper.CallableWithConnectionAndSource;
import static com.mongodb.operation.OperationHelper.CallableWithSource;
import static com.mongodb.operation.OperationHelper.LOGGER;
import static com.mongodb.operation.OperationHelper.canRetryRead;
import static com.mongodb.operation.OperationHelper.canRetryWrite;
import static com.mongodb.operation.OperationHelper.releasingCallback;
import static com.mongodb.operation.OperationHelper.withAsyncConnection;
import static com.mongodb.operation.OperationHelper.withAsyncReadConnection;
import static com.mongodb.operation.OperationHelper.withReadConnectionSource;
import static com.mongodb.operation.OperationHelper.withReleasableConnection;
import static java.lang.String.format;
import static java.util.Arrays.asList;
final class CommandOperationHelper {
/**
 * Converts the decoded result of a command into the value returned to the caller, with access to the connection source and
 * connection the command was run with.
 *
 * @param <T> the type of the decoded command result
 * @param <R> the type produced by the transformation
 */
interface CommandReadTransformer<T, R> {
/**
 * Transforms a command result.
 *
 * @param t          the decoded command result
 * @param source     the connection source supplied alongside the connection
 * @param connection the connection the command was executed on
 * @return the transformed result
 */
R apply(T t, ConnectionSource source, Connection connection);
}
/**
 * Converts the decoded result of a command into the value returned to the caller, with access to the connection the command
 * was run on.
 *
 * @param <T> the type of the decoded command result
 * @param <R> the type produced by the transformation
 */
interface CommandWriteTransformer<T, R> {
/**
 * Transforms a command result.
 *
 * @param t          the decoded command result
 * @param connection the connection the command was executed on
 * @return the transformed result
 */
R apply(T t, Connection connection);
}
/**
 * Asynchronous-connection counterpart of {@code CommandWriteTransformer}: converts a decoded command result into the value
 * passed to the caller's callback.
 *
 * @param <T> the type of the decoded command result
 * @param <R> the type produced by the transformation
 */
interface CommandWriteTransformerAsync<T, R> {
/**
 * Transforms a command result.
 *
 * @param t          the decoded command result
 * @param connection the asynchronous connection the command was executed on
 * @return the transformed result
 */
R apply(T t, AsyncConnection connection);
}
/**
 * Asynchronous counterpart of {@code CommandReadTransformer}: converts a decoded command result into the value passed to the
 * caller's callback, with access to the asynchronous connection source and connection.
 *
 * @param <T> the type of the decoded command result
 * @param <R> the type produced by the transformation
 */
interface CommandReadTransformerAsync<T, R> {
/**
 * Transforms a command result.
 *
 * @param t          the decoded command result
 * @param source     the asynchronous connection source supplied alongside the connection
 * @param connection the asynchronous connection the command was executed on
 * @return the transformed result
 */
R apply(T t, AsyncConnectionSource source, AsyncConnection connection);
}
/**
 * A {@code CommandReadTransformer} that returns the command result unchanged.
 *
 * @param <T> the type of the command result
 */
static class IdentityReadTransformer<T> implements CommandReadTransformer<T, T> {
@Override
public T apply(final T t, final ConnectionSource source, final Connection connection) {
// The source and connection are ignored; the result passes straight through.
return t;
}
}
/**
 * A {@code CommandWriteTransformer} that returns the command result unchanged.
 *
 * @param <T> the type of the command result
 */
static class IdentityWriteTransformer<T> implements CommandWriteTransformer<T, T> {
@Override
public T apply(final T t, final Connection connection) {
// The connection is ignored; the result passes straight through.
return t;
}
}
/**
 * A {@code CommandWriteTransformerAsync} that returns the command result unchanged.
 *
 * @param <T> the type of the command result
 */
static class IdentityWriteTransformerAsync<T> implements CommandWriteTransformerAsync<T, T> {
@Override
public T apply(final T t, final AsyncConnection connection) {
// The connection is ignored; the result passes straight through.
return t;
}
}
/**
 * A {@code CommandReadTransformerAsync} that returns the command result unchanged.
 *
 * @param <T> the type of the command result
 */
static class IdentityTransformerAsync<T> implements CommandReadTransformerAsync<T, T> {
@Override
public T apply(final T t, final AsyncConnectionSource source, final AsyncConnection connection) {
// The source and connection are ignored; the result passes straight through.
return t;
}
}
/**
 * Creates a transformer that passes the command result, together with the server address of the connection, to
 * {@code WriteConcernHelper.throwOnWriteConcernError}.
 *
 * @return a transformer that always yields {@code null} when the helper does not throw
 */
static CommandWriteTransformer<BsonDocument, Void> writeConcernErrorTransformer() {
    final CommandWriteTransformer<BsonDocument, Void> errorChecker = new CommandWriteTransformer<BsonDocument, Void>() {
        @Override
        public Void apply(final BsonDocument result, final Connection connection) {
            final ConnectionDescription description = connection.getDescription();
            WriteConcernHelper.throwOnWriteConcernError(result, description.getServerAddress());
            return null;
        }
    };
    return errorChecker;
}
/**
 * Creates an asynchronous-connection transformer that passes the command result, together with the server address of the
 * connection, to {@code WriteConcernHelper.throwOnWriteConcernError}.
 *
 * @return a transformer that always yields {@code null} when the helper does not throw
 */
static CommandWriteTransformerAsync<BsonDocument, Void> writeConcernErrorWriteTransformer() {
    final CommandWriteTransformerAsync<BsonDocument, Void> errorChecker =
            new CommandWriteTransformerAsync<BsonDocument, Void>() {
                @Override
                public Void apply(final BsonDocument result, final AsyncConnection connection) {
                    final ConnectionDescription description = connection.getDescription();
                    WriteConcernHelper.throwOnWriteConcernError(result, description.getServerAddress());
                    return null;
                }
            };
    return errorChecker;
}
/**
 * Creates an asynchronous-connection transformer that hands the command result and the connection's server address to
 * {@code WriteConcernHelper.throwOnWriteConcernError}.
 *
 * @return a transformer whose result is always {@code null} when the helper does not throw
 */
static CommandWriteTransformerAsync<BsonDocument, Void> writeConcernErrorTransformerAsync() {
    final CommandWriteTransformerAsync<BsonDocument, Void> asyncErrorChecker =
            new CommandWriteTransformerAsync<BsonDocument, Void>() {
                @Override
                public Void apply(final BsonDocument result, final AsyncConnection connection) {
                    final ConnectionDescription description = connection.getDescription();
                    WriteConcernHelper.throwOnWriteConcernError(result, description.getServerAddress());
                    return null;
                }
            };
    return asyncErrorChecker;
}
/**
 * Creates a retry command modifier that leaves the command untouched.
 *
 * @return a function returning the very command document it is given
 */
static Function<BsonDocument, BsonDocument> noOpRetryCommandModifier() {
    final Function<BsonDocument, BsonDocument> unchanged = new Function<BsonDocument, BsonDocument>() {
        @Override
        public BsonDocument apply(final BsonDocument command) {
            return command;
        }
    };
    return unchanged;
}
/**
 * Builds the command document to send, given the descriptions of the server and connection it will be sent on. The helpers
 * in this class invoke it once for the initial attempt and, for reads, again for the retry attempt.
 */
interface CommandCreator {
/**
 * Creates the command document.
 *
 * @param serverDescription     the description of the server obtained from the connection source
 * @param connectionDescription the description of the connection the command will be run on
 * @return the command document
 */
BsonDocument create(ServerDescription serverDescription, ConnectionDescription connectionDescription);
}
/**
 * Executes a read command and returns the raw reply document.
 *
 * @param binding        the read binding
 * @param database       the database name
 * @param commandCreator the creator of the command document
 * @param retryReads     whether a failed read may be retried
 * @return the reply decoded as a {@code BsonDocument}
 */
static BsonDocument executeCommand(final ReadBinding binding, final String database, final CommandCreator commandCreator,
                                   final boolean retryReads) {
    final Decoder<BsonDocument> documentDecoder = new BsonDocumentCodec();
    return executeCommand(binding, database, commandCreator, documentDecoder, retryReads);
}
/**
 * Executes a read command, decodes the reply as a {@code BsonDocument} and applies the given transformer to it.
 *
 * @param binding        the read binding
 * @param database       the database name
 * @param commandCreator the creator of the command document
 * @param transformer    the transformer applied to the reply document
 * @param retryReads     whether a failed read may be retried
 * @param <T>            the transformed result type
 * @return the transformed result
 */
static <T> T executeCommand(final ReadBinding binding, final String database, final CommandCreator commandCreator,
                            final CommandReadTransformer<BsonDocument, T> transformer, final boolean retryReads) {
    final Decoder<BsonDocument> documentDecoder = new BsonDocumentCodec();
    return executeCommand(binding, database, commandCreator, documentDecoder, transformer, retryReads);
}
/**
 * Executes a read command and returns the reply exactly as produced by the given decoder.
 *
 * @param binding        the read binding
 * @param database       the database name
 * @param commandCreator the creator of the command document
 * @param decoder        the decoder for the reply
 * @param retryReads     whether a failed read may be retried
 * @param <T>            the decoded result type
 * @return the decoded reply
 */
static <T> T executeCommand(final ReadBinding binding, final String database, final CommandCreator commandCreator,
                            final Decoder<T> decoder, final boolean retryReads) {
    final CommandReadTransformer<T, T> passThrough = new IdentityReadTransformer<T>();
    return executeCommand(binding, database, commandCreator, decoder, passThrough, retryReads);
}
/**
 * Executes a read command against a connection source obtained through {@code withReadConnectionSource}, using a
 * connection taken from that source.
 *
 * @param binding        the read binding
 * @param database       the database name
 * @param commandCreator the creator of the command document
 * @param decoder        the decoder for the reply
 * @param transformer    the transformer applied to the decoded reply
 * @param retryReads     whether a failed read may be retried
 * @param <D>            the decoded reply type
 * @param <T>            the transformed result type
 * @return the transformed result
 */
static <D, T> T executeCommand(final ReadBinding binding, final String database, final CommandCreator commandCreator,
                               final Decoder<D> decoder, final CommandReadTransformer<D, T> transformer,
                               final boolean retryReads) {
    final CallableWithSource<T> readCommand = new CallableWithSource<T>() {
        @Override
        public T call(final ConnectionSource source) {
            final Connection connection = source.getConnection();
            return executeCommandWithConnection(binding, source, database, commandCreator, decoder,
                    transformer, retryReads, connection);
        }
    };
    return withReadConnectionSource(binding, readCommand);
}
/**
 * Executes a read command on the given connection and, when the failure qualifies for a retry, re-creates the command and
 * executes it once more on a freshly acquired connection.
 *
 * <p>The supplied connection is always released before this method returns or throws. The retry connection is released by
 * the retry callable.</p>
 *
 * @param binding        the read binding
 * @param source         the connection source the connection was taken from
 * @param database       the database name
 * @param commandCreator the creator of the command document; invoked again for the retry attempt
 * @param decoder        the decoder for the reply
 * @param transformer    the transformer applied to the decoded reply
 * @param retryReads     whether a failed read may be retried
 * @param connection     the connection for the first attempt
 * @param <D>            the decoded reply type
 * @param <T>            the transformed result type
 * @return the transformed result
 * @throws MongoException the original failure when the read cannot be retried, or the failure of the retry attempt
 */
static <D, T> T executeCommandWithConnection(final ReadBinding binding, final ConnectionSource source, final String database,
                                             final CommandCreator commandCreator, final Decoder<D> decoder,
                                             final CommandReadTransformer<D, T> transformer, final boolean retryReads,
                                             final Connection connection) {
    BsonDocument command = null;
    MongoException exception;
    try {
        command = commandCreator.create(source.getServerDescription(), connection.getDescription());
        return executeCommand(database, command, decoder, source, connection, binding.getReadPreference(), transformer,
                binding.getSessionContext());
    } catch (MongoException e) {
        exception = e;
        if (!shouldAttemptToRetryRead(retryReads, e)) {
            // The command is still null when creating it is what failed; skip the log line in that case so the
            // original exception is not replaced by a NullPointerException.
            if (retryReads && command != null) {
                logUnableToRetry(command.getFirstKey(), e);
            }
            throw exception;
        }
    } finally {
        connection.release();
    }

    // Only reached when the first attempt failed with an exception that qualifies for a retry.
    final MongoException originalException = exception;
    return withReleasableConnection(binding, originalException, new CallableWithConnectionAndSource<T>() {
        @Override
        public T call(final ConnectionSource source, final Connection connection) {
            try {
                // The newly selected server or connection may not permit retrying: surface the first failure.
                if (!canRetryRead(source.getServerDescription(), connection.getDescription(), binding.getSessionContext())) {
                    throw originalException;
                }
                BsonDocument retryCommand = commandCreator.create(source.getServerDescription(), connection.getDescription());
                logRetryExecute(retryCommand.getFirstKey(), originalException);
                return executeCommand(database, retryCommand, decoder, source, connection, binding.getReadPreference(),
                        transformer, binding.getSessionContext());
            } finally {
                connection.release();
            }
        }
    });
}
/**
 * Executes a command through a write binding and returns the raw reply document.
 *
 * @param binding  the write binding
 * @param database the database name
 * @param command  the command document
 * @return the reply decoded as a {@code BsonDocument}
 */
static BsonDocument executeCommand(final WriteBinding binding, final String database, final BsonDocument command) {
    final CommandWriteTransformer<BsonDocument, BsonDocument> passThrough = new IdentityWriteTransformer<BsonDocument>();
    return executeCommand(binding, database, command, passThrough);
}
/**
 * Executes a command through a write binding and returns the reply exactly as produced by the given decoder.
 *
 * @param binding  the write binding
 * @param database the database name
 * @param command  the command document
 * @param decoder  the decoder for the reply
 * @param <T>      the decoded result type
 * @return the decoded reply
 */
static <T> T executeCommand(final WriteBinding binding, final String database, final BsonDocument command,
                            final Decoder<T> decoder) {
    final CommandWriteTransformer<T, T> passThrough = new IdentityWriteTransformer<T>();
    return executeCommand(binding, database, command, decoder, passThrough);
}
/**
 * Executes a command through a write binding, decodes the reply as a {@code BsonDocument} and applies the transformer.
 *
 * @param binding     the write binding
 * @param database    the database name
 * @param command     the command document
 * @param transformer the transformer applied to the reply document
 * @param <T>         the transformed result type
 * @return the transformed result
 */
static <T> T executeCommand(final WriteBinding binding, final String database, final BsonDocument command,
                            final CommandWriteTransformer<BsonDocument, T> transformer) {
    final Decoder<BsonDocument> documentDecoder = new BsonDocumentCodec();
    return executeCommand(binding, database, command, documentDecoder, transformer);
}
/**
 * Executes a command through a write binding with a no-op field name validator.
 *
 * @param binding     the write binding
 * @param database    the database name
 * @param command     the command document
 * @param decoder     the decoder for the reply
 * @param transformer the transformer applied to the decoded reply
 * @param <D>         the decoded reply type
 * @param <T>         the transformed result type
 * @return the transformed result
 */
static <D, T> T executeCommand(final WriteBinding binding, final String database, final BsonDocument command,
                               final Decoder<D> decoder, final CommandWriteTransformer<D, T> transformer) {
    final FieldNameValidator noOpValidator = new NoOpFieldNameValidator();
    return executeCommand(binding, database, command, noOpValidator, decoder, transformer);
}
/**
 * Executes a command on the supplied connection, decoding the reply as a {@code BsonDocument} before transforming it.
 *
 * @param binding     the write binding
 * @param database    the database name
 * @param command     the command document
 * @param connection  the connection to run the command on
 * @param transformer the transformer applied to the reply document
 * @param <T>         the transformed result type
 * @return the transformed result
 */
static <T> T executeCommand(final WriteBinding binding, final String database, final BsonDocument command,
                            final Connection connection, final CommandWriteTransformer<BsonDocument, T> transformer) {
    final Decoder<BsonDocument> documentDecoder = new BsonDocumentCodec();
    return executeCommand(binding, database, command, documentDecoder, connection, transformer);
}
/**
 * Executes a command on the supplied connection with the primary read preference and the binding's session context.
 *
 * @param binding     the write binding; must not be null
 * @param database    the database name
 * @param command     the command document
 * @param decoder     the decoder for the reply
 * @param connection  the connection to run the command on
 * @param transformer the transformer applied to the decoded reply
 * @param <T>         the transformed result type
 * @return the transformed result
 */
static <T> T executeCommand(final WriteBinding binding, final String database, final BsonDocument command,
                            final Decoder<BsonDocument> decoder, final Connection connection,
                            final CommandWriteTransformer<BsonDocument, T> transformer) {
    notNull("binding", binding);
    final ReadPreference primaryPreference = primary();
    final SessionContext sessionContext = binding.getSessionContext();
    return executeWriteCommand(database, command, decoder, connection, primaryPreference, transformer, sessionContext);
}
/**
 * Executes a command on the supplied connection with an explicit field name validator, the primary read preference and
 * the binding's session context.
 *
 * @param binding            the write binding; must not be null
 * @param database           the database name
 * @param command            the command document
 * @param fieldNameValidator the validator for the command's field names
 * @param decoder            the decoder for the reply
 * @param connection         the connection to run the command on
 * @param transformer        the transformer applied to the decoded reply
 * @param <T>                the transformed result type
 * @return the transformed result
 */
static <T> T executeCommand(final WriteBinding binding, final String database, final BsonDocument command,
                            final FieldNameValidator fieldNameValidator, final Decoder<BsonDocument> decoder,
                            final Connection connection, final CommandWriteTransformer<BsonDocument, T> transformer) {
    notNull("binding", binding);
    final ReadPreference primaryPreference = primary();
    final SessionContext sessionContext = binding.getSessionContext();
    return executeWriteCommand(database, command, fieldNameValidator, decoder, connection, primaryPreference,
            transformer, sessionContext);
}
/**
 * Executes a command on a connection obtained through {@code withReleasableConnection}, applies the transformer to the
 * decoded reply and releases the connection afterwards.
 *
 * @param binding            the write binding
 * @param database           the database name
 * @param command            the command document
 * @param fieldNameValidator the validator for the command's field names
 * @param decoder            the decoder for the reply
 * @param transformer        the transformer applied to the decoded reply
 * @param <D>                the decoded reply type
 * @param <T>                the transformed result type
 * @return the transformed result
 */
static <D, T> T executeCommand(final WriteBinding binding, final String database, final BsonDocument command,
                               final FieldNameValidator fieldNameValidator, final Decoder<D> decoder,
                               final CommandWriteTransformer<D, T> transformer) {
    final CallableWithConnectionAndSource<T> writeCommand = new CallableWithConnectionAndSource<T>() {
        @Override
        public T call(final ConnectionSource source, final Connection connection) {
            try {
                final D reply = executeCommand(database, command, fieldNameValidator, decoder,
                        source, connection, primary());
                return transformer.apply(reply, connection);
            } finally {
                connection.release();
            }
        }
    };
    return withReleasableConnection(binding, writeCommand);
}
/**
 * Executes a command on the supplied connection and returns the raw reply document, using the primary read preference
 * and the binding's session context.
 *
 * @param binding    the write binding; must not be null
 * @param database   the database name
 * @param command    the command document
 * @param connection the connection to run the command on
 * @return the reply decoded as a {@code BsonDocument}
 */
static BsonDocument executeCommand(final WriteBinding binding, final String database, final BsonDocument command,
                                   final Connection connection) {
    notNull("binding", binding);
    final Decoder<BsonDocument> documentDecoder = new BsonDocumentCodec();
    final ReadPreference primaryPreference = primary();
    final SessionContext sessionContext = binding.getSessionContext();
    return executeWriteCommand(database, command, documentDecoder, connection, primaryPreference, sessionContext);
}
/**
 * Runs a command on the connection and returns the decoded reply untransformed, using the session context of the
 * connection source.
 *
 * @param database           the database name
 * @param command            the command document
 * @param fieldNameValidator the validator for the command's field names
 * @param decoder            the decoder for the reply
 * @param source             the connection source; also supplies the session context
 * @param connection         the connection to run the command on
 * @param readPreference     the read preference sent with the command
 * @param <T>                the decoded result type
 * @return the decoded reply
 */
private static <T> T executeCommand(final String database, final BsonDocument command,
                                    final FieldNameValidator fieldNameValidator, final Decoder<T> decoder,
                                    final ConnectionSource source, final Connection connection,
                                    final ReadPreference readPreference) {
    final CommandReadTransformer<T, T> passThrough = new IdentityReadTransformer<T>();
    final SessionContext sessionContext = source.getSessionContext();
    return executeCommand(database, command, fieldNameValidator, decoder, source, connection,
            readPreference, passThrough, sessionContext);
}
/**
 * Runs a command on the connection with a no-op field name validator and transforms the decoded reply.
 *
 * @param database       the database name
 * @param command        the command document
 * @param decoder        the decoder for the reply
 * @param source         the connection source passed on to the transformer
 * @param connection     the connection to run the command on
 * @param readPreference the read preference sent with the command
 * @param transformer    the transformer applied to the decoded reply
 * @param sessionContext the session context
 * @param <D>            the decoded reply type
 * @param <T>            the transformed result type
 * @return the transformed result
 */
private static <D, T> T executeCommand(final String database, final BsonDocument command,
                                       final Decoder<D> decoder, final ConnectionSource source, final Connection connection,
                                       final ReadPreference readPreference,
                                       final CommandReadTransformer<D, T> transformer, final SessionContext sessionContext) {
    final FieldNameValidator noOpValidator = new NoOpFieldNameValidator();
    return executeCommand(database, command, noOpValidator, decoder, source, connection,
            readPreference, transformer, sessionContext);
}
/**
 * Runs a command on the connection and hands the decoded reply, together with the source and connection, to the
 * transformer.
 *
 * @param database           the database name
 * @param command            the command document
 * @param fieldNameValidator the validator for the command's field names
 * @param decoder            the decoder for the reply
 * @param source             the connection source passed on to the transformer
 * @param connection         the connection to run the command on
 * @param readPreference     the read preference sent with the command
 * @param transformer        the transformer applied to the decoded reply
 * @param sessionContext     the session context
 * @param <D>                the decoded reply type
 * @param <T>                the transformed result type
 * @return the transformed result
 */
private static <D, T> T executeCommand(final String database, final BsonDocument command,
                                       final FieldNameValidator fieldNameValidator, final Decoder<D> decoder,
                                       final ConnectionSource source, final Connection connection,
                                       final ReadPreference readPreference,
                                       final CommandReadTransformer<D, T> transformer, final SessionContext sessionContext) {
    final D reply = connection.command(database, command, fieldNameValidator, readPreference, decoder, sessionContext);
    return transformer.apply(reply, source, connection);
}
/**
 * Runs a command on the connection with a no-op field name validator and returns the decoded reply untransformed.
 *
 * @param database       the database name
 * @param command        the command document
 * @param decoder        the decoder for the reply
 * @param connection     the connection to run the command on
 * @param readPreference the read preference sent with the command
 * @param sessionContext the session context
 * @param <T>            the decoded result type
 * @return the decoded reply
 */
private static <T> T executeWriteCommand(final String database, final BsonDocument command,
                                         final Decoder<T> decoder, final Connection connection,
                                         final ReadPreference readPreference, final SessionContext sessionContext) {
    final FieldNameValidator noOpValidator = new NoOpFieldNameValidator();
    final CommandWriteTransformer<T, T> passThrough = new IdentityWriteTransformer<T>();
    return executeWriteCommand(database, command, noOpValidator, decoder, connection,
            readPreference, passThrough, sessionContext);
}
/**
 * Runs a command on the connection with a no-op field name validator and transforms the decoded reply.
 *
 * @param database       the database name
 * @param command        the command document
 * @param decoder        the decoder for the reply
 * @param connection     the connection to run the command on
 * @param readPreference the read preference sent with the command
 * @param transformer    the transformer applied to the decoded reply
 * @param sessionContext the session context
 * @param <D>            the decoded reply type
 * @param <T>            the transformed result type
 * @return the transformed result
 */
private static <D, T> T executeWriteCommand(final String database, final BsonDocument command,
                                            final Decoder<D> decoder, final Connection connection,
                                            final ReadPreference readPreference,
                                            final CommandWriteTransformer<D, T> transformer,
                                            final SessionContext sessionContext) {
    final FieldNameValidator noOpValidator = new NoOpFieldNameValidator();
    return executeWriteCommand(database, command, noOpValidator, decoder, connection,
            readPreference, transformer, sessionContext);
}
/**
 * Runs a command on the connection and hands the decoded reply, together with the connection, to the transformer.
 *
 * @param database           the database name
 * @param command            the command document
 * @param fieldNameValidator the validator for the command's field names
 * @param decoder            the decoder for the reply
 * @param connection         the connection to run the command on
 * @param readPreference     the read preference sent with the command
 * @param transformer        the transformer applied to the decoded reply
 * @param sessionContext     the session context
 * @param <D>                the decoded reply type
 * @param <T>                the transformed result type
 * @return the transformed result
 */
private static <D, T> T executeWriteCommand(final String database, final BsonDocument command,
                                            final FieldNameValidator fieldNameValidator, final Decoder<D> decoder,
                                            final Connection connection, final ReadPreference readPreference,
                                            final CommandWriteTransformer<D, T> transformer,
                                            final SessionContext sessionContext) {
    final D reply = connection.command(database, command, fieldNameValidator, readPreference, decoder, sessionContext);
    return transformer.apply(reply, connection);
}
/**
 * Asynchronously executes a read command and passes the raw reply document to the callback.
 *
 * @param binding        the asynchronous read binding
 * @param database       the database name
 * @param commandCreator the creator of the command document
 * @param retryReads     whether a failed read may be retried
 * @param callback       the callback receiving the reply document or the failure
 */
static void executeCommandAsync(final AsyncReadBinding binding,
                                final String database,
                                final CommandCreator commandCreator,
                                final boolean retryReads,
                                final SingleResultCallback<BsonDocument> callback) {
    final Decoder<BsonDocument> documentDecoder = new BsonDocumentCodec();
    executeCommandAsync(binding, database, commandCreator, documentDecoder, retryReads, callback);
}
/**
 * Asynchronously executes a read command and passes the reply, exactly as decoded, to the callback.
 *
 * @param binding        the asynchronous read binding
 * @param database       the database name
 * @param commandCreator the creator of the command document
 * @param decoder        the decoder for the reply
 * @param retryReads     whether a failed read may be retried
 * @param callback       the callback receiving the decoded reply or the failure
 * @param <T>            the decoded result type
 */
static <T> void executeCommandAsync(final AsyncReadBinding binding,
                                    final String database,
                                    final CommandCreator commandCreator,
                                    final Decoder<T> decoder,
                                    final boolean retryReads,
                                    final SingleResultCallback<T> callback) {
    final CommandReadTransformerAsync<T, T> passThrough = new IdentityTransformerAsync<T>();
    executeCommandAsync(binding, database, commandCreator, decoder, passThrough, retryReads, callback);
}
/**
 * Asynchronously executes a read command, decodes the reply as a {@code BsonDocument} and passes the transformed result
 * to the callback.
 *
 * @param binding        the asynchronous read binding
 * @param database       the database name
 * @param commandCreator the creator of the command document
 * @param transformer    the transformer applied to the reply document
 * @param retryReads     whether a failed read may be retried
 * @param callback       the callback receiving the transformed result or the failure
 * @param <T>            the transformed result type
 */
static <T> void executeCommandAsync(final AsyncReadBinding binding,
                                    final String database,
                                    final CommandCreator commandCreator,
                                    final CommandReadTransformerAsync<BsonDocument, T> transformer,
                                    final boolean retryReads,
                                    final SingleResultCallback<T> callback) {
    final Decoder<BsonDocument> documentDecoder = new BsonDocumentCodec();
    executeCommandAsync(binding, database, commandCreator, documentDecoder, transformer, retryReads, callback);
}
/**
 * Asynchronously acquires a read connection and executes the command on it. When acquisition fails, the failure is
 * reported through a callback that also releases whatever source and connection were supplied.
 *
 * @param binding          the asynchronous read binding
 * @param database         the database name
 * @param commandCreator   the creator of the command document
 * @param decoder          the decoder for the reply
 * @param transformer      the transformer applied to the decoded reply
 * @param retryReads       whether a failed read may be retried
 * @param originalCallback the callback receiving the transformed result or the failure
 * @param <D>              the decoded reply type
 * @param <T>              the transformed result type
 */
static <D, T> void executeCommandAsync(final AsyncReadBinding binding,
                                       final String database,
                                       final CommandCreator commandCreator,
                                       final Decoder<D> decoder,
                                       final CommandReadTransformerAsync<D, T> transformer,
                                       final boolean retryReads,
                                       final SingleResultCallback<T> originalCallback) {
    final SingleResultCallback<T> safeCallback = errorHandlingCallback(originalCallback, LOGGER);
    final AsyncCallableWithConnectionAndSource readCommand = new AsyncCallableWithConnectionAndSource() {
        @Override
        public void call(final AsyncConnectionSource source, final AsyncConnection connection, final Throwable t) {
            if (t == null) {
                executeCommandAsyncWithConnection(binding, source, database, commandCreator, decoder, transformer,
                        retryReads, connection, safeCallback);
                return;
            }
            releasingCallback(safeCallback, source, connection).onResult(null, t);
        }
    };
    withAsyncReadConnection(binding, readCommand);
}
/**
 * Asynchronously acquires a read connection source from the binding and executes the command on the supplied connection.
 *
 * <p>The supplied connection is released by the execution path once the command completes. If the connection source
 * cannot be acquired, the connection is released here and the failure is reported to the callback instead of
 * dereferencing the missing source.</p>
 *
 * @param binding          the asynchronous read binding
 * @param database         the database name
 * @param commandCreator   the creator of the command document
 * @param decoder          the decoder for the reply
 * @param transformer      the transformer applied to the decoded reply
 * @param retryReads       whether a failed read may be retried
 * @param connection       the connection to run the command on
 * @param originalCallback the callback receiving the transformed result or the failure
 * @param <D>              the decoded reply type
 * @param <T>              the transformed result type
 */
static <D, T> void executeCommandAsync(final AsyncReadBinding binding,
                                       final String database,
                                       final CommandCreator commandCreator,
                                       final Decoder<D> decoder,
                                       final CommandReadTransformerAsync<D, T> transformer,
                                       final boolean retryReads,
                                       final AsyncConnection connection,
                                       final SingleResultCallback<T> originalCallback) {
    final SingleResultCallback<T> errorHandlingCallback = errorHandlingCallback(originalCallback, LOGGER);
    binding.getReadConnectionSource(new SingleResultCallback<AsyncConnectionSource>() {
        @Override
        public void onResult(final AsyncConnectionSource source, final Throwable t) {
            if (t != null) {
                // No source was obtained: release the connection, as the execution path would have done, and report
                // the acquisition failure.
                connection.release();
                errorHandlingCallback.onResult(null, t);
            } else {
                executeCommandAsyncWithConnection(binding, source, database, commandCreator, decoder, transformer,
                        retryReads, connection, errorHandlingCallback);
            }
        }
    });
}
/**
 * Creates the read command and sends it asynchronously on the given connection. Completion, including the decision
 * whether to retry, is handled by the callback built by {@code createCommandCallback}, which releases the source and
 * connection.
 *
 * <p>If creating or dispatching the command throws an {@code IllegalArgumentException}, both the connection and the
 * source are released here before the failure is reported, since no completion callback will run to release them.</p>
 *
 * @param binding        the asynchronous read binding
 * @param source         the connection source the connection belongs to
 * @param database       the database name
 * @param commandCreator the creator of the command document
 * @param decoder        the decoder for the reply
 * @param transformer    the transformer applied to the decoded reply
 * @param retryReads     whether a failed read may be retried
 * @param connection     the connection to run the command on
 * @param callback       the callback receiving the transformed result or the failure
 * @param <D>            the decoded reply type
 * @param <T>            the transformed result type
 */
static <D, T> void executeCommandAsyncWithConnection(final AsyncReadBinding binding,
                                                     final AsyncConnectionSource source,
                                                     final String database,
                                                     final CommandCreator commandCreator,
                                                     final Decoder<D> decoder,
                                                     final CommandReadTransformerAsync<D, T> transformer,
                                                     final boolean retryReads,
                                                     final AsyncConnection connection,
                                                     final SingleResultCallback<T> callback) {
    try {
        BsonDocument command = commandCreator.create(source.getServerDescription(), connection.getDescription());
        connection.commandAsync(database, command, new NoOpFieldNameValidator(), binding.getReadPreference(), decoder,
                binding.getSessionContext(),
                createCommandCallback(binding, source, connection, database, binding.getReadPreference(),
                        command, commandCreator, new NoOpFieldNameValidator(), decoder, transformer, retryReads, callback));
    } catch (IllegalArgumentException e) {
        connection.release();
        // The source was handed over together with the connection; release it too so it is not leaked.
        source.release();
        callback.onResult(null, e);
    }
}
/**
 * Builds the completion callback for the first attempt of an asynchronous read command. On success the reply is
 * transformed and delivered; on failure (including a failure thrown by the transformer) the callback either reports the
 * error or retries the command once on a newly acquired connection.
 *
 * @param binding              the asynchronous read binding, used to acquire the retry connection
 * @param oldSource            the connection source used for the first attempt
 * @param oldConnection        the connection used for the first attempt
 * @param database             the database name
 * @param readPreference       the read preference sent with the retry command
 * @param originalCommand      the command sent on the first attempt; only its first key is used, for logging
 * @param commandCreator       the creator used to build the retry command
 * @param fieldNameValidator   the validator for the retry command's field names
 * @param commandResultDecoder the decoder for the reply
 * @param transformer          the transformer applied to the decoded reply
 * @param retryReads           whether a failed read may be retried
 * @param callback             the callback receiving the transformed result or the failure
 * @param <T>                  the decoded reply type
 * @param <R>                  the transformed result type
 * @return the callback to pass to the first {@code commandAsync} call
 */
private static <T, R> SingleResultCallback<T> createCommandCallback(final AsyncReadBinding binding,
final AsyncConnectionSource oldSource,
final AsyncConnection oldConnection,
final String database,
final ReadPreference readPreference,
final BsonDocument originalCommand,
final CommandCreator commandCreator,
final FieldNameValidator fieldNameValidator,
final Decoder<T> commandResultDecoder,
final CommandReadTransformerAsync<T, R> transformer,
final boolean retryReads,
final SingleResultCallback<R> callback) {
return new SingleResultCallback<T>() {
@Override
public void onResult(final T result, final Throwable originalError) {
// Wraps the caller's callback together with the first attempt's source and connection.
SingleResultCallback<R> releasingCallback = releasingCallback(callback, oldSource, oldConnection);
if (originalError != null) {
checkRetryableException(originalError, releasingCallback);
} else {
try {
releasingCallback.onResult(transformer.apply(result, oldSource, oldConnection), null);
} catch (Throwable transformError) {
// NOTE(review): this also catches anything thrown by releasingCallback.onResult itself, in which case the
// callback could be invoked a second time - confirm that is acceptable.
checkRetryableException(transformError, releasingCallback);
}
}
}
// Reports the error through the releasing callback when no retry should be attempted; otherwise releases the first
// attempt's source and connection directly and starts the retry.
private void checkRetryableException(final Throwable originalError, final SingleResultCallback<R> callback) {
if (!shouldAttemptToRetryRead(retryReads, originalError)) {
if (retryReads) {
logUnableToRetry(originalCommand.getFirstKey(), originalError);
}
callback.onResult(null, originalError);
} else {
oldSource.release();
oldConnection.release();
retryableCommand(originalError);
}
}
// Acquires a new read connection and re-sends a freshly created command. Whenever the retry cannot be started,
// the ORIGINAL error is reported rather than the acquisition failure.
private void retryableCommand(final Throwable originalError) {
withAsyncReadConnection(binding, new AsyncCallableWithConnectionAndSource() {
@Override
public void call(final AsyncConnectionSource source, final AsyncConnection connection, final Throwable t) {
if (t != null) {
// Here "callback" is the outer, non-releasing callback: the first attempt's resources were already released.
callback.onResult(null, originalError);
} else if (!canRetryRead(source.getServerDescription(), connection.getDescription(),
binding.getSessionContext())) {
releasingCallback(callback, source, connection).onResult(null, originalError);
} else {
BsonDocument retryCommand = commandCreator.create(source.getServerDescription(), connection.getDescription());
logRetryExecute(retryCommand.getFirstKey(), originalError);
connection.commandAsync(database, retryCommand, fieldNameValidator, readPreference,
commandResultDecoder, binding.getSessionContext(),
new TransformingReadResultCallback<T, R>(transformer, source, connection,
releasingCallback(callback, source, connection)));
}
}
});
}
};
}
/**
 * A callback that applies a {@code CommandReadTransformerAsync} to a successful command result before forwarding it to
 * the wrapped callback. Failures, including those thrown while transforming or forwarding, are forwarded as errors.
 *
 * @param <T> the decoded reply type
 * @param <R> the transformed result type
 */
static class TransformingReadResultCallback<T, R> implements SingleResultCallback<T> {
    private final CommandReadTransformerAsync<T, R> transformer;
    private final AsyncConnectionSource source;
    private final AsyncConnection connection;
    private final SingleResultCallback<R> callback;

    TransformingReadResultCallback(final CommandReadTransformerAsync<T, R> transformer, final AsyncConnectionSource source,
                                   final AsyncConnection connection, final SingleResultCallback<R> callback) {
        this.transformer = transformer;
        this.source = source;
        this.connection = connection;
        this.callback = callback;
    }

    @Override
    public void onResult(final T result, final Throwable t) {
        if (t != null) {
            callback.onResult(null, t);
            return;
        }
        try {
            callback.onResult(transformer.apply(result, source, connection), null);
        } catch (Throwable transformError) {
            callback.onResult(null, transformError);
        }
    }
}
/**
 * Asynchronously executes a command through a write binding and passes the raw reply document to the callback.
 *
 * @param binding  the asynchronous write binding
 * @param database the database name
 * @param command  the command document
 * @param callback the callback receiving the reply document or the failure
 */
static void executeCommandAsync(final AsyncWriteBinding binding,
                                final String database,
                                final BsonDocument command,
                                final SingleResultCallback<BsonDocument> callback) {
    final Decoder<BsonDocument> documentDecoder = new BsonDocumentCodec();
    executeCommandAsync(binding, database, command, documentDecoder, callback);
}
/**
 * Asynchronously executes a command through a write binding and passes the reply, exactly as decoded, to the callback.
 *
 * @param binding  the asynchronous write binding
 * @param database the database name
 * @param command  the command document
 * @param decoder  the decoder for the reply
 * @param callback the callback receiving the decoded reply or the failure
 * @param <T>      the decoded result type
 */
static <T> void executeCommandAsync(final AsyncWriteBinding binding,
                                    final String database,
                                    final BsonDocument command,
                                    final Decoder<T> decoder,
                                    final SingleResultCallback<T> callback) {
    final CommandWriteTransformerAsync<T, T> passThrough = new IdentityWriteTransformerAsync<T>();
    executeCommandAsync(binding, database, command, decoder, passThrough, callback);
}
/**
 * Asynchronously executes a command through a write binding, decodes the reply as a {@code BsonDocument} and passes the
 * transformed result to the callback.
 *
 * @param binding     the asynchronous write binding
 * @param database    the database name
 * @param command     the command document
 * @param transformer the transformer applied to the reply document
 * @param callback    the callback receiving the transformed result or the failure
 * @param <T>         the transformed result type
 */
static <T> void executeCommandAsync(final AsyncWriteBinding binding,
                                    final String database,
                                    final BsonDocument command,
                                    final CommandWriteTransformerAsync<BsonDocument, T> transformer,
                                    final SingleResultCallback<T> callback) {
    final Decoder<BsonDocument> documentDecoder = new BsonDocumentCodec();
    executeCommandAsync(binding, database, command, documentDecoder, transformer, callback);
}
/**
 * Asynchronously executes a command through a write binding with a no-op field name validator.
 *
 * @param binding     the asynchronous write binding
 * @param database    the database name
 * @param command     the command document
 * @param decoder     the decoder for the reply
 * @param transformer the transformer applied to the decoded reply
 * @param callback    the callback receiving the transformed result or the failure
 * @param <D>         the decoded reply type
 * @param <T>         the transformed result type
 */
static <D, T> void executeCommandAsync(final AsyncWriteBinding binding,
                                       final String database, final BsonDocument command,
                                       final Decoder<D> decoder,
                                       final CommandWriteTransformerAsync<D, T> transformer,
                                       final SingleResultCallback<T> callback) {
    final FieldNameValidator noOpValidator = new NoOpFieldNameValidator();
    executeCommandAsync(binding, database, command, noOpValidator, decoder, transformer, callback);
}
/**
 * Asynchronously executes a command on the supplied connection with the primary read preference and the binding's
 * session context.
 *
 * @param binding     the asynchronous write binding; must not be null
 * @param database    the database name
 * @param command     the command document
 * @param decoder     the decoder for the reply
 * @param connection  the connection to run the command on
 * @param transformer the transformer applied to the decoded reply
 * @param callback    the callback receiving the transformed result or the failure
 * @param <T>         the transformed result type
 */
static <T> void executeCommandAsync(final AsyncWriteBinding binding,
                                    final String database,
                                    final BsonDocument command,
                                    final Decoder<BsonDocument> decoder,
                                    final AsyncConnection connection,
                                    final CommandWriteTransformerAsync<BsonDocument, T> transformer,
                                    final SingleResultCallback<T> callback) {
    notNull("binding", binding);
    final ReadPreference primaryPreference = primary();
    final SessionContext sessionContext = binding.getSessionContext();
    executeCommandAsync(database, command, decoder, connection, primaryPreference, transformer, sessionContext, callback);
}
/**
 * Asynchronously executes a command on the supplied connection with an explicit field name validator, the primary read
 * preference and the binding's session context.
 *
 * @param binding            the asynchronous write binding; must not be null
 * @param database           the database name
 * @param command            the command document
 * @param fieldNameValidator the validator for the command's field names
 * @param decoder            the decoder for the reply
 * @param connection         the connection to run the command on
 * @param transformer        the transformer applied to the decoded reply
 * @param callback           the callback receiving the transformed result or the failure
 * @param <T>                the transformed result type
 */
static <T> void executeCommandAsync(final AsyncWriteBinding binding,
                                    final String database,
                                    final BsonDocument command,
                                    final FieldNameValidator fieldNameValidator,
                                    final Decoder<BsonDocument> decoder,
                                    final AsyncConnection connection,
                                    final CommandWriteTransformerAsync<BsonDocument, T> transformer,
                                    final SingleResultCallback<T> callback) {
    notNull("binding", binding);
    final ReadPreference primaryPreference = primary();
    final SessionContext sessionContext = binding.getSessionContext();
    executeCommandAsync(database, command, fieldNameValidator, decoder, connection, primaryPreference, transformer,
            sessionContext, callback);
}
/**
 * Asynchronously acquires a write connection source and hands it to a {@code CommandProtocolExecutingCallback}
 * configured with the command, the primary read preference, the binding's session context and an error-handling
 * wrapper around the callback.
 *
 * @param binding            the asynchronous write binding
 * @param database           the database name
 * @param command            the command document
 * @param fieldNameValidator the validator for the command's field names
 * @param decoder            the decoder for the reply
 * @param transformer        the transformer applied to the decoded reply
 * @param callback           the callback receiving the transformed result or the failure
 * @param <D>                the decoded reply type
 * @param <T>                the transformed result type
 */
static <D, T> void executeCommandAsync(final AsyncWriteBinding binding,
                                       final String database, final BsonDocument command,
                                       final FieldNameValidator fieldNameValidator,
                                       final Decoder<D> decoder,
                                       final CommandWriteTransformerAsync<D, T> transformer,
                                       final SingleResultCallback<T> callback) {
    final ReadPreference primaryPreference = primary();
    final SessionContext sessionContext = binding.getSessionContext();
    final SingleResultCallback<T> safeCallback = errorHandlingCallback(callback, LOGGER);
    final CommandProtocolExecutingCallback<D, T> sourceCallback = new CommandProtocolExecutingCallback<D, T>(database,
            command, fieldNameValidator, decoder, primaryPreference, transformer, sessionContext, safeCallback);
    binding.getWriteConnectionSource(sourceCallback);
}
/**
 * Asynchronously executes a command on the supplied connection and passes the raw reply document to the callback.
 *
 * @param binding    the asynchronous write binding
 * @param database   the database name
 * @param command    the command document
 * @param connection the connection to run the command on
 * @param callback   the callback receiving the reply document or the failure
 */
static void executeCommandAsync(final AsyncWriteBinding binding,
                                final String database,
                                final BsonDocument command,
                                final AsyncConnection connection,
                                final SingleResultCallback<BsonDocument> callback) {
    final CommandWriteTransformerAsync<BsonDocument, BsonDocument> passThrough =
            new IdentityWriteTransformerAsync<BsonDocument>();
    executeCommandAsync(binding, database, command, connection, passThrough, callback);
}
/**
 * Asynchronously executes a command on the supplied connection, decoding the reply as a {@code BsonDocument}, with the
 * primary read preference and the binding's session context.
 *
 * @param binding     the asynchronous write binding; must not be null
 * @param database    the database name
 * @param command     the command document
 * @param connection  the connection to run the command on
 * @param transformer the transformer applied to the reply document
 * @param callback    the callback receiving the transformed result or the failure
 * @param <T>         the transformed result type
 */
static <T> void executeCommandAsync(final AsyncWriteBinding binding,
                                    final String database,
                                    final BsonDocument command,
                                    final AsyncConnection connection,
                                    final CommandWriteTransformerAsync<BsonDocument, T> transformer,
                                    final SingleResultCallback<T> callback) {
    notNull("binding", binding);
    final Decoder<BsonDocument> documentDecoder = new BsonDocumentCodec();
    final ReadPreference primaryPreference = primary();
    final SessionContext sessionContext = binding.getSessionContext();
    executeCommandAsync(database, command, documentDecoder, connection, primaryPreference, transformer,
            sessionContext, callback);
}
/**
 * Sends a command asynchronously with a no-op field name validator and forwards the transformed reply, or the failure,
 * to the callback. Exceptions thrown while transforming or forwarding are reported to the callback as errors.
 *
 * @param database       the database name
 * @param command        the command document
 * @param decoder        the decoder for the reply
 * @param connection     the connection to run the command on
 * @param readPreference the read preference sent with the command
 * @param transformer    the transformer applied to the decoded reply
 * @param sessionContext the session context
 * @param callback       the callback receiving the transformed result or the failure
 * @param <D>            the decoded reply type
 * @param <T>            the transformed result type
 */
private static <D, T> void executeCommandAsync(final String database, final BsonDocument command,
                                               final Decoder<D> decoder, final AsyncConnection connection,
                                               final ReadPreference readPreference,
                                               final CommandWriteTransformerAsync<D, T> transformer,
                                               final SessionContext sessionContext,
                                               final SingleResultCallback<T> callback) {
    final SingleResultCallback<D> transformingCallback = new SingleResultCallback<D>() {
        @Override
        public void onResult(final D result, final Throwable t) {
            if (t != null) {
                callback.onResult(null, t);
                return;
            }
            try {
                callback.onResult(transformer.apply(result, connection), null);
            } catch (Exception e) {
                callback.onResult(null, e);
            }
        }
    };
    connection.commandAsync(database, command, new NoOpFieldNameValidator(), readPreference, decoder, sessionContext,
            transformingCallback);
}
/**
 * Sends a command asynchronously with the given field name validator and forwards the transformed reply, or the
 * failure, to the callback. Exceptions thrown while transforming or forwarding are reported to the callback as errors.
 *
 * @param database           the database name
 * @param command            the command document
 * @param fieldNameValidator the validator for the command's field names
 * @param decoder            the decoder for the reply
 * @param connection         the connection to run the command on
 * @param readPreference     the read preference sent with the command
 * @param transformer        the transformer applied to the decoded reply
 * @param sessionContext     the session context
 * @param callback           the callback receiving the transformed result or the failure
 * @param <D>                the decoded reply type
 * @param <T>                the transformed result type
 */
private static <D, T> void executeCommandAsync(final String database, final BsonDocument command,
                                               final FieldNameValidator fieldNameValidator,
                                               final Decoder<D> decoder, final AsyncConnection connection,
                                               final ReadPreference readPreference,
                                               final CommandWriteTransformerAsync<D, T> transformer,
                                               final SessionContext sessionContext,
                                               final SingleResultCallback<T> callback) {
    final SingleResultCallback<D> transformingCallback = new SingleResultCallback<D>() {
        @Override
        public void onResult(final D result, final Throwable t) {
            if (t != null) {
                callback.onResult(null, t);
                return;
            }
            try {
                callback.onResult(transformer.apply(result, connection), null);
            } catch (Exception e) {
                callback.onResult(null, e);
            }
        }
    };
    connection.commandAsync(database, command, fieldNameValidator, readPreference, decoder, sessionContext, true, null, null,
            transformingCallback);
}
/**
 * Executes a retryable command through a write binding, re-sending the command unmodified on retry.
 *
 * @param binding              the write binding
 * @param database             the database name
 * @param readPreference       the read preference sent with the command
 * @param fieldNameValidator   the validator for the command's field names
 * @param commandResultDecoder the decoder for the reply
 * @param commandCreator       the creator of the command document
 * @param transformer          the transformer applied to the decoded reply
 * @param <T>                  the decoded reply type
 * @param <R>                  the transformed result type
 * @return the transformed result
 */
static <T, R> R executeRetryableCommand(final WriteBinding binding, final String database, final ReadPreference readPreference,
                                        final FieldNameValidator fieldNameValidator, final Decoder<T> commandResultDecoder,
                                        final CommandCreator commandCreator,
                                        final CommandWriteTransformer<T, R> transformer) {
    final Function<BsonDocument, BsonDocument> unchangedOnRetry = noOpRetryCommandModifier();
    return executeRetryableCommand(binding, database, readPreference, fieldNameValidator, commandResultDecoder,
            commandCreator, transformer, unchangedOnRetry);
}
/**
 * Executes a command through a write binding and, when the failure qualifies for a retry, executes it once more on a
 * newly acquired connection after passing the original command through {@code retryCommandModifier}.
 *
 * @param binding              the write binding
 * @param database             the database name
 * @param readPreference       the read preference sent with the command
 * @param fieldNameValidator   the validator for the command's field names
 * @param commandResultDecoder the decoder for the reply
 * @param commandCreator       the creator of the command document; invoked for the first attempt only
 * @param transformer          the transformer applied to the decoded reply
 * @param retryCommandModifier produces the command to send on retry from the original command
 * @param <T>                  the decoded reply type
 * @param <R>                  the transformed result type
 * @return the transformed result
 */
static <T, R> R executeRetryableCommand(final WriteBinding binding, final String database, final ReadPreference readPreference,
final FieldNameValidator fieldNameValidator, final Decoder<T> commandResultDecoder,
final CommandCreator commandCreator, final CommandWriteTransformer<T, R> transformer,
final Function<BsonDocument, BsonDocument> retryCommandModifier) {
return withReleasableConnection(binding, new CallableWithConnectionAndSource<R>() {
@Override
public R call(final ConnectionSource source, final Connection connection) {
BsonDocument command = null;
MongoException exception;
try {
command = commandCreator.create(source.getServerDescription(), connection.getDescription());
return transformer.apply(connection.command(database, command, fieldNameValidator, readPreference,
commandResultDecoder, binding.getSessionContext()), connection);
} catch (MongoException e) {
exception = e;
// NOTE(review): command is still null here if commandCreator.create threw; this relies on
// shouldAttemptToRetryWrite and isRetryWritesEnabled tolerating a null command - confirm against their bodies.
if (!shouldAttemptToRetryWrite(command, e)) {
if (isRetryWritesEnabled(command)) {
logUnableToRetry(command.getFirstKey(), e);
}
throw transformWriteException(exception);
}
} finally {
// The first connection is released on every path, before any retry connection is acquired.
connection.release();
}
// Only reached when the first attempt failed with an exception that qualifies for a retry.
if (binding.getSessionContext().hasActiveTransaction()) {
binding.getSessionContext().unpinServerAddress();
}
final BsonDocument originalCommand = command;
final MongoException originalException = exception;
return withReleasableConnection(binding, originalException, new CallableWithConnectionAndSource<R>() {
@Override
public R call(final ConnectionSource source, final Connection connection) {
try {
// The newly selected server or connection may not permit retrying: surface the first failure.
if (!canRetryWrite(source.getServerDescription(), connection.getDescription(), binding.getSessionContext())) {
throw originalException;
}
BsonDocument retryCommand = retryCommandModifier.apply(originalCommand);
logRetryExecute(retryCommand.getFirstKey(), originalException);
return transformer.apply(connection.command(database, retryCommand, fieldNameValidator,
readPreference, commandResultDecoder, binding.getSessionContext()),
connection);
} finally {
connection.release();
}
}
});
}
});
}
/**
 * Asynchronously executes a retryable command through a write binding, re-sending the command unmodified on retry.
 *
 * @param binding              the asynchronous write binding
 * @param database             the database name
 * @param readPreference       the read preference sent with the command
 * @param fieldNameValidator   the validator for the command's field names
 * @param commandResultDecoder the decoder for the reply
 * @param commandCreator       the creator of the command document
 * @param transformer          the transformer applied to the decoded reply
 * @param originalCallback     the callback receiving the transformed result or the failure
 * @param <T>                  the decoded reply type
 * @param <R>                  the transformed result type
 */
static <T, R> void executeRetryableCommand(final AsyncWriteBinding binding, final String database,
                                           final ReadPreference readPreference,
                                           final FieldNameValidator fieldNameValidator,
                                           final Decoder<T> commandResultDecoder,
                                           final CommandCreator commandCreator,
                                           final CommandWriteTransformerAsync<T, R> transformer,
                                           final SingleResultCallback<R> originalCallback) {
    final Function<BsonDocument, BsonDocument> unchangedOnRetry = noOpRetryCommandModifier();
    executeRetryableCommand(binding, database, readPreference, fieldNameValidator, commandResultDecoder, commandCreator,
            transformer, unchangedOnRetry, originalCallback);
}
/**
 * Asynchronously acquires a write connection source and connection, creates the command and sends it. Completion,
 * including the decision whether to retry, is handled by the callback built by {@code createCommandCallback}.
 *
 * @param binding              the asynchronous write binding
 * @param database             the database name
 * @param readPreference       the read preference sent with the command
 * @param fieldNameValidator   the validator for the command's field names
 * @param commandResultDecoder the decoder for the reply
 * @param commandCreator       the creator of the command document
 * @param transformer          the transformer applied to the decoded reply
 * @param retryCommandModifier produces the command to send on retry from the original command
 * @param originalCallback     the callback receiving the transformed result or the failure
 * @param <T>                  the decoded reply type
 * @param <R>                  the transformed result type
 */
static <T, R> void executeRetryableCommand(final AsyncWriteBinding binding, final String database,
                                           final ReadPreference readPreference,
                                           final FieldNameValidator fieldNameValidator,
                                           final Decoder<T> commandResultDecoder,
                                           final CommandCreator commandCreator,
                                           final CommandWriteTransformerAsync<T, R> transformer,
                                           final Function<BsonDocument, BsonDocument> retryCommandModifier,
                                           final SingleResultCallback<R> originalCallback) {
    final SingleResultCallback<R> safeCallback = errorHandlingCallback(originalCallback, LOGGER);
    binding.getWriteConnectionSource(new SingleResultCallback<AsyncConnectionSource>() {
        @Override
        public void onResult(final AsyncConnectionSource source, final Throwable sourceError) {
            if (sourceError != null) {
                safeCallback.onResult(null, sourceError);
                return;
            }
            source.getConnection(new SingleResultCallback<AsyncConnection>() {
                @Override
                public void onResult(final AsyncConnection connection, final Throwable connectionError) {
                    if (connectionError != null) {
                        // Only the source was obtained, so only the source is released.
                        releasingCallback(safeCallback, source).onResult(null, connectionError);
                        return;
                    }
                    try {
                        final BsonDocument command = commandCreator.create(source.getServerDescription(),
                                connection.getDescription());
                        connection.commandAsync(database, command, fieldNameValidator, readPreference,
                                commandResultDecoder, binding.getSessionContext(),
                                createCommandCallback(binding, source, connection, database, readPreference,
                                        command, fieldNameValidator, commandResultDecoder, transformer,
                                        retryCommandModifier, safeCallback));
                    } catch (Throwable commandError) {
                        releasingCallback(safeCallback, source, connection).onResult(null, commandError);
                    }
                }
            });
        }
    });
}
/**
 * Builds the completion callback for the first attempt of an asynchronous retryable write command. On success the reply
 * is transformed and delivered; on failure (including a failure thrown by the transformer) the callback either reports
 * the error or retries the command once on a newly acquired connection.
 *
 * @param binding              the asynchronous write binding, used to acquire the retry connection
 * @param oldSource            the connection source used for the first attempt
 * @param oldConnection        the connection used for the first attempt
 * @param database             the database name
 * @param readPreference       the read preference sent with the retry command
 * @param command              the command sent on the first attempt
 * @param fieldNameValidator   the validator for the retry command's field names
 * @param commandResultDecoder the decoder for the reply
 * @param transformer          the transformer applied to the decoded reply
 * @param retryCommandModifier produces the command to send on retry from the original command
 * @param callback             the callback receiving the transformed result or the failure
 * @param <T>                  the decoded reply type
 * @param <R>                  the transformed result type
 * @return the callback to pass to the first {@code commandAsync} call
 */
private static <T, R> SingleResultCallback<T> createCommandCallback(final AsyncWriteBinding binding,
final AsyncConnectionSource oldSource,
final AsyncConnection oldConnection,
final String database,
final ReadPreference readPreference,
final BsonDocument command,
final FieldNameValidator fieldNameValidator,
final Decoder<T> commandResultDecoder,
final CommandWriteTransformerAsync<T, R> transformer,
final Function<BsonDocument, BsonDocument> retryCommandModifier,
final SingleResultCallback<R> callback) {
return new SingleResultCallback<T>() {
@Override
public void onResult(final T result, final Throwable originalError) {
// Wraps the caller's callback together with the first attempt's source and connection.
SingleResultCallback<R> releasingCallback = releasingCallback(callback, oldSource, oldConnection);
if (originalError != null) {
checkRetryableException(originalError, releasingCallback);
} else {
try {
releasingCallback.onResult(transformer.apply(result, oldConnection), null);
} catch (Throwable transformError) {
// NOTE(review): this also catches anything thrown by releasingCallback.onResult itself, in which case the
// callback could be invoked a second time - confirm that is acceptable.
checkRetryableException(transformError, releasingCallback);
}
}
}
// Reports the error (passed through transformWriteException when it is a MongoException) when no retry should be
// attempted; otherwise releases the first attempt's connection and source directly and starts the retry.
private void checkRetryableException(final Throwable originalError, final SingleResultCallback<R> releasingCallback) {
if (!shouldAttemptToRetryWrite(command, originalError)) {
if (isRetryWritesEnabled(command)) {
logUnableToRetry(command.getFirstKey(), originalError);
}
releasingCallback.onResult(null, originalError instanceof MongoException
? transformWriteException((MongoException) originalError) : originalError);
} else {
oldConnection.release();
oldSource.release();
if (binding.getSessionContext().hasActiveTransaction()) {
binding.getSessionContext().unpinServerAddress();
}
retryableCommand(originalError);
}
}
// Acquires a new connection and sends the modified command. Whenever the retry cannot be started, the ORIGINAL
// error is reported rather than the acquisition failure.
private void retryableCommand(final Throwable originalError) {
final BsonDocument retryCommand = retryCommandModifier.apply(command);
logRetryExecute(retryCommand.getFirstKey(), originalError);
withAsyncConnection(binding, new AsyncCallableWithConnectionAndSource() {
@Override
public void call(final AsyncConnectionSource source, final AsyncConnection connection, final Throwable t) {
if (t != null) {
// Here "callback" is the outer, non-releasing callback: the first attempt's resources were already released.
callback.onResult(null, originalError);
} else if (!canRetryWrite(source.getServerDescription(), connection.getDescription(),
binding.getSessionContext())) {
releasingCallback(callback, source, connection).onResult(null, originalError);
} else {
connection.commandAsync(database, retryCommand, fieldNameValidator, readPreference,
commandResultDecoder, binding.getSessionContext(),
new TransformingWriteResultCallback<T, R>(transformer, connection,
releasingCallback(callback, source, connection)));
}
}
});
}
};
}
/**
 * A callback that applies a {@code CommandWriteTransformerAsync} to a successful command result before handing
 * it to the wrapped callback. Errors are passed through to the wrapped callback unchanged.
 *
 * @param <T> the raw command result type
 * @param <R> the transformed result type
 */
static class TransformingWriteResultCallback<T, R> implements SingleResultCallback<T> {
    private final CommandWriteTransformerAsync<T, R> transformer;
    private final AsyncConnection connection;
    private final SingleResultCallback<R> callback;

    TransformingWriteResultCallback(final CommandWriteTransformerAsync<T, R> transformer,
                                    final AsyncConnection connection, final SingleResultCallback<R> callback) {
        this.transformer = transformer;
        this.connection = connection;
        this.callback = callback;
    }

    @Override
    public void onResult(final T result, final Throwable t) {
        if (t != null) {
            callback.onResult(null, t);
            return;
        }
        try {
            callback.onResult(transformer.apply(result, connection), null);
        } catch (Throwable transformError) {
            // Anything thrown inside the try block is reported to the wrapped callback as the error.
            callback.onResult(null, transformError);
        }
    }
}
/** Error codes that {@link #isRetryableException(Throwable)} treats as retryable. */
private static final List<Integer> RETRYABLE_ERROR_CODES = asList(6, 7, 89, 91, 189, 9001, 13436, 13435, 11602, 11600, 10107);

/**
 * Determines whether the given error qualifies for a retry.
 *
 * <p>An error is retryable when it is a {@code MongoException} and one of the following holds: it is a socket,
 * not-primary or node-is-recovering exception; its message (or, for a write concern exception, the write concern
 * error's message) contains "not master" or "node is recovering"; or its code is one of
 * {@code RETRYABLE_ERROR_CODES}.</p>
 *
 * @param t the error to inspect
 * @return true if the error is retryable
 */
static boolean isRetryableException(final Throwable t) {
    if (!(t instanceof MongoException)) {
        return false;
    }
    if (t instanceof MongoSocketException || t instanceof MongoNotPrimaryException || t instanceof MongoNodeIsRecoveringException) {
        return true;
    }
    String errorMessage = t.getMessage();
    if (t instanceof MongoWriteConcernException) {
        errorMessage = ((MongoWriteConcernException) t).getWriteConcernError().getMessage();
    }
    // An exception may carry no message at all; in that case only the error code can decide retryability.
    // Without this guard a null message would raise a NullPointerException inside the async callback chain.
    if (errorMessage != null
            && (errorMessage.contains("not master") || errorMessage.contains("node is recovering"))) {
        return true;
    }
    return RETRYABLE_ERROR_CODES.contains(((MongoException) t).getCode());
}
/**
 * Rethrows the given exception unless it is a namespace error, in which case it is silently ignored.
 *
 * @param e the command exception to inspect
 */
static void rethrowIfNotNamespaceError(final MongoCommandException e) {
    if (!isNamespaceError(e)) {
        throw e;
    }
}
/**
 * Returns the supplied default value when the exception is a namespace error, and rethrows it otherwise.
 *
 * @param e            the command exception to inspect
 * @param defaultValue the value to return for a namespace error
 * @param <T>          the type of the default value
 * @return {@code defaultValue} if {@code e} is a namespace error
 */
static <T> T rethrowIfNotNamespaceError(final MongoCommandException e, final T defaultValue) {
    if (isNamespaceError(e)) {
        return defaultValue;
    }
    throw e;
}
/**
 * Checks whether the given error is a command exception whose error message contains "ns not found" or whose
 * error code is 26.
 *
 * @param t the error to inspect
 * @return true if the error is such a command exception
 */
static boolean isNamespaceError(final Throwable t) {
    if (!(t instanceof MongoCommandException)) {
        return false;
    }
    final MongoCommandException commandException = (MongoCommandException) t;
    if (commandException.getErrorMessage().contains("ns not found")) {
        return true;
    }
    return commandException.getErrorCode() == 26;
}
/**
 * Callback invoked with a connection source: it obtains a connection from the source, executes the command on
 * it, transforms the response and reports the outcome to the wrapped callback.
 *
 * <p>Once a source has been received, every path below either releases it directly or completes through the
 * releasing callback, which is presumed to release the source and connection before notifying the caller.</p>
 *
 * @param <D> the raw command result type
 * @param <R> the transformed result type
 */
private static class CommandProtocolExecutingCallback<D, R> implements SingleResultCallback<AsyncConnectionSource> {
    private final String database;
    private final BsonDocument command;
    private final Decoder<D> decoder;
    private final ReadPreference readPreference;
    private final FieldNameValidator fieldNameValidator;
    private final CommandWriteTransformerAsync<D, R> transformer;
    private final SingleResultCallback<R> callback;
    private final SessionContext sessionContext;

    CommandProtocolExecutingCallback(final String database, final BsonDocument command, final FieldNameValidator fieldNameValidator,
                                     final Decoder<D> decoder, final ReadPreference readPreference,
                                     final CommandWriteTransformerAsync<D, R> transformer, final SessionContext sessionContext,
                                     final SingleResultCallback<R> callback) {
        this.database = database;
        this.command = command;
        this.fieldNameValidator = fieldNameValidator;
        this.decoder = decoder;
        this.readPreference = readPreference;
        this.transformer = transformer;
        this.sessionContext = sessionContext;
        this.callback = callback;
    }

    @Override
    public void onResult(final AsyncConnectionSource source, final Throwable t) {
        if (t != null) {
            callback.onResult(null, t);
        } else {
            source.getConnection(new SingleResultCallback<AsyncConnection>() {
                @Override
                public void onResult(final AsyncConnection connection, final Throwable t) {
                    if (t != null) {
                        // No connection was obtained, so only the source is held here; release it before
                        // reporting the failure so it is not leaked.
                        source.release();
                        callback.onResult(null, t);
                    } else {
                        final SingleResultCallback<R> wrappedCallback = releasingCallback(callback, source, connection);
                        connection.commandAsync(database, command, fieldNameValidator, readPreference, decoder, sessionContext,
                                new SingleResultCallback<D>() {
                                    @Override
                                    public void onResult(final D response, final Throwable t) {
                                        if (t != null) {
                                            wrappedCallback.onResult(null, t);
                                            return;
                                        }
                                        final R transformedResult;
                                        try {
                                            transformedResult = transformer.apply(response, connection);
                                        } catch (Throwable transformError) {
                                            // Report transformer failures through the releasing callback so the
                                            // caller is always notified and the resources are released.
                                            wrappedCallback.onResult(null, transformError);
                                            return;
                                        }
                                        // Invoked outside the try block so the callback is never called twice.
                                        wrappedCallback.onResult(transformedResult, null);
                                    }
                                });
                    }
                }
            });
        }
    }
}
/**
 * Decides whether a failed read should be retried.
 *
 * @param retryReadsEnabled whether retryable reads are enabled
 * @param exception         the error raised by the first attempt
 * @return true if retries are enabled and the error is retryable
 */
private static boolean shouldAttemptToRetryRead(final boolean retryReadsEnabled, final Throwable exception) {
    if (!retryReadsEnabled) {
        return false;
    }
    return isRetryableException(exception);
}
/**
 * Decides whether a failed write command should be retried, deriving the retry setting from the command itself.
 *
 * @param command   the command that failed, may be null
 * @param exception the error raised by the first attempt
 * @return true if the command is eligible for retry and the error is retryable
 */
private static boolean shouldAttemptToRetryWrite(@Nullable final BsonDocument command, final Throwable exception) {
    if (!isRetryWritesEnabled(command)) {
        return false;
    }
    return isRetryableException(exception);
}
/**
 * Checks whether the command is eligible for a retry: it must be non-null and either contain a "txnNumber"
 * field or be a "commitTransaction" or "abortTransaction" command (judged by its first key).
 *
 * @param command the command to inspect, may be null
 * @return true if the command is eligible for a retry
 */
private static boolean isRetryWritesEnabled(@Nullable final BsonDocument command) {
    if (command == null) {
        return false;
    }
    if (command.containsKey("txnNumber")) {
        return true;
    }
    final String commandName = command.getFirstKey();
    return commandName.equals("commitTransaction") || commandName.equals("abortTransaction");
}
/**
 * Decides whether a failed write should be retried, given an explicit retry setting.
 *
 * @param retryWritesEnabled whether retryable writes are enabled
 * @param exception          the error raised by the first attempt
 * @return true if retries are enabled and the error is retryable
 */
static boolean shouldAttemptToRetryWrite(final boolean retryWritesEnabled, final Throwable exception) {
    if (!retryWritesEnabled) {
        return false;
    }
    return isRetryableException(exception);
}
/**
 * Logs, at debug level, that an operation is being retried.
 *
 * @param operation     the name of the operation being retried
 * @param originalError the error that triggered the retry
 */
static void logRetryExecute(final String operation, final Throwable originalError) {
    if (!LOGGER.isDebugEnabled()) {
        return;
    }
    final String message = format("Retrying operation %s due to an error \"%s\"", operation, originalError);
    LOGGER.debug(message);
}
/**
 * Logs, at debug level, that an operation could not be retried.
 *
 * @param operation     the name of the operation that failed
 * @param originalError the error that prevented the retry
 */
static void logUnableToRetry(final String operation, final Throwable originalError) {
    if (!LOGGER.isDebugEnabled()) {
        return;
    }
    final String message = format("Unable to retry operation %s due to error \"%s\"", operation, originalError);
    LOGGER.debug(message);
}
/**
 * Replaces an error with code 20 whose message mentions "Transaction numbers" with a {@code MongoClientException}
 * explaining that the deployment does not support retryable writes; any other exception is returned unchanged.
 *
 * @param exception the exception raised by the write
 * @return the exception to report to the caller
 */
static MongoException transformWriteException(final MongoException exception) {
    final boolean retryableWritesUnsupported = exception.getCode() == 20
            && exception.getMessage().contains("Transaction numbers");
    if (!retryableWritesUnsupported) {
        return exception;
    }
    // The original exception is kept as the cause of the replacement.
    return new MongoClientException("This MongoDB deployment does not support retryable writes. "
            + "Please add retryWrites=false to your connection string.", exception);
}
// Private no-op constructor: keeps code outside this class from instantiating it. The helpers visible in this
// part of the file are all static.
private CommandOperationHelper() {
}
}