index.js 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600
  1. 'use strict';
  2. const EventEmitter = require('events');
  3. const PoolResource = require('./pool-resource');
  4. const SMTPConnection = require('../smtp-connection');
  5. const wellKnown = require('../well-known');
  6. const shared = require('../shared');
  7. const packageData = require('../../package.json');
  8. /**
  9. * Creates a SMTP pool transport object for Nodemailer
  10. *
  11. * @constructor
  12. * @param {Object} options SMTP Connection options
  13. */
  14. class SMTPPool extends EventEmitter {
  15. constructor(options) {
  16. super();
  17. options = options || {};
  18. if (typeof options === 'string') {
  19. options = {
  20. url: options
  21. };
  22. }
  23. let urlData;
  24. let service = options.service;
  25. if (typeof options.getSocket === 'function') {
  26. this.getSocket = options.getSocket;
  27. }
  28. if (options.url) {
  29. urlData = shared.parseConnectionUrl(options.url);
  30. service = service || urlData.service;
  31. }
  32. this.options = shared.assign(
  33. false, // create new object
  34. options, // regular options
  35. urlData, // url options
  36. service && wellKnown(service) // wellknown options
  37. );
  38. this.options.maxConnections = this.options.maxConnections || 5;
  39. this.options.maxMessages = this.options.maxMessages || 100;
  40. this.logger = shared.getLogger(this.options, {
  41. component: this.options.component || 'smtp-pool'
  42. });
  43. // temporary object
  44. let connection = new SMTPConnection(this.options);
  45. this.name = 'SMTP (pool)';
  46. this.version = packageData.version + '[client:' + connection.version + ']';
  47. this._rateLimit = {
  48. counter: 0,
  49. timeout: null,
  50. waiting: [],
  51. checkpoint: false,
  52. delta: Number(this.options.rateDelta) || 1000,
  53. limit: Number(this.options.rateLimit) || 0
  54. };
  55. this._closed = false;
  56. this._queue = [];
  57. this._connections = [];
  58. this._connectionCounter = 0;
  59. this.idling = true;
  60. setImmediate(() => {
  61. if (this.idling) {
  62. this.emit('idle');
  63. }
  64. });
  65. }
  66. /**
  67. * Placeholder function for creating proxy sockets. This method immediatelly returns
  68. * without a socket
  69. *
  70. * @param {Object} options Connection options
  71. * @param {Function} callback Callback function to run with the socket keys
  72. */
  73. getSocket(options, callback) {
  74. // return immediatelly
  75. return setImmediate(() => callback(null, false));
  76. }
  77. /**
  78. * Queues an e-mail to be sent using the selected settings
  79. *
  80. * @param {Object} mail Mail object
  81. * @param {Function} callback Callback function
  82. */
  83. send(mail, callback) {
  84. if (this._closed) {
  85. return false;
  86. }
  87. this._queue.push({
  88. mail,
  89. callback
  90. });
  91. if (this.idling && this._queue.length >= this.options.maxConnections) {
  92. this.idling = false;
  93. }
  94. setImmediate(() => this._processMessages());
  95. return true;
  96. }
  97. /**
  98. * Closes all connections in the pool. If there is a message being sent, the connection
  99. * is closed later
  100. */
  101. close() {
  102. let connection;
  103. let len = this._connections.length;
  104. this._closed = true;
  105. // clear rate limit timer if it exists
  106. clearTimeout(this._rateLimit.timeout);
  107. if (!len && !this._queue.length) {
  108. return;
  109. }
  110. // remove all available connections
  111. for (let i = len - 1; i >= 0; i--) {
  112. if (this._connections[i] && this._connections[i].available) {
  113. connection = this._connections[i];
  114. connection.close();
  115. this.logger.info(
  116. {
  117. tnx: 'connection',
  118. cid: connection.id,
  119. action: 'removed'
  120. },
  121. 'Connection #%s removed',
  122. connection.id
  123. );
  124. }
  125. }
  126. if (len && !this._connections.length) {
  127. this.logger.debug(
  128. {
  129. tnx: 'connection'
  130. },
  131. 'All connections removed'
  132. );
  133. }
  134. if (!this._queue.length) {
  135. return;
  136. }
  137. // make sure that entire queue would be cleaned
  138. let invokeCallbacks = () => {
  139. if (!this._queue.length) {
  140. this.logger.debug(
  141. {
  142. tnx: 'connection'
  143. },
  144. 'Pending queue entries cleared'
  145. );
  146. return;
  147. }
  148. let entry = this._queue.shift();
  149. if (entry && typeof entry.callback === 'function') {
  150. try {
  151. entry.callback(new Error('Connection pool was closed'));
  152. } catch (E) {
  153. this.logger.error(
  154. {
  155. err: E,
  156. tnx: 'callback',
  157. cid: connection.id
  158. },
  159. 'Callback error for #%s: %s',
  160. connection.id,
  161. E.message
  162. );
  163. }
  164. }
  165. setImmediate(invokeCallbacks);
  166. };
  167. setImmediate(invokeCallbacks);
  168. }
  169. /**
  170. * Check the queue and available connections. If there is a message to be sent and there is
  171. * an available connection, then use this connection to send the mail
  172. */
  173. _processMessages() {
  174. let connection;
  175. let i, len;
  176. // do nothing if already closed
  177. if (this._closed) {
  178. return;
  179. }
  180. // do nothing if queue is empty
  181. if (!this._queue.length) {
  182. if (!this.idling) {
  183. // no pending jobs
  184. this.idling = true;
  185. this.emit('idle');
  186. }
  187. return;
  188. }
  189. // find first available connection
  190. for (i = 0, len = this._connections.length; i < len; i++) {
  191. if (this._connections[i].available) {
  192. connection = this._connections[i];
  193. break;
  194. }
  195. }
  196. if (!connection && this._connections.length < this.options.maxConnections) {
  197. connection = this._createConnection();
  198. }
  199. if (!connection) {
  200. // no more free connection slots available
  201. this.idling = false;
  202. return;
  203. }
  204. // check if there is free space in the processing queue
  205. if (!this.idling && this._queue.length < this.options.maxConnections) {
  206. this.idling = true;
  207. this.emit('idle');
  208. }
  209. let entry = (connection.queueEntry = this._queue.shift());
  210. entry.messageId = (connection.queueEntry.mail.message.getHeader('message-id') || '').replace(/[<>\s]/g, '');
  211. connection.available = false;
  212. this.logger.debug(
  213. {
  214. tnx: 'pool',
  215. cid: connection.id,
  216. messageId: entry.messageId,
  217. action: 'assign'
  218. },
  219. 'Assigned message <%s> to #%s (%s)',
  220. entry.messageId,
  221. connection.id,
  222. connection.messages + 1
  223. );
  224. if (this._rateLimit.limit) {
  225. this._rateLimit.counter++;
  226. if (!this._rateLimit.checkpoint) {
  227. this._rateLimit.checkpoint = Date.now();
  228. }
  229. }
  230. connection.send(entry.mail, (err, info) => {
  231. // only process callback if current handler is not changed
  232. if (entry === connection.queueEntry) {
  233. try {
  234. entry.callback(err, info);
  235. } catch (E) {
  236. this.logger.error(
  237. {
  238. err: E,
  239. tnx: 'callback',
  240. cid: connection.id
  241. },
  242. 'Callback error for #%s: %s',
  243. connection.id,
  244. E.message
  245. );
  246. }
  247. connection.queueEntry = false;
  248. }
  249. });
  250. }
  251. /**
  252. * Creates a new pool resource
  253. */
  254. _createConnection() {
  255. let connection = new PoolResource(this);
  256. connection.id = ++this._connectionCounter;
  257. this.logger.info(
  258. {
  259. tnx: 'pool',
  260. cid: connection.id,
  261. action: 'conection'
  262. },
  263. 'Created new pool resource #%s',
  264. connection.id
  265. );
  266. // resource comes available
  267. connection.on('available', () => {
  268. this.logger.debug(
  269. {
  270. tnx: 'connection',
  271. cid: connection.id,
  272. action: 'available'
  273. },
  274. 'Connection #%s became available',
  275. connection.id
  276. );
  277. if (this._closed) {
  278. // if already closed run close() that will remove this connections from connections list
  279. this.close();
  280. } else {
  281. // check if there's anything else to send
  282. this._processMessages();
  283. }
  284. });
  285. // resource is terminated with an error
  286. connection.once('error', err => {
  287. if (err.code !== 'EMAXLIMIT') {
  288. this.logger.error(
  289. {
  290. err,
  291. tnx: 'pool',
  292. cid: connection.id
  293. },
  294. 'Pool Error for #%s: %s',
  295. connection.id,
  296. err.message
  297. );
  298. } else {
  299. this.logger.debug(
  300. {
  301. tnx: 'pool',
  302. cid: connection.id,
  303. action: 'maxlimit'
  304. },
  305. 'Max messages limit exchausted for #%s',
  306. connection.id
  307. );
  308. }
  309. if (connection.queueEntry) {
  310. try {
  311. connection.queueEntry.callback(err);
  312. } catch (E) {
  313. this.logger.error(
  314. {
  315. err: E,
  316. tnx: 'callback',
  317. cid: connection.id
  318. },
  319. 'Callback error for #%s: %s',
  320. connection.id,
  321. E.message
  322. );
  323. }
  324. connection.queueEntry = false;
  325. }
  326. // remove the erroneus connection from connections list
  327. this._removeConnection(connection);
  328. this._continueProcessing();
  329. });
  330. connection.once('close', () => {
  331. this.logger.info(
  332. {
  333. tnx: 'connection',
  334. cid: connection.id,
  335. action: 'closed'
  336. },
  337. 'Connection #%s was closed',
  338. connection.id
  339. );
  340. this._removeConnection(connection);
  341. if (connection.queueEntry) {
  342. // If the connection closed when sending, add the message to the queue again
  343. // Note that we must wait a bit.. because the callback of the 'error' handler might be called
  344. // in the next event loop
  345. setTimeout(() => {
  346. if (connection.queueEntry) {
  347. this.logger.debug(
  348. {
  349. tnx: 'pool',
  350. cid: connection.id,
  351. messageId: connection.queueEntry.messageId,
  352. action: 'requeue'
  353. },
  354. 'Re-queued message <%s> for #%s',
  355. connection.queueEntry.messageId,
  356. connection.id
  357. );
  358. this._queue.unshift(connection.queueEntry);
  359. connection.queueEntry = false;
  360. }
  361. this._continueProcessing();
  362. }, 50);
  363. } else {
  364. this._continueProcessing();
  365. }
  366. });
  367. this._connections.push(connection);
  368. return connection;
  369. }
  370. /**
  371. * Continue to process message if the pool hasn't closed
  372. */
  373. _continueProcessing() {
  374. if (this._closed) {
  375. this.close();
  376. } else {
  377. setTimeout(() => this._processMessages(), 100);
  378. }
  379. }
  380. /**
  381. * Remove resource from pool
  382. *
  383. * @param {Object} connection The PoolResource to remove
  384. */
  385. _removeConnection(connection) {
  386. let index = this._connections.indexOf(connection);
  387. if (index !== -1) {
  388. this._connections.splice(index, 1);
  389. }
  390. }
  391. /**
  392. * Checks if connections have hit current rate limit and if so, queues the availability callback
  393. *
  394. * @param {Function} callback Callback function to run once rate limiter has been cleared
  395. */
  396. _checkRateLimit(callback) {
  397. if (!this._rateLimit.limit) {
  398. return callback();
  399. }
  400. let now = Date.now();
  401. if (this._rateLimit.counter < this._rateLimit.limit) {
  402. return callback();
  403. }
  404. this._rateLimit.waiting.push(callback);
  405. if (this._rateLimit.checkpoint <= now - this._rateLimit.delta) {
  406. return this._clearRateLimit();
  407. } else if (!this._rateLimit.timeout) {
  408. this._rateLimit.timeout = setTimeout(() => this._clearRateLimit(), this._rateLimit.delta - (now - this._rateLimit.checkpoint));
  409. this._rateLimit.checkpoint = now;
  410. }
  411. }
  412. /**
  413. * Clears current rate limit limitation and runs paused callback
  414. */
  415. _clearRateLimit() {
  416. clearTimeout(this._rateLimit.timeout);
  417. this._rateLimit.timeout = null;
  418. this._rateLimit.counter = 0;
  419. this._rateLimit.checkpoint = false;
  420. // resume all paused connections
  421. while (this._rateLimit.waiting.length) {
  422. let cb = this._rateLimit.waiting.shift();
  423. setImmediate(cb);
  424. }
  425. }
  426. /**
  427. * Returns true if there are free slots in the queue
  428. */
  429. isIdle() {
  430. return this.idling;
  431. }
  432. /**
  433. * Verifies SMTP configuration
  434. *
  435. * @param {Function} callback Callback function
  436. */
  437. verify(callback) {
  438. let promise;
  439. if (!callback) {
  440. promise = new Promise((resolve, reject) => {
  441. callback = shared.callbackPromise(resolve, reject);
  442. });
  443. }
  444. let auth = new PoolResource(this).auth;
  445. this.getSocket(this.options, (err, socketOptions) => {
  446. if (err) {
  447. return callback(err);
  448. }
  449. let options = this.options;
  450. if (socketOptions && socketOptions.connection) {
  451. this.logger.info(
  452. {
  453. tnx: 'proxy',
  454. remoteAddress: socketOptions.connection.remoteAddress,
  455. remotePort: socketOptions.connection.remotePort,
  456. destHost: options.host || '',
  457. destPort: options.port || '',
  458. action: 'connected'
  459. },
  460. 'Using proxied socket from %s:%s to %s:%s',
  461. socketOptions.connection.remoteAddress,
  462. socketOptions.connection.remotePort,
  463. options.host || '',
  464. options.port || ''
  465. );
  466. options = shared.assign(false, options);
  467. Object.keys(socketOptions).forEach(key => {
  468. options[key] = socketOptions[key];
  469. });
  470. }
  471. let connection = new SMTPConnection(options);
  472. let returned = false;
  473. connection.once('error', err => {
  474. if (returned) {
  475. return;
  476. }
  477. returned = true;
  478. connection.close();
  479. return callback(err);
  480. });
  481. connection.once('end', () => {
  482. if (returned) {
  483. return;
  484. }
  485. returned = true;
  486. return callback(new Error('Connection closed'));
  487. });
  488. let finalize = () => {
  489. if (returned) {
  490. return;
  491. }
  492. returned = true;
  493. connection.quit();
  494. return callback(null, true);
  495. };
  496. connection.connect(() => {
  497. if (returned) {
  498. return;
  499. }
  500. if (auth) {
  501. connection.login(auth, err => {
  502. if (returned) {
  503. return;
  504. }
  505. if (err) {
  506. returned = true;
  507. connection.close();
  508. return callback(err);
  509. }
  510. finalize();
  511. });
  512. } else {
  513. finalize();
  514. }
  515. });
  516. });
  517. return promise;
  518. }
  519. }
  520. // expose to the world
  521. module.exports = SMTPPool;