server.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. 'use strict';
  2. const EventEmitter = require('events');
  3. const MongoError = require('../error').MongoError;
  4. const Pool = require('../connection/pool');
  5. const relayEvents = require('../utils').relayEvents;
  6. const wireProtocol = require('../wireprotocol');
  7. const BSON = require('../connection/utils').retrieveBSON();
  8. const createClientInfo = require('../topologies/shared').createClientInfo;
  9. const Logger = require('../connection/logger');
  10. const ServerDescription = require('./server_description').ServerDescription;
  11. const ReadPreference = require('../topologies/read_preference');
  12. const monitorServer = require('./monitoring').monitorServer;
  13. const MongoParseError = require('../error').MongoParseError;
  14. const MongoNetworkError = require('../error').MongoNetworkError;
  15. const collationNotSupported = require('../utils').collationNotSupported;
  16. const debugOptions = require('../connection/utils').debugOptions;
  17. // Used for filtering out fields for logging
  18. const DEBUG_FIELDS = [
  19. 'reconnect',
  20. 'reconnectTries',
  21. 'reconnectInterval',
  22. 'emitError',
  23. 'cursorFactory',
  24. 'host',
  25. 'port',
  26. 'size',
  27. 'keepAlive',
  28. 'keepAliveInitialDelay',
  29. 'noDelay',
  30. 'connectionTimeout',
  31. 'checkServerIdentity',
  32. 'socketTimeout',
  33. 'ssl',
  34. 'ca',
  35. 'crl',
  36. 'cert',
  37. 'key',
  38. 'rejectUnauthorized',
  39. 'promoteLongs',
  40. 'promoteValues',
  41. 'promoteBuffers',
  42. 'servername'
  43. ];
  44. const STATE_DISCONNECTED = 0;
  45. const STATE_CONNECTING = 1;
  46. const STATE_CONNECTED = 2;
  47. /**
  48. *
  49. * @fires Server#serverHeartbeatStarted
  50. * @fires Server#serverHeartbeatSucceeded
  51. * @fires Server#serverHeartbeatFailed
  52. */
  53. class Server extends EventEmitter {
  54. /**
  55. * Create a server
  56. *
  57. * @param {ServerDescription} description
  58. * @param {Object} options
  59. */
  60. constructor(description, options, topology) {
  61. super();
  62. this.s = {
  63. // the server description
  64. description,
  65. // a saved copy of the incoming options
  66. options,
  67. // the server logger
  68. logger: Logger('Server', options),
  69. // the bson parser
  70. bson: options.bson || new BSON(),
  71. // client metadata for the initial handshake
  72. clientInfo: createClientInfo(options),
  73. // state variable to determine if there is an active server check in progress
  74. monitoring: false,
  75. // the implementation of the monitoring method
  76. monitorFunction: options.monitorFunction || monitorServer,
  77. // the connection pool
  78. pool: null,
  79. // the server state
  80. state: STATE_DISCONNECTED,
  81. credentials: options.credentials,
  82. topology
  83. };
  84. }
  85. get description() {
  86. return this.s.description;
  87. }
  88. get name() {
  89. return this.s.description.address;
  90. }
  91. /**
  92. * Initiate server connect
  93. */
  94. connect(options) {
  95. options = options || {};
  96. // do not allow connect to be called on anything that's not disconnected
  97. if (this.s.pool && !this.s.pool.isDisconnected() && !this.s.pool.isDestroyed()) {
  98. throw new MongoError(`Server instance in invalid state ${this.s.pool.state}`);
  99. }
  100. // create a pool
  101. const addressParts = this.description.address.split(':');
  102. const poolOptions = Object.assign(
  103. { host: addressParts[0], port: parseInt(addressParts[1], 10) },
  104. this.s.options,
  105. options,
  106. { bson: this.s.bson }
  107. );
  108. // NOTE: this should only be the case if we are connecting to a single server
  109. poolOptions.reconnect = true;
  110. this.s.pool = new Pool(this, poolOptions);
  111. // setup listeners
  112. this.s.pool.on('connect', connectEventHandler(this));
  113. this.s.pool.on('close', errorEventHandler(this));
  114. this.s.pool.on('error', errorEventHandler(this));
  115. this.s.pool.on('parseError', parseErrorEventHandler(this));
  116. // it is unclear whether consumers should even know about these events
  117. // this.s.pool.on('timeout', timeoutEventHandler(this));
  118. // this.s.pool.on('reconnect', reconnectEventHandler(this));
  119. // this.s.pool.on('reconnectFailed', errorEventHandler(this));
  120. // relay all command monitoring events
  121. relayEvents(this.s.pool, this, ['commandStarted', 'commandSucceeded', 'commandFailed']);
  122. this.s.state = STATE_CONNECTING;
  123. // If auth settings have been provided, use them
  124. if (options.auth) {
  125. this.s.pool.connect.apply(this.s.pool, options.auth);
  126. return;
  127. }
  128. this.s.pool.connect();
  129. }
  130. /**
  131. * Destroy the server connection
  132. *
  133. * @param {Boolean} [options.force=false] Force destroy the pool
  134. */
  135. destroy(options, callback) {
  136. if (typeof options === 'function') (callback = options), (options = {});
  137. options = Object.assign({}, { force: false }, options);
  138. if (!this.s.pool) {
  139. this.s.state = STATE_DISCONNECTED;
  140. if (typeof callback === 'function') {
  141. callback(null, null);
  142. }
  143. return;
  144. }
  145. ['close', 'error', 'timeout', 'parseError', 'connect'].forEach(event => {
  146. this.s.pool.removeAllListeners(event);
  147. });
  148. if (this.s.monitorId) {
  149. clearTimeout(this.s.monitorId);
  150. }
  151. this.s.pool.destroy(options.force, err => {
  152. this.s.state = STATE_DISCONNECTED;
  153. callback(err);
  154. });
  155. }
  156. /**
  157. * Immediately schedule monitoring of this server. If there already an attempt being made
  158. * this will be a no-op.
  159. */
  160. monitor(options) {
  161. options = options || {};
  162. if (this.s.state !== STATE_CONNECTED || this.s.monitoring) return;
  163. if (this.s.monitorId) clearTimeout(this.s.monitorId);
  164. this.s.monitorFunction(this, options);
  165. }
  166. /**
  167. * Execute a command
  168. *
  169. * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
  170. * @param {object} cmd The command hash
  171. * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
  172. * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
  173. * @param {Boolean} [options.checkKeys=false] Specify if the bson parser should validate keys.
  174. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
  175. * @param {Boolean} [options.fullResult=false] Return the full envelope instead of just the result document.
  176. * @param {ClientSession} [options.session=null] Session to use for the operation
  177. * @param {opResultCallback} callback A callback function
  178. */
  179. command(ns, cmd, options, callback) {
  180. if (typeof options === 'function') {
  181. (callback = options), (options = {}), (options = options || {});
  182. }
  183. const error = basicReadValidations(this, options);
  184. if (error) {
  185. return callback(error, null);
  186. }
  187. // Clone the options
  188. options = Object.assign({}, options, { wireProtocolCommand: false });
  189. // Debug log
  190. if (this.s.logger.isDebug()) {
  191. this.s.logger.debug(
  192. `executing command [${JSON.stringify({
  193. ns,
  194. cmd,
  195. options: debugOptions(DEBUG_FIELDS, options)
  196. })}] against ${this.name}`
  197. );
  198. }
  199. // error if collation not supported
  200. if (collationNotSupported(this, cmd)) {
  201. callback(new MongoError(`server ${this.name} does not support collation`));
  202. return;
  203. }
  204. wireProtocol.command(this, ns, cmd, options, callback);
  205. }
  206. /**
  207. * Insert one or more documents
  208. * @method
  209. * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
  210. * @param {array} ops An array of documents to insert
  211. * @param {boolean} [options.ordered=true] Execute in order or out of order
  212. * @param {object} [options.writeConcern={}] Write concern for the operation
  213. * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
  214. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
  215. * @param {ClientSession} [options.session=null] Session to use for the operation
  216. * @param {opResultCallback} callback A callback function
  217. */
  218. insert(ns, ops, options, callback) {
  219. executeWriteOperation({ server: this, op: 'insert', ns, ops }, options, callback);
  220. }
  221. /**
  222. * Perform one or more update operations
  223. * @method
  224. * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
  225. * @param {array} ops An array of updates
  226. * @param {boolean} [options.ordered=true] Execute in order or out of order
  227. * @param {object} [options.writeConcern={}] Write concern for the operation
  228. * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
  229. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
  230. * @param {ClientSession} [options.session=null] Session to use for the operation
  231. * @param {opResultCallback} callback A callback function
  232. */
  233. update(ns, ops, options, callback) {
  234. executeWriteOperation({ server: this, op: 'update', ns, ops }, options, callback);
  235. }
  236. /**
  237. * Perform one or more remove operations
  238. * @method
  239. * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
  240. * @param {array} ops An array of removes
  241. * @param {boolean} [options.ordered=true] Execute in order or out of order
  242. * @param {object} [options.writeConcern={}] Write concern for the operation
  243. * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
  244. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
  245. * @param {ClientSession} [options.session=null] Session to use for the operation
  246. * @param {opResultCallback} callback A callback function
  247. */
  248. remove(ns, ops, options, callback) {
  249. executeWriteOperation({ server: this, op: 'remove', ns, ops }, options, callback);
  250. }
  251. }
  252. Object.defineProperty(Server.prototype, 'clusterTime', {
  253. get: function() {
  254. return this.s.topology.clusterTime;
  255. },
  256. set: function(clusterTime) {
  257. this.s.topology.clusterTime = clusterTime;
  258. }
  259. });
  260. function basicWriteValidations(server) {
  261. if (!server.s.pool) {
  262. return new MongoError('server instance is not connected');
  263. }
  264. if (server.s.pool.isDestroyed()) {
  265. return new MongoError('server instance pool was destroyed');
  266. }
  267. return null;
  268. }
  269. function basicReadValidations(server, options) {
  270. const error = basicWriteValidations(server, options);
  271. if (error) {
  272. return error;
  273. }
  274. if (options.readPreference && !(options.readPreference instanceof ReadPreference)) {
  275. return new MongoError('readPreference must be an instance of ReadPreference');
  276. }
  277. }
  278. function executeWriteOperation(args, options, callback) {
  279. if (typeof options === 'function') (callback = options), (options = {});
  280. options = options || {};
  281. // TODO: once we drop Node 4, use destructuring either here or in arguments.
  282. const server = args.server;
  283. const op = args.op;
  284. const ns = args.ns;
  285. const ops = Array.isArray(args.ops) ? args.ops : [args.ops];
  286. const error = basicWriteValidations(server, options);
  287. if (error) {
  288. callback(error, null);
  289. return;
  290. }
  291. if (collationNotSupported(server, options)) {
  292. callback(new MongoError(`server ${this.name} does not support collation`));
  293. return;
  294. }
  295. return wireProtocol[op](server, ns, ops, options, callback);
  296. }
  297. function connectEventHandler(server) {
  298. return function(pool, conn) {
  299. const ismaster = conn.ismaster;
  300. server.s.lastIsMasterMS = conn.lastIsMasterMS;
  301. if (conn.agreedCompressor) {
  302. server.s.pool.options.agreedCompressor = conn.agreedCompressor;
  303. }
  304. if (conn.zlibCompressionLevel) {
  305. server.s.pool.options.zlibCompressionLevel = conn.zlibCompressionLevel;
  306. }
  307. if (conn.ismaster.$clusterTime) {
  308. const $clusterTime = conn.ismaster.$clusterTime;
  309. server.s.sclusterTime = $clusterTime;
  310. }
  311. // log the connection event if requested
  312. if (server.s.logger.isInfo()) {
  313. server.s.logger.info(
  314. `server ${server.name} connected with ismaster [${JSON.stringify(ismaster)}]`
  315. );
  316. }
  317. // emit an event indicating that our description has changed
  318. server.emit('descriptionReceived', new ServerDescription(server.description.address, ismaster));
  319. // we are connected and handshaked (guaranteed by the pool)
  320. server.s.state = STATE_CONNECTED;
  321. server.emit('connect', server);
  322. };
  323. }
  324. function errorEventHandler(server) {
  325. return function(err) {
  326. if (err) {
  327. server.emit('error', new MongoNetworkError(err));
  328. }
  329. server.emit('close');
  330. };
  331. }
  332. function parseErrorEventHandler(server) {
  333. return function(err) {
  334. server.s.state = STATE_DISCONNECTED;
  335. server.emit('error', new MongoParseError(err));
  336. };
  337. }
  338. module.exports = Server;