command.js 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. 'use strict';
  2. const Query = require('../connection/commands').Query;
  3. const Msg = require('../connection/msg').Msg;
  4. const MongoError = require('../error').MongoError;
  5. const getReadPreference = require('./shared').getReadPreference;
  6. const isSharded = require('./shared').isSharded;
  7. const databaseNamespace = require('./shared').databaseNamespace;
  8. const isTransactionCommand = require('../transactions').isTransactionCommand;
  9. const applySession = require('../sessions').applySession;
  10. function command(server, ns, cmd, options, callback) {
  11. if (typeof options === 'function') (callback = options), (options = {});
  12. options = options || {};
  13. if (cmd == null) {
  14. return callback(new MongoError(`command ${JSON.stringify(cmd)} does not return a cursor`));
  15. }
  16. const bson = server.s.bson;
  17. const pool = server.s.pool;
  18. const readPreference = getReadPreference(cmd, options);
  19. const shouldUseOpMsg = supportsOpMsg(server);
  20. const session = options.session;
  21. let clusterTime = server.clusterTime;
  22. let finalCmd = Object.assign({}, cmd);
  23. if (hasSessionSupport(server) && session) {
  24. if (
  25. session.clusterTime &&
  26. session.clusterTime.clusterTime.greaterThan(clusterTime.clusterTime)
  27. ) {
  28. clusterTime = session.clusterTime;
  29. }
  30. const err = applySession(session, finalCmd, options);
  31. if (err) {
  32. return callback(err);
  33. }
  34. }
  35. // if we have a known cluster time, gossip it
  36. if (clusterTime) {
  37. finalCmd.$clusterTime = clusterTime;
  38. }
  39. if (
  40. isSharded(server) &&
  41. !shouldUseOpMsg &&
  42. readPreference &&
  43. readPreference.preference !== 'primary'
  44. ) {
  45. finalCmd = {
  46. $query: finalCmd,
  47. $readPreference: readPreference.toJSON()
  48. };
  49. }
  50. const commandOptions = Object.assign(
  51. {
  52. command: true,
  53. numberToSkip: 0,
  54. numberToReturn: -1,
  55. checkKeys: false
  56. },
  57. options
  58. );
  59. // This value is not overridable
  60. commandOptions.slaveOk = readPreference.slaveOk();
  61. const cmdNs = `${databaseNamespace(ns)}.$cmd`;
  62. const message = shouldUseOpMsg
  63. ? new Msg(bson, cmdNs, finalCmd, commandOptions)
  64. : new Query(bson, cmdNs, finalCmd, commandOptions);
  65. const inTransaction = session && (session.inTransaction() || isTransactionCommand(finalCmd));
  66. const commandResponseHandler = inTransaction
  67. ? function(err) {
  68. if (
  69. !cmd.commitTransaction &&
  70. err &&
  71. err instanceof MongoError &&
  72. err.hasErrorLabel('TransientTransactionError')
  73. ) {
  74. session.transaction.unpinServer();
  75. }
  76. return callback.apply(null, arguments);
  77. }
  78. : callback;
  79. try {
  80. pool.write(message, commandOptions, commandResponseHandler);
  81. } catch (err) {
  82. commandResponseHandler(err);
  83. }
  84. }
  85. function hasSessionSupport(topology) {
  86. if (topology == null) return false;
  87. if (topology.description) {
  88. return topology.description.maxWireVersion >= 6;
  89. }
  90. return topology.ismaster == null ? false : topology.ismaster.maxWireVersion >= 6;
  91. }
  92. function supportsOpMsg(topologyOrServer) {
  93. const description = topologyOrServer.ismaster
  94. ? topologyOrServer.ismaster
  95. : topologyOrServer.description;
  96. if (description == null) {
  97. return false;
  98. }
  99. return description.maxWireVersion >= 6 && description.__nodejs_mock_server__ == null;
  100. }
  101. module.exports = command;