// topology_base.js (11 KB)
  'use strict';

  // Module dependencies. `f` is util.format and is used to build error and
  // client-info strings throughout this file.
  const EventEmitter = require('events');
  const MongoError = require('mongodb-core').MongoError;
  const f = require('util').format;
  const os = require('os');
  const translateReadPreference = require('../utils').translateReadPreference;
  const ClientSession = require('mongodb-core').Sessions.ClientSession;
  // The store of ops

  /**
   * Queue of operations that are held back until they are either replayed
   * against a topology (`execute`) or failed (`flush`).
   *
   * @param {object} topology Object whose methods, named by each stored op's
   *   type, are invoked when stored operations are replayed.
   * @param {object} [storeOptions] Buffering options; defaults to
   *   `{ force: false, bufferMaxEntries: -1 }`.
   * @param {boolean} [storeOptions.force] When truthy every new operation is
   *   rejected with a 'db closed by application' error.
   * @param {number} [storeOptions.bufferMaxEntries] `0` rejects every new
   *   operation, a positive value caps the queue, any other value leaves the
   *   queue uncapped.
   */
  var Store = function(topology, storeOptions) {
    const self = this;
    storeOptions = storeOptions || { force: false, bufferMaxEntries: -1 };

    // Internal state
    this.s = {
      storedOps: [],
      storeOptions: storeOptions,
      topology: topology
    };

    // Read-only view of the number of queued operations.
    Object.defineProperty(this, 'length', {
      enumerable: true,
      get: function() {
        return self.s.storedOps.length;
      }
    });
  };

  // Builds the error reported when the buffer limit rejects an operation.
  function createBufferLimitError(bufferMaxEntries) {
    return MongoError.create({
      message: f(
        'no connection available for operation and number of stored operation > %s',
        bufferMaxEntries
      ),
      driver: true
    });
  }

  // Applies the force / bufferMaxEntries admission rules, then queues `entry`.
  function bufferOperation(store, entry, callback) {
    const storeOptions = store.s.storeOptions;

    if (storeOptions.force) {
      return callback(MongoError.create({ message: 'db closed by application', driver: true }));
    }

    if (storeOptions.bufferMaxEntries === 0) {
      return callback(createBufferLimitError(storeOptions.bufferMaxEntries));
    }

    if (
      storeOptions.bufferMaxEntries > 0 &&
      store.s.storedOps.length > storeOptions.bufferMaxEntries
    ) {
      // The buffer overflowed: fail everything that was queued ...
      while (store.s.storedOps.length > 0) {
        store.s.storedOps.shift().c(createBufferLimitError(storeOptions.bufferMaxEntries));
      }

      // ... and also fail the operation that triggered the overflow. It is not
      // queued, so without this its caller would never be notified.
      if (typeof callback === 'function') {
        callback(createBufferLimitError(storeOptions.bufferMaxEntries));
      }

      return;
    }

    store.s.storedOps.push(entry);
  }

  /**
   * Queue a topology operation (replayed as `topology[opType](ns, ops, options, callback)`).
   *
   * @param {string} opType Name of the topology method to invoke on replay.
   * @param {string} ns Namespace argument passed through on replay.
   * @param {*} ops Operation payload passed through on replay.
   * @param {object} options Operation options; `options.readPreference.mode`
   *   is consulted by `execute` when filtering.
   * @param {function} callback Receives an error if the operation is rejected
   *   or flushed.
   */
  Store.prototype.add = function(opType, ns, ops, options, callback) {
    return bufferOperation(
      this,
      { t: opType, n: ns, o: ops, op: options, c: callback },
      callback
    );
  };

  /**
   * Queue a method call on an arbitrary object (replayed as
   * `object[method].apply(object, params)` when `opType` is 'cursor').
   *
   * @param {string} opType Operation type tag.
   * @param {object} object Receiver of the deferred call.
   * @param {string} method Name of the method to call on `object`.
   * @param {Array} params Arguments for the deferred call.
   * @param {function} callback Receives an error if the operation is rejected
   *   or flushed.
   */
  Store.prototype.addObjectAndMethod = function(opType, object, method, params, callback) {
    return bufferOperation(
      this,
      { t: opType, m: method, o: object, p: params, c: callback },
      callback
    );
  };

  /**
   * Fail every queued operation and empty the queue.
   *
   * @param {Error} [err] Error handed to each callback; a generic
   *   'no connection available for operation' error is created when omitted.
   */
  Store.prototype.flush = function(err) {
    while (this.s.storedOps.length > 0) {
      this.s.storedOps
        .shift()
        .c(
          err ||
            MongoError.create({ message: f('no connection available for operation'), driver: true })
        );
    }
  };

  // Read preference modes replayed by a primary-only / secondary-only execute.
  var primaryOptions = ['primary', 'primaryPreferred', 'nearest', 'secondaryPreferred'];
  var secondaryOptions = ['secondary', 'secondaryPreferred'];

  // Decides whether an op with the given read preference may run in this pass.
  function isEligibleForExecution(readPreference, executePrimary, executeSecondary) {
    if (executePrimary && executeSecondary) return true;
    if (!readPreference) return false;
    if (executePrimary) return primaryOptions.indexOf(readPreference.mode) !== -1;
    if (executeSecondary) return secondaryOptions.indexOf(readPreference.mode) !== -1;
    return false;
  }

  /**
   * Replay the queued operations.
   *
   * 'auth' operations are always replayed. Other operations are replayed when
   * both flags are true, or when their read preference mode matches the single
   * enabled flag. Operations that are not eligible in this pass are put back
   * at the front of the queue instead of being dropped.
   *
   * @param {object} [options]
   * @param {boolean} [options.executePrimary=true]
   * @param {boolean} [options.executeSecondary=true]
   */
  Store.prototype.execute = function(options) {
    options = options || {};

    // Detach the current queue so operations re-buffered during replay are not
    // processed again in the same pass.
    const pending = this.s.storedOps;
    this.s.storedOps = [];

    const executePrimary =
      typeof options.executePrimary === 'boolean' ? options.executePrimary : true;
    const executeSecondary =
      typeof options.executeSecondary === 'boolean' ? options.executeSecondary : true;

    const skipped = [];

    while (pending.length > 0) {
      const op = pending.shift();

      if (op.t === 'auth') {
        this.s.topology[op.t].apply(this.s.topology, op.o);
        continue;
      }

      const isCursor = op.t === 'cursor';
      const readPreference = isCursor
        ? op.o.options && op.o.options.readPreference
        : op.op && op.op.readPreference;

      if (!isEligibleForExecution(readPreference, executePrimary, executeSecondary)) {
        skipped.push(op);
        continue;
      }

      if (isCursor) {
        op.o[op.m].apply(op.o, op.p);
      } else {
        this.s.topology[op.t](op.n, op.o, op.op, op.c);
      }
    }

    // Keep skipped operations, in their original order, ahead of anything that
    // was buffered while replaying.
    if (skipped.length > 0) {
      this.s.storedOps = skipped.concat(this.s.storedOps);
    }
  };

  /**
   * @return {Array} The live array of queued operations (not a copy).
   */
  Store.prototype.all = function() {
    return this.s.storedOps;
  };
  // Server capabilities

  /**
   * Read-only feature flags derived from the wire-version fields of an
   * ismaster document.
   *
   * Side effect: when `ismaster.minWireVersion` or `ismaster.maxWireVersion` is
   * null/undefined it is set to 0 on the object that was passed in. The
   * feature flags are computed from the values as they were BEFORE that
   * defaulting.
   *
   * @param {object} ismaster ismaster document; `minWireVersion`,
   *   `maxWireVersion` and `maxWriteBatchSize` are read.
   */
  var ServerCapabilities = function(ismaster) {
    // Defines an enumerable, getter-only property that always yields `value`.
    const defineReadOnly = (target, name, value) => {
      Object.defineProperty(target, name, {
        enumerable: true,
        get: function() {
          return value;
        }
      });
    };

    // Capabilities, evaluated against the raw (possibly missing) wire versions.
    const textSearch = ismaster.minWireVersion >= 0;
    const aggregationCursor = ismaster.maxWireVersion >= 1;
    const authCommands = ismaster.maxWireVersion >= 1;
    const writeCommands = ismaster.maxWireVersion >= 2;
    const listCollections = ismaster.maxWireVersion >= 3;
    const listIndexes = ismaster.maxWireVersion >= 3;
    const commandsTakeWriteConcern = ismaster.maxWireVersion >= 5;
    const commandsTakeCollation = ismaster.maxWireVersion >= 5;
    const maxNumberOfDocsInBatch = ismaster.maxWriteBatchSize || 1000;

    // If no min or max wire version set to 0
    if (ismaster.minWireVersion == null) {
      ismaster.minWireVersion = 0;
    }

    if (ismaster.maxWireVersion == null) {
      ismaster.maxWireVersion = 0;
    }

    // Map up read only parameters
    defineReadOnly(this, 'hasAggregationCursor', aggregationCursor);
    defineReadOnly(this, 'hasWriteCommands', writeCommands);
    defineReadOnly(this, 'hasTextSearch', textSearch);
    defineReadOnly(this, 'hasAuthCommands', authCommands);
    defineReadOnly(this, 'hasListCollectionsCommand', listCollections);
    defineReadOnly(this, 'hasListIndexesCommand', listIndexes);
    defineReadOnly(this, 'minWireVersion', ismaster.minWireVersion);
    defineReadOnly(this, 'maxWireVersion', ismaster.maxWireVersion);
    defineReadOnly(this, 'maxNumberOfDocsInBatch', maxNumberOfDocsInBatch);
    defineReadOnly(this, 'commandsTakeWriteConcern', commandsTakeWriteConcern);
    defineReadOnly(this, 'commandsTakeCollation', commandsTakeCollation);
  };
  // Get package.json variable
  // Client metadata gathered once at module load; TopologyBase copies these
  // values into `clientInfo` for every instance.
  const driverVersion = require('../../package.json').version;
  const nodejsversion = f('Node.js %s, %s', process.version, os.endianness());
  const type = os.type();
  const name = process.platform;
  const architecture = process.arch;
  const release = os.release();
  /**
   * Base class that forwards operations to `this.s.coreTopology`.
   *
   * The constructor does not create `this.s`; every method other than the
   * constructor reads it, so it must be populated before they are called
   * (presumably by subclasses - confirm against callers). Fields read here:
   * `coreTopology`, `sessions`, `sessionPool`, `store`, `storeOptions` and the
   * `sCapabilities` cache.
   */
  class TopologyBase extends EventEmitter {
    constructor() {
      super();

      // Build default client information
      this.clientInfo = {
        driver: {
          name: 'nodejs',
          version: driverVersion
        },
        os: {
          type: type,
          name: name,
          architecture: architecture,
          version: release
        },
        platform: nodejsversion
      };

      // Remove the listener cap on this emitter.
      this.setMaxListeners(Infinity);
    }

    // Sessions related methods

    /**
     * @return {boolean} true when `logicalSessionTimeoutMinutes` is neither
     *   null nor undefined.
     */
    hasSessionSupport() {
      return this.logicalSessionTimeoutMinutes != null;
    }

    /**
     * Create a ClientSession bound to this topology and track it in
     * `this.s.sessions` until it emits 'ended'.
     *
     * @param {object} [options] Passed through to ClientSession.
     * @param {object} [clientOptions] Passed through to ClientSession.
     * @return {ClientSession}
     */
    startSession(options, clientOptions) {
      const session = new ClientSession(this, this.s.sessionPool, options, clientOptions);

      // Stop tracking the session once it has ended.
      session.once('ended', () => {
        this.s.sessions = this.s.sessions.filter(tracked => !tracked.equals(session));
      });

      this.s.sessions.push(session);
      return session;
    }

    /**
     * Forward to the core topology's `endSessions`.
     */
    endSessions(sessions, callback) {
      return this.s.coreTopology.endSessions(sessions, callback);
    }

    // Server capabilities

    /**
     * @return {ServerCapabilities|null} Capabilities built from the last
     *   ismaster and cached on `this.s.sCapabilities`; null while no ismaster
     *   is available.
     */
    capabilities() {
      if (this.s.sCapabilities) return this.s.sCapabilities;

      const ismaster = this.s.coreTopology.lastIsMaster();
      if (ismaster == null) return null;

      this.s.sCapabilities = new ServerCapabilities(ismaster);
      return this.s.sCapabilities;
    }

    // Command: options go through translateReadPreference before forwarding.
    command(ns, cmd, options, callback) {
      this.s.coreTopology.command(ns, cmd, translateReadPreference(options), callback);
    }

    // Insert
    insert(ns, ops, options, callback) {
      this.s.coreTopology.insert(ns, ops, options, callback);
    }

    // Update
    update(ns, ops, options, callback) {
      this.s.coreTopology.update(ns, ops, options, callback);
    }

    // Remove
    remove(ns, ops, options, callback) {
      this.s.coreTopology.remove(ns, ops, options, callback);
    }

    // IsConnected
    isConnected(options) {
      options = options || {};
      options = translateReadPreference(options);

      return this.s.coreTopology.isConnected(options);
    }

    // IsDestroyed
    isDestroyed() {
      return this.s.coreTopology.isDestroyed();
    }

    /**
     * Create a cursor on the core topology. The options object gets this
     * topology's store as `disconnectHandler` and this instance as `topology`.
     */
    cursor(ns, cmd, options) {
      options = options || {};
      options = translateReadPreference(options);
      options.disconnectHandler = this.s.store;
      options.topology = this;

      return this.s.coreTopology.cursor(ns, cmd, options);
    }

    lastIsMaster() {
      return this.s.coreTopology.lastIsMaster();
    }

    selectServer(selector, options, callback) {
      return this.s.coreTopology.selectServer(selector, options, callback);
    }

    /**
     * Unref all sockets
     * @method
     */
    unref() {
      return this.s.coreTopology.unref();
    }

    /**
     * All raw connections
     * @method
     * @return {array}
     */
    connections() {
      return this.s.coreTopology.connections();
    }

    /**
     * End tracked sessions, optionally fail buffered operations, and destroy
     * the core topology.
     *
     * @param {boolean} [forceClosed] Only the literal `true` marks the store
     *   as force-closed and flushes it; any non-boolean is forwarded to
     *   `destroy` as `force: false`.
     * @param {function} [callback] Passed to the core topology's `destroy`.
     */
    close(forceClosed, callback) {
      // If we have sessions, we want to individually move them to the session pool,
      // and then send a single endSessions call.
      if (this.s.sessions.length) {
        this.s.sessions.forEach(session => session.endSession());
      }

      if (this.s.sessionPool) {
        this.s.sessionPool.endAllPooledSessions();
      }

      // We need to wash out all stored processes
      if (forceClosed === true) {
        this.s.storeOptions.force = forceClosed;
        this.s.store.flush();
      }

      const force = typeof forceClosed === 'boolean' ? forceClosed : false;
      this.s.coreTopology.destroy({ force: force }, callback);
    }
  }
  // Properties
  // Enumerable, getter-only accessors on the prototype that read through to
  // the core topology held in `this.s.coreTopology`.

  // BSON instance stored on the core topology's own internal state (`s.bson`).
  Object.defineProperty(TopologyBase.prototype, 'bson', {
    enumerable: true,
    get: function() {
      return this.s.coreTopology.s.bson;
    }
  });

  // The core topology's `parserType`.
  Object.defineProperty(TopologyBase.prototype, 'parserType', {
    enumerable: true,
    get: function() {
      return this.s.coreTopology.parserType;
    }
  });

  // The core topology's `logicalSessionTimeoutMinutes`; hasSessionSupport()
  // treats null/undefined as "no session support".
  Object.defineProperty(TopologyBase.prototype, 'logicalSessionTimeoutMinutes', {
    enumerable: true,
    get: function() {
      return this.s.coreTopology.logicalSessionTimeoutMinutes;
    }
  });

  // The core topology's `type`.
  Object.defineProperty(TopologyBase.prototype, 'type', {
    enumerable: true,
    get: function() {
      return this.s.coreTopology.type;
    }
  });
  // Public module surface.
  exports.Store = Store;
  exports.ServerCapabilities = ServerCapabilities;
  exports.TopologyBase = TopologyBase;