123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624 |
- 'use strict';
- const EventEmitter = require('events').EventEmitter;
- const crypto = require('crypto');
- const debugOptions = require('./utils').debugOptions;
- const parseHeader = require('../wireprotocol/shared').parseHeader;
- const decompress = require('../wireprotocol/compression').decompress;
- const Response = require('./commands').Response;
- const BinMsg = require('./msg').BinMsg;
- const MongoNetworkError = require('../error').MongoNetworkError;
- const MongoError = require('../error').MongoError;
- const Logger = require('./logger');
- const OP_COMPRESSED = require('../wireprotocol/shared').opcodes.OP_COMPRESSED;
- const OP_MSG = require('../wireprotocol/shared').opcodes.OP_MSG;
- const MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE;
- const Buffer = require('safe-buffer').Buffer;
// Counter handed out (and post-incremented) as the `id` of each new Connection.
let _id = 0;

// Fallback for `options.maxBsonMessageSize` when the caller does not supply one (64 MiB).
const DEFAULT_MAX_BSON_MESSAGE_SIZE = 64 * 1024 * 1024;

// Option names passed to `debugOptions` when a connection logs its creation.
const DEBUG_FIELDS = [
  // addressing / pool
  'host',
  'port',
  'size',
  // socket tuning
  'keepAlive',
  'keepAliveInitialDelay',
  'noDelay',
  'connectionTimeout',
  'socketTimeout',
  // TLS
  'ssl',
  'ca',
  'crl',
  'cert',
  'rejectUnauthorized',
  // BSON promotion
  'promoteLongs',
  'promoteValues',
  'promoteBuffers',
  'checkServerIdentity'
];

// Connection accounting state: an optional spy that is notified of adds/deletes,
// the on/off switch, and the registry of live connections keyed by connection id.
let connectionAccountingSpy;
let connectionAccounting = false;
let connections = {};
/**
 * A class representing a single connection to a MongoDB server
 *
 * @fires Connection#connect
 * @fires Connection#close
 * @fires Connection#error
 * @fires Connection#timeout
 * @fires Connection#parseError
 * @fires Connection#message
 */
class Connection extends EventEmitter {
  /**
   * Creates a new Connection instance
   *
   * @param {Socket} socket The socket this connection wraps
   * @param {Object} [options] Optional settings
   * @param {Object} options.bson The BSON parser; required, a TypeError is thrown without it
   * @param {string} [options.host] The host the socket is connected to
   * @param {number} [options.port] The port used for the socket connection
   * @param {number} [options.maxBsonMessageSize] Largest accepted wire message, defaults to DEFAULT_MAX_BSON_MESSAGE_SIZE
   * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
   * @param {number} [options.keepAliveInitialDelay=300000] Initial delay before TCP keep alive enabled
   * @param {number} [options.connectionTimeout=30000] TCP Connection timeout setting
   * @param {number} [options.socketTimeout=360000] TCP Socket timeout setting
   * @param {boolean} [options.promoteLongs] Convert Long values from the db into Numbers if they fit into 53 bits
   * @param {boolean} [options.promoteValues] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
   * @param {boolean} [options.promoteBuffers] Promotes Binary BSON values to native Node Buffers.
   * @throws {TypeError} When `options.bson` is missing
   */
  constructor(socket, options) {
    super();

    options = options || {};
    if (!options.bson) {
      throw new TypeError('must pass in valid bson parser');
    }

    this.id = _id++;
    this.options = options;
    this.logger = Logger('Connection', options);
    this.bson = options.bson;
    this.tag = options.tag;
    this.maxBsonMessageSize = options.maxBsonMessageSize || DEFAULT_MAX_BSON_MESSAGE_SIZE;

    this.port = options.port || 27017;
    this.host = options.host || 'localhost';
    this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 360000;

    // These values are inspected directly in tests, but maybe not necessary to keep around
    this.keepAlive = typeof options.keepAlive === 'boolean' ? options.keepAlive : true;
    this.keepAliveInitialDelay =
      typeof options.keepAliveInitialDelay === 'number' ? options.keepAliveInitialDelay : 300000;
    this.connectionTimeout =
      typeof options.connectionTimeout === 'number' ? options.connectionTimeout : 30000;

    // A keep-alive delay longer than the socket timeout is clamped to half the timeout
    if (this.keepAliveInitialDelay > this.socketTimeout) {
      this.keepAliveInitialDelay = Math.round(this.socketTimeout / 2);
    }

    // Debug information
    if (this.logger.isDebug()) {
      this.logger.debug(
        `creating connection ${this.id} with options [${JSON.stringify(
          debugOptions(DEBUG_FIELDS, options)
        )}]`
      );
    }

    // Response options
    this.responseOptions = {
      promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true,
      promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true,
      promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false
    };

    // Flushing
    this.flushing = false;
    this.queue = [];

    // Internal state
    this.writeStream = null;
    this.destroyed = false;

    // SHA-1 of "host:port", exposed as `hashedName`
    const hash = crypto.createHash('sha1');
    hash.update(this.address);
    this.hashedName = hash.digest('hex');

    // All operations in flight on the connection
    this.workItems = [];

    // setup socket
    this.socket = socket;
    this.socket.once('error', errorHandler(this));
    this.socket.once('timeout', timeoutHandler(this));
    this.socket.once('close', closeHandler(this));
    this.socket.on('data', dataHandler(this));

    if (connectionAccounting) {
      addConnection(this.id, this);
    }
  }

