pool-resource.js 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. 'use strict';
  2. const SMTPConnection = require('../smtp-connection');
  3. const assign = require('../shared').assign;
  4. const XOAuth2 = require('../xoauth2');
  5. const EventEmitter = require('events');
  /**
   * A single pooled SMTP connection ("resource") owned by an SMTPPool instance.
   *
   * Events emitted by this class:
   *   - 'error'     a connection, login or send failure, or the EMAXLIMIT signal
   *   - 'available' the resource may accept another message
   *   - 'close'     close() was called
   *
   * @constructor
   * @param {Object} pool SMTPPool instance; `pool.options` and `pool.logger` are read here,
   *                      `pool.getSocket` and `pool._checkRateLimit` are called later
   */
  class PoolResource extends EventEmitter {
      constructor(pool) {
          super();

          this.pool = pool;
          this.options = pool.options;
          this.logger = this.pool.logger;

          if (this.options.auth) {
              switch ((this.options.auth.type || '').toString().toUpperCase()) {
                  case 'OAUTH2': {
                      const oauth2 = new XOAuth2(this.options.auth, this.logger);
                      // A provisioning callback registered on the mailer overrides the generator's own
                      oauth2.provisionCallback = (this.pool.mailer && this.pool.mailer.get('oauth2_provision_cb')) || oauth2.provisionCallback;
                      this.auth = {
                          type: 'OAUTH2',
                          user: this.options.auth.user,
                          oauth2,
                          method: 'XOAUTH2'
                      };
                      oauth2.on('token', token => {
                          // The mailer is optional (see the guard above), so do not
                          // dereference it blindly when a token arrives
                          if (this.pool.mailer) {
                              this.pool.mailer.emit('token', token);
                          }
                      });
                      oauth2.on('error', err => this.emit('error', err));
                      break;
                  }
                  default:
                      // An auth object with neither user nor pass means "no authentication"
                      if (!this.options.auth.user && !this.options.auth.pass) {
                          break;
                      }
                      this.auth = {
                          type: (this.options.auth.type || '').toString().toUpperCase() || 'LOGIN',
                          user: this.options.auth.user,
                          credentials: {
                              user: this.options.auth.user || '',
                              pass: this.options.auth.pass,
                              options: this.options.auth.options
                          },
                          method: (this.options.auth.method || '').trim().toUpperCase() || this.options.authMethod || false
                      };
              }
          }

          // NOTE: `_connection` is not read anywhere in this class; the live
          // SMTPConnection is stored on `this.connection` by connect()
          this._connection = false;
          this._connected = false;

          // Number of send attempts completed on this resource
          this.messages = 0;
          this.available = true;
      }

      /**
       * Initiates a connection to the SMTP server and, if credentials are configured, logs in.
       *
       * The callback is invoked at most once: `callback(err)` on failure,
       * `callback(null, true)` once the connection is ready for sending.
       *
       * @param {Function} callback Callback function to run once the connection is established or failed
       */
      connect(callback) {
          this.pool.getSocket(this.options, (err, socketOptions) => {
              if (err) {
                  return callback(err);
              }

              // Set once the callback has been invoked so that competing
              // handlers (error / end / connect / login) never call it twice
              let returned = false;
              let options = this.options;

              if (socketOptions && socketOptions.connection) {
                  this.logger.info(
                      {
                          tnx: 'proxy',
                          remoteAddress: socketOptions.connection.remoteAddress,
                          remotePort: socketOptions.connection.remotePort,
                          destHost: options.host || '',
                          destPort: options.port || '',
                          action: 'connected'
                      },
                      'Using proxied socket from %s:%s to %s:%s',
                      socketOptions.connection.remoteAddress,
                      socketOptions.connection.remotePort,
                      options.host || '',
                      options.port || ''
                  );

                  // Work on a copy so the shared pool options are not modified
                  options = assign(false, options);
                  Object.keys(socketOptions).forEach(key => {
                      options[key] = socketOptions[key];
                  });
              }

              this.connection = new SMTPConnection(options);

              this.connection.once('error', err => {
                  this.emit('error', err);
                  if (returned) {
                      return;
                  }
                  returned = true;
                  return callback(err);
              });

              this.connection.once('end', () => {
                  this.close();
                  if (returned) {
                      return;
                  }

                  // Do not mark the callback as handled yet: if the 'error' handler or the
                  // connect/login path settles it before the timer fires, the timer does nothing.
                  // Otherwise the timer reports the unexpected close.
                  const timer = setTimeout(() => {
                      if (returned) {
                          return;
                      }
                      returned = true;
                      // still have not returned, this means we have an unexpected connection close
                      const err = new Error('Unexpected socket close');
                      if (this.connection && this.connection._socket && this.connection._socket.upgrading) {
                          // starttls connection errors
                          err.code = 'ETLS';
                      }
                      callback(err);
                  }, 1000);

                  try {
                      timer.unref();
                  } catch (E) {
                      // Ignore. Happens on envs with non-node timer implementation
                  }
              });

              this.connection.connect(() => {
                  if (returned) {
                      return;
                  }

                  if (this.auth) {
                      this.connection.login(this.auth, err => {
                          if (returned) {
                              return;
                          }
                          returned = true;

                          if (err) {
                              this.connection.close();
                              this.emit('error', err);
                              return callback(err);
                          }

                          this._connected = true;
                          callback(null, true);
                      });
                  } else {
                      returned = true;
                      this._connected = true;
                      return callback(null, true);
                  }
              });
          });
      }

      /**
       * Sends a message over this resource, connecting first if not yet connected.
       *
       * @param {Object} mail Mail object; `mail.message` (getEnvelope, messageId, createReadStream)
       *                      and `mail.data.dsn` are read
       * @param {Function} callback Callback function, called as `callback(err)` or `callback(null, info)`
       */
      send(mail, callback) {
          if (!this._connected) {
              return this.connect(err => {
                  if (err) {
                      return callback(err);
                  }
                  return this.send(mail, callback);
              });
          }

          const envelope = mail.message.getEnvelope();
          const messageId = mail.message.messageId();

          // Shorten long recipient lists for the log line: first two plus a count
          const recipients = [].concat(envelope.to || []);
          if (recipients.length > 3) {
              recipients.push('...and ' + recipients.splice(2).length + ' more');
          }

          this.logger.info(
              {
                  tnx: 'send',
                  messageId,
                  cid: this.id
              },
              'Sending message %s using #%s to <%s>',
              messageId,
              this.id,
              recipients.join(', ')
          );

          if (mail.data.dsn) {
              envelope.dsn = mail.data.dsn;
          }

          this.connection.send(envelope, mail.message.createReadStream(), (err, info) => {
              // Failed attempts count towards the maxMessages limit as well
              this.messages++;

              if (err) {
                  this.connection.close();
                  this.emit('error', err);
                  return callback(err);
              }

              info.envelope = {
                  from: envelope.from,
                  to: envelope.to
              };
              info.messageId = messageId;

              // Deferred so that callback(null, info) below runs before the
              // resource is retired or announced as available again
              setImmediate(() => {
                  if (this.messages >= this.options.maxMessages) {
                      const limitErr = new Error('Resource exhausted');
                      limitErr.code = 'EMAXLIMIT';
                      this.connection.close();
                      this.emit('error', limitErr);
                  } else {
                      this.pool._checkRateLimit(() => {
                          this.available = true;
                          this.emit('available');
                      });
                  }
              });

              callback(null, info);
          });
      }

      /**
       * Closes the connection, detaches the OAuth2 listeners and emits 'close'
       */
      close() {
          this._connected = false;
          if (this.auth && this.auth.oauth2) {
              this.auth.oauth2.removeAllListeners();
          }
          if (this.connection) {
              this.connection.close();
          }
          this.emit('close');
      }
  }
  224. module.exports = PoolResource;