replset.js 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507
  1. 'use strict';
  2. const inherits = require('util').inherits;
  3. const f = require('util').format;
  4. const EventEmitter = require('events').EventEmitter;
  5. const ReadPreference = require('./read_preference');
  6. const BasicCursor = require('../cursor');
  7. const retrieveBSON = require('../connection/utils').retrieveBSON;
  8. const Logger = require('../connection/logger');
  9. const MongoError = require('../error').MongoError;
  10. const Server = require('./server');
  11. const ReplSetState = require('./replset_state');
  12. const clone = require('./shared').clone;
  13. const Timeout = require('./shared').Timeout;
  14. const Interval = require('./shared').Interval;
  15. const createClientInfo = require('./shared').createClientInfo;
  16. const SessionMixins = require('./shared').SessionMixins;
  17. const isRetryableWritesSupported = require('./shared').isRetryableWritesSupported;
  18. const relayEvents = require('../utils').relayEvents;
  19. const isRetryableError = require('../error').isRetryableError;
  20. const BSON = retrieveBSON();
  //
  // Lifecycle states a ReplSet topology can be in
  const DISCONNECTED = 'disconnected';
  const CONNECTING = 'connecting';
  const CONNECTED = 'connected';
  const UNREFERENCED = 'unreferenced';
  const DESTROYED = 'destroyed';

  /**
   * Moves `self` to `newState` when that transition is permitted from the
   * current `self.state`; otherwise leaves the state untouched and reports the
   * rejected transition through `self.s.logger.error`.
   *
   * @param {object} self Topology whose `state` property is updated
   * @param {string} newState One of the state constants above
   */
  function stateTransition(self, newState) {
    // Keyed by the current state, listing every state reachable from it
    const reachableFrom = {
      disconnected: [CONNECTING, DESTROYED, DISCONNECTED],
      connecting: [CONNECTING, DESTROYED, CONNECTED, DISCONNECTED],
      connected: [CONNECTED, DISCONNECTED, DESTROYED, UNREFERENCED],
      unreferenced: [UNREFERENCED, DESTROYED],
      destroyed: [DESTROYED]
    };

    const allowed = reachableFrom[self.state];
    const isLegal = Boolean(allowed) && allowed.indexOf(newState) !== -1;

    if (isLegal) {
      self.state = newState;
      return;
    }

    // Illegal transitions are logged, never thrown
    const message = f(
      'Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]',
      self.id,
      self.state,
      newState,
      allowed
    );
    self.s.logger.error(message);
  }
  //
  // Counter handed out (and post-incremented) as the id of each new ReplSet
  let id = 1;
  // Server events whose connect-phase listeners are removed once a server has joined
  const handlers = ['connect', 'close', 'error', 'timeout', 'parseError'];
  /**
   * Creates a new Replset instance
   * @class
   * @param {array} seedlist A list of seeds for the replicaset; every entry needs a string `host` and a numeric `port`
   * @param {object} [options] Optional settings
   * @param {string} options.setName The Replicaset set name
   * @param {boolean} [options.secondaryOnlyConnectionAllowed=false] Allow connection to a secondary only replicaset
   * @param {number} [options.haInterval=10000] The High availability period for replicaset inquiry
   * @param {boolean} [options.emitError=false] Server will emit errors events
   * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
   * @param {number} [options.size=5] Server connection pool size
   * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
   * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
   * @param {boolean} [options.noDelay=true] TCP Connection no delay
   * @param {number} [options.connectionTimeout=10000] TCP Connection timeout setting
   * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
   * @param {boolean} [options.ssl=false] Use SSL for connection
   * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
   * @param {Buffer} [options.ca] SSL Certificate store binary buffer
   * @param {Buffer} [options.crl] SSL Certificate revocation store binary buffer
   * @param {Buffer} [options.cert] SSL Certificate binary buffer
   * @param {Buffer} [options.key] SSL Key file binary buffer
   * @param {string} [options.passphrase] SSL Certificate pass phrase
   * @param {string} [options.servername=null] String containing the server name requested via TLS SNI.
   * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
   * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
   * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
   * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
   * @param {number} [options.pingInterval=5000] Ping interval to check the response time to the different servers
   * @param {number} [options.localThresholdMS=15] Cutoff latency point in MS for Replicaset member selection
   * @param {number} [options.acceptableLatency] Backward compatible alias; when truthy it overrides options.localThresholdMS
   * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
   * @param {boolean} [options.monitorCommands=false] Enable command monitoring for this topology
   * @param {object} [options.bson] BSON instance to use instead of constructing a new one
   * @param {object} [options.disconnectHandler] Handler whose `execute` is called to replay buffered operations
   * @param {boolean} [options.debug=false] Run in debug mode
   * @return {ReplSet} A ReplSet topology instance
   * @throws {MongoError} When seedlist is not an array, is empty, or contains an entry without a string host and numeric port
   * @fires ReplSet#connect
   * @fires ReplSet#ha
   * @fires ReplSet#joined
   * @fires ReplSet#left
   * @fires ReplSet#failed
   * @fires ReplSet#fullsetup
   * @fires ReplSet#all
   * @fires ReplSet#error
   * @fires ReplSet#serverHeartbeatStarted
   * @fires ReplSet#serverHeartbeatSucceeded
   * @fires ReplSet#serverHeartbeatFailed
   * @fires ReplSet#topologyOpening
   * @fires ReplSet#topologyClosed
   * @fires ReplSet#topologyDescriptionChanged
   * @property {string} type the topology type.
   * @property {string} parserType the parser type used (c++ or js).
   */
  var ReplSet = function(seedlist, options) {
    var self = this;
    options = options || {};

    // Validate seedlist
    if (!Array.isArray(seedlist)) throw new MongoError('seedlist must be an array');
    // Validate list
    if (seedlist.length === 0) throw new MongoError('seedlist must contain at least one entry');
    // Validate entries
    seedlist.forEach(function(e) {
      if (typeof e.host !== 'string' || typeof e.port !== 'number')
        throw new MongoError('seedlist entry must contain a host and port');
    });

    // Add event listener
    EventEmitter.call(this);

    // Get replSet Id
    this.id = id++;

    // Get the localThresholdMS
    var localThresholdMS = options.localThresholdMS || 15;
    // Backward compatibility
    if (options.acceptableLatency) localThresholdMS = options.acceptableLatency;

    // Heartbeat period, shared by the replicaset state and the monitoring loops
    var haInterval = options.haInterval ? options.haInterval : 10000;

    // Create a logger
    var logger = Logger('ReplSet', options);

    // Internal state
    this.s = {
      options: Object.assign({}, options),
      // BSON instance
      bson:
        options.bson ||
        new BSON([
          BSON.Binary,
          BSON.Code,
          BSON.DBRef,
          BSON.Decimal128,
          BSON.Double,
          BSON.Int32,
          BSON.Long,
          BSON.Map,
          BSON.MaxKey,
          BSON.MinKey,
          BSON.ObjectId,
          BSON.BSONRegExp,
          BSON.Symbol,
          BSON.Timestamp
        ]),
      // Factory overrides
      Cursor: options.cursorFactory || BasicCursor,
      // Logger instance
      logger: logger,
      // Seedlist
      seedlist: seedlist,
      // Replicaset state
      replicaSetState: new ReplSetState({
        id: this.id,
        setName: options.setName,
        acceptableLatency: localThresholdMS,
        heartbeatFrequencyMS: haInterval,
        logger: logger
      }),
      // Current servers we are connecting to
      connectingServers: [],
      // Ha interval
      haInterval: haInterval,
      // Minimum heartbeat frequency used if we detect a server close
      minHeartbeatFrequencyMS: 500,
      // Disconnect handler
      disconnectHandler: options.disconnectHandler,
      // Server selection index
      index: 0,
      // Connect function options passed in
      connectOptions: {},
      // Are we running in debug mode
      debug: typeof options.debug === 'boolean' ? options.debug : false,
      // Client info
      clientInfo: createClientInfo(options)
    };

    // Add handler for topology change
    this.s.replicaSetState.on('topologyDescriptionChanged', function(r) {
      self.emit('topologyDescriptionChanged', r);
    });

    // Log info warning if the socketTimeout < haInterval as it will cause
    // a lot of recycled connections to happen.
    if (
      this.s.logger.isWarn() &&
      this.s.options.socketTimeout !== 0 &&
      this.s.options.socketTimeout < this.s.haInterval
    ) {
      this.s.logger.warn(
        f(
          'warning socketTimeout %s is less than haInterval %s. This might cause unnecessary server reconnections due to socket timeouts',
          this.s.options.socketTimeout,
          this.s.haInterval
        )
      );
    }

    // Add forwarding of events from state handler
    var types = ['joined', 'left'];
    types.forEach(function(x) {
      self.s.replicaSetState.on(x, function(t, s) {
        self.emit(x, t, s);
      });
    });

    // Tracks which of the one-off connect/fullsetup/all milestones were reached
    this.initialConnectState = {
      connect: false,
      fullsetup: false,
      all: false
    };

    // Disconnected state
    this.state = DISCONNECTED;
    this.haTimeoutId = null;
    // Last ismaster
    this.ismaster = null;
    // Timers created by the monitoring loops
    this.intervalIds = [];
    // Highest clusterTime seen in responses from the current deployment
    this.clusterTime = null;
  };
  inherits(ReplSet, EventEmitter);
  Object.assign(ReplSet.prototype, SessionMixins);

  // Enumerable, getter-only accessors exposed on every ReplSet instance
  Object.defineProperties(ReplSet.prototype, {
    // Topology type identifier
    type: {
      enumerable: true,
      get: function() {
        return 'replset';
      }
    },
    // Which BSON parser is in use, based on the `native` flag of the BSON module
    parserType: {
      enumerable: true,
      get: function() {
        if (BSON.native) return 'c++';
        return 'js';
      }
    },
    // Value tracked by the replicaset state, or null when it is falsy
    logicalSessionTimeoutMinutes: {
      enumerable: true,
      get: function() {
        var minutes = this.s.replicaSetState.logicalSessionTimeoutMinutes;
        return minutes || null;
      }
    }
  });
  /**
   * Asks the disconnect handler (when one is configured) to replay buffered
   * operations, scoped to whichever member types are currently available:
   * everything when both a primary and a secondary exist, otherwise only the
   * primary-bound or only the secondary-bound operations.
   *
   * @param {object} self The ReplSet topology
   */
  function rexecuteOperations(self) {
    var replState = self.s.replicaSetState;

    if (replState.hasPrimaryAndSecondary() && self.s.disconnectHandler) {
      self.s.disconnectHandler.execute();
      return;
    }

    if (replState.hasPrimary() && self.s.disconnectHandler) {
      self.s.disconnectHandler.execute({ executePrimary: true });
      return;
    }

    if (replState.hasSecondary() && self.s.disconnectHandler) {
      self.s.disconnectHandler.execute({ executeSecondary: true });
    }
  }
  /**
   * Opens a connection to every `host:port` string in `servers`, staggering the
   * attempts by one millisecond each, and folds servers that connect and are
   * accepted by the replicaset state into the monitored set.
   *
   * `callback` is invoked immediately (with no arguments) when `servers` is
   * empty; otherwise it is invoked with the most recent 'error' event payload
   * (or null) once the pending counter reaches zero.
   *
   * NOTE(review): every server registers five one-shot listeners that each
   * decrement the same counter, so a server emitting more than one of these
   * events is counted more than once. Confirm whether that is intended.
   *
   * @param {object} self The ReplSet topology
   * @param {string[]} servers Addresses in `host:port` form
   * @param {function} callback Completion callback
   */
  function connectNewServers(self, servers, callback) {
    // Events still expected before the callback may fire
    var count = servers.length;
    var error = null;

    // Builds the temporary listener used while a server is still connecting
    var _handleEvent = function(self, event) {
      return function(err) {
        var _self = this;
        count = count - 1;

        // Topology went away while we were connecting: drop the server
        if (self.state === DESTROYED || self.state === UNREFERENCED) {
          return this.destroy({ force: true });
        }

        if (event === 'connect') {
          // Update the state with the new server
          var result = self.s.replicaSetState.update(_self);

          if (result) {
            // Primary lastIsMaster store it
            if (_self.lastIsMaster() && _self.lastIsMaster().ismaster) {
              self.ismaster = _self.lastIsMaster();
            }

            // Swap the connect-phase listeners for the stable-state ones
            for (let i = 0; i < handlers.length; i++) {
              _self.removeAllListeners(handlers[i]);
            }

            _self.on('error', handleEvent(self, 'error'));
            _self.on('close', handleEvent(self, 'close'));
            _self.on('timeout', handleEvent(self, 'timeout'));
            _self.on('parseError', handleEvent(self, 'parseError'));

            // Enable the monitoring of the new server
            monitorServer(_self.lastIsMaster().me, self, {});

            // Rexecute any stalled operation
            rexecuteOperations(self);
          } else {
            // Rejected by the replicaset state
            _self.destroy({ force: true });
          }
        } else if (event === 'error') {
          error = err;
        }

        // Rexecute any stalled operation
        rexecuteOperations(self);

        // Are we done finish up callback
        if (count === 0) {
          callback(error);
        }
      };
    };

    // No new servers
    if (count === 0) return callback();

    // Schedules the connection attempt for a single address
    function execute(_server, i) {
      setTimeout(function() {
        // Destroyed
        if (self.state === DESTROYED || self.state === UNREFERENCED) {
          return;
        }

        var hostAndPort = _server.split(':');

        // Create a new server instance
        var server = new Server(
          Object.assign({}, self.s.options, {
            host: hostAndPort[0],
            port: parseInt(hostAndPort[1], 10),
            reconnect: false,
            monitoring: false,
            parent: self,
            clientInfo: clone(self.s.clientInfo)
          })
        );

        // Add temp handlers
        server.once('connect', _handleEvent(self, 'connect'));
        server.once('close', _handleEvent(self, 'close'));
        server.once('timeout', _handleEvent(self, 'timeout'));
        server.once('error', _handleEvent(self, 'error'));
        server.once('parseError', _handleEvent(self, 'parseError'));

        // SDAM Monitoring events
        server.on('serverOpening', e => self.emit('serverOpening', e));
        server.on('serverDescriptionChanged', e => self.emit('serverDescriptionChanged', e));
        server.on('serverClosed', e => self.emit('serverClosed', e));

        // Command Monitoring events
        relayEvents(server, self, ['commandStarted', 'commandSucceeded', 'commandFailed']);

        server.connect(self.s.connectOptions);
      }, i);
    }

    // Create new instances
    for (var i = 0; i < servers.length; i++) {
      execute(servers[i], i);
    }
  }
  /**
   * Sends one `ismaster` heartbeat to `server` and records the outcome.
   *
   * On success the server's ismaster document, last write date and smoothed
   * round-trip time are refreshed and the replicaset state is updated; on
   * failure the server is removed from the replicaset state. In both cases the
   * server's `lastUpdateTime` and staleness are refreshed and the matching SDAM
   * heartbeat event is emitted. If the topology is destroyed or unreferenced by
   * the time the reply arrives, the server is force-destroyed instead.
   *
   * @param {object} self The ReplSet topology
   * @param {object} server The server to ping
   * @param {function} cb Called as `cb(err, reply)` with the raw command outcome
   */
  var pingServer = function(self, server, cb) {
    // Wall-clock start, used for the latency measurement
    var start = Date.now();

    // Emit the server heartbeat start
    emitSDAMEvent(self, 'serverHeartbeatStarted', { connectionId: server.name });

    // Execute ismaster
    // Set the socketTimeout for a monitoring message to a low number
    // Ensuring ismaster calls are timed out quickly
    server.command(
      'admin.$cmd',
      {
        ismaster: true
      },
      {
        monitoring: true,
        socketTimeout: self.s.options.connectionTimeout || 2000
      },
      function(err, r) {
        if (self.state === DESTROYED || self.state === UNREFERENCED) {
          server.destroy({ force: true });
          return cb(err, r);
        }

        // Calculate latency
        var latencyMS = Date.now() - start;

        // process.hrtime() yields [seconds, nanoseconds]; convert both parts to
        // milliseconds (1e6 ns per ms) for the last update time
        var hrTime = process.hrtime();
        server.lastUpdateTime = hrTime[0] * 1000 + Math.round(hrTime[1] / 1000000);

        if (err) {
          // Emit the server heartbeat failure
          emitSDAMEvent(self, 'serverHeartbeatFailed', {
            durationMS: latencyMS,
            failure: err,
            connectionId: server.name
          });

          // Remove server from the state
          self.s.replicaSetState.remove(server);
        } else {
          // Update the server ismaster
          server.ismaster = r.result;

          // Check if we have a lastWriteDate convert it to MS
          // and store on the server instance for later use
          if (server.ismaster.lastWrite && server.ismaster.lastWrite.lastWriteDate) {
            server.lastWriteDate = server.ismaster.lastWrite.lastWriteDate.getTime();
          }

          if (server.lastIsMasterMS === -1) {
            // Brand new server: the first sample is taken as-is
            server.lastIsMasterMS = latencyMS;
          } else if (typeof server.lastIsMasterMS === 'number' && !isNaN(server.lastIsMasterMS)) {
            // After the first measurement, average RTT MUST be computed using an
            // exponentially-weighted moving average formula, with a weighting factor (alpha) of 0.2.
            // If the prior average is denoted old_rtt, then the new average (new_rtt) is
            // computed from a new RTT measurement (x) using the following formula:
            // alpha = 0.2
            // new_rtt = alpha * x + (1 - alpha) * old_rtt
            //
            // A prior average of exactly 0 is a valid sample and must keep being
            // averaged; a truthiness check would freeze it at 0 forever.
            server.lastIsMasterMS = 0.2 * latencyMS + (1 - 0.2) * server.lastIsMasterMS;
          }

          if (self.s.replicaSetState.update(server)) {
            // Primary lastIsMaster store it
            if (server.lastIsMaster() && server.lastIsMaster().ismaster) {
              self.ismaster = server.lastIsMaster();
            }
          }

          // Server heart beat event
          emitSDAMEvent(self, 'serverHeartbeatSucceeded', {
            durationMS: latencyMS,
            reply: r.result,
            connectionId: server.name
          });
        }

        // Calculate the staleness for this server
        self.s.replicaSetState.updateServerMaxStaleness(server, self.s.haInterval);

        // Callback
        cb(err, r);
      }
    );
  };
  /**
   * Starts the heartbeat timer for `host`. Each server is monitored in parallel
   * in its own timer: a one-shot `Timeout` when `options.haInterval` is given
   * (the initial sweep), otherwise a repeating `Interval` running at the
   * topology's `haInterval`. Outside the initial sweep a host that already has
   * a timer registered is left alone.
   *
   * @param {string} host Address of the server to monitor
   * @param {object} self The ReplSet topology
   * @param {object} options `haInterval` selects the one-shot initial sweep
   */
  var monitorServer = function(host, self, options) {
    // Regular monitoring: never register a second timer for the same host
    if (!options.haInterval) {
      var alreadyMonitored = self.intervalIds.some(function(timer) {
        return timer.__host === host;
      });
      if (alreadyMonitored) return;
    }

    var Scheduler = options.haInterval ? Timeout : Interval;
    var initialSweep = Scheduler === Timeout;
    var period = options.haInterval ? options.haInterval : self.s.haInterval;

    function isShutDown() {
      return self.state === DESTROYED || self.state === UNREFERENCED;
    }

    // A usable member is a primary, or a secondary when secondary-only
    // connections are allowed
    function hasUsableMember() {
      return (
        (self.s.replicaSetState.hasSecondary() && self.s.options.secondaryOnlyConnectionAllowed) ||
        self.s.replicaSetState.hasPrimary()
      );
    }

    // Runs after every successful ping
    function afterPing(err) {
      if (err) {
        // NOTE: should something happen here?
        return;
      }

      if (isShutDown()) {
        timer.stop();
        return;
      }

      // Drop timers that are no longer running
      self.intervalIds = self.intervalIds.filter(function(candidate) {
        return candidate.isRunning();
      });

      if (initialSweep) {
        if (self.state === CONNECTING && hasUsableMember()) {
          self.state = CONNECTED;
          // Emit connected sign
          process.nextTick(function() {
            self.emit('connect', self);
          });
          // Start topology interval check
          topologyMonitor(self, {});
        }
      } else if (self.state === DISCONNECTED && hasUsableMember()) {
        self.state = CONNECTED;
        // Rexecute any stalled operation
        rexecuteOperations(self);
        // Emit connected sign
        process.nextTick(function() {
          self.emit('reconnect', self);
        });
      }

      // First time both a primary and a secondary are present after connect
      var initial = self.initialConnectState;
      if (initial.connect && !initial.fullsetup && self.s.replicaSetState.hasPrimaryAndSecondary()) {
        initial.fullsetup = true;
        initial.all = true;
        process.nextTick(function() {
          self.emit('fullsetup', self);
          self.emit('all', self);
        });
      }
    }

    var timer = new Scheduler(function() {
      if (isShutDown()) {
        timer.stop();
        return;
      }

      // Only ping hosts the replicaset state already holds a connection for
      var known = self.s.replicaSetState.get(host);
      if (!known) return;

      return pingServer(self, known, afterPing);
    }, period);

    // Start the timer, tag it with its host and remember it
    timer.start();
    timer.__host = host;
    self.intervalIds.push(timer);
  };
  /**
   * Drives monitoring of the whole set.
   *
   * With `options.haInterval` set (initial sweep) it first connects to the
   * unknown servers, emits an 'error' when the required member type is missing,
   * and otherwise starts monitoring every known server. Without it, monitoring
   * is started right away and a self-rescheduling reconnect timer is armed that
   * keeps trying the unknown servers.
   *
   * @param {object} self The ReplSet topology
   * @param {object} [options] `haInterval` selects the initial sweep
   */
  function topologyMonitor(self, options) {
    if (self.state === DESTROYED || self.state === UNREFERENCED) return;
    options = options || {};

    // Hosts currently tracked by the replicaset state
    var servers = Object.keys(self.s.replicaSetState.set);

    var _process = options.haInterval ? Timeout : Interval;
    var _haInterval = options.haInterval ? options.haInterval : self.s.haInterval;

    function monitorAll() {
      servers.forEach(function(host) {
        monitorServer(host, self, options);
      });
    }

    // Reports a failed initial sweep: a connection error is emitted as-is,
    // otherwise a descriptive error is emitted and the topology is destroyed
    function failSweep(err, message) {
      if (err) {
        return self.emit('error', err);
      }
      self.emit('error', new MongoError(message));
      return self.destroy({ force: true });
    }

    // Heartbeat faster while no primary is known
    function reconnectDelay() {
      return self.s.replicaSetState.hasPrimary() ? _haInterval : self.s.minHeartbeatFrequencyMS;
    }

    // Run the reconnect process
    function executeReconnect(self) {
      return function() {
        if (self.state === DESTROYED || self.state === UNREFERENCED) {
          return;
        }
        connectNewServers(self, self.s.replicaSetState.unknownServers, function() {
          // Arm the next attempt
          self.intervalIds.push(new Timeout(executeReconnect(self), reconnectDelay()).start());
        });
      };
    }

    if (_process === Timeout) {
      return connectNewServers(self, self.s.replicaSetState.unknownServers, function(err) {
        // Don't emit errors if the topology was already torn down
        if (self.state === DESTROYED || self.state === UNREFERENCED) {
          return;
        }

        if (!self.s.replicaSetState.hasPrimary() && !self.s.options.secondaryOnlyConnectionAllowed) {
          return failSweep(err, 'no primary found in replicaset or invalid replica set name');
        } else if (
          !self.s.replicaSetState.hasSecondary() &&
          self.s.options.secondaryOnlyConnectionAllowed
        ) {
          return failSweep(err, 'no secondary found in replicaset or invalid replica set name');
        }

        monitorAll();
      });
    }

    monitorAll();

    // Arm the first reconnect attempt
    self.intervalIds.push(new Timeout(executeReconnect(self), reconnectDelay()).start());
  }
  /**
   * Appends `server` to `list` unless an entry with the same name (compared
   * case-insensitively) is already present.
   *
   * @param {object[]} list Servers, each exposing a string `name`
   * @param {object} server Server to add
   * @return {boolean|undefined} true when a matching entry already existed, otherwise undefined
   */
  function addServerToList(list, server) {
    var alreadyListed = list.some(function(entry) {
      return entry.name.toLowerCase() === server.name.toLowerCase();
    });

    if (alreadyListed) return true;

    list.push(server);
  }
  /**
   * Builds the stable-state listener attached to a joined server for `event`.
   * When it fires, the server (the listener's `this`) is removed from the
   * replicaset state, the topology is moved to DISCONNECTED if no primary is
   * left, and the server is queued on `connectingServers`.
   *
   * @param {object} self The ReplSet topology
   * @param {string} event Event name, used for debug logging only
   * @return {function} Listener to register on a server
   */
  function handleEvent(self, event) {
    function isShutDown() {
      return self.state === DESTROYED || self.state === UNREFERENCED;
    }

    return function() {
      if (isShutDown()) return;

      if (self.s.logger.isDebug()) {
        self.s.logger.debug(
          f('handleEvent %s from server %s in replset with id %s', event, this.name, self.id)
        );
      }

      // Remove from the replicaset state
      self.s.replicaSetState.remove(this);

      // Re-check after the removal before touching the state any further
      if (isShutDown()) return;

      var replState = self.s.replicaSetState;
      var noMembersLeft =
        !replState.hasPrimary() &&
        !replState.hasSecondary() &&
        self.s.options.secondaryOnlyConnectionAllowed;

      if (noMembersLeft || !replState.hasPrimary()) {
        stateTransition(self, DISCONNECTED);
      }

      addServerToList(self.s.connectingServers, this);
    };
  }
  /**
   * Decides whether the initial 'connect' may be signalled.
   *
   * That is the case while connecting once the member type required by the
   * connect-time read preference is present (a secondary for
   * `ReadPreference.secondary`, a primary otherwise), or whenever a secondary
   * is present and secondary-only connections are allowed.
   *
   * @param {object} self The ReplSet topology
   * @return {*} Truthy when 'connect' should be triggered
   */
  function shouldTriggerConnect(self) {
    const replState = self.s.replicaSetState;
    const connecting = self.state === CONNECTING;
    const primaryAvailable = replState.hasPrimary();
    const secondaryAvailable = replState.hasSecondary();
    const allowSecondaryOnly = self.s.options.secondaryOnlyConnectionAllowed;

    const requested = self.s.connectOptions.readPreference;
    const wantsSecondary = requested && requested.equals(ReadPreference.secondary);

    const readPreferenceSatisfied =
      (wantsSecondary && secondaryAvailable) || (!wantsSecondary && primaryAvailable);
    const connectingAndSatisfied = connecting && readPreferenceSatisfied;

    return connectingAndSatisfied || (secondaryAvailable && allowSecondaryOnly);
  }
  /**
   * Builds the listener used for `event` on a seed server during the initial
   * connect. The server is the listener's `this`.
   *
   * A 'connect' hands the server to the replicaset state: if accepted, its
   * connect-phase listeners are replaced by the stable ones and 'connect' is
   * emitted on the topology once `shouldTriggerConnect` agrees; a MongoError
   * result destroys both server and topology and is emitted as 'error'; any
   * other result destroys just the server. Every other event emits 'failed'
   * and removes the server from the replicaset state. Afterwards the server is
   * taken off `connectingServers`, and when that list is empty while still
   * connecting, the initial topology sweep is started.
   *
   * @param {object} self The ReplSet topology
   * @param {string} event Name of the server event being handled
   * @return {function} Listener to register on a server
   */
  function handleInitialConnectEvent(self, event) {
    return function() {
      var _this = this;
      var logger = self.s.logger;

      if (logger.isDebug()) {
        logger.debug(
          f(
            'handleInitialConnectEvent %s from server %s in replset with id %s',
            event,
            this.name,
            self.id
          )
        );
      }

      // Topology already torn down: just get rid of the server
      if (self.state === DESTROYED || self.state === UNREFERENCED) {
        return this.destroy({ force: true });
      }

      if (event !== 'connect') {
        // Emit failure to connect
        self.emit('failed', this);
        addServerToList(self.s.connectingServers, this);
        // Remove from the state
        self.s.replicaSetState.remove(this);
      } else {
        // Update the state
        var result = self.s.replicaSetState.update(_this);

        if (result === true) {
          // Primary lastIsMaster store it
          if (_this.lastIsMaster() && _this.lastIsMaster().ismaster) {
            self.ismaster = _this.lastIsMaster();
          }

          if (logger.isDebug()) {
            logger.debug(
              f(
                'handleInitialConnectEvent %s from server %s in replset with id %s has state [%s]',
                event,
                _this.name,
                self.id,
                JSON.stringify(self.s.replicaSetState.set)
              )
            );
          }

          // Swap the connect-phase listeners for the stable-state ones
          handlers.forEach(function(name) {
            _this.removeAllListeners(name);
          });
          ['error', 'close', 'timeout', 'parseError'].forEach(function(name) {
            _this.on(name, handleEvent(self, name));
          });

          // Do we have a primary or primaryAndSecondary
          if (shouldTriggerConnect(self)) {
            // We are connected
            self.state = CONNECTED;
            // Set initial connect state
            self.initialConnectState.connect = true;
            // Emit connect event
            process.nextTick(function() {
              self.emit('connect', self);
            });
            topologyMonitor(self, {});
          }
        } else if (result instanceof MongoError) {
          _this.destroy({ force: true });
          self.destroy({ force: true });
          return self.emit('error', result);
        } else {
          _this.destroy({ force: true });
        }
      }

      // First time both a primary and a secondary are present after connect
      var initial = self.initialConnectState;
      if (initial.connect && !initial.fullsetup && self.s.replicaSetState.hasPrimaryAndSecondary()) {
        initial.fullsetup = true;
        initial.all = true;
        process.nextTick(function() {
          self.emit('fullsetup', self);
          self.emit('all', self);
        });
      }

      // Remove this server from the list of servers still connecting
      for (var idx = 0; idx < self.s.connectingServers.length; idx++) {
        if (self.s.connectingServers[idx].equals(_this)) {
          self.s.connectingServers.splice(idx, 1);
        }
      }

      // Every seed has reported back but we are still connecting: run the initial sweep
      if (self.s.connectingServers.length === 0 && self.state === CONNECTING) {
        topologyMonitor(self, { haInterval: 1 });
      }
    };
  }
  730. function connectServers(self, servers) {
  731. // Update connectingServers
  732. self.s.connectingServers = self.s.connectingServers.concat(servers);
  733. // Index used to interleaf the server connects, avoiding
  734. // runtime issues on io constrained vm's
  735. var timeoutInterval = 0;
  736. function connect(server, timeoutInterval) {
  737. setTimeout(function() {
  738. // Add the server to the state
  739. if (self.s.replicaSetState.update(server)) {
  740. // Primary lastIsMaster store it
  741. if (server.lastIsMaster() && server.lastIsMaster().ismaster) {
  742. self.ismaster = server.lastIsMaster();
  743. }
  744. }
  745. // Add event handlers
  746. server.once('close', handleInitialConnectEvent(self, 'close'));
  747. server.once('timeout', handleInitialConnectEvent(self, 'timeout'));
  748. server.once('parseError', handleInitialConnectEvent(self, 'parseError'));
  749. server.once('error', handleInitialConnectEvent(self, 'error'));
  750. server.once('connect', handleInitialConnectEvent(self, 'connect'));
  751. // SDAM Monitoring events
  752. server.on('serverOpening', e => self.emit('serverOpening', e));
  753. server.on('serverDescriptionChanged', e => self.emit('serverDescriptionChanged', e));
  754. server.on('serverClosed', e => self.emit('serverClosed', e));
  755. // Command Monitoring events
  756. relayEvents(server, self, ['commandStarted', 'commandSucceeded', 'commandFailed']);
  757. // Start connection
  758. server.connect(self.s.connectOptions);
  759. }, timeoutInterval);
  760. }
  761. // Start all the servers
  762. while (servers.length > 0) {
  763. connect(servers.shift(), timeoutInterval++);
  764. }
  765. }
  766. /**
  767. * Emit event if it exists
  768. * @method
  769. */
  770. function emitSDAMEvent(self, event, description) {
  771. if (self.listeners(event).length > 0) {
  772. self.emit(event, description);
  773. }
  774. }
  775. /**
  776. * Initiate server connect
  777. */
  778. ReplSet.prototype.connect = function(options) {
  779. var self = this;
  780. // Add any connect level options to the internal state
  781. this.s.connectOptions = options || {};
  782. // Set connecting state
  783. stateTransition(this, CONNECTING);
  784. // Create server instances
  785. var servers = this.s.seedlist.map(function(x) {
  786. return new Server(
  787. Object.assign({}, self.s.options, x, options, {
  788. reconnect: false,
  789. monitoring: false,
  790. parent: self,
  791. clientInfo: clone(self.s.clientInfo)
  792. })
  793. );
  794. });
  795. // Error out as high availbility interval must be < than socketTimeout
  796. if (
  797. this.s.options.socketTimeout > 0 &&
  798. this.s.options.socketTimeout <= this.s.options.haInterval
  799. ) {
  800. return self.emit(
  801. 'error',
  802. new MongoError(
  803. f(
  804. 'haInterval [%s] MS must be set to less than socketTimeout [%s] MS',
  805. this.s.options.haInterval,
  806. this.s.options.socketTimeout
  807. )
  808. )
  809. );
  810. }
  811. // Emit the topology opening event
  812. emitSDAMEvent(this, 'topologyOpening', { topologyId: this.id });
  813. // Start all server connections
  814. connectServers(self, servers);
  815. };
  816. /**
  817. * Authenticate the topology.
  818. * @method
  819. * @param {MongoCredentials} credentials The credentials for authentication we are using
  820. * @param {authResultCallback} callback A callback function
  821. */
  822. ReplSet.prototype.auth = function(credentials, callback) {
  823. if (typeof callback === 'function') callback(null, null);
  824. };
  825. /**
  826. * Destroy the server connection
  827. * @param {boolean} [options.force=false] Force destroy the pool
  828. * @method
  829. */
  830. ReplSet.prototype.destroy = function(options, callback) {
  831. options = options || {};
  832. let destroyCount = this.s.connectingServers.length + 1; // +1 for the callback from `replicaSetState.destroy`
  833. const serverDestroyed = () => {
  834. destroyCount--;
  835. if (destroyCount > 0) {
  836. return;
  837. }
  838. // Emit toplogy closing event
  839. emitSDAMEvent(this, 'topologyClosed', { topologyId: this.id });
  840. // Transition state
  841. stateTransition(this, DESTROYED);
  842. if (typeof callback === 'function') {
  843. callback(null, null);
  844. }
  845. };
  846. // Clear out any monitoring process
  847. if (this.haTimeoutId) clearTimeout(this.haTimeoutId);
  848. // Clear out all monitoring
  849. for (var i = 0; i < this.intervalIds.length; i++) {
  850. this.intervalIds[i].stop();
  851. }
  852. // Reset list of intervalIds
  853. this.intervalIds = [];
  854. if (destroyCount === 0) {
  855. serverDestroyed();
  856. return;
  857. }
  858. // Destroy the replicaset
  859. this.s.replicaSetState.destroy(options, serverDestroyed);
  860. // Destroy all connecting servers
  861. this.s.connectingServers.forEach(function(x) {
  862. x.destroy(options, serverDestroyed);
  863. });
  864. };
  865. /**
  866. * Unref all connections belong to this server
  867. * @method
  868. */
  869. ReplSet.prototype.unref = function() {
  870. // Transition state
  871. stateTransition(this, UNREFERENCED);
  872. this.s.replicaSetState.allServers().forEach(function(x) {
  873. x.unref();
  874. });
  875. clearTimeout(this.haTimeoutId);
  876. };
  877. /**
  878. * Returns the last known ismaster document for this server
  879. * @method
  880. * @return {object}
  881. */
  882. ReplSet.prototype.lastIsMaster = function() {
  883. // If secondaryOnlyConnectionAllowed and no primary but secondary
  884. // return the secondaries ismaster result.
  885. if (
  886. this.s.options.secondaryOnlyConnectionAllowed &&
  887. !this.s.replicaSetState.hasPrimary() &&
  888. this.s.replicaSetState.hasSecondary()
  889. ) {
  890. return this.s.replicaSetState.secondaries[0].lastIsMaster();
  891. }
  892. return this.s.replicaSetState.primary
  893. ? this.s.replicaSetState.primary.lastIsMaster()
  894. : this.ismaster;
  895. };
  896. /**
  897. * All raw connections
  898. * @method
  899. * @return {Connection[]}
  900. */
  901. ReplSet.prototype.connections = function() {
  902. var servers = this.s.replicaSetState.allServers();
  903. var connections = [];
  904. for (var i = 0; i < servers.length; i++) {
  905. connections = connections.concat(servers[i].connections());
  906. }
  907. return connections;
  908. };
  909. /**
  910. * Figure out if the server is connected
  911. * @method
  912. * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
  913. * @return {boolean}
  914. */
  915. ReplSet.prototype.isConnected = function(options) {
  916. options = options || {};
  917. // If we specified a read preference check if we are connected to something
  918. // than can satisfy this
  919. if (options.readPreference && options.readPreference.equals(ReadPreference.secondary)) {
  920. return this.s.replicaSetState.hasSecondary();
  921. }
  922. if (options.readPreference && options.readPreference.equals(ReadPreference.primary)) {
  923. return this.s.replicaSetState.hasPrimary();
  924. }
  925. if (options.readPreference && options.readPreference.equals(ReadPreference.primaryPreferred)) {
  926. return this.s.replicaSetState.hasSecondary() || this.s.replicaSetState.hasPrimary();
  927. }
  928. if (options.readPreference && options.readPreference.equals(ReadPreference.secondaryPreferred)) {
  929. return this.s.replicaSetState.hasSecondary() || this.s.replicaSetState.hasPrimary();
  930. }
  931. if (this.s.options.secondaryOnlyConnectionAllowed && this.s.replicaSetState.hasSecondary()) {
  932. return true;
  933. }
  934. return this.s.replicaSetState.hasPrimary();
  935. };
  936. /**
  937. * Figure out if the replicaset instance was destroyed by calling destroy
  938. * @method
  939. * @return {boolean}
  940. */
  941. ReplSet.prototype.isDestroyed = function() {
  942. return this.state === DESTROYED;
  943. };
  944. /**
  945. * Selects a server
  946. *
  947. * @method
  948. * @param {function} selector Unused
  949. * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
  950. * @param {ClientSession} [options.session] Unused
  951. * @param {function} callback
  952. */
  953. ReplSet.prototype.selectServer = function(selector, options, callback) {
  954. if (typeof selector === 'function' && typeof callback === 'undefined')
  955. (callback = selector), (selector = undefined), (options = {});
  956. if (typeof options === 'function')
  957. (callback = options), (options = selector), (selector = undefined);
  958. options = options || {};
  959. const server = this.s.replicaSetState.pickServer(options.readPreference);
  960. if (this.s.debug) this.emit('pickedServer', options.readPreference, server);
  961. callback(null, server);
  962. };
  963. /**
  964. * Get all connected servers
  965. * @method
  966. * @return {Server[]}
  967. */
  968. ReplSet.prototype.getServers = function() {
  969. return this.s.replicaSetState.allServers();
  970. };
  971. //
  972. // Execute write operation
  973. function executeWriteOperation(args, options, callback) {
  974. if (typeof options === 'function') (callback = options), (options = {});
  975. options = options || {};
  976. // TODO: once we drop Node 4, use destructuring either here or in arguments.
  977. const self = args.self;
  978. const op = args.op;
  979. const ns = args.ns;
  980. const ops = args.ops;
  981. if (self.state === DESTROYED) {
  982. return callback(new MongoError(f('topology was destroyed')));
  983. }
  984. const willRetryWrite =
  985. !args.retrying &&
  986. !!options.retryWrites &&
  987. options.session &&
  988. isRetryableWritesSupported(self) &&
  989. !options.session.inTransaction();
  990. if (!self.s.replicaSetState.hasPrimary()) {
  991. if (self.s.disconnectHandler) {
  992. // Not connected but we have a disconnecthandler
  993. return self.s.disconnectHandler.add(op, ns, ops, options, callback);
  994. } else if (!willRetryWrite) {
  995. // No server returned we had an error
  996. return callback(new MongoError('no primary server found'));
  997. }
  998. }
  999. const handler = (err, result) => {
  1000. if (!err) return callback(null, result);
  1001. if (!isRetryableError(err)) {
  1002. return callback(err);
  1003. }
  1004. if (willRetryWrite) {
  1005. const newArgs = Object.assign({}, args, { retrying: true });
  1006. return executeWriteOperation(newArgs, options, callback);
  1007. }
  1008. // Per SDAM, remove primary from replicaset
  1009. if (self.s.replicaSetState.primary) {
  1010. self.s.replicaSetState.remove(self.s.replicaSetState.primary, { force: true });
  1011. }
  1012. return callback(err);
  1013. };
  1014. if (callback.operationId) {
  1015. handler.operationId = callback.operationId;
  1016. }
  1017. // increment and assign txnNumber
  1018. if (willRetryWrite) {
  1019. options.session.incrementTransactionNumber();
  1020. options.willRetryWrite = willRetryWrite;
  1021. }
  1022. self.s.replicaSetState.primary[op](ns, ops, options, handler);
  1023. }
  1024. /**
  1025. * Insert one or more documents
  1026. * @method
  1027. * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
  1028. * @param {array} ops An array of documents to insert
  1029. * @param {boolean} [options.ordered=true] Execute in order or out of order
  1030. * @param {object} [options.writeConcern={}] Write concern for the operation
  1031. * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
  1032. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
  1033. * @param {ClientSession} [options.session=null] Session to use for the operation
  1034. * @param {boolean} [options.retryWrites] Enable retryable writes for this operation
  1035. * @param {opResultCallback} callback A callback function
  1036. */
  1037. ReplSet.prototype.insert = function(ns, ops, options, callback) {
  1038. // Execute write operation
  1039. executeWriteOperation({ self: this, op: 'insert', ns, ops }, options, callback);
  1040. };
  1041. /**
  1042. * Perform one or more update operations
  1043. * @method
  1044. * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
  1045. * @param {array} ops An array of updates
  1046. * @param {boolean} [options.ordered=true] Execute in order or out of order
  1047. * @param {object} [options.writeConcern={}] Write concern for the operation
  1048. * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
  1049. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
  1050. * @param {ClientSession} [options.session=null] Session to use for the operation
  1051. * @param {boolean} [options.retryWrites] Enable retryable writes for this operation
  1052. * @param {opResultCallback} callback A callback function
  1053. */
  1054. ReplSet.prototype.update = function(ns, ops, options, callback) {
  1055. // Execute write operation
  1056. executeWriteOperation({ self: this, op: 'update', ns, ops }, options, callback);
  1057. };
  1058. /**
  1059. * Perform one or more remove operations
  1060. * @method
  1061. * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
  1062. * @param {array} ops An array of removes
  1063. * @param {boolean} [options.ordered=true] Execute in order or out of order
  1064. * @param {object} [options.writeConcern={}] Write concern for the operation
  1065. * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
  1066. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
  1067. * @param {ClientSession} [options.session=null] Session to use for the operation
  1068. * @param {boolean} [options.retryWrites] Enable retryable writes for this operation
  1069. * @param {opResultCallback} callback A callback function
  1070. */
  1071. ReplSet.prototype.remove = function(ns, ops, options, callback) {
  1072. // Execute write operation
  1073. executeWriteOperation({ self: this, op: 'remove', ns, ops }, options, callback);
  1074. };
  1075. const RETRYABLE_WRITE_OPERATIONS = ['findAndModify', 'insert', 'update', 'delete'];
  1076. function isWriteCommand(command) {
  1077. return RETRYABLE_WRITE_OPERATIONS.some(op => command[op]);
  1078. }
  1079. /**
  1080. * Execute a command
  1081. * @method
  1082. * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
  1083. * @param {object} cmd The command hash
  1084. * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
  1085. * @param {Connection} [options.connection] Specify connection object to execute command against
  1086. * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
  1087. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
  1088. * @param {ClientSession} [options.session=null] Session to use for the operation
  1089. * @param {opResultCallback} callback A callback function
  1090. */
  1091. ReplSet.prototype.command = function(ns, cmd, options, callback) {
  1092. if (typeof options === 'function') {
  1093. (callback = options), (options = {}), (options = options || {});
  1094. }
  1095. if (this.state === DESTROYED) return callback(new MongoError(f('topology was destroyed')));
  1096. var self = this;
  1097. // Establish readPreference
  1098. var readPreference = options.readPreference ? options.readPreference : ReadPreference.primary;
  1099. // If the readPreference is primary and we have no primary, store it
  1100. if (
  1101. readPreference.preference === 'primary' &&
  1102. !this.s.replicaSetState.hasPrimary() &&
  1103. this.s.disconnectHandler != null
  1104. ) {
  1105. return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
  1106. } else if (
  1107. readPreference.preference === 'secondary' &&
  1108. !this.s.replicaSetState.hasSecondary() &&
  1109. this.s.disconnectHandler != null
  1110. ) {
  1111. return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
  1112. } else if (
  1113. readPreference.preference !== 'primary' &&
  1114. !this.s.replicaSetState.hasSecondary() &&
  1115. !this.s.replicaSetState.hasPrimary() &&
  1116. this.s.disconnectHandler != null
  1117. ) {
  1118. return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
  1119. }
  1120. // Pick a server
  1121. var server = this.s.replicaSetState.pickServer(readPreference);
  1122. // We received an error, return it
  1123. if (!(server instanceof Server)) return callback(server);
  1124. // Emit debug event
  1125. if (self.s.debug) self.emit('pickedServer', ReadPreference.primary, server);
  1126. // No server returned we had an error
  1127. if (server == null) {
  1128. return callback(
  1129. new MongoError(
  1130. f('no server found that matches the provided readPreference %s', readPreference)
  1131. )
  1132. );
  1133. }
  1134. const willRetryWrite =
  1135. !options.retrying &&
  1136. !!options.retryWrites &&
  1137. options.session &&
  1138. isRetryableWritesSupported(self) &&
  1139. !options.session.inTransaction() &&
  1140. isWriteCommand(cmd);
  1141. const cb = (err, result) => {
  1142. if (!err) return callback(null, result);
  1143. if (!isRetryableError(err)) {
  1144. return callback(err);
  1145. }
  1146. if (willRetryWrite) {
  1147. const newOptions = Object.assign({}, options, { retrying: true });
  1148. return this.command(ns, cmd, newOptions, callback);
  1149. }
  1150. // Per SDAM, remove primary from replicaset
  1151. if (this.s.replicaSetState.primary) {
  1152. this.s.replicaSetState.remove(this.s.replicaSetState.primary, { force: true });
  1153. }
  1154. return callback(err);
  1155. };
  1156. // increment and assign txnNumber
  1157. if (willRetryWrite) {
  1158. options.session.incrementTransactionNumber();
  1159. options.willRetryWrite = willRetryWrite;
  1160. }
  1161. // Execute the command
  1162. server.command(ns, cmd, options, cb);
  1163. };
  1164. /**
  1165. * Get a new cursor
  1166. * @method
  1167. * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
  1168. * @param {object|Long} cmd Can be either a command returning a cursor or a cursorId
  1169. * @param {object} [options] Options for the cursor
  1170. * @param {object} [options.batchSize=0] Batchsize for the operation
  1171. * @param {array} [options.documents=[]] Initial documents list for cursor
  1172. * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
  1173. * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
  1174. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
  1175. * @param {ClientSession} [options.session=null] Session to use for the operation
  1176. * @param {object} [options.topology] The internal topology of the created cursor
  1177. * @returns {Cursor}
  1178. */
  1179. ReplSet.prototype.cursor = function(ns, cmd, options) {
  1180. options = options || {};
  1181. const topology = options.topology || this;
  1182. // Set up final cursor type
  1183. var FinalCursor = options.cursorFactory || this.s.Cursor;
  1184. // Return the cursor
  1185. return new FinalCursor(this.s.bson, ns, cmd, options, topology, this.s.options);
  1186. };
  1187. /**
  1188. * A replset connect event, used to verify that the connection is up and running
  1189. *
  1190. * @event ReplSet#connect
  1191. * @type {ReplSet}
  1192. */
  1193. /**
  1194. * A replset reconnect event, used to verify that the topology reconnected
  1195. *
  1196. * @event ReplSet#reconnect
  1197. * @type {ReplSet}
  1198. */
  1199. /**
  1200. * A replset fullsetup event, used to signal that all topology members have been contacted.
  1201. *
  1202. * @event ReplSet#fullsetup
  1203. * @type {ReplSet}
  1204. */
  1205. /**
  1206. * A replset all event, used to signal that all topology members have been contacted.
  1207. *
  1208. * @event ReplSet#all
  1209. * @type {ReplSet}
  1210. */
  1211. /**
  1212. * A replset failed event, used to signal that initial replset connection failed.
  1213. *
  1214. * @event ReplSet#failed
  1215. * @type {ReplSet}
  1216. */
  1217. /**
  1218. * A server member left the replicaset
  1219. *
  1220. * @event ReplSet#left
  1221. * @type {function}
  1222. * @param {string} type The type of member that left (primary|secondary|arbiter)
  1223. * @param {Server} server The server object that left
  1224. */
  1225. /**
  1226. * A server member joined the replicaset
  1227. *
  1228. * @event ReplSet#joined
  1229. * @type {function}
  1230. * @param {string} type The type of member that joined (primary|secondary|arbiter)
  1231. * @param {Server} server The server object that joined
  1232. */
  1233. /**
  1234. * A server opening SDAM monitoring event
  1235. *
  1236. * @event ReplSet#serverOpening
  1237. * @type {object}
  1238. */
  1239. /**
  1240. * A server closed SDAM monitoring event
  1241. *
  1242. * @event ReplSet#serverClosed
  1243. * @type {object}
  1244. */
  1245. /**
  1246. * A server description SDAM change monitoring event
  1247. *
  1248. * @event ReplSet#serverDescriptionChanged
  1249. * @type {object}
  1250. */
  1251. /**
  1252. * A topology open SDAM event
  1253. *
  1254. * @event ReplSet#topologyOpening
  1255. * @type {object}
  1256. */
  1257. /**
  1258. * A topology closed SDAM event
  1259. *
  1260. * @event ReplSet#topologyClosed
  1261. * @type {object}
  1262. */
  1263. /**
  1264. * A topology structure SDAM change event
  1265. *
  1266. * @event ReplSet#topologyDescriptionChanged
  1267. * @type {object}
  1268. */
  1269. /**
  1270. * A topology serverHeartbeatStarted SDAM event
  1271. *
  1272. * @event ReplSet#serverHeartbeatStarted
  1273. * @type {object}
  1274. */
  1275. /**
  1276. * A topology serverHeartbeatFailed SDAM event
  1277. *
  1278. * @event ReplSet#serverHeartbeatFailed
  1279. * @type {object}
  1280. */
  1281. /**
  1282. * A topology serverHeartbeatSucceeded SDAM change event
  1283. *
  1284. * @event ReplSet#serverHeartbeatSucceeded
  1285. * @type {object}
  1286. */
  1287. /**
  1288. * An event emitted indicating a command was started, if command monitoring is enabled
  1289. *
  1290. * @event ReplSet#commandStarted
  1291. * @type {object}
  1292. */
  1293. /**
  1294. * An event emitted indicating a command succeeded, if command monitoring is enabled
  1295. *
  1296. * @event ReplSet#commandSucceeded
  1297. * @type {object}
  1298. */
  1299. /**
  1300. * An event emitted indicating a command failed, if command monitoring is enabled
  1301. *
  1302. * @event ReplSet#commandFailed
  1303. * @type {object}
  1304. */
  1305. module.exports = ReplSet;