12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949 |
- 'use strict';
- var inherits = require('util').inherits,
- f = require('util').format,
- EventEmitter = require('events').EventEmitter,
- ReadPreference = require('./read_preference'),
- Logger = require('../connection/logger'),
- debugOptions = require('../connection/utils').debugOptions,
- retrieveBSON = require('../connection/utils').retrieveBSON,
- Pool = require('../connection/pool'),
- MongoError = require('../error').MongoError,
- MongoNetworkError = require('../error').MongoNetworkError,
- wireProtocol = require('../wireprotocol'),
- BasicCursor = require('../cursor'),
- sdam = require('./shared'),
- createClientInfo = require('./shared').createClientInfo,
- createCompressionInfo = require('./shared').createCompressionInfo,
- resolveClusterTime = require('./shared').resolveClusterTime,
- SessionMixins = require('./shared').SessionMixins,
- relayEvents = require('../utils').relayEvents;
- const collationNotSupported = require('../utils').collationNotSupported;
// Names of the option fields that are passed to `debugOptions` when a command
// is written to the debug log (see Server.prototype.command).
const debugFields = [
  'reconnect',
  'reconnectTries',
  'reconnectInterval',
  'emitError',
  'cursorFactory',
  'host',
  'port',
  'size',
  'keepAlive',
  'keepAliveInitialDelay',
  'noDelay',
  'connectionTimeout',
  'checkServerIdentity',
  'socketTimeout',
  'ssl',
  'ca',
  'crl',
  'cert',
  'key',
  'rejectUnauthorized',
  'promoteLongs',
  'promoteValues',
  'promoteBuffers',
  'servername'
];
// Next id to hand out; incremented once per constructed Server
let id = 0;
// When true, connected servers are tracked in `servers` keyed by their id
let serverAccounting = false;
let servers = {};
// BSON implementation returned by retrieveBSON(); used for the default parser
const BSON = retrieveBSON();
/**
 * Creates a new Server instance.
 *
 * Construction only records the configuration and resets the tracked state;
 * the connection pool stays `null` until `connect` is called.
 *
 * @class
 * @param {object} [options] Server settings (defaults to an empty object)
 * @param {boolean} [options.reconnect=true] Server will attempt to reconnect on loss of connection
 * @param {number} [options.reconnectTries=30] Server attempt to reconnect #times
 * @param {number} [options.reconnectInterval=1000] Server will wait # milliseconds between retries
 * @param {boolean} [options.monitoring=true] Enable the server state monitoring (calling ismaster at monitoringInterval)
 * @param {number} [options.monitoringInterval=5000] The interval of calling ismaster when monitoring is enabled.
 * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
 * @param {string} options.host The server host
 * @param {number} options.port The server port
 * @param {number} [options.size=5] Server connection pool size
 * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
 * @param {number} [options.keepAliveInitialDelay=300000] Initial delay before TCP keep alive enabled
 * @param {boolean} [options.noDelay=true] TCP Connection no delay
 * @param {number} [options.connectionTimeout=30000] TCP Connection timeout setting
 * @param {number} [options.socketTimeout=360000] TCP Socket timeout setting
 * @param {boolean} [options.ssl=false] Use SSL for connection
 * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
 * @param {Buffer} [options.ca] SSL Certificate store binary buffer
 * @param {Buffer} [options.crl] SSL Certificate revocation store binary buffer
 * @param {Buffer} [options.cert] SSL Certificate binary buffer
 * @param {Buffer} [options.key] SSL Key file binary buffer
 * @param {string} [options.passphrase] SSL Certificate pass phrase
 * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
 * @param {string} [options.servername=null] String containing the server name requested via TLS SNI.
 * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
 * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
 * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
 * @param {string} [options.appname=null] Application name, passed in on ismaster call and logged in mongod server logs. Maximum size 128 bytes.
 * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
 * @param {boolean} [options.monitorCommands=false] Enable command monitoring for this topology
 * @param {object} [options.bson] BSON instance to use instead of building a default one
 * @param {object} [options.parent] Owning topology; when set the server counts as being "in a topology"
 * @param {object} [options.disconnectHandler] Store that receives operations issued while disconnected
 * @return {Server} A server instance
 * @fires Server#connect
 * @fires Server#close
 * @fires Server#error
 * @fires Server#timeout
 * @fires Server#parseError
 * @fires Server#reconnect
 * @fires Server#reconnectFailed
 * @fires Server#serverHeartbeatStarted
 * @fires Server#serverHeartbeatSucceeded
 * @fires Server#serverHeartbeatFailed
 * @fires Server#topologyOpening
 * @fires Server#topologyClosed
 * @fires Server#topologyDescriptionChanged
 * @property {string} type the topology type.
 * @property {string} parserType the parser type used (c++ or js).
 */
