""" Email client to forge and send emails """ import email.utils import logging import os import smtplib from email.encoders import encode_base64 from email.mime.base import MIMEBase from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from mako.template import Template as MakoTemplate from mylib.config import ( BooleanOption, ConfigurableObject, IntegerOption, PasswordOption, StringOption, ) log = logging.getLogger(__name__) class EmailClient( ConfigurableObject ): # pylint: disable=useless-object-inheritance,too-many-instance-attributes """ Email client This class abstract all interactions with the SMTP server. """ _config_name = "email" _config_comment = "Email" _defaults = { "smtp_host": "localhost", "smtp_port": 25, "smtp_ssl": False, "smtp_tls": False, "smtp_user": None, "smtp_password": None, "smtp_debug": False, "sender_name": "No reply", "sender_email": "noreply@localhost", "encoding": "utf-8", "catch_all_addr": None, "just_try": False, "templates_path": None, } templates = {} def __init__(self, templates=None, initialize=False, **kwargs): super().__init__(**kwargs) assert templates is None or isinstance(templates, dict) self.templates = templates if templates else {} if initialize: self.initialize() # pylint: disable=arguments-differ,arguments-renamed def configure(self, use_smtp=True, just_try=True, **kwargs): """Configure options on registered mylib.Config object""" section = super().configure(**kwargs) if use_smtp: section.add_option( StringOption, "smtp_host", default=self._defaults["smtp_host"], comment="SMTP server hostname/IP address", ) section.add_option( IntegerOption, "smtp_port", default=self._defaults["smtp_port"], comment="SMTP server port", ) section.add_option( BooleanOption, "smtp_ssl", default=self._defaults["smtp_ssl"], comment="Use SSL on SMTP server connection", ) section.add_option( BooleanOption, "smtp_tls", default=self._defaults["smtp_tls"], comment="Use TLS on SMTP server connection", ) section.add_option( StringOption, "smtp_user", default=self._defaults["smtp_user"], comment="SMTP authentication username", ) section.add_option( PasswordOption, "smtp_password", default=self._defaults["smtp_password"], comment='SMTP authentication password (set to "keyring" to use XDG keyring)', username_option="smtp_user", keyring_value="keyring", ) section.add_option( BooleanOption, "smtp_debug", default=self._defaults["smtp_debug"], comment="Enable SMTP debugging", ) section.add_option( StringOption, "sender_name", default=self._defaults["sender_name"], comment="Sender name", ) section.add_option( StringOption, "sender_email", default=self._defaults["sender_email"], comment="Sender email address", ) section.add_option( StringOption, "encoding", default=self._defaults["encoding"], comment="Email encoding" ) section.add_option( StringOption, "catch_all_addr", default=self._defaults["catch_all_addr"], comment="Catch all sent emails to this specified email address", ) if just_try: section.add_option( BooleanOption, "just_try", default=self._defaults["just_try"], comment="Just-try mode: do not really send emails", ) section.add_option( StringOption, "templates_path", default=self._defaults["templates_path"], comment="Path to templates directory", ) return section def initialize(self, *args, **kwargs): # pylint: disable=arguments-differ """Configuration initialized hook""" super().initialize(*args, **kwargs) self.load_templates_directory() def load_templates_directory(self, templates_path=None): """Load templates from specified directory""" if templates_path is None: templates_path = self._get_option("templates_path") if not templates_path: return log.debug("Load email templates from %s directory", templates_path) for filename in os.listdir(templates_path): filepath = os.path.join(templates_path, filename) if not os.path.isfile(filepath): continue template_name, template_type = os.path.splitext(filename) if template_type not in [".html", ".txt", ".subject"]: continue template_type = "text" if template_type == ".txt" else template_type[1:] if template_name not in self.templates: self.templates[template_name] = {} log.debug("Load email template %s %s from %s", template_name, template_type, filepath) with open(filepath, encoding="utf8") as file_desc: self.templates[template_name][template_type] = MakoTemplate(file_desc.read()) def forge_message( self, recipients, subject=None, html_body=None, text_body=None, # pylint: disable=too-many-arguments,too-many-locals attachment_files=None, attachment_payloads=None, sender_name=None, sender_email=None, encoding=None, template=None, **template_vars, ): """ Forge a message :param recipients: The recipient(s) of the email. List of tuple(name, email) or just the email of the recipients. :param subject: The subject of the email. :param html_body: The HTML body of the email :param text_body: The plain text body of the email :param attachment_files: List of filepaths to attach :param attachment_payloads: List of tuples with filename and payload to attach :param sender_name: Custom sender name (default: as defined on initialization) :param sender_email: Custom sender email (default: as defined on initialization) :param encoding: Email content encoding (default: as defined on initialization) :param template: The name of a template to use to forge this email All other parameters will be consider as template variables. """ recipients = [recipients] if not isinstance(recipients, list) else recipients msg = MIMEMultipart("alternative") msg["To"] = ", ".join( [ email.utils.formataddr(recipient) if isinstance(recipient, tuple) else recipient for recipient in recipients ] ) msg["From"] = email.utils.formataddr( ( sender_name or self._get_option("sender_name"), sender_email or self._get_option("sender_email"), ) ) if subject: msg["Subject"] = ( subject.render(**template_vars) if isinstance(subject, MakoTemplate) else subject.format(**template_vars) ) msg["Date"] = email.utils.formatdate(None, True) encoding = encoding if encoding else self._get_option("encoding") if template: assert template in self.templates, f"Unknwon template {template}" # Handle subject from template if not subject: assert self.templates[template].get( "subject" ), f"No subject defined in template {template}" msg["Subject"] = ( self.templates[template]["subject"].render(**template_vars) if isinstance(self.templates[template]["subject"], MakoTemplate) else self.templates[template]["subject"].format(**template_vars) ) # Put HTML part in last one to prefered it parts = [] if self.templates[template].get("text"): if isinstance(self.templates[template]["text"], MakoTemplate): parts.append( (self.templates[template]["text"].render(**template_vars), "plain") ) else: parts.append( (self.templates[template]["text"].format(**template_vars), "plain") ) if self.templates[template].get("html"): if isinstance(self.templates[template]["html"], MakoTemplate): parts.append((self.templates[template]["html"].render(**template_vars), "html")) else: parts.append((self.templates[template]["html"].format(**template_vars), "html")) for body, mime_type in parts: msg.attach(MIMEText(body.encode(encoding), mime_type, _charset=encoding)) else: assert subject, "No subject provided" if text_body: msg.attach(MIMEText(text_body.encode(encoding), "plain", _charset=encoding)) if html_body: msg.attach(MIMEText(html_body.encode(encoding), "html", _charset=encoding)) if attachment_files: for filepath in attachment_files: with open(filepath, "rb") as fp: part = MIMEBase("application", "octet-stream") part.set_payload(fp.read()) encode_base64(part) part.add_header( "Content-Disposition", f'attachment; filename="{os.path.basename(filepath)}"', ) msg.attach(part) if attachment_payloads: for filename, payload in attachment_payloads: part = MIMEBase("application", "octet-stream") part.set_payload(payload) encode_base64(part) part.add_header("Content-Disposition", f'attachment; filename="{filename}"') msg.attach(part) return msg def send(self, recipients, msg=None, subject=None, just_try=False, **forge_args): """ Send an email :param recipients: The recipient(s) of the email. List of tuple(name, email) or just the email of the recipients. :param msg: The message of this email (as MIMEBase or derivated classes) :param subject: The subject of the email (only if the message is not provided using msg parameter) :param just_try: Enable just try mode (do not really send email, default: as defined on initialization) All other parameters will be consider as parameters to forge the message (only if the message is not provided using msg parameter). """ recipients = [recipients] if not isinstance(recipients, list) else recipients msg = msg if msg else self.forge_message(recipients, subject, **forge_args) if just_try or self._get_option("just_try"): log.debug( 'Just-try mode: do not really send this email to %s (subject="%s")', ", ".join(recipients), subject or msg.get("subject", "No subject"), ) return True catch_addr = self._get_option("catch_all_addr") if catch_addr: log.debug("Catch email originaly send to %s to %s", ", ".join(recipients), catch_addr) recipients = catch_addr if isinstance(catch_addr, list) else list(catch_addr) smtp_host = self._get_option("smtp_host") smtp_port = self._get_option("smtp_port") try: if self._get_option("smtp_ssl"): logging.info("Establish SSL connection to server %s:%s", smtp_host, smtp_port) server = smtplib.SMTP_SSL(smtp_host, smtp_port) else: logging.info("Establish connection to server %s:%s", smtp_host, smtp_port) server = smtplib.SMTP(smtp_host, smtp_port) if self._get_option("smtp_tls"): logging.info("Start TLS on SMTP connection") server.starttls() except smtplib.SMTPException: log.error("Error connecting to SMTP server %s:%s", smtp_host, smtp_port, exc_info=True) return False if self._get_option("smtp_debug"): server.set_debuglevel(True) smtp_user = self._get_option("smtp_user") smtp_password = self._get_option("smtp_password") if smtp_user and smtp_password: try: log.info("Try to authenticate on SMTP connection as %s", smtp_user) server.login(smtp_user, smtp_password) except smtplib.SMTPException: log.error( "Error authenticating on SMTP server %s:%s with user %s", smtp_host, smtp_port, smtp_user, exc_info=True, ) return False error = False try: log.info("Sending email to %s", ", ".join(recipients)) server.sendmail( self._get_option("sender_email"), [ recipient[1] if isinstance(recipient, tuple) else recipient for recipient in recipients ], msg.as_string(), ) except smtplib.SMTPException: error = True log.error("Error sending email to %s", ", ".join(recipients), exc_info=True) finally: server.quit() return not error if __name__ == "__main__": # Run tests import argparse import datetime import sys # Options parser parser = argparse.ArgumentParser() parser.add_argument( "-v", "--verbose", action="store_true", dest="verbose", help="Enable verbose mode" ) parser.add_argument( "-d", "--debug", action="store_true", dest="debug", help="Enable debug mode" ) parser.add_argument( "-l", "--log-file", action="store", type=str, dest="logfile", help="Log file path" ) parser.add_argument( "-j", "--just-try", action="store_true", dest="just_try", help="Enable just-try mode" ) email_opts = parser.add_argument_group("Email options") email_opts.add_argument( "-H", "--smtp-host", action="store", type=str, dest="email_smtp_host", help="SMTP host" ) email_opts.add_argument( "-P", "--smtp-port", action="store", type=int, dest="email_smtp_port", help="SMTP port" ) email_opts.add_argument( "-S", "--smtp-ssl", action="store_true", dest="email_smtp_ssl", help="Use SSL" ) email_opts.add_argument( "-T", "--smtp-tls", action="store_true", dest="email_smtp_tls", help="Use TLS" ) email_opts.add_argument( "-u", "--smtp-user", action="store", type=str, dest="email_smtp_user", help="SMTP username" ) email_opts.add_argument( "-p", "--smtp-password", action="store", type=str, dest="email_smtp_password", help="SMTP password", ) email_opts.add_argument( "-D", "--smtp-debug", action="store_true", dest="email_smtp_debug", help="Debug SMTP connection", ) email_opts.add_argument( "-e", "--email-encoding", action="store", type=str, dest="email_encoding", help="SMTP encoding", ) email_opts.add_argument( "-f", "--sender-name", action="store", type=str, dest="email_sender_name", help="Sender name", ) email_opts.add_argument( "-F", "--sender-email", action="store", type=str, dest="email_sender_email", help="Sender email", ) email_opts.add_argument( "-C", "--catch-all", action="store", type=str, dest="email_catch_all", help="Catch all sent email: specify catch recipient email address", ) test_opts = parser.add_argument_group("Test email options") test_opts.add_argument( "-t", "--to", action="store", type=str, dest="test_to", help="Test email recipient", ) test_opts.add_argument( "-m", "--mako", action="store_true", dest="test_mako", help="Test mako templating", ) options = parser.parse_args() if not options.test_to: parser.error("You must specify test email recipient using -t/--to parameter") sys.exit(1) # Initialize logs logformat = "%(asctime)s - Test EmailClient - %(levelname)s - %(message)s" if options.debug: loglevel = logging.DEBUG elif options.verbose: loglevel = logging.INFO else: loglevel = logging.WARNING if options.logfile: logging.basicConfig(filename=options.logfile, level=loglevel, format=logformat) else: logging.basicConfig(level=loglevel, format=logformat) if options.email_smtp_user and not options.email_smtp_password: import getpass options.email_smtp_password = getpass.getpass("Please enter SMTP password: ") logging.info("Initialize Email client") email_client = EmailClient( smtp_host=options.email_smtp_host, smtp_port=options.email_smtp_port, smtp_ssl=options.email_smtp_ssl, smtp_tls=options.email_smtp_tls, smtp_user=options.email_smtp_user, smtp_password=options.email_smtp_password, smtp_debug=options.email_smtp_debug, sender_name=options.email_sender_name, sender_email=options.email_sender_email, catch_all_addr=options.email_catch_all, just_try=options.just_try, encoding=options.email_encoding, templates=dict( test=dict( subject="Test email", text=( "Just a test email sent at {sent_date}." if not options.test_mako else MakoTemplate("Just a test email sent at ${sent_date}.") ), html=( "Just a test email. (sent at {sent_date})" if not options.test_mako else MakoTemplate( "Just a test email. (sent at ${sent_date})" ) ), ) ), ) logging.info("Send a test email to %s", options.test_to) if email_client.send(options.test_to, template="test", sent_date=datetime.datetime.now()): logging.info("Test email sent") sys.exit(0) logging.error("Fail to send test email") sys.exit(1)