# -*- coding: utf-8 -*-
""" Email client to forge and send emails """
import logging
import os
import smtplib
import email.utils
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.encoders import encode_base64
from mako.template import Template as MakoTemplate
log = logging.getLogger(__name__)
class EmailClient(object):  # pylint: disable=useless-object-inheritance,too-many-instance-attributes
    """
    Email client

    This class abstracts all interactions with the SMTP server: it forges MIME messages
    (from explicit bodies or from named templates) and sends them over SMTP.
    """

    # Class-level defaults. __init__ sets each of them on the instance
    # (encoding only when a value is provided).
    smtp_host = None
    smtp_port = None
    smtp_ssl = None
    smtp_tls = None
    smtp_user = None
    smtp_password = None
    smtp_debug = None

    sender_name = None
    sender_email = None

    catch_all_addr = False
    just_try = False

    encoding = 'utf-8'

    templates = dict()

    def __init__(self, smtp_host=None, smtp_port=None, smtp_ssl=None, smtp_tls=None, smtp_user=None, smtp_password=None, smtp_debug=None,
                 sender_name=None, sender_email=None, catch_all_addr=None, just_try=None, encoding=None, templates=None):
        """
        Configure the client

        :param smtp_host: SMTP server host (default: localhost)
        :param smtp_port: SMTP server port (default: 25)
        :param smtp_ssl: Connect using SMTP over SSL (default: False)
        :param smtp_tls: Issue STARTTLS after connecting (default: False)
        :param smtp_user: SMTP username (authentication is only attempted if both
                          user and password are set)
        :param smtp_password: SMTP password
        :param smtp_debug: Enable smtplib debug output (default: False)
        :param sender_name: Default sender name (default: "No reply")
        :param sender_email: Default sender email (default: "noreply@localhost")
        :param catch_all_addr: If set, every email is delivered to this address
                               instead of its real recipient
        :param just_try: Enable just-try mode (forge but never send, default: False)
        :param encoding: Default content encoding (default: utf-8)
        :param templates: Dict of templates, indexed by name. Each template is a dict
                          with optional 'subject', 'text' and 'html' keys.
        """
        self.smtp_host = smtp_host if smtp_host else 'localhost'
        self.smtp_port = smtp_port if smtp_port else 25
        self.smtp_ssl = bool(smtp_ssl)
        self.smtp_tls = bool(smtp_tls)
        self.smtp_user = smtp_user if smtp_user else None
        self.smtp_password = smtp_password if smtp_password else None
        self.smtp_debug = bool(smtp_debug)

        self.sender_name = sender_name if sender_name else "No reply"
        self.sender_email = sender_email if sender_email else "noreply@localhost"
        self.catch_all_addr = catch_all_addr if catch_all_addr else False
        self.just_try = just_try if just_try else False

        assert templates is None or isinstance(templates, dict)
        self.templates = templates if templates else dict()
        if encoding:
            self.encoding = encoding

    @staticmethod
    def _forge_attachment(filename, payload):
        """ Build a base64-encoded application/octet-stream part named filename """
        part = MIMEBase('application', "octet-stream")
        part.set_payload(payload)
        encode_base64(part)
        # Let the email package format the filename parameter: it escapes quotes and
        # backslashes, which manual string interpolation does not. The '%s' formatting
        # keeps accepting any value that could be interpolated before.
        part.add_header('Content-Disposition', 'attachment', filename='%s' % filename)
        return part

    @staticmethod
    def _disconnect(server):
        """ Close an SMTP connection politely, falling back on a raw close on error """
        try:
            server.quit()
        except (smtplib.SMTPException, OSError):
            # The connection is probably already broken: just release the socket
            server.close()

    def forge_message(self, rcpt_to, subject=None, html_body=None, text_body=None, attachment_files=None,
                      attachment_payloads=None, sender_name=None, sender_email=None, encoding=None,
                      template=None, **template_vars):  # pylint: disable=too-many-arguments,too-many-locals
        """
        Forge a message

        :param rcpt_to: The recipient of the email. Could be a tuple(name, email) or just the email of the recipient.
        :param subject: The subject of the email.
        :param html_body: The HTML body of the email
        :param text_body: The plain text body of the email
        :param attachment_files: List of filepaths to attach
        :param attachment_payloads: List of tuples with filename and payload to attach
        :param sender_name: Custom sender name (default: as defined on initialization)
        :param sender_email: Custom sender email (default: as defined on initialization)
        :param encoding: Email content encoding (default: as defined on initialization)
        :param template: The name of a template to use to forge this email

        All other parameters will be considered as template variables. They are also
        used to format the subject.

        :return: The forged message (MIMEMultipart)
        :raises AssertionError: if the template is unknown, or if no subject is
                                provided (directly or by the template)
        """
        msg = MIMEMultipart('alternative')
        msg['To'] = email.utils.formataddr(rcpt_to) if isinstance(rcpt_to, tuple) else rcpt_to
        msg['From'] = email.utils.formataddr((sender_name or self.sender_name, sender_email or self.sender_email))
        if subject:
            msg['Subject'] = subject.format(**template_vars)
        msg['Date'] = email.utils.formatdate(None, True)
        encoding = encoding if encoding else self.encoding

        if template:
            assert template in self.templates, "Unknwon template %s" % template
            template_def = self.templates[template]

            # Handle subject from template
            if not subject:
                assert template_def.get('subject'), 'No subject defined in template %s' % template
                msg['Subject'] = template_def['subject'].format(**template_vars)

            # Render every part before attaching anything. The HTML part goes last so
            # that mail clients prefer it.
            parts = []
            for key, mime_type in (('text', 'plain'), ('html', 'html')):
                content = template_def.get(key)
                if not content:
                    continue
                if isinstance(content, MakoTemplate):
                    parts.append((content.render(**template_vars), mime_type))
                else:
                    parts.append((content.format(**template_vars), mime_type))
            for body, mime_type in parts:
                msg.attach(MIMEText(body.encode(encoding), mime_type, _charset=encoding))
        else:
            assert subject, 'No subject provided'
            if text_body:
                msg.attach(MIMEText(text_body.encode(encoding), 'plain', _charset=encoding))
            if html_body:
                msg.attach(MIMEText(html_body.encode(encoding), 'html', _charset=encoding))

        for filepath in attachment_files or ():
            with open(filepath, 'rb') as fp:
                payload = fp.read()
            msg.attach(self._forge_attachment(os.path.basename(filepath), payload))

        for filename, payload in attachment_payloads or ():
            msg.attach(self._forge_attachment(filename, payload))

        return msg

    def send(self, rcpt_to, msg=None, subject=None, just_try=False, **forge_args):
        """
        Send an email

        :param rcpt_to: The recipient of the email. Could be a tuple(name, email)
                        or just the email of the recipient.
        :param msg: The message of this email (as MIMEBase or derived classes)
        :param subject: The subject of the email (only if the message is not provided
                        using msg parameter)
        :param just_try: Enable just try mode (do not really send email, default: as defined on initialization)

        All other parameters will be considered as parameters to forge the message
        (only if the message is not provided using msg parameter).

        :return: True if the email was sent (or in just-try mode), False if
                 connecting, authenticating or sending failed
        """
        msg = msg if msg else self.forge_message(rcpt_to, subject, **forge_args)

        if just_try or self.just_try:
            log.debug('Just-try mode: do not really send this email to %s (subject="%s")', rcpt_to, subject or msg.get('subject', 'No subject'))
            return True

        if self.catch_all_addr:
            # Only the SMTP envelope recipient is replaced: the message headers are left as forged
            catch_addr = self.catch_all_addr
            log.debug('Catch email originaly send to %s to %s', rcpt_to, catch_addr)
            rcpt_to = catch_addr

        server = None
        try:
            if self.smtp_ssl:
                log.info("Establish SSL connection to server %s:%s", self.smtp_host, self.smtp_port)
                server = smtplib.SMTP_SSL(self.smtp_host, self.smtp_port)
            else:
                log.info("Establish connection to server %s:%s", self.smtp_host, self.smtp_port)
                server = smtplib.SMTP(self.smtp_host, self.smtp_port)
                if self.smtp_tls:
                    log.info('Start TLS on SMTP connection')
                    server.starttls()
        except (smtplib.SMTPException, OSError):
            # Connection refused, DNS failures and timeouts are plain OSError, not SMTPException
            log.error('Error connecting to SMTP server %s:%s', self.smtp_host, self.smtp_port, exc_info=True)
            if server is not None:
                # Connected but STARTTLS failed: do not leak the socket
                server.close()
            return False

        if self.smtp_debug:
            server.set_debuglevel(True)

        if self.smtp_user and self.smtp_password:
            try:
                log.info('Try to authenticate on SMTP connection as %s', self.smtp_user)
                server.login(self.smtp_user, self.smtp_password)
            except smtplib.SMTPException:
                log.error('Error authenticating on SMTP server %s:%s with user %s', self.smtp_host, self.smtp_port, self.smtp_user, exc_info=True)
                self._disconnect(server)
                return False

        error = False
        try:
            log.info('Sending email to %s', rcpt_to)
            server.sendmail(self.sender_email, [rcpt_to[1] if isinstance(rcpt_to, tuple) else rcpt_to], msg.as_string())
        except smtplib.SMTPException:
            error = True
            log.error('Error sending email to %s', rcpt_to, exc_info=True)
        finally:
            # A failing QUIT must not mask the result of the send itself
            self._disconnect(server)

        return not error