function Server(options) {
  options = options || {};

  // Initialise the EventEmitter part of this instance
  EventEmitter.call(this);

  // Take the next module-wide instance id
  this.id = id++;

  // Monitoring is on unless explicitly given as a boolean; the interval
  // falls back to 5000ms unless given as a number
  const monitoringEnabled = typeof options.monitoring === 'boolean' ? options.monitoring : true;
  const monitoringEvery =
    typeof options.monitoringInterval === 'number' ? options.monitoringInterval : 5000;

  // Internal state shared by the prototype methods
  const state = {
    // The options object as passed in (not copied)
    options: options,
    logger: Logger('Server', options),
    // Cursor class used by `cursor()` unless overridden per call
    Cursor: options.cursorFactory || BasicCursor,
    // Caller supplied BSON instance, or a default one knowing every BSON type
    bson:
      options.bson ||
      new BSON([
        BSON.Binary,
        BSON.Code,
        BSON.DBRef,
        BSON.Decimal128,
        BSON.Double,
        BSON.Int32,
        BSON.Long,
        BSON.Map,
        BSON.MaxKey,
        BSON.MinKey,
        BSON.ObjectId,
        BSON.BSONRegExp,
        BSON.Symbol,
        BSON.Timestamp
      ]),
    // Created by `connect`
    pool: null,
    disconnectHandler: options.disconnectHandler,
    monitoring: monitoringEnabled,
    inTopology: !!options.parent,
    monitoringInterval: monitoringEvery,
    topologyId: -1,
    compression: { compressors: createCompressionInfo(options) },
    parent: options.parent
  };

  // Without a parent topology the cluster time is tracked on this server
  if (!state.parent) {
    state.clusterTime = null;
  }
  this.s = state;

  // Latest ismaster document and the time the last ismaster took
  this.ismaster = null;
  this.lastIsMasterMS = -1;
  // Timer handle of the scheduled monitoring run, if any
  this.monitoringProcessId = null;
  // True until the first connect attempt has succeeded or failed
  this.initialConnect = true;
  // Reported through the `type` getter; switched to 'mongos' on connect
  this._type = 'server';
  this.clientInfo = createClientInfo(options);
  // Staleness bookkeeping, all starting at zero
  this.lastUpdateTime = 0;
  this.lastWriteDate = 0;
  this.staleness = 0;
}
// Server is an EventEmitter and also carries every member of SessionMixins
inherits(Server, EventEmitter);
Object.assign(Server.prototype, SessionMixins);

// Read-only, enumerable views over the server state
Object.defineProperties(Server.prototype, {
  // Topology type string kept in `_type`
  type: {
    enumerable: true,
    get: function() {
      return this._type;
    }
  },
  // 'c++' when the BSON module reports a native parser, otherwise 'js'
  parserType: {
    enumerable: true,
    get: function() {
      return BSON.native ? 'c++' : 'js';
    }
  },
  // Value from the last ismaster document, or null when it is missing/falsy
  logicalSessionTimeoutMinutes: {
    enumerable: true,
    get: function() {
      return (this.ismaster && this.ismaster.logicalSessionTimeoutMinutes) || null;
    }
  }
});
// A standalone server keeps the cluster time in its own state. When the server
// belongs to a parent topology (Mongos / ReplSet) reads and writes are
// delegated to that parent so the maximum time seen is what gets gossiped.
Object.defineProperty(Server.prototype, 'clusterTime', {
  enumerable: true,
  get: function() {
    const holder = this.s.parent || this.s;
    return holder.clusterTime || null;
  },
  set: function(clusterTime) {
    const holder = this.s.parent || this.s;
    resolveClusterTime(holder, clusterTime);
  }
});
// Static helpers controlling the module-wide registry of connected servers
Object.assign(Server, {
  // Turn accounting on and start from an empty registry
  enableServerAccounting: function() {
    serverAccounting = true;
    servers = {};
  },
  // Turn accounting off; the registry contents are left as they are
  disableServerAccounting: function() {
    serverAccounting = false;
  },
  // Return the registry object (server id -> Server)
  servers: function() {
    return servers;
  }
});
// `host:port` built from the configured options; used as the server address
Object.defineProperty(Server.prototype, 'name', {
  enumerable: true,
  get: function() {
    const settings = this.s.options;
    return settings.host + ':' + settings.port;
  }
});
/**
 * Deals with an operation issued while the pool is not connected.
 *
 * When reconnect is enabled, a disconnect handler store is configured and the
 * operation is not a monitoring call, the operation is added to that store.
 * Otherwise the callback receives a "no connection available" MongoError.
 *
 * @param {Server} self The server the operation was issued on
 * @param {string} type Operation type (e.g. 'command', 'insert')
 * @param {string} ns Namespace of the operation
 * @param {object} cmd Command or operations payload
 * @param {object} options Operation options; `monitoring` is read from it
 * @param {function} callback Operation callback
 * @return {boolean|undefined} true when the operation was consumed here,
 *   undefined when the pool is connected and the caller should proceed
 */