  /**
   * Set the timeout of the wrapped socket; a no-op when there is no socket
   * @method
   * @param {number} value Timeout passed straight to `socket.setTimeout`
   */
  setSocketTimeout(value) {
    if (this.socket) {
      this.socket.setTimeout(value);
    }
  }

  /**
   * Restore the socket timeout to this connection's configured `socketTimeout`
   * @method
   */
  resetSocketTimeout() {
    if (this.socket) {
      this.socket.setTimeout(this.socketTimeout);
    }
  }

  /**
   * Turn on connection accounting and start from an empty registry
   * @method
   * @param {object} [spy] Object whose `addConnection`/`deleteConnection` are called on registry changes
   */
  static enableConnectionAccounting(spy) {
    if (spy) {
      connectionAccountingSpy = spy;
    }

    connectionAccounting = true;
    connections = {};
  }

  /**
   * Turn off connection accounting and drop the spy
   * @method
   */
  static disableConnectionAccounting() {
    connectionAccounting = false;
    connectionAccountingSpy = undefined;
  }

  /**
   * Return the registry of accounted connections, keyed by connection id
   * @method
   * @return {object}
   */
  static connections() {
    return connections;
  }

  /**
   * The "host:port" string of this connection
   * @return {string}
   */
  get address() {
    return `${this.host}:${this.port}`;
  }

  /**
   * Unref this connection's socket; if there is no socket yet, defer until 'connect' is emitted
   * @method
   * @return {void}
   */
  unref() {
    if (this.socket == null) {
      this.once('connect', () => this.socket.unref());
      return;
    }

    this.socket.unref();
  }

  /**
   * Destroy connection
   * @method
   * @param {object} [options] Optional settings
   * @param {boolean} [options.force=false] Destroy the socket immediately instead of ending it
   * @param {function} [callback] Called as `callback(err, null)` once the connection is marked destroyed
   */
  destroy(options, callback) {
    if (typeof options === 'function') {
      callback = options;
      options = {};
    }

    options = Object.assign({ force: false }, options);

    if (connectionAccounting) {
      deleteConnection(this.id);
    }

    if (this.socket == null) {
      this.destroyed = true;
      // Nothing to tear down, but the caller must still be told we are done
      if (typeof callback === 'function') callback(null, null);
      return;
    }

    if (options.force) {
      this.socket.destroy();
      this.destroyed = true;
      if (typeof callback === 'function') callback(null, null);
      return;
    }

    this.socket.end(err => {
      this.destroyed = true;
      if (typeof callback === 'function') callback(err, null);
    });
  }