if __name__ == '__main__':
    # Manual smoke test: build an EmailClient from the command line options and
    # send one templated test email. Exit status is 0 on success, 1 on failure.
    import datetime
    import sys
    import argparse

    # Options parser
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '-v', '--verbose',
        action="store_true",
        dest="verbose",
        help="Enable verbose mode"
    )

    parser.add_argument(
        '-d', '--debug',
        action="store_true",
        dest="debug",
        help="Enable debug mode"
    )

    parser.add_argument(
        '-l', '--log-file',
        action="store",
        type=str,
        dest="logfile",
        help="Log file path"
    )

    parser.add_argument(
        '-j', '--just-try',
        action="store_true",
        dest="just_try",
        help="Enable just-try mode"
    )

    email_opts = parser.add_argument_group('Email options')

    email_opts.add_argument(
        '-H', '--smtp-host',
        action="store",
        type=str,
        dest="email_smtp_host",
        help="SMTP host"
    )

    email_opts.add_argument(
        '-P', '--smtp-port',
        action="store",
        type=int,
        dest="email_smtp_port",
        help="SMTP port"
    )

    email_opts.add_argument(
        '-S', '--smtp-ssl',
        action="store_true",
        dest="email_smtp_ssl",
        help="Use SSL"
    )

    email_opts.add_argument(
        '-T', '--smtp-tls',
        action="store_true",
        dest="email_smtp_tls",
        help="Use TLS"
    )

    email_opts.add_argument(
        '-u', '--smtp-user',
        action="store",
        type=str,
        dest="email_smtp_user",
        help="SMTP username"
    )

    email_opts.add_argument(
        '-p', '--smtp-password',
        action="store",
        type=str,
        dest="email_smtp_password",
        help="SMTP password"
    )

    email_opts.add_argument(
        '-D', '--smtp-debug',
        action="store_true",
        dest="email_smtp_debug",
        help="Debug SMTP connection"
    )

    email_opts.add_argument(
        '-e', '--email-encoding',
        action="store",
        type=str,
        dest="email_encoding",
        help="SMTP encoding"
    )

    email_opts.add_argument(
        '-f', '--sender-name',
        action="store",
        type=str,
        dest="email_sender_name",
        help="Sender name"
    )

    email_opts.add_argument(
        '-F', '--sender-email',
        action="store",
        type=str,
        dest="email_sender_email",
        help="Sender email"
    )

    email_opts.add_argument(
        '-C', '--catch-all',
        action="store",
        type=str,
        dest="email_catch_all",
        help="Catch all sent email: specify catch recipient email address"
    )

    test_opts = parser.add_argument_group('Test email options')

    test_opts.add_argument(
        '-t', '--to',
        action="store",
        type=str,
        dest="test_to",
        help="Test email recipient",
    )

    test_opts.add_argument(
        '-m', '--mako',
        action="store_true",
        dest="test_mako",
        help="Test mako templating",
    )

    options = parser.parse_args()

    if not options.test_to:
        # parser.error() prints the usage and exits with status 2: nothing runs after it
        parser.error('You must specify test email recipient using -t/--to parameter')

    # Initialize logs
    logformat = '%(asctime)s - Test EmailClient - %(levelname)s - %(message)s'
    if options.debug:
        loglevel = logging.DEBUG
    elif options.verbose:
        loglevel = logging.INFO
    else:
        loglevel = logging.WARNING

    if options.logfile:
        logging.basicConfig(filename=options.logfile, level=loglevel, format=logformat)
    else:
        logging.basicConfig(level=loglevel, format=logformat)

    # Prompt for the password instead of requiring it on the command line
    if options.email_smtp_user and not options.email_smtp_password:
        import getpass
        options.email_smtp_password = getpass.getpass('Please enter SMTP password: ')

    # The same test template is provided either as plain str.format() strings or
    # as Mako templates, depending on -m/--mako
    if options.test_mako:
        test_text = MakoTemplate("Just a test email sent at ${sent_date}.")
        test_html = MakoTemplate("Just a test email. (sent at ${sent_date})")
    else:
        test_text = "Just a test email sent at {sent_date}."
        test_html = "Just a test email. (sent at {sent_date})"

    logging.info('Initialize Email client')
    email_client = EmailClient(
        smtp_host=options.email_smtp_host,
        smtp_port=options.email_smtp_port,
        smtp_ssl=options.email_smtp_ssl,
        smtp_tls=options.email_smtp_tls,
        smtp_user=options.email_smtp_user,
        smtp_password=options.email_smtp_password,
        smtp_debug=options.email_smtp_debug,
        sender_name=options.email_sender_name,
        sender_email=options.email_sender_email,
        catch_all_addr=options.email_catch_all,
        just_try=options.just_try,
        encoding=options.email_encoding,
        templates={
            'test': {
                'subject': "Test email",
                'text': test_text,
                'html': test_html,
            },
        },
    )

    logging.info('Send a test email to %s', options.test_to)
    if email_client.send(options.test_to, template='test', sent_date=datetime.datetime.now()):
        logging.info('Test email sent')
        sys.exit(0)
    logging.error('Fail to send test email')
    sys.exit(1)