function disconnectHandler(self, type, ns, cmd, options, callback) {
  // Connected: nothing to intercept
  if (self.s.pool.isConnected()) return;

  const store = self.s.disconnectHandler;
  const canBuffer = self.s.options.reconnect && store != null && !options.monitoring;

  if (canBuffer) {
    // Park the call; the store decides when to replay it
    store.add(type, ns, cmd, options, callback);
    return true;
  }

  // Nowhere to park the call, fail it right away
  callback(new MongoError(f('no connection available to server %s', self.name)));
  return true;
}
/**
 * Builds the function that performs one monitoring round for `self`.
 *
 * A round emits 'monitoring', runs `{ ismaster: true }` against 'admin.$cmd',
 * records how long the call took in `lastIsMasterMS`, stores the returned
 * document in `ismaster` and schedules the next round after
 * `self.s.monitoringInterval` milliseconds. Nothing happens (and nothing is
 * rescheduled) once the pool reports it was destroyed.
 *
 * @param {Server} self The server to monitor
 * @return {function} Zero-argument function suitable for setTimeout
 */
function monitoringProcess(self) {
  return function() {
    // Pool was destroyed, do not continue the process
    if (self.s.pool.isDestroyed()) return;
    // Let listeners know a monitoring round is starting
    self.emit('monitoring', self);
    // Start time used to measure the ismaster round trip
    var start = Date.now();

    // The configured connectionTimeout doubles as the socket timeout of the
    // ismaster call; 2000ms is used when it is not a number
    var socketTimeout =
      typeof self.s.options.connectionTimeout !== 'number'
        ? 2000
        : self.s.options.connectionTimeout;

    self.command(
      'admin.$cmd',
      { ismaster: true },
      { socketTimeout: socketTimeout, monitoring: true },
      (err, result) => {
        // The round trip time is recorded whether or not the call succeeded
        self.lastIsMasterMS = Date.now() - start;
        // Destroyed while the call was in flight: stop here, do not reschedule
        if (self.s.pool.isDestroyed()) return;

        // Update the ismaster view if we have a result
        if (result) {
          self.ismaster = result.result;
        }

        // Re-schedule the monitoring process
        self.monitoringProcessId = setTimeout(monitoringProcess(self), self.s.monitoringInterval);
      }
    );
  };
}
/**
 * Builds the listener that translates one pool event into server state
 * changes and server events.
 *
 * 'connect' records the ismaster data carried by the connection, may start
 * monitoring, emits the SDAM description events and finally 'connect'.
 * 'error', 'parseError', 'close', 'timeout', 'reconnect', 'attemptReconnect'
 * and 'reconnectFailed' update accounting / SDAM state and are re-emitted on
 * the server. Any other event name is ignored.
 *
 * @param {Server} self The server that owns the pool
 * @param {string} event Name of the pool event this listener is bound to
 * @return {function} Listener taking `(err, conn)`; for 'connect' the second
 *   argument is the connection, for the other events the first is the error
 */
