connection.js 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624
  1. 'use strict';
  2. const EventEmitter = require('events').EventEmitter;
  3. const crypto = require('crypto');
  4. const debugOptions = require('./utils').debugOptions;
  5. const parseHeader = require('../wireprotocol/shared').parseHeader;
  6. const decompress = require('../wireprotocol/compression').decompress;
  7. const Response = require('./commands').Response;
  8. const BinMsg = require('./msg').BinMsg;
  9. const MongoNetworkError = require('../error').MongoNetworkError;
  10. const MongoError = require('../error').MongoError;
  11. const Logger = require('./logger');
  12. const OP_COMPRESSED = require('../wireprotocol/shared').opcodes.OP_COMPRESSED;
  13. const OP_MSG = require('../wireprotocol/shared').opcodes.OP_MSG;
  14. const MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE;
  15. const Buffer = require('safe-buffer').Buffer;
  // Counter handed out (post-incremented) as the `id` of each new Connection.
  let _id = 0;

  // Used when `options.maxBsonMessageSize` is not supplied (67,108,864).
  const DEFAULT_MAX_BSON_MESSAGE_SIZE = 4 * 16 * 1024 * 1024;

  // Option names handed to `debugOptions` when a connection's creation is logged.
  const DEBUG_FIELDS = [
    'host',
    'port',
    'size',
    'keepAlive',
    'keepAliveInitialDelay',
    'noDelay',
    'connectionTimeout',
    'socketTimeout',
    'ssl',
    'ca',
    'crl',
    'cert',
    'rejectUnauthorized',
    'promoteLongs',
    'promoteValues',
    'promoteBuffers',
    'checkServerIdentity'
  ];

  // Connection accounting state: an optional spy notified of additions and
  // removals, the on/off switch, and the registry of connections keyed by id.
  let connectionAccountingSpy;
  let connectionAccounting = false;
  let connections = {};
  40. /**
  41. * A class representing a single connection to a MongoDB server
  42. *
  43. * @fires Connection#connect
  44. * @fires Connection#close
  45. * @fires Connection#error
  46. * @fires Connection#timeout
  47. * @fires Connection#parseError
  48. * @fires Connection#message
  49. */
  class Connection extends EventEmitter {
    /**
     * Creates a new Connection instance
     *
     * @param {Socket} socket The socket this connection wraps
     * @param {Object} options Settings; `options.bson` is required
     * @param {Object} options.bson The BSON parser used to build responses
     * @param {*} [options.tag] Stored as-is on the connection as `tag`
     * @param {number} [options.maxBsonMessageSize] Largest message size accepted by the data handler
     * @param {string} [options.host='localhost'] The host the socket is connected to
     * @param {number} [options.port=27017] The port used for the socket connection
     * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
     * @param {number} [options.keepAliveInitialDelay=300000] Initial delay before TCP keep alive enabled
     * @param {number} [options.connectionTimeout=30000] TCP Connection timeout setting
     * @param {number} [options.socketTimeout=360000] TCP Socket timeout setting
     * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
     * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
     * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
     * @throws {TypeError} If no BSON parser is supplied in `options.bson`
     */
    constructor(socket, options) {
      super();

      options = options || {};
      if (!options.bson) {
        throw new TypeError('must pass in valid bson parser');
      }

      this.id = _id++;
      this.options = options;
      this.logger = Logger('Connection', options);
      this.bson = options.bson;
      this.tag = options.tag;
      this.maxBsonMessageSize = options.maxBsonMessageSize || DEFAULT_MAX_BSON_MESSAGE_SIZE;

      this.port = options.port || 27017;
      this.host = options.host || 'localhost';
      this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 360000;

      // These values are inspected directly in tests, but maybe not necessary to keep around
      this.keepAlive = typeof options.keepAlive === 'boolean' ? options.keepAlive : true;
      this.keepAliveInitialDelay =
        typeof options.keepAliveInitialDelay === 'number' ? options.keepAliveInitialDelay : 300000;
      this.connectionTimeout =
        typeof options.connectionTimeout === 'number' ? options.connectionTimeout : 30000;

      // Keep the keep-alive delay below the socket timeout
      if (this.keepAliveInitialDelay > this.socketTimeout) {
        this.keepAliveInitialDelay = Math.round(this.socketTimeout / 2);
      }

      // Debug information
      if (this.logger.isDebug()) {
        this.logger.debug(
          `creating connection ${this.id} with options [${JSON.stringify(
            debugOptions(DEBUG_FIELDS, options)
          )}]`
        );
      }

      // Response options
      this.responseOptions = {
        promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true,
        promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true,
        promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false
      };

      // Flushing
      this.flushing = false;
      this.queue = [];

      // Internal state
      this.writeStream = null;
      this.destroyed = false;

      // SHA-1 hex digest of "host:port", exposed as `hashedName`
      const hash = crypto.createHash('sha1');
      hash.update(this.address);
      this.hashedName = hash.digest('hex');

      // All operations in flight on the connection
      this.workItems = [];

      // setup socket
      this.socket = socket;
      this.socket.once('error', errorHandler(this));
      this.socket.once('timeout', timeoutHandler(this));
      this.socket.once('close', closeHandler(this));
      this.socket.on('data', dataHandler(this));

      if (connectionAccounting) {
        addConnection(this.id, this);
      }
    }

    /**
     * Set the timeout on the underlying socket, if there is one
     * @method
     * @param {number} value The timeout passed to `socket.setTimeout`
     */
    setSocketTimeout(value) {
      if (this.socket) {
        this.socket.setTimeout(value);
      }
    }

    /**
     * Restore the socket timeout to this connection's configured `socketTimeout`
     * @method
     */
    resetSocketTimeout() {
      if (this.socket) {
        this.socket.setTimeout(this.socketTimeout);
      }
    }

    /**
     * Turn on connection accounting and start from an empty registry
     * @method
     * @param {object} [spy] Object whose `addConnection` / `deleteConnection` are called on registry changes
     */
    static enableConnectionAccounting(spy) {
      if (spy) {
        connectionAccountingSpy = spy;
      }

      connectionAccounting = true;
      connections = {};
    }

    /**
     * Turn off connection accounting and drop the spy
     * @method
     */
    static disableConnectionAccounting() {
      connectionAccounting = false;
      connectionAccountingSpy = undefined;
    }

    /**
     * Return the registry of accounted connections, keyed by connection id
     * @method
     * @return {object}
     */
    static connections() {
      return connections;
    }

    /**
     * The `host:port` string for this connection
     * @return {string}
     */
    get address() {
      return `${this.host}:${this.port}`;
    }

    /**
     * Unref this connection's socket; if there is no socket yet, do so on the next 'connect' event
     * @method
     */
    unref() {
      if (this.socket == null) {
        this.once('connect', () => this.socket.unref());
        return;
      }

      this.socket.unref();
    }

    /**
     * Destroy connection
     * @method
     * @param {object} [options] Optional settings (may be omitted, passing the callback first)
     * @param {boolean} [options.force=false] Destroy the socket immediately instead of ending it
     * @param {function} [callback] Called as `callback(err, null)` once the connection is marked destroyed
     */
    destroy(options, callback) {
      if (typeof options === 'function') {
        callback = options;
        options = {};
      }

      options = Object.assign({ force: false }, options);

      if (connectionAccounting) {
        deleteConnection(this.id);
      }

      if (this.socket == null) {
        this.destroyed = true;
        // Report completion here too, so a caller waiting on the callback is not left hanging
        if (typeof callback === 'function') callback(null, null);
        return;
      }

      if (options.force) {
        this.socket.destroy();
        this.destroyed = true;
        if (typeof callback === 'function') callback(null, null);
        return;
      }

      this.socket.end(err => {
        this.destroyed = true;
        if (typeof callback === 'function') callback(err, null);
      });
    }

    /**
     * Write to connection
     * @method
     * @param {Buffer|Buffer[]} buffer A buffer, or an array of buffers written in order
     * @return {boolean} true if the data was handed to the socket, false if there is no usable socket
     */
    write(buffer) {
      const chunks = Array.isArray(buffer) ? buffer : [buffer];

      // Debug Log
      if (this.logger.isDebug()) {
        for (let i = 0; i < chunks.length; i++) {
          this.logger.debug(`writing buffer [${chunks[i].toString('hex')}] to ${this.address}`);
        }
      }

      // A missing or destroyed socket means the write failed
      if (this.socket == null || this.socket.destroyed !== false) {
        return false;
      }

      // Write all buffers in order to the socket
      for (let i = 0; i < chunks.length; i++) {
        this.socket.write(chunks[i], 'binary');
      }

      return true;
    }

    /**
     * Return id of connection as a string
     * @method
     * @return {string}
     */
    toString() {
      return '' + this.id;
    }

    /**
     * Return json object of connection
     * @method
     * @return {object}
     */
    toJSON() {
      return { id: this.id, host: this.host, port: this.port };
    }

    /**
     * Is the connection connected
     * @method
     * @return {boolean}
     */
    isConnected() {
      // A destroyed connection, or one without a socket, is never connected
      if (this.destroyed || this.socket == null) return false;
      return !this.socket.destroyed && this.socket.writable;
    }
  }
  /**
   * Remove a connection from the accounting registry and tell the spy, if one is set.
   *
   * @param {number} id The id of the connection to forget
   */
  function deleteConnection(id) {
    delete connections[id];

    if (!connectionAccountingSpy) {
      return;
    }

    connectionAccountingSpy.deleteConnection(id);
  }
  /**
   * Record a connection in the accounting registry and tell the spy, if one is set.
   *
   * @param {number} id The id to register the connection under
   * @param {Connection} connection The connection being registered
   */
  function addConnection(id, connection) {
    connections[id] = connection;

    if (!connectionAccountingSpy) {
      return;
    }

    connectionAccountingSpy.addConnection(id, connection);
  }
  264. //
  265. // Connection handlers
  /**
   * Build the socket 'error' listener for a connection.
   *
   * The listener drops the connection from accounting (when enabled), logs at
   * debug level, and re-emits the failure on the connection as an 'error'
   * event carrying a MongoNetworkError and the connection itself.
   *
   * @param {Connection} conn The connection the listener reports on
   * @return {function} Listener taking the socket error
   */
  function errorHandler(conn) {
    return err => {
      if (connectionAccounting) {
        deleteConnection(conn.id);
      }

      // Debug information
      if (conn.logger.isDebug()) {
        const text = `connection ${conn.id} for [${conn.address}] errored out with [${JSON.stringify(err)}]`;
        conn.logger.debug(text);
      }

      const networkError = new MongoNetworkError(err);
      conn.emit('error', networkError, conn);
    };
  }
  /**
   * Build the socket 'timeout' listener for a connection.
   *
   * The listener drops the connection from accounting (when enabled), logs at
   * debug level, and emits 'timeout' on the connection with a
   * MongoNetworkError and the connection itself.
   *
   * @param {Connection} conn The connection the listener reports on
   * @return {function} Listener taking no arguments
   */
  function timeoutHandler(conn) {
    return () => {
      if (connectionAccounting) {
        deleteConnection(conn.id);
      }

      if (conn.logger.isDebug()) {
        conn.logger.debug(`connection ${conn.id} for [${conn.address}] timed out`);
      }

      const timeoutError = new MongoNetworkError(
        `connection ${conn.id} to ${conn.address} timed out`
      );
      conn.emit('timeout', timeoutError, conn);
    };
  }
  /**
   * Build the socket 'close' listener for a connection.
   *
   * The listener drops the connection from accounting (when enabled) and logs
   * at debug level. It emits 'close' on the connection only when the socket
   * closed without an error.
   *
   * @param {Connection} conn The connection the listener reports on
   * @return {function} Listener taking the socket's `hadError` flag
   */
  function closeHandler(conn) {
    return hadError => {
      if (connectionAccounting) {
        deleteConnection(conn.id);
      }

      if (conn.logger.isDebug()) {
        conn.logger.debug(`connection ${conn.id} with for [${conn.address}] closed`);
      }

      // Nothing is emitted from here when the close followed an error
      if (hadError) {
        return;
      }

      const closeError = new MongoNetworkError(`connection ${conn.id} to ${conn.address} closed`);
      conn.emit('close', closeError, conn);
    };
  }
  306. // Handle a message once it is received
  /**
   * Turn one complete wire message into a response object and emit it as a
   * 'message' event on the connection.
   *
   * Messages whose opcode is not OP_COMPRESSED are emitted directly. For
   * OP_COMPRESSED the original opcode, uncompressed length and compressor id
   * are read from just after the header, the payload is decompressed, and the
   * result is emitted; a decompression failure or a length mismatch is emitted
   * as an 'error' event instead.
   *
   * @param {Connection} conn The connection the message arrived on
   * @param {Buffer} message The complete raw message, header included
   */
  function processMessage(conn, message) {
    const msgHeader = parseHeader(message);

    // Single emit path shared by the plain and the decompressed case
    const emitResponse = body => {
      const ResponseType = msgHeader.opCode === OP_MSG ? BinMsg : Response;
      const response = new ResponseType(conn.bson, message, msgHeader, body, conn.responseOptions);
      conn.emit('message', response, conn);
    };

    if (msgHeader.opCode !== OP_COMPRESSED) {
      emitResponse(message.slice(MESSAGE_HEADER_SIZE));
      return;
    }

    // Overwrite the header with the values of the wrapped (original) message
    msgHeader.fromCompressed = true;
    let offset = MESSAGE_HEADER_SIZE;
    msgHeader.opCode = message.readInt32LE(offset);
    offset += 4;
    msgHeader.length = message.readInt32LE(offset);
    offset += 4;
    const compressorID = message[offset];
    offset += 1;

    const compressedBody = message.slice(offset);
    decompress(compressorID, compressedBody, (err, decompressedMsgBody) => {
      if (err) {
        conn.emit('error', err);
        return;
      }

      if (decompressedMsgBody.length !== msgHeader.length) {
        const corrupt = new MongoError(
          'Decompressing a compressed message from the server failed. The message is corrupt.'
        );
        conn.emit('error', corrupt);
        return;
      }

      emitResponse(decompressedMsgBody);
    });
  }
  /**
   * Build the socket 'data' listener for a connection.
   *
   * The listener reassembles length-prefixed messages from arbitrary socket
   * chunks. Parser state lives on the connection:
   *  - `stubBuffer`: bytes received so far when there are too few (<= 4) to
   *    read the message length
   *  - `buffer` / `sizeOfMessage` / `bytesRead`: the partially received
   *    message, its declared size, and how much of it has arrived
   *
   * Every complete message is passed to `processMessage`. A declared size that
   * is negative, at most 4, or larger than `conn.maxBsonMessageSize` is
   * reported as a 'parseError' event and the rest of the chunk is dropped.
   *
   * @param {Connection} conn The connection whose parser state is updated
   * @return {function} Listener taking the received chunk (a Buffer)
   */
  function dataHandler(conn) {
    return function(data) {
      // Parse until we are done with the data
      while (data.length > 0) {
        if (conn.bytesRead > 0 && conn.sizeOfMessage > 0) {
          // A message is already in flight: append to it
          const remainingBytesToRead = conn.sizeOfMessage - conn.bytesRead;

          if (remainingBytesToRead > data.length) {
            // The chunk does not complete the message; store it and wait for more
            data.copy(conn.buffer, conn.bytesRead);
            conn.bytesRead = conn.bytesRead + data.length;
            data = Buffer.alloc(0);
          } else {
            // The chunk completes the message; whatever follows is re-parsed
            data.copy(conn.buffer, conn.bytesRead, 0, remainingBytesToRead);
            data = data.slice(remainingBytesToRead);

            const emitBuffer = conn.buffer;
            resetParserState(conn);
            processMessage(conn, emitBuffer);
          }
        } else if (conn.stubBuffer != null && conn.stubBuffer.length > 0) {
          // Earlier chunks were too short to contain the message length
          const combined = Buffer.concat([conn.stubBuffer, data]);

          if (combined.length > 4) {
            // Enough bytes now: parse stub + chunk as if they had arrived together
            data = combined;
            resetParserState(conn);
          } else {
            // Still too short: keep the accumulated bytes for the next chunk
            conn.stubBuffer = combined;
            data = Buffer.alloc(0);
          }
        } else if (data.length > 4) {
          // Retrieve the message size (little-endian signed 32-bit prefix)
          const sizeOfMessage = data.readInt32LE(0);

          // A negative or oversized message aborts parsing of this chunk
          if (sizeOfMessage < 0 || sizeOfMessage > conn.maxBsonMessageSize) {
            const errorObject = {
              err: 'socketHandler',
              trace: '',
              bin: conn.buffer,
              parseState: {
                sizeOfMessage: sizeOfMessage,
                bytesRead: conn.bytesRead,
                stubBuffer: conn.stubBuffer
              }
            };

            conn.emit('parseError', errorObject, conn);
            return;
          }

          if (sizeOfMessage <= 4) {
            // Too small to be a message: report it and drop the chunk
            const errorObject = {
              err: 'socketHandler',
              trace: null,
              bin: data,
              parseState: {
                sizeOfMessage: sizeOfMessage,
                bytesRead: 0,
                buffer: null,
                stubBuffer: null
              }
            };

            conn.emit('parseError', errorObject, conn);
            resetParserState(conn);
            data = Buffer.alloc(0);
          } else if (sizeOfMessage > data.length) {
            // Only part of the message has arrived: start buffering it. This
            // includes a message of exactly maxBsonMessageSize bytes.
            conn.buffer = Buffer.alloc(sizeOfMessage);
            data.copy(conn.buffer, 0);
            conn.bytesRead = data.length;
            conn.sizeOfMessage = sizeOfMessage;
            conn.stubBuffer = null;
            data = Buffer.alloc(0);
          } else {
            // The chunk holds at least one whole message: emit it, re-parse the rest
            const emitBuffer = data.slice(0, sizeOfMessage);
            resetParserState(conn);
            data = data.slice(sizeOfMessage);
            processMessage(conn, emitBuffer);
          }
        } else {
          // Too few bytes to read the message size; keep a copy until more arrive
          conn.stubBuffer = Buffer.from(data);
          data = Buffer.alloc(0);
        }
      }
    };
  }

  /**
   * Clear the message-reassembly state kept on a connection.
   *
   * @param {Connection} conn The connection whose parser state is cleared
   */
  function resetParserState(conn) {
    conn.buffer = null;
    conn.sizeOfMessage = 0;
    conn.bytesRead = 0;
    conn.stubBuffer = null;
  }
  515. /**
  516. * A server connect event, used to verify that the connection is up and running
  517. *
  518. * @event Connection#connect
  519. * @type {Connection}
  520. */
  /**
   * The underlying socket closed without an error
   *
   * @event Connection#close
   * @type {Connection}
   */
  /**
   * The underlying socket reported an error, or a compressed message from the server could not be decompressed
   *
   * @event Connection#error
   * @type {Connection}
   */
  /**
   * The underlying socket timed out
   *
   * @event Connection#timeout
   * @type {Connection}
   */
  /**
   * The driver received a message whose declared size is invalid (negative, too small, or larger than the maximum message size)
   *
   * @event Connection#parseError
   * @type {Connection}
   */
  545. /**
  546. * An event emitted each time the connection receives a parsed message from the wire
  547. *
  548. * @event Connection#message
  549. * @type {Connection}
  550. */
  551. module.exports = Connection;