python-mylib/mylib/email.py

595 lines
21 KiB
Python

""" Email client to forge and send emails """
import email.utils
import logging
import os
import smtplib
from email.encoders import encode_base64
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from mako.template import Template as MakoTemplate
from mylib.config import (
BooleanOption,
ConfigurableObject,
IntegerOption,
PasswordOption,
StringOption,
)
log = logging.getLogger(__name__)
class EmailClient(
    ConfigurableObject
):  # pylint: disable=useless-object-inheritance,too-many-instance-attributes
    """
    Email client

    This class abstracts all interactions with the SMTP server: it forges MIME messages
    (optionally from named templates) and sends them over SMTP.
    """

    _config_name = "email"
    _config_comment = "Email"
    _defaults = {
        "smtp_host": "localhost",
        "smtp_port": 25,
        "smtp_ssl": False,
        "smtp_tls": False,
        "smtp_user": None,
        "smtp_password": None,
        "smtp_debug": False,
        "sender_name": "No reply",
        "sender_email": "noreply@localhost",
        "encoding": "utf-8",
        "catch_all_addr": None,
        "just_try": False,
        "templates_path": None,
    }

    # Templates by name. Each template is a dict that may hold "subject", "text" and "html"
    # entries, each one being either a str.format()-style string or a MakoTemplate object.
    templates = {}

    def __init__(self, templates=None, initialize=False, **kwargs):
        """
        :param templates: Optional dict of templates (see the ``templates`` attribute)
        :param initialize: If true, call initialize() at the end of the construction
        :param kwargs: Passed as-is to the parent constructor
        """
        super().__init__(**kwargs)
        assert templates is None or isinstance(templates, dict)
        self.templates = templates if templates else {}
        if initialize:
            self.initialize()

    # pylint: disable=arguments-differ,arguments-renamed
    def configure(self, use_smtp=True, **kwargs):
        """
        Configure options on registered mylib.Config object

        :param use_smtp: If true, also declare the SMTP connection options (smtp_*)
        :param kwargs: Passed to the parent configure() method. The optional ``just_try_help``
                       entry overrides the default help text of the just-try mode.
        :return: The configuration section returned by the parent configure() method
        """
        section = super().configure(
            just_try_help=kwargs.pop("just_try_help", "Just-try mode: do not really send emails"),
            **kwargs,
        )

        # Each entry: (option type, option name, comment, extra add_option() keyword arguments)
        smtp_options = (
            (StringOption, "smtp_host", "SMTP server hostname/IP address", {}),
            (IntegerOption, "smtp_port", "SMTP server port", {}),
            (BooleanOption, "smtp_ssl", "Use SSL on SMTP server connection", {}),
            (BooleanOption, "smtp_tls", "Use TLS on SMTP server connection", {}),
            (StringOption, "smtp_user", "SMTP authentication username", {}),
            (
                PasswordOption,
                "smtp_password",
                'SMTP authentication password (set to "keyring" to use XDG keyring)',
                {"username_option": "smtp_user", "keyring_value": "keyring"},
            ),
            (BooleanOption, "smtp_debug", "Enable SMTP debugging", {}),
        )
        common_options = (
            (StringOption, "sender_name", "Sender name", {}),
            (StringOption, "sender_email", "Sender email address", {}),
            (StringOption, "encoding", "Email encoding", {}),
            (
                StringOption,
                "catch_all_addr",
                "Catch all sent emails to this specified email address",
                {},
            ),
            (StringOption, "templates_path", "Path to templates directory", {}),
        )

        declared_options = (smtp_options if use_smtp else ()) + common_options
        for option_type, name, comment, extra in declared_options:
            section.add_option(
                option_type, name, default=self._defaults[name], comment=comment, **extra
            )
        return section

    def initialize(self, *args, **kwargs):  # pylint: disable=arguments-differ
        """Configuration initialized hook: also load the templates directory (if configured)"""
        super().initialize(*args, **kwargs)
        self.load_templates_directory()

    def load_templates_directory(self, templates_path=None):
        """
        Load templates from specified directory

        Regular files named ``<template>.html``, ``<template>.txt`` or ``<template>.subject``
        are loaded as Mako templates and stored respectively as the "html", "text" and
        "subject" part of the template ``<template>``. Other entries are ignored.

        :param templates_path: The directory to load (default: the ``templates_path`` option).
                               Nothing is done if no path is available.
        """
        if templates_path is None:
            templates_path = self._get_option("templates_path")
        if not templates_path:
            return

        # File extension => name of the template part
        part_by_extension = {".html": "html", ".txt": "text", ".subject": "subject"}

        log.debug("Load email templates from %s directory", templates_path)
        for filename in os.listdir(templates_path):
            filepath = os.path.join(templates_path, filename)
            if not os.path.isfile(filepath):
                continue
            template_name, extension = os.path.splitext(filename)
            template_type = part_by_extension.get(extension)
            if template_type is None:
                continue
            template_parts = self.templates.setdefault(template_name, {})
            log.debug("Load email template %s %s from %s", template_name, template_type, filepath)
            with open(filepath, encoding="utf8") as file_desc:
                template_parts[template_type] = MakoTemplate(file_desc.read())  # nosec

    @staticmethod
    def _render_template(template, template_vars):
        """Render a Mako template, or format any other value as a str.format()-style string"""
        if isinstance(template, MakoTemplate):
            return template.render(**template_vars)
        return template.format(**template_vars)

    @staticmethod
    def _format_address_header(entries):
        """Build an address header value from a list of tuple(name, email) or plain emails"""
        return ", ".join(
            email.utils.formataddr(entry) if isinstance(entry, tuple) else entry
            for entry in entries
        )

    @staticmethod
    def _envelope_addresses(entries):
        """
        Return a new list of the bare email addresses of the specified recipient(s)

        Accept nothing (None/empty), one recipient or a list of recipients, each recipient being
        a tuple(name, email) or just an email.
        """
        if not entries:
            return []
        if not isinstance(entries, list):
            entries = [entries]
        return [entry[1] if isinstance(entry, tuple) else entry for entry in entries]

    @staticmethod
    def _forge_attachment(filename, payload):
        """Build a base64-encoded application/octet-stream attachment part"""
        part = MIMEBase("application", "octet-stream")
        part.set_payload(payload)
        encode_base64(part)
        # Let the email package quote/encode the filename parameter instead of building the
        # header by hand: filenames holding quotes or non-ASCII characters stay valid.
        part.add_header("Content-Disposition", "attachment", filename=filename)
        return part

    @staticmethod
    def _close_smtp_connection(server):
        """Politely end the SMTP session, falling back on closing the socket on failure"""
        try:
            server.quit()
        except (smtplib.SMTPException, OSError):
            log.debug("Error closing SMTP connection", exc_info=True)
            server.close()

    def forge_message(
        self,
        recipients,
        subject=None,
        html_body=None,
        text_body=None,  # pylint: disable=too-many-arguments,too-many-locals
        attachment_files=None,
        attachment_payloads=None,
        sender_name=None,
        sender_email=None,
        encoding=None,
        template=None,
        cc=None,
        **template_vars,
    ):
        """
        Forge a message

        :param recipients: The recipient(s) of the email. List of tuple(name, email) or
                           just the email of the recipients.
        :param subject: The subject of the email (str.format()-style string or Mako template).
        :param html_body: The HTML body of the email (ignored when a template is used)
        :param text_body: The plain text body of the email (ignored when a template is used)
        :param attachment_files: List of filepaths to attach
        :param attachment_payloads: List of tuples with filename and payload to attach
        :param sender_name: Custom sender name (default: as defined on initialization)
        :param sender_email: Custom sender email (default: as defined on initialization)
        :param encoding: Email content encoding (default: as defined on initialization)
        :param template: The name of a template to use to forge this email
        :param cc: Optional list of CC recipient addresses.
                   List of tuple(name, email) or just the email of the recipients.

        All other parameters will be considered as template variables.

        :return: The forged message (MIMEMultipart object)
        :raise AssertionError: on unknown template, or if no subject is provided (neither as
                               parameter nor by the template)
        """
        recipients = recipients if isinstance(recipients, list) else [recipients]

        # NOTE(review): attachments are added to this same multipart/alternative container;
        # a multipart/mixed wrapper may be more appropriate - confirm before changing.
        msg = MIMEMultipart("alternative")
        msg["To"] = self._format_address_header(recipients)
        if cc:
            msg["Cc"] = self._format_address_header(cc if isinstance(cc, list) else [cc])
        msg["From"] = email.utils.formataddr(
            (
                sender_name or self._get_option("sender_name"),
                sender_email or self._get_option("sender_email"),
            )
        )
        if subject:
            msg["Subject"] = self._render_template(subject, template_vars)
        msg["Date"] = email.utils.formatdate(None, True)

        encoding = encoding or self._get_option("encoding")
        if template:
            assert template in self.templates, f"Unknown template {template}"
            template_parts = self.templates[template]

            # Handle subject from template
            if not subject:
                assert template_parts.get("subject"), f"No subject defined in template {template}"
                msg["Subject"] = self._render_template(template_parts["subject"], template_vars)

            # Put HTML part in last position to make it the preferred one
            for part_name, mime_type in (("text", "plain"), ("html", "html")):
                if template_parts.get(part_name):
                    body = self._render_template(template_parts[part_name], template_vars)
                    msg.attach(MIMEText(body.encode(encoding), mime_type, _charset=encoding))
        else:
            assert subject, "No subject provided"
            if text_body:
                msg.attach(MIMEText(text_body.encode(encoding), "plain", _charset=encoding))
            if html_body:
                msg.attach(MIMEText(html_body.encode(encoding), "html", _charset=encoding))

        for filepath in attachment_files or ():
            with open(filepath, "rb") as file_desc:
                payload = file_desc.read()
            msg.attach(self._forge_attachment(os.path.basename(filepath), payload))

        for filename, payload in attachment_payloads or ():
            msg.attach(self._forge_attachment(filename, payload))

        return msg

    def send(
        self, recipients, msg=None, subject=None, just_try=None, cc=None, bcc=None, **forge_args
    ):
        """
        Send an email

        :param recipients: The recipient(s) of the email. List of tuple(name, email) or
                           just the email of the recipients.
        :param msg: The message of this email (as MIMEBase or derivated classes)
        :param subject: The subject of the email (only if the message is not provided
                        using msg parameter)
        :param just_try: Enable just try mode (do not really send email, default: as defined on
                         initialization)
        :param cc: Optional list of CC recipient addresses. List of tuple(name, email) or
                   just the email of the recipients.
        :param bcc: Optional list of BCC recipient addresses. List of tuple(name, email) or
                    just the email of the recipients.

        All other parameters will be considered as parameters to forge the message
        (only if the message is not provided using msg parameter).

        :return: True if the email was sent (or in just-try mode), False if connecting to the
                 SMTP server, authenticating on it or sending the email failed
        """
        # Work on a copy: the caller's list must never be modified
        recipients = list(recipients) if isinstance(recipients, list) else [recipients]
        if msg is None:
            msg = self.forge_message(recipients, subject, cc=cc, **forge_args)

        catch_addr = self._get_option("catch_all_addr")
        if catch_addr:
            log.debug(
                "Catch email originally send to %s (CC:%s, BCC:%s) to %s",
                ", ".join(self._envelope_addresses(recipients)),
                ", ".join(self._envelope_addresses(cc)) if isinstance(cc, list) else cc,
                ", ".join(self._envelope_addresses(bcc)) if isinstance(bcc, list) else bcc,
                catch_addr,
            )
            envelope_to = self._envelope_addresses(catch_addr)
        else:
            # The SMTP envelope only needs bare addresses (To + CC + BCC)
            envelope_to = (
                self._envelope_addresses(recipients)
                + self._envelope_addresses(cc)
                + self._envelope_addresses(bcc)
            )

        if self._just_try if just_try is None else just_try:
            log.debug(
                'Just-try mode: do not really send this email to %s (subject="%s")',
                ", ".join(envelope_to),
                subject or msg.get("subject", "No subject"),
            )
            return True

        smtp_host = self._get_option("smtp_host")
        smtp_port = self._get_option("smtp_port")
        server = None
        try:
            if self._get_option("smtp_ssl"):
                log.info("Establish SSL connection to server %s:%s", smtp_host, smtp_port)
                server = smtplib.SMTP_SSL(smtp_host, smtp_port)
            else:
                log.info("Establish connection to server %s:%s", smtp_host, smtp_port)
                server = smtplib.SMTP(smtp_host, smtp_port)
                if self._get_option("smtp_tls"):
                    log.info("Start TLS on SMTP connection")
                    server.starttls()
        except (smtplib.SMTPException, OSError):
            # Socket-level failures (connection refused, DNS or TLS errors) are raised as
            # OSError, not only as SMTPException
            log.error("Error connecting to SMTP server %s:%s", smtp_host, smtp_port, exc_info=True)
            if server is not None:
                # The connection was established but STARTTLS failed: do not leak it
                server.close()
            return False

        # From here, the connection is always released, whatever the outcome
        try:
            if self._get_option("smtp_debug"):
                server.set_debuglevel(True)

            smtp_user = self._get_option("smtp_user")
            smtp_password = self._get_option("smtp_password")
            if smtp_user and smtp_password:
                try:
                    log.info("Try to authenticate on SMTP connection as %s", smtp_user)
                    server.login(smtp_user, smtp_password)
                except smtplib.SMTPException:
                    log.error(
                        "Error authenticating on SMTP server %s:%s with user %s",
                        smtp_host,
                        smtp_port,
                        smtp_user,
                        exc_info=True,
                    )
                    return False

            try:
                log.info("Sending email to %s", ", ".join(envelope_to))
                # NOTE(review): the envelope sender is always the configured sender_email, even
                # if a custom sender_email was used to forge the message - confirm it is intended
                server.sendmail(self._get_option("sender_email"), envelope_to, msg.as_string())
            except smtplib.SMTPException:
                log.error("Error sending email to %s", ", ".join(envelope_to), exc_info=True)
                return False
            return True
        finally:
            self._close_smtp_connection(server)