var eventHandler = function(self, event) {
  return function(err, conn) {
    // Log information of received information if in info mode
    if (self.s.logger.isInfo()) {
      var object = err instanceof MongoError ? JSON.stringify(err) : {};
      self.s.logger.info(
        f('server %s fired event %s out with message %s', self.name, event, object)
      );
    }

    // Handle connect event
    if (event === 'connect') {
      self.initialConnect = false;
      self.ismaster = conn.ismaster;
      self.lastIsMasterMS = conn.lastIsMasterMS;

      // Remember what was negotiated on this connection for the whole pool
      if (conn.agreedCompressor) {
        self.s.pool.options.agreedCompressor = conn.agreedCompressor;
      }

      if (conn.zlibCompressionLevel) {
        self.s.pool.options.zlibCompressionLevel = conn.zlibCompressionLevel;
      }

      // Goes through the clusterTime setter (own state or parent topology)
      if (conn.ismaster.$clusterTime) {
        const $clusterTime = conn.ismaster.$clusterTime;
        self.clusterTime = $clusterTime;
      }

      // It's a proxy change the type so
      // the wireprotocol will send $readPreference
      if (self.ismaster.msg === 'isdbgrid') {
        self._type = 'mongos';
      }

      // Have we defined self monitoring
      if (self.s.monitoring) {
        self.monitoringProcessId = setTimeout(monitoringProcess(self), self.s.monitoringInterval);
      }

      // Emit server description changed if something listening
      sdam.emitServerDescriptionChanged(self, {
        address: self.name,
        arbiters: [],
        hosts: [],
        passives: [],
        type: sdam.getTopologyType(self)
      });

      if (!self.s.inTopology) {
        // Emit topology description changed if something listening
        sdam.emitTopologyDescriptionChanged(self, {
          topologyType: 'Single',
          servers: [
            {
              address: self.name,
              arbiters: [],
              hosts: [],
              passives: [],
              type: sdam.getTopologyType(self)
            }
          ]
        });
      }

      // Log the ismaster if available
      if (self.s.logger.isInfo()) {
        self.s.logger.info(
          f('server %s connected with ismaster [%s]', self.name, JSON.stringify(self.ismaster))
        );
      }

      // Emit connect
      self.emit('connect', self);
    } else if (
      event === 'error' ||
      event === 'parseError' ||
      event === 'close' ||
      event === 'timeout' ||
      event === 'reconnect' ||
      event === 'attemptReconnect' ||
      // Must be a comparison: a bare 'reconnectFailed' string literal is always
      // truthy and would let every event name into this branch
      event === 'reconnectFailed'
    ) {
      // Remove server instance from accounting
      if (
        serverAccounting &&
        ['close', 'timeout', 'error', 'parseError', 'reconnectFailed'].indexOf(event) !== -1
      ) {
        // NOTE(review): this emits 'topologyOpening' while the server is being
        // removed from accounting; kept as is, confirm whether that is intended
        if (!self.s.inTopology) {
          self.emit('topologyOpening', { topologyId: self.id });
        }

        delete servers[self.id];
      }

      if (event === 'close') {
        // Closing emits a server description changed event going to unknown.
        sdam.emitServerDescriptionChanged(self, {
          address: self.name,
          arbiters: [],
          hosts: [],
          passives: [],
          type: 'Unknown'
        });
      }

      // Reconnect failed return error
      if (event === 'reconnectFailed') {
        self.emit('reconnectFailed', err);
        // Emit error only if any listeners, an unhandled 'error' would throw
        if (self.listeners('error').length > 0) {
          self.emit('error', err);
        }
        // Terminate
        return;
      }

      // On first connect fail: report a single network error instead of the
      // raw pool event
      if (
        self.s.pool.state === 'disconnected' &&
        self.initialConnect &&
        ['close', 'timeout', 'error', 'parseError'].indexOf(event) !== -1
      ) {
        self.initialConnect = false;
        return self.emit(
          'error',
          new MongoNetworkError(
            f('failed to connect to server [%s] on first connect [%s]', self.name, err)
          )
        );
      }

      // Reconnect event, emit the server
      if (event === 'reconnect') {
        // Reconnecting emits a server description changed event going from unknown to the
        // current server type.
        sdam.emitServerDescriptionChanged(self, {
          address: self.name,
          arbiters: [],
          hosts: [],
          passives: [],
          type: sdam.getTopologyType(self)
        });
        return self.emit(event, self);
      }

      // Emit the event
      self.emit(event, err);
    }
  };
};
/**
 * Initiate server connect.
 *
 * Creates a new pool, wires its events to this server and starts connecting.
 * Note that `options` is merged INTO the options object the server was
 * constructed with (together with the BSON instance) before it is handed to
 * the pool.
 *
 * @method
 * @param {object} [options] Extra pool options merged over the server options
 * @throws {MongoError} When a pool exists that is neither disconnected nor destroyed
 */
