123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- 'use strict';
- const Query = require('../connection/commands').Query;
- const Msg = require('../connection/msg').Msg;
- const MongoError = require('../error').MongoError;
- const getReadPreference = require('./shared').getReadPreference;
- const isSharded = require('./shared').isSharded;
- const databaseNamespace = require('./shared').databaseNamespace;
- const isTransactionCommand = require('../transactions').isTransactionCommand;
- const applySession = require('../sessions').applySession;
/**
 * Builds a command message for `server` and writes it to the server's pool.
 *
 * The command document is shallow-copied before any field is added here, so
 * `$clusterTime` and the `$query` wrapper are applied to the copy rather than
 * to the caller's `cmd` object. The message is a `Msg` when
 * `supportsOpMsg(server)` is true and a `Query` otherwise, and it is always
 * addressed to the `$cmd` collection of the database named by `ns`.
 *
 * @param {object} server Object exposing `s.bson`, `s.pool` and (optionally) `clusterTime`
 * @param {string} ns Namespace handed to `databaseNamespace` to derive the target database
 * @param {object} cmd The command document to send
 * @param {object} [options] Merged over the default message options and passed to `pool.write`;
 *   may be omitted, in which case the fourth argument is treated as the callback
 * @param {object} [options.session] Session to apply to the command
 * @param {function} callback Receives whatever `pool.write` reports, or an early error
 * @returns {*} The callback's return value on the early-exit paths, otherwise undefined
 */
function command(server, ns, cmd, options, callback) {
  // Support the (server, ns, cmd, callback) call form.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }

  options = options || {};

  if (cmd == null) {
    return callback(new MongoError(`command ${JSON.stringify(cmd)} does not return a cursor`));
  }

  const bson = server.s.bson;
  const pool = server.s.pool;
  const readPreference = getReadPreference(cmd, options);
  const shouldUseOpMsg = supportsOpMsg(server);
  const session = options.session;

  let clusterTime = server.clusterTime;
  let finalCmd = Object.assign({}, cmd);

  if (hasSessionSupport(server) && session) {
    const sessionClusterTime = session.clusterTime;

    // Only compare when both sides actually carry a cluster time. The server
    // may not have reported one yet, and dereferencing it unconditionally
    // would throw before the command is ever written.
    if (
      clusterTime &&
      clusterTime.clusterTime &&
      sessionClusterTime &&
      sessionClusterTime.clusterTime &&
      sessionClusterTime.clusterTime.greaterThan(clusterTime.clusterTime)
    ) {
      clusterTime = sessionClusterTime;
    }

    const err = applySession(session, finalCmd, options);
    if (err) {
      return callback(err);
    }
  }

  // if we have a known cluster time, gossip it
  if (clusterTime) {
    finalCmd.$clusterTime = clusterTime;
  }

  // Without Msg framing, a non-primary read preference sent to a sharded
  // deployment is carried by wrapping the command in a $query envelope.
  if (
    isSharded(server) &&
    !shouldUseOpMsg &&
    readPreference &&
    readPreference.preference !== 'primary'
  ) {
    finalCmd = {
      $query: finalCmd,
      $readPreference: readPreference.toJSON()
    };
  }

  const commandOptions = Object.assign(
    {
      command: true,
      numberToSkip: 0,
      numberToReturn: -1,
      checkKeys: false
    },
    options
  );

  // This value is not overridable
  commandOptions.slaveOk = readPreference.slaveOk();

  const cmdNs = `${databaseNamespace(ns)}.$cmd`;
  const message = shouldUseOpMsg
    ? new Msg(bson, cmdNs, finalCmd, commandOptions)
    : new Query(bson, cmdNs, finalCmd, commandOptions);

  const inTransaction = session && (session.inTransaction() || isTransactionCommand(finalCmd));

  // Inside a transaction, a TransientTransactionError on anything other than
  // commitTransaction unpins the session's server before the caller is told.
  const commandResponseHandler = inTransaction
    ? function(err) {
        if (
          !cmd.commitTransaction &&
          err &&
          err instanceof MongoError &&
          err.hasErrorLabel('TransientTransactionError')
        ) {
          session.transaction.unpinServer();
        }

        return callback.apply(null, arguments);
      }
    : callback;

  // A synchronous throw from the pool is reported through the same handler
  // as an asynchronous failure would be.
  try {
    pool.write(message, commandOptions, commandResponseHandler);
  } catch (err) {
    commandResponseHandler(err);
  }
}
/**
 * Reports whether the given topology (or server) advertises a
 * `maxWireVersion` of at least 6.
 *
 * The version is read from `topology.description` when that is set, and from
 * `topology.ismaster` otherwise. A missing topology, or one carrying neither
 * source, yields `false`.
 *
 * @param {object} [topology] Object carrying a `description` or an `ismaster` document
 * @returns {boolean} True when the reported wire version is 6 or higher
 */
function hasSessionSupport(topology) {
  if (topology == null) {
    return false;
  }

  // Prefer the description; fall back to the raw ismaster document.
  const handshake = topology.description || topology.ismaster;
  if (handshake == null) {
    return false;
  }

  return handshake.maxWireVersion >= 6;
}
/**
 * Reports whether commands for the given topology or server should be built
 * as `Msg` messages.
 *
 * The decision is taken from `ismaster` when it is set, otherwise from
 * `description`. It is true only when that document reports a
 * `maxWireVersion` of at least 6 and does not carry the
 * `__nodejs_mock_server__` marker.
 *
 * @param {object} [topologyOrServer] Object carrying an `ismaster` or a `description` document
 * @returns {boolean} False when the argument or its handshake document is missing
 */
function supportsOpMsg(topologyOrServer) {
  // Mirror hasSessionSupport: a missing topology simply has no support.
  if (topologyOrServer == null) {
    return false;
  }

  const description = topologyOrServer.ismaster
    ? topologyOrServer.ismaster
    : topologyOrServer.description;

  if (description == null) {
    return false;
  }

  return description.maxWireVersion >= 6 && description.__nodejs_mock_server__ == null;
}
- module.exports = command;
|