ChangeStream.js 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. 'use strict';
  2. /*!
  3. * Module dependencies.
  4. */
  5. const EventEmitter = require('events').EventEmitter;
  6. /*!
  7. * ignore
  8. */
  9. class ChangeStream extends EventEmitter {
  10. constructor(model, pipeline, options) {
  11. super();
  12. this.driverChangeStream = null;
  13. this.closed = false;
  14. // This wrapper is necessary because of buffering.
  15. if (model.collection.buffer) {
  16. model.collection.addQueue(() => {
  17. if (this.closed) {
  18. return;
  19. }
  20. this.driverChangeStream = model.collection.watch(pipeline, options);
  21. this._bindEvents();
  22. this.emit('ready');
  23. });
  24. } else {
  25. this.driverChangeStream = model.collection.watch(pipeline, options);
  26. this._bindEvents();
  27. this.emit('ready');
  28. }
  29. }
  30. _bindEvents() {
  31. ['close', 'change', 'end', 'error'].forEach(ev => {
  32. this.driverChangeStream.on(ev, data => this.emit(ev, data));
  33. });
  34. }
  35. _queue(cb) {
  36. this.once('ready', () => cb());
  37. }
  38. close() {
  39. this.closed = true;
  40. if (this.driverChangeStream) {
  41. this.driverChangeStream.close();
  42. }
  43. }
  44. }
  45. /*!
  46. * ignore
  47. */
  48. module.exports = ChangeStream;