From 3901c1bd49e08fc6b1a8fe166daa8be6b8c34d6d Mon Sep 17 00:00:00 2001 From: Benjamin Renard Date: Wed, 13 Jan 2021 16:09:33 +0100 Subject: [PATCH] Add EmailClient --- EmailClient.py | 370 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 370 insertions(+) create mode 100644 EmailClient.py diff --git a/EmailClient.py b/EmailClient.py new file mode 100644 index 0000000..cffb248 --- /dev/null +++ b/EmailClient.py @@ -0,0 +1,370 @@ +# -*- coding: utf-8 -*- + +import logging +import os +import smtplib +import email.utils +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart +from email.mime.base import MIMEBase +from email.encoders import encode_base64 + +log = logging.getLogger(__name__) + + +class EmailClient(object): + """ + Email client + + This class abstract all interactions with the SMTP server. + """ + + smtp_host = None + smtp_port = None + smtp_ssl = None + smtp_tls = None + smtp_user = None + smtp_password = None + smtp_debug = None + + sender_name = None + sender_email = None + + catch_all_addr = False + just_try = False + + encoding = 'utf-8' + + templates = dict() + + def __init__(self, smtp_host=None, smtp_port=None, smtp_ssl=None, smtp_tls=None, smtp_user=None, smtp_password=None, smtp_debug=None, + sender_name=None, sender_email=None, catch_all_addr=None, just_try=None, encoding=None, templates=None): + self.smtp_host = smtp_host if smtp_host else 'localhost' + self.smtp_port = smtp_port if smtp_port else 25 + self.smtp_ssl = True if smtp_ssl else False + self.smtp_tls = True if smtp_tls else False + self.smtp_user = smtp_user if smtp_user else None + self.smtp_password = smtp_password if smtp_password else None + self.smtp_debug = True if smtp_debug else False + + self.sender_name = sender_name if sender_name else "No reply" + self.sender_email = sender_email if sender_email else "noreply@localhost" + self.catch_all_addr = catch_all_addr if catch_all_addr else False + self.just_try = just_try if just_try else False + + assert templates is None or isinstance(templates, dict) + self.templates = templates if templates else dict() + + if encoding: + self.encoding = encoding + + def forge_message(self, rcpt_to, subject=None, html_body=None, text_body=None, attachment_files=None, + sender_name=None, sender_email=None, encoding=None, template=None, **template_vars): # pylint: disable=too-many-arguments,too-many-locals + """ + Forge a message + + :param rcpt_to: The recipient of the email. Could be a tuple(name, email) or just the email of the recipient. + :param subject: The subject of the email. + :param html_body: The HTML body of the email + :param text_body: The plain text body of the email + :param attachment_files: List of filepaths to attach + :param sender_name: Custom sender name (default: as defined on initialization) + :param sender_email: Custom sender email (default: as defined on initialization) + :param encoding: Email content encoding (default: as defined on initialization) + :param template: The name of a template to use to forge this email + + All other parameters will be consider as template variables. + """ + msg = MIMEMultipart('alternative') + msg['To'] = email.utils.formataddr(rcpt_to) if isinstance(rcpt_to, tuple) else rcpt_to + msg['From'] = email.utils.formataddr((sender_email or self.sender_name, sender_email or self.sender_email)) + if subject: + msg['Subject'] = subject.format(**template_vars) + msg['Date'] = email.utils.formatdate(None, True) + encoding = encoding if encoding else self.encoding + if template: + assert template in templates, "Unknwon template %s" % template + # Handle subject from template + if not subject: + assert templates[template].get('subject'), 'No subject defined in template %s' % template + msg['Subject'] = templates[template]['subject'].format(**template_vars) + + # Put HTML part in last one to prefered it + parts = [] + if templates[template].get('text'): + parts.append((templates[template].get('text'), 'plain')) + if templates[template].get('html'): + parts.append((templates[template].get('html'), 'html')) + + for body, mime_type in parts: + msg.attach(MIMEText(body.format(**template_vars).encode(encoding), mime_type, _charset=encoding)) + else: + assert subject, 'No subject provided' + if text_body: + msg.attach(MIMEText(text_body.encode(encoding), 'plain', _charset=encoding)) + if html_body: + msg.attach(MIMEText(html_body.encode(encoding), 'html', _charset=encoding)) + if attachment_files: + for filepath in attachment_files: + with open(filepath, 'rb') as fp: + part = MIMEBase('application', "octet-stream") + part.set_payload(fp.read()) + encode_base64(part) + part.add_header('Content-Disposition', 'attachment; filename="%s"' % os.path.basename(filepath)) + msg.attach(part) + return msg + + def send(self, rcpt_to, msg=None, subject=None, just_try=False, **forge_args): + """ + Send an email + + :param rcpt_to: The recipient of the email. Could be a tuple(name, email) + or just the email of the recipient. + :param msg: The message of this email (as MIMEBase or derivated classes) + :param subject: The subject of the email (only if the message is not provided + using msg parameter) + :param just_try: Enable just try mode (do not really send email, default: as defined on initialization) + + All other parameters will be consider as parameters to forge the message + (only if the message is not provided using msg parameter). + """ + msg = msg if msg else self.forge_message(rcpt_to, subject, **forge_args) + + if just_try or self.just_try: + log.debug('Just-try mode: do not really send this email to %s (subject="%s")', rcpt_to, subject or msg.get('subject', 'No subject')) + return True + + if self.catch_all_addr: + catch_addr = self.catch_all_addr + log.debug('Catch email originaly send to %s to %s', rcpt_to, catch_addr) + rcpt_to = catch_addr + + try: + if self.smtp_ssl: + logging.info("Establish SSL connection to server %s:%s", self.smtp_host, self.smtp_port) + server = smtplib.SMTP_SSL(self.smtp_host, self.smtp_port) + else: + logging.info("Establish connection to server %s:%s", self.smtp_host, self.smtp_port) + server = smtplib.SMTP(self.smtp_host, self.smtp_port) + if self.smtp_tls: + logging.info('Start TLS on SMTP connection') + server.starttls() + except smtplib.SMTPException: + log.error('Error connecting to SMTP server %s:%s', self.smtp_host, self.smtp_port, exc_info=True) + return False + + if self.smtp_debug: + server.set_debuglevel(True) + + if self.smtp_user and self.smtp_password: + try: + log.info('Try to authenticate on SMTP connection as %s', self.smtp_user) + server.login(self.smtp_user, self.smtp_password) + except smtplib.SMTPException: + log.error('Error authenticating on SMTP server %s:%s with user %s', self.smtp_host, self.smtp_port, self.smtp_user, exc_info=True) + return False + + error = False + try: + log.info('Sending email to %s', rcpt_to) + server.sendmail(self.sender_email, [rcpt_to[1] if isinstance(rcpt_to, tuple) else rcpt_to], msg.as_string()) + except smtplib.SMTPException: + error = True + log.error('Error sending email to %s', rcpt_to, exc_info=True) + finally: + server.quit() + + return not error + + +if __name__ == '__main__': + """ Run tests """ + import datetime + import sys + + import argparse + + # Options parser + parser = argparse.ArgumentParser() + + parser.add_argument( + '-v', '--verbose', + action="store_true", + dest="verbose", + help="Enable verbose mode" + ) + + parser.add_argument( + '-d', '--debug', + action="store_true", + dest="debug", + help="Enable debug mode" + ) + + parser.add_argument( + '-l', '--log-file', + action="store", + type=str, + dest="logfile", + help="Log file path" + ) + + parser.add_argument( + '-j', '--just-try', + action="store_true", + dest="just_try", + help="Enable just-try mode" + ) + + email_opts = parser.add_argument_group('Email options') + + email_opts.add_argument( + '-H', '--smtp-host', + action="store", + type=str, + dest="email_smtp_host", + help="SMTP host" + ) + + email_opts.add_argument( + '-P', '--smtp-port', + action="store", + type=int, + dest="email_smtp_port", + help="SMTP port" + ) + + email_opts.add_argument( + '-S', '--smtp-ssl', + action="store_true", + dest="email_smtp_ssl", + help="Use SSL" + ) + + email_opts.add_argument( + '-T', '--smtp-tls', + action="store_true", + dest="email_smtp_tls", + help="Use TLS" + ) + + email_opts.add_argument( + '-u', '--smtp-user', + action="store", + type=str, + dest="email_smtp_user", + help="SMTP username" + ) + + email_opts.add_argument( + '-p', '--smtp-password', + action="store", + type=str, + dest="email_smtp_password", + help="SMTP password" + ) + + email_opts.add_argument( + '-D', '--smtp-debug', + action="store_true", + dest="email_smtp_debug", + help="Debug SMTP connection" + ) + + email_opts.add_argument( + '-e', '--email-encoding', + action="store", + type=str, + dest="email_encoding", + help="SMTP encoding" + ) + + email_opts.add_argument( + '-f', '--sender-name', + action="store", + type=str, + dest="email_sender_name", + help="Sender name" + ) + + email_opts.add_argument( + '-F', '--sender-email', + action="store", + type=str, + dest="email_sender_email", + help="Sender email" + ) + + email_opts.add_argument( + '-C', '--catch-all', + action="store", + type=str, + dest="email_catch_all", + help="Catch all sent email: specify catch recipient email address" + ) + + test_opts = parser.add_argument_group('Test email options') + + test_opts.add_argument( + '-t', '--to', + action="store", + type=str, + dest="test_to", + help="Test email recipient", + ) + + options = parser.parse_args() + + if not options.test_to: + parser.error('You must specify test email recipient using -t/--to parameter') + sys.exit(1) + + # Initialize logs + logformat = '%(asctime)s - Test EmailClient - %(levelname)s - %(message)s' + if options.debug: + loglevel = logging.DEBUG + elif options.verbose: + loglevel = logging.INFO + else: + loglevel = logging.WARNING + + if options.logfile: + logging.basicConfig(filename=options.logfile, level=loglevel, format=logformat) + else: + logging.basicConfig(level=loglevel, format=logformat) + + if options.email_smtp_user and not options.email_smtp_password: + import getpass + options.email_smtp_password = getpass.getpass('Please enter SMTP password: ') + + templates = dict( + test=dict( + subject="Test email", + text="Just a test email sent at {sent_date}.", + html="Just a test email." + ) + ) + + logging.info('Initialize Email client') + email_client = EmailClient( + smtp_host=options.email_smtp_host, + smtp_port=options.email_smtp_port, + smtp_ssl=options.email_smtp_ssl, + smtp_tls=options.email_smtp_tls, + smtp_user=options.email_smtp_user, + smtp_password=options.email_smtp_password, + smtp_debug=options.email_smtp_debug, + sender_name=options.email_sender_name, + sender_email=options.email_sender_email, + catch_all_addr=options.email_catch_all, + just_try=options.just_try, + encoding=options.email_encoding, + templates=templates + ) + + logging.info('Send a test email to %s', options.test_to) + if email_client.send(options.test_to, template='test', sent_date=datetime.datetime.now()): + logging.info('Test email sent') + sys.exit(0) + logging.error('Fail to send test email') + sys.exit(1)