index.js 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. 'use strict';
  2. // FIXME:
  3. // replace this Transform mess with a method that pipes input argument to output argument
  4. const MessageParser = require('./message-parser');
  5. const RelaxedBody = require('./relaxed-body');
  6. const sign = require('./sign');
  7. const PassThrough = require('stream').PassThrough;
  8. const fs = require('fs');
  9. const path = require('path');
  10. const crypto = require('crypto');
  11. const DKIM_ALGO = 'sha256';
  12. const MAX_MESSAGE_SIZE = 128 * 1024; // buffer messages larger than this to disk
  13. /*
  14. // Usage:
  15. let dkim = new DKIM({
  16. domainName: 'example.com',
  17. keySelector: 'key-selector',
  18. privateKey,
  19. cacheDir: '/tmp'
  20. });
  21. dkim.sign(input).pipe(process.stdout);
  22. // Where input is an RFC 822 message (either a stream, a string or a Buffer)
  23. // and the stream returned by dkim.sign() emits the DKIM signed RFC 822 message
  24. */
  /**
   * Signs one message: pipes `input` through a MessageParser and a RelaxedBody
   * stream, buffers the body (in memory and, past a threshold, in a cache file),
   * and once the body has been fully consumed writes the DKIM signature fields,
   * the raw headers and the buffered body to `output`.
   */
  class DKIMSigner {
      /**
       * @param {Object} [options] Signer options. Read here: `cacheTreshold` (sic),
       *   `hashAlgo`, `cacheDir`; read while signing: `headerFieldNames`, `skipFields`
       * @param {Array} keys Key objects with `domainName`, `keySelector` and `privateKey`
       * @param {Object} input Readable stream that provides the message
       * @param {Object} output Writable stream that receives the signed message
       */
      constructor(options, keys, input, output) {
          this.options = options || {};
          this.keys = keys;

          // The option name is spelled "cacheTreshold"; the spelling is kept
          // because it is part of the options interface.
          this.cacheTreshold = Number(this.options.cacheTreshold) || MAX_MESSAGE_SIZE;
          this.hashAlgo = this.options.hashAlgo || DKIM_ALGO;
          this.cacheDir = this.options.cacheDir || false;

          // body chunks kept in memory before (or instead of) spilling to disk
          this.chunks = [];
          this.chunklen = 0;
          this.readPos = 0;

          // a unique cache file name, or false when no cache dir is configured
          this.cachePath = this.cacheDir
              ? path.join(this.cacheDir, 'message.' + Date.now() + '-' + crypto.randomBytes(14).toString('hex'))
              : false;
          this.cache = false;

          this.headers = false;
          this.bodyHash = false;
          this.parser = false;
          this.relaxedBody = false;

          this.input = input;
          this.output = output;
          this.output.usingCache = false;

          this.errored = false;

          this.input.on('error', err => {
              this.errored = true;
              this.cleanup();
              output.emit('error', err);
          });
      }

      /**
       * Removes the cache file if one has been opened. Unlink errors are
       * deliberately ignored (best-effort cleanup).
       */
      cleanup() {
          if (!this.cache || !this.cachePath) {
              return;
          }
          fs.unlink(this.cachePath, () => false);
      }

      /**
       * Streams the cached remainder of the body from disk into the output
       * and removes the cache file once the read stream closes or fails.
       */
      createReadCache() {
          // pipe the cached remainder to output
          this.cache = fs.createReadStream(this.cachePath);
          this.cache.once('error', err => {
              this.cleanup();
              this.output.emit('error', err);
          });
          this.cache.once('close', () => {
              this.cleanup();
          });
          this.cache.pipe(this.output);
      }

      /**
       * Writes the in-memory chunks to the output one at a time, waiting for
       * 'drain' when the output signals backpressure. After the last chunk
       * it either ends the output or continues with the cache file.
       */
      sendNextChunk() {
          if (this.errored) {
              return;
          }

          if (this.readPos >= this.chunks.length) {
              if (!this.cache) {
                  return this.output.end();
              }
              return this.createReadCache();
          }

          const chunk = this.chunks[this.readPos++];
          if (this.output.write(chunk) === false) {
              return this.output.once('drain', () => {
                  this.sendNextChunk();
              });
          }
          setImmediate(() => this.sendNextChunk());
      }

      /**
       * Writes one signature field per key (when both the parsed headers and
       * the body hash are available), then the raw headers, then starts
       * sending the body. Does nothing once the signer has errored.
       */
      sendSignedOutput() {
          // The cache write stream emits 'close' even after it has failed, so
          // this can be reached after an error was already reported on output.
          if (this.errored) {
              return;
          }

          let keyPos = 0;
          const signNextKey = () => {
              if (keyPos >= this.keys.length) {
                  this.output.write(this.parser.rawHeaders);
                  return setImmediate(() => this.sendNextChunk());
              }

              const key = this.keys[keyPos++];
              const dkimField = sign(this.headers, this.hashAlgo, this.bodyHash, {
                  domainName: key.domainName,
                  keySelector: key.keySelector,
                  privateKey: key.privateKey,
                  headerFieldNames: this.options.headerFieldNames,
                  skipFields: this.options.skipFields
              });
              // a falsy result means no signature field for this key; skip it
              if (dkimField) {
                  this.output.write(Buffer.from(dkimField + '\r\n'));
              }
              return setImmediate(signNextKey);
          };

          if (this.bodyHash && this.headers) {
              return signNextKey();
          }

          // nothing to sign with, pass the message through unsigned
          this.output.write(this.parser.rawHeaders);
          this.sendNextChunk();
      }

      /**
       * Switches from in-memory buffering to a cache file: the rest of the
       * body is piped to disk and signing starts once the file is closed.
       */
      createWriteCache() {
          this.output.usingCache = true;
          // pipe the remainder of the body to the cache file
          this.cache = fs.createWriteStream(this.cachePath);
          this.cache.once('error', err => {
              this.cleanup();

              // drain input so the upstream pipeline is not left stalled
              this.relaxedBody.unpipe(this.cache);
              this.relaxedBody.on('readable', () => {
                  while (this.relaxedBody.read() !== null) {
                      // do nothing
                  }
              });

              this.errored = true;
              // emit error
              this.output.emit('error', err);
          });
          this.cache.once('close', () => {
              this.sendSignedOutput();
          });
          // stop collecting chunks in memory, everything else goes to disk
          this.relaxedBody.removeAllListeners('readable');
          this.relaxedBody.pipe(this.cache);
      }

      /**
       * Builds the parser -> relaxed body pipeline, collects headers, body
       * hash and body chunks, and starts piping the input on the next tick.
       */
      signStream() {
          this.parser = new MessageParser();
          this.relaxedBody = new RelaxedBody({
              hashAlgo: this.hashAlgo
          });

          this.parser.on('headers', value => {
              this.headers = value;
          });

          this.relaxedBody.on('hash', value => {
              this.bodyHash = value;
          });

          this.relaxedBody.on('readable', () => {
              if (this.cache) {
                  return;
              }
              let chunk;
              while ((chunk = this.relaxedBody.read()) !== null) {
                  this.chunks.push(chunk);
                  this.chunklen += chunk.length;
                  // spill to disk only when a cache path is configured
                  if (this.chunklen >= this.cacheTreshold && this.cachePath) {
                      return this.createWriteCache();
                  }
              }
          });

          this.relaxedBody.on('end', () => {
              // when caching, output starts from the cache 'close' handler instead
              if (this.cache) {
                  return;
              }
              this.sendSignedOutput();
          });

          this.parser.pipe(this.relaxedBody);
          setImmediate(() => this.input.pipe(this.parser));
      }
  }
  /**
   * Entry point for DKIM signing. Holds the shared options and signing keys
   * and creates a DKIMSigner for every message passed to sign().
   */
  class DKIM {
      /**
       * @param {Object} [options] Signing options. Either `keys` (a key object or
       *   an array of key objects) or a single key given as `domainName`,
       *   `keySelector` and `privateKey`
       */
      constructor(options) {
          this.options = options || {};
          // Read the single-key fields from this.options rather than the raw
          // argument, so that constructing without options does not throw.
          this.keys = [].concat(
              this.options.keys || {
                  domainName: this.options.domainName,
                  keySelector: this.options.keySelector,
                  privateKey: this.options.privateKey
              }
          );
      }

      /**
       * Signs a message and returns a stream of the signed output.
       *
       * @param {Object|Buffer|String} input Message as a readable stream, a Buffer or a string
       * @param {Object} [extraOptions] Per-message options; they only fill in keys
       *   that are not already set on the instance options
       * @returns {PassThrough} Stream that the signed message is written to
       */
      sign(input, extraOptions) {
          const output = new PassThrough();
          let inputStream = input;
          let writeValue = false;

          // Buffers and strings are fed through a PassThrough so the signer
          // always works on a stream
          if (Buffer.isBuffer(input)) {
              writeValue = input;
              inputStream = new PassThrough();
          } else if (typeof input === 'string') {
              writeValue = Buffer.from(input);
              inputStream = new PassThrough();
          }

          let options = this.options;
          if (extraOptions && Object.keys(extraOptions).length) {
              // instance options are applied last so they take precedence
              options = Object.assign({}, extraOptions, this.options);
          }

          const signer = new DKIMSigner(options, this.keys, inputStream, output);
          setImmediate(() => {
              signer.signStream();
              if (writeValue) {
                  // write the value only after the signer has attached its pipeline
                  setImmediate(() => {
                      inputStream.end(writeValue);
                  });
              }
          });

          return output;
      }
  }
  216. module.exports = DKIM;