monitoring.js 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. 'use strict';
  2. const ServerDescription = require('./server_description').ServerDescription;
  3. const calculateDurationInMs = require('../utils').calculateDurationInMs;
  4. /**
  5. * Published when server description changes, but does NOT include changes to the RTT.
  6. *
  7. * @property {Object} topologyId A unique identifier for the topology
  8. * @property {ServerAddress} address The address (host/port pair) of the server
  9. * @property {ServerDescription} previousDescription The previous server description
  10. * @property {ServerDescription} newDescription The new server description
  11. */
  12. class ServerDescriptionChangedEvent {
  13. constructor(topologyId, address, previousDescription, newDescription) {
  14. Object.assign(this, { topologyId, address, previousDescription, newDescription });
  15. }
  16. }
  17. /**
  18. * Published when server is initialized.
  19. *
  20. * @property {Object} topologyId A unique identifier for the topology
  21. * @property {ServerAddress} address The address (host/port pair) of the server
  22. */
  23. class ServerOpeningEvent {
  24. constructor(topologyId, address) {
  25. Object.assign(this, { topologyId, address });
  26. }
  27. }
  28. /**
  29. * Published when server is closed.
  30. *
  31. * @property {ServerAddress} address The address (host/port pair) of the server
  32. * @property {Object} topologyId A unique identifier for the topology
  33. */
  34. class ServerClosedEvent {
  35. constructor(topologyId, address) {
  36. Object.assign(this, { topologyId, address });
  37. }
  38. }
  39. /**
  40. * Published when topology description changes.
  41. *
  42. * @property {Object} topologyId
  43. * @property {TopologyDescription} previousDescription The old topology description
  44. * @property {TopologyDescription} newDescription The new topology description
  45. */
  46. class TopologyDescriptionChangedEvent {
  47. constructor(topologyId, previousDescription, newDescription) {
  48. Object.assign(this, { topologyId, previousDescription, newDescription });
  49. }
  50. }
  51. /**
  52. * Published when topology is initialized.
  53. *
  54. * @param {Object} topologyId A unique identifier for the topology
  55. */
  56. class TopologyOpeningEvent {
  57. constructor(topologyId) {
  58. Object.assign(this, { topologyId });
  59. }
  60. }
  61. /**
  62. * Published when topology is closed.
  63. *
  64. * @param {Object} topologyId A unique identifier for the topology
  65. */
  66. class TopologyClosedEvent {
  67. constructor(topologyId) {
  68. Object.assign(this, { topologyId });
  69. }
  70. }
  71. /**
  72. * Fired when the server monitor’s ismaster command is started - immediately before
  73. * the ismaster command is serialized into raw BSON and written to the socket.
  74. *
  75. * @property {Object} connectionId The connection id for the command
  76. */
  77. class ServerHeartbeatStartedEvent {
  78. constructor(connectionId) {
  79. Object.assign(this, { connectionId });
  80. }
  81. }
  82. /**
  83. * Fired when the server monitor’s ismaster succeeds.
  84. *
  85. * @param {Number} duration The execution time of the event in ms
  86. * @param {Object} reply The command reply
  87. * @param {Object} connectionId The connection id for the command
  88. */
  89. class ServerHeartbeatSucceededEvent {
  90. constructor(duration, reply, connectionId) {
  91. Object.assign(this, { duration, reply, connectionId });
  92. }
  93. }
  94. /**
  95. * Fired when the server monitor’s ismaster fails, either with an “ok: 0” or a socket exception.
  96. *
  97. * @param {Number} duration The execution time of the event in ms
  98. * @param {MongoError|Object} failure The command failure
  99. * @param {Object} connectionId The connection id for the command
  100. */
  101. class ServerHeartbeatFailedEvent {
  102. constructor(duration, failure, connectionId) {
  103. Object.assign(this, { duration, failure, connectionId });
  104. }
  105. }
  106. /**
  107. * Performs a server check as described by the SDAM spec.
  108. *
  109. * NOTE: This method automatically reschedules itself, so that there is always an active
  110. * monitoring process
  111. *
  112. * @param {Server} server The server to monitor
  113. */
  114. function monitorServer(server, options) {
  115. options = options || {};
  116. const heartbeatFrequencyMS = options.heartbeatFrequencyMS || 10000;
  117. if (options.initial === true) {
  118. server.s.monitorId = setTimeout(() => monitorServer(server), heartbeatFrequencyMS);
  119. return;
  120. }
  121. // executes a single check of a server
  122. const checkServer = callback => {
  123. let start = process.hrtime();
  124. // emit a signal indicating we have started the heartbeat
  125. server.emit('serverHeartbeatStarted', new ServerHeartbeatStartedEvent(server.name));
  126. // NOTE: legacy monitoring event
  127. process.nextTick(() => server.emit('monitoring', server));
  128. server.command(
  129. 'admin.$cmd',
  130. { ismaster: true },
  131. {
  132. monitoring: true,
  133. socketTimeout: server.s.options.connectionTimeout || 2000
  134. },
  135. (err, result) => {
  136. let duration = calculateDurationInMs(start);
  137. if (err) {
  138. server.emit(
  139. 'serverHeartbeatFailed',
  140. new ServerHeartbeatFailedEvent(duration, err, server.name)
  141. );
  142. return callback(err, null);
  143. }
  144. const isMaster = result.result;
  145. server.emit(
  146. 'serverHeartbeatSucceded',
  147. new ServerHeartbeatSucceededEvent(duration, isMaster, server.name)
  148. );
  149. return callback(null, isMaster);
  150. }
  151. );
  152. };
  153. const successHandler = isMaster => {
  154. server.s.monitoring = false;
  155. // emit an event indicating that our description has changed
  156. server.emit('descriptionReceived', new ServerDescription(server.description.address, isMaster));
  157. // schedule the next monitoring process
  158. server.s.monitorId = setTimeout(() => monitorServer(server), heartbeatFrequencyMS);
  159. };
  160. // run the actual monitoring loop
  161. server.s.monitoring = true;
  162. checkServer((err, isMaster) => {
  163. if (!err) {
  164. successHandler(isMaster);
  165. return;
  166. }
  167. // According to the SDAM specification's "Network error during server check" section, if
  168. // an ismaster call fails we reset the server's pool. If a server was once connected,
  169. // change its type to `Unknown` only after retrying once.
  170. server.s.pool.reset(() => {
  171. // otherwise re-attempt monitoring once
  172. checkServer((error, isMaster) => {
  173. if (error) {
  174. server.s.monitoring = false;
  175. // we revert to an `Unknown` by emitting a default description with no isMaster
  176. server.emit(
  177. 'descriptionReceived',
  178. new ServerDescription(server.description.address, null, { error })
  179. );
  180. // we do not reschedule monitoring in this case
  181. return;
  182. }
  183. successHandler(isMaster);
  184. });
  185. });
  186. });
  187. }
  188. module.exports = {
  189. ServerDescriptionChangedEvent,
  190. ServerOpeningEvent,
  191. ServerClosedEvent,
  192. TopologyDescriptionChangedEvent,
  193. TopologyOpeningEvent,
  194. TopologyClosedEvent,
  195. ServerHeartbeatStartedEvent,
  196. ServerHeartbeatSucceededEvent,
  197. ServerHeartbeatFailedEvent,
  198. monitorServer
  199. };