  /**
   * Write to connection
   * @method
   * @param {Buffer|Buffer[]} buffer A single buffer, or an array of buffers written in order
   * @return {boolean} true when the data was handed to the socket, false when the socket is missing or destroyed
   */
  write(buffer) {
    const isList = Array.isArray(buffer);

    // Debug Log
    if (this.logger.isDebug()) {
      if (!isList) {
        this.logger.debug(`writing buffer [${buffer.toString('hex')}] to ${this.address}`);
      } else {
        for (let i = 0; i < buffer.length; i++) {
          this.logger.debug(`writing buffer [${buffer[i].toString('hex')}] to ${this.address}`);
        }
      }
    }

    // Double check that the connection is not destroyed (or missing altogether)
    if (this.socket == null || this.socket.destroyed !== false) {
      return false;
    }

    if (!isList) {
      this.socket.write(buffer, 'binary');
      return true;
    }

    // Iterate over all buffers and write them in order to the socket
    for (let i = 0; i < buffer.length; i++) {
      this.socket.write(buffer[i], 'binary');
    }

    return true;
  }

  /**
   * Return id of connection as a string
   * @method
   * @return {string}
   */
  toString() {
    return '' + this.id;
  }

  /**
   * Return json object of connection
   * @method
   * @return {object}
   */
  toJSON() {
    return { id: this.id, host: this.host, port: this.port };
  }

  /**
   * Is the connection connected
   * @method
   * @return {boolean}
   */
  isConnected() {
    if (this.destroyed || this.socket == null) return false;
    return !this.socket.destroyed && this.socket.writable;
  }
}
/**
 * Remove a connection from the accounting registry and, when a spy is
 * installed, forward the deletion to it.
 *
 * @param {number} id Id of the connection to forget
 */
function deleteConnection(id) {
  delete connections[id];

  if (!connectionAccountingSpy) {
    return;
  }

  connectionAccountingSpy.deleteConnection(id);
}
/**
 * Record a connection in the accounting registry under its id and, when a spy
 * is installed, forward the addition to it.
 *
 * @param {number} id Id to register the connection under
 * @param {Connection} connection The connection being tracked
 */
function addConnection(id, connection) {
  connections[id] = connection;

  if (!connectionAccountingSpy) {
    return;
  }

  connectionAccountingSpy.addConnection(id, connection);
}
//
// Connection handlers

/**
 * Build the listener for the socket's 'error' event.
 *
 * The listener drops the connection from accounting (when enabled), logs the
 * error at debug level, and re-emits it on the connection wrapped in a
 * MongoNetworkError, passing the connection as second argument.
 *
 * @param {Connection} conn The connection owning the socket
 * @return {function} Listener taking the socket error
 */
function errorHandler(conn) {
  const onSocketError = err => {
    if (connectionAccounting) {
      deleteConnection(conn.id);
    }

    const log = conn.logger;
    if (log.isDebug()) {
      const line = `connection ${conn.id} for [${conn.address}] errored out with [${JSON.stringify(err)}]`;
      log.debug(line);
    }

    const wrapped = new MongoNetworkError(err);
    conn.emit('error', wrapped, conn);
  };

  return onSocketError;
}
/**
 * Build the listener for the socket's 'timeout' event.
 *
 * The listener drops the connection from accounting (when enabled), logs at
 * debug level, and emits 'timeout' on the connection with a MongoNetworkError
 * and the connection itself.
 *
 * @param {Connection} conn The connection owning the socket
 * @return {function} Listener taking no arguments
 */
function timeoutHandler(conn) {
  const onSocketTimeout = () => {
    if (connectionAccounting) {
      deleteConnection(conn.id);
    }

    const log = conn.logger;
    if (log.isDebug()) {
      log.debug(`connection ${conn.id} for [${conn.address}] timed out`);
    }

    const reason = new MongoNetworkError(`connection ${conn.id} to ${conn.address} timed out`);
    conn.emit('timeout', reason, conn);
  };

  return onSocketTimeout;
}
/**
 * Build the listener for the socket's 'close' event.
 *
 * The listener always drops the connection from accounting (when enabled) and
 * logs at debug level. It emits 'close' on the connection only when the socket
 * reports that it did not close because of an error.
 *
 * @param {Connection} conn The connection owning the socket
 * @return {function} Listener taking the socket's `hadError` flag
 */
