594 lines
21 KiB
Python
594 lines
21 KiB
Python
""" Email client to forge and send emails """
|
|
|
|
import email.utils
|
|
import logging
|
|
import os
|
|
import smtplib
|
|
from email.encoders import encode_base64
|
|
from email.mime.base import MIMEBase
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
|
|
from mako.template import Template as MakoTemplate
|
|
|
|
from mylib.config import (
|
|
BooleanOption,
|
|
ConfigurableObject,
|
|
IntegerOption,
|
|
PasswordOption,
|
|
StringOption,
|
|
)
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class EmailClient(
|
|
ConfigurableObject
|
|
): # pylint: disable=useless-object-inheritance,too-many-instance-attributes
|
|
"""
|
|
Email client
|
|
|
|
This class abstract all interactions with the SMTP server.
|
|
"""
|
|
|
|
_config_name = "email"
|
|
_config_comment = "Email"
|
|
_defaults = {
|
|
"smtp_host": "localhost",
|
|
"smtp_port": 25,
|
|
"smtp_ssl": False,
|
|
"smtp_tls": False,
|
|
"smtp_user": None,
|
|
"smtp_password": None,
|
|
"smtp_debug": False,
|
|
"sender_name": "No reply",
|
|
"sender_email": "noreply@localhost",
|
|
"encoding": "utf-8",
|
|
"catch_all_addr": None,
|
|
"just_try": False,
|
|
"templates_path": None,
|
|
}
|
|
|
|
templates = {}
|
|
|
|
def __init__(self, templates=None, initialize=False, **kwargs):
|
|
super().__init__(**kwargs)
|
|
|
|
assert templates is None or isinstance(templates, dict)
|
|
self.templates = templates if templates else {}
|
|
if initialize:
|
|
self.initialize()
|
|
|
|
# pylint: disable=arguments-differ,arguments-renamed
|
|
def configure(self, use_smtp=True, **kwargs):
|
|
"""Configure options on registered mylib.Config object"""
|
|
section = super().configure(
|
|
just_try_help=kwargs.pop("just_try_help", "Just-try mode: do not really send emails"),
|
|
**kwargs,
|
|
)
|
|
|
|
if use_smtp:
|
|
section.add_option(
|
|
StringOption,
|
|
"smtp_host",
|
|
default=self._defaults["smtp_host"],
|
|
comment="SMTP server hostname/IP address",
|
|
)
|
|
section.add_option(
|
|
IntegerOption,
|
|
"smtp_port",
|
|
default=self._defaults["smtp_port"],
|
|
comment="SMTP server port",
|
|
)
|
|
section.add_option(
|
|
BooleanOption,
|
|
"smtp_ssl",
|
|
default=self._defaults["smtp_ssl"],
|
|
comment="Use SSL on SMTP server connection",
|
|
)
|
|
section.add_option(
|
|
BooleanOption,
|
|
"smtp_tls",
|
|
default=self._defaults["smtp_tls"],
|
|
comment="Use TLS on SMTP server connection",
|
|
)
|
|
section.add_option(
|
|
StringOption,
|
|
"smtp_user",
|
|
default=self._defaults["smtp_user"],
|
|
comment="SMTP authentication username",
|
|
)
|
|
section.add_option(
|
|
PasswordOption,
|
|
"smtp_password",
|
|
default=self._defaults["smtp_password"],
|
|
comment='SMTP authentication password (set to "keyring" to use XDG keyring)',
|
|
username_option="smtp_user",
|
|
keyring_value="keyring",
|
|
)
|
|
section.add_option(
|
|
BooleanOption,
|
|
"smtp_debug",
|
|
default=self._defaults["smtp_debug"],
|
|
comment="Enable SMTP debugging",
|
|
)
|
|
|
|
section.add_option(
|
|
StringOption,
|
|
"sender_name",
|
|
default=self._defaults["sender_name"],
|
|
comment="Sender name",
|
|
)
|
|
section.add_option(
|
|
StringOption,
|
|
"sender_email",
|
|
default=self._defaults["sender_email"],
|
|
comment="Sender email address",
|
|
)
|
|
section.add_option(
|
|
StringOption, "encoding", default=self._defaults["encoding"], comment="Email encoding"
|
|
)
|
|
section.add_option(
|
|
StringOption,
|
|
"catch_all_addr",
|
|
default=self._defaults["catch_all_addr"],
|
|
comment="Catch all sent emails to this specified email address",
|
|
)
|
|
|
|
section.add_option(
|
|
StringOption,
|
|
"templates_path",
|
|
default=self._defaults["templates_path"],
|
|
comment="Path to templates directory",
|
|
)
|
|
|
|
return section
|
|
|
|
def initialize(self, *args, **kwargs): # pylint: disable=arguments-differ
|
|
"""Configuration initialized hook"""
|
|
super().initialize(*args, **kwargs)
|
|
self.load_templates_directory()
|
|
|
|
def load_templates_directory(self, templates_path=None):
|
|
"""Load templates from specified directory"""
|
|
if templates_path is None:
|
|
templates_path = self._get_option("templates_path")
|
|
if not templates_path:
|
|
return
|
|
log.debug("Load email templates from %s directory", templates_path)
|
|
for filename in os.listdir(templates_path):
|
|
filepath = os.path.join(templates_path, filename)
|
|
if not os.path.isfile(filepath):
|
|
continue
|
|
template_name, template_type = os.path.splitext(filename)
|
|
if template_type not in [".html", ".txt", ".subject"]:
|
|
continue
|
|
template_type = "text" if template_type == ".txt" else template_type[1:]
|
|
if template_name not in self.templates:
|
|
self.templates[template_name] = {}
|
|
log.debug("Load email template %s %s from %s", template_name, template_type, filepath)
|
|
with open(filepath, encoding="utf8") as file_desc:
|
|
self.templates[template_name][template_type] = MakoTemplate(
|
|
file_desc.read()
|
|
) # nosec
|
|
|
|
def forge_message(
|
|
self,
|
|
recipients,
|
|
subject=None,
|
|
html_body=None,
|
|
text_body=None, # pylint: disable=too-many-arguments,too-many-locals
|
|
attachment_files=None,
|
|
attachment_payloads=None,
|
|
sender_name=None,
|
|
sender_email=None,
|
|
encoding=None,
|
|
template=None,
|
|
cc=None,
|
|
**template_vars,
|
|
):
|
|
"""
|
|
Forge a message
|
|
|
|
:param recipients: The recipient(s) of the email. List of tuple(name, email) or
|
|
just the email of the recipients.
|
|
:param subject: The subject of the email.
|
|
:param html_body: The HTML body of the email
|
|
:param text_body: The plain text body of the email
|
|
:param attachment_files: List of filepaths to attach
|
|
:param attachment_payloads: List of tuples with filename and payload to attach
|
|
:param sender_name: Custom sender name (default: as defined on initialization)
|
|
:param sender_email: Custom sender email (default: as defined on initialization)
|
|
:param encoding: Email content encoding (default: as defined on initialization)
|
|
:param template: The name of a template to use to forge this email
|
|
:param cc: Optional list of CC recipient addresses.
|
|
List of tuple(name, email) or just the email of the recipients.
|
|
|
|
All other parameters will be consider as template variables.
|
|
"""
|
|
recipients = [recipients] if not isinstance(recipients, list) else recipients
|
|
msg = MIMEMultipart("alternative")
|
|
msg["To"] = ", ".join(
|
|
[
|
|
email.utils.formataddr(recipient) if isinstance(recipient, tuple) else recipient
|
|
for recipient in recipients
|
|
]
|
|
)
|
|
|
|
if cc:
|
|
cc = [cc] if not isinstance(cc, list) else cc
|
|
msg["Cc"] = ", ".join(
|
|
[
|
|
email.utils.formataddr(recipient) if isinstance(recipient, tuple) else recipient
|
|
for recipient in cc
|
|
]
|
|
)
|
|
|
|
msg["From"] = email.utils.formataddr(
|
|
(
|
|
sender_name or self._get_option("sender_name"),
|
|
sender_email or self._get_option("sender_email"),
|
|
)
|
|
)
|
|
if subject:
|
|
msg["Subject"] = (
|
|
subject.render(**template_vars)
|
|
if isinstance(subject, MakoTemplate)
|
|
else subject.format(**template_vars)
|
|
)
|
|
msg["Date"] = email.utils.formatdate(None, True)
|
|
encoding = encoding if encoding else self._get_option("encoding")
|
|
if template:
|
|
assert template in self.templates, f"Unknown template {template}"
|
|
# Handle subject from template
|
|
if not subject:
|
|
assert self.templates[template].get(
|
|
"subject"
|
|
), f"No subject defined in template {template}"
|
|
msg["Subject"] = (
|
|
self.templates[template]["subject"].render(**template_vars)
|
|
if isinstance(self.templates[template]["subject"], MakoTemplate)
|
|
else self.templates[template]["subject"].format(**template_vars)
|
|
)
|
|
|
|
# Put HTML part in last one to preferred it
|
|
parts = []
|
|
if self.templates[template].get("text"):
|
|
if isinstance(self.templates[template]["text"], MakoTemplate):
|
|
parts.append(
|
|
(self.templates[template]["text"].render(**template_vars), "plain")
|
|
)
|
|
else:
|
|
parts.append(
|
|
(self.templates[template]["text"].format(**template_vars), "plain")
|
|
)
|
|
if self.templates[template].get("html"):
|
|
if isinstance(self.templates[template]["html"], MakoTemplate):
|
|
parts.append((self.templates[template]["html"].render(**template_vars), "html"))
|
|
else:
|
|
parts.append((self.templates[template]["html"].format(**template_vars), "html"))
|
|
|
|
for body, mime_type in parts:
|
|
msg.attach(MIMEText(body.encode(encoding), mime_type, _charset=encoding))
|
|
else:
|
|
assert subject, "No subject provided"
|
|
if text_body:
|
|
msg.attach(MIMEText(text_body.encode(encoding), "plain", _charset=encoding))
|
|
if html_body:
|
|
msg.attach(MIMEText(html_body.encode(encoding), "html", _charset=encoding))
|
|
if attachment_files:
|
|
for filepath in attachment_files:
|
|
with open(filepath, "rb") as fp:
|
|
part = MIMEBase("application", "octet-stream")
|
|
part.set_payload(fp.read())
|
|
encode_base64(part)
|
|
part.add_header(
|
|
"Content-Disposition",
|
|
f'attachment; filename="{os.path.basename(filepath)}"',
|
|
)
|
|
msg.attach(part)
|
|
if attachment_payloads:
|
|
for filename, payload in attachment_payloads:
|
|
part = MIMEBase("application", "octet-stream")
|
|
part.set_payload(payload)
|
|
encode_base64(part)
|
|
part.add_header("Content-Disposition", f'attachment; filename="{filename}"')
|
|
msg.attach(part)
|
|
return msg
|
|
|
|
def send(
|
|
self, recipients, msg=None, subject=None, just_try=None, cc=None, bcc=None, **forge_args
|
|
):
|
|
"""
|
|
Send an email
|
|
|
|
:param recipients: The recipient(s) of the email. List of tuple(name, email) or
|
|
just the email of the recipients.
|
|
:param msg: The message of this email (as MIMEBase or derivated classes)
|
|
:param subject: The subject of the email (only if the message is not provided
|
|
using msg parameter)
|
|
:param just_try: Enable just try mode (do not really send email, default: as defined on
|
|
initialization)
|
|
:param cc: Optional list of CC recipient addresses. List of tuple(name, email) or
|
|
just the email of the recipients.
|
|
:param bcc: Optional list of BCC recipient addresses. List of tuple(name, email) or
|
|
just the email of the recipients.
|
|
|
|
All other parameters will be consider as parameters to forge the message
|
|
(only if the message is not provided using msg parameter).
|
|
"""
|
|
recipients = [recipients] if not isinstance(recipients, list) else recipients
|
|
msg = msg if msg else self.forge_message(recipients, subject, cc=cc, **forge_args)
|
|
catch_addr = self._get_option("catch_all_addr")
|
|
if catch_addr:
|
|
log.debug(
|
|
"Catch email originally send to %s (CC:%s, BCC:%s) to %s",
|
|
", ".join(recipients),
|
|
", ".join(cc) if isinstance(cc, list) else cc,
|
|
", ".join(bcc) if isinstance(bcc, list) else bcc,
|
|
catch_addr,
|
|
)
|
|
recipients = catch_addr if isinstance(catch_addr, list) else [catch_addr]
|
|
else:
|
|
if cc:
|
|
recipients.extend(
|
|
[
|
|
recipient[1] if isinstance(recipient, tuple) else recipient
|
|
for recipient in (cc if isinstance(cc, list) else [cc])
|
|
]
|
|
)
|
|
if bcc:
|
|
recipients.extend(
|
|
[
|
|
recipient[1] if isinstance(recipient, tuple) else recipient
|
|
for recipient in (bcc if isinstance(bcc, list) else [bcc])
|
|
]
|
|
)
|
|
|
|
if just_try if just_try is not None else self._just_try:
|
|
log.debug(
|
|
'Just-try mode: do not really send this email to %s (subject="%s")',
|
|
", ".join(recipients),
|
|
subject or msg.get("subject", "No subject"),
|
|
)
|
|
return True
|
|
|
|
smtp_host = self._get_option("smtp_host")
|
|
smtp_port = self._get_option("smtp_port")
|
|
try:
|
|
if self._get_option("smtp_ssl"):
|
|
logging.info("Establish SSL connection to server %s:%s", smtp_host, smtp_port)
|
|
server = smtplib.SMTP_SSL(smtp_host, smtp_port)
|
|
else:
|
|
logging.info("Establish connection to server %s:%s", smtp_host, smtp_port)
|
|
server = smtplib.SMTP(smtp_host, smtp_port)
|
|
if self._get_option("smtp_tls"):
|
|
logging.info("Start TLS on SMTP connection")
|
|
server.starttls()
|
|
except smtplib.SMTPException:
|
|
log.error("Error connecting to SMTP server %s:%s", smtp_host, smtp_port, exc_info=True)
|
|
return False
|
|
|
|
if self._get_option("smtp_debug"):
|
|
server.set_debuglevel(True)
|
|
|
|
smtp_user = self._get_option("smtp_user")
|
|
smtp_password = self._get_option("smtp_password")
|
|
if smtp_user and smtp_password:
|
|
try:
|
|
log.info("Try to authenticate on SMTP connection as %s", smtp_user)
|
|
server.login(smtp_user, smtp_password)
|
|
except smtplib.SMTPException:
|
|
log.error(
|
|
"Error authenticating on SMTP server %s:%s with user %s",
|
|
smtp_host,
|
|
smtp_port,
|
|
smtp_user,
|
|
exc_info=True,
|
|
)
|
|
return False
|
|
|
|
error = False
|
|
try:
|
|
log.info("Sending email to %s", ", ".join(recipients))
|
|
server.sendmail(
|
|
self._get_option("sender_email"),
|
|
[
|
|
recipient[1] if isinstance(recipient, tuple) else recipient
|
|
for recipient in recipients
|
|
],
|
|
msg.as_string(),
|
|
)
|
|
except smtplib.SMTPException:
|
|
error = True
|
|
log.error("Error sending email to %s", ", ".join(recipients), exc_info=True)
|
|
finally:
|
|
server.quit()
|
|
|
|
return not error
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Run tests
|
|
import argparse
|
|
import datetime
|
|
import sys
|
|
|
|
# Options parser
|
|
parser = argparse.ArgumentParser()
|
|
|
|
parser.add_argument(
|
|
"-v", "--verbose", action="store_true", dest="verbose", help="Enable verbose mode"
|
|
)
|
|
|
|
parser.add_argument(
|
|
"-d", "--debug", action="store_true", dest="debug", help="Enable debug mode"
|
|
)
|
|
|
|
parser.add_argument(
|
|
"-l", "--log-file", action="store", type=str, dest="logfile", help="Log file path"
|
|
)
|
|
|
|
parser.add_argument(
|
|
"-j", "--just-try", action="store_true", dest="just_try", help="Enable just-try mode"
|
|
)
|
|
|
|
email_opts = parser.add_argument_group("Email options")
|
|
|
|
email_opts.add_argument(
|
|
"-H", "--smtp-host", action="store", type=str, dest="email_smtp_host", help="SMTP host"
|
|
)
|
|
|
|
email_opts.add_argument(
|
|
"-P", "--smtp-port", action="store", type=int, dest="email_smtp_port", help="SMTP port"
|
|
)
|
|
|
|
email_opts.add_argument(
|
|
"-S", "--smtp-ssl", action="store_true", dest="email_smtp_ssl", help="Use SSL"
|
|
)
|
|
|
|
email_opts.add_argument(
|
|
"-T", "--smtp-tls", action="store_true", dest="email_smtp_tls", help="Use TLS"
|
|
)
|
|
|
|
email_opts.add_argument(
|
|
"-u", "--smtp-user", action="store", type=str, dest="email_smtp_user", help="SMTP username"
|
|
)
|
|
|
|
email_opts.add_argument(
|
|
"-p",
|
|
"--smtp-password",
|
|
action="store",
|
|
type=str,
|
|
dest="email_smtp_password",
|
|
help="SMTP password",
|
|
)
|
|
|
|
email_opts.add_argument(
|
|
"-D",
|
|
"--smtp-debug",
|
|
action="store_true",
|
|
dest="email_smtp_debug",
|
|
help="Debug SMTP connection",
|
|
)
|
|
|
|
email_opts.add_argument(
|
|
"-e",
|
|
"--email-encoding",
|
|
action="store",
|
|
type=str,
|
|
dest="email_encoding",
|
|
help="SMTP encoding",
|
|
)
|
|
|
|
email_opts.add_argument(
|
|
"-f",
|
|
"--sender-name",
|
|
action="store",
|
|
type=str,
|
|
dest="email_sender_name",
|
|
help="Sender name",
|
|
)
|
|
|
|
email_opts.add_argument(
|
|
"-F",
|
|
"--sender-email",
|
|
action="store",
|
|
type=str,
|
|
dest="email_sender_email",
|
|
help="Sender email",
|
|
)
|
|
|
|
email_opts.add_argument(
|
|
"-C",
|
|
"--catch-all",
|
|
action="store",
|
|
type=str,
|
|
dest="email_catch_all",
|
|
help="Catch all sent email: specify catch recipient email address",
|
|
)
|
|
|
|
test_opts = parser.add_argument_group("Test email options")
|
|
|
|
test_opts.add_argument(
|
|
"-t",
|
|
"--to",
|
|
action="store",
|
|
type=str,
|
|
dest="test_to",
|
|
help="Test email recipient",
|
|
)
|
|
|
|
test_opts.add_argument(
|
|
"-m",
|
|
"--mako",
|
|
action="store_true",
|
|
dest="test_mako",
|
|
help="Test mako templating",
|
|
)
|
|
|
|
options = parser.parse_args()
|
|
|
|
if not options.test_to:
|
|
parser.error("You must specify test email recipient using -t/--to parameter")
|
|
sys.exit(1)
|
|
|
|
# Initialize logs
|
|
logformat = "%(asctime)s - Test EmailClient - %(levelname)s - %(message)s"
|
|
if options.debug:
|
|
loglevel = logging.DEBUG
|
|
elif options.verbose:
|
|
loglevel = logging.INFO
|
|
else:
|
|
loglevel = logging.WARNING
|
|
|
|
if options.logfile:
|
|
logging.basicConfig(filename=options.logfile, level=loglevel, format=logformat)
|
|
else:
|
|
logging.basicConfig(level=loglevel, format=logformat)
|
|
|
|
if options.email_smtp_user and not options.email_smtp_password:
|
|
import getpass
|
|
|
|
options.email_smtp_password = getpass.getpass("Please enter SMTP password: ")
|
|
|
|
logging.info("Initialize Email client")
|
|
email_client = EmailClient(
|
|
smtp_host=options.email_smtp_host,
|
|
smtp_port=options.email_smtp_port,
|
|
smtp_ssl=options.email_smtp_ssl,
|
|
smtp_tls=options.email_smtp_tls,
|
|
smtp_user=options.email_smtp_user,
|
|
smtp_password=options.email_smtp_password,
|
|
smtp_debug=options.email_smtp_debug,
|
|
sender_name=options.email_sender_name,
|
|
sender_email=options.email_sender_email,
|
|
catch_all_addr=options.email_catch_all,
|
|
just_try=options.just_try,
|
|
encoding=options.email_encoding,
|
|
templates={
|
|
"test": {
|
|
"subject": "Test email",
|
|
"text": (
|
|
"Just a test email sent at {sent_date}."
|
|
if not options.test_mako
|
|
else MakoTemplate("Just a test email sent at ${sent_date | h}.") # nosec
|
|
),
|
|
"html": (
|
|
"<strong>Just a test email.</strong> <small>(sent at {sent_date | h})</small>"
|
|
if not options.test_mako
|
|
else MakoTemplate( # nosec
|
|
"<strong>Just a test email.</strong> "
|
|
"<small>(sent at ${sent_date | h})</small>"
|
|
)
|
|
),
|
|
}
|
|
},
|
|
)
|
|
|
|
logging.info("Send a test email to %s", options.test_to)
|
|
if email_client.send(options.test_to, template="test", sent_date=datetime.datetime.now()):
|
|
logging.info("Test email sent")
|
|
sys.exit(0)
|
|
logging.error("Fail to send test email")
|
|
sys.exit(1)
|