msg.js 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. 'use strict';
  2. // Implementation of OP_MSG spec:
  3. // https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst
  4. //
// struct Section {
//   uint8 payloadType;
//   union payload {
//     document document; // payloadType == 0
//     struct sequence { // payloadType == 1
//       int32 size;
//       cstring identifier;
//       document* documents;
//     };
//   };
// };
// struct OP_MSG {
//   struct MsgHeader {
//     int32 messageLength;
//     int32 requestID;
//     int32 responseTo;
//     int32 opCode = 2013;
//   };
//   uint32 flagBits;
//   Section+ sections;
//   [uint32 checksum;]
// };
  27. const opcodes = require('../wireprotocol/shared').opcodes;
  28. const databaseNamespace = require('../wireprotocol/shared').databaseNamespace;
  29. const ReadPreference = require('../topologies/read_preference');
  30. // Incrementing request id
  31. let _requestId = 0;
  32. // Msg Flags
  33. const OPTS_CHECKSUM_PRESENT = 1;
  34. const OPTS_MORE_TO_COME = 2;
  35. const OPTS_EXHAUST_ALLOWED = 1 << 16;
  36. class Msg {
  37. constructor(bson, ns, command, options) {
  38. // Basic options needed to be passed in
  39. if (command == null) throw new Error('query must be specified for query');
  40. // Basic options
  41. this.bson = bson;
  42. this.ns = ns;
  43. this.command = command;
  44. this.command.$db = databaseNamespace(ns);
  45. if (options.readPreference && options.readPreference.mode !== ReadPreference.PRIMARY) {
  46. this.command.$readPreference = options.readPreference.toJSON();
  47. }
  48. // Ensure empty options
  49. this.options = options || {};
  50. // Additional options
  51. this.requestId = Msg.getRequestId();
  52. // Serialization option
  53. this.serializeFunctions =
  54. typeof options.serializeFunctions === 'boolean' ? options.serializeFunctions : false;
  55. this.ignoreUndefined =
  56. typeof options.ignoreUndefined === 'boolean' ? options.ignoreUndefined : false;
  57. this.checkKeys = typeof options.checkKeys === 'boolean' ? options.checkKeys : false;
  58. this.maxBsonSize = options.maxBsonSize || 1024 * 1024 * 16;
  59. // flags
  60. this.checksumPresent = false;
  61. this.moreToCome = options.moreToCome || false;
  62. this.exhaustAllowed = false;
  63. }
  64. toBin() {
  65. const buffers = [];
  66. let flags = 0;
  67. if (this.checksumPresent) {
  68. flags |= OPTS_CHECKSUM_PRESENT;
  69. }
  70. if (this.moreToCome) {
  71. flags |= OPTS_MORE_TO_COME;
  72. }
  73. if (this.exhaustAllowed) {
  74. flags |= OPTS_EXHAUST_ALLOWED;
  75. }
  76. const header = new Buffer(
  77. 4 * 4 + // Header
  78. 4 // Flags
  79. );
  80. buffers.push(header);
  81. let totalLength = header.length;
  82. const command = this.command;
  83. totalLength += this.makeDocumentSegment(buffers, command);
  84. header.writeInt32LE(totalLength, 0); // messageLength
  85. header.writeInt32LE(this.requestId, 4); // requestID
  86. header.writeInt32LE(0, 8); // responseTo
  87. header.writeInt32LE(opcodes.OP_MSG, 12); // opCode
  88. header.writeUInt32LE(flags, 16); // flags
  89. return buffers;
  90. }
  91. makeDocumentSegment(buffers, document) {
  92. const payloadTypeBuffer = new Buffer(1);
  93. payloadTypeBuffer[0] = 0;
  94. const documentBuffer = this.serializeBson(document);
  95. buffers.push(payloadTypeBuffer);
  96. buffers.push(documentBuffer);
  97. return payloadTypeBuffer.length + documentBuffer.length;
  98. }
  99. serializeBson(document) {
  100. return this.bson.serialize(document, {
  101. checkKeys: this.checkKeys,
  102. serializeFunctions: this.serializeFunctions,
  103. ignoreUndefined: this.ignoreUndefined
  104. });
  105. }
  106. }
  107. Msg.getRequestId = function() {
  108. return ++_requestId;
  109. };
  110. class BinMsg {
  111. constructor(bson, message, msgHeader, msgBody, opts) {
  112. opts = opts || { promoteLongs: true, promoteValues: true, promoteBuffers: false };
  113. this.parsed = false;
  114. this.raw = message;
  115. this.data = msgBody;
  116. this.bson = bson;
  117. this.opts = opts;
  118. // Read the message header
  119. this.length = msgHeader.length;
  120. this.requestId = msgHeader.requestId;
  121. this.responseTo = msgHeader.responseTo;
  122. this.opCode = msgHeader.opCode;
  123. this.fromCompressed = msgHeader.fromCompressed;
  124. // Read response flags
  125. this.responseFlags = msgBody.readInt32LE(0);
  126. this.checksumPresent = (this.responseFlags & OPTS_CHECKSUM_PRESENT) !== 0;
  127. this.moreToCome = (this.responseFlags & OPTS_MORE_TO_COME) !== 0;
  128. this.exhaustAllowed = (this.responseFlags & OPTS_EXHAUST_ALLOWED) !== 0;
  129. this.promoteLongs = typeof opts.promoteLongs === 'boolean' ? opts.promoteLongs : true;
  130. this.promoteValues = typeof opts.promoteValues === 'boolean' ? opts.promoteValues : true;
  131. this.promoteBuffers = typeof opts.promoteBuffers === 'boolean' ? opts.promoteBuffers : false;
  132. this.documents = [];
  133. }
  134. isParsed() {
  135. return this.parsed;
  136. }
  137. parse(options) {
  138. // Don't parse again if not needed
  139. if (this.parsed) return;
  140. options = options || {};
  141. this.index = 4;
  142. // Allow the return of raw documents instead of parsing
  143. const raw = options.raw || false;
  144. const documentsReturnedIn = options.documentsReturnedIn || null;
  145. const promoteLongs =
  146. typeof options.promoteLongs === 'boolean' ? options.promoteLongs : this.opts.promoteLongs;
  147. const promoteValues =
  148. typeof options.promoteValues === 'boolean' ? options.promoteValues : this.opts.promoteValues;
  149. const promoteBuffers =
  150. typeof options.promoteBuffers === 'boolean'
  151. ? options.promoteBuffers
  152. : this.opts.promoteBuffers;
  153. // Set up the options
  154. const _options = {
  155. promoteLongs: promoteLongs,
  156. promoteValues: promoteValues,
  157. promoteBuffers: promoteBuffers
  158. };
  159. while (this.index < this.data.length) {
  160. const payloadType = this.data.readUInt8(this.index++);
  161. if (payloadType === 1) {
  162. console.error('TYPE 1');
  163. } else if (payloadType === 0) {
  164. const bsonSize = this.data.readUInt32LE(this.index);
  165. const bin = this.data.slice(this.index, this.index + bsonSize);
  166. this.documents.push(raw ? bin : this.bson.deserialize(bin, _options));
  167. this.index += bsonSize;
  168. }
  169. }
  170. if (this.documents.length === 1 && documentsReturnedIn != null && raw) {
  171. const fieldsAsRaw = {};
  172. fieldsAsRaw[documentsReturnedIn] = true;
  173. _options.fieldsAsRaw = fieldsAsRaw;
  174. const doc = this.bson.deserialize(this.documents[0], _options);
  175. this.documents = [doc];
  176. }
  177. this.parsed = true;
  178. }
  179. }
  180. module.exports = { Msg, BinMsg };