mongos.js 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351
  1. 'use strict';
  2. const inherits = require('util').inherits;
  3. const f = require('util').format;
  4. const EventEmitter = require('events').EventEmitter;
  5. const BasicCursor = require('../cursor');
  6. const Logger = require('../connection/logger');
  7. const retrieveBSON = require('../connection/utils').retrieveBSON;
  8. const MongoError = require('../error').MongoError;
  9. const Server = require('./server');
  10. const clone = require('./shared').clone;
  11. const diff = require('./shared').diff;
  12. const cloneOptions = require('./shared').cloneOptions;
  13. const createClientInfo = require('./shared').createClientInfo;
  14. const SessionMixins = require('./shared').SessionMixins;
  15. const isRetryableWritesSupported = require('./shared').isRetryableWritesSupported;
  16. const relayEvents = require('../utils').relayEvents;
  17. const isRetryableError = require('../error').isRetryableError;
  18. const BSON = retrieveBSON();
  19. /**
  20. * @fileOverview The **Mongos** class is a class that represents a Mongos Proxy topology and is
  21. * used to construct connections.
  22. *
  23. * @example
  24. * var Mongos = require('mongodb-core').Mongos
  25. * , ReadPreference = require('mongodb-core').ReadPreference
  26. * , assert = require('assert');
  27. *
  28. * var server = new Mongos([{host: 'localhost', port: 30000}]);
  29. * // Wait for the connection event
  30. * server.on('connect', function(server) {
  31. * server.destroy();
  32. * });
  33. *
  34. * // Start connecting
  35. * server.connect();
  36. */
  //
  // Topology states
  const DISCONNECTED = 'disconnected';
  const CONNECTING = 'connecting';
  const CONNECTED = 'connected';
  const UNREFERENCED = 'unreferenced';
  const DESTROYED = 'destroyed';

  /**
   * Move `self` to `newState` when that transition is legal from its current state.
   * An illegal transition leaves `self.state` untouched and is reported through the
   * topology logger.
   *
   * @param {object} self The topology whose `state` property is updated
   * @param {string} newState One of the state constants declared above
   */
  function stateTransition(self, newState) {
    const legalTransitions = {
      disconnected: [CONNECTING, DESTROYED, DISCONNECTED],
      connecting: [CONNECTING, DESTROYED, CONNECTED, DISCONNECTED],
      connected: [CONNECTED, DISCONNECTED, DESTROYED, UNREFERENCED],
      unreferenced: [UNREFERENCED, DESTROYED],
      destroyed: [DESTROYED]
    };

    // States reachable from the current one (undefined for an unknown current state)
    const legalStates = legalTransitions[self.state];
    if (legalStates && legalStates.indexOf(newState) !== -1) {
      self.state = newState;
      return;
    }

    // The Mongos constructor stores its logger on the internal state object
    // (`self.s.logger`); `self.logger` is only kept as a fallback. Reading
    // `self.logger` alone threw a TypeError on every illegal transition.
    const logger = (self.s && self.s.logger) || self.logger;
    if (!logger) return;

    logger.error(
      f(
        'Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]',
        self.id,
        self.state,
        newState,
        legalStates
      )
    );
  }

  //
  // Counter used to assign an id to each new Mongos instance
  let id = 1;
  // Events whose listeners are attached while a proxy connects and are cleared
  // again once the proxy has connected
  const handlers = ['connect', 'close', 'error', 'timeout', 'parseError'];
  /**
   * Creates a new Mongos instance
   * @class
   * @param {array} seedlist A list of mongos proxy seeds; each entry is merged into the options of one Server (e.g. `{host: 'localhost', port: 30000}`)
   * @param {number} [options.haInterval=10000] The high availability period, in MS, between proxy monitoring rounds
   * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
   * @param {object} [options.bson] BSON instance to use instead of the default one created here
   * @param {object} [options.disconnectHandler] Handler stored on the topology for buffered operations
   * @param {boolean} [options.debug=false] Run in debug mode
   * @param {number} [options.size=5] Server connection pool size
   * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
   * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
   * @param {number} [options.localThresholdMS=15] Cutoff latency point in MS for MongoS proxy selection
   * @param {boolean} [options.noDelay=true] TCP Connection no delay
   * @param {number} [options.connectionTimeout=1000] TCP Connection timeout setting
   * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
   * @param {boolean} [options.ssl=false] Use SSL for connection
   * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
   * @param {Buffer} [options.ca] SSL Certificate store binary buffer
   * @param {Buffer} [options.crl] SSL Certificate revocation store binary buffer
   * @param {Buffer} [options.cert] SSL Certificate binary buffer
   * @param {Buffer} [options.key] SSL Key file binary buffer
   * @param {string} [options.passphrase] SSL Certificate pass phrase
   * @param {string} [options.servername=null] String containing the server name requested via TLS SNI.
   * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
   * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
   * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
   * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
   * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
   * @param {boolean} [options.monitorCommands=false] Enable command monitoring for this topology
   * @return {Mongos} A Mongos instance
   * @fires Mongos#connect
   * @fires Mongos#reconnect
   * @fires Mongos#joined
   * @fires Mongos#left
   * @fires Mongos#failed
   * @fires Mongos#fullsetup
   * @fires Mongos#all
   * @fires Mongos#serverHeartbeatStarted
   * @fires Mongos#serverHeartbeatSucceeded
   * @fires Mongos#serverHeartbeatFailed
   * @fires Mongos#topologyOpening
   * @fires Mongos#topologyClosed
   * @fires Mongos#topologyDescriptionChanged
   * @property {string} type the topology type.
   * @property {string} parserType the parser type used (c++ or js).
   */
  var Mongos = function(seedlist, options) {
    options = options || {};

    // Unique id of this topology instance
    this.id = id++;

    // Internal state shared by the helpers in this file
    this.s = {
      // Private copy of the user supplied options
      options: Object.assign({}, options),
      // BSON serializer: the caller's instance, or one registering every BSON type
      bson:
        options.bson ||
        new BSON([
          BSON.Binary,
          BSON.Code,
          BSON.DBRef,
          BSON.Decimal128,
          BSON.Double,
          BSON.Int32,
          BSON.Long,
          BSON.Map,
          BSON.MaxKey,
          BSON.MinKey,
          BSON.ObjectId,
          BSON.BSONRegExp,
          BSON.Symbol,
          BSON.Timestamp
        ]),
      // Cursor class override
      Cursor: options.cursorFactory || BasicCursor,
      logger: Logger('Mongos', options),
      seedlist: seedlist,
      // Period between monitoring rounds; falls back to 10000 when unset (or 0)
      haInterval: options.haInterval || 10000,
      disconnectHandler: options.disconnectHandler,
      // Server selection index
      index: 0,
      // Options handed to connect(); filled in when connect() is called
      connectOptions: {},
      // Only a literal boolean `true` enables debug mode
      debug: options.debug === true,
      localThresholdMS: options.localThresholdMS || 15,
      clientInfo: createClientInfo(options)
    };

    // The stored options carry their own client info document
    this.s.options.clientInfo = createClientInfo(options);

    // A socketTimeout shorter than haInterval makes sockets time out between
    // monitoring rounds, which leads to a lot of recycled connections.
    const socketTimeout = this.s.options.socketTimeout;
    const shouldWarn =
      this.s.logger.isWarn() && socketTimeout !== 0 && socketTimeout < this.s.haInterval;
    if (shouldWarn) {
      this.s.logger.warn(
        f(
          'warning socketTimeout %s is less than haInterval %s. This might cause unnecessary server reconnections due to socket timeouts',
          this.s.options.socketTimeout,
          this.s.haInterval
        )
      );
    }

    // A new topology starts out disconnected
    this.state = DISCONNECTED;

    // Proxies grouped by connection status
    this.connectingProxies = [];
    this.connectedProxies = [];
    this.disconnectedProxies = [];

    // Index of proxy to run operations against
    this.index = 0;
    // Timer id of the pending monitoring round
    this.haTimeoutId = null;
    // Last ismaster document seen
    this.ismaster = null;
    // Description of the topology
    this.topologyDescription = {
      topologyType: 'Unknown',
      servers: []
    };
    // Highest clusterTime seen in responses from the current deployment
    this.clusterTime = null;

    // Initialise the EventEmitter part of the instance
    EventEmitter.call(this);
  };
  // Mongos is an EventEmitter that also carries the shared session helpers
  inherits(Mongos, EventEmitter);
  Object.assign(Mongos.prototype, SessionMixins);

  // Read-only, enumerable accessors exposed on every Mongos instance
  Object.defineProperties(Mongos.prototype, {
    // Topology type identifier
    type: {
      enumerable: true,
      get: function() {
        return 'mongos';
      }
    },
    // Which BSON parser implementation is in use
    parserType: {
      enumerable: true,
      get: function() {
        if (BSON.native) return 'c++';
        return 'js';
      }
    },
    // Session timeout taken from the last ismaster document, or null when unknown
    logicalSessionTimeoutMinutes: {
      enumerable: true,
      get: function() {
        const lastIsMaster = this.ismaster;
        return (lastIsMaster && lastIsMaster.logicalSessionTimeoutMinutes) || null;
      }
    }
  });
  /**
   * Emit `event` with `description` on `self`, but only when at least one
   * listener is registered for that event.
   * @method
   * @param {EventEmitter} self The emitter to emit on
   * @param {string} event The event name
   * @param {object} description The payload passed to the listeners
   */
  function emitSDAMEvent(self, event, description) {
    const registered = self.listeners(event);
    if (registered.length === 0) return;
    self.emit(event, description);
  }
  // Events whose listeners are removed from a server before it is destroyed
  const SERVER_EVENTS = ['serverDescriptionChanged', 'error', 'close', 'timeout', 'parseError'];

  /**
   * Remove every listener for the SERVER_EVENTS from `server`, then destroy it.
   * @param {object} server The server to tear down
   * @param {object} [options] Options forwarded to `server.destroy` (defaults to `{}`)
   * @param {function} [callback] Forwarded to `server.destroy`
   */
  function destroyServer(server, options, callback) {
    for (const eventName of SERVER_EVENTS) {
      server.removeAllListeners(eventName);
    }
    server.destroy(options || {}, callback);
  }
  /**
   * Initiate server connect: creates one Server per seed and starts connecting
   * to all of them.
   * @method
   * @param {object} [options] Connect level options; stored on the topology and
   *   merged into the options of every Server created here
   */
  Mongos.prototype.connect = function(options) {
    const topology = this;

    // Keep the connect level options around for later (re)connects
    topology.s.connectOptions = options || {};

    stateTransition(topology, CONNECTING);

    // One Server per seed. Later sources win: topology options, then the seed,
    // then the connect options, then the fixed settings below.
    const servers = topology.s.seedlist.map(seed => {
      const serverOptions = Object.assign({}, topology.s.options, seed, options, {
        reconnect: false,
        monitoring: false,
        parent: topology,
        clientInfo: clone(topology.s.clientInfo)
      });
      const server = new Server(serverOptions);
      // Re-emit server description changes on the topology
      relayEvents(server, topology, ['serverDescriptionChanged']);
      return server;
    });

    emitSDAMEvent(topology, 'topologyOpening', { topologyId: topology.id });

    // Kick off all server connections
    connectProxies(topology, servers);
  };
  /**
   * Authenticate the topology. This implementation performs no work itself; it
   * only invokes the callback, if one was given, with `(null, null)`.
   * @method
   * @param {MongoCredentials} credentials The credentials for authentication we are using
   * @param {authResultCallback} callback A callback function
   */
  Mongos.prototype.auth = function(credentials, callback) {
    if (typeof callback !== 'function') return;
    callback(null, null);
  };
  /**
   * Build the listener installed on a connected proxy for its
   * error/close/timeout/parseError events. The listener runs with `this` bound
   * to the proxy that emitted the event.
   * @param {Mongos} self The owning topology
   * @return {function} The event listener
   */
  function handleEvent(self) {
    return function() {
      const proxy = this;
      // A destroyed topology no longer tracks proxy changes
      if (self.state === DESTROYED) return;

      // The proxy is no longer usable: connected -> disconnected
      moveServerFrom(self.connectedProxies, self.disconnectedProxies, proxy);
      emitTopologyDescriptionChanged(self);

      // Tell listeners that the proxy left
      self.emit('left', 'mongos', proxy);
      // SDAM event
      self.emit('serverClosed', {
        topologyId: self.id,
        address: proxy.name
      });
    };
  }
  /**
   * Build the listener used while a seed server is connecting for the first
   * time. The listener runs with `this` bound to the server that emitted the
   * event.
   * @param {Mongos} self The owning topology
   * @param {string} event The event name this listener is registered for
   * @return {function} The event listener
   */
  function handleInitialConnectEvent(self, event) {
    return function() {
      const server = this;

      // The topology was destroyed while this server was still connecting
      if (self.state === DESTROYED) {
        emitTopologyDescriptionChanged(self);
        moveServerFrom(self.connectingProxies, self.disconnectedProxies, server);
        return server.destroy();
      }

      if (event !== 'connect') {
        // Any other initial event means the connection attempt failed
        moveServerFrom(self.connectingProxies, self.disconnectedProxies, server);
        self.emit('left', 'mongos', server);
        self.emit('failed', server);
      } else {
        // Remember the last known ismaster
        self.ismaster = server.lastIsMaster();

        if (self.ismaster.msg !== 'isdbgrid') {
          // Not a mongos proxy
          if (self.s.logger.isWarn()) {
            // Without a hosts list the server is treated as a standalone mongod
            const message = self.ismaster.hosts
              ? 'expected mongos proxy, but found replicaset member mongod for server %s'
              : 'expected mongos proxy, but found standalone mongod for server %s';
            self.s.logger.warn(f(message, server.name));
          }

          // Drop it from the topology completely
          removeProxyFrom(self.connectingProxies, server);
          self.emit('left', 'server', server);
          self.emit('failed', server);
        } else {
          // A proxy with the same name is already connected: reject this one
          const alreadyConnected = self.connectedProxies.some(function(proxy) {
            return proxy.name === server.name;
          });
          if (alreadyConnected) {
            moveServerFrom(self.connectingProxies, self.disconnectedProxies, server);
            emitTopologyDescriptionChanged(self);
            server.destroy();
            return self.emit('failed', server);
          }

          // Swap the temporary connect-time listeners for the stable state ones
          handlers.forEach(function(name) {
            server.removeAllListeners(name);
          });
          ['error', 'close', 'timeout', 'parseError'].forEach(function(name) {
            server.on(name, handleEvent(self, name));
          });

          // connecting -> connected
          moveServerFrom(self.connectingProxies, self.connectedProxies, server);
          self.emit('joined', 'mongos', server);
        }
      }

      emitTopologyDescriptionChanged(self);

      // Nothing more to do until every seed has produced an outcome
      if (self.connectingProxies.length !== 0) return;

      if (self.connectedProxies.length > 0 && self.state === CONNECTING) {
        stateTransition(self, CONNECTED);
        self.emit('connect', self);
        self.emit('fullsetup', self);
        self.emit('all', self);
      } else if (self.disconnectedProxies.length === 0) {
        // No proxy is connected and none is left to retry
        if (self.s.logger.isWarn()) {
          self.s.logger.warn(
            f('no mongos proxies found in seed list, did you mean to connect to a replicaset')
          );
        }
        return self.emit('error', new MongoError('no mongos proxies found in seed list'));
      }

      // Start monitoring the topology
      topologyMonitor(self, { firstConnect: true });
    };
  }
  /**
   * Start connecting to every server in `servers`. The list is emptied in the
   * process, and each connect is scheduled one millisecond later than the
   * previous one.
   * @param {Mongos} self The owning topology
   * @param {Server[]} servers Servers to connect; consumed by this call
   */
  function connectProxies(self, servers) {
    // Track the servers as connecting
    self.connectingProxies = self.connectingProxies.concat(servers);

    // Initial-connect events, in the order their listeners are registered
    const initialEvents = ['close', 'timeout', 'parseError', 'error', 'connect'];

    const scheduleConnect = (server, delayMS) => {
      setTimeout(() => {
        // SDAM: server opening
        self.emit('serverOpening', {
          topologyId: self.id,
          address: server.name
        });
        emitTopologyDescriptionChanged(self);

        // One-shot listeners deciding the outcome of the initial connect
        initialEvents.forEach(name => {
          server.once(name, handleInitialConnectEvent(self, name));
        });

        // Command Monitoring events
        relayEvents(server, self, ['commandStarted', 'commandSucceeded', 'commandFailed']);

        server.connect(self.s.connectOptions);
      }, delayMS);
    };

    // Interleave the connects, avoiding runtime issues on io constrained vm's
    for (let delayMS = 0; servers.length > 0; delayMS++) {
      scheduleConnect(servers.shift(), delayMS);
    }
  }
  /**
   * Select the proxy an operation should run against.
   *
   * A connected server pinned to the session's transaction always wins. Otherwise
   * the connected proxies whose latency is within `localThresholdMS` of the
   * fastest one are used round-robin; when none qualifies, the first connected
   * proxy (possibly undefined) is returned.
   * @param {Mongos} self The owning topology
   * @param {object} [session] Session whose transaction may pin a server
   * @return {Server|undefined} The chosen proxy
   */
  function pickProxy(self, session) {
    const transaction = session && session.transaction;

    if (transaction && transaction.server) {
      if (transaction.server.isConnected()) return transaction.server;
      // The pinned server is gone, release it and select again
      transaction.unpinServer();
    }

    // Work on a snapshot of the connected proxies
    const snapshot = self.connectedProxies.slice(0);

    // Lowest latency among all proxies in the snapshot
    const lowestLatency = snapshot.reduce(function(lowest, server) {
      return server.lastIsMasterMS < lowest ? server.lastIsMasterMS : lowest;
    }, Number.MAX_VALUE);

    // Keep the connected proxies inside the latency window
    const maxLatency = lowestLatency + self.s.localThresholdMS;
    const candidates = snapshot.filter(function(server) {
      return server.lastIsMasterMS <= maxLatency && server.isConnected();
    });

    let proxy;
    if (candidates.length !== 0) {
      // Round-robin over the candidates
      proxy = candidates[self.index % candidates.length];
      self.index = (self.index + 1) % candidates.length;
    } else {
      // Nothing qualified, fall back to the first connected proxy
      proxy = self.connectedProxies[0];
    }

    // Pin the choice to an active transaction
    if (transaction && transaction.isActive && proxy && proxy.isConnected()) {
      transaction.pinServer(proxy);
    }

    return proxy;
  }
  /**
   * Move `proxy` into the `to` list: every entry with the same name is removed
   * from both `from` and `to`, then `proxy` is appended to `to`. Both arrays are
   * modified in place.
   * @param {Server[]} from List the proxy is leaving
   * @param {Server[]} to List the proxy is joining
   * @param {Server} proxy The proxy to move; entries are matched by `name`
   */
  function moveServerFrom(from, to, proxy) {
    // Iterate backwards: splicing in a forward loop shifts the next element into
    // the current index and skips it, which left adjacent duplicates behind.
    for (let i = from.length - 1; i >= 0; i--) {
      if (from[i].name === proxy.name) {
        from.splice(i, 1);
      }
    }

    for (let i = to.length - 1; i >= 0; i--) {
      if (to[i].name === proxy.name) {
        to.splice(i, 1);
      }
    }

    to.push(proxy);
  }
  /**
   * Remove every entry named like `proxy` from the `from` list, in place.
   * @param {Server[]} from List to remove from
   * @param {Server} proxy The proxy to remove; entries are matched by `name`
   */
  function removeProxyFrom(from, proxy) {
    // Iterate backwards: splicing in a forward loop shifts the next element into
    // the current index and skips it, which left adjacent duplicates behind.
    for (let i = from.length - 1; i >= 0; i--) {
      if (from[i].name === proxy.name) {
        from.splice(i, 1);
      }
    }
  }
  /**
   * Try to reconnect every proxy in `proxies`. Each one is replaced by a fresh
   * Server instance for the same address; `callback` is invoked (without
   * arguments) once every attempt has produced an outcome, or immediately when
   * `proxies` is empty.
   * @param {Mongos} self The owning topology
   * @param {Server[]} proxies The proxies to reconnect
   * @param {function} callback Called when all attempts have finished
   */
  function reconnectProxies(self, proxies, callback) {
    // Attempts that have not produced an outcome yet
    let pending = proxies.length;

    // Reconnecting is pointless once the topology is destroyed or unreferenced
    const isShuttingDown = () => self.state === DESTROYED || self.state === UNREFERENCED;

    // Split "host:port" on the LAST ':' so that a host which itself contains
    // ':' (e.g. an IPv6 literal) keeps its full host part and its real port.
    const splitAddress = name => {
      const separator = name.lastIndexOf(':');
      if (separator === -1) return { host: name, port: NaN };
      return {
        host: name.slice(0, separator),
        port: parseInt(name.slice(separator + 1), 10)
      };
    };

    // Build the one-shot listener for `event`; it runs with `this` bound to the server
    const outcomeHandler = function(event) {
      return function() {
        const server = this;
        pending = pending - 1;

        // The topology went away while reconnecting
        if (isShuttingDown()) {
          moveServerFrom(self.connectingProxies, self.disconnectedProxies, server);
          return server.destroy();
        }

        if (event === 'connect') {
          // Swap the temporary listeners for the stable state ones
          handlers.forEach(function(name) {
            server.removeAllListeners(name);
          });
          ['error', 'close', 'timeout', 'parseError'].forEach(function(name) {
            server.on(name, handleEvent(self, name));
          });

          // connecting -> connected
          moveServerFrom(self.connectingProxies, self.connectedProxies, server);
          emitTopologyDescriptionChanged(self);
          self.emit('joined', 'mongos', server);
        } else {
          // The attempt failed: connecting -> disconnected
          moveServerFrom(self.connectingProxies, self.disconnectedProxies, server);
          server.destroy();
        }

        // Every attempt has reported back
        if (pending === 0) {
          callback();
        }
      };
    };

    // Nothing to reconnect
    if (pending === 0) {
      return callback();
    }

    // Replace `staleServer` with a new instance and connect it after `delayMS`
    function scheduleReconnect(staleServer, delayMS) {
      setTimeout(function() {
        if (isShuttingDown()) {
          return;
        }

        const address = splitAddress(staleServer.name);
        const server = new Server(
          Object.assign({}, self.s.options, {
            host: address.host,
            port: address.port,
            reconnect: false,
            monitoring: false,
            parent: self,
            clientInfo: clone(self.s.clientInfo)
          })
        );

        // The old instance is replaced by the new one
        destroyServer(staleServer);
        removeProxyFrom(self.disconnectedProxies, staleServer);

        // Relay the server description change
        relayEvents(server, self, ['serverDescriptionChanged']);

        // SDAM: server opening
        self.emit('serverOpening', {
          topologyId: server.s.topologyId !== -1 ? server.s.topologyId : self.id,
          address: server.name
        });

        // Temporary listeners deciding the outcome of this attempt
        ['connect', 'close', 'timeout', 'error', 'parseError'].forEach(function(name) {
          server.once(name, outcomeHandler(name));
        });

        // Command Monitoring events
        relayEvents(server, self, ['commandStarted', 'commandSucceeded', 'commandFailed']);

        self.connectingProxies.push(server);
        server.connect(self.s.connectOptions);
      }, delayMS);
    }

    // Stagger the attempts by one millisecond each
    for (let i = 0; i < proxies.length; i++) {
      scheduleReconnect(proxies[i], i);
    }
  }
  /**
   * Schedule one monitoring round after `haInterval` milliseconds. A round pings
   * every connected proxy with ismaster, then tries to reconnect the
   * disconnected proxies and finally schedules the next round.
   * @param {Mongos} self The owning topology
   * @param {object} [options] Optional settings
   * @param {boolean} [options.firstConnect] Set for the round scheduled by the initial connect
   */
  function topologyMonitor(self, options) {
    options = options || {};

    // Monitoring stops once the topology is destroyed or unreferenced
    const isShuttingDown = () => self.state === DESTROYED || self.state === UNREFERENCED;

    // Ping a single proxy with ismaster and record its latency
    const pingServer = (server, done) => {
      const startedAt = Date.now();

      emitSDAMEvent(self, 'serverHeartbeatStarted', { connectionId: server.name });

      server.command(
        'admin.$cmd',
        { ismaster: true },
        {
          monitoring: true,
          socketTimeout: self.s.options.connectionTimeout || 2000
        },
        function(err, r) {
          if (isShuttingDown()) {
            // The topology went away while the ping was in flight
            moveServerFrom(self.connectedProxies, self.disconnectedProxies, server);
            server.destroy();
            return done(err, r);
          }

          const latencyMS = Date.now() - startedAt;

          if (err) {
            emitSDAMEvent(self, 'serverHeartbeatFailed', {
              durationMS: latencyMS,
              failure: err,
              connectionId: server.name
            });
            // A failed ping takes the proxy out of rotation
            moveServerFrom(self.connectedProxies, self.disconnectedProxies, server);
          } else {
            // Store the fresh ismaster and latency on the proxy
            server.ismaster = r.result;
            server.lastIsMasterMS = latencyMS;
            emitSDAMEvent(self, 'serverHeartbeatSucceeded', {
              durationMS: latencyMS,
              reply: r.result,
              connectionId: server.name
            });
          }

          done(err, r);
        }
      );
    };

    // Set monitoring timeout
    self.haTimeoutId = setTimeout(() => {
      if (isShuttingDown()) return;

      // While connected, let the disconnect handler run its buffered operations
      if (self.isConnected() && self.s.disconnectHandler) {
        self.s.disconnectHandler.execute();
      }

      // Snapshot of the proxies to ping in this round
      const proxies = self.connectedProxies.slice(0);
      let outstanding = proxies.length;

      if (proxies.length === 0) {
        // No proxy is connected
        if (self.listeners('close').length > 0 && self.state === CONNECTING) {
          self.emit('error', new MongoError('no mongos proxy available'));
        } else {
          self.emit('close', self);
        }

        // Attempt to connect to any unknown servers
        return reconnectProxies(self, self.disconnectedProxies, () => {
          if (isShuttingDown()) return;

          if (self.state === CONNECTING && options.firstConnect) {
            self.emit('connect', self);
            self.emit('fullsetup', self);
            self.emit('all', self);
          } else if (self.isConnected()) {
            self.emit('reconnect', self);
          } else if (!self.isConnected() && self.listeners('close').length > 0) {
            self.emit('close', self);
          }

          // Schedule the next round
          topologyMonitor(self);
        });
      }

      // Ping all proxies; the last reply triggers the reconnect pass
      proxies.forEach(proxy => {
        pingServer(proxy, () => {
          outstanding = outstanding - 1;
          if (outstanding !== 0) return;
          if (isShuttingDown()) return;

          // Attempt to connect to any unknown servers
          reconnectProxies(self, self.disconnectedProxies, () => {
            if (isShuttingDown()) return;
            // Schedule the next round
            topologyMonitor(self);
          });
        });
      });
    }, self.s.haInterval);
  }
  /**
   * Returns the last ismaster document stored on this topology
   * (null until one has been recorded)
   * @method
   * @return {object}
   */
  Mongos.prototype.lastIsMaster = function() {
    const lastKnown = this.ismaster;
    return lastKnown;
  };
  /**
   * Unref all connected and connecting proxies of this topology and cancel the
   * pending monitoring round
   * @method
   */
  Mongos.prototype.unref = function() {
    stateTransition(this, UNREFERENCED);

    // Unref every proxy that is connected or still connecting
    const proxies = this.connectedProxies.concat(this.connectingProxies);
    for (const proxy of proxies) {
      proxy.unref();
    }

    clearTimeout(this.haTimeoutId);
  };
  /**
   * Destroy the topology: stops monitoring and destroys every connected and
   * connecting proxy. The topology enters the destroyed state once all of them
   * have reported back.
   * @param {object} [options] Options forwarded to each server's destroy
   * @param {boolean} [options.force=false] Force destroy the pool
   * @param {function} [callback] Called with `(null, null)` once the topology is destroyed
   * @method
   */
  Mongos.prototype.destroy = function(options, callback) {
    // Stop the monitoring loop
    if (this.haTimeoutId) {
      clearTimeout(this.haTimeoutId);
    }

    const proxies = this.connectedProxies.concat(this.connectingProxies);
    let remaining = proxies.length;

    // Runs once per destroyed server; the last one finalises the topology
    const serverDestroyed = () => {
      remaining -= 1;
      if (remaining > 0) {
        return;
      }

      emitTopologyDescriptionChanged(this);
      emitSDAMEvent(this, 'topologyClosed', { topologyId: this.id });
      stateTransition(this, DESTROYED);

      if (typeof callback === 'function') {
        callback(null, null);
      }
    };

    // Without proxies the topology can be finalised right away
    if (proxies.length === 0) {
      serverDestroyed();
      return;
    }

    for (const server of proxies) {
      // SDAM event
      this.emit('serverClosed', {
        topologyId: this.id,
        address: server.name
      });

      destroyServer(server, options, serverDestroyed);
      moveServerFrom(this.connectedProxies, this.disconnectedProxies, server);
    }
  };
  /**
   * Report whether at least one proxy is currently in the connected list
   * @method
   * @return {boolean}
   */
  Mongos.prototype.isConnected = function() {
    const connectedCount = this.connectedProxies.length;
    return connectedCount > 0;
  };
  /**
   * Report whether this instance has reached the DESTROYED state
   * @method
   * @return {boolean}
   */
  Mongos.prototype.isDestroyed = function() {
    const wasDestroyed = DESTROYED === this.state;
    return wasDestroyed;
  };
  //
  // Operations
  //

  /**
   * Send one write to a proxy chosen by `pickProxy`, retrying it a single time
   * when the failure is retryable and retryable writes apply to the operation.
   *
   * @param {object} args Description of the write
   * @param {Mongos} args.self The topology issuing the write
   * @param {string} args.op Name of the server method to call (e.g. 'insert')
   * @param {string} args.ns The namespace handed to the server method
   * @param {array} args.ops The operations handed to the server method
   * @param {boolean} [args.retrying] Set on the retry attempt so it is not retried again
   * @param {object} [options] Operation options; may be omitted in favour of the callback
   * @param {function} callback Receives (err) or (null, result)
   */
  function executeWriteOperation(args, options, callback) {
    if (typeof options === 'function') {
      callback = options;
      options = {};
    }
    if (!options) {
      options = {};
    }

    // TODO: once we drop Node 4, use destructuring either here or in arguments.
    const topology = args.self;
    const method = args.op;
    const namespace = args.ns;
    const operations = args.ops;

    // Without a proxy there is nothing to run the write against
    const proxy = pickProxy(topology, options.session);
    if (!proxy) {
      return callback(new MongoError('no mongos proxy available'));
    }

    // A retry is only armed on the first attempt of a retryWrites operation that
    // has a session outside of a transaction, on a topology that supports it
    const retryRequested = !args.retrying && !!options.retryWrites;
    const willRetryWrite =
      retryRequested &&
      options.session &&
      isRetryableWritesSupported(topology) &&
      !options.session.inTransaction();

    const onWriteFinished = (err, result) => {
      if (!err) {
        return callback(null, result);
      }

      const shouldRetry = isRetryableError(err) && willRetryWrite;
      if (!shouldRetry) {
        return callback(err);
      }

      // Surface the original error when no proxy is left to retry against
      if (!pickProxy(topology, options.session)) {
        return callback(err);
      }

      const retryArgs = Object.assign({}, args, { retrying: true });
      return executeWriteOperation(retryArgs, options, callback);
    };

    // Carry the caller's operationId over to the wrapping handler
    if (callback.operationId) {
      onWriteFinished.operationId = callback.operationId;
    }

    // Increment and assign txnNumber
    if (willRetryWrite) {
      options.session.incrementTransactionNumber();
      options.willRetryWrite = willRetryWrite;
    }

    // Run the operation
    proxy[method](namespace, operations, options, onWriteFinished);
  }
  /**
   * Insert one or more documents
   * @method
   * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
   * @param {array} ops An array of documents to insert
   * @param {object} [options] Operation options; may be omitted or null
   * @param {boolean} [options.ordered=true] Execute in order or out of order
   * @param {object} [options.writeConcern={}] Write concern for the operation
   * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
   * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
   * @param {ClientSession} [options.session=null] Session to use for the operation
   * @param {boolean} [options.retryWrites] Enable retryable writes for this operation
   * @param {opResultCallback} callback A callback function
   */
  Mongos.prototype.insert = function(ns, ops, options, callback) {
    if (typeof options === 'function') {
      callback = options;
      options = {};
    }
    // Default a missing options argument. This used to sit inside the branch
    // above, where it could never apply, so null/undefined leaked through to
    // the disconnect handler.
    options = options || {};

    if (this.state === DESTROYED) return callback(new MongoError(f('topology was destroyed')));

    // Not connected but we have a disconnectHandler: store the call for later
    if (!this.isConnected() && this.s.disconnectHandler != null) {
      return this.s.disconnectHandler.add('insert', ns, ops, options, callback);
    }

    // No mongos proxy available
    if (!this.isConnected()) {
      return callback(new MongoError('no mongos proxy available'));
    }

    // Execute write operation
    executeWriteOperation({ self: this, op: 'insert', ns, ops }, options, callback);
  };
  /**
   * Perform one or more update operations
   * @method
   * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
   * @param {array} ops An array of updates
   * @param {object} [options] Operation options; may be omitted or null
   * @param {boolean} [options.ordered=true] Execute in order or out of order
   * @param {object} [options.writeConcern={}] Write concern for the operation
   * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
   * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
   * @param {ClientSession} [options.session=null] Session to use for the operation
   * @param {boolean} [options.retryWrites] Enable retryable writes for this operation
   * @param {opResultCallback} callback A callback function
   */
  Mongos.prototype.update = function(ns, ops, options, callback) {
    if (typeof options === 'function') {
      callback = options;
      options = {};
    }
    // Default a missing options argument. This used to sit inside the branch
    // above, where it could never apply, so null/undefined leaked through to
    // the disconnect handler.
    options = options || {};

    if (this.state === DESTROYED) return callback(new MongoError(f('topology was destroyed')));

    // Not connected but we have a disconnectHandler: store the call for later
    if (!this.isConnected() && this.s.disconnectHandler != null) {
      return this.s.disconnectHandler.add('update', ns, ops, options, callback);
    }

    // No mongos proxy available
    if (!this.isConnected()) {
      return callback(new MongoError('no mongos proxy available'));
    }

    // Execute write operation
    executeWriteOperation({ self: this, op: 'update', ns, ops }, options, callback);
  };
  /**
   * Perform one or more remove operations
   * @method
   * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
   * @param {array} ops An array of removes
   * @param {object} [options] Operation options; may be omitted or null
   * @param {boolean} [options.ordered=true] Execute in order or out of order
   * @param {object} [options.writeConcern={}] Write concern for the operation
   * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
   * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
   * @param {ClientSession} [options.session=null] Session to use for the operation
   * @param {boolean} [options.retryWrites] Enable retryable writes for this operation
   * @param {opResultCallback} callback A callback function
   */
  Mongos.prototype.remove = function(ns, ops, options, callback) {
    if (typeof options === 'function') {
      callback = options;
      options = {};
    }
    // Default a missing options argument. This used to sit inside the branch
    // above, where it could never apply, so null/undefined leaked through to
    // the disconnect handler.
    options = options || {};

    if (this.state === DESTROYED) return callback(new MongoError(f('topology was destroyed')));

    // Not connected but we have a disconnectHandler: store the call for later
    if (!this.isConnected() && this.s.disconnectHandler != null) {
      return this.s.disconnectHandler.add('remove', ns, ops, options, callback);
    }

    // No mongos proxy available
    if (!this.isConnected()) {
      return callback(new MongoError('no mongos proxy available'));
    }

    // Execute write operation
    executeWriteOperation({ self: this, op: 'remove', ns, ops }, options, callback);
  };
  // Command keys that `isWriteCommand` treats as write commands
  const RETRYABLE_WRITE_OPERATIONS = ['findAndModify', 'insert', 'update', 'delete'];

  /**
   * Check whether a command document carries a truthy value under any of the
   * RETRYABLE_WRITE_OPERATIONS keys
   * @param {object} command The command hash
   * @return {boolean}
   */
  function isWriteCommand(command) {
    for (let index = 0; index < RETRYABLE_WRITE_OPERATIONS.length; index++) {
      if (command[RETRYABLE_WRITE_OPERATIONS[index]]) {
        return true;
      }
    }

    return false;
  }
  /**
   * Execute a command
   * @method
   * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
   * @param {object} cmd The command hash
   * @param {object} [options] Command options; may be omitted or null
   * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
   * @param {Connection} [options.connection] Specify connection object to execute command against
   * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
   * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
   * @param {ClientSession} [options.session=null] Session to use for the operation
   * @param {boolean} [options.retryWrites] Enable retryable writes when `cmd` is a write command
   * @param {opResultCallback} callback A callback function
   */
  Mongos.prototype.command = function(ns, cmd, options, callback) {
    if (typeof options === 'function') {
      callback = options;
      options = {};
    }
    // Default a missing options argument. This used to sit inside the branch
    // above, where it could never apply, so `options.session` below threw for
    // null/undefined options.
    options = options || {};

    if (this.state === DESTROYED) return callback(new MongoError(f('topology was destroyed')));
    const self = this;

    // Pick a proxy
    const server = pickProxy(self, options.session);

    // Topology is not connected, save the call in the provided store to be
    // executed at some point when the handler deems it's reconnected
    if ((server == null || !server.isConnected()) && this.s.disconnectHandler != null) {
      return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
    }

    // No server returned we had an error
    if (server == null) {
      return callback(new MongoError('no mongos proxy available'));
    }

    // Work on a clone so the caller's options object is left untouched
    const clonedOptions = cloneOptions(options);
    clonedOptions.topology = self;

    // A retry is only armed on the first attempt of a retryWrites write command
    // that has a session outside of a transaction
    const willRetryWrite =
      !options.retrying &&
      options.retryWrites &&
      options.session &&
      isRetryableWritesSupported(self) &&
      !options.session.inTransaction() &&
      isWriteCommand(cmd);

    const cb = (err, result) => {
      if (!err) return callback(null, result);
      if (!isRetryableError(err)) {
        return callback(err);
      }

      if (willRetryWrite) {
        // `retrying` keeps the second attempt from arming another retry
        const newOptions = Object.assign({}, clonedOptions, { retrying: true });
        return this.command(ns, cmd, newOptions, callback);
      }

      return callback(err);
    };

    // Increment the txnNumber and flag the options that are actually sent to
    // the server. The flag used to be written to `options` after the clone was
    // taken, so `server.command` never saw it.
    if (willRetryWrite) {
      options.session.incrementTransactionNumber();
      clonedOptions.willRetryWrite = willRetryWrite;
    }

    // Execute the command
    server.command(ns, cmd, clonedOptions, cb);
  };
  /**
   * Build a cursor for a namespace
   * @method
   * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
   * @param {object|Long} cmd Can be either a command returning a cursor or a cursorId
   * @param {object} [options] Options for the cursor
   * @param {object} [options.batchSize=0] Batchsize for the operation
   * @param {array} [options.documents=[]] Initial documents list for cursor
   * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
   * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
   * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
   * @param {ClientSession} [options.session=null] Session to use for the operation
   * @param {object} [options.topology] The internal topology of the created cursor; defaults to this instance
   * @param {function} [options.cursorFactory] Constructor used instead of the configured cursor class
   * @returns {Cursor}
   */
  Mongos.prototype.cursor = function(ns, cmd, options) {
    if (!options) {
      options = {};
    }

    // The cursor is bound to an explicitly supplied topology, else to this one
    const owner = options.topology ? options.topology : this;

    // A caller-supplied factory wins over the configured cursor class
    const CursorClass = options.cursorFactory ? options.cursorFactory : this.s.Cursor;

    return new CursorClass(this.s.bson, ns, cmd, options, owner, this.s.options);
  };
  /**
   * Selects a server
   *
   * Accepted call shapes: `selectServer(callback)`, `selectServer(options, callback)`,
   * `selectServer(selector, callback)` and `selectServer(selector, options, callback)`.
   *
   * @method
   * @param {function} selector Unused
   * @param {ReadPreference} [options.readPreference] Unused
   * @param {ClientSession} [options.session] Specify a session if it is being used
   * @param {function} callback Called with (null, server), where server is whatever `pickProxy` returned
   */
  Mongos.prototype.selectServer = function(selector, options, callback) {
    if (typeof options === 'function') {
      // Two-argument forms. The first argument is either an options object or a
      // selector function; a selector function used to be mistaken for the
      // callback here, so the real callback was never invoked.
      callback = options;
      options = typeof selector === 'function' ? {} : selector;
      selector = undefined;
    } else if (typeof selector === 'function' && typeof callback === 'undefined') {
      // selectServer(callback)
      callback = selector;
      selector = undefined;
      options = {};
    }

    options = options || {};
    const server = pickProxy(this, options.session);
    if (this.s.debug) this.emit('pickedServer', null, server);
    callback(null, server);
  };
  /**
   * Collect the raw connections of every connected proxy into one flat list
   * @method
   * @return {Connection[]}
   */
  Mongos.prototype.connections = function() {
    return this.connectedProxies.reduce(function(collected, proxy) {
      return collected.concat(proxy.connections());
    }, []);
  };
  /**
   * Rebuild the topology description from the current proxy lists, emit
   * 'topologyDescriptionChanged' when the diff against the stored description
   * reports changed servers, and store the new description on the topology.
   * Does nothing when nobody listens for 'topologyDescriptionChanged'.
   *
   * @param {Mongos} self The topology whose description is refreshed
   */
  function emitTopologyDescriptionChanged(self) {
    // Skip all the work unless somebody is listening
    if (!(self.listeners('topologyDescriptionChanged').length > 0)) {
      return;
    }

    // Produces a mapper that stamps each proxy's description with `type`
    const describeAs = type => proxy => {
      const serverDescription = proxy.getDescription();
      serverDescription.type = type;
      return serverDescription;
    };

    // At least one connected proxy makes the topology 'Sharded'
    const topologyType = self.connectedProxies.length > 0 ? 'Sharded' : 'Unknown';

    // Disconnected and connecting proxies are listed first, as 'Unknown'
    const unknownServers = self.disconnectedProxies
      .concat(self.connectingProxies)
      .map(describeAs('Unknown'));

    // Connected proxies follow, as 'Mongos'
    const mongosServers = self.connectedProxies.map(describeAs('Mongos'));

    const nextDescription = {
      topologyType: topologyType,
      servers: unknownServers.concat(mongosServers)
    };

    // Compare against the previously stored description
    const changes = diff(self.topologyDescription, nextDescription);
    const payload = {
      topologyId: self.id,
      previousDescription: self.topologyDescription,
      newDescription: nextDescription,
      diff: changes
    };

    // Only announce the change when the diff lists servers
    if (changes.servers.length > 0) {
      self.emit('topologyDescriptionChanged', payload);
    }

    // Remember the new description for the next comparison
    self.topologyDescription = nextDescription;
  }
  1052. /**
  1053. * A mongos connect event, used to verify that the connection is up and running
  1054. *
  1055. * @event Mongos#connect
  1056. * @type {Mongos}
  1057. */
  1058. /**
  1059. * A mongos reconnect event, used to verify that the mongos topology has reconnected
  1060. *
  1061. * @event Mongos#reconnect
  1062. * @type {Mongos}
  1063. */
  1064. /**
  1065. * A mongos fullsetup event, used to signal that all topology members have been contacted.
  1066. *
  1067. * @event Mongos#fullsetup
  1068. * @type {Mongos}
  1069. */
  1070. /**
  1071. * A mongos all event, used to signal that all topology members have been contacted.
  1072. *
  1073. * @event Mongos#all
  1074. * @type {Mongos}
  1075. */
  1076. /**
  1077. * A server member left the mongos list
  1078. *
  1079. * @event Mongos#left
  1080. * @type {Mongos}
  1081. * @param {string} type The type of member that left (mongos)
  1082. * @param {Server} server The server object that left
  1083. */
  1084. /**
  1085. * A server member joined the mongos list
  1086. *
  1087. * @event Mongos#joined
  1088. * @type {Mongos}
  1089. * @param {string} type The type of member that joined (mongos)
  1090. * @param {Server} server The server object that joined
  1091. */
  1092. /**
  1093. * A server opening SDAM monitoring event
  1094. *
  1095. * @event Mongos#serverOpening
  1096. * @type {object}
  1097. */
  1098. /**
  1099. * A server closed SDAM monitoring event
  1100. *
  1101. * @event Mongos#serverClosed
  1102. * @type {object}
  1103. */
  1104. /**
  1105. * A server description SDAM change monitoring event
  1106. *
  1107. * @event Mongos#serverDescriptionChanged
  1108. * @type {object}
  1109. */
  1110. /**
  1111. * A topology open SDAM event
  1112. *
  1113. * @event Mongos#topologyOpening
  1114. * @type {object}
  1115. */
  1116. /**
  1117. * A topology closed SDAM event
  1118. *
  1119. * @event Mongos#topologyClosed
  1120. * @type {object}
  1121. */
  1122. /**
  1123. * A topology structure SDAM change event
  1124. *
  1125. * @event Mongos#topologyDescriptionChanged
  1126. * @type {object}
  1127. */
  1128. /**
  1129. * A topology serverHeartbeatStarted SDAM event
  1130. *
  1131. * @event Mongos#serverHeartbeatStarted
  1132. * @type {object}
  1133. */
  1134. /**
  1135. * A topology serverHeartbeatFailed SDAM event
  1136. *
  1137. * @event Mongos#serverHeartbeatFailed
  1138. * @type {object}
  1139. */
  1140. /**
  1141. * A topology serverHeartbeatSucceeded SDAM change event
  1142. *
  1143. * @event Mongos#serverHeartbeatSucceeded
  1144. * @type {object}
  1145. */
  1146. /**
  1147. * An event emitted indicating a command was started, if command monitoring is enabled
  1148. *
  1149. * @event Mongos#commandStarted
  1150. * @type {object}
  1151. */
  1152. /**
  1153. * An event emitted indicating a command succeeded, if command monitoring is enabled
  1154. *
  1155. * @event Mongos#commandSucceeded
  1156. * @type {object}
  1157. */
  1158. /**
  1159. * An event emitted indicating a command failed, if command monitoring is enabled
  1160. *
  1161. * @event Mongos#commandFailed
  1162. * @type {object}
  1163. */
  1164. module.exports = Mongos;