Server.prototype.connect = function(options) {
  const server = this;
  const overrides = options || {};

  // Register with accounting, when enabled
  if (serverAccounting) servers[server.id] = server;

  // Only a disconnected or destroyed server may connect (again)
  const current = server.s.pool;
  if (current && !current.isDisconnected() && !current.isDestroyed()) {
    throw new MongoError(f('server instance in invalid state %s', current.state));
  }

  // Build the pool from the merged settings
  const poolSettings = Object.assign(server.s.options, overrides, { bson: server.s.bson });
  server.s.pool = new Pool(server, poolSettings);

  // Route each pool event through its dedicated handler
  ['close', 'error', 'timeout', 'parseError', 'connect', 'reconnect', 'reconnectFailed'].forEach(
    function(poolEvent) {
      server.s.pool.on(poolEvent, eventHandler(server, poolEvent));
    }
  );

  // Command monitoring events are passed through unchanged
  relayEvents(server.s.pool, server, ['commandStarted', 'commandSucceeded', 'commandFailed']);

  // A standalone server announces its own topology
  if (!server.s.inTopology) {
    server.emit('topologyOpening', { topologyId: server.id });
  }

  // Announce the server, under the topology id when one was assigned
  const owningTopologyId = server.s.topologyId !== -1 ? server.s.topologyId : server.id;
  server.emit('serverOpening', {
    topologyId: owningTopologyId,
    address: server.name
  });

  server.s.pool.connect();
};
/**
 * Authenticate the topology.
 *
 * This implementation performs no work of its own: the credentials are not
 * used and the callback, when it is a function, is invoked with `(null, null)`.
 *
 * @method
 * @param {MongoCredentials} credentials The credentials for authentication we are using
 * @param {authResultCallback} callback A callback function
 */
Server.prototype.auth = function(credentials, callback) {
  if (typeof callback !== 'function') return;
  callback(null, null);
};
/**
 * Get the server description.
 *
 * Always contains `type` and `address`; `hosts`, `arbiters`, `passives` and
 * `setName` are copied from the last ismaster document when they are truthy.
 *
 * @method
 * @return {object}
 */
Server.prototype.getDescription = function() {
  const latest = this.ismaster || {};
  const summary = {
    type: sdam.getTopologyType(this),
    address: this.name
  };

  // Optional fields, copied only when the ismaster document carries them
  ['hosts', 'arbiters', 'passives', 'setName'].forEach(function(field) {
    if (latest[field]) summary[field] = latest[field];
  });

  return summary;
};
/**
 * Returns the most recent ismaster document recorded for this server,
 * or null when none has been recorded yet.
 * @method
 * @return {object}
 */
Server.prototype.lastIsMaster = function() {
  const latest = this.ismaster;
  return latest;
};
/**
 * Unref all connections belonging to this server.
 *
 * Does nothing when no pool has been created yet, matching how
 * `isConnected`, `isDestroyed` and `destroy` treat a missing pool.
 * @method
 */
Server.prototype.unref = function() {
  // No pool means there are no connections to unref
  if (!this.s.pool) return;
  this.s.pool.unref();
};
/**
 * Figure out if the server is connected.
 *
 * False when no pool exists, otherwise whatever the pool reports.
 * @method
 * @return {boolean}
 */
Server.prototype.isConnected = function() {
  const pool = this.s.pool;
  return pool ? pool.isConnected() : false;
};
/**
 * Figure out if the server instance was destroyed by calling destroy.
 *
 * False when no pool exists, otherwise whatever the pool reports.
 * @method
 * @return {boolean}
 */
Server.prototype.isDestroyed = function() {
  const pool = this.s.pool;
  return pool ? pool.isDestroyed() : false;
};
/**
 * Checks that the server has a usable pool.
 *
 * @param {Server} self The server to check
 * @return {MongoError|undefined} An error describing why the server cannot be
 *   used, or undefined when a pool exists and was not destroyed
 */
