# -*- coding: utf-8 -*-
""" Email helpers """
import logging
import os
import smtplib
import email.utils
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.encoders import encode_base64
from mako.template import Template as MakoTemplate
log = logging.getLogger(__name__)
class EmailClient(object): # pylint: disable=useless-object-inheritance,too-many-instance-attributes
"""
Email client
This class abstract all interactions with the SMTP server.
"""
smtp_host = None
smtp_port = None
smtp_ssl = None
smtp_tls = None
smtp_user = None
smtp_password = None
smtp_debug = None
sender_name = None
sender_email = None
catch_all_addr = False
just_try = False
encoding = 'utf-8'
templates = dict()
def __init__(self, smtp_host=None, smtp_port=None, smtp_ssl=None, smtp_tls=None, smtp_user=None, smtp_password=None, smtp_debug=None,
sender_name=None, sender_email=None, catch_all_addr=None, just_try=None, encoding=None, templates=None):
self.smtp_host = smtp_host if smtp_host else 'localhost'
self.smtp_port = smtp_port if smtp_port else 25
self.smtp_ssl = bool(smtp_ssl)
self.smtp_tls = bool(smtp_tls)
self.smtp_user = smtp_user if smtp_user else None
self.smtp_password = smtp_password if smtp_password else None
self.smtp_debug = bool(smtp_debug)
self.sender_name = sender_name if sender_name else "No reply"
self.sender_email = sender_email if sender_email else "noreply@localhost"
self.catch_all_addr = catch_all_addr if catch_all_addr else False
self.just_try = just_try if just_try else False
assert templates is None or isinstance(templates, dict)
self.templates = templates if templates else dict()
if encoding:
self.encoding = encoding
def forge_message(self, rcpt_to, subject=None, html_body=None, text_body=None, attachment_files=None,
attachment_payloads=None, sender_name=None, sender_email=None, encoding=None,
template=None, **template_vars): # pylint: disable=too-many-arguments,too-many-locals
"""
Forge a message
:param rcpt_to: The recipient of the email. Could be a tuple(name, email) or just the email of the recipient.
:param subject: The subject of the email.
:param html_body: The HTML body of the email
:param text_body: The plain text body of the email
:param attachment_files: List of filepaths to attach
:param attachment_payloads: List of tuples with filename and payload to attach
:param sender_name: Custom sender name (default: as defined on initialization)
:param sender_email: Custom sender email (default: as defined on initialization)
:param encoding: Email content encoding (default: as defined on initialization)
:param template: The name of a template to use to forge this email
All other parameters will be consider as template variables.
"""
msg = MIMEMultipart('alternative')
msg['To'] = email.utils.formataddr(rcpt_to) if isinstance(rcpt_to, tuple) else rcpt_to
msg['From'] = email.utils.formataddr((sender_name or self.sender_name, sender_email or self.sender_email))
if subject:
msg['Subject'] = subject.format(**template_vars)
msg['Date'] = email.utils.formatdate(None, True)
encoding = encoding if encoding else self.encoding
if template:
assert template in self.templates, "Unknwon template %s" % template
# Handle subject from template
if not subject:
assert self.templates[template].get('subject'), 'No subject defined in template %s' % template
msg['Subject'] = self.templates[template]['subject'].format(**template_vars)
# Put HTML part in last one to prefered it
parts = []
if self.templates[template].get('text'):
if isinstance(self.templates[template]['text'], MakoTemplate):
parts.append((self.templates[template]['text'].render(**template_vars), 'plain'))
else:
parts.append((self.templates[template]['text'].format(**template_vars), 'plain'))
if self.templates[template].get('html'):
if isinstance(self.templates[template]['html'], MakoTemplate):
parts.append((self.templates[template]['html'].render(**template_vars), 'html'))
else:
parts.append((self.templates[template]['html'].format(**template_vars), 'html'))
for body, mime_type in parts:
msg.attach(MIMEText(body.encode(encoding), mime_type, _charset=encoding))
else:
assert subject, 'No subject provided'
if text_body:
msg.attach(MIMEText(text_body.encode(encoding), 'plain', _charset=encoding))
if html_body:
msg.attach(MIMEText(html_body.encode(encoding), 'html', _charset=encoding))
if attachment_files:
for filepath in attachment_files:
with open(filepath, 'rb') as fp:
part = MIMEBase('application', "octet-stream")
part.set_payload(fp.read())
encode_base64(part)
part.add_header('Content-Disposition', 'attachment; filename="%s"' % os.path.basename(filepath))
msg.attach(part)
if attachment_payloads:
for filename, payload in attachment_payloads:
part = MIMEBase('application', "octet-stream")
part.set_payload(payload)
encode_base64(part)
part.add_header('Content-Disposition', 'attachment; filename="%s"' % filename)
msg.attach(part)
return msg
def send(self, rcpt_to, msg=None, subject=None, just_try=False, **forge_args):
"""
Send an email
:param rcpt_to: The recipient of the email. Could be a tuple(name, email)
or just the email of the recipient.
:param msg: The message of this email (as MIMEBase or derivated classes)
:param subject: The subject of the email (only if the message is not provided
using msg parameter)
:param just_try: Enable just try mode (do not really send email, default: as defined on initialization)
All other parameters will be consider as parameters to forge the message
(only if the message is not provided using msg parameter).
"""
msg = msg if msg else self.forge_message(rcpt_to, subject, **forge_args)
if just_try or self.just_try:
log.debug('Just-try mode: do not really send this email to %s (subject="%s")', rcpt_to, subject or msg.get('subject', 'No subject'))
return True
if self.catch_all_addr:
catch_addr = self.catch_all_addr
log.debug('Catch email originaly send to %s to %s', rcpt_to, catch_addr)
rcpt_to = catch_addr
try:
if self.smtp_ssl:
logging.info("Establish SSL connection to server %s:%s", self.smtp_host, self.smtp_port)
server = smtplib.SMTP_SSL(self.smtp_host, self.smtp_port)
else:
logging.info("Establish connection to server %s:%s", self.smtp_host, self.smtp_port)
server = smtplib.SMTP(self.smtp_host, self.smtp_port)
if self.smtp_tls:
logging.info('Start TLS on SMTP connection')
server.starttls()
except smtplib.SMTPException:
log.error('Error connecting to SMTP server %s:%s', self.smtp_host, self.smtp_port, exc_info=True)
return False
if self.smtp_debug:
server.set_debuglevel(True)
if self.smtp_user and self.smtp_password:
try:
log.info('Try to authenticate on SMTP connection as %s', self.smtp_user)
server.login(self.smtp_user, self.smtp_password)
except smtplib.SMTPException:
log.error('Error authenticating on SMTP server %s:%s with user %s', self.smtp_host, self.smtp_port, self.smtp_user, exc_info=True)
return False
error = False
try:
log.info('Sending email to %s', rcpt_to)
server.sendmail(self.sender_email, [rcpt_to[1] if isinstance(rcpt_to, tuple) else rcpt_to], msg.as_string())
except smtplib.SMTPException:
error = True
log.error('Error sending email to %s', rcpt_to, exc_info=True)
finally:
server.quit()
return not error
if __name__ == '__main__':
# Run tests
import datetime
import sys
import argparse
# Options parser
parser = argparse.ArgumentParser()
parser.add_argument(
'-v', '--verbose',
action="store_true",
dest="verbose",
help="Enable verbose mode"
)
parser.add_argument(
'-d', '--debug',
action="store_true",
dest="debug",
help="Enable debug mode"
)
parser.add_argument(
'-l', '--log-file',
action="store",
type=str,
dest="logfile",
help="Log file path"
)
parser.add_argument(
'-j', '--just-try',
action="store_true",
dest="just_try",
help="Enable just-try mode"
)
email_opts = parser.add_argument_group('Email options')
email_opts.add_argument(
'-H', '--smtp-host',
action="store",
type=str,
dest="email_smtp_host",
help="SMTP host"
)
email_opts.add_argument(
'-P', '--smtp-port',
action="store",
type=int,
dest="email_smtp_port",
help="SMTP port"
)
email_opts.add_argument(
'-S', '--smtp-ssl',
action="store_true",
dest="email_smtp_ssl",
help="Use SSL"
)
email_opts.add_argument(
'-T', '--smtp-tls',
action="store_true",
dest="email_smtp_tls",
help="Use TLS"
)
email_opts.add_argument(
'-u', '--smtp-user',
action="store",
type=str,
dest="email_smtp_user",
help="SMTP username"
)
email_opts.add_argument(
'-p', '--smtp-password',
action="store",
type=str,
dest="email_smtp_password",
help="SMTP password"
)
email_opts.add_argument(
'-D', '--smtp-debug',
action="store_true",
dest="email_smtp_debug",
help="Debug SMTP connection"
)
email_opts.add_argument(
'-e', '--email-encoding',
action="store",
type=str,
dest="email_encoding",
help="SMTP encoding"
)
email_opts.add_argument(
'-f', '--sender-name',
action="store",
type=str,
dest="email_sender_name",
help="Sender name"
)
email_opts.add_argument(
'-F', '--sender-email',
action="store",
type=str,
dest="email_sender_email",
help="Sender email"
)
email_opts.add_argument(
'-C', '--catch-all',
action="store",
type=str,
dest="email_catch_all",
help="Catch all sent email: specify catch recipient email address"
)
test_opts = parser.add_argument_group('Test email options')
test_opts.add_argument(
'-t', '--to',
action="store",
type=str,
dest="test_to",
help="Test email recipient",
)
test_opts.add_argument(
'-m', '--mako',
action="store_true",
dest="test_mako",
help="Test mako templating",
)
options = parser.parse_args()
if not options.test_to:
parser.error('You must specify test email recipient using -t/--to parameter')
sys.exit(1)
# Initialize logs
logformat = '%(asctime)s - Test EmailClient - %(levelname)s - %(message)s'
if options.debug:
loglevel = logging.DEBUG
elif options.verbose:
loglevel = logging.INFO
else:
loglevel = logging.WARNING
if options.logfile:
logging.basicConfig(filename=options.logfile, level=loglevel, format=logformat)
else:
logging.basicConfig(level=loglevel, format=logformat)
if options.email_smtp_user and not options.email_smtp_password:
import getpass
options.email_smtp_password = getpass.getpass('Please enter SMTP password: ')
logging.info('Initialize Email client')
email_client = EmailClient(
smtp_host=options.email_smtp_host,
smtp_port=options.email_smtp_port,
smtp_ssl=options.email_smtp_ssl,
smtp_tls=options.email_smtp_tls,
smtp_user=options.email_smtp_user,
smtp_password=options.email_smtp_password,
smtp_debug=options.email_smtp_debug,
sender_name=options.email_sender_name,
sender_email=options.email_sender_email,
catch_all_addr=options.email_catch_all,
just_try=options.just_try,
encoding=options.email_encoding,
templates=dict(
test=dict(
subject="Test email",
text=(
"Just a test email sent at {sent_date}." if not options.test_mako else
MakoTemplate("Just a test email sent at ${sent_date}.")
),
html=(
"Just a test email. (sent at {sent_date})" if not options.test_mako else
MakoTemplate("Just a test email. (sent at ${sent_date})")
)
)
)
)
logging.info('Send a test email to %s', options.test_to)
if email_client.send(options.test_to, template='test', sent_date=datetime.datetime.now()):
logging.info('Test email sent')
sys.exit(0)
logging.error('Fail to send test email')
sys.exit(1)