if __name__ == "__main__":
    # Manual test: forge a test email from an inline template and send it to the recipient
    # specified on the command line. Exit status is 0 on success, 1 on sending failure.
    import argparse
    import datetime
    import sys

    # Options parser
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "-v", "--verbose", action="store_true", dest="verbose", help="Enable verbose mode"
    )

    parser.add_argument(
        "-d", "--debug", action="store_true", dest="debug", help="Enable debug mode"
    )

    parser.add_argument(
        "-l", "--log-file", action="store", type=str, dest="logfile", help="Log file path"
    )

    parser.add_argument(
        "-j", "--just-try", action="store_true", dest="just_try", help="Enable just-try mode"
    )

    email_opts = parser.add_argument_group("Email options")

    email_opts.add_argument(
        "-H", "--smtp-host", action="store", type=str, dest="email_smtp_host", help="SMTP host"
    )

    email_opts.add_argument(
        "-P", "--smtp-port", action="store", type=int, dest="email_smtp_port", help="SMTP port"
    )

    email_opts.add_argument(
        "-S", "--smtp-ssl", action="store_true", dest="email_smtp_ssl", help="Use SSL"
    )

    email_opts.add_argument(
        "-T", "--smtp-tls", action="store_true", dest="email_smtp_tls", help="Use TLS"
    )

    email_opts.add_argument(
        "-u", "--smtp-user", action="store", type=str, dest="email_smtp_user", help="SMTP username"
    )

    email_opts.add_argument(
        "-p",
        "--smtp-password",
        action="store",
        type=str,
        dest="email_smtp_password",
        help="SMTP password",
    )

    email_opts.add_argument(
        "-D",
        "--smtp-debug",
        action="store_true",
        dest="email_smtp_debug",
        help="Debug SMTP connection",
    )

    email_opts.add_argument(
        "-e",
        "--email-encoding",
        action="store",
        type=str,
        dest="email_encoding",
        help="SMTP encoding",
    )

    email_opts.add_argument(
        "-f",
        "--sender-name",
        action="store",
        type=str,
        dest="email_sender_name",
        help="Sender name",
    )

    email_opts.add_argument(
        "-F",
        "--sender-email",
        action="store",
        type=str,
        dest="email_sender_email",
        help="Sender email",
    )

    email_opts.add_argument(
        "-C",
        "--catch-all",
        action="store",
        type=str,
        dest="email_catch_all",
        help="Catch all sent email: specify catch recipient email address",
    )

    test_opts = parser.add_argument_group("Test email options")

    test_opts.add_argument(
        "-t",
        "--to",
        action="store",
        type=str,
        dest="test_to",
        help="Test email recipient",
    )

    test_opts.add_argument(
        "-m",
        "--mako",
        action="store_true",
        dest="test_mako",
        help="Test mako templating",
    )

    options = parser.parse_args()

    if not options.test_to:
        # parser.error() prints the usage and message, then exits by itself
        parser.error("You must specify test email recipient using -t/--to parameter")

    # Initialize logs
    logformat = "%(asctime)s - Test EmailClient - %(levelname)s - %(message)s"
    if options.debug:
        loglevel = logging.DEBUG
    elif options.verbose:
        loglevel = logging.INFO
    else:
        loglevel = logging.WARNING

    if options.logfile:
        logging.basicConfig(filename=options.logfile, level=loglevel, format=logformat)
    else:
        logging.basicConfig(level=loglevel, format=logformat)

    # Ask for the password interactively when only the username was given
    if options.email_smtp_user and not options.email_smtp_password:
        import getpass

        options.email_smtp_password = getpass.getpass("Please enter SMTP password: ")

    logging.info("Initialize Email client")
    email_client = EmailClient(
        smtp_host=options.email_smtp_host,
        smtp_port=options.email_smtp_port,
        smtp_ssl=options.email_smtp_ssl,
        smtp_tls=options.email_smtp_tls,
        smtp_user=options.email_smtp_user,
        smtp_password=options.email_smtp_password,
        smtp_debug=options.email_smtp_debug,
        sender_name=options.email_sender_name,
        sender_email=options.email_sender_email,
        catch_all_addr=options.email_catch_all,
        just_try=options.just_try,
        encoding=options.email_encoding,
        templates={
            "test": {
                "subject": "Test email",
                "text": (
                    "Just a test email sent at {sent_date}."
                    if not options.test_mako
                    else MakoTemplate("Just a test email sent at ${sent_date | h}.")  # nosec
                ),
                # The plain (non-Mako) variant is rendered with str.format(): it must use a
                # bare {sent_date} field, the Mako "| h" filter syntax is not valid there
                "html": (
                    "<strong>Just a test email.</strong> <small>(sent at {sent_date})</small>"
                    if not options.test_mako
                    else MakoTemplate(  # nosec
                        "<strong>Just a test email.</strong> "
                        "<small>(sent at ${sent_date | h})</small>"
                    )
                ),
            }
        },
    )

    logging.info("Send a test email to %s", options.test_to)
    if email_client.send(options.test_to, template="test", sent_date=datetime.datetime.now()):
        logging.info("Test email sent")
        sys.exit(0)
    logging.error("Fail to send test email")
    sys.exit(1)