function basicWriteValidations(self) {
  if (!self.s.pool) return new MongoError('server instance is not connected');
  if (self.s.pool.isDestroyed()) return new MongoError('server instance pool was destroyed');
}

/**
 * Runs the write validations and additionally checks the read preference.
 *
 * @param {Server} self The server to check
 * @param {object} [options] Operation options; only `readPreference` is read
 * @return {MongoError|undefined} The pool state error, if any
 * @throws {Error} When `options.readPreference` is set but is not a ReadPreference
 */
function basicReadValidations(self, options) {
  // Hand the pool state error back to the caller so it reaches the callback
  // (the result used to be dropped, making the caller's check a no-op)
  var poolError = basicWriteValidations(self);
  if (poolError) return poolError;

  if (options && options.readPreference && !(options.readPreference instanceof ReadPreference)) {
    throw new Error('readPreference must be an instance of ReadPreference');
  }
}
/**
 * Execute a command
 * @method
 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
 * @param {object} cmd The command hash
 * @param {object} [options] Command options; may be omitted, in which case the third argument is the callback
 * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
 * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
 * @param {Boolean} [options.checkKeys=false] Specify if the bson parser should validate keys.
 * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
 * @param {Boolean} [options.fullResult=false] Return the full envelope instead of just the result document.
 * @param {ClientSession} [options.session=null] Session to use for the operation
 * @param {opResultCallback} callback A callback function
 */
Server.prototype.command = function(ns, cmd, options, callback) {
  var self = this;
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  // Default AFTER the overload shuffle so an explicit undefined/null options
  // argument is covered too (the default used to apply only to the branch above)
  options = options || {};

  var result = basicReadValidations(self, options);
  if (result) return callback(result);

  // Clone the options so the caller's object is not modified
  options = Object.assign({}, options, { wireProtocolCommand: false });

  // Debug log
  if (self.s.logger.isDebug())
    self.s.logger.debug(
      f(
        'executing command [%s] against %s',
        JSON.stringify({
          ns: ns,
          cmd: cmd,
          options: debugOptions(debugFields, options)
        }),
        self.name
      )
    );

  // If we are not connected or have a disconnectHandler specified
  if (disconnectHandler(self, 'command', ns, cmd, options, callback)) return;

  // error if collation not supported
  if (collationNotSupported(this, cmd)) {
    return callback(new MongoError(`server ${this.name} does not support collation`));
  }

  wireProtocol.command(self, ns, cmd, options, callback);
};
/**
 * Insert one or more documents
 * @method
 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
 * @param {array} ops An array of documents to insert (a single document is wrapped in an array)
 * @param {object} [options] Write options; may be omitted, in which case the third argument is the callback
 * @param {boolean} [options.ordered=true] Execute in order or out of order
 * @param {object} [options.writeConcern={}] Write concern for the operation
 * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
 * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
 * @param {ClientSession} [options.session=null] Session to use for the operation
 * @param {opResultCallback} callback A callback function
 */
Server.prototype.insert = function(ns, ops, options, callback) {
  var self = this;
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  // Default AFTER the overload shuffle so an explicit undefined/null options
  // argument is covered too (the default used to apply only to the branch above)
  options = options || {};

  var result = basicWriteValidations(self, options);
  if (result) return callback(result);

  // If we are not connected or have a disconnectHandler specified
  if (disconnectHandler(self, 'insert', ns, ops, options, callback)) return;

  // Setup the docs as an array
  ops = Array.isArray(ops) ? ops : [ops];

  // Execute write
  return wireProtocol.insert(self, ns, ops, options, callback);
};
/**
 * Perform one or more update operations
 * @method
 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
 * @param {array} ops An array of updates (a single update is wrapped in an array)
 * @param {object} [options] Write options; may be omitted, in which case the third argument is the callback
 * @param {boolean} [options.ordered=true] Execute in order or out of order
 * @param {object} [options.writeConcern={}] Write concern for the operation
 * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
 * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
 * @param {ClientSession} [options.session=null] Session to use for the operation
 * @param {opResultCallback} callback A callback function
 */
