460 lines
16 KiB
Python
460 lines
16 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
""" Email client to forge and send emails """
|
|
|
|
import logging
|
|
import os
|
|
import smtplib
|
|
import email.utils
|
|
from email.mime.text import MIMEText
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.base import MIMEBase
|
|
from email.encoders import encode_base64
|
|
|
|
from mako.template import Template as MakoTemplate
|
|
|
|
from mylib.config import ConfigurableObject
|
|
from mylib.config import BooleanOption
|
|
from mylib.config import IntegerOption
|
|
from mylib.config import PasswordOption
|
|
from mylib.config import StringOption
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class EmailClient(ConfigurableObject):  # pylint: disable=useless-object-inheritance,too-many-instance-attributes
    """
    Email client

    This class abstracts all interactions with the SMTP server: it forges MIME
    messages (optionally from named templates) and sends them over SMTP.
    """

    _config_name = 'email'
    _config_comment = 'Email'
    _defaults = {
        'smtp_host': 'localhost',
        'smtp_port': 25,
        'smtp_ssl': False,
        'smtp_tls': False,
        'smtp_user': None,
        'smtp_password': None,
        'smtp_debug': False,
        'sender_name': 'No reply',
        'sender_email': 'noreply@localhost',
        'encoding': 'utf-8',
        'catch_all_addr': None,
        'just_try': False,
    }

    # Class-level fallback only: __init__ always binds a per-instance dict
    templates = {}

    def __init__(self, templates=None, **kwargs):
        """
        Initialize the email client

        :param templates: Optional dict of templates indexed by name. For each template,
                          forge_message() reads the 'subject', 'text' and 'html' keys.

        All other parameters are passed to the parent class constructor.
        """
        super().__init__(**kwargs)

        assert templates is None or isinstance(templates, dict)
        self.templates = templates if templates else {}

    # pylint: disable=arguments-differ,arguments-renamed
    def configure(self, use_smtp=True, just_try=True, **kwargs):
        """
        Configure options on registered mylib.Config object

        :param use_smtp: If true, declare the SMTP connection options (smtp_*)
        :param just_try: If true, declare the just_try option

        All other parameters are passed to the parent class configure() method.

        :return: The configuration section returned by the parent class configure() method
        """
        section = super().configure(**kwargs)

        if use_smtp:
            section.add_option(
                StringOption, 'smtp_host', default=self._defaults['smtp_host'],
                comment='SMTP server hostname/IP address')
            section.add_option(
                IntegerOption, 'smtp_port', default=self._defaults['smtp_port'],
                comment='SMTP server port')
            section.add_option(
                BooleanOption, 'smtp_ssl', default=self._defaults['smtp_ssl'],
                comment='Use SSL on SMTP server connection')
            section.add_option(
                BooleanOption, 'smtp_tls', default=self._defaults['smtp_tls'],
                comment='Use TLS on SMTP server connection')
            section.add_option(
                StringOption, 'smtp_user', default=self._defaults['smtp_user'],
                comment='SMTP authentication username')
            section.add_option(
                PasswordOption, 'smtp_password', default=self._defaults['smtp_password'],
                comment='SMTP authentication password (set to "keyring" to use XDG keyring)',
                username_option='smtp_user', keyring_value='keyring')
            section.add_option(
                BooleanOption, 'smtp_debug', default=self._defaults['smtp_debug'],
                comment='Enable SMTP debugging')

        section.add_option(
            StringOption, 'sender_name', default=self._defaults['sender_name'],
            comment='Sender name')
        section.add_option(
            StringOption, 'sender_email', default=self._defaults['sender_email'],
            comment='Sender email address')
        section.add_option(
            StringOption, 'encoding', default=self._defaults['encoding'],
            comment='Email encoding')
        section.add_option(
            StringOption, 'catch_all_addr', default=self._defaults['catch_all_addr'],
            comment='Catch all sent emails to this specified email address')

        if just_try:
            section.add_option(
                BooleanOption, 'just_try', default=self._defaults['just_try'],
                comment='Just-try mode: do not really send emails')

        return section

    @staticmethod
    def _render_template_part(content, template_vars):
        """ Render a template body: Mako templates are rendered, plain strings use str.format() """
        if isinstance(content, MakoTemplate):
            return content.render(**template_vars)
        return content.format(**template_vars)

    @staticmethod
    def _forge_attachment(filename, payload):
        """ Build a base64-encoded application/octet-stream part for one attachment """
        part = MIMEBase('application', 'octet-stream')
        part.set_payload(payload)
        encode_base64(part)
        # Let the email package format the filename parameter: it quotes/escapes special
        # characters and RFC 2231-encodes non-ASCII names instead of emitting a broken header.
        part.add_header('Content-Disposition', 'attachment', filename=filename)
        return part

    @staticmethod
    def _close_connection(server):
        """ Close an SMTP connection without letting a failing QUIT raise """
        try:
            server.quit()
        except (smtplib.SMTPException, OSError):
            log.debug('Error closing SMTP connection', exc_info=True)
            server.close()

    def forge_message(self, rcpt_to, subject=None, html_body=None, text_body=None,  # pylint: disable=too-many-arguments,too-many-locals
                      attachment_files=None, attachment_payloads=None, sender_name=None,
                      sender_email=None, encoding=None, template=None, **template_vars):
        """
        Forge a message

        :param rcpt_to: The recipient of the email. Could be a tuple(name, email) or just the email of the recipient.
        :param subject: The subject of the email (formatted with the template variables).
        :param html_body: The HTML body of the email (ignored if a template is used)
        :param text_body: The plain text body of the email (ignored if a template is used)
        :param attachment_files: List of filepaths to attach
        :param attachment_payloads: List of tuples with filename and payload to attach
        :param sender_name: Custom sender name (default: the sender_name option)
        :param sender_email: Custom sender email (default: the sender_email option)
        :param encoding: Email content encoding (default: the encoding option)
        :param template: The name of a template to use to forge this email

        All other parameters are considered as template variables.

        :return: The forged message (MIMEMultipart object)
        :raises AssertionError: if the template is unknown, or if no subject is
                                provided (directly or by the template)
        """
        # NOTE(review): attachments are added to the same multipart/alternative container
        # as the text/HTML bodies; a multipart/mixed wrapper may be more appropriate, but
        # the structure is kept as is for callers that inspect the message parts.
        msg = MIMEMultipart('alternative')
        msg['To'] = email.utils.formataddr(rcpt_to) if isinstance(rcpt_to, tuple) else rcpt_to
        msg['From'] = email.utils.formataddr((
            sender_name or self._get_option('sender_name'),
            sender_email or self._get_option('sender_email'),
        ))
        if subject:
            msg['Subject'] = subject.format(**template_vars)
        msg['Date'] = email.utils.formatdate(None, True)
        encoding = encoding if encoding else self._get_option('encoding')

        if template:
            assert template in self.templates, f'Unknwon template {template}'
            definition = self.templates[template]

            # Handle subject from template
            if not subject:
                assert definition.get('subject'), f'No subject defined in template {template}'
                msg['Subject'] = definition['subject'].format(**template_vars)

            # The HTML part is put last so that it is the preferred alternative
            bodies = [
                (self._render_template_part(definition[key], template_vars), mime_type)
                for key, mime_type in (('text', 'plain'), ('html', 'html'))
                if definition.get(key)
            ]
        else:
            assert subject, 'No subject provided'
            bodies = [
                (body, mime_type)
                for body, mime_type in ((text_body, 'plain'), (html_body, 'html'))
                if body
            ]

        for body, mime_type in bodies:
            msg.attach(MIMEText(body.encode(encoding), mime_type, _charset=encoding))

        for filepath in attachment_files or ():
            with open(filepath, 'rb') as fp:
                payload = fp.read()
            msg.attach(self._forge_attachment(os.path.basename(filepath), payload))

        for filename, payload in attachment_payloads or ():
            msg.attach(self._forge_attachment(filename, payload))

        return msg

    def send(self, rcpt_to, msg=None, subject=None, just_try=False, **forge_args):
        """
        Send an email

        :param rcpt_to: The recipient of the email. Could be a tuple(name, email)
                        or just the email of the recipient.
        :param msg: The message of this email (as MIMEBase or derived classes)
        :param subject: The subject of the email (only if the message is not provided
                        using msg parameter)
        :param just_try: Enable just-try mode (do not really send the email). The mode is
                         also enabled if the just_try option is set.

        All other parameters are considered as parameters to forge the message
        (only if the message is not provided using msg parameter).

        :return: True if the email was sent (or in just-try mode), False if connecting,
                 authenticating or sending failed (the error is logged)
        """
        msg = msg if msg else self.forge_message(rcpt_to, subject, **forge_args)

        if just_try or self._get_option('just_try'):
            log.debug(
                'Just-try mode: do not really send this email to %s (subject="%s")',
                rcpt_to, subject or msg.get('subject', 'No subject'))
            return True

        catch_addr = self._get_option('catch_all_addr')
        if catch_addr:
            log.debug('Catch email originaly send to %s to %s', rcpt_to, catch_addr)
            rcpt_to = catch_addr

        smtp_host = self._get_option('smtp_host')
        smtp_port = self._get_option('smtp_port')
        server = None
        try:
            if self._get_option('smtp_ssl'):
                log.info("Establish SSL connection to server %s:%s", smtp_host, smtp_port)
                server = smtplib.SMTP_SSL(smtp_host, smtp_port)
            else:
                log.info("Establish connection to server %s:%s", smtp_host, smtp_port)
                server = smtplib.SMTP(smtp_host, smtp_port)
                # STARTTLS is only attempted on a connection not already using SSL
                if self._get_option('smtp_tls'):
                    log.info('Start TLS on SMTP connection')
                    server.starttls()
        except (smtplib.SMTPException, OSError):
            # Refused connections, DNS failures, timeouts and TLS handshake errors are
            # plain OSError (not SMTPException): catch both to honor the False return.
            log.error('Error connecting to SMTP server %s:%s', smtp_host, smtp_port, exc_info=True)
            if server is not None:
                # The connection was opened but STARTTLS failed: do not leak it
                self._close_connection(server)
            return False

        if self._get_option('smtp_debug'):
            server.set_debuglevel(True)

        smtp_user = self._get_option('smtp_user')
        smtp_password = self._get_option('smtp_password')
        if smtp_user and smtp_password:
            try:
                log.info('Try to authenticate on SMTP connection as %s', smtp_user)
                server.login(smtp_user, smtp_password)
            except (smtplib.SMTPException, OSError):
                log.error(
                    'Error authenticating on SMTP server %s:%s with user %s',
                    smtp_host, smtp_port, smtp_user, exc_info=True)
                self._close_connection(server)
                return False

        error = False
        try:
            log.info('Sending email to %s', rcpt_to)
            server.sendmail(
                self._get_option('sender_email'),
                [rcpt_to[1] if isinstance(rcpt_to, tuple) else rcpt_to],
                msg.as_string()
            )
        except (smtplib.SMTPException, OSError):
            error = True
            log.error('Error sending email to %s', rcpt_to, exc_info=True)
        finally:
            # Best-effort close: a failing QUIT must not mask the sending result
            self._close_connection(server)

        return not error
