aggregate.js 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. 'use strict';
  2. const AggregationCursor = require('../aggregation_cursor');
  3. const applyWriteConcern = require('../utils').applyWriteConcern;
  4. const decorateWithCollation = require('../utils').decorateWithCollation;
  5. const decorateWithReadConcern = require('../utils').decorateWithReadConcern;
  6. const handleCallback = require('../utils').handleCallback;
  7. const MongoError = require('mongodb-core').MongoError;
  8. const resolveReadPreference = require('../utils').resolveReadPreference;
  9. const toError = require('../utils').toError;
  10. const DB_AGGREGATE_COLLECTION = 1;
  11. /**
  12. * Perform an aggregate operation. See Collection.prototype.aggregate or Db.prototype.aggregate for more information.
  13. *
  14. * @method
  15. * @param {Db} db A Db instance.
  16. * @param {Collection|string} coll A collection instance or the string '1', used for db.aggregate.
  17. * @param {object} [pipeline=[]] Array containing all the aggregation framework commands for the execution.
  18. * @param {object} [options] Optional settings. See Collection.prototype.aggregate or Db.prototype.aggregate for a list of options.
  19. * @param {Db~aggregationCallback|Collection~aggregationCallback} callback The command result callback
  20. */
  21. function aggregate(db, coll, pipeline, options, callback) {
  22. const isDbAggregate = typeof coll === 'string';
  23. const target = isDbAggregate ? db : coll;
  24. const topology = target.s.topology;
  25. let hasOutStage = false;
  26. if (typeof options.out === 'string') {
  27. pipeline = pipeline.concat({ $out: options.out });
  28. hasOutStage = true;
  29. } else if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out']) {
  30. hasOutStage = true;
  31. }
  32. let command;
  33. let namespace;
  34. let optionSources;
  35. if (isDbAggregate) {
  36. command = { aggregate: DB_AGGREGATE_COLLECTION, pipeline: pipeline };
  37. namespace = `${db.s.databaseName}.${DB_AGGREGATE_COLLECTION}`;
  38. optionSources = { db };
  39. } else {
  40. command = { aggregate: coll.s.name, pipeline: pipeline };
  41. namespace = coll.s.namespace;
  42. optionSources = { db: coll.s.db, collection: coll };
  43. }
  44. const takesWriteConcern = topology.capabilities().commandsTakeWriteConcern;
  45. if (!hasOutStage) {
  46. decorateWithReadConcern(command, target, options);
  47. }
  48. if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out'] && takesWriteConcern) {
  49. applyWriteConcern(command, optionSources, options);
  50. }
  51. try {
  52. decorateWithCollation(command, target, options);
  53. } catch (err) {
  54. if (typeof callback === 'function') return callback(err, null);
  55. throw err;
  56. }
  57. if (options.bypassDocumentValidation === true) {
  58. command.bypassDocumentValidation = options.bypassDocumentValidation;
  59. }
  60. if (typeof options.allowDiskUse === 'boolean') command.allowDiskUse = options.allowDiskUse;
  61. if (typeof options.maxTimeMS === 'number') command.maxTimeMS = options.maxTimeMS;
  62. if (options.hint) command.hint = options.hint;
  63. options = Object.assign({}, options);
  64. // Ensure we have the right read preference inheritance
  65. options.readPreference = resolveReadPreference(options, optionSources);
  66. if (options.explain) {
  67. if (command.readConcern || command.writeConcern) {
  68. throw toError('"explain" cannot be used on an aggregate call with readConcern/writeConcern');
  69. }
  70. command.explain = options.explain;
  71. }
  72. if (typeof options.comment === 'string') command.comment = options.comment;
  73. // Validate that cursor options is valid
  74. if (options.cursor != null && typeof options.cursor !== 'object') {
  75. throw toError('cursor options must be an object');
  76. }
  77. options.cursor = options.cursor || {};
  78. if (options.batchSize && !hasOutStage) options.cursor.batchSize = options.batchSize;
  79. command.cursor = options.cursor;
  80. // promiseLibrary
  81. options.promiseLibrary = target.s.promiseLibrary;
  82. // Set the AggregationCursor constructor
  83. options.cursorFactory = AggregationCursor;
  84. if (typeof callback !== 'function') {
  85. if (!topology.capabilities()) {
  86. throw new MongoError('cannot connect to server');
  87. }
  88. return topology.cursor(namespace, command, options);
  89. }
  90. return handleCallback(callback, null, topology.cursor(namespace, command, options));
  91. }
  92. module.exports = {
  93. aggregate
  94. };