123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
- 'use strict';
- // Implementation of OP_MSG spec:
- // https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst
- //
- // struct Section {
- // uint8 payloadType;
- // union payload {
- // document document; // payloadType == 0
- // struct sequence { // payloadType == 1
- // int32 size;
- // cstring identifier;
- // document* documents;
- // };
- // };
- // };
- // struct OP_MSG {
- // struct MsgHeader {
- // int32 messageLength;
- // int32 requestID;
- // int32 responseTo;
- // int32 opCode = 2013;
- // };
- // uint32 flagBits;
- // Section+ sections;
- // [uint32 checksum;]
- // };
- const opcodes = require('../wireprotocol/shared').opcodes;
- const databaseNamespace = require('../wireprotocol/shared').databaseNamespace;
- const ReadPreference = require('../topologies/read_preference');
// Most recently issued request id; Msg.getRequestId() advances it
let _requestId = 0;

// Bit masks for the OP_MSG flagBits field
const OPTS_CHECKSUM_PRESENT = 1 << 0; // bit 0
const OPTS_MORE_TO_COME = 1 << 1; // bit 1
const OPTS_EXHAUST_ALLOWED = 0x10000; // bit 16
/**
 * Outgoing OP_MSG message carrying a single command document as one
 * payload-type-0 section.
 */
class Msg {
  /**
   * @param {object} bson - serializer; must expose `serialize(document, options)`
   * @param {string} ns - namespace; the result of `databaseNamespace(ns)` is stored as `command.$db`
   * @param {object} command - command document. It is mutated in place: `$db` is always set,
   *   and `$readPreference` is set when a non-primary read preference is supplied
   * @param {object} [options] - optional settings; may be omitted entirely
   * @param {object} [options.readPreference] - object with `mode` and `toJSON()`
   * @param {boolean} [options.serializeFunctions=false]
   * @param {boolean} [options.ignoreUndefined=false]
   * @param {boolean} [options.checkKeys=false]
   * @param {number} [options.maxBsonSize=16777216]
   * @param {boolean} [options.moreToCome=false]
   * @throws {Error} when `command` is null or undefined
   */
  constructor(bson, ns, command, options) {
    if (command == null) throw new Error('query must be specified for query');

    // Normalise before the first property access so callers may omit `options`
    const opts = options || {};

    this.bson = bson;
    this.ns = ns;
    this.command = command;
    this.command.$db = databaseNamespace(ns);

    const readPreference = opts.readPreference;
    if (readPreference && readPreference.mode !== ReadPreference.PRIMARY) {
      this.command.$readPreference = readPreference.toJSON();
    }

    this.options = opts;
    this.requestId = Msg.getRequestId();

    // Serialization switches: only an explicit boolean overrides the `false` default
    this.serializeFunctions =
      typeof opts.serializeFunctions === 'boolean' ? opts.serializeFunctions : false;
    this.ignoreUndefined =
      typeof opts.ignoreUndefined === 'boolean' ? opts.ignoreUndefined : false;
    this.checkKeys = typeof opts.checkKeys === 'boolean' ? opts.checkKeys : false;
    this.maxBsonSize = opts.maxBsonSize || 1024 * 1024 * 16;

    // flagBits state
    this.checksumPresent = false;
    this.moreToCome = opts.moreToCome || false;
    this.exhaustAllowed = false;
  }

  /**
   * Serializes the message.
   *
   * @returns {Buffer[]} `[header + flagBits, payload type byte, BSON document]`;
   *   the buffers are not concatenated
   */
  toBin() {
    const buffers = [];

    let flags = 0;
    if (this.checksumPresent) {
      flags |= OPTS_CHECKSUM_PRESENT;
    }
    if (this.moreToCome) {
      flags |= OPTS_MORE_TO_COME;
    }
    if (this.exhaustAllowed) {
      flags |= OPTS_EXHAUST_ALLOWED;
    }

    // Buffer.alloc replaces the deprecated `new Buffer(size)` and zero-fills the memory
    const header = Buffer.alloc(
      4 * 4 + // MsgHeader: four int32 fields
        4 // flagBits
    );
    buffers.push(header);

    // messageLength covers the header plus every section appended after it
    const totalLength = header.length + this.makeDocumentSegment(buffers, this.command);

    header.writeInt32LE(totalLength, 0); // messageLength
    header.writeInt32LE(this.requestId, 4); // requestID
    header.writeInt32LE(0, 8); // responseTo
    header.writeInt32LE(opcodes.OP_MSG, 12); // opCode
    header.writeUInt32LE(flags, 16); // flagBits
    return buffers;
  }

  /**
   * Appends a payload-type-0 section (type byte followed by the serialized
   * document) to `buffers`.
   *
   * @param {Buffer[]} buffers - output list, appended to in place
   * @param {object} document - document to serialize
   * @returns {number} number of bytes appended
   */
  makeDocumentSegment(buffers, document) {
    const payloadTypeBuffer = Buffer.alloc(1);
    payloadTypeBuffer[0] = 0; // payloadType 0: single document
    const documentBuffer = this.serializeBson(document);
    buffers.push(payloadTypeBuffer);
    buffers.push(documentBuffer);
    return payloadTypeBuffer.length + documentBuffer.length;
  }

