index.js 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. 'use strict';
  2. const EventEmitter = require('events');
  3. const packageData = require('../../package.json');
  4. const shared = require('../shared');
  5. const LeWindows = require('../sendmail-transport/le-windows');
  /**
   * Generates a Transport object for AWS SES
   *
   * Possible options can be the following:
   *
   * * **SES** client object used for delivery; it must expose `sendRawEmail(params, callback)`
   * * **sendingRate** optional Number specifying how many messages per second should be delivered to SES
   * * **maxConnections** optional Number specifying max number of parallel connections to SES
   * * **component** optional component name passed to the logger (defaults to 'ses-transport')
   *
   * The transport emits an 'idle' event once right after construction (if nothing
   * was queued in the meantime) and again every time the pending queue has been
   * drained while there is still capacity to send.
   *
   * @constructor
   * @param {Object} [options] optional config parameter
   */
  class SESTransport extends EventEmitter {
    constructor(options) {
      super();

      this.options = options || {};
      this.ses = this.options.SES;

      this.name = 'SESTransport';
      this.version = packageData.version;

      this.logger = shared.getLogger(this.options, {
        component: this.options.component || 'ses-transport'
      });

      // parallel sending connections; missing, zero or non-numeric means "no limit"
      this.maxConnections = Number(this.options.maxConnections) || Infinity;
      this.connections = 0;

      // max messages per rate interval; missing, zero or non-numeric means "no limit"
      this.sendingRate = Number(this.options.sendingRate) || Infinity;
      this.sendingRateTTL = null; // timer that re-checks the queue once the rate window frees up
      this.rateInterval = 1000; // length of the rate window in milliseconds
      this.rateMessages = []; // {ts, pending} entries for messages counted against the rate limit

      this.pending = []; // queued {mail, callback} pairs waiting for a free slot

      this.idling = true;

      // defer the initial event so that the caller has a chance to attach listeners
      setImmediate(() => {
        if (this.idling) {
          this.emit('idle');
        }
      });
    }

    /**
     * Schedules a sending of a message. The message is sent right away when both
     * the connection limit and the sending rate allow it, otherwise it is queued.
     *
     * @param {Object} mail Mail object that provides `data` and `message` (MailComposer object)
     * @param {Function} callback Callback function to run when the sending is completed
     * @returns {Number|undefined} New queue length if the message was queued, otherwise nothing
     */
    send(mail, callback) {
      // the rate check is only performed (and its timer only armed) when a connection slot is free
      if (this.connections >= this.maxConnections || !this._checkSendingRate()) {
        this.idling = false;
        return this.pending.push({
          mail,
          callback
        });
      }

      this._dispatch(mail, callback);
    }

    /**
     * Sends a message and releases its slot once the delivery attempt has finished.
     *
     * @param {Object} mail Mail object to send
     * @param {Function} callback Callback function to run when the sending is completed
     */
    _dispatch(mail, callback) {
      this._send(mail, (...args) => {
        // run the user callback on a later tick, the slot is released synchronously
        setImmediate(() => callback(...args));
        this._sent();
      });
    }

    /**
     * Sends the next queued message if limits allow it. Emits 'idle' when the
     * queue has been drained and the transport was not idling before.
     */
    _checkRatedQueue() {
      if (this.connections >= this.maxConnections || !this._checkSendingRate()) {
        return;
      }

      if (!this.pending.length) {
        if (!this.idling) {
          this.idling = true;
          this.emit('idle');
        }
        return;
      }

      const next = this.pending.shift();
      this._dispatch(next.mail, next.callback);
    }

    /**
     * Checks if another message fits into the current rate window. Expired
     * entries are pruned as a side effect. If the window is full, a timer is
     * armed that re-checks the queue once the oldest counted message leaves the window.
     *
     * @returns {Boolean} true if a message can be sent right now
     */
    _checkSendingRate() {
      clearTimeout(this.sendingRateTTL);

      const now = Date.now();
      const windowStart = now - this.rateInterval;
      let oldest = 0;

      // walk backwards so that splicing does not shift entries not yet visited
      for (let i = this.rateMessages.length - 1; i >= 0; i--) {
        const entry = this.rateMessages[i];

        if (entry.ts >= windowStart && (!oldest || entry.ts < oldest)) {
          oldest = entry.ts;
        }

        // entries of messages still in flight keep counting even after the window has passed
        if (entry.ts < windowStart && !entry.pending) {
          this.rateMessages.splice(i, 1);
        }
      }

      if (this.rateMessages.length < this.sendingRate) {
        return true;
      }

      // absolute time when the oldest entry falls out of the window, but never sooner than 20ms from now
      const wakeAt = Math.max(oldest + this.rateInterval + 1, now + 20);

      // setTimeout expects a relative delay, so convert the absolute wake-up time
      this.sendingRateTTL = setTimeout(() => this._checkRatedQueue(), wakeAt - now);

      // do not keep the process alive just for this timer; timer implementations
      // outside of Node.js might not provide unref()
      if (this.sendingRateTTL && typeof this.sendingRateTTL.unref === 'function') {
        this.sendingRateTTL.unref();
      }

      return false;
    }

    /**
     * Releases a connection slot and tries to send the next queued message
     */
    _sent() {
      this.connections--;
      this._checkRatedQueue();
    }

    /**
     * Returns true if there are free slots in the queue
     *
     * @returns {Boolean} true if the transport is idling
     */
    isIdle() {
      return this.idling;
    }

    /**
     * Builds the raw message as a single Buffer. 'date' and 'message-id' are
     * added to the list of fields skipped by the DKIM signature.
     *
     * @param {Object} mail Mail object; `mail.data._dkim` is created or modified in place
     * @param {Function} next Callback that gets `(err)` or `(null, rawBuffer)`
     */
    _getRawMessage(mail, next) {
      // do not use Message-ID and Date in DKIM signature
      if (!mail.data._dkim) {
        mail.data._dkim = {};
      }

      if (mail.data._dkim.skipFields && typeof mail.data._dkim.skipFields === 'string') {
        mail.data._dkim.skipFields += ':date:message-id';
      } else {
        mail.data._dkim.skipFields = 'date:message-id';
      }

      const sourceStream = mail.message.createReadStream();
      const stream = sourceStream.pipe(new LeWindows());
      const chunks = [];
      let chunklen = 0;

      stream.on('readable', () => {
        let chunk;
        while ((chunk = stream.read()) !== null) {
          chunks.push(chunk);
          chunklen += chunk.length;
        }
      });

      // pipe() does not forward errors, so pass source errors on by hand
      sourceStream.once('error', err => stream.emit('error', err));

      stream.once('error', err => {
        next(err);
      });

      stream.once('end', () => next(null, Buffer.concat(chunks, chunklen)));
    }

    /**
     * Compiles a mailcomposer message and forwards it to SES
     *
     * @param {Object} mail Mail object that provides `data` and `message` (MailComposer object)
     * @param {Function} callback Callback function to run when the sending is completed.
     *   Gets `(err)` on failure or `(null, info)` where info has `envelope`, `messageId`, `response` and `raw`
     */
    _send(mail, callback) {
      // counted against the sending rate; stays "pending" until the delivery attempt finishes
      const statObject = {
        ts: Date.now(),
        pending: true
      };
      this.connections++;
      this.rateMessages.push(statObject);

      const envelope = mail.data.envelope || mail.message.getEnvelope();
      const messageId = mail.message.messageId();

      // only list the first two recipients in the log line if there are more than three
      const recipients = [].concat(envelope.to || []);
      if (recipients.length > 3) {
        const hidden = recipients.splice(2);
        recipients.push('...and ' + hidden.length + ' more');
      }

      this.logger.info(
        {
          tnx: 'send',
          messageId
        },
        'Sending message %s to <%s>',
        messageId,
        recipients.join(', ')
      );

      setImmediate(() =>
        this._getRawMessage(mail, (err, raw) => {
          if (err) {
            this.logger.error(
              {
                err,
                tnx: 'send',
                messageId
              },
              'Failed creating message for %s. %s',
              messageId,
              err.message
            );
            statObject.pending = false;
            return callback(err);
          }

          const sesMessage = {
            RawMessage: {
              // required
              Data: raw // required
            },
            Source: envelope.from,
            Destinations: envelope.to
          };

          // message specific SES options extend or override the defaults
          Object.keys(mail.data.ses || {}).forEach(key => {
            sesMessage[key] = mail.data.ses[key];
          });

          this.ses.sendRawEmail(sesMessage, (err, data) => {
            if (err) {
              this.logger.error(
                {
                  err,
                  tnx: 'send'
                },
                'Send error for %s: %s',
                messageId,
                err.message
              );
              statObject.pending = false;
              return callback(err);
            }

            // the region is used as the domain label of the Message-ID; us-east-1 maps to "email"
            let region = (this.ses.config && this.ses.config.region) || 'us-east-1';
            if (region === 'us-east-1') {
              region = 'email';
            }

            // append a domain only if the returned id does not already include one
            const idDomain = !/@/.test(data.MessageId) ? '@' + region + '.amazonses.com' : '';

            statObject.pending = false;
            callback(null, {
              envelope: {
                from: envelope.from,
                to: envelope.to
              },
              messageId: '<' + data.MessageId + idDomain + '>',
              response: data.MessageId,
              raw
            });
          });
        })
      );
    }

    /**
     * Verifies SES configuration by trying to send an invalid message. An
     * 'InvalidParameterValue' error counts as success; any other error is
     * passed on to the caller.
     *
     * @param {Function} [callback] Callback function
     * @returns {Promise|undefined} Promise if no callback was provided
     */
    verify(callback) {
      let promise;

      if (!callback) {
        promise = new Promise((resolve, reject) => {
          callback = shared.callbackPromise(resolve, reject);
        });
      }

      this.ses.sendRawEmail(
        {
          RawMessage: {
            // required
            Data: 'From: invalid@invalid\r\nTo: invalid@invalid\r\n Subject: Invalid\r\n\r\nInvalid'
          },
          Source: 'invalid@invalid',
          Destinations: ['invalid@invalid']
        },
        err => {
          if (err && err.code !== 'InvalidParameterValue') {
            return callback(err);
          }
          return callback(null, true);
        }
      );

      return promise;
    }
  }
  267. module.exports = SESTransport;