python-mylib/mylib/email.py
Benjamin Renard 62c3fadf96
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
Introduce pyupgrade,isort,black and configure pre-commit hooks to run all testing tools before commit
2023-01-16 12:56:12 +01:00

504 lines
17 KiB
Python

""" Email client to forge and send emails """
import email.utils
import logging
import os
import smtplib
from email.encoders import encode_base64
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from mako.template import Template as MakoTemplate
from mylib.config import (
BooleanOption,
ConfigurableObject,
IntegerOption,
PasswordOption,
StringOption,
)
log = logging.getLogger(__name__)
class EmailClient(
ConfigurableObject
): # pylint: disable=useless-object-inheritance,too-many-instance-attributes
"""
Email client
This class abstract all interactions with the SMTP server.
"""
_config_name = "email"
_config_comment = "Email"
_defaults = {
"smtp_host": "localhost",
"smtp_port": 25,
"smtp_ssl": False,
"smtp_tls": False,
"smtp_user": None,
"smtp_password": None,
"smtp_debug": False,
"sender_name": "No reply",
"sender_email": "noreply@localhost",
"encoding": "utf-8",
"catch_all_addr": None,
"just_try": False,
}
templates = {}
def __init__(self, templates=None, **kwargs):
super().__init__(**kwargs)
assert templates is None or isinstance(templates, dict)
self.templates = templates if templates else {}
# pylint: disable=arguments-differ,arguments-renamed
def configure(self, use_smtp=True, just_try=True, **kwargs):
"""Configure options on registered mylib.Config object"""
section = super().configure(**kwargs)
if use_smtp:
section.add_option(
StringOption,
"smtp_host",
default=self._defaults["smtp_host"],
comment="SMTP server hostname/IP address",
)
section.add_option(
IntegerOption,
"smtp_port",
default=self._defaults["smtp_port"],
comment="SMTP server port",
)
section.add_option(
BooleanOption,
"smtp_ssl",
default=self._defaults["smtp_ssl"],
comment="Use SSL on SMTP server connection",
)
section.add_option(
BooleanOption,
"smtp_tls",
default=self._defaults["smtp_tls"],
comment="Use TLS on SMTP server connection",
)
section.add_option(
StringOption,
"smtp_user",
default=self._defaults["smtp_user"],
comment="SMTP authentication username",
)
section.add_option(
PasswordOption,
"smtp_password",
default=self._defaults["smtp_password"],
comment='SMTP authentication password (set to "keyring" to use XDG keyring)',
username_option="smtp_user",
keyring_value="keyring",
)
section.add_option(
BooleanOption,
"smtp_debug",
default=self._defaults["smtp_debug"],
comment="Enable SMTP debugging",
)
section.add_option(
StringOption,
"sender_name",
default=self._defaults["sender_name"],
comment="Sender name",
)
section.add_option(
StringOption,
"sender_email",
default=self._defaults["sender_email"],
comment="Sender email address",
)
section.add_option(
StringOption, "encoding", default=self._defaults["encoding"], comment="Email encoding"
)
section.add_option(
StringOption,
"catch_all_addr",
default=self._defaults["catch_all_addr"],
comment="Catch all sent emails to this specified email address",
)
if just_try:
section.add_option(
BooleanOption,
"just_try",
default=self._defaults["just_try"],
comment="Just-try mode: do not really send emails",
)
return section
def forge_message(
self,
rcpt_to,
subject=None,
html_body=None,
text_body=None, # pylint: disable=too-many-arguments,too-many-locals
attachment_files=None,
attachment_payloads=None,
sender_name=None,
sender_email=None,
encoding=None,
template=None,
**template_vars,
):
"""
Forge a message
:param rcpt_to: The recipient of the email. Could be a tuple(name, email) or
just the email of the recipient.
:param subject: The subject of the email.
:param html_body: The HTML body of the email
:param text_body: The plain text body of the email
:param attachment_files: List of filepaths to attach
:param attachment_payloads: List of tuples with filename and payload to attach
:param sender_name: Custom sender name (default: as defined on initialization)
:param sender_email: Custom sender email (default: as defined on initialization)
:param encoding: Email content encoding (default: as defined on initialization)
:param template: The name of a template to use to forge this email
All other parameters will be consider as template variables.
"""
msg = MIMEMultipart("alternative")
msg["To"] = email.utils.formataddr(rcpt_to) if isinstance(rcpt_to, tuple) else rcpt_to
msg["From"] = email.utils.formataddr(
(
sender_name or self._get_option("sender_name"),
sender_email or self._get_option("sender_email"),
)
)
if subject:
msg["Subject"] = subject.format(**template_vars)
msg["Date"] = email.utils.formatdate(None, True)
encoding = encoding if encoding else self._get_option("encoding")
if template:
assert template in self.templates, f"Unknwon template {template}"
# Handle subject from template
if not subject:
assert self.templates[template].get(
"subject"
), f"No subject defined in template {template}"
msg["Subject"] = self.templates[template]["subject"].format(**template_vars)
# Put HTML part in last one to prefered it
parts = []
if self.templates[template].get("text"):
if isinstance(self.templates[template]["text"], MakoTemplate):
parts.append(
(self.templates[template]["text"].render(**template_vars), "plain")
)
else:
parts.append(
(self.templates[template]["text"].format(**template_vars), "plain")
)
if self.templates[template].get("html"):
if isinstance(self.templates[template]["html"], MakoTemplate):
parts.append((self.templates[template]["html"].render(**template_vars), "html"))
else:
parts.append((self.templates[template]["html"].format(**template_vars), "html"))
for body, mime_type in parts:
msg.attach(MIMEText(body.encode(encoding), mime_type, _charset=encoding))
else:
assert subject, "No subject provided"
if text_body:
msg.attach(MIMEText(text_body.encode(encoding), "plain", _charset=encoding))
if html_body:
msg.attach(MIMEText(html_body.encode(encoding), "html", _charset=encoding))
if attachment_files:
for filepath in attachment_files:
with open(filepath, "rb") as fp:
part = MIMEBase("application", "octet-stream")
part.set_payload(fp.read())
encode_base64(part)
part.add_header(
"Content-Disposition",
f'attachment; filename="{os.path.basename(filepath)}"',
)
msg.attach(part)
if attachment_payloads:
for filename, payload in attachment_payloads:
part = MIMEBase("application", "octet-stream")
part.set_payload(payload)
encode_base64(part)
part.add_header("Content-Disposition", f'attachment; filename="{filename}"')
msg.attach(part)
return msg
def send(self, rcpt_to, msg=None, subject=None, just_try=False, **forge_args):
"""
Send an email
:param rcpt_to: The recipient of the email. Could be a tuple(name, email)
or just the email of the recipient.
:param msg: The message of this email (as MIMEBase or derivated classes)
:param subject: The subject of the email (only if the message is not provided
using msg parameter)
:param just_try: Enable just try mode (do not really send email, default: as defined on
initialization)
All other parameters will be consider as parameters to forge the message
(only if the message is not provided using msg parameter).
"""
msg = msg if msg else self.forge_message(rcpt_to, subject, **forge_args)
if just_try or self._get_option("just_try"):
log.debug(
'Just-try mode: do not really send this email to %s (subject="%s")',
rcpt_to,
subject or msg.get("subject", "No subject"),
)
return True
catch_addr = self._get_option("catch_all_addr")
if catch_addr:
log.debug("Catch email originaly send to %s to %s", rcpt_to, catch_addr)
rcpt_to = catch_addr
smtp_host = self._get_option("smtp_host")
smtp_port = self._get_option("smtp_port")
try:
if self._get_option("smtp_ssl"):
logging.info("Establish SSL connection to server %s:%s", smtp_host, smtp_port)
server = smtplib.SMTP_SSL(smtp_host, smtp_port)
else:
logging.info("Establish connection to server %s:%s", smtp_host, smtp_port)
server = smtplib.SMTP(smtp_host, smtp_port)
if self._get_option("smtp_tls"):
logging.info("Start TLS on SMTP connection")
server.starttls()
except smtplib.SMTPException:
log.error("Error connecting to SMTP server %s:%s", smtp_host, smtp_port, exc_info=True)
return False
if self._get_option("smtp_debug"):
server.set_debuglevel(True)
smtp_user = self._get_option("smtp_user")
smtp_password = self._get_option("smtp_password")
if smtp_user and smtp_password:
try:
log.info("Try to authenticate on SMTP connection as %s", smtp_user)
server.login(smtp_user, smtp_password)
except smtplib.SMTPException:
log.error(
"Error authenticating on SMTP server %s:%s with user %s",
smtp_host,
smtp_port,
smtp_user,
exc_info=True,
)
return False
error = False
try:
log.info("Sending email to %s", rcpt_to)
server.sendmail(
self._get_option("sender_email"),
[rcpt_to[1] if isinstance(rcpt_to, tuple) else rcpt_to],
msg.as_string(),
)
except smtplib.SMTPException:
error = True
log.error("Error sending email to %s", rcpt_to, exc_info=True)
finally:
server.quit()
return not error
if __name__ == "__main__":
# Run tests
import argparse
import datetime
import sys
# Options parser
parser = argparse.ArgumentParser()
parser.add_argument(
"-v", "--verbose", action="store_true", dest="verbose", help="Enable verbose mode"
)
parser.add_argument(
"-d", "--debug", action="store_true", dest="debug", help="Enable debug mode"
)
parser.add_argument(
"-l", "--log-file", action="store", type=str, dest="logfile", help="Log file path"
)
parser.add_argument(
"-j", "--just-try", action="store_true", dest="just_try", help="Enable just-try mode"
)
email_opts = parser.add_argument_group("Email options")
email_opts.add_argument(
"-H", "--smtp-host", action="store", type=str, dest="email_smtp_host", help="SMTP host"
)
email_opts.add_argument(
"-P", "--smtp-port", action="store", type=int, dest="email_smtp_port", help="SMTP port"
)
email_opts.add_argument(
"-S", "--smtp-ssl", action="store_true", dest="email_smtp_ssl", help="Use SSL"
)
email_opts.add_argument(
"-T", "--smtp-tls", action="store_true", dest="email_smtp_tls", help="Use TLS"
)
email_opts.add_argument(
"-u", "--smtp-user", action="store", type=str, dest="email_smtp_user", help="SMTP username"
)
email_opts.add_argument(
"-p",
"--smtp-password",
action="store",
type=str,
dest="email_smtp_password",
help="SMTP password",
)
email_opts.add_argument(
"-D",
"--smtp-debug",
action="store_true",
dest="email_smtp_debug",
help="Debug SMTP connection",
)
email_opts.add_argument(
"-e",
"--email-encoding",
action="store",
type=str,
dest="email_encoding",
help="SMTP encoding",
)
email_opts.add_argument(
"-f",
"--sender-name",
action="store",
type=str,
dest="email_sender_name",
help="Sender name",
)
email_opts.add_argument(
"-F",
"--sender-email",
action="store",
type=str,
dest="email_sender_email",
help="Sender email",
)
email_opts.add_argument(
"-C",
"--catch-all",
action="store",
type=str,
dest="email_catch_all",
help="Catch all sent email: specify catch recipient email address",
)
test_opts = parser.add_argument_group("Test email options")
test_opts.add_argument(
"-t",
"--to",
action="store",
type=str,
dest="test_to",
help="Test email recipient",
)
test_opts.add_argument(
"-m",
"--mako",
action="store_true",
dest="test_mako",
help="Test mako templating",
)
options = parser.parse_args()
if not options.test_to:
parser.error("You must specify test email recipient using -t/--to parameter")
sys.exit(1)
# Initialize logs
logformat = "%(asctime)s - Test EmailClient - %(levelname)s - %(message)s"
if options.debug:
loglevel = logging.DEBUG
elif options.verbose:
loglevel = logging.INFO
else:
loglevel = logging.WARNING
if options.logfile:
logging.basicConfig(filename=options.logfile, level=loglevel, format=logformat)
else:
logging.basicConfig(level=loglevel, format=logformat)
if options.email_smtp_user and not options.email_smtp_password:
import getpass
options.email_smtp_password = getpass.getpass("Please enter SMTP password: ")
logging.info("Initialize Email client")
email_client = EmailClient(
smtp_host=options.email_smtp_host,
smtp_port=options.email_smtp_port,
smtp_ssl=options.email_smtp_ssl,
smtp_tls=options.email_smtp_tls,
smtp_user=options.email_smtp_user,
smtp_password=options.email_smtp_password,
smtp_debug=options.email_smtp_debug,
sender_name=options.email_sender_name,
sender_email=options.email_sender_email,
catch_all_addr=options.email_catch_all,
just_try=options.just_try,
encoding=options.email_encoding,
templates=dict(
test=dict(
subject="Test email",
text=(
"Just a test email sent at {sent_date}."
if not options.test_mako
else MakoTemplate("Just a test email sent at ${sent_date}.")
),
html=(
"<strong>Just a test email.</strong> <small>(sent at {sent_date})</small>"
if not options.test_mako
else MakoTemplate(
"<strong>Just a test email.</strong> <small>(sent at ${sent_date})</small>"
)
),
)
),
)
logging.info("Send a test email to %s", options.test_to)
if email_client.send(options.test_to, template="test", sent_date=datetime.datetime.now()):
logging.info("Test email sent")
sys.exit(0)
logging.error("Fail to send test email")
sys.exit(1)