123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
- 'use strict';
- const ServerDescription = require('./server_description').ServerDescription;
- const calculateDurationInMs = require('../utils').calculateDurationInMs;
/**
 * Event published whenever a server's description changes. Changes that only
 * affect the round trip time (RTT) are NOT published through this event.
 *
 * @property {Object} topologyId A unique identifier for the topology
 * @property {ServerAddress} address The address (host/port pair) of the server
 * @property {ServerDescription} previousDescription The previous server description
 * @property {ServerDescription} newDescription The new server description
 */
class ServerDescriptionChangedEvent {
  /**
   * @param {Object} topologyId A unique identifier for the topology
   * @param {ServerAddress} address The address (host/port pair) of the server
   * @param {ServerDescription} previousDescription The previous server description
   * @param {ServerDescription} newDescription The new server description
   */
  constructor(topologyId, address, previousDescription, newDescription) {
    this.topologyId = topologyId;
    this.address = address;
    this.previousDescription = previousDescription;
    this.newDescription = newDescription;
  }
}
/**
 * Event published when a server is initialized.
 *
 * @property {Object} topologyId A unique identifier for the topology
 * @property {ServerAddress} address The address (host/port pair) of the server
 */
class ServerOpeningEvent {
  /**
   * @param {Object} topologyId A unique identifier for the topology
   * @param {ServerAddress} address The address (host/port pair) of the server
   */
  constructor(topologyId, address) {
    this.topologyId = topologyId;
    this.address = address;
  }
}
/**
 * Event published when a server is closed.
 *
 * @property {Object} topologyId A unique identifier for the topology
 * @property {ServerAddress} address The address (host/port pair) of the server
 */
class ServerClosedEvent {
  /**
   * @param {Object} topologyId A unique identifier for the topology
   * @param {ServerAddress} address The address (host/port pair) of the server
   */
  constructor(topologyId, address) {
    this.topologyId = topologyId;
    this.address = address;
  }
}
/**
 * Event published when the topology description changes.
 *
 * @property {Object} topologyId A unique identifier for the topology
 * @property {TopologyDescription} previousDescription The old topology description
 * @property {TopologyDescription} newDescription The new topology description
 */
class TopologyDescriptionChangedEvent {
  /**
   * @param {Object} topologyId A unique identifier for the topology
   * @param {TopologyDescription} previousDescription The old topology description
   * @param {TopologyDescription} newDescription The new topology description
   */
  constructor(topologyId, previousDescription, newDescription) {
    this.topologyId = topologyId;
    this.previousDescription = previousDescription;
    this.newDescription = newDescription;
  }
}
/**
 * Event published when a topology is initialized.
 *
 * @property {Object} topologyId A unique identifier for the topology
 */
class TopologyOpeningEvent {
  /**
   * @param {Object} topologyId A unique identifier for the topology
   */
  constructor(topologyId) {
    this.topologyId = topologyId;
  }
}
/**
 * Event published when a topology is closed.
 *
 * @property {Object} topologyId A unique identifier for the topology
 */
class TopologyClosedEvent {
  /**
   * @param {Object} topologyId A unique identifier for the topology
   */
  constructor(topologyId) {
    this.topologyId = topologyId;
  }
}
/**
 * Event fired when the server monitor's ismaster command is started, i.e. immediately
 * before the ismaster command is serialized into raw BSON and written to the socket.
 *
 * @property {Object} connectionId The connection id for the command
 */
class ServerHeartbeatStartedEvent {
  /**
   * @param {Object} connectionId The connection id for the command
   */
  constructor(connectionId) {
    this.connectionId = connectionId;
  }
}
/**
 * Event fired when the server monitor's ismaster command succeeds.
 *
 * @property {Number} duration The execution time of the event in ms
 * @property {Object} reply The command reply
 * @property {Object} connectionId The connection id for the command
 */
class ServerHeartbeatSucceededEvent {
  /**
   * @param {Number} duration The execution time of the event in ms
   * @param {Object} reply The command reply
   * @param {Object} connectionId The connection id for the command
   */
  constructor(duration, reply, connectionId) {
    this.duration = duration;
    this.reply = reply;
    this.connectionId = connectionId;
  }
}
/**
 * Event fired when the server monitor's ismaster command fails, either with an
 * "ok: 0" reply or with a socket exception.
 *
 * @property {Number} duration The execution time of the event in ms
 * @property {MongoError|Object} failure The command failure
 * @property {Object} connectionId The connection id for the command
 */