Server.prototype.update = function(ns, ops, options, callback) {
  var self = this;
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  // Default AFTER the overload shuffle so an explicit undefined/null options
  // argument is covered too (the default used to apply only to the branch above)
  options = options || {};

  var result = basicWriteValidations(self, options);
  if (result) return callback(result);

  // If we are not connected or have a disconnectHandler specified
  if (disconnectHandler(self, 'update', ns, ops, options, callback)) return;

  // error if collation not supported
  if (collationNotSupported(this, options)) {
    return callback(new MongoError(`server ${this.name} does not support collation`));
  }

  // Setup the docs as an array
  ops = Array.isArray(ops) ? ops : [ops];

  // Execute write
  return wireProtocol.update(self, ns, ops, options, callback);
};
/**
 * Perform one or more remove operations
 * @method
 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
 * @param {array} ops An array of removes (a single remove is wrapped in an array)
 * @param {object} [options] Write options; may be omitted, in which case the third argument is the callback
 * @param {boolean} [options.ordered=true] Execute in order or out of order
 * @param {object} [options.writeConcern={}] Write concern for the operation
 * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
 * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
 * @param {ClientSession} [options.session=null] Session to use for the operation
 * @param {opResultCallback} callback A callback function
 */
Server.prototype.remove = function(ns, ops, options, callback) {
  var self = this;
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  // Default AFTER the overload shuffle so an explicit undefined/null options
  // argument is covered too (the default used to apply only to the branch above)
  options = options || {};

  var result = basicWriteValidations(self, options);
  if (result) return callback(result);

  // If we are not connected or have a disconnectHandler specified
  if (disconnectHandler(self, 'remove', ns, ops, options, callback)) return;

  // error if collation not supported
  if (collationNotSupported(this, options)) {
    return callback(new MongoError(`server ${this.name} does not support collation`));
  }

  // Setup the docs as an array
  ops = Array.isArray(ops) ? ops : [ops];

  // Execute write
  return wireProtocol.remove(self, ns, ops, options, callback);
};
/**
 * Get a new cursor.
 *
 * The cursor class is `options.cursorFactory` when given, otherwise the class
 * chosen at construction time. The cursor is bound to `options.topology` when
 * given, otherwise to this server.
 *
 * @method
 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
 * @param {object|Long} cmd Can be either a command returning a cursor or a cursorId
 * @param {object} [options] Options for the cursor
 * @param {object} [options.batchSize=0] Batchsize for the operation
 * @param {array} [options.documents=[]] Initial documents list for cursor
 * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
 * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
 * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
 * @param {ClientSession} [options.session=null] Session to use for the operation
 * @param {object} [options.topology] The internal topology of the created cursor
 * @returns {Cursor}
 */
Server.prototype.cursor = function(ns, cmd, options) {
  const cursorOptions = options || {};
  const owner = cursorOptions.topology || this;
  const CursorClass = cursorOptions.cursorFactory || this.s.Cursor;

  return new CursorClass(this.s.bson, ns, cmd, cursorOptions, owner, this.s.options);
};
/**
 * Compare two server instances.
 *
 * Servers are considered equal when their `host:port` names match, ignoring
 * case. A plain string is compared against this server's name directly.
 * Anything without a name (including null and undefined) is never equal.
 *
 * @method
 * @param {Server|string} server Server, or server name, to compare equality against
 * @return {boolean}
 */
Server.prototype.equals = function(server) {
  // Guard first: reading `.name` off null/undefined would throw
  if (server == null) return false;
  if (typeof server === 'string') return this.name.toLowerCase() === server.toLowerCase();
  if (server.name) return this.name.toLowerCase() === server.name.toLowerCase();
  return false;
};
/**
 * All raw connections.
 *
 * Returns an empty array when no pool has been created yet, matching how
 * `isConnected` and `isDestroyed` treat a missing pool.
 * @method
 * @return {Connection[]}
 */
Server.prototype.connections = function() {
  // No pool means no connections
  if (!this.s.pool) return [];
  return this.s.pool.allConnections();
};
/**
 * Selects a server.
 *
 * A single server always selects itself, so `selector` and `options` are
 * accepted only for signature compatibility. Supported call shapes:
 * `selectServer(callback)`, `selectServer(selectorOrOptions, callback)` and
 * `selectServer(selector, options, callback)`.
 *
 * @method
 * @param {function} selector Unused
 * @param {object} [options] Unused
 * @param {ReadPreference} [options.readPreference] Unused
 * @param {ClientSession} [options.session] Unused
 * @param {function} callback Called with `(null, server)`; nothing is returned
 */
