aggregation_cursor.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. 'use strict';
  2. const inherits = require('util').inherits;
  3. const MongoError = require('mongodb-core').MongoError;
  4. const Readable = require('stream').Readable;
  5. const CoreCursor = require('./cursor');
  6. const deprecate = require('util').deprecate;
  7. const SUPPORTS = require('./utils').SUPPORTS;
  8. /**
  9. * @fileOverview The **AggregationCursor** class is an internal class that embodies an aggregation cursor on MongoDB
  10. * allowing for iteration over the results returned from the underlying query. It supports
  11. * one by one document iteration, conversion to an array or can be iterated as a Node 4.X
  12. * or higher stream
  13. *
  14. * **AGGREGATIONCURSOR Cannot directly be instantiated**
  15. * @example
  16. * const MongoClient = require('mongodb').MongoClient;
  17. * const test = require('assert');
  18. * // Connection url
  19. * const url = 'mongodb://localhost:27017';
  20. * // Database Name
  21. * const dbName = 'test';
  22. * // Connect using MongoClient
  23. * MongoClient.connect(url, function(err, client) {
  24. * // Create a collection we want to drop later
  25. * const col = client.db(dbName).collection('createIndexExample1');
  26. * // Insert a bunch of documents
  27. * col.insert([{a:1, b:1}
  28. * , {a:2, b:2}, {a:3, b:3}
  29. * , {a:4, b:4}], {w:1}, function(err, result) {
  30. * test.equal(null, err);
  31. * // Show that duplicate records got dropped
  32. * col.aggregation({}, {cursor: {}}).toArray(function(err, items) {
  33. * test.equal(null, err);
  34. * test.equal(4, items.length);
  35. * client.close();
  36. * });
  37. * });
  38. * });
  39. */
  40. /**
  41. * Namespace provided by the browser.
  42. * @external Readable
  43. */
  44. /**
  45. * Creates a new Aggregation Cursor instance (INTERNAL TYPE, do not instantiate directly)
  46. * @class AggregationCursor
  47. * @extends external:Readable
  48. * @fires AggregationCursor#data
  49. * @fires AggregationCursor#end
  50. * @fires AggregationCursor#close
  51. * @fires AggregationCursor#readable
  52. * @return {AggregationCursor} an AggregationCursor instance.
  53. */
  54. var AggregationCursor = function(bson, ns, cmd, options, topology, topologyOptions) {
  55. CoreCursor.apply(this, Array.prototype.slice.call(arguments, 0));
  56. var state = AggregationCursor.INIT;
  57. var streamOptions = {};
  58. // MaxTimeMS
  59. var maxTimeMS = null;
  60. // Get the promiseLibrary
  61. var promiseLibrary = options.promiseLibrary || Promise;
  62. // Set up
  63. Readable.call(this, { objectMode: true });
  64. // Internal state
  65. this.s = {
  66. // MaxTimeMS
  67. maxTimeMS: maxTimeMS,
  68. // State
  69. state: state,
  70. // Stream options
  71. streamOptions: streamOptions,
  72. // BSON
  73. bson: bson,
  74. // Namespace
  75. ns: ns,
  76. // Command
  77. cmd: cmd,
  78. // Options
  79. options: options,
  80. // Topology
  81. topology: topology,
  82. // Topology Options
  83. topologyOptions: topologyOptions,
  84. // Promise library
  85. promiseLibrary: promiseLibrary,
  86. // Optional ClientSession
  87. session: options.session
  88. };
  89. };
  90. /**
  91. * AggregationCursor stream data event, fired for each document in the cursor.
  92. *
  93. * @event AggregationCursor#data
  94. * @type {object}
  95. */
  96. /**
  97. * AggregationCursor stream end event
  98. *
  99. * @event AggregationCursor#end
  100. * @type {null}
  101. */
  102. /**
  103. * AggregationCursor stream close event
  104. *
  105. * @event AggregationCursor#close
  106. * @type {null}
  107. */
  108. /**
  109. * AggregationCursor stream readable event
  110. *
  111. * @event AggregationCursor#readable
  112. * @type {null}
  113. */
  114. // Inherit from Readable
  115. inherits(AggregationCursor, Readable);
  116. // Extend the Cursor
  117. for (var name in CoreCursor.prototype) {
  118. AggregationCursor.prototype[name] = CoreCursor.prototype[name];
  119. }
  120. if (SUPPORTS.ASYNC_ITERATOR) {
  121. AggregationCursor.prototype[
  122. Symbol.asyncIterator
  123. ] = require('./async/async_iterator').asyncIterator;
  124. }
  125. /**
  126. * Set the batch size for the cursor.
  127. * @method
  128. * @param {number} value The batchSize for the cursor.
  129. * @throws {MongoError}
  130. * @return {AggregationCursor}
  131. */
  132. AggregationCursor.prototype.batchSize = function(value) {
  133. if (this.s.state === AggregationCursor.CLOSED || this.isDead())
  134. throw MongoError.create({ message: 'Cursor is closed', driver: true });
  135. if (typeof value !== 'number')
  136. throw MongoError.create({ message: 'batchSize requires an integer', driver: true });
  137. if (this.s.cmd.cursor) this.s.cmd.cursor.batchSize = value;
  138. this.setCursorBatchSize(value);
  139. return this;
  140. };
  141. /**
  142. * Add a geoNear stage to the aggregation pipeline
  143. * @method
  144. * @param {object} document The geoNear stage document.
  145. * @return {AggregationCursor}
  146. */
  147. AggregationCursor.prototype.geoNear = deprecate(function(document) {
  148. this.s.cmd.pipeline.push({ $geoNear: document });
  149. return this;
  150. }, 'The `$geoNear` stage is deprecated in MongoDB 4.0, and removed in version 4.2.');
  151. /**
  152. * Add a group stage to the aggregation pipeline
  153. * @method
  154. * @param {object} document The group stage document.
  155. * @return {AggregationCursor}
  156. */
  157. AggregationCursor.prototype.group = function(document) {
  158. this.s.cmd.pipeline.push({ $group: document });
  159. return this;
  160. };
  161. /**
  162. * Add a limit stage to the aggregation pipeline
  163. * @method
  164. * @param {number} value The state limit value.
  165. * @return {AggregationCursor}
  166. */
  167. AggregationCursor.prototype.limit = function(value) {
  168. this.s.cmd.pipeline.push({ $limit: value });
  169. return this;
  170. };
  171. /**
  172. * Add a match stage to the aggregation pipeline
  173. * @method
  174. * @param {object} document The match stage document.
  175. * @return {AggregationCursor}
  176. */
  177. AggregationCursor.prototype.match = function(document) {
  178. this.s.cmd.pipeline.push({ $match: document });
  179. return this;
  180. };
  181. /**
  182. * Add a maxTimeMS stage to the aggregation pipeline
  183. * @method
  184. * @param {number} value The state maxTimeMS value.
  185. * @return {AggregationCursor}
  186. */
  187. AggregationCursor.prototype.maxTimeMS = function(value) {
  188. if (this.s.topology.lastIsMaster().minWireVersion > 2) {
  189. this.s.cmd.maxTimeMS = value;
  190. }
  191. return this;
  192. };
  193. /**
  194. * Add a out stage to the aggregation pipeline
  195. * @method
  196. * @param {number} destination The destination name.
  197. * @return {AggregationCursor}
  198. */
  199. AggregationCursor.prototype.out = function(destination) {
  200. this.s.cmd.pipeline.push({ $out: destination });
  201. return this;
  202. };
  203. /**
  204. * Add a project stage to the aggregation pipeline
  205. * @method
  206. * @param {object} document The project stage document.
  207. * @return {AggregationCursor}
  208. */
  209. AggregationCursor.prototype.project = function(document) {
  210. this.s.cmd.pipeline.push({ $project: document });
  211. return this;
  212. };
  213. /**
  214. * Add a lookup stage to the aggregation pipeline
  215. * @method
  216. * @param {object} document The lookup stage document.
  217. * @return {AggregationCursor}
  218. */
  219. AggregationCursor.prototype.lookup = function(document) {
  220. this.s.cmd.pipeline.push({ $lookup: document });
  221. return this;
  222. };
  223. /**
  224. * Add a redact stage to the aggregation pipeline
  225. * @method
  226. * @param {object} document The redact stage document.
  227. * @return {AggregationCursor}
  228. */
  229. AggregationCursor.prototype.redact = function(document) {
  230. this.s.cmd.pipeline.push({ $redact: document });
  231. return this;
  232. };
  233. /**
  234. * Add a skip stage to the aggregation pipeline
  235. * @method
  236. * @param {number} value The state skip value.
  237. * @return {AggregationCursor}
  238. */
  239. AggregationCursor.prototype.skip = function(value) {
  240. this.s.cmd.pipeline.push({ $skip: value });
  241. return this;
  242. };
  243. /**
  244. * Add a sort stage to the aggregation pipeline
  245. * @method
  246. * @param {object} document The sort stage document.
  247. * @return {AggregationCursor}
  248. */
  249. AggregationCursor.prototype.sort = function(document) {
  250. this.s.cmd.pipeline.push({ $sort: document });
  251. return this;
  252. };
  253. /**
  254. * Add a unwind stage to the aggregation pipeline
  255. * @method
  256. * @param {number} field The unwind field name.
  257. * @return {AggregationCursor}
  258. */
  259. AggregationCursor.prototype.unwind = function(field) {
  260. this.s.cmd.pipeline.push({ $unwind: field });
  261. return this;
  262. };
  263. /**
  264. * Return the cursor logger
  265. * @method
  266. * @return {Logger} return the cursor logger
  267. * @ignore
  268. */
  269. AggregationCursor.prototype.getLogger = function() {
  270. return this.logger;
  271. };
  272. AggregationCursor.prototype.get = AggregationCursor.prototype.toArray;
  273. /**
  274. * Get the next available document from the cursor, returns null if no more documents are available.
  275. * @function AggregationCursor.prototype.next
  276. * @param {AggregationCursor~resultCallback} [callback] The result callback.
  277. * @throws {MongoError}
  278. * @return {Promise} returns Promise if no callback passed
  279. */
  280. /**
  281. * Check if there is any document still available in the cursor
  282. * @function AggregationCursor.prototype.hasNext
  283. * @param {AggregationCursor~resultCallback} [callback] The result callback.
  284. * @throws {MongoError}
  285. * @return {Promise} returns Promise if no callback passed
  286. */
  287. /**
  288. * The callback format for results
  289. * @callback AggregationCursor~toArrayResultCallback
  290. * @param {MongoError} error An error instance representing the error during the execution.
  291. * @param {object[]} documents All the documents the satisfy the cursor.
  292. */
  293. /**
  294. * Returns an array of documents. The caller is responsible for making sure that there
  295. * is enough memory to store the results. Note that the array only contain partial
  296. * results when this cursor had been previously accessed. In that case,
  297. * cursor.rewind() can be used to reset the cursor.
  298. * @method AggregationCursor.prototype.toArray
  299. * @param {AggregationCursor~toArrayResultCallback} [callback] The result callback.
  300. * @throws {MongoError}
  301. * @return {Promise} returns Promise if no callback passed
  302. */
  303. /**
  304. * The callback format for results
  305. * @callback AggregationCursor~resultCallback
  306. * @param {MongoError} error An error instance representing the error during the execution.
  307. * @param {(object|null)} result The result object if the command was executed successfully.
  308. */
  309. /**
  310. * Iterates over all the documents for this cursor. As with **{cursor.toArray}**,
  311. * not all of the elements will be iterated if this cursor had been previously accessed.
  312. * In that case, **{cursor.rewind}** can be used to reset the cursor. However, unlike
  313. * **{cursor.toArray}**, the cursor will only hold a maximum of batch size elements
  314. * at any given time if batch size is specified. Otherwise, the caller is responsible
  315. * for making sure that the entire result can fit the memory.
  316. * @method AggregationCursor.prototype.each
  317. * @param {AggregationCursor~resultCallback} callback The result callback.
  318. * @throws {MongoError}
  319. * @return {null}
  320. */
  321. /**
  322. * Close the cursor, sending a AggregationCursor command and emitting close.
  323. * @method AggregationCursor.prototype.close
  324. * @param {AggregationCursor~resultCallback} [callback] The result callback.
  325. * @return {Promise} returns Promise if no callback passed
  326. */
  327. /**
  328. * Is the cursor closed
  329. * @method AggregationCursor.prototype.isClosed
  330. * @return {boolean}
  331. */
  332. /**
  333. * Execute the explain for the cursor
  334. * @method AggregationCursor.prototype.explain
  335. * @param {AggregationCursor~resultCallback} [callback] The result callback.
  336. * @return {Promise} returns Promise if no callback passed
  337. */
  338. /**
  339. * Clone the cursor
  340. * @function AggregationCursor.prototype.clone
  341. * @return {AggregationCursor}
  342. */
  343. /**
  344. * Resets the cursor
  345. * @function AggregationCursor.prototype.rewind
  346. * @return {AggregationCursor}
  347. */
  348. /**
  349. * The callback format for the forEach iterator method
  350. * @callback AggregationCursor~iteratorCallback
  351. * @param {Object} doc An emitted document for the iterator
  352. */
  353. /**
  354. * The callback error format for the forEach iterator method
  355. * @callback AggregationCursor~endCallback
  356. * @param {MongoError} error An error instance representing the error during the execution.
  357. */
  358. /*
  359. * Iterates over all the documents for this cursor using the iterator, callback pattern.
  360. * @method AggregationCursor.prototype.forEach
  361. * @param {AggregationCursor~iteratorCallback} iterator The iteration callback.
  362. * @param {AggregationCursor~endCallback} callback The end callback.
  363. * @throws {MongoError}
  364. * @return {null}
  365. */
  366. AggregationCursor.INIT = 0;
  367. AggregationCursor.OPEN = 1;
  368. AggregationCursor.CLOSED = 2;
  369. module.exports = AggregationCursor;