123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- 'use strict';
- const AggregationCursor = require('../aggregation_cursor');
- const applyWriteConcern = require('../utils').applyWriteConcern;
- const decorateWithCollation = require('../utils').decorateWithCollation;
- const decorateWithReadConcern = require('../utils').decorateWithReadConcern;
- const handleCallback = require('../utils').handleCallback;
- const MongoError = require('mongodb-core').MongoError;
- const resolveReadPreference = require('../utils').resolveReadPreference;
- const toError = require('../utils').toError;
- const DB_AGGREGATE_COLLECTION = 1;
- /**
- * Perform an aggregate operation. See Collection.prototype.aggregate or Db.prototype.aggregate for more information.
- *
- * @method
- * @param {Db} db A Db instance.
- * @param {Collection|string} coll A collection instance or the string '1', used for db.aggregate.
- * @param {object} [pipeline=[]] Array containing all the aggregation framework commands for the execution.
- * @param {object} [options] Optional settings. See Collection.prototype.aggregate or Db.prototype.aggregate for a list of options.
- * @param {Db~aggregationCallback|Collection~aggregationCallback} callback The command result callback
- */
- function aggregate(db, coll, pipeline, options, callback) {
- const isDbAggregate = typeof coll === 'string';
- const target = isDbAggregate ? db : coll;
- const topology = target.s.topology;
- let hasOutStage = false;
- if (typeof options.out === 'string') {
- pipeline = pipeline.concat({ $out: options.out });
- hasOutStage = true;
- } else if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out']) {
- hasOutStage = true;
- }
- let command;
- let namespace;
- let optionSources;
- if (isDbAggregate) {
- command = { aggregate: DB_AGGREGATE_COLLECTION, pipeline: pipeline };
- namespace = `${db.s.databaseName}.${DB_AGGREGATE_COLLECTION}`;
- optionSources = { db };
- } else {
- command = { aggregate: coll.s.name, pipeline: pipeline };
- namespace = coll.s.namespace;
- optionSources = { db: coll.s.db, collection: coll };
- }
- const takesWriteConcern = topology.capabilities().commandsTakeWriteConcern;
- if (!hasOutStage) {
- decorateWithReadConcern(command, target, options);
- }
- if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out'] && takesWriteConcern) {
- applyWriteConcern(command, optionSources, options);
- }
- try {
- decorateWithCollation(command, target, options);
- } catch (err) {
- if (typeof callback === 'function') return callback(err, null);
- throw err;
- }
- if (options.bypassDocumentValidation === true) {
- command.bypassDocumentValidation = options.bypassDocumentValidation;
- }
- if (typeof options.allowDiskUse === 'boolean') command.allowDiskUse = options.allowDiskUse;
- if (typeof options.maxTimeMS === 'number') command.maxTimeMS = options.maxTimeMS;
- if (options.hint) command.hint = options.hint;
- options = Object.assign({}, options);
- // Ensure we have the right read preference inheritance
- options.readPreference = resolveReadPreference(options, optionSources);
- if (options.explain) {
- if (command.readConcern || command.writeConcern) {
- throw toError('"explain" cannot be used on an aggregate call with readConcern/writeConcern');
- }
- command.explain = options.explain;
- }
- if (typeof options.comment === 'string') command.comment = options.comment;
- // Validate that cursor options is valid
- if (options.cursor != null && typeof options.cursor !== 'object') {
- throw toError('cursor options must be an object');
- }
- options.cursor = options.cursor || {};
- if (options.batchSize && !hasOutStage) options.cursor.batchSize = options.batchSize;
- command.cursor = options.cursor;
- // promiseLibrary
- options.promiseLibrary = target.s.promiseLibrary;
- // Set the AggregationCursor constructor
- options.cursorFactory = AggregationCursor;
- if (typeof callback !== 'function') {
- if (!topology.capabilities()) {
- throw new MongoError('cannot connect to server');
- }
- return topology.cursor(namespace, command, options);
- }
- return handleCallback(callback, null, topology.cursor(namespace, command, options));
- }
- module.exports = {
- aggregate
- };
|