sessions.js 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
  1. 'use strict';
  2. const retrieveBSON = require('./connection/utils').retrieveBSON;
  3. const EventEmitter = require('events');
  4. const BSON = retrieveBSON();
  5. const Binary = BSON.Binary;
  6. const uuidV4 = require('./utils').uuidV4;
  7. const MongoError = require('./error').MongoError;
  8. const isRetryableError = require('././error').isRetryableError;
  9. const MongoNetworkError = require('./error').MongoNetworkError;
  10. const MongoWriteConcernError = require('./error').MongoWriteConcernError;
  11. const Transaction = require('./transactions').Transaction;
  12. const TxnState = require('./transactions').TxnState;
  13. const isPromiseLike = require('./utils').isPromiseLike;
  14. const ReadPreference = require('./topologies/read_preference');
  15. const isTransactionCommand = require('./transactions').isTransactionCommand;
  16. const resolveClusterTime = require('./topologies/shared').resolveClusterTime;
  17. function assertAlive(session, callback) {
  18. if (session.serverSession == null) {
  19. const error = new MongoError('Cannot use a session that has ended');
  20. if (typeof callback === 'function') {
  21. callback(error, null);
  22. return false;
  23. }
  24. throw error;
  25. }
  26. return true;
  27. }
  28. /**
  29. * Options to pass when creating a Client Session
  30. * @typedef {Object} SessionOptions
  31. * @property {boolean} [causalConsistency=true] Whether causal consistency should be enabled on this session
  32. * @property {TransactionOptions} [defaultTransactionOptions] The default TransactionOptions to use for transactions started on this session.
  33. */
  34. /**
  35. * A BSON document reflecting the lsid of a {@link ClientSession}
  36. * @typedef {Object} SessionId
  37. */
  38. /**
  39. * A class representing a client session on the server
  40. * WARNING: not meant to be instantiated directly.
  41. * @class
  42. * @hideconstructor
  43. */
  44. class ClientSession extends EventEmitter {
  45. /**
  46. * Create a client session.
  47. * WARNING: not meant to be instantiated directly
  48. *
  49. * @param {Topology} topology The current client's topology (Internal Class)
  50. * @param {ServerSessionPool} sessionPool The server session pool (Internal Class)
  51. * @param {SessionOptions} [options] Optional settings
  52. * @param {Object} [clientOptions] Optional settings provided when creating a client in the porcelain driver
  53. */
  54. constructor(topology, sessionPool, options, clientOptions) {
  55. super();
  56. if (topology == null) {
  57. throw new Error('ClientSession requires a topology');
  58. }
  59. if (sessionPool == null || !(sessionPool instanceof ServerSessionPool)) {
  60. throw new Error('ClientSession requires a ServerSessionPool');
  61. }
  62. options = options || {};
  63. this.topology = topology;
  64. this.sessionPool = sessionPool;
  65. this.hasEnded = false;
  66. this.serverSession = sessionPool.acquire();
  67. this.clientOptions = clientOptions;
  68. this.supports = {
  69. causalConsistency:
  70. typeof options.causalConsistency !== 'undefined' ? options.causalConsistency : true
  71. };
  72. options = options || {};
  73. if (typeof options.initialClusterTime !== 'undefined') {
  74. this.clusterTime = options.initialClusterTime;
  75. } else {
  76. this.clusterTime = null;
  77. }
  78. this.operationTime = null;
  79. this.explicit = !!options.explicit;
  80. this.owner = options.owner;
  81. this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions);
  82. this.transaction = new Transaction();
  83. }
  84. /**
  85. * The server id associated with this session
  86. * @type {SessionId}
  87. */
  88. get id() {
  89. return this.serverSession.id;
  90. }
  91. /**
  92. * Ends this session on the server
  93. *
  94. * @param {Object} [options] Optional settings. Currently reserved for future use
  95. * @param {Function} [callback] Optional callback for completion of this operation
  96. */
  97. endSession(options, callback) {
  98. if (typeof options === 'function') (callback = options), (options = {});
  99. options = options || {};
  100. if (this.hasEnded) {
  101. if (typeof callback === 'function') callback(null, null);
  102. return;
  103. }
  104. if (this.serverSession && this.inTransaction()) {
  105. this.abortTransaction(); // pass in callback?
  106. }
  107. // mark the session as ended, and emit a signal
  108. this.hasEnded = true;
  109. this.emit('ended', this);
  110. // release the server session back to the pool
  111. this.sessionPool.release(this.serverSession);
  112. this.serverSession = null;
  113. // spec indicates that we should ignore all errors for `endSessions`
  114. if (typeof callback === 'function') callback(null, null);
  115. }
  116. /**
  117. * Advances the operationTime for a ClientSession.
  118. *
  119. * @param {Timestamp} operationTime the `BSON.Timestamp` of the operation type it is desired to advance to
  120. */
  121. advanceOperationTime(operationTime) {
  122. if (this.operationTime == null) {
  123. this.operationTime = operationTime;
  124. return;
  125. }
  126. if (operationTime.greaterThan(this.operationTime)) {
  127. this.operationTime = operationTime;
  128. }
  129. }
  130. /**
  131. * Used to determine if this session equals another
  132. * @param {ClientSession} session
  133. * @return {boolean} true if the sessions are equal
  134. */
  135. equals(session) {
  136. if (!(session instanceof ClientSession)) {
  137. return false;
  138. }
  139. return this.id.id.buffer.equals(session.id.id.buffer);
  140. }
  141. /**
  142. * Increment the transaction number on the internal ServerSession
  143. */
  144. incrementTransactionNumber() {
  145. this.serverSession.txnNumber++;
  146. }
  147. /**
  148. * @returns {boolean} whether this session is currently in a transaction or not
  149. */
  150. inTransaction() {
  151. return this.transaction.isActive;
  152. }
  153. /**
  154. * Starts a new transaction with the given options.
  155. *
  156. * @param {TransactionOptions} options Options for the transaction
  157. */
  158. startTransaction(options) {
  159. assertAlive(this);
  160. if (this.inTransaction()) {
  161. throw new MongoError('Transaction already in progress');
  162. }
  163. // increment txnNumber
  164. this.incrementTransactionNumber();
  165. // create transaction state
  166. this.transaction = new Transaction(
  167. Object.assign({}, this.clientOptions, options || this.defaultTransactionOptions)
  168. );
  169. this.transaction.transition(TxnState.STARTING_TRANSACTION);
  170. }
  171. /**
  172. * Commits the currently active transaction in this session.
  173. *
  174. * @param {Function} [callback] optional callback for completion of this operation
  175. * @return {Promise} A promise is returned if no callback is provided
  176. */
  177. commitTransaction(callback) {
  178. if (typeof callback === 'function') {
  179. endTransaction(this, 'commitTransaction', callback);
  180. return;
  181. }
  182. return new Promise((resolve, reject) => {
  183. endTransaction(
  184. this,
  185. 'commitTransaction',
  186. (err, reply) => (err ? reject(err) : resolve(reply))
  187. );
  188. });
  189. }
  190. /**
  191. * Aborts the currently active transaction in this session.
  192. *
  193. * @param {Function} [callback] optional callback for completion of this operation
  194. * @return {Promise} A promise is returned if no callback is provided
  195. */
  196. abortTransaction(callback) {
  197. if (typeof callback === 'function') {
  198. endTransaction(this, 'abortTransaction', callback);
  199. return;
  200. }
  201. return new Promise((resolve, reject) => {
  202. endTransaction(
  203. this,
  204. 'abortTransaction',
  205. (err, reply) => (err ? reject(err) : resolve(reply))
  206. );
  207. });
  208. }
  209. /**
  210. * This is here to ensure that ClientSession is never serialized to BSON.
  211. * @ignore
  212. */
  213. toBSON() {
  214. throw new Error('ClientSession cannot be serialized to BSON.');
  215. }
  216. /**
  217. * A user provided function to be run within a transaction
  218. *
  219. * @callback WithTransactionCallback
  220. * @param {ClientSession} session The parent session of the transaction running the operation. This should be passed into each operation within the lambda.
  221. * @returns {Promise} The resulting Promise of operations run within this transaction
  222. */
  223. /**
  224. * Runs a provided lambda within a transaction, retrying either the commit operation
  225. * or entire transaction as needed (and when the error permits) to better ensure that
  226. * the transaction can complete successfully.
  227. *
  228. * IMPORTANT: This method requires the user to return a Promise, all lambdas that do not
  229. * return a Promise will result in undefined behavior.
  230. *
  231. * @param {WithTransactionCallback} fn
  232. * @param {TransactionOptions} [options] Optional settings for the transaction
  233. */
  234. withTransaction(fn, options) {
  235. const startTime = Date.now();
  236. return attemptTransaction(this, startTime, fn, options);
  237. }
  238. }
  239. const MAX_WITH_TRANSACTION_TIMEOUT = 120000;
  240. const UNSATISFIABLE_WRITE_CONCERN_CODE = 100;
  241. const UNKNOWN_REPL_WRITE_CONCERN_CODE = 79;
  242. const NON_DETERMINISTIC_WRITE_CONCERN_ERRORS = new Set([
  243. 'CannotSatisfyWriteConcern',
  244. 'UnknownReplWriteConcern',
  245. 'UnsatisfiableWriteConcern'
  246. ]);
  247. function hasNotTimedOut(startTime, max) {
  248. return Date.now() - startTime < max;
  249. }
  250. function isUnknownTransactionCommitResult(err) {
  251. return (
  252. !NON_DETERMINISTIC_WRITE_CONCERN_ERRORS.has(err.codeName) &&
  253. err.code !== UNSATISFIABLE_WRITE_CONCERN_CODE &&
  254. err.code !== UNKNOWN_REPL_WRITE_CONCERN_CODE
  255. );
  256. }
  257. function attemptTransactionCommit(session, startTime, fn, options) {
  258. return session.commitTransaction().catch(err => {
  259. if (err instanceof MongoError && hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT)) {
  260. if (err.hasErrorLabel('UnknownTransactionCommitResult')) {
  261. return attemptTransactionCommit(session, startTime, fn, options);
  262. }
  263. if (err.hasErrorLabel('TransientTransactionError')) {
  264. return attemptTransaction(session, startTime, fn, options);
  265. }
  266. }
  267. throw err;
  268. });
  269. }
  270. const USER_EXPLICIT_TXN_END_STATES = new Set([
  271. TxnState.NO_TRANSACTION,
  272. TxnState.TRANSACTION_COMMITTED,
  273. TxnState.TRANSACTION_ABORTED
  274. ]);
  275. function userExplicitlyEndedTransaction(session) {
  276. return USER_EXPLICIT_TXN_END_STATES.has(session.transaction.state);
  277. }
  278. function attemptTransaction(session, startTime, fn, options) {
  279. session.startTransaction(options);
  280. let promise;
  281. try {
  282. promise = fn(session);
  283. } catch (err) {
  284. promise = Promise.reject(err);
  285. }
  286. if (!isPromiseLike(promise)) {
  287. session.abortTransaction();
  288. throw new TypeError('Function provided to `withTransaction` must return a Promise');
  289. }
  290. return promise
  291. .then(() => {
  292. if (userExplicitlyEndedTransaction(session)) {
  293. return;
  294. }
  295. return attemptTransactionCommit(session, startTime, fn, options);
  296. })
  297. .catch(err => {
  298. function maybeRetryOrThrow(err) {
  299. if (
  300. err instanceof MongoError &&
  301. err.hasErrorLabel('TransientTransactionError') &&
  302. hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT)
  303. ) {
  304. return attemptTransaction(session, startTime, fn, options);
  305. }
  306. throw err;
  307. }
  308. if (session.transaction.isActive) {
  309. return session.abortTransaction().then(() => maybeRetryOrThrow(err));
  310. }
  311. return maybeRetryOrThrow(err);
  312. });
  313. }
  314. function endTransaction(session, commandName, callback) {
  315. if (!assertAlive(session, callback)) {
  316. // checking result in case callback was called
  317. return;
  318. }
  319. // handle any initial problematic cases
  320. let txnState = session.transaction.state;
  321. if (txnState === TxnState.NO_TRANSACTION) {
  322. callback(new MongoError('No transaction started'));
  323. return;
  324. }
  325. if (commandName === 'commitTransaction') {
  326. if (
  327. txnState === TxnState.STARTING_TRANSACTION ||
  328. txnState === TxnState.TRANSACTION_COMMITTED_EMPTY
  329. ) {
  330. // the transaction was never started, we can safely exit here
  331. session.transaction.transition(TxnState.TRANSACTION_COMMITTED_EMPTY);
  332. callback(null, null);
  333. return;
  334. }
  335. if (txnState === TxnState.TRANSACTION_ABORTED) {
  336. callback(new MongoError('Cannot call commitTransaction after calling abortTransaction'));
  337. return;
  338. }
  339. } else {
  340. if (txnState === TxnState.STARTING_TRANSACTION) {
  341. // the transaction was never started, we can safely exit here
  342. session.transaction.transition(TxnState.TRANSACTION_ABORTED);
  343. callback(null, null);
  344. return;
  345. }
  346. if (txnState === TxnState.TRANSACTION_ABORTED) {
  347. callback(new MongoError('Cannot call abortTransaction twice'));
  348. return;
  349. }
  350. if (
  351. txnState === TxnState.TRANSACTION_COMMITTED ||
  352. txnState === TxnState.TRANSACTION_COMMITTED_EMPTY
  353. ) {
  354. callback(new MongoError('Cannot call abortTransaction after calling commitTransaction'));
  355. return;
  356. }
  357. }
  358. // construct and send the command
  359. const command = { [commandName]: 1 };
  360. // apply a writeConcern if specified
  361. let writeConcern;
  362. if (session.transaction.options.writeConcern) {
  363. writeConcern = Object.assign({}, session.transaction.options.writeConcern);
  364. } else if (session.clientOptions && session.clientOptions.w) {
  365. writeConcern = { w: session.clientOptions.w };
  366. }
  367. if (txnState === TxnState.TRANSACTION_COMMITTED) {
  368. writeConcern = Object.assign({ wtimeout: 10000 }, writeConcern, { w: 'majority' });
  369. }
  370. if (writeConcern) {
  371. Object.assign(command, { writeConcern });
  372. }
  373. function commandHandler(e, r) {
  374. if (commandName === 'commitTransaction') {
  375. session.transaction.transition(TxnState.TRANSACTION_COMMITTED);
  376. if (
  377. e &&
  378. (e instanceof MongoNetworkError ||
  379. e instanceof MongoWriteConcernError ||
  380. isRetryableError(e))
  381. ) {
  382. if (e.errorLabels) {
  383. const idx = e.errorLabels.indexOf('TransientTransactionError');
  384. if (idx !== -1) {
  385. e.errorLabels.splice(idx, 1);
  386. }
  387. } else {
  388. e.errorLabels = [];
  389. }
  390. if (isUnknownTransactionCommitResult(e)) {
  391. e.errorLabels.push('UnknownTransactionCommitResult');
  392. // per txns spec, must unpin session in this case
  393. session.transaction.unpinServer();
  394. }
  395. }
  396. } else {
  397. session.transaction.transition(TxnState.TRANSACTION_ABORTED);
  398. }
  399. callback(e, r);
  400. }
  401. // The spec indicates that we should ignore all errors on `abortTransaction`
  402. function transactionError(err) {
  403. return commandName === 'commitTransaction' ? err : null;
  404. }
  405. if (
  406. commandName === 'commitTransaction' &&
  407. session.transaction.recoveryToken &&
  408. supportsRecoveryToken(session)
  409. ) {
  410. command.recoveryToken = session.transaction.recoveryToken;
  411. }
  412. // send the command
  413. session.topology.command('admin.$cmd', command, { session }, (err, reply) => {
  414. if (err && isRetryableError(err)) {
  415. // SPEC-1185: apply majority write concern when retrying commitTransaction
  416. if (command.commitTransaction) {
  417. // per txns spec, must unpin session in this case
  418. session.transaction.unpinServer();
  419. command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, {
  420. w: 'majority'
  421. });
  422. }
  423. return session.topology.command('admin.$cmd', command, { session }, (_err, _reply) =>
  424. commandHandler(transactionError(_err), _reply)
  425. );
  426. }
  427. commandHandler(transactionError(err), reply);
  428. });
  429. }
  430. function supportsRecoveryToken(session) {
  431. const topology = session.topology;
  432. return !!topology.s.options.useRecoveryToken;
  433. }
  434. /**
  435. * Reflects the existence of a session on the server. Can be reused by the session pool.
  436. * WARNING: not meant to be instantiated directly. For internal use only.
  437. * @ignore
  438. */
  439. class ServerSession {
  440. constructor() {
  441. this.id = { id: new Binary(uuidV4(), Binary.SUBTYPE_UUID) };
  442. this.lastUse = Date.now();
  443. this.txnNumber = 0;
  444. }
  445. /**
  446. * Determines if the server session has timed out.
  447. * @ignore
  448. * @param {Date} sessionTimeoutMinutes The server's "logicalSessionTimeoutMinutes"
  449. * @return {boolean} true if the session has timed out.
  450. */
  451. hasTimedOut(sessionTimeoutMinutes) {
  452. // Take the difference of the lastUse timestamp and now, which will result in a value in
  453. // milliseconds, and then convert milliseconds to minutes to compare to `sessionTimeoutMinutes`
  454. const idleTimeMinutes = Math.round(
  455. (((Date.now() - this.lastUse) % 86400000) % 3600000) / 60000
  456. );
  457. return idleTimeMinutes > sessionTimeoutMinutes - 1;
  458. }
  459. }
  460. /**
  461. * Maintains a pool of Server Sessions.
  462. * For internal use only
  463. * @ignore
  464. */
  465. class ServerSessionPool {
  466. constructor(topology) {
  467. if (topology == null) {
  468. throw new Error('ServerSessionPool requires a topology');
  469. }
  470. this.topology = topology;
  471. this.sessions = [];
  472. }
  473. /**
  474. * Ends all sessions in the session pool.
  475. * @ignore
  476. */
  477. endAllPooledSessions() {
  478. if (this.sessions.length) {
  479. this.topology.endSessions(this.sessions.map(session => session.id));
  480. this.sessions = [];
  481. }
  482. }
  483. /**
  484. * Acquire a Server Session from the pool.
  485. * Iterates through each session in the pool, removing any stale sessions
  486. * along the way. The first non-stale session found is removed from the
  487. * pool and returned. If no non-stale session is found, a new ServerSession
  488. * is created.
  489. * @ignore
  490. * @returns {ServerSession}
  491. */
  492. acquire() {
  493. const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes;
  494. while (this.sessions.length) {
  495. const session = this.sessions.shift();
  496. if (!session.hasTimedOut(sessionTimeoutMinutes)) {
  497. return session;
  498. }
  499. }
  500. return new ServerSession();
  501. }
  502. /**
  503. * Release a session to the session pool
  504. * Adds the session back to the session pool if the session has not timed out yet.
  505. * This method also removes any stale sessions from the pool.
  506. * @ignore
  507. * @param {ServerSession} session The session to release to the pool
  508. */
  509. release(session) {
  510. const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes;
  511. while (this.sessions.length) {
  512. const session = this.sessions[this.sessions.length - 1];
  513. if (session.hasTimedOut(sessionTimeoutMinutes)) {
  514. this.sessions.pop();
  515. } else {
  516. break;
  517. }
  518. }
  519. if (!session.hasTimedOut(sessionTimeoutMinutes)) {
  520. this.sessions.unshift(session);
  521. }
  522. }
  523. }
  524. /**
  525. * Optionally decorate a command with sessions specific keys
  526. *
  527. * @param {ClientSession} session the session tracking transaction state
  528. * @param {Object} command the command to decorate
  529. * @param {Object} topology the topology for tracking the cluster time
  530. * @param {Object} [options] Optional settings passed to calling operation
  531. * @return {MongoError|null} An error, if some error condition was met
  532. */
  533. function applySession(session, command, options) {
  534. const serverSession = session.serverSession;
  535. if (serverSession == null) {
  536. // TODO: merge this with `assertAlive`, did not want to throw a try/catch here
  537. return new MongoError('Cannot use a session that has ended');
  538. }
  539. // mark the last use of this session, and apply the `lsid`
  540. serverSession.lastUse = Date.now();
  541. command.lsid = serverSession.id;
  542. // first apply non-transaction-specific sessions data
  543. const inTransaction = session.inTransaction() || isTransactionCommand(command);
  544. const isRetryableWrite = options.willRetryWrite;
  545. if (serverSession.txnNumber && (isRetryableWrite || inTransaction)) {
  546. command.txnNumber = BSON.Long.fromNumber(serverSession.txnNumber);
  547. }
  548. // now attempt to apply transaction-specific sessions data
  549. if (!inTransaction) {
  550. if (session.transaction.state !== TxnState.NO_TRANSACTION) {
  551. session.transaction.transition(TxnState.NO_TRANSACTION);
  552. }
  553. // TODO: the following should only be applied to read operation per spec.
  554. // for causal consistency
  555. if (session.supports.causalConsistency && session.operationTime) {
  556. command.readConcern = command.readConcern || {};
  557. Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
  558. }
  559. return;
  560. }
  561. if (options.readPreference && !options.readPreference.equals(ReadPreference.primary)) {
  562. return new MongoError(
  563. `Read preference in a transaction must be primary, not: ${options.readPreference.mode}`
  564. );
  565. }
  566. // `autocommit` must always be false to differentiate from retryable writes
  567. command.autocommit = false;
  568. if (session.transaction.state === TxnState.STARTING_TRANSACTION) {
  569. session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS);
  570. command.startTransaction = true;
  571. const readConcern =
  572. session.transaction.options.readConcern || session.clientOptions.readConcern;
  573. if (readConcern) {
  574. command.readConcern = readConcern;
  575. }
  576. if (session.supports.causalConsistency && session.operationTime) {
  577. command.readConcern = command.readConcern || {};
  578. Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
  579. }
  580. }
  581. }
  582. function updateSessionFromResponse(session, document) {
  583. if (document.$clusterTime) {
  584. resolveClusterTime(session, document.$clusterTime);
  585. }
  586. if (document.operationTime && session && session.supports.causalConsistency) {
  587. session.advanceOperationTime(document.operationTime);
  588. }
  589. if (document.recoveryToken && session && session.inTransaction()) {
  590. session.transaction._recoveryToken = document.recoveryToken;
  591. }
  592. }
  593. module.exports = {
  594. ClientSession,
  595. ServerSession,
  596. ServerSessionPool,
  597. TxnState,
  598. applySession,
  599. updateSessionFromResponse
  600. };