shared.js 11 KB


  1. 'use strict';
  2. const os = require('os');
  3. const f = require('util').format;
  4. const ReadPreference = require('./read_preference');
  5. const Buffer = require('safe-buffer').Buffer;
  /**
   * Emit an event on `self`, but only when at least one listener is registered for it.
   *
   * @method
   * @param {object} self object exposing `listeners(event)` and `emit(event, payload)`
   * @param {string} event name of the event to emit
   * @param {object} description payload handed to `emit`
   */
  function emitSDAMEvent(self, event, description) {
    const hasListeners = self.listeners(event).length > 0;
    if (!hasListeners) return;

    self.emit(event, description);
  }
  // Environment details read once at load time; createClientInfo uses them
  // to fill in the default driver/os/platform metadata.
  const { version: driverVersion } = require('../../package.json');
  const nodejsversion = f('Node.js %s, %s', process.version, os.endianness());
  const type = os.type();
  const name = process.platform;
  const architecture = process.arch;
  const release = os.release();
  /**
   * Build the client metadata document: driver, OS, platform and (optionally)
   * the application name.
   *
   * @param {object} options
   * @param {object} [options.clientInfo] caller-supplied metadata; when present it is
   *   deep-copied with `clone` and used instead of the built-in driver/os defaults
   * @param {string} [options.appname] application name; truncated so that its UTF-8
   *   encoding never exceeds 128 bytes
   * @return {object} the metadata document, with `platform` filled in and
   *   `application.name` set when an appname was supplied
   */
  function createClientInfo(options) {
    // Start from the caller's metadata when given, otherwise from the defaults
    var clientInfo = options.clientInfo
      ? clone(options.clientInfo)
      : {
          driver: {
            name: 'nodejs-core',
            version: driverVersion
          },
          os: {
            type: type,
            name: name,
            architecture: architecture,
            version: release
          }
        };

    // Tag a caller-supplied platform with our version once; default it otherwise
    if (clientInfo.platform && clientInfo.platform.indexOf('mongodb-core') === -1) {
      clientInfo.platform = f('%s, mongodb-core: %s', clientInfo.platform, driverVersion);
    } else if (!clientInfo.platform) {
      clientInfo.platform = nodejsversion;
    }

    // Do we have an application specific string
    if (options.appname) {
      var encoded = Buffer.from(options.appname);
      var appname = options.appname;

      if (encoded.length > 128) {
        // Cutting blindly at byte 128 can split a multi-byte character; decoding
        // would then produce U+FFFD, which re-encodes to 3 bytes and can push the
        // result past the limit. Back up while the first dropped byte is a UTF-8
        // continuation byte (10xxxxxx) so the cut lands on a character boundary.
        var end = 128;
        while (end > 0 && (encoded[end] & 0xc0) === 0x80) {
          end--;
        }
        appname = encoded.slice(0, end).toString('utf8');
      }

      // Add to the clientInfo
      clientInfo.application = { name: appname };
    }

    return clientInfo;
  }
  /**
   * Validate and return the compressors requested in `options.compression`.
   *
   * @param {object} options
   * @param {object} [options.compression]
   * @param {string[]} [options.compression.compressors] requested compressor names
   * @return {string[]} the supplied compressors list itself, or a new empty array
   *   when no compression settings were given
   * @throws {Error} when any entry is neither 'snappy' nor 'zlib'
   */
  function createCompressionInfo(options) {
    const compression = options.compression;
    const compressors = compression ? compression.compressors : undefined;
    if (!compressors) {
      return [];
    }

    // Reject the whole list as soon as one unknown compressor is seen
    compressors.forEach(candidate => {
      const isSupported = candidate === 'snappy' || candidate === 'zlib';
      if (!isSupported) {
        throw new Error('compressors must be at least one of snappy or zlib');
      }
    });

    return compressors;
  }
  /**
   * Deep-copy a value by serializing it to JSON and parsing it back.
   * Only what survives a JSON round trip is preserved.
   *
   * @param {*} object value to copy
   * @return {*} the parsed copy
   */
  function clone(object) {
    const serialized = JSON.stringify(object);
    return JSON.parse(serialized);
  }
  /**
   * Return the server description stored on `self.s`. When none is stored yet,
   * an 'Unknown' placeholder for `self.name` is stored first and returned.
   *
   * @param {object} self object carrying the `s` state bag and a `name`
   * @return {object} the stored server description
   */
  var getPreviousDescription = function(self) {
    var state = self.s;
    if (state.serverDescription) {
      return state.serverDescription;
    }

    state.serverDescription = {
      address: self.name,
      arbiters: [],
      hosts: [],
      passives: [],
      type: 'Unknown'
    };
    return state.serverDescription;
  };

  /**
   * Emit 'serverDescriptionChanged' with the previous and new descriptions and
   * remember the new one. Nothing happens (and nothing is stored) when nobody
   * listens for the event.
   *
   * @param {object} self emitter carrying `s`, `id` and `name`
   * @param {object} description the new server description
   */
  var emitServerDescriptionChanged = function(self, description) {
    var hasListeners = self.listeners('serverDescriptionChanged').length > 0;
    if (!hasListeners) return;

    // A topologyId of -1 means "not set"; fall back to the emitter's own id
    var topologyId = self.s.topologyId !== -1 ? self.s.topologyId : self.id;

    self.emit('serverDescriptionChanged', {
      topologyId: topologyId,
      address: self.name,
      previousDescription: getPreviousDescription(self),
      newDescription: description
    });

    self.s.serverDescription = description;
  };
  /**
   * Return the topology description stored on `self.s`. When none is stored yet,
   * an 'Unknown' placeholder containing a single 'Unknown' server for `self.name`
   * is stored first and returned.
   *
   * @param {object} self object carrying the `s` state bag and a `name`
   * @return {object} the stored topology description
   */
  var getPreviousTopologyDescription = function(self) {
    if (!self.s.topologyDescription) {
      self.s.topologyDescription = {
        topologyType: 'Unknown',
        servers: [
          {
            address: self.name,
            arbiters: [],
            hosts: [],
            passives: [],
            type: 'Unknown'
          }
        ]
      };
    }

    return self.s.topologyDescription;
  };

  /**
   * Emit 'topologyDescriptionChanged' with the previous and new topology
   * descriptions and remember the new one as the previous description for the
   * next change. Nothing happens when nobody listens for the event.
   *
   * @param {object} self emitter carrying `s`, `id` and `name`
   * @param {object} description the new topology description
   */
  var emitTopologyDescriptionChanged = function(self, description) {
    if (self.listeners('topologyDescriptionChanged').length > 0) {
      // Emit the topology description changed event
      self.emit('topologyDescriptionChanged', {
        // A topologyId of -1 means "not set"; fall back to the emitter's own id
        topologyId: self.s.topologyId !== -1 ? self.s.topologyId : self.id,
        address: self.name,
        previousDescription: getPreviousTopologyDescription(self),
        newDescription: description
      });

      // Store under topologyDescription (not serverDescription): that is the slot
      // getPreviousTopologyDescription reads, and writing the server slot would
      // corrupt the state used by the server description events.
      self.s.topologyDescription = description;
    }
  };
  /**
   * Tell whether two ismaster documents map to different topology types.
   *
   * @param {object} self passed through to getTopologyType
   * @param {object} currentIsmaster the ismaster document currently held
   * @param {object} ismaster the newly received ismaster document
   * @return {boolean} true when the derived types differ
   */
  var changedIsMaster = function(self, currentIsmaster, ismaster) {
    var previousType = getTopologyType(self, currentIsmaster);
    var nextType = getTopologyType(self, ismaster);
    return nextType !== previousType;
  };
  /**
   * Derive a server type name from an ismaster document.
   *
   * @param {object} self consulted for `self.ismaster` when no document is passed
   * @param {object} [ismaster] ismaster document to classify
   * @return {string} one of 'Mongos', 'Standalone', 'RSPrimary', 'RSSecondary',
   *   'RSArbiter' or 'Unknown'
   */
  var getTopologyType = function(self, ismaster) {
    var doc = ismaster || self.ismaster;
    if (!doc) return 'Unknown';

    if (doc.ismaster) {
      if (doc.msg === 'isdbgrid') return 'Mongos';
      // A master that reports no hosts list is treated as a standalone server
      return doc.hosts ? 'RSPrimary' : 'Standalone';
    }

    if (doc.secondary) return 'RSSecondary';
    if (doc.arbiterOnly) return 'RSArbiter';
    return 'Unknown';
  };
  /**
   * Build a heartbeat function for `self`.
   *
   * The returned function runs `{ ismaster: true }` against 'admin.$cmd', emits the
   * heartbeat started/succeeded/failed events, and records the reply and its latency
   * on `self.s`. When called with a callback, the callback receives `(err, r)` and no
   * further heartbeat is scheduled. Without a callback, the next heartbeat is
   * scheduled after `self.s.haInterval` and the timer is kept in
   * `self.s.inquireServerStateTimeout`. If the state is 'destroyed' the function
   * returns immediately without invoking the callback.
   *
   * @param {object} self server-like object exposing `command`, `emit`, `listeners`, `name` and `s`
   * @return {function} the heartbeat function, taking an optional callback
   */
  var inquireServerState = function(self) {
    return function(callback) {
      if (self.s.state === 'destroyed') return;

      // Record the start so the round trip can be timed
      var start = new Date().getTime();

      emitSDAMEvent(self, 'serverHeartbeatStarted', { connectionId: self.name });

      // Attempt to execute ismaster command
      self.command('admin.$cmd', { ismaster: true }, { monitoring: true }, function(err, r) {
        // Measure before branching: the failure event needs a duration too
        // (it was previously always undefined on the error path).
        var latencyMS = new Date().getTime() - start;

        if (!err) {
          // Legacy event sender
          self.emit('ismaster', r, self);

          // Server heart beat event
          emitSDAMEvent(self, 'serverHeartbeatSucceeded', {
            durationMS: latencyMS,
            reply: r.result,
            connectionId: self.name
          });

          // Did the server change
          if (changedIsMaster(self, self.s.ismaster, r.result)) {
            // Emit server description changed if something listening
            emitServerDescriptionChanged(self, {
              address: self.name,
              arbiters: [],
              hosts: [],
              passives: [],
              type: !self.s.inTopology ? 'Standalone' : getTopologyType(self)
            });
          }

          // Update ismaster view
          self.s.ismaster = r.result;

          // Set server response time
          self.s.isMasterLatencyMS = latencyMS;
        } else {
          emitSDAMEvent(self, 'serverHeartbeatFailed', {
            durationMS: latencyMS,
            failure: err,
            connectionId: self.name
          });
        }

        // One-off check requested by a caller: report back and do not reschedule
        if (typeof callback === 'function') {
          return callback(err, r);
        }

        // Perform another sweep
        self.s.inquireServerStateTimeout = setTimeout(inquireServerState(self), self.s.haInterval);
      });
    };
  };
  /**
   * Make a shallow copy of an options object. Every enumerable key visited by
   * `for...in` (inherited ones included) is copied by reference onto a new object.
   *
   * @param {object} options object to copy
   * @return {object} the new object
   */
  var cloneOptions = function(options) {
    var copy = {};

    for (var key in options) {
      copy[key] = options[key];
    }

    return copy;
  };
  /**
   * Restartable wrapper around setInterval.
   *
   * @param {function} fn function run on every tick
   * @param {number} time delay between ticks, passed to setInterval
   */
  function Interval(fn, time) {
    // `false` means "not scheduled"; otherwise the handle returned by setInterval
    var handle = false;

    // Begin ticking unless already running. Returns the instance.
    this.start = function() {
      if (this.isRunning()) {
        return this;
      }

      handle = setInterval(fn, time);
      return this;
    };

    // Cancel any scheduled ticks. Returns the instance.
    this.stop = function() {
      clearInterval(handle);
      handle = false;
      return this;
    };

    // True between start() and stop().
    this.isRunning = function() {
      return handle !== false;
    };
  }
  /**
   * Restartable wrapper around setTimeout.
   *
   * Completion is tracked inside this wrapper rather than through the runtime's
   * private `_called` field on timer objects, so `isRunning()` turns false once
   * the timeout has fired and `start()` can arm it again.
   *
   * @param {function} fn function run when the timeout fires
   * @param {number} time delay, passed to setTimeout
   */
  function Timeout(fn, time) {
    // `false` means "not pending"; otherwise the handle returned by setTimeout
    var timer = false;

    // Arm the timeout unless one is already pending. Returns the instance.
    this.start = function() {
      if (!this.isRunning()) {
        var handle = setTimeout(function() {
          // Mark as finished before running fn so fn itself may call start() again
          if (timer === handle) {
            timer = false;
          }
          return fn.apply(this, arguments);
        }, time);
        timer = handle;
      }
      return this;
    };

    // Cancel a pending timeout, if any. Returns the instance.
    this.stop = function() {
      clearTimeout(timer);
      timer = false;
      return this;
    };

    // True while a timeout is armed and has not fired yet.
    this.isRunning = function() {
      return timer !== false;
    };
  }
  /**
   * List the per-server changes between two topology descriptions.
   * Addresses are compared case-insensitively. The result lists, in this order:
   * servers only in `previous` (to: 'Unknown'), servers only in `current`
   * (from: 'Unknown'), then servers present in both whose `type` differs.
   *
   * @param {object} [previous] earlier description with a `servers` array; a falsy
   *   value is treated as having no servers
   * @param {object} current later description with a `servers` array
   * @return {object} `{ servers: [{ address, from, to }, ...] }`
   */
  function diff(previous, current) {
    const changes = { servers: [] };
    const before = previous ? previous : { servers: [] };

    const sameAddress = (left, right) =>
      left.address.toLowerCase() === right.address.toLowerCase();

    const contains = (servers, wanted) => {
      for (const candidate of servers) {
        if (sameAddress(candidate, wanted)) return true;
      }
      return false;
    };

    // Servers that were known before but are missing now
    for (const gone of before.servers) {
      if (!contains(current.servers, gone)) {
        changes.servers.push({
          address: gone.address,
          from: gone.type,
          to: 'Unknown'
        });
      }
    }

    // Servers that are new in the current description
    for (const added of current.servers) {
      if (!contains(before.servers, added)) {
        changes.servers.push({
          address: added.address,
          from: 'Unknown',
          to: added.type
        });
      }
    }

    // Servers present on both sides whose type changed
    for (const prevServer of before.servers) {
      for (const currServer of current.servers) {
        if (sameAddress(prevServer, currServer) && prevServer.type !== currServer.type) {
          changes.servers.push({
            address: prevServer.address,
            from: prevServer.type,
            to: currServer.type
          });
        }
      }
    }

    return changes;
  }
  /**
   * Shared function to advance the cluster time held by a topology.
   * The incoming value replaces `topology.clusterTime` when the topology has none
   * yet, or when the incoming `clusterTime` compares `greaterThan` the stored one.
   *
   * @param {*} topology object whose `clusterTime` property is updated in place
   * @param {*} $clusterTime candidate value; its `clusterTime` field must expose `greaterThan`
   */
  function resolveClusterTime(topology, $clusterTime) {
    const known = topology.clusterTime;
    const isNewer = known == null || $clusterTime.clusterTime.greaterThan(known.clusterTime);

    if (isNewer) {
      topology.clusterTime = $clusterTime;
    }
  }
  // NOTE: this is a temporary move until the topologies can be more formally refactored
  //       to share code.
  const SessionMixins = {
    /**
     * Send an `endSessions` command for the given session(s) to 'admin.$cmd'.
     * The command's outcome is deliberately ignored; the callback, when given,
     * is always invoked without arguments.
     *
     * @param {*|Array} sessions one session or an array of sessions
     * @param {function} [callback] invoked once the command has completed
     */
    endSessions: function(sessions, callback) {
      const sessionList = Array.isArray(sessions) ? sessions : [sessions];

      // intentionally ignores the command result, per spec
      const done = () => {
        if (typeof callback === 'function') callback();
      };

      // TODO:
      //   When connected to a sharded cluster the endSessions command
      //   can be sent to any mongos. When connected to a replica set the
      //   endSessions command MUST be sent to the primary if the primary
      //   is available, otherwise it MUST be sent to any available secondary.
      //   Is it enough to use: ReadPreference.primaryPreferred ?
      this.command(
        'admin.$cmd',
        { endSessions: sessionList },
        { readPreference: ReadPreference.primaryPreferred },
        done
      );
    }
  };
  // Wire versions below this one rule out retryable writes
  const RETRYABLE_WIRE_VERSION = 6;

  /**
   * Determines whether the provided topology supports retryable writes: its last
   * ismaster must not report a maxWireVersion below RETRYABLE_WIRE_VERSION, and the
   * topology must have a truthy `logicalSessionTimeoutMinutes`.
   *
   * @param {Mongos|Replset} topology
   * @return {boolean}
   */
  const isRetryableWritesSupported = function(topology) {
    const wireVersionTooOld = topology.lastIsMaster().maxWireVersion < RETRYABLE_WIRE_VERSION;
    return !wireVersionTooOld && Boolean(topology.logicalSessionTimeoutMinutes);
  };
  // Public surface of this module (same names, same order as before)
  Object.assign(module.exports, {
    SessionMixins,
    resolveClusterTime,
    inquireServerState,
    getTopologyType,
    emitServerDescriptionChanged,
    emitTopologyDescriptionChanged,
    cloneOptions,
    createClientInfo,
    createCompressionInfo,
    clone,
    diff,
    Interval,
    Timeout,
    isRetryableWritesSupported
  });