class ServerHeartbeatFailedEvent {
  /**
   * @param {Number} duration The execution time of the event in ms
   * @param {MongoError|Object} failure The command failure
   * @param {Object} connectionId The connection id for the command
   */
  constructor(duration, failure, connectionId) {
    this.duration = duration;
    this.failure = failure;
    this.connectionId = connectionId;
  }
}
/**
 * Performs a server check as described by the SDAM spec.
 *
 * A check sends an `ismaster` command to the server. On success a `descriptionReceived`
 * event carrying a new `ServerDescription` is emitted and the next check is scheduled
 * with the same options. On failure the server's pool is reset and the check is retried
 * once; if the retry fails as well, a `descriptionReceived` event carrying a description
 * built without an ismaster reply (plus the error) is emitted.
 *
 * NOTE: This method reschedules itself after every successful check. It does NOT
 * reschedule itself after two consecutive failed checks.
 *
 * @param {Server} server The server to monitor
 * @param {Object} [options] Optional settings
 * @param {Number} [options.heartbeatFrequencyMS=10000] Delay in ms before the next check
 * @param {Boolean} [options.initial=false] When `true`, no check is performed now; the first check is only scheduled
 */
function monitorServer(server, options) {
  options = options || {};
  const heartbeatFrequencyMS = options.heartbeatFrequencyMS || 10000;

  // Schedules the next run with the caller's options so a custom heartbeat frequency is
  // kept across runs. `initial` is cleared so that the scheduled run performs a check
  // instead of only scheduling yet another run.
  const scheduleNextCheck = () => {
    const nextOptions = Object.assign({}, options, { initial: false });
    server.s.monitorId = setTimeout(
      () => monitorServer(server, nextOptions),
      heartbeatFrequencyMS
    );
  };

  if (options.initial === true) {
    scheduleNextCheck();
    return;
  }

  // executes a single check of a server
  const checkServer = callback => {
    const start = process.hrtime();

    // emit a signal indicating we have started the heartbeat
    server.emit('serverHeartbeatStarted', new ServerHeartbeatStartedEvent(server.name));

    // NOTE: legacy monitoring event
    process.nextTick(() => server.emit('monitoring', server));

    server.command(
      'admin.$cmd',
      { ismaster: true },
      {
        monitoring: true,
        // falls back to 2000 when no connection timeout is configured on the server
        socketTimeout: server.s.options.connectionTimeout || 2000
      },
      (err, result) => {
        const duration = calculateDurationInMs(start);

        if (err) {
          server.emit(
            'serverHeartbeatFailed',
            new ServerHeartbeatFailedEvent(duration, err, server.name)
          );

          return callback(err, null);
        }

        const isMaster = result.result;
        server.emit(
          'serverHeartbeatSucceeded',
          new ServerHeartbeatSucceededEvent(duration, isMaster, server.name)
        );

        return callback(null, isMaster);
      }
    );
  };

  const successHandler = isMaster => {
    server.s.monitoring = false;

    // emit an event indicating that our description has changed
    server.emit('descriptionReceived', new ServerDescription(server.description.address, isMaster));

    // schedule the next monitoring process
    scheduleNextCheck();
  };

  // run the actual monitoring loop
  server.s.monitoring = true;
  checkServer((err, isMaster) => {
    if (!err) {
      successHandler(isMaster);
      return;
    }

    // According to the SDAM specification's "Network error during server check" section, if
    // an ismaster call fails we reset the server's pool. If a server was once connected,
    // change its type to `Unknown` only after retrying once.
    server.s.pool.reset(() => {
      // re-attempt monitoring once
      checkServer((error, isMaster) => {
        if (error) {
          server.s.monitoring = false;

          // we revert to an `Unknown` by emitting a default description with no isMaster
          server.emit(
            'descriptionReceived',
            new ServerDescription(server.description.address, null, { error })
          );

          // we do not reschedule monitoring in this case
          return;
        }

        successHandler(isMaster);
      });
    });
  });
}
- module.exports = {
- ServerDescriptionChangedEvent,
- ServerOpeningEvent,
- ServerClosedEvent,
- TopologyDescriptionChangedEvent,
- TopologyOpeningEvent,
- TopologyClosedEvent,
- ServerHeartbeatStartedEvent,
- ServerHeartbeatSucceededEvent,
- ServerHeartbeatFailedEvent,
- monitorServer
- };
|