function closeHandler(conn) {
  const onSocketClose = hadError => {
    if (connectionAccounting) {
      deleteConnection(conn.id);
    }

    const log = conn.logger;
    if (log.isDebug()) {
      log.debug(`connection ${conn.id} with for [${conn.address}] closed`);
    }

    // When the close followed an error, no 'close' event is emitted from here
    if (hadError) {
      return;
    }

    const reason = new MongoNetworkError(`connection ${conn.id} to ${conn.address} closed`);
    conn.emit('close', reason, conn);
  };

  return onSocketClose;
}
/**
 * Handle one complete wire message: parse its header, decompress the body when
 * the message is an OP_COMPRESSED envelope, and emit the resulting response
 * object as a 'message' event on the connection.
 *
 * Emits 'error' instead when decompression fails or when the decompressed
 * body length differs from the length recorded in the envelope.
 *
 * @param {Connection} conn The connection the message arrived on
 * @param {Buffer} message The full message, header included
 */
function processMessage(conn, message) {
  const header = parseHeader(message);

  // Wrap a body in BinMsg (for OP_MSG) or Response and hand it to listeners.
  // The opcode is read at call time because the compressed path rewrites it.
  const emitResponse = body => {
    const Ctor = header.opCode === OP_MSG ? BinMsg : Response;
    const response = new Ctor(conn.bson, message, header, body, conn.responseOptions);
    conn.emit('message', response, conn);
  };

  if (header.opCode !== OP_COMPRESSED) {
    emitResponse(message.slice(MESSAGE_HEADER_SIZE));
    return;
  }

  // Fields read after the standard header: int32 opcode, int32 length,
  // one compressor-id byte, then the compressed payload.
  const opCodeOffset = MESSAGE_HEADER_SIZE;
  const lengthOffset = opCodeOffset + 4;
  const compressorOffset = lengthOffset + 4;
  const payloadOffset = compressorOffset + 1;

  header.fromCompressed = true;
  header.opCode = message.readInt32LE(opCodeOffset);
  header.length = message.readInt32LE(lengthOffset);
  const compressorID = message[compressorOffset];

  decompress(compressorID, message.slice(payloadOffset), (err, decompressedMsgBody) => {
    if (err) {
      conn.emit('error', err);
      return;
    }

    if (decompressedMsgBody.length !== header.length) {
      const corrupt = new MongoError(
        'Decompressing a compressed message from the server failed. The message is corrupt.'
      );
      conn.emit('error', corrupt);
      return;
    }

    emitResponse(decompressedMsgBody);
  });
}
/**
 * Clear the incremental message-parsing state kept on the connection.
 *
 * @param {Connection} conn The connection whose parser state is reset
 */
function resetParserState(conn) {
  conn.buffer = null;
  conn.sizeOfMessage = 0;
  conn.bytesRead = 0;
  conn.stubBuffer = null;
}

/**
 * Build the listener for the socket's 'data' event.
 *
 * The listener reassembles length-prefixed messages from arbitrarily sized
 * chunks. Parser state lives on the connection:
 *  - `stubBuffer`: bytes received so far when fewer than 5 are available, so
 *    the little-endian int32 length prefix cannot be acted on yet
 *  - `buffer` / `sizeOfMessage` / `bytesRead`: a partially received message
 * Each complete message is passed to `processMessage`. An invalid length
 * prefix produces a 'parseError' event on the connection.
 *
 * @param {Connection} conn The connection owning the socket
 * @return {function} Listener taking a chunk of socket data
 */
