123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312 |
- 'use strict';
- const EventEmitter = require('events');
- const packageData = require('../../package.json');
- const shared = require('../shared');
- const LeWindows = require('../sendmail-transport/le-windows');
- /**
- * Generates a Transport object for AWS SES
- *
- * Possible options can be the following:
- *
- * * **sendingRate** optional Number specifying how many messages per second should be delivered to SES
- * * **maxConnections** optional Number specifying max number of parallel connections to SES
- *
- * @constructor
- * @param {Object} optional config parameter
- */
- class SESTransport extends EventEmitter {
- constructor(options) {
- super();
- options = options || {};
- this.options = options || {};
- this.ses = this.options.SES;
- this.name = 'SESTransport';
- this.version = packageData.version;
- this.logger = shared.getLogger(this.options, {
- component: this.options.component || 'ses-transport'
- });
- // parallel sending connections
- this.maxConnections = Number(this.options.maxConnections) || Infinity;
- this.connections = 0;
- // max messages per second
- this.sendingRate = Number(this.options.sendingRate) || Infinity;
- this.sendingRateTTL = null;
- this.rateInterval = 1000;
- this.rateMessages = [];
- this.pending = [];
- this.idling = true;
- setImmediate(() => {
- if (this.idling) {
- this.emit('idle');
- }
- });
- }
- /**
- * Schedules a sending of a message
- *
- * @param {Object} emailMessage MailComposer object
- * @param {Function} callback Callback function to run when the sending is completed
- */
- send(mail, callback) {
- if (this.connections >= this.maxConnections) {
- this.idling = false;
- return this.pending.push({
- mail,
- callback
- });
- }
- if (!this._checkSendingRate()) {
- this.idling = false;
- return this.pending.push({
- mail,
- callback
- });
- }
- this._send(mail, (...args) => {
- setImmediate(() => callback(...args));
- this._sent();
- });
- }
- _checkRatedQueue() {
- if (this.connections >= this.maxConnections || !this._checkSendingRate()) {
- return;
- }
- if (!this.pending.length) {
- if (!this.idling) {
- this.idling = true;
- this.emit('idle');
- }
- return;
- }
- let next = this.pending.shift();
- this._send(next.mail, (...args) => {
- setImmediate(() => next.callback(...args));
- this._sent();
- });
- }
- _checkSendingRate() {
- clearTimeout(this.sendingRateTTL);
- let now = Date.now();
- let oldest = false;
- // delete older messages
- for (let i = this.rateMessages.length - 1; i >= 0; i--) {
- if (this.rateMessages[i].ts >= now - this.rateInterval && (!oldest || this.rateMessages[i].ts < oldest)) {
- oldest = this.rateMessages[i].ts;
- }
- if (this.rateMessages[i].ts < now - this.rateInterval && !this.rateMessages[i].pending) {
- this.rateMessages.splice(i, 1);
- }
- }
- if (this.rateMessages.length < this.sendingRate) {
- return true;
- }
- let delay = Math.max(oldest + 1001, now + 20);
- this.sendingRateTTL = setTimeout(() => this._checkRatedQueue(), now - delay);
- try {
- this.sendingRateTTL.unref();
- } catch (E) {
- // Ignore. Happens on envs with non-node timer implementation
- }
- return false;
- }
- _sent() {
- this.connections--;
- this._checkRatedQueue();
- }
- /**
- * Returns true if there are free slots in the queue
- */
- isIdle() {
- return this.idling;
- }
- /**
- * Compiles a mailcomposer message and forwards it to SES
- *
- * @param {Object} emailMessage MailComposer object
- * @param {Function} callback Callback function to run when the sending is completed
- */
- _send(mail, callback) {
- let statObject = {
- ts: Date.now(),
- pending: true
- };
- this.connections++;
- this.rateMessages.push(statObject);
- let envelope = mail.data.envelope || mail.message.getEnvelope();
- let messageId = mail.message.messageId();
- let recipients = [].concat(envelope.to || []);
- if (recipients.length > 3) {
- recipients.push('...and ' + recipients.splice(2).length + ' more');
- }
- this.logger.info(
- {
- tnx: 'send',
- messageId
- },
- 'Sending message %s to <%s>',
- messageId,
- recipients.join(', ')
- );
- let getRawMessage = next => {
- // do not use Message-ID and Date in DKIM signature
- if (!mail.data._dkim) {
- mail.data._dkim = {};
- }
- if (mail.data._dkim.skipFields && typeof mail.data._dkim.skipFields === 'string') {
- mail.data._dkim.skipFields += ':date:message-id';
- } else {
- mail.data._dkim.skipFields = 'date:message-id';
- }
- let sourceStream = mail.message.createReadStream();
- let stream = sourceStream.pipe(new LeWindows());
- let chunks = [];
- let chunklen = 0;
- stream.on('readable', () => {
- let chunk;
- while ((chunk = stream.read()) !== null) {
- chunks.push(chunk);
- chunklen += chunk.length;
- }
- });
- sourceStream.once('error', err => stream.emit('error', err));
- stream.once('error', err => {
- next(err);
- });
- stream.once('end', () => next(null, Buffer.concat(chunks, chunklen)));
- };
- setImmediate(() =>
- getRawMessage((err, raw) => {
- if (err) {
- this.logger.error(
- {
- err,
- tnx: 'send',
- messageId
- },
- 'Failed creating message for %s. %s',
- messageId,
- err.message
- );
- statObject.pending = false;
- return callback(err);
- }
- let sesMessage = {
- RawMessage: {
- // required
- Data: raw // required
- },
- Source: envelope.from,
- Destinations: envelope.to
- };
- Object.keys(mail.data.ses || {}).forEach(key => {
- sesMessage[key] = mail.data.ses[key];
- });
- this.ses.sendRawEmail(sesMessage, (err, data) => {
- if (err) {
- this.logger.error(
- {
- err,
- tnx: 'send'
- },
- 'Send error for %s: %s',
- messageId,
- err.message
- );
- statObject.pending = false;
- return callback(err);
- }
- let region = (this.ses.config && this.ses.config.region) || 'us-east-1';
- if (region === 'us-east-1') {
- region = 'email';
- }
- statObject.pending = false;
- callback(null, {
- envelope: {
- from: envelope.from,
- to: envelope.to
- },
- messageId: '<' + data.MessageId + (!/@/.test(data.MessageId) ? '@' + region + '.amazonses.com' : '') + '>',
- response: data.MessageId,
- raw
- });
- });
- })
- );
- }
- /**
- * Verifies SES configuration
- *
- * @param {Function} callback Callback function
- */
- verify(callback) {
- let promise;
- if (!callback) {
- promise = new Promise((resolve, reject) => {
- callback = shared.callbackPromise(resolve, reject);
- });
- }
- this.ses.sendRawEmail(
- {
- RawMessage: {
- // required
- Data: 'From: invalid@invalid\r\nTo: invalid@invalid\r\n Subject: Invalid\r\n\r\nInvalid'
- },
- Source: 'invalid@invalid',
- Destinations: ['invalid@invalid']
- },
- err => {
- if (err && err.code !== 'InvalidParameterValue') {
- return callback(err);
- }
- return callback(null, true);
- }
- );
- return promise;
- }
- }
- module.exports = SESTransport;
|