# -*- coding: utf-8 -*- """ Email client """ import logging import os import smtplib import email.utils from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email.encoders import encode_base64 from mako.template import Template as MakoTemplate log = logging.getLogger(__name__) class EmailClient(object): # pylint: disable=useless-object-inheritance,too-many-instance-attributes """ Email client This class abstract all interactions with the SMTP server. """ smtp_host = None smtp_port = None smtp_ssl = None smtp_tls = None smtp_user = None smtp_password = None smtp_debug = None sender_name = None sender_email = None catch_all_addr = False just_try = False encoding = 'utf-8' templates = dict() def __init__(self, smtp_host=None, smtp_port=None, smtp_ssl=None, smtp_tls=None, smtp_user=None, smtp_password=None, smtp_debug=None, sender_name=None, sender_email=None, catch_all_addr=None, just_try=None, encoding=None, templates=None): self.smtp_host = smtp_host if smtp_host else 'localhost' self.smtp_port = smtp_port if smtp_port else 25 self.smtp_ssl = bool(smtp_ssl) self.smtp_tls = bool(smtp_tls) self.smtp_user = smtp_user if smtp_user else None self.smtp_password = smtp_password if smtp_password else None self.smtp_debug = bool(smtp_debug) self.sender_name = sender_name if sender_name else "No reply" self.sender_email = sender_email if sender_email else "noreply@localhost" self.catch_all_addr = catch_all_addr if catch_all_addr else False self.just_try = just_try if just_try else False assert templates is None or isinstance(templates, dict) self.templates = templates if templates else dict() if encoding: self.encoding = encoding def forge_message(self, rcpt_to, subject=None, html_body=None, text_body=None, attachment_files=None, attachment_payloads=None, sender_name=None, sender_email=None, encoding=None, template=None, **template_vars): # pylint: disable=too-many-arguments,too-many-locals """ Forge a message :param rcpt_to: The recipient of the email. Could be a tuple(name, email) or just the email of the recipient. :param subject: The subject of the email. :param html_body: The HTML body of the email :param text_body: The plain text body of the email :param attachment_files: List of filepaths to attach :param attachment_payloads: List of tuples with filename and payload to attach :param sender_name: Custom sender name (default: as defined on initialization) :param sender_email: Custom sender email (default: as defined on initialization) :param encoding: Email content encoding (default: as defined on initialization) :param template: The name of a template to use to forge this email All other parameters will be consider as template variables. """ msg = MIMEMultipart('alternative') msg['To'] = email.utils.formataddr(rcpt_to) if isinstance(rcpt_to, tuple) else rcpt_to msg['From'] = email.utils.formataddr((sender_name or self.sender_name, sender_email or self.sender_email)) if subject: msg['Subject'] = subject.format(**template_vars) msg['Date'] = email.utils.formatdate(None, True) encoding = encoding if encoding else self.encoding if template: assert template in self.templates, "Unknwon template %s" % template # Handle subject from template if not subject: assert self.templates[template].get('subject'), 'No subject defined in template %s' % template msg['Subject'] = self.templates[template]['subject'].format(**template_vars) # Put HTML part in last one to prefered it parts = [] if self.templates[template].get('text'): if isinstance(self.templates[template]['text'], MakoTemplate): parts.append((self.templates[template]['text'].render(**template_vars), 'plain')) else: parts.append((self.templates[template]['text'].format(**template_vars), 'plain')) if self.templates[template].get('html'): if isinstance(self.templates[template]['html'], MakoTemplate): parts.append((self.templates[template]['html'].render(**template_vars), 'html')) else: parts.append((self.templates[template]['html'].format(**template_vars), 'html')) for body, mime_type in parts: msg.attach(MIMEText(body.encode(encoding), mime_type, _charset=encoding)) else: assert subject, 'No subject provided' if text_body: msg.attach(MIMEText(text_body.encode(encoding), 'plain', _charset=encoding)) if html_body: msg.attach(MIMEText(html_body.encode(encoding), 'html', _charset=encoding)) if attachment_files: for filepath in attachment_files: with open(filepath, 'rb') as fp: part = MIMEBase('application', "octet-stream") part.set_payload(fp.read()) encode_base64(part) part.add_header('Content-Disposition', 'attachment; filename="%s"' % os.path.basename(filepath)) msg.attach(part) if attachment_payloads: for filename, payload in attachment_payloads: part = MIMEBase('application', "octet-stream") part.set_payload(payload) encode_base64(part) part.add_header('Content-Disposition', 'attachment; filename="%s"' % filename) msg.attach(part) return msg def send(self, rcpt_to, msg=None, subject=None, just_try=False, **forge_args): """ Send an email :param rcpt_to: The recipient of the email. Could be a tuple(name, email) or just the email of the recipient. :param msg: The message of this email (as MIMEBase or derivated classes) :param subject: The subject of the email (only if the message is not provided using msg parameter) :param just_try: Enable just try mode (do not really send email, default: as defined on initialization) All other parameters will be consider as parameters to forge the message (only if the message is not provided using msg parameter). """ msg = msg if msg else self.forge_message(rcpt_to, subject, **forge_args) if just_try or self.just_try: log.debug('Just-try mode: do not really send this email to %s (subject="%s")', rcpt_to, subject or msg.get('subject', 'No subject')) return True if self.catch_all_addr: catch_addr = self.catch_all_addr log.debug('Catch email originaly send to %s to %s', rcpt_to, catch_addr) rcpt_to = catch_addr try: if self.smtp_ssl: logging.info("Establish SSL connection to server %s:%s", self.smtp_host, self.smtp_port) server = smtplib.SMTP_SSL(self.smtp_host, self.smtp_port) else: logging.info("Establish connection to server %s:%s", self.smtp_host, self.smtp_port) server = smtplib.SMTP(self.smtp_host, self.smtp_port) if self.smtp_tls: logging.info('Start TLS on SMTP connection') server.starttls() except smtplib.SMTPException: log.error('Error connecting to SMTP server %s:%s', self.smtp_host, self.smtp_port, exc_info=True) return False if self.smtp_debug: server.set_debuglevel(True) if self.smtp_user and self.smtp_password: try: log.info('Try to authenticate on SMTP connection as %s', self.smtp_user) server.login(self.smtp_user, self.smtp_password) except smtplib.SMTPException: log.error('Error authenticating on SMTP server %s:%s with user %s', self.smtp_host, self.smtp_port, self.smtp_user, exc_info=True) return False error = False try: log.info('Sending email to %s', rcpt_to) server.sendmail(self.sender_email, [rcpt_to[1] if isinstance(rcpt_to, tuple) else rcpt_to], msg.as_string()) except smtplib.SMTPException: error = True log.error('Error sending email to %s', rcpt_to, exc_info=True) finally: server.quit() return not error if __name__ == '__main__': # Run tests import datetime import sys import argparse # Options parser parser = argparse.ArgumentParser() parser.add_argument( '-v', '--verbose', action="store_true", dest="verbose", help="Enable verbose mode" ) parser.add_argument( '-d', '--debug', action="store_true", dest="debug", help="Enable debug mode" ) parser.add_argument( '-l', '--log-file', action="store", type=str, dest="logfile", help="Log file path" ) parser.add_argument( '-j', '--just-try', action="store_true", dest="just_try", help="Enable just-try mode" ) email_opts = parser.add_argument_group('Email options') email_opts.add_argument( '-H', '--smtp-host', action="store", type=str, dest="email_smtp_host", help="SMTP host" ) email_opts.add_argument( '-P', '--smtp-port', action="store", type=int, dest="email_smtp_port", help="SMTP port" ) email_opts.add_argument( '-S', '--smtp-ssl', action="store_true", dest="email_smtp_ssl", help="Use SSL" ) email_opts.add_argument( '-T', '--smtp-tls', action="store_true", dest="email_smtp_tls", help="Use TLS" ) email_opts.add_argument( '-u', '--smtp-user', action="store", type=str, dest="email_smtp_user", help="SMTP username" ) email_opts.add_argument( '-p', '--smtp-password', action="store", type=str, dest="email_smtp_password", help="SMTP password" ) email_opts.add_argument( '-D', '--smtp-debug', action="store_true", dest="email_smtp_debug", help="Debug SMTP connection" ) email_opts.add_argument( '-e', '--email-encoding', action="store", type=str, dest="email_encoding", help="SMTP encoding" ) email_opts.add_argument( '-f', '--sender-name', action="store", type=str, dest="email_sender_name", help="Sender name" ) email_opts.add_argument( '-F', '--sender-email', action="store", type=str, dest="email_sender_email", help="Sender email" ) email_opts.add_argument( '-C', '--catch-all', action="store", type=str, dest="email_catch_all", help="Catch all sent email: specify catch recipient email address" ) test_opts = parser.add_argument_group('Test email options') test_opts.add_argument( '-t', '--to', action="store", type=str, dest="test_to", help="Test email recipient", ) test_opts.add_argument( '-m', '--mako', action="store_true", dest="test_mako", help="Test mako templating", ) options = parser.parse_args() if not options.test_to: parser.error('You must specify test email recipient using -t/--to parameter') sys.exit(1) # Initialize logs logformat = '%(asctime)s - Test EmailClient - %(levelname)s - %(message)s' if options.debug: loglevel = logging.DEBUG elif options.verbose: loglevel = logging.INFO else: loglevel = logging.WARNING if options.logfile: logging.basicConfig(filename=options.logfile, level=loglevel, format=logformat) else: logging.basicConfig(level=loglevel, format=logformat) if options.email_smtp_user and not options.email_smtp_password: import getpass options.email_smtp_password = getpass.getpass('Please enter SMTP password: ') logging.info('Initialize Email client') email_client = EmailClient( smtp_host=options.email_smtp_host, smtp_port=options.email_smtp_port, smtp_ssl=options.email_smtp_ssl, smtp_tls=options.email_smtp_tls, smtp_user=options.email_smtp_user, smtp_password=options.email_smtp_password, smtp_debug=options.email_smtp_debug, sender_name=options.email_sender_name, sender_email=options.email_sender_email, catch_all_addr=options.email_catch_all, just_try=options.just_try, encoding=options.email_encoding, templates=dict( test=dict( subject="Test email", text=( "Just a test email sent at {sent_date}." if not options.test_mako else MakoTemplate("Just a test email sent at ${sent_date}.") ), html=( "Just a test email. (sent at {sent_date})" if not options.test_mako else MakoTemplate("Just a test email. (sent at ${sent_date})") ) ) ) ) logging.info('Send a test email to %s', options.test_to) if email_client.send(options.test_to, template='test', sent_date=datetime.datetime.now()): logging.info('Test email sent') sys.exit(0) logging.error('Fail to send test email') sys.exit(1)