function dataHandler(conn) {
  return function(data) {
    // Parse until we are done with the data
    while (data.length > 0) {
      // 1. We are in the middle of a message whose size is already known
      if (conn.bytesRead > 0 && conn.sizeOfMessage > 0) {
        const remainingBytesToRead = conn.sizeOfMessage - conn.bytesRead;

        if (remainingBytesToRead > data.length) {
          // The chunk does not finish the message: append it all and wait for more
          data.copy(conn.buffer, conn.bytesRead);
          conn.bytesRead += data.length;
          return;
        }

        // The chunk finishes the message; whatever follows is re-parsed on the next pass
        data.copy(conn.buffer, conn.bytesRead, 0, remainingBytesToRead);
        data = data.slice(remainingBytesToRead);

        const emitBuffer = conn.buffer;
        resetParserState(conn);
        processMessage(conn, emitBuffer);
        continue;
      }

      // 2. Earlier chunks were too short to act on; prepend what we kept
      if (conn.stubBuffer != null && conn.stubBuffer.length > 0) {
        const combined = Buffer.concat([conn.stubBuffer, data]);

        if (combined.length > 4) {
          // Enough bytes now: parse the combined data as a fresh chunk
          resetParserState(conn);
          data = combined;
          continue;
        }

        // Still too short: keep the accumulated bytes. The previous code built
        // this buffer but never stored it, losing the newly received bytes.
        conn.stubBuffer = combined;
        return;
      }

      // 3. Start of a message, but too few bytes to act on the length prefix
      if (data.length <= 4) {
        conn.stubBuffer = Buffer.from(data);
        return;
      }

      // 4. Start of a message with a readable length prefix
      const sizeOfMessage = data[0] | (data[1] << 8) | (data[2] << 16) | (data[3] << 24);

      // A negative or oversized length aborts parsing of this chunk
      if (sizeOfMessage < 0 || sizeOfMessage > conn.maxBsonMessageSize) {
        const errorObject = {
          err: 'socketHandler',
          trace: '',
          bin: conn.buffer,
          parseState: {
            sizeOfMessage: sizeOfMessage,
            bytesRead: conn.bytesRead,
            stubBuffer: conn.stubBuffer
          }
        };

        conn.emit('parseError', errorObject, conn);
        return;
      }

      // A length of 4 or less cannot describe a message; report it and drop the chunk
      if (sizeOfMessage <= 4) {
        const errorObject = {
          err: 'socketHandler',
          trace: null,
          bin: data,
          parseState: {
            sizeOfMessage: sizeOfMessage,
            bytesRead: 0,
            buffer: null,
            stubBuffer: null
          }
        };

        conn.emit('parseError', errorObject, conn);
        resetParserState(conn);
        return;
      }

      // From here on 4 < sizeOfMessage <= maxBsonMessageSize. Comparing against
      // data.length alone also covers sizeOfMessage === maxBsonMessageSize,
      // which used to be emitted truncated when the chunk was shorter.
      if (sizeOfMessage > data.length) {
        // Only part of the message has arrived: allocate the full buffer and start filling it
        conn.buffer = Buffer.alloc(sizeOfMessage);
        data.copy(conn.buffer, 0);
        conn.bytesRead = data.length;
        conn.sizeOfMessage = sizeOfMessage;
        conn.stubBuffer = null;
        return;
      }

      if (sizeOfMessage === data.length) {
        // The chunk is exactly one message
        const emitBuffer = data;
        resetParserState(conn);
        data = Buffer.alloc(0);
        processMessage(conn, emitBuffer);
        continue;
      }

      // The chunk holds one full message followed by more data
      const emitBuffer = data.slice(0, sizeOfMessage);
      resetParserState(conn);
      data = data.slice(sizeOfMessage);
      processMessage(conn, emitBuffer);
    }
  };
}
- /**
- * A server connect event, used to verify that the connection is up and running
- *
- * @event Connection#connect
- * @type {Connection}
- */
- /**
- * The server connection closed, all pool connections closed
- *
- * @event Connection#close
- * @type {Connection}
- */
- /**
- * The server connection caused an error, all pool connections closed
- *
- * @event Connection#error
- * @type {Connection}
- */
- /**
- * The server connection timed out, all pool connections closed
- *
- * @event Connection#timeout
- * @type {Connection}
- */
- /**
- * The driver experienced an invalid message, all pool connections closed
- *
- * @event Connection#parseError
- * @type {Connection}
- */
- /**
- * An event emitted each time the connection receives a parsed message from the wire
- *
- * @event Connection#message
- * @type {Connection}
- */
- module.exports = Connection;
|