mongo_client_ops.js 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688
  1. 'use strict';
  2. const deprecate = require('util').deprecate;
  3. const Logger = require('mongodb-core').Logger;
  4. const MongoError = require('mongodb-core').MongoError;
  5. const Mongos = require('../topologies/mongos');
  6. const parse = require('mongodb-core').parseConnectionString;
  7. const ReadPreference = require('mongodb-core').ReadPreference;
  8. const ReplSet = require('../topologies/replset');
  9. const Server = require('../topologies/server');
  10. const ServerSessionPool = require('mongodb-core').Sessions.ServerSessionPool;
  11. const NativeTopology = require('../topologies/native_topology');
  12. const MongoCredentials = require('mongodb-core').MongoCredentials;
  // Memoized export of '../mongo_client'; stays unset until loadClient() first runs.
  let client;

  /**
   * Require '../mongo_client' on first use and cache the result.
   *
   * The require is deferred to call time instead of module load time
   * (NOTE(review): presumably to sidestep a circular require with mongo_client — confirm).
   *
   * @returns {*} The cached export of '../mongo_client'.
   */
  function loadClient() {
    if (client) {
      return client;
    }
    client = require('../mongo_client');
    return client;
  }
  // Topology event names that collectEvents() subscribes to while a single-server
  // connection is being established, and that clearAllEvents() strips again afterwards.
  const monitoringEvents = [
    'timeout',
    'close',
    'serverOpening',
    'serverDescriptionChanged',
    'serverHeartbeatStarted',
    'serverHeartbeatSucceeded',
    'serverHeartbeatFailed',
    'serverClosed',
    'topologyOpening',
    'topologyClosed',
    'topologyDescriptionChanged',
    'commandStarted',
    'commandSucceeded',
    'commandFailed',
    'joined',
    'left',
    'ping',
    'ha',
    'all',
    'fullsetup',
    'open'
  ];
  // Option names that validOptions() skips without validating or warning.
  const ignoreOptionNames = ['native_parser'];
  // Nested option groups that validOptions() still accepts, but with a deprecation warning.
  const legacyOptionNames = ['server', 'replset', 'replSet', 'mongos', 'db'];
  // The legacy connection-string parser, wrapped with util.deprecate so that using it
  // emits the deprecation message below. connect() selects it whenever
  // options.useNewUrlParser is not set.
  const legacyParse = deprecate(
    require('../url_parser'),
    'current URL string parser is deprecated, and will be removed in a future version. ' +
      'To use the new parser, pass option { useNewUrlParser: true } to MongoClient.connect.'
  );
  // Top-level option names accepted by validOptions(). The same list seeds
  // LEGACY_OPTIONS_MAP, which maps each lower-cased name back to the spelling used here.
  const validOptionNames = [
    'poolSize',
    'ssl',
    'sslValidate',
    'sslCA',
    'sslCert',
    'sslKey',
    'sslPass',
    'sslCRL',
    'autoReconnect',
    'noDelay',
    'keepAlive',
    'keepAliveInitialDelay',
    'connectTimeoutMS',
    'family',
    'socketTimeoutMS',
    'reconnectTries',
    'reconnectInterval',
    'ha',
    'haInterval',
    'replicaSet',
    'secondaryAcceptableLatencyMS',
    'acceptableLatencyMS',
    'connectWithNoPrimary',
    'authSource',
    'w',
    'wtimeout',
    'j',
    'forceServerObjectId',
    'serializeFunctions',
    'ignoreUndefined',
    'raw',
    'bufferMaxEntries',
    'readPreference',
    'pkFactory',
    'promiseLibrary',
    'readConcern',
    'maxStalenessSeconds',
    'loggerLevel',
    'logger',
    'promoteValues',
    'promoteBuffers',
    'promoteLongs',
    'domainsEnabled',
    'checkServerIdentity',
    'validateOptions',
    'appname',
    'auth',
    'user',
    'password',
    'authMechanism',
    'compression',
    'fsync',
    'readPreferenceTags',
    'numberOfRetries',
    'auto_reconnect',
    'minSize',
    'monitorCommands',
    'retryWrites',
    'useNewUrlParser',
    'useUnifiedTopology',
    'serverSelectionTimeoutMS',
    'useRecoveryToken'
  ];
  /**
   * Subscribe the client to the topology's lifecycle events.
   *
   * Each subscription uses a handler built by createListener(). 'open', 'fullsetup'
   * and 'all' are registered with `once`; every other event with `on`.
   *
   * @param {MongoClient} mongoClient The client that re-emits the events.
   * @param {object} topology The topology emitter to subscribe to.
   */
  function addListeners(mongoClient, topology) {
    // [subscription method, event name], kept in the original registration order.
    const subscriptions = [
      ['on', 'authenticated'],
      ['on', 'error'],
      ['on', 'timeout'],
      ['on', 'close'],
      ['on', 'parseError'],
      ['once', 'open'],
      ['once', 'fullsetup'],
      ['once', 'all'],
      ['on', 'reconnect']
    ];

    subscriptions.forEach(subscription => {
      const method = subscription[0];
      const eventName = subscription[1];
      topology[method](eventName, createListener(mongoClient, eventName));
    });
  }
  /**
   * Attach a topology to the client and give the topology a fresh session pool.
   *
   * @param {MongoClient} client The client receiving the topology.
   * @param {object} topology The topology to attach; its `s.sessionPool` is overwritten.
   */
  function assignTopology(client, topology) {
    client.topology = topology;

    // A NativeTopology is handed to the pool directly; any other topology
    // contributes its `s.coreTopology` instead.
    const poolTopology = topology instanceof NativeTopology ? topology : topology.s.coreTopology;
    topology.s.sessionPool = new ServerSessionPool(poolTopology);
  }
  // Remove every listener the topology holds for any of the monitoring events
  function clearAllEvents(topology) {
    for (const eventName of monitoringEvents) {
      topology.removeAllListeners(eventName);
    }
  }
  /**
   * Record, in arrival order, every monitoring event the topology emits.
   *
   * Listeners are only attached when `mongoClient` is a MongoClient instance;
   * otherwise the returned array simply stays empty. For 'open' the recorded
   * payload is the client itself rather than the emitted arguments.
   *
   * @param {MongoClient} mongoClient The client the events are collected for.
   * @param {object} topology The topology emitter to observe.
   * @returns {object[]} Live array of `{ event, object1, object2 }` records.
   */
  function collectEvents(mongoClient, topology) {
    const MongoClient = loadClient();
    const collectedEvents = [];

    if (!(mongoClient instanceof MongoClient)) {
      return collectedEvents;
    }

    for (const event of monitoringEvents) {
      topology.on(event, (object1, object2) => {
        const record =
          event === 'open'
            ? { event: event, object1: mongoClient }
            : { event: event, object1: object1, object2: object2 };
        collectedEvents.push(record);
      });
    }

    return collectedEvents;
  }
  /**
   * Connect to MongoDB using a url as documented at
   *
   * docs.mongodb.org/manual/reference/connection-string/
   *
   * Note that for replicasets the replicaSet query parameter is required in the 2.0 driver
   *
   * When `url` is already a Server, ReplSet or Mongos instance it is connected directly.
   * Otherwise the string is parsed (with the new or the legacy parser, depending on
   * `options.useNewUrlParser`), merged with `options`, stored on `mongoClient.s.options`,
   * and a topology is created: unified when `useUnifiedTopology` is set, replica set when
   * a set name is present, mongos when several seeds are listed, single server otherwise.
   *
   * @method
   * @param {MongoClient} mongoClient The MongoClient instance with which to connect.
   * @param {string} url The connection URI string
   * @param {object} [options] Optional settings. See MongoClient.prototype.connect for a list of options.
   * @param {MongoClient~connectCallback} callback The command result callback (required)
   * @throws {Error} If no callback is provided.
   */
  function connect(mongoClient, url, options, callback) {
    // Work on a copy so the caller's options object is never modified here
    options = Object.assign({}, options);

    // If callback is null throw an exception
    if (callback == null) {
      throw new Error('no callback function provided');
    }

    // Set once credentials have been generated from the parsed `auth` section
    let didRequestAuthentication = false;
    const logger = Logger('MongoClient', options);

    // Did we pass in a Server/ReplSet/Mongos
    if (url instanceof Server || url instanceof ReplSet || url instanceof Mongos) {
      return connectWithUrl(mongoClient, url, options, connectCallback);
    }

    const parseFn = options.useNewUrlParser ? parse : legacyParse;
    const transform = options.useNewUrlParser ? transformUrlOptions : legacyTransformUrlOptions;

    parseFn(url, options, (err, _object) => {
      // Do not attempt to connect if parsing error
      if (err) return callback(err);

      // Flatten
      const object = transform(_object);

      // Merge the caller's options on top of what the connection string supplied
      const _finalOptions = createUnifiedOptions(object, options);

      // Check if we have connection and socket timeout set
      if (_finalOptions.socketTimeoutMS == null) _finalOptions.socketTimeoutMS = 360000;
      if (_finalOptions.connectTimeoutMS == null) _finalOptions.connectTimeoutMS = 30000;

      if (_finalOptions.db_options && _finalOptions.db_options.auth) {
        delete _finalOptions.db_options.auth;
      }

      // Store the merged options object
      mongoClient.s.options = _finalOptions;

      // Failure modes
      if (object.servers.length === 0) {
        return callback(new Error('connection string must contain at least one seed host'));
      }

      if (_finalOptions.auth && !_finalOptions.credentials) {
        try {
          didRequestAuthentication = true;
          _finalOptions.credentials = generateCredentials(
            mongoClient,
            _finalOptions.auth.user,
            _finalOptions.auth.password,
            _finalOptions
          );
        } catch (err) {
          return callback(err);
        }
      }

      if (_finalOptions.useUnifiedTopology) {
        return createTopology(mongoClient, 'unified', _finalOptions, connectCallback);
      }

      // Do we have a replicaset then skip discovery and go straight to connectivity
      if (_finalOptions.replicaSet || _finalOptions.rs_name) {
        return createTopology(mongoClient, 'replicaset', _finalOptions, connectCallback);
      } else if (object.servers.length > 1) {
        return createTopology(mongoClient, 'mongos', _finalOptions, connectCallback);
      } else {
        return createServer(mongoClient, _finalOptions, connectCallback);
      }
    });

    // Final step shared by every connection path above
    function connectCallback(err, topology) {
      const warningMessage = `seed list contains no mongos proxies, replicaset connections requires the parameter replicaSet to be supplied in the URI or options object, mongodb://server:port/db?replicaSet=name`;
      if (err && err.message === 'no mongos proxies found in seed list') {
        if (logger.isWarn()) {
          logger.warn(warningMessage);
        }

        // Return a more specific error message for MongoClient.connect
        return callback(new MongoError(warningMessage));
      }

      // Only announce authentication when the connection actually succeeded;
      // a failed attempt must not look authenticated to listeners.
      if (!err && didRequestAuthentication) {
        mongoClient.emit('authenticated', null, true);
      }

      // Return the error and db instance
      callback(err, topology);
    }
  }
  /**
   * Operation wrapper around connect(): connects the client using the url and
   * options already stored on `mongoClient.s` and yields the client itself.
   *
   * @method
   * @param {MongoClient} mongoClient The MongoClient instance with which to connect.
   * @param {Error} [err] An earlier validation error; when set it is forwarded and no connection is attempted.
   * @param {MongoClient~connectCallback} callback Receives `(err)` on failure or `(null, mongoClient)` on success.
   */
  function connectOp(mongoClient, err, callback) {
    // Did we have a validation error
    if (err) return callback(err);

    const state = mongoClient.s;
    const onConnected = connectError => {
      if (connectError) return callback(connectError);
      // The connected topology is not forwarded; callers get the client back
      callback(null, mongoClient);
    };

    connect(mongoClient, state.url, state.options, onConnected);
  }
  /**
   * Connect through a topology instance that the caller supplied in place of a url string.
   *
   * @param {MongoClient} mongoClient The client to wire the topology into.
   * @param {object} url The pre-built topology instance (passed in the `url` position).
   * @param {object} options Connection options; copied before being adjusted.
   * @param {function} connectCallback Invoked with `(err, topology)`.
   * @returns {*} Whatever `url.connect` (or `connectCallback` on a credentials error) returns.
   */
  function connectWithUrl(mongoClient, url, options, connectCallback) {
    // Attach the topology, subscribe the client to it, and relay monitoring events
    assignTopology(mongoClient, url);
    addListeners(mongoClient, url);
    relayEvents(mongoClient, url);

    const finalOptions = Object.assign({}, options);

    // A read preference given as a string is converted into a ReadPreference instance
    const hasStringReadPreference =
      typeof options.readPreference === 'string' || typeof options.read_preference === 'string';
    if (hasStringReadPreference) {
      finalOptions.readPreference = new ReadPreference(
        options.readPreference || options.read_preference
      );
    }

    const wantsAuth = finalOptions.user || finalOptions.password || finalOptions.authMechanism;
    if (!wantsAuth || finalOptions.credentials) {
      return url.connect(finalOptions, connectCallback);
    }

    try {
      finalOptions.credentials = generateCredentials(
        mongoClient,
        finalOptions.user,
        finalOptions.password,
        finalOptions
      );
    } catch (credentialsError) {
      return connectCallback(credentialsError, url);
    }

    return url.connect(finalOptions, connectCallback);
  }
  /**
   * Build a handler that re-emits a topology event on the client.
   *
   * For 'all', 'fullsetup', 'open' and 'reconnect' the handler emits the client
   * itself as the only payload and returns the result of `emit`. For any other
   * event it forwards the first two arguments and returns nothing.
   *
   * @param {MongoClient} mongoClient The client to emit on.
   * @param {string} event The event name to re-emit.
   * @returns {function} The listener to register on the topology.
   */
  function createListener(mongoClient, event) {
    const clientPayloadEvents = ['all', 'fullsetup', 'open', 'reconnect'];
    // The event name is fixed per listener, so the choice is made once up front
    const emitsClient = clientPayloadEvents.indexOf(event) !== -1;

    if (emitsClient) {
      return () => mongoClient.emit(event, mongoClient);
    }

    return (first, second) => {
      mongoClient.emit(event, first, second);
    };
  }
  /**
   * Connect to the first server in `options.servers` as a standalone Server topology.
   *
   * Monitoring events emitted while connecting are buffered and replayed on the client
   * once the connection is up. If the connected server's ismaster reports
   * `msg === 'isdbgrid'`, the connection is closed and a mongos topology is created
   * in its place.
   *
   * @param {MongoClient} mongoClient The client the topology is created for.
   * @param {object} options Connection options; mutated (promiseLibrary, defaults from translateOptions).
   * @param {function} callback Invoked with `(err, topology)`.
   */
  function createServer(mongoClient, options, callback) {
    // Pass in the promise library
    options.promiseLibrary = mongoClient.s.promiseLibrary;
    // Set default options
    const servers = translateOptions(options);
    // Only the first seed is used on this path
    const server = servers[0];
    // Buffer the events emitted during connect so they can be replayed afterwards
    const collectedEvents = collectEvents(mongoClient, server);
    // Connect to topology
    server.connect(options, (err, topology) => {
      if (err) {
        // Close the server (passing `true` to close) before reporting the failure
        server.close(true);
        return callback(err);
      }
      // Clear out all the collected event listeners
      clearAllEvents(server);
      // Relay all the events
      relayEvents(mongoClient, server);
      // Add listeners
      addListeners(mongoClient, server);
      // Check if we are really speaking to a mongos
      const ismaster = topology.lastIsMaster();
      // Set the topology
      assignTopology(mongoClient, topology);
      // Do we actually have a mongos
      if (ismaster && ismaster.msg === 'isdbgrid') {
        // Destroy the current connection
        topology.close();
        // Create mongos connection instead
        return createTopology(mongoClient, 'mongos', options, callback);
      }
      // Fire all the events
      replayEvents(mongoClient, collectedEvents);
      // Otherwise callback
      callback(err, topology);
    });
  }
  /**
   * Create a Mongos, ReplSet or NativeTopology, wire it to the client and connect it.
   *
   * @param {MongoClient} mongoClient The client the topology is created for.
   * @param {string} topologyType One of 'mongos', 'replicaset' or 'unified'.
   * @param {object} options Connection options; mutated (promiseLibrary, defaults from translateOptions).
   * @param {function} callback Invoked with `(err)` or `(null, topology)`.
   */
  function createTopology(mongoClient, topologyType, options, callback) {
    // Pass in the promise library
    options.promiseLibrary = mongoClient.s.promiseLibrary;

    // The unified topology receives the raw seed list, so no Server instances are built for it
    const translationOptions = {};
    if (topologyType === 'unified') {
      translationOptions.createServers = false;
    }

    // Set default options
    const servers = translateOptions(options, translationOptions);

    // Create the topology
    let topology;
    switch (topologyType) {
      case 'mongos':
        topology = new Mongos(servers, options);
        break;
      case 'replicaset':
        topology = new ReplSet(servers, options);
        break;
      case 'unified':
        topology = new NativeTopology(options.servers, options);
        break;
      default:
        break;
    }

    // Subscribe the client, then relay monitoring events to it
    addListeners(mongoClient, topology);
    relayEvents(mongoClient, topology);

    // Open the connection
    topology.connect(options, (connectError, newTopology) => {
      if (connectError) {
        topology.close(true);
        return callback(connectError);
      }

      assignTopology(mongoClient, newTopology);
      callback(null, newTopology);
    });
  }
  /**
   * Fold `options` into `finalOptions`, producing one flat options object.
   *
   * - 'readConcern' / 'compression' (any casing) are copied as-is.
   * - Legacy child groups (server, db, replset, mongos, *_options) have their
   *   members merged one level deep.
   * - Any other non-null object that is neither a Buffer nor an array is flattened recursively.
   * - Everything else is copied under its own name.
   *
   * @param {object} finalOptions Target object; mutated and returned.
   * @param {object} options Source options.
   * @returns {object} The merged options.
   */
  function createUnifiedOptions(finalOptions, options) {
    const childOptions = new Set([
      'mongos',
      'server',
      'db',
      'replset',
      'db_options',
      'server_options',
      'rs_options',
      'mongos_options'
    ]);
    const noMerge = new Set(['readconcern', 'compression']);

    for (const name in options) {
      const value = options[name];
      const lowerName = name.toLowerCase();

      if (noMerge.has(lowerName)) {
        finalOptions[name] = value;
        continue;
      }

      if (childOptions.has(lowerName)) {
        finalOptions = mergeOptions(finalOptions, value, false);
        continue;
      }

      const isFlattenable =
        value && typeof value === 'object' && !Buffer.isBuffer(value) && !Array.isArray(value);
      if (isFlattenable) {
        finalOptions = mergeOptions(finalOptions, value, true);
      } else {
        finalOptions[name] = value;
      }
    }

    return finalOptions;
  }
  /**
   * Transform the legacy parser's result: build the unified (flattened) options
   * from it, then copy the result's own top-level entries over them without flattening.
   *
   * @param {object} object The object produced by the legacy url parser.
   * @returns {object} The merged options object.
   */
  function legacyTransformUrlOptions(object) {
    const unified = createUnifiedOptions({}, object);
    return mergeOptions(unified, object, false);
  }
  /**
   * Copy the enumerable properties of `source` onto `target`.
   *
   * With `flatten` truthy, every non-null object value is descended into and its
   * leaves are written directly on `target`; otherwise values are copied as they are.
   *
   * @param {object} target Object to write to; mutated and returned.
   * @param {object} source Object to read from.
   * @param {boolean} flatten Whether nested objects are flattened into `target`.
   * @returns {object} `target`.
   */
  function mergeOptions(target, source, flatten) {
    for (const key in source) {
      const value = source[key];
      const descend = Boolean(value) && typeof value === 'object' && Boolean(flatten);

      if (descend) {
        target = mergeOptions(target, value, flatten);
        continue;
      }

      target[key] = value;
    }

    return target;
  }
  /**
   * Re-emit the topology's server, topology, command and membership events on the client.
   * Only the first two arguments of each event are forwarded.
   *
   * @param {MongoClient} mongoClient The client to emit on.
   * @param {object} topology The topology emitter to subscribe to.
   */
  function relayEvents(mongoClient, topology) {
    const relayedEvents = [
      'serverOpening',
      'serverDescriptionChanged',
      'serverHeartbeatStarted',
      'serverHeartbeatSucceeded',
      'serverHeartbeatFailed',
      'serverClosed',
      'topologyOpening',
      'topologyClosed',
      'topologyDescriptionChanged',
      'commandStarted',
      'commandSucceeded',
      'commandFailed',
      'joined',
      'left',
      'ping',
      'ha'
    ];

    for (const eventName of relayedEvents) {
      const forward = (first, second) => {
        mongoClient.emit(eventName, first, second);
      };
      topology.on(eventName, forward);
    }
  }
  //
  // Re-emit previously collected events on the client, in the order they were recorded
  //
  function replayEvents(mongoClient, events) {
    for (const recorded of events) {
      mongoClient.emit(recorded.event, recorded.object1, recorded.object2);
    }
  }
  // Lookup from lower-cased option name to the camelCase spelling in validOptionNames.
  // Built on a null-prototype object so that lookups by arbitrary option names
  // (e.g. 'constructor', 'valueOf') never resolve to inherited Object.prototype members.
  const LEGACY_OPTIONS_MAP = validOptionNames.reduce((obj, name) => {
    obj[name.toLowerCase()] = name;
    return obj;
  }, Object.create(null));
  /**
   * Convert the new url parser's result into the flat options shape connect() expects.
   *
   * Adds `servers` from the parsed hosts, mirrors recognised lower-case option names
   * under their camelCase spelling, copies the auth section (adding `user` and a default
   * `authSource`), and maps `defaultDatabase`, `maxpoolsize`, `readconcernlevel` and
   * `wtimeoutms` onto `dbName`, `poolSize`, `readConcern` and `wtimeout`.
   *
   * @param {object} _object Parser result with `hosts`, `options`, and optionally `auth` / `defaultDatabase`.
   * @returns {object} The transformed options object.
   */
  function transformUrlOptions(_object) {
    const object = Object.assign({ servers: _object.hosts }, _object.options);

    // Mirror each recognised option under its camelCase name (the original key is kept)
    for (const name in object) {
      const camelCaseName = LEGACY_OPTIONS_MAP[name];
      if (camelCaseName) {
        object[camelCaseName] = object[name];
      }
    }

    const parsedAuth = _object.auth;
    const parsedOptions = _object.options;
    const hasUsername = parsedAuth && parsedAuth.username;
    const hasAuthMechanism = parsedOptions && parsedOptions.authMechanism;

    if (hasUsername || hasAuthMechanism) {
      const auth = Object.assign({}, parsedAuth);
      object.auth = auth;

      // An explicit authSource wins over the database named in the credentials
      if (auth.db) {
        object.authSource = object.authSource || auth.db;
      }

      if (auth.username) {
        auth.user = auth.username;
      }
    }

    if (_object.defaultDatabase) {
      object.dbName = _object.defaultDatabase;
    }

    if (object.maxpoolsize) {
      object.poolSize = object.maxpoolsize;
    }

    if (object.readconcernlevel) {
      object.readConcern = { level: object.readconcernlevel };
    }

    if (object.wtimeoutms) {
      object.wtimeout = object.wtimeoutms;
    }

    return object;
  }
  /**
   * Normalise connection options in place and, unless disabled, build Server instances
   * for every entry in `options.servers`.
   *
   * @param {object} options Connection options; mutated (readPreference, timeouts).
   * @param {object} [translationOptions] Behaviour switches.
   * @param {boolean} [translationOptions.createServers=true] When false, no Server instances are created.
   * @returns {Server[]|undefined} The created servers, or undefined when `createServers` is false.
   */
  function translateOptions(options, translationOptions) {
    translationOptions = Object.assign({}, { createServers: true }, translationOptions);

    // If we have a readPreference passed in by the db options
    if (typeof options.readPreference === 'string' || typeof options.read_preference === 'string') {
      options.readPreference = new ReadPreference(options.readPreference || options.read_preference);
    }

    // Do we have readPreference tags, add them
    if (options.readPreference && (options.readPreferenceTags || options.read_preference_tags)) {
      options.readPreference.tags = options.readPreferenceTags || options.read_preference_tags;
    }

    // Do we have maxStalenessSeconds. Guarded like the tags above: without a read
    // preference there is nothing to attach the value to, and assigning would throw.
    if (options.readPreference && options.maxStalenessSeconds) {
      options.readPreference.maxStalenessSeconds = options.maxStalenessSeconds;
    }

    // Set the socket and connection timeouts
    if (options.socketTimeoutMS == null) options.socketTimeoutMS = 360000;
    if (options.connectTimeoutMS == null) options.connectTimeoutMS = 30000;

    if (!translationOptions.createServers) {
      return;
    }

    // Create server instances
    return options.servers.map(serverObj => {
      return serverObj.domain_socket
        ? new Server(serverObj.domain_socket, 27017, options)
        : new Server(serverObj.host, serverObj.port, options);
    });
  }
  /**
   * Validate the names used in an options object.
   *
   * Ignored names are skipped. An unknown name yields a MongoError when
   * `options.validateOptions` is set and a console warning otherwise. Legacy group
   * names (server/replset/mongos/db) are accepted with a deprecation warning.
   *
   * @param {object} options The options to validate.
   * @returns {MongoError|undefined} An error for the first unsupported name in strict mode, else undefined.
   */
  function validOptions(options) {
    const knownNames = validOptionNames.concat(legacyOptionNames);

    for (const name in options) {
      if (ignoreOptionNames.indexOf(name) !== -1) {
        continue;
      }

      const isKnown = knownNames.indexOf(name) !== -1;
      if (!isKnown && options.validateOptions) {
        return new MongoError(`option ${name} is not supported`);
      }
      if (!isKnown) {
        console.warn(`the options [${name}] is not supported`);
      }

      const isLegacy = legacyOptionNames.indexOf(name) !== -1;
      if (isLegacy) {
        console.warn(
          `the server/replset/mongos/db options are deprecated, ` +
            `all their options are supported at the top level of the options object [${validOptionNames}]`
        );
      }
    }
  }
  // Upper-cased authentication mechanism names that generateCredentials() accepts.
  const VALID_AUTH_MECHANISMS = new Set([
    'DEFAULT',
    'MONGODB-CR',
    'PLAIN',
    'MONGODB-X509',
    'SCRAM-SHA-1',
    'SCRAM-SHA-256',
    'GSSAPI'
  ]);
  // Maps an accepted mechanism name to the `mechanism` value passed to MongoCredentials.
  // GSSAPI has no entry here: generateCredentials() handles it separately and picks
  // 'sspi' or 'gssapi' based on process.platform.
  const AUTH_MECHANISM_INTERNAL_MAP = {
    DEFAULT: 'default',
    'MONGODB-CR': 'mongocr',
    PLAIN: 'plain',
    'MONGODB-X509': 'x509',
    'SCRAM-SHA-1': 'scram-sha-1',
    'SCRAM-SHA-256': 'scram-sha-256'
  };
  /**
   * Build a MongoCredentials object from a username, password and connection options.
   *
   * The authentication source is the first of `options.authSource`, `options.authdb`
   * and `options.dbName` that is set. The mechanism defaults to 'DEFAULT' and is
   * matched case-insensitively.
   *
   * @param {MongoClient} client The client requesting credentials (not read by this function).
   * @param {string} username The user name.
   * @param {string} password The password.
   * @param {object} [options] Options supplying authSource/authdb/dbName and authMechanism.
   * @returns {MongoCredentials} The generated credentials.
   * @throws {MongoError} If `options.authMechanism` is not a supported mechanism.
   */
  function generateCredentials(client, username, password, options) {
    options = Object.assign({}, options);

    // the default db to authenticate against is 'self'
    // if authenticate is called from a retry context, it may be another one, like admin
    const source = options.authSource || options.authdb || options.dbName;

    // authMechanism
    const authMechanismRaw = options.authMechanism || 'DEFAULT';
    const authMechanism = authMechanismRaw.toUpperCase();

    if (!VALID_AUTH_MECHANISMS.has(authMechanism)) {
      throw MongoError.create({
        // Report the mechanism exactly as the caller spelled it
        message: `authentication mechanism ${authMechanismRaw} not supported`,
        driver: true
      });
    }

    if (authMechanism === 'GSSAPI') {
      return new MongoCredentials({
        // Windows uses the SSPI provider, every other platform GSSAPI
        mechanism: process.platform === 'win32' ? 'sspi' : 'gssapi',
        mechanismProperties: options,
        source,
        username,
        password
      });
    }

    return new MongoCredentials({
      mechanism: AUTH_MECHANISM_INTERNAL_MAP[authMechanism],
      source,
      username,
      password
    });
  }
  /**
   * Close the client's topology (if it has one) and announce the close.
   *
   * Once the topology has closed, or immediately when there is none, 'close' is
   * emitted on the client and on every entry of `client.s.dbCache`, all 'close'
   * listeners are removed from the client, and the callback is invoked.
   *
   * @param {MongoClient} client The client to close.
   * @param {boolean} force Passed through to the topology's `close`.
   * @param {function} callback Invoked with `(err, null)`.
   */
  function closeOperation(client, force, callback) {
    function completeClose(err) {
      client.emit('close', client);

      const cachedDbs = client.s.dbCache;
      for (const dbName in cachedDbs) {
        cachedDbs[dbName].emit('close', client);
      }

      client.removeAllListeners('close');
      callback(err, null);
    }

    const topology = client.topology;
    if (topology != null) {
      topology.close(force, completeClose);
      return;
    }

    completeClose();
  }
  591. module.exports = { connectOp, validOptions, closeOperation };