// message-parser.js
  1. 'use strict';
  2. const Transform = require('stream').Transform;
  /**
   * MessageParser is a Transform stream that separates the message headers from
   * the rest of the message. The parsed headers are emitted once with the
   * 'headers' event; every byte after the header terminator (an empty line,
   * i.e. "\n\n" or "\n\r\n") is passed on as the output of the stream.
   *
   * Instance state:
   *   - lastBytes     {Buffer}         last 4 bytes seen, used to detect a terminator split across chunks
   *   - headersParsed {Boolean}        true once the header terminator has been found
   *   - headerBytes   {Number}         number of bytes collected into the header block
   *   - headerChunks  {Buffer[]|null}  buffered header chunks; null once consumed
   *   - rawHeaders    {Buffer|false}   the raw header block once parsed, false before that
   *   - bodySize      {Number}         number of bytes pushed downstream
   */
  class MessageParser extends Transform {
      /**
       * @param {Object} [options] Options passed through unchanged to the Transform constructor
       */
      constructor(options) {
          super(options);
          this.lastBytes = Buffer.alloc(4);
          this.headersParsed = false;
          this.headerBytes = 0;
          this.headerChunks = [];
          this.rawHeaders = false;
          this.bodySize = 0;
      }

      /**
       * Keeps track of the last 4 bytes seen so far in order to detect line
       * breaks that span chunk boundaries.
       *
       * @param {Buffer} data Next data chunk from the stream
       */
      updateLastBytes(data) {
          const lblen = this.lastBytes.length;
          const nblen = Math.min(data.length, lblen);

          // shift the bytes that survive towards the start of the window
          for (let i = 0, len = lblen - nblen; i < len; i++) {
              this.lastBytes[i] = this.lastBytes[i + nblen];
          }

          // copy the tail of the new chunk into the end of the window
          for (let i = 1; i <= nblen; i++) {
              this.lastBytes[lblen - i] = data[data.length - i];
          }
      }

      /**
       * Finds and removes message headers from the remaining body. Headers are
       * kept separate from the body so that they can be handled on their own.
       *
       * When the terminator is found inside this chunk, the 'headers' event is
       * emitted synchronously and whatever follows the terminator in the same
       * chunk is pushed downstream on the next setImmediate tick.
       *
       * @param {Buffer} data Next chunk of data
       * @return {Boolean} Returns true if headers had already been parsed before
       *   this call (the caller should forward the chunk as body), false otherwise
       */
      checkHeaders(data) {
          if (this.headersParsed) {
              return true;
          }

          const lblen = this.lastBytes.length;
          // Reads a byte from the virtual buffer "lastBytes + data"
          const byteAt = idx => (idx < lblen ? this.lastBytes[idx] : data[idx - lblen]);

          let headerPos = 0;
          // not read anywhere in this class; assignment retained from the original implementation
          this.curLinePos = 0;

          for (let i = 0, len = lblen + data.length; i < len; i++) {
              if (byteAt(i) !== 0x0a || !i) {
                  continue;
              }
              const pr1 = byteAt(i - 1);
              const pr2 = i > 1 ? byteAt(i - 2) : false;
              // an empty line is either <LF><LF> or <LF><CR><LF>
              if (pr1 === 0x0a || (pr1 === 0x0d && pr2 === 0x0a)) {
                  this.headersParsed = true;
                  // offset in `data` of the first body byte
                  headerPos = i - lblen + 1;
                  this.headerBytes += headerPos;
                  break;
              }
          }

          if (!this.headersParsed) {
              // no terminator yet, keep buffering
              this.headerBytes += data.length;
              this.headerChunks.push(data);
              // store last 4 bytes to catch a header break on the chunk boundary
              this.updateLastBytes(data);
              return false;
          }

          this.headerChunks.push(data.subarray(0, headerPos));
          this.rawHeaders = Buffer.concat(this.headerChunks, this.headerBytes);
          this.headerChunks = null;
          this.emit('headers', this.parseHeaders());

          // Forward everything after the terminator. The comparison must be
          // `length > headerPos`; `length - 1 > headerPos` would silently drop
          // a body remainder that is exactly one byte long.
          if (data.length > headerPos) {
              const chunk = data.subarray(headerPos);
              this.bodySize += chunk.length;
              // this would be the first chunk of data sent downstream
              setImmediate(() => this.push(chunk));
          }
          return false;
      }

      /**
       * Transform implementation: buffers chunks until the headers are parsed,
       * after that forwards chunks unchanged and counts them in bodySize.
       *
       * @param {Buffer|String} chunk Incoming chunk
       * @param {String} encoding Encoding used when chunk is a string
       * @param {Function} callback Called (via setImmediate) once the chunk is processed
       */
      _transform(chunk, encoding, callback) {
          if (!chunk || !chunk.length) {
              return callback();
          }

          if (typeof chunk === 'string') {
              chunk = Buffer.from(chunk, encoding);
          }

          let headersFound;
          try {
              headersFound = this.checkHeaders(chunk);
          } catch (E) {
              return callback(E);
          }

          if (headersFound) {
              this.bodySize += chunk.length;
              this.push(chunk);
          }

          // scheduled after any push that checkHeaders deferred with setImmediate
          setImmediate(callback);
      }

      /**
       * Flush implementation: if the header terminator was never seen, all
       * buffered bytes are pushed downstream as body (no 'headers' event is
       * emitted in that case).
       *
       * @param {Function} callback Called once the remaining data is pushed
       */
      _flush(callback) {
          if (this.headerChunks) {
              const chunk = Buffer.concat(this.headerChunks, this.headerBytes);
              this.bodySize += chunk.length;
              this.push(chunk);
              this.headerChunks = null;
          }
          callback();
      }

      /**
       * Splits rawHeaders into header lines. Lines that start with whitespace
       * are treated as continuations and joined to the previous line with "\n".
       *
       * @return {Array<{key: String, line: String}>} One entry per non-blank
       *   header line; `key` is the lowercased, trimmed text before the first
       *   colon (an empty string when the line has no colon), `line` is the
       *   complete header line
       */
      parseHeaders() {
          const lines = (this.rawHeaders || '').toString().split(/\r?\n/);

          // walk backwards so that multi-line folds collapse into their first line
          for (let i = lines.length - 1; i > 0; i--) {
              if (/^\s/.test(lines[i])) {
                  lines[i - 1] += '\n' + lines[i];
                  lines.splice(i, 1);
              }
          }

          return lines
              .filter(line => line.trim())
              .map(line => {
                  const colon = line.indexOf(':');
                  const name = colon < 0 ? '' : line.slice(0, colon);
                  return {
                      key: name.trim().toLowerCase(),
                      line
                  };
              });
      }
  }
  137. module.exports = MessageParser;