Server.prototype.selectServer = function(selector, options, callback) {
  if (typeof options === 'function') {
    // Two arguments: the second one is the callback. This case is checked
    // first so a function selector is not mistaken for the callback.
    (callback = options), (options = selector), (selector = undefined);
  } else if (typeof selector === 'function' && typeof callback === 'undefined') {
    // One argument: it is the callback
    (callback = selector), (selector = undefined), (options = {});
  }

  callback(null, this);
};
// Pool events whose listeners are removed when the server is destroyed
var listeners = ['close', 'error', 'timeout', 'parseError', 'connect'];

/**
 * Destroy the server connection.
 *
 * Calling destroy on an already destroyed server, or on one that never
 * created a pool, only invokes the callback with `(null, null)`.
 *
 * @method
 * @param {object} [options] Destroy options
 * @param {boolean} [options.emitClose=false] Emit close event on destroy
 * @param {boolean} [options.emitDestroy=false] Emit destroy event on destroy
 * @param {boolean} [options.force=false] Force destroy the pool
 * @param {function} [callback] Passed on to the pool's destroy, or invoked directly when there is nothing to destroy
 */
Server.prototype.destroy = function(options, callback) {
  const server = this;
  // Used on the paths where no pool teardown takes place
  const finishWithoutPool = function() {
    if (typeof callback === 'function') callback(null, null);
  };

  if (server._destroyed) {
    finishWithoutPool();
    return;
  }

  const settings = options || {};

  // Drop the server from accounting, when enabled
  if (serverAccounting) delete servers[server.id];

  // Cancel the pending monitoring run, if any
  if (server.monitoringProcessId) {
    clearTimeout(server.monitoringProcessId);
  }

  // Never connected: just mark as destroyed
  if (!server.s.pool) {
    server._destroyed = true;
    finishWithoutPool();
    return;
  }

  // Optional notifications requested by the caller
  if (settings.emitClose) {
    server.emit('close', server);
  }
  if (settings.emitDestroy) {
    server.emit('destroy', server);
  }

  // Detach the handlers for the listed pool events
  for (const poolEvent of listeners) {
    server.s.pool.removeAllListeners(poolEvent);
  }

  // SDAM: server closed, reported only when somebody listens
  if (server.listeners('serverClosed').length > 0) {
    server.emit('serverClosed', {
      topologyId: server.s.topologyId !== -1 ? server.s.topologyId : server.id,
      address: server.name
    });
  }

  // SDAM: topology closed, only for a standalone server with a listener
  if (server.listeners('topologyClosed').length > 0 && !server.s.inTopology) {
    server.emit('topologyClosed', { topologyId: server.id });
  }

  if (server.s.logger.isDebug()) {
    server.s.logger.debug(f('destroy called on server %s', server.name));
  }

  // Tear down the pool; it receives the callback
  server.s.pool.destroy(settings.force, callback);
  server._destroyed = true;
};
- /**
- * A server connect event, used to verify that the connection is up and running
- *
- * @event Server#connect
- * @type {Server}
- */
- /**
- * A server reconnect event, used to verify that the server topology has reconnected
- *
- * @event Server#reconnect
- * @type {Server}
- */
- /**
- * A server opening SDAM monitoring event
- *
- * @event Server#serverOpening
- * @type {object}
- */
- /**
- * A server closed SDAM monitoring event
- *
- * @event Server#serverClosed
- * @type {object}
- */
- /**
- * A server description SDAM change monitoring event
- *
- * @event Server#serverDescriptionChanged
- * @type {object}
- */
- /**
- * A topology open SDAM event
- *
- * @event Server#topologyOpening
- * @type {object}
- */
- /**
- * A topology closed SDAM event
- *
- * @event Server#topologyClosed
- * @type {object}
- */
- /**
- * A topology structure SDAM change event
- *
- * @event Server#topologyDescriptionChanged
- * @type {object}
- */
- /**
- * Server reconnect failed
- *
- * @event Server#reconnectFailed
- * @type {Error}
- */
- /**
- * Server connection pool closed
- *
- * @event Server#close
- * @type {object}
- */
- /**
- * Server connection pool caused an error
- *
- * @event Server#error
- * @type {Error}
- */
- /**
- * Server destroyed was called
- *
- * @event Server#destroy
- * @type {Server}
- */
- module.exports = Server;
|