|
|
|
|
|
|
if __name__ == '__main__':
    # Manual test: forge a test email from the 'test' template and send it to the
    # recipient given with -t/--to, using the SMTP settings given on the command line.
    import argparse
    import datetime
    import getpass
    import sys

    # Options parser
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '-v', '--verbose', action="store_true", dest="verbose",
        help="Enable verbose mode")
    parser.add_argument(
        '-d', '--debug', action="store_true", dest="debug",
        help="Enable debug mode")
    parser.add_argument(
        '-l', '--log-file', dest="logfile",
        help="Log file path")
    parser.add_argument(
        '-j', '--just-try', action="store_true", dest="just_try",
        help="Enable just-try mode")

    email_opts = parser.add_argument_group('Email options')

    email_opts.add_argument(
        '-H', '--smtp-host', dest="email_smtp_host",
        help="SMTP host")
    email_opts.add_argument(
        '-P', '--smtp-port', type=int, dest="email_smtp_port",
        help="SMTP port")
    email_opts.add_argument(
        '-S', '--smtp-ssl', action="store_true", dest="email_smtp_ssl",
        help="Use SSL")
    email_opts.add_argument(
        '-T', '--smtp-tls', action="store_true", dest="email_smtp_tls",
        help="Use TLS")
    email_opts.add_argument(
        '-u', '--smtp-user', dest="email_smtp_user",
        help="SMTP username")
    email_opts.add_argument(
        '-p', '--smtp-password', dest="email_smtp_password",
        help="SMTP password")
    email_opts.add_argument(
        '-D', '--smtp-debug', action="store_true", dest="email_smtp_debug",
        help="Debug SMTP connection")
    email_opts.add_argument(
        '-e', '--email-encoding', dest="email_encoding",
        help="SMTP encoding")
    email_opts.add_argument(
        '-f', '--sender-name', dest="email_sender_name",
        help="Sender name")
    email_opts.add_argument(
        '-F', '--sender-email', dest="email_sender_email",
        help="Sender email")
    email_opts.add_argument(
        '-C', '--catch-all', dest="email_catch_all",
        help="Catch all sent email: specify catch recipient email address")

    test_opts = parser.add_argument_group('Test email options')

    test_opts.add_argument(
        '-t', '--to', dest="test_to",
        help="Test email recipient")
    test_opts.add_argument(
        '-m', '--mako', action="store_true", dest="test_mako",
        help="Test mako templating")

    options = parser.parse_args()

    if not options.test_to:
        # parser.error() prints the usage and exits by itself (status 2)
        parser.error('You must specify test email recipient using -t/--to parameter')

    # Initialize logs
    logformat = '%(asctime)s - Test EmailClient - %(levelname)s - %(message)s'
    if options.debug:
        loglevel = logging.DEBUG
    elif options.verbose:
        loglevel = logging.INFO
    else:
        loglevel = logging.WARNING

    # Without a log file path (filename=None), basicConfig() logs to the standard error stream
    logging.basicConfig(filename=options.logfile or None, level=loglevel, format=logformat)

    # Ask for the password interactively rather than requiring it on the command line
    if options.email_smtp_user and not options.email_smtp_password:
        options.email_smtp_password = getpass.getpass('Please enter SMTP password: ')

    # The same test template is provided either as plain format strings or as Mako templates
    if options.test_mako:
        test_text = MakoTemplate("Just a test email sent at ${sent_date}.")
        test_html = MakoTemplate(
            "<strong>Just a test email.</strong> <small>(sent at ${sent_date})</small>")
    else:
        test_text = "Just a test email sent at {sent_date}."
        test_html = "<strong>Just a test email.</strong> <small>(sent at {sent_date})</small>"

    logging.info('Initialize Email client')
    email_client = EmailClient(
        smtp_host=options.email_smtp_host,
        smtp_port=options.email_smtp_port,
        smtp_ssl=options.email_smtp_ssl,
        smtp_tls=options.email_smtp_tls,
        smtp_user=options.email_smtp_user,
        smtp_password=options.email_smtp_password,
        smtp_debug=options.email_smtp_debug,
        sender_name=options.email_sender_name,
        sender_email=options.email_sender_email,
        catch_all_addr=options.email_catch_all,
        just_try=options.just_try,
        encoding=options.email_encoding,
        templates={
            'test': {
                'subject': "Test email",
                'text': test_text,
                'html': test_html,
            },
        },
    )

    logging.info('Send a test email to %s', options.test_to)
    if email_client.send(options.test_to, template='test', sent_date=datetime.datetime.now()):
        logging.info('Test email sent')
        sys.exit(0)
    logging.error('Fail to send test email')
    sys.exit(1)
|