  /**
   * Serializes `document` using this message's serialization switches.
   *
   * @param {object} document
   * @returns {*} whatever `bson.serialize` returns
   */
  serializeBson(document) {
    return this.bson.serialize(document, {
      checkKeys: this.checkKeys,
      serializeFunctions: this.serializeFunctions,
      ignoreUndefined: this.ignoreUndefined
    });
  }
}

/**
 * Issues the next request id.
 *
 * @returns {number} id in the range [0, 0x7fffffff]
 */
Msg.getRequestId = function() {
  // requestID is written with writeInt32LE, so keep the counter inside the
  // non-negative int32 range instead of letting it grow without bound.
  _requestId = (_requestId + 1) & 0x7fffffff;
  return _requestId;
};
/**
 * Incoming OP_MSG message. The flag bits are decoded on construction; the
 * document sections are decoded lazily by `parse()`.
 */
class BinMsg {
  /**
   * @param {object} bson - deserializer; must expose `deserialize(buffer, options)`
   * @param {*} message - stored untouched as `raw`
   * @param {object} msgHeader - provides `length`, `requestId`, `responseTo`, `opCode`, `fromCompressed`
   * @param {Buffer} msgBody - message body; its first 4 bytes are read as the flag bits
   * @param {object} [opts] - defaults to
   *   `{ promoteLongs: true, promoteValues: true, promoteBuffers: false }` when omitted
   */
  constructor(bson, message, msgHeader, msgBody, opts) {
    opts = opts || { promoteLongs: true, promoteValues: true, promoteBuffers: false };
    this.parsed = false;
    this.raw = message;
    this.data = msgBody;
    this.bson = bson;
    this.opts = opts;

    // Copy the already-decoded message header
    this.length = msgHeader.length;
    this.requestId = msgHeader.requestId;
    this.responseTo = msgHeader.responseTo;
    this.opCode = msgHeader.opCode;
    this.fromCompressed = msgHeader.fromCompressed;

    // Decode the flag bits
    this.responseFlags = msgBody.readInt32LE(0);
    this.checksumPresent = (this.responseFlags & OPTS_CHECKSUM_PRESENT) !== 0;
    this.moreToCome = (this.responseFlags & OPTS_MORE_TO_COME) !== 0;
    this.exhaustAllowed = (this.responseFlags & OPTS_EXHAUST_ALLOWED) !== 0;

    this.promoteLongs = typeof opts.promoteLongs === 'boolean' ? opts.promoteLongs : true;
    this.promoteValues = typeof opts.promoteValues === 'boolean' ? opts.promoteValues : true;
    this.promoteBuffers = typeof opts.promoteBuffers === 'boolean' ? opts.promoteBuffers : false;

    this.documents = [];
  }

  /**
   * @returns {boolean} true once `parse()` has completed successfully
   */
  isParsed() {
    return this.parsed;
  }

  /**
   * Decodes every section of the body into `this.documents`. Does nothing if
   * the message has already been parsed.
   *
   * @param {object} [options]
   * @param {boolean} [options.raw=false] - keep each document as a Buffer slice instead of deserializing it
   * @param {string} [options.documentsReturnedIn] - with `raw` and exactly one document, that document is
   *   deserialized with this field name flagged in `fieldsAsRaw`
   * @param {boolean} [options.promoteLongs] - overrides the constructor option
   * @param {boolean} [options.promoteValues] - overrides the constructor option
   * @param {boolean} [options.promoteBuffers] - overrides the constructor option
   * @throws {Error} on a payload type other than 0, or a document size that
   *   does not fit inside the section area
   */
  parse(options) {
    // Don't parse again if not needed
    if (this.parsed) return;
    options = options || {};

    // Sections start right after the 4 flag-bit bytes
    this.index = 4;

    const raw = options.raw || false;
    const documentsReturnedIn = options.documentsReturnedIn || null;
    const pick = name => (typeof options[name] === 'boolean' ? options[name] : this.opts[name]);
    const _options = {
      promoteLongs: pick('promoteLongs'),
      promoteValues: pick('promoteValues'),
      promoteBuffers: pick('promoteBuffers')
    };

    // When the checksum flag is set the last 4 bytes are a checksum, not a section
    const sectionsEnd = this.checksumPresent ? this.data.length - 4 : this.data.length;

    while (this.index < sectionsEnd) {
      const payloadType = this.data.readUInt8(this.index++);
      if (payloadType !== 0) {
        // Continuing would read section bytes as payload types and yield garbage
        throw new Error(`OP_MSG payload type ${payloadType} is not supported`);
      }

      const bsonSize = this.data.readUInt32LE(this.index);
      // 5 bytes is the smallest possible BSON document (int32 size + terminator)
      if (bsonSize < 5 || this.index + bsonSize > sectionsEnd) {
        throw new Error(`OP_MSG document section has an invalid BSON size ${bsonSize}`);
      }

      const bin = this.data.slice(this.index, this.index + bsonSize);
      this.documents.push(raw ? bin : this.bson.deserialize(bin, _options));
      this.index += bsonSize;
    }

    if (this.documents.length === 1 && documentsReturnedIn != null && raw) {
      const fieldsAsRaw = {};
      fieldsAsRaw[documentsReturnedIn] = true;
      _options.fieldsAsRaw = fieldsAsRaw;

      const doc = this.bson.deserialize(this.documents[0], _options);
      this.documents = [doc];
    }

    this.parsed = true;
  }
}
- module.exports = { Msg, BinMsg };
|