123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- 'use strict';
- // FIXME:
- // replace this Transform mess with a method that pipes input argument to output argument
- const MessageParser = require('./message-parser');
- const RelaxedBody = require('./relaxed-body');
- const sign = require('./sign');
- const PassThrough = require('stream').PassThrough;
- const fs = require('fs');
- const path = require('path');
- const crypto = require('crypto');
- const DKIM_ALGO = 'sha256';
- const MAX_MESSAGE_SIZE = 128 * 1024; // buffer messages larger than this to disk
- /*
- // Usage:
- let dkim = new DKIM({
- domainName: 'example.com',
- keySelector: 'key-selector',
- privateKey,
- cacheDir: '/tmp'
- });
- dkim.sign(input).pipe(process.stdout);
- // Where input is an RFC822 message (either a stream, string or Buffer)
- // and the stream returned by sign() emits the DKIM signed RFC822 message
- */
/**
 * Signs one message: reads it from `input`, collects the body (in memory and,
 * past a size threshold, in a temporary cache file), then writes the generated
 * DKIM header fields, the original raw headers and the body to `output`.
 */
class DKIMSigner {
    /**
     * @param {Object} [options] Signer options. Fields read here: `cacheTreshold`
     *   (bytes buffered in memory before switching to the cache file),
     *   `hashAlgo` and `cacheDir`; `headerFieldNames` and `skipFields` are
     *   forwarded to `sign()`.
     * @param {Object[]} keys Signing keys; each entry provides `domainName`,
     *   `keySelector` and `privateKey`.
     * @param {stream.Readable} input Source message stream.
     * @param {stream.Writable} output Destination stream; gets a `usingCache`
     *   flag and receives `'error'` events for input and cache failures.
     */
    constructor(options, keys, input, output) {
        this.options = options || {};
        this.keys = keys;

        this.cacheTreshold = Number(this.options.cacheTreshold) || MAX_MESSAGE_SIZE;
        this.hashAlgo = this.options.hashAlgo || DKIM_ALGO;

        this.cacheDir = this.options.cacheDir || false;

        // body chunks kept in memory and the position of the next chunk to send
        this.chunks = [];
        this.chunklen = 0;
        this.readPos = 0;

        // a unique cache file name is only prepared when a cache directory is configured
        this.cachePath = this.cacheDir
            ? path.join(this.cacheDir, 'message.' + Date.now() + '-' + crypto.randomBytes(14).toString('hex'))
            : false;
        this.cache = false;

        this.headers = false;
        this.bodyHash = false;
        this.parser = false;
        this.relaxedBody = false;

        this.input = input;
        this.output = output;
        this.output.usingCache = false;

        this.errored = false;

        this.input.on('error', err => {
            this.errored = true;
            this.cleanup();
            output.emit('error', err);
        });
    }

    /**
     * Removes the cache file, if one has been opened. Unlink errors are ignored.
     */
    cleanup() {
        if (!this.cache || !this.cachePath) {
            return;
        }
        fs.unlink(this.cachePath, () => false);
    }

    /**
     * Streams the cached remainder of the body from the cache file to the
     * output; the cache file is removed on error and once the stream closes.
     */
    createReadCache() {
        // pipe the cached remainder from the cache file to the output
        this.cache = fs.createReadStream(this.cachePath);
        this.cache.once('error', err => {
            this.cleanup();
            this.output.emit('error', err);
        });
        this.cache.once('close', () => {
            this.cleanup();
        });
        this.cache.pipe(this.output);
    }

    /**
     * Writes the next in-memory body chunk to the output. Once all chunks are
     * written it either ends the output or, when a cache file was used,
     * continues from the cache file. Does nothing after an error.
     */
    sendNextChunk() {
        if (this.errored) {
            return;
        }

        if (this.readPos >= this.chunks.length) {
            if (!this.cache) {
                return this.output.end();
            }
            return this.createReadCache();
        }

        const chunk = this.chunks[this.readPos++];
        if (this.output.write(chunk) === false) {
            // respect backpressure: continue only after the output has drained
            return this.output.once('drain', () => {
                this.sendNextChunk();
            });
        }
        setImmediate(() => this.sendNextChunk());
    }

    /**
     * Writes one DKIM header field per key (when both the parsed headers and
     * the body hash are available), then the raw headers, then starts sending
     * the body. Does nothing after an error.
     */
    sendSignedOutput() {
        // The cache write stream's 'close' listener calls this method even when
        // that stream failed; by then the error has already been emitted on the
        // output, so nothing more may be written to it.
        if (this.errored) {
            return;
        }

        let keyPos = 0;
        const signNextKey = () => {
            if (keyPos >= this.keys.length) {
                this.output.write(this.parser.rawHeaders);
                return setImmediate(() => this.sendNextChunk());
            }

            const key = this.keys[keyPos++];
            const dkimField = sign(this.headers, this.hashAlgo, this.bodyHash, {
                domainName: key.domainName,
                keySelector: key.keySelector,
                privateKey: key.privateKey,
                headerFieldNames: this.options.headerFieldNames,
                skipFields: this.options.skipFields
            });
            if (dkimField) {
                this.output.write(Buffer.from(dkimField + '\r\n'));
            }
            return setImmediate(signNextKey);
        };

        if (this.bodyHash && this.headers) {
            return signNextKey();
        }

        // nothing to sign with, pass the message through unsigned
        this.output.write(this.parser.rawHeaders);
        this.sendNextChunk();
    }

    /**
     * Switches from in-memory buffering to the cache file: whatever is still
     * to be read from the body stream is piped into the file, and output
     * starts once the file has been closed.
     */
    createWriteCache() {
        this.output.usingCache = true;
        // pipe remainings to cache file
        this.cache = fs.createWriteStream(this.cachePath);
        this.cache.once('error', err => {
            this.cleanup();

            // drain input
            this.relaxedBody.unpipe(this.cache);
            this.relaxedBody.on('readable', () => {
                while (this.relaxedBody.read() !== null) {
                    // do nothing
                }
            });

            this.errored = true;
            // emit error
            this.output.emit('error', err);
        });
        this.cache.once('close', () => {
            this.sendSignedOutput();
        });
        // stop the in-memory collector before handing the stream over to the cache file
        this.relaxedBody.removeAllListeners('readable');
        this.relaxedBody.pipe(this.cache);
    }

    /**
     * Wires input -> MessageParser -> RelaxedBody, captures the `'headers'`
     * and `'hash'` events, buffers body chunks until the cache threshold is
     * reached, and sends the signed output when the body stream ends.
     */
    signStream() {
        this.parser = new MessageParser();
        this.relaxedBody = new RelaxedBody({
            hashAlgo: this.hashAlgo
        });

        this.parser.on('headers', value => {
            this.headers = value;
        });

        this.relaxedBody.on('hash', value => {
            this.bodyHash = value;
        });

        this.relaxedBody.on('readable', () => {
            let chunk;
            if (this.cache) {
                return;
            }
            while ((chunk = this.relaxedBody.read()) !== null) {
                this.chunks.push(chunk);
                this.chunklen += chunk.length;
                // too much data for memory: continue in the cache file, if one is configured
                if (this.chunklen >= this.cacheTreshold && this.cachePath) {
                    return this.createWriteCache();
                }
            }
        });

        this.relaxedBody.on('end', () => {
            if (this.cache) {
                // output is started by the cache file 'close' handler instead
                return;
            }
            this.sendSignedOutput();
        });

        this.parser.pipe(this.relaxedBody);
        setImmediate(() => this.input.pipe(this.parser));
    }
}
/**
 * Factory for DKIM signing streams. Holds the shared options and the list of
 * signing keys; every `sign()` call processes one message.
 */
class DKIM {
    /**
     * @param {Object} [options] Signer options. Either `keys` (a key object or
     *   an array of them) or a single key given as `domainName`,
     *   `keySelector` and `privateKey`. The whole object is also handed to
     *   every DKIMSigner created by `sign()`.
     */
    constructor(options) {
        this.options = options || {};
        // Read the single-key fields from this.options (not the raw argument)
        // so that constructing without arguments does not throw.
        this.keys = [].concat(
            this.options.keys || {
                domainName: this.options.domainName,
                keySelector: this.options.keySelector,
                privateKey: this.options.privateKey
            }
        );
    }

    /**
     * Starts signing a message.
     *
     * @param {stream.Readable|Buffer|string} input The message to sign.
     * @param {Object} [extraOptions] Additional per-message options; they only
     *   fill in keys that are not already present in the instance options.
     * @returns {stream.PassThrough} Stream the signer writes its output to.
     */
    sign(input, extraOptions) {
        const output = new PassThrough();
        let inputStream = input;
        let writeValue = false;

        // Buffers and strings are fed through a PassThrough stream
        if (Buffer.isBuffer(input)) {
            writeValue = input;
            inputStream = new PassThrough();
        } else if (typeof input === 'string') {
            writeValue = Buffer.from(input);
            inputStream = new PassThrough();
        }

        let options = this.options;
        if (extraOptions && Object.keys(extraOptions).length) {
            // merge into a copy so the shared instance options stay untouched
            options = {};
            for (const key of Object.keys(this.options || {})) {
                options[key] = this.options[key];
            }
            for (const key of Object.keys(extraOptions)) {
                if (!(key in options)) {
                    options[key] = extraOptions[key];
                }
            }
        }

        const signer = new DKIMSigner(options, this.keys, inputStream, output);
        // start on a later tick so the caller can attach listeners to the returned stream first
        setImmediate(() => {
            signer.signStream();
            if (writeValue) {
                setImmediate(() => {
                    inputStream.end(writeValue);
                });
            }
        });

        return output;
    }
}
- module.exports = DKIM;
|