package com.mongodb.internal.connection;
import com.mongodb.MongoNamespace;
import com.mongodb.async.SingleResultCallback;
import com.mongodb.connection.ByteBufferBsonOutput;
import com.mongodb.connection.ConnectionDescription;
import com.mongodb.connection.QueryResult;
import com.mongodb.diagnostics.logging.Logger;
import com.mongodb.diagnostics.logging.Loggers;
import com.mongodb.event.CommandListener;
import org.bson.BsonArray;
import org.bson.BsonBoolean;
import org.bson.BsonDocument;
import org.bson.BsonDouble;
import org.bson.BsonInt32;
import org.bson.BsonInt64;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.bson.codecs.BsonDocumentCodec;
import org.bson.codecs.Decoder;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.mongodb.internal.connection.ProtocolHelper.encodeMessageWithMetadata;
import static com.mongodb.internal.connection.ProtocolHelper.getMessageSettings;
import static com.mongodb.internal.connection.ProtocolHelper.getQueryFailureException;
import static com.mongodb.internal.connection.ProtocolHelper.sendCommandFailedEvent;
import static com.mongodb.internal.connection.ProtocolHelper.sendCommandStartedEvent;
import static com.mongodb.internal.connection.ProtocolHelper.sendCommandSucceededEvent;
import static java.lang.String.format;
class QueryProtocol<T> implements LegacyProtocol<QueryResult<T>> {
public static final Logger LOGGER = Loggers.getLogger("protocol.query");
private static final String FIND_COMMAND_NAME = "find";
private static final String EXPLAIN_COMMAND_NAME = "explain";
private final int skip;
private final int limit;
private final int batchSize;
private final int numberToReturn;
private final boolean withLimitAndBatchSize;
private final BsonDocument queryDocument;
private final BsonDocument fields;
private final Decoder<T> resultDecoder;
private final MongoNamespace namespace;
private boolean tailableCursor;
private boolean slaveOk;
private boolean oplogReplay;
private boolean noCursorTimeout;
private boolean awaitData;
private boolean partial;
private CommandListener commandListener;
QueryProtocol(final MongoNamespace namespace, final int skip, final int numberToReturn, final BsonDocument queryDocument,
final BsonDocument fields, final Decoder<T> resultDecoder) {
this.namespace = namespace;
this.skip = skip;
this.withLimitAndBatchSize = false;
this.numberToReturn = numberToReturn;
this.limit = 0;
this.batchSize = 0;
this.queryDocument = queryDocument;
this.fields = fields;
this.resultDecoder = resultDecoder;
}
QueryProtocol(final MongoNamespace namespace, final int skip, final int limit, final int batchSize,
final BsonDocument queryDocument, final BsonDocument fields, final Decoder<T> resultDecoder) {
this.namespace = namespace;
this.skip = skip;
this.withLimitAndBatchSize = true;
this.numberToReturn = 0;
this.limit = limit;
this.batchSize = batchSize;
this.queryDocument = queryDocument;
this.fields = fields;
this.resultDecoder = resultDecoder;
}
public void setCommandListener(final CommandListener commandListener) {
this.commandListener = commandListener;
}
public CommandListener getCommandListener() {
return commandListener;
}
public boolean isTailableCursor() {
return tailableCursor;
}
public QueryProtocol<T> tailableCursor(final boolean tailableCursor) {
this.tailableCursor = tailableCursor;
return this;
}
public boolean isSlaveOk() {
return slaveOk;
}
public QueryProtocol<T> slaveOk(final boolean slaveOk) {
this.slaveOk = slaveOk;
return this;
}
public boolean isOplogReplay() {
return oplogReplay;
}
public QueryProtocol<T> oplogReplay(final boolean oplogReplay) {
this.oplogReplay = oplogReplay;
return this;
}
public boolean isNoCursorTimeout() {
return noCursorTimeout;
}
public QueryProtocol<T> noCursorTimeout(final boolean noCursorTimeout) {
this.noCursorTimeout = noCursorTimeout;
return this;
}
public boolean isAwaitData() {
return awaitData;
}
public QueryProtocol<T> awaitData(final boolean awaitData) {
this.awaitData = awaitData;
return this;
}
public boolean isPartial() {
return partial;
}
public QueryProtocol<T> partial(final boolean partial) {
this.partial = partial;
return this;
}
@Override
public QueryResult<T> execute(final InternalConnection connection) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(format("Sending query of namespace %s on connection [%s] to server %s", namespace,
connection.getDescription().getConnectionId(), connection.getDescription().getServerAddress()));
}
long startTimeNanos = System.nanoTime();
QueryMessage message = null;
try {
boolean isExplain = false;
ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput(connection);
try {
message = createQueryMessage(connection.getDescription());
message.encode(bsonOutput, NoOpSessionContext.INSTANCE);
isExplain = sendQueryStartedEvent(connection, message, bsonOutput, message.getEncodingMetadata());
connection.sendMessage(bsonOutput.getByteBuffers(), message.getId());
} finally {
bsonOutput.close();
}
ResponseBuffers responseBuffers = connection.receiveMessage(message.getId());
try {
if (responseBuffers.getReplyHeader().isQueryFailure()) {
BsonDocument errorDocument = new ReplyMessage<BsonDocument>(responseBuffers,
new BsonDocumentCodec(),
message.getId()).getDocuments().get(0);
throw getQueryFailureException(errorDocument, connection.getDescription().getServerAddress());
}
ReplyMessage<T> replyMessage = new ReplyMessage<T>(responseBuffers, resultDecoder, message.getId());
QueryResult<T> result = new QueryResult<T>(namespace, replyMessage.getDocuments(),
replyMessage.getReplyHeader().getCursorId(), connection.getDescription().getServerAddress());
sendQuerySucceededEvent(connection.getDescription(), startTimeNanos, message, isExplain, responseBuffers, result);
LOGGER.debug("Query completed");
return result;
} finally {
responseBuffers.close();
}
} catch (RuntimeException e) {
if (commandListener != null) {
sendCommandFailedEvent(message, FIND_COMMAND_NAME, connection.getDescription(), System.nanoTime() - startTimeNanos, e,
commandListener);
}
throw e;
}
}
@Override
public void executeAsync(final InternalConnection connection, final SingleResultCallback<QueryResult<T>> callback) {
long startTimeNanos = System.nanoTime();
QueryMessage message = createQueryMessage(connection.getDescription());
boolean sentStartedEvent = true;
try {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(format("Asynchronously sending query of namespace %s on connection [%s] to server %s", namespace,
connection.getDescription().getConnectionId(), connection.getDescription().getServerAddress()));
}
ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput(connection);
RequestMessage.EncodingMetadata metadata = encodeMessageWithMetadata(message, bsonOutput);
boolean isExplainEvent = sendQueryStartedEvent(connection, message, bsonOutput, metadata);
sentStartedEvent = true;
SingleResultCallback<ResponseBuffers> receiveCallback = new QueryResultCallback(callback, message.getId(),
startTimeNanos, message, isExplainEvent, connection.getDescription());
connection.sendMessageAsync(bsonOutput.getByteBuffers(), message.getId(),
new SendMessageCallback<QueryResult<T>>(connection, bsonOutput, message,
getCommandName(isExplainEvent), startTimeNanos, commandListener, callback,
receiveCallback));
} catch (Throwable t) {
if (commandListener != null) {
sendCommandFailedEvent(message, FIND_COMMAND_NAME, connection.getDescription(), System.nanoTime() - startTimeNanos, t,
commandListener);
}
callback.onResult(null, t);
}
}
private boolean sendQueryStartedEvent(final InternalConnection connection, final QueryMessage message,
final ByteBufferBsonOutput bsonOutput, final RequestMessage.EncodingMetadata metadata) {
boolean isExplainEvent = false;
if (commandListener != null) {
BsonDocument command = asFindCommandDocument(bsonOutput, metadata.getFirstDocumentPosition());
isExplainEvent = command.keySet().iterator().next().equals(EXPLAIN_COMMAND_NAME);
sendCommandStartedEvent(message, namespace.getDatabaseName(),
getCommandName(isExplainEvent),
command,
connection.getDescription(), commandListener);
}
return isExplainEvent;
}
private String getCommandName(final boolean isExplainEvent) {
return isExplainEvent ? EXPLAIN_COMMAND_NAME : FIND_COMMAND_NAME;
}
private void sendQuerySucceededEvent(final ConnectionDescription connectionDescription, final long startTimeNanos,
final QueryMessage message,
final boolean isExplainEvent, final ResponseBuffers responseBuffers,
final QueryResult<T> queryResult) {
if (commandListener != null) {
BsonDocument response = asFindCommandResponseDocument(responseBuffers, queryResult, isExplainEvent);
sendCommandSucceededEvent(message, getCommandName(isExplainEvent), response, connectionDescription,
System.nanoTime() - startTimeNanos, commandListener);
}
}
private QueryMessage createQueryMessage(final ConnectionDescription connectionDescription) {
return (QueryMessage) new QueryMessage(namespace.getFullName(), skip, getNumberToReturn(), queryDocument, fields,
getMessageSettings(connectionDescription))
.tailableCursor(isTailableCursor())
.slaveOk(isSlaveOk())
.oplogReplay(isOplogReplay())
.noCursorTimeout(isNoCursorTimeout())
.awaitData(isAwaitData())
.partial(isPartial());
}
private int getNumberToReturn() {
if (withLimitAndBatchSize) {
if (limit < 0) {
return limit;
} else if (limit == 0) {
return batchSize;
} else if (batchSize == 0) {
return limit;
} else if (limit < Math.abs(batchSize)) {
return limit;
} else {
return batchSize;
}
} else {
return numberToReturn;
}
}
private static final Map<String, String> META_OPERATOR_TO_COMMAND_FIELD_MAP = new HashMap<String, String>();
static {
META_OPERATOR_TO_COMMAND_FIELD_MAP.put("$query", "filter");
META_OPERATOR_TO_COMMAND_FIELD_MAP.put("$orderby", "sort");
META_OPERATOR_TO_COMMAND_FIELD_MAP.put("$hint", "hint");
META_OPERATOR_TO_COMMAND_FIELD_MAP.put("$comment", "comment");
META_OPERATOR_TO_COMMAND_FIELD_MAP.put("$maxScan", "maxScan");
META_OPERATOR_TO_COMMAND_FIELD_MAP.put("$maxTimeMS", "maxTimeMS");
META_OPERATOR_TO_COMMAND_FIELD_MAP.put("$max", "max");
META_OPERATOR_TO_COMMAND_FIELD_MAP.put("$min", "min");
META_OPERATOR_TO_COMMAND_FIELD_MAP.put("$returnKey", "returnKey");
META_OPERATOR_TO_COMMAND_FIELD_MAP.put("$showDiskLoc", "showRecordId");
META_OPERATOR_TO_COMMAND_FIELD_MAP.put("$snapshot", "snapshot");
}
private BsonDocument asFindCommandDocument(final ByteBufferBsonOutput bsonOutput, final int firstDocumentPosition) {
BsonDocument command = new BsonDocument(FIND_COMMAND_NAME, new BsonString(namespace.getCollectionName()));
boolean isExplain = false;
List<ByteBufBsonDocument> documents = ByteBufBsonDocument.createList(bsonOutput, firstDocumentPosition);
ByteBufBsonDocument rawQueryDocument = documents.get(0);
for (Map.Entry<String, BsonValue> cur : rawQueryDocument.entrySet()) {
String commandFieldName = META_OPERATOR_TO_COMMAND_FIELD_MAP.get(cur.getKey());
if (commandFieldName != null) {
command.append(commandFieldName, cur.getValue());
} else if (cur.getKey().equals("$explain")) {
isExplain = true;
}
}
if (command.size() == 1) {
command.append("filter", rawQueryDocument);
}
if (documents.size() == 2) {
command.append("projection", documents.get(1));
}
if (skip != 0) {
command.append("skip", new BsonInt32(skip));
}
if (withLimitAndBatchSize) {
if (limit != 0) {
command.append("limit", new BsonInt32(limit));
}
if (batchSize != 0) {
command.append("batchSize", new BsonInt32(batchSize));
}
}
if (tailableCursor) {
command.append("tailable", BsonBoolean.valueOf(tailableCursor));
}
if (noCursorTimeout) {
command.append("noCursorTimeout", BsonBoolean.valueOf(noCursorTimeout));
}
if (oplogReplay) {
command.append("oplogReplay", BsonBoolean.valueOf(oplogReplay));
}
if (awaitData) {
command.append("awaitData", BsonBoolean.valueOf(awaitData));
}
if (partial) {
command.append("allowPartialResults", BsonBoolean.valueOf(partial));
}
if (isExplain) {
command = new BsonDocument(EXPLAIN_COMMAND_NAME, command);
}
return command;
}
private BsonDocument asFindCommandResponseDocument(final ResponseBuffers responseBuffers, final QueryResult<T> queryResult,
final boolean isExplain) {
List<ByteBufBsonDocument> rawResultDocuments = Collections.emptyList();
if (responseBuffers.getReplyHeader().getNumberReturned() > 0) {
responseBuffers.reset();
rawResultDocuments = ByteBufBsonDocument.createList(responseBuffers);
}
if (isExplain) {
BsonDocument explainCommandResponseDocument = new BsonDocument("ok", new BsonDouble(1));
explainCommandResponseDocument.putAll(rawResultDocuments.get(0));
return explainCommandResponseDocument;
} else {
BsonDocument cursorDocument = new BsonDocument("id",
queryResult.getCursor() == null
? new BsonInt64(0) : new BsonInt64(queryResult.getCursor().getId()))
.append("ns", new BsonString(namespace.getFullName()))
.append("firstBatch", new BsonArray(rawResultDocuments));
return new BsonDocument("cursor", cursorDocument)
.append("ok", new BsonDouble(1));
}
}
class QueryResultCallback extends ResponseCallback {
private final SingleResultCallback<QueryResult<T>> callback;
private final ConnectionDescription connectionDescription;
private final long startTimeNanos;
private final QueryMessage message;
private final boolean isExplainEvent;
QueryResultCallback(final SingleResultCallback<QueryResult<T>> callback, final int requestId, final long startTimeNanos,
final QueryMessage message, final boolean isExplainEvent, final ConnectionDescription connectionDescription) {
super(requestId, connectionDescription.getServerAddress());
this.callback = callback;
this.startTimeNanos = startTimeNanos;
this.message = message;
this.isExplainEvent = isExplainEvent;
this.connectionDescription = connectionDescription;
}
@Override
protected void callCallback(final ResponseBuffers responseBuffers, final Throwable throwableFromCallback) {
try {
if (throwableFromCallback != null) {
throw throwableFromCallback;
} else if (responseBuffers.getReplyHeader().isQueryFailure()) {
BsonDocument errorDocument = new ReplyMessage<BsonDocument>(responseBuffers, new BsonDocumentCodec(),
getRequestId()).getDocuments().get(0);
throw getQueryFailureException(errorDocument, getServerAddress());
} else {
ReplyMessage<T> replyMessage = new ReplyMessage<T>(responseBuffers, resultDecoder, getRequestId());
QueryResult<T> result = new QueryResult<T>(namespace, replyMessage.getDocuments(),
replyMessage.getReplyHeader().getCursorId(), getServerAddress());
sendQuerySucceededEvent(connectionDescription, startTimeNanos, message, isExplainEvent, responseBuffers, result);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(format("Query results received %s documents with cursor %s",
result.getResults().size(),
result.getCursor()));
}
callback.onResult(result, null);
}
} catch (Throwable t) {
if (commandListener != null) {
sendCommandFailedEvent(message, FIND_COMMAND_NAME, connectionDescription, System.nanoTime() - startTimeNanos, t,
commandListener);
}
callback.onResult(null, t);
} finally {
try {
if (responseBuffers != null) {
responseBuffers.close();
}
} catch (Throwable t1) {
LOGGER.debug("GetMore ResponseBuffer close exception", t1);
}
}
}
}
}