123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600 |
- 'use strict';
- const EventEmitter = require('events');
- const PoolResource = require('./pool-resource');
- const SMTPConnection = require('../smtp-connection');
- const wellKnown = require('../well-known');
- const shared = require('../shared');
- const packageData = require('../../package.json');
- /**
- * Creates a SMTP pool transport object for Nodemailer
- *
- * @constructor
- * @param {Object} options SMTP Connection options
- */
- class SMTPPool extends EventEmitter {
- constructor(options) {
- super();
- options = options || {};
- if (typeof options === 'string') {
- options = {
- url: options
- };
- }
- let urlData;
- let service = options.service;
- if (typeof options.getSocket === 'function') {
- this.getSocket = options.getSocket;
- }
- if (options.url) {
- urlData = shared.parseConnectionUrl(options.url);
- service = service || urlData.service;
- }
- this.options = shared.assign(
- false, // create new object
- options, // regular options
- urlData, // url options
- service && wellKnown(service) // wellknown options
- );
- this.options.maxConnections = this.options.maxConnections || 5;
- this.options.maxMessages = this.options.maxMessages || 100;
- this.logger = shared.getLogger(this.options, {
- component: this.options.component || 'smtp-pool'
- });
- // temporary object
- let connection = new SMTPConnection(this.options);
- this.name = 'SMTP (pool)';
- this.version = packageData.version + '[client:' + connection.version + ']';
- this._rateLimit = {
- counter: 0,
- timeout: null,
- waiting: [],
- checkpoint: false,
- delta: Number(this.options.rateDelta) || 1000,
- limit: Number(this.options.rateLimit) || 0
- };
- this._closed = false;
- this._queue = [];
- this._connections = [];
- this._connectionCounter = 0;
- this.idling = true;
- setImmediate(() => {
- if (this.idling) {
- this.emit('idle');
- }
- });
- }
- /**
- * Placeholder function for creating proxy sockets. This method immediatelly returns
- * without a socket
- *
- * @param {Object} options Connection options
- * @param {Function} callback Callback function to run with the socket keys
- */
- getSocket(options, callback) {
- // return immediatelly
- return setImmediate(() => callback(null, false));
- }
- /**
- * Queues an e-mail to be sent using the selected settings
- *
- * @param {Object} mail Mail object
- * @param {Function} callback Callback function
- */
- send(mail, callback) {
- if (this._closed) {
- return false;
- }
- this._queue.push({
- mail,
- callback
- });
- if (this.idling && this._queue.length >= this.options.maxConnections) {
- this.idling = false;
- }
- setImmediate(() => this._processMessages());
- return true;
- }
- /**
- * Closes all connections in the pool. If there is a message being sent, the connection
- * is closed later
- */
- close() {
- let connection;
- let len = this._connections.length;
- this._closed = true;
- // clear rate limit timer if it exists
- clearTimeout(this._rateLimit.timeout);
- if (!len && !this._queue.length) {
- return;
- }
- // remove all available connections
- for (let i = len - 1; i >= 0; i--) {
- if (this._connections[i] && this._connections[i].available) {
- connection = this._connections[i];
- connection.close();
- this.logger.info(
- {
- tnx: 'connection',
- cid: connection.id,
- action: 'removed'
- },
- 'Connection #%s removed',
- connection.id
- );
- }
- }
- if (len && !this._connections.length) {
- this.logger.debug(
- {
- tnx: 'connection'
- },
- 'All connections removed'
- );
- }
- if (!this._queue.length) {
- return;
- }
- // make sure that entire queue would be cleaned
- let invokeCallbacks = () => {
- if (!this._queue.length) {
- this.logger.debug(
- {
- tnx: 'connection'
- },
- 'Pending queue entries cleared'
- );
- return;
- }
- let entry = this._queue.shift();
- if (entry && typeof entry.callback === 'function') {
- try {
- entry.callback(new Error('Connection pool was closed'));
- } catch (E) {
- this.logger.error(
- {
- err: E,
- tnx: 'callback',
- cid: connection.id
- },
- 'Callback error for #%s: %s',
- connection.id,
- E.message
- );
- }
- }
- setImmediate(invokeCallbacks);
- };
- setImmediate(invokeCallbacks);
- }
- /**
- * Check the queue and available connections. If there is a message to be sent and there is
- * an available connection, then use this connection to send the mail
- */
- _processMessages() {
- let connection;
- let i, len;
- // do nothing if already closed
- if (this._closed) {
- return;
- }
- // do nothing if queue is empty
- if (!this._queue.length) {
- if (!this.idling) {
- // no pending jobs
- this.idling = true;
- this.emit('idle');
- }
- return;
- }
- // find first available connection
- for (i = 0, len = this._connections.length; i < len; i++) {
- if (this._connections[i].available) {
- connection = this._connections[i];
- break;
- }
- }
- if (!connection && this._connections.length < this.options.maxConnections) {
- connection = this._createConnection();
- }
- if (!connection) {
- // no more free connection slots available
- this.idling = false;
- return;
- }
- // check if there is free space in the processing queue
- if (!this.idling && this._queue.length < this.options.maxConnections) {
- this.idling = true;
- this.emit('idle');
- }
- let entry = (connection.queueEntry = this._queue.shift());
- entry.messageId = (connection.queueEntry.mail.message.getHeader('message-id') || '').replace(/[<>\s]/g, '');
- connection.available = false;
- this.logger.debug(
- {
- tnx: 'pool',
- cid: connection.id,
- messageId: entry.messageId,
- action: 'assign'
- },
- 'Assigned message <%s> to #%s (%s)',
- entry.messageId,
- connection.id,
- connection.messages + 1
- );
- if (this._rateLimit.limit) {
- this._rateLimit.counter++;
- if (!this._rateLimit.checkpoint) {
- this._rateLimit.checkpoint = Date.now();
- }
- }
- connection.send(entry.mail, (err, info) => {
- // only process callback if current handler is not changed
- if (entry === connection.queueEntry) {
- try {
- entry.callback(err, info);
- } catch (E) {
- this.logger.error(
- {
- err: E,
- tnx: 'callback',
- cid: connection.id
- },
- 'Callback error for #%s: %s',
- connection.id,
- E.message
- );
- }
- connection.queueEntry = false;
- }
- });
- }
- /**
- * Creates a new pool resource
- */
- _createConnection() {
- let connection = new PoolResource(this);
- connection.id = ++this._connectionCounter;
- this.logger.info(
- {
- tnx: 'pool',
- cid: connection.id,
- action: 'conection'
- },
- 'Created new pool resource #%s',
- connection.id
- );
- // resource comes available
- connection.on('available', () => {
- this.logger.debug(
- {
- tnx: 'connection',
- cid: connection.id,
- action: 'available'
- },
- 'Connection #%s became available',
- connection.id
- );
- if (this._closed) {
- // if already closed run close() that will remove this connections from connections list
- this.close();
- } else {
- // check if there's anything else to send
- this._processMessages();
- }
- });
- // resource is terminated with an error
- connection.once('error', err => {
- if (err.code !== 'EMAXLIMIT') {
- this.logger.error(
- {
- err,
- tnx: 'pool',
- cid: connection.id
- },
- 'Pool Error for #%s: %s',
- connection.id,
- err.message
- );
- } else {
- this.logger.debug(
- {
- tnx: 'pool',
- cid: connection.id,
- action: 'maxlimit'
- },
- 'Max messages limit exchausted for #%s',
- connection.id
- );
- }
- if (connection.queueEntry) {
- try {
- connection.queueEntry.callback(err);
- } catch (E) {
- this.logger.error(
- {
- err: E,
- tnx: 'callback',
- cid: connection.id
- },
- 'Callback error for #%s: %s',
- connection.id,
- E.message
- );
- }
- connection.queueEntry = false;
- }
- // remove the erroneus connection from connections list
- this._removeConnection(connection);
- this._continueProcessing();
- });
- connection.once('close', () => {
- this.logger.info(
- {
- tnx: 'connection',
- cid: connection.id,
- action: 'closed'
- },
- 'Connection #%s was closed',
- connection.id
- );
- this._removeConnection(connection);
- if (connection.queueEntry) {
- // If the connection closed when sending, add the message to the queue again
- // Note that we must wait a bit.. because the callback of the 'error' handler might be called
- // in the next event loop
- setTimeout(() => {
- if (connection.queueEntry) {
- this.logger.debug(
- {
- tnx: 'pool',
- cid: connection.id,
- messageId: connection.queueEntry.messageId,
- action: 'requeue'
- },
- 'Re-queued message <%s> for #%s',
- connection.queueEntry.messageId,
- connection.id
- );
- this._queue.unshift(connection.queueEntry);
- connection.queueEntry = false;
- }
- this._continueProcessing();
- }, 50);
- } else {
- this._continueProcessing();
- }
- });
- this._connections.push(connection);
- return connection;
- }
- /**
- * Continue to process message if the pool hasn't closed
- */
- _continueProcessing() {
- if (this._closed) {
- this.close();
- } else {
- setTimeout(() => this._processMessages(), 100);
- }
- }
- /**
- * Remove resource from pool
- *
- * @param {Object} connection The PoolResource to remove
- */
- _removeConnection(connection) {
- let index = this._connections.indexOf(connection);
- if (index !== -1) {
- this._connections.splice(index, 1);
- }
- }
- /**
- * Checks if connections have hit current rate limit and if so, queues the availability callback
- *
- * @param {Function} callback Callback function to run once rate limiter has been cleared
- */
- _checkRateLimit(callback) {
- if (!this._rateLimit.limit) {
- return callback();
- }
- let now = Date.now();
- if (this._rateLimit.counter < this._rateLimit.limit) {
- return callback();
- }
- this._rateLimit.waiting.push(callback);
- if (this._rateLimit.checkpoint <= now - this._rateLimit.delta) {
- return this._clearRateLimit();
- } else if (!this._rateLimit.timeout) {
- this._rateLimit.timeout = setTimeout(() => this._clearRateLimit(), this._rateLimit.delta - (now - this._rateLimit.checkpoint));
- this._rateLimit.checkpoint = now;
- }
- }
- /**
- * Clears current rate limit limitation and runs paused callback
- */
- _clearRateLimit() {
- clearTimeout(this._rateLimit.timeout);
- this._rateLimit.timeout = null;
- this._rateLimit.counter = 0;
- this._rateLimit.checkpoint = false;
- // resume all paused connections
- while (this._rateLimit.waiting.length) {
- let cb = this._rateLimit.waiting.shift();
- setImmediate(cb);
- }
- }
- /**
- * Returns true if there are free slots in the queue
- */
- isIdle() {
- return this.idling;
- }
- /**
- * Verifies SMTP configuration
- *
- * @param {Function} callback Callback function
- */
- verify(callback) {
- let promise;
- if (!callback) {
- promise = new Promise((resolve, reject) => {
- callback = shared.callbackPromise(resolve, reject);
- });
- }
- let auth = new PoolResource(this).auth;
- this.getSocket(this.options, (err, socketOptions) => {
- if (err) {
- return callback(err);
- }
- let options = this.options;
- if (socketOptions && socketOptions.connection) {
- this.logger.info(
- {
- tnx: 'proxy',
- remoteAddress: socketOptions.connection.remoteAddress,
- remotePort: socketOptions.connection.remotePort,
- destHost: options.host || '',
- destPort: options.port || '',
- action: 'connected'
- },
- 'Using proxied socket from %s:%s to %s:%s',
- socketOptions.connection.remoteAddress,
- socketOptions.connection.remotePort,
- options.host || '',
- options.port || ''
- );
- options = shared.assign(false, options);
- Object.keys(socketOptions).forEach(key => {
- options[key] = socketOptions[key];
- });
- }
- let connection = new SMTPConnection(options);
- let returned = false;
- connection.once('error', err => {
- if (returned) {
- return;
- }
- returned = true;
- connection.close();
- return callback(err);
- });
- connection.once('end', () => {
- if (returned) {
- return;
- }
- returned = true;
- return callback(new Error('Connection closed'));
- });
- let finalize = () => {
- if (returned) {
- return;
- }
- returned = true;
- connection.quit();
- return callback(null, true);
- };
- connection.connect(() => {
- if (returned) {
- return;
- }
- if (auth) {
- connection.login(auth, err => {
- if (returned) {
- return;
- }
- if (err) {
- returned = true;
- connection.close();
- return callback(err);
- }
- finalize();
- });
- } else {
- finalize();
- }
- });
- });
- return promise;
- }
- }
- // expose to the world
- module.exports = SMTPPool;
|