Add option to raise an exception on validation error instead of returning a boolean
This commit is contained in:
parent
f0bd6d132d
commit
55cfd4a7de
1 changed files with 233 additions and 103 deletions
|
@ -14,6 +14,8 @@
|
||||||
# Website: https://gogs.zionetrix.net/bn8/mass_validate_email
|
# Website: https://gogs.zionetrix.net/bn8/mass_validate_email
|
||||||
# Licence: LGPL
|
# Licence: LGPL
|
||||||
|
|
||||||
|
""" Mass email addresses validation tools """
|
||||||
|
|
||||||
import smtplib
|
import smtplib
|
||||||
import socket
|
import socket
|
||||||
import sys
|
import sys
|
||||||
|
@ -28,17 +30,99 @@ except DNS.ServerError, err:
|
||||||
logging.fatal("Error discovering DNS servers : %s", err)
|
logging.fatal("Error discovering DNS servers : %s", err)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
# options
|
# Exception
|
||||||
class OptionsClass(object):
|
class EmailInvalid(Exception):
    """Generic invalid email exception (root of the validation error hierarchy).

    Attributes:
        email: the email address that failed validation.
        error_msg: the cause of the failure, without the email prefix.

    The exception message is always rendered as "<email> : <error_msg>".
    """

    def __init__(self, email, error_msg=None):
        self.email = email
        self.error_msg = error_msg or "Invalid email address"
        super(EmailInvalid, self).__init__("%s : %s" % (email, self.error_msg))


class EmailInvalidSyntax(EmailInvalid):
    """Exception raised when an email address is invalid by syntax."""

    def __init__(self, email):
        super(EmailInvalidSyntax, self).__init__(email, "Invalid email address syntax")


class EmailInvalidDomain(EmailInvalid):
    """Exception raised when an email address is from an invalid mail domain.

    Attributes:
        domain: the mail domain of the address.
        cause: the reason why the domain is considered invalid (kept as an
            attribute only, it is not part of the message).
    """

    def __init__(self, email, domain, cause):
        self.domain = domain
        self.cause = cause
        super(EmailInvalidDomain, self).__init__(email, "Invalid email domain : %s" % domain)


class NoMXhostAvailable(EmailInvalid):
    """Exception raised when no MX host of the mail domain is available.

    Attributes:
        mx_hosts: the MX hosts of the domain, as given by the caller.
        mx_hosts_error: dict mapping an MX host to the error met on it. A
            value may be an exception exposing ``error_msg`` or a plain
            string.
    """

    def __init__(self, email, mx_hosts=None, mx_hosts_error=None):
        self.mx_hosts = mx_hosts
        self.mx_hosts_error = mx_hosts_error or {}
        if self.mx_hosts_error:
            # Per-host errors are not always exceptions: fall back on the
            # string form when the value has no usable error_msg attribute.
            causes = ', '.join([
                getattr(error, 'error_msg', None) or str(error)
                for error in self.mx_hosts_error.values()
            ])
            error_msg = "No MX hosts available : %s" % causes
        else:
            error_msg = "No MX hosts available"
        super(NoMXhostAvailable, self).__init__(email, error_msg)


class EmailRefused(EmailInvalid):
    """Exception raised when an email address is refused by the MX host.

    Attributes:
        mx_host: the MX host that refused the address (None if unknown).
        mx_hosts: historical alias of ``mx_host``, kept for compatibility.
    """

    def __init__(self, email, mx_host=None):
        self.mx_host = mx_host
        # The attribute was originally (mis)named mx_hosts: keep it readable
        # under that name for existing callers.
        self.mx_hosts = mx_host
        if mx_host:
            error_msg = "MX host %s refused this email" % mx_host
        else:
            error_msg = "MX hosts refused this email"
        super(EmailRefused, self).__init__(email, error_msg)


class MXUnavailable(EmailInvalid):
    """Exception raised when an MX host is not available to validate an email address.

    Attributes:
        mx_host: the unavailable MX host.
    """

    def __init__(self, email, mx_host, error_msg=None):
        self.mx_host = mx_host
        # The base class already prefixes the message with the email address,
        # so the default cause must not repeat it.
        super(MXUnavailable, self).__init__(email, error_msg or "MX host %s unavailable" % mx_host)


class TemporaryErrorOnMX(MXUnavailable):
    """Exception raised when an MX host returns a temporary error validating an email address.

    Attributes:
        msg: the message returned by the MX host, if any.
    """

    def __init__(self, email, mx_host, msg=None):
        self.msg = msg
        error_msg = "temporary error occured on MX host %s" % mx_host
        if msg:
            error_msg = "%s : %s" % (error_msg, msg)
        super(TemporaryErrorOnMX, self).__init__(email, mx_host, error_msg)


class MXRefuseConnection(MXUnavailable):
    """Exception raised when an MX host refuses the connection validating an email address.

    Attributes:
        msg: the message explaining the refusal, if any.
    """

    def __init__(self, email, mx_host, msg=None):
        self.msg = msg
        error_msg = "MX host %s refuse connection" % mx_host
        if msg:
            error_msg = "%s : %s" % (error_msg, msg)
        super(MXRefuseConnection, self).__init__(email, mx_host, error_msg)
|
||||||
|
|
||||||
|
|
||||||
|
# Options
|
||||||
|
class OptionsClass(object):
    """Container of the validation options.

    Every option is a boolean flag declared as a class attribute that
    defaults to False; assign it on an instance to change the behavior.
    """

    # Debugging flags
    # NOTE(review): presumably enable debug logging / SMTP dialog tracing;
    # their consumers are not part of this view -- confirm.
    debug = False
    debugsmtp = False

    # MX related checks
    # NOTE(review): checkmx presumably enables the MX check of the domain;
    # its consumer is not part of this view -- confirm.
    checkmx = False
    # Verify the address itself on the MX hosts instead of only checking
    # that the domain has at least one MX host
    verifyaddress = False
    # Use the SMTP VRFY command when verifying an address on an MX host
    usesmtpvrfy = False

    # Tolerance flags: consider the address as valid even if the MX host
    # refuses the connection / returns a temporary error
    acceptoncnxrefused = False
    acceptontemporaryerror = False

    # Raise an EmailInvalid exception on validation error instead of
    # returning False
    raiseonerror = False
|
||||||
|
|
||||||
options = OptionsClass()
|
options = OptionsClass()
|
||||||
|
|
||||||
|
@ -49,82 +133,101 @@ def clean_mail(mail):
|
||||||
mail = str(mail).lower().strip()
|
mail = str(mail).lower().strip()
|
||||||
return mail
|
return mail
|
||||||
|
|
||||||
|
# Per-domain caches, filled during validation

# MX hosts of each domain already looked up (domain => list of MX hosts)
domains_mx_hosts = dict()

# Domains known as valid
valid_domains = list()

# Domains known as invalid (domain => cause of invalidity)
invalid_domains = dict()

# Domains without any available MX host (domain => cause of unavailability)
mx_unavailable_domain = dict()
|
||||||
|
|
||||||
|
def get_mail_domain_and_mx_hosts(mail):
    """Retrieve the domain name and its MX hosts from an email address.

    Results are cached per domain in domains_mx_hosts (known MX hosts) and
    invalid_domains (cause of invalidity), so each domain is looked up once.

    :param mail: the email address

    :return: a tuple (domain, mx_hosts) where mx_hosts is the list of the MX
             hosts of the domain, or False if the domain is invalid.

    :raise EmailInvalidDomain: if the domain is invalid and
                               options.raiseonerror is enabled
    """
    # The domain is what follows the LAST '@': a quoted local part may
    # contain one, a domain can not.
    domain = mail.rpartition('@')[2]

    if domain in domains_mx_hosts:
        return (domain, domains_mx_hosts[domain])

    if domain not in invalid_domains:
        try:
            # Retrieve domain's MX hosts info
            mx_hosts_info = DNS.mxlookup(domain)
            if mx_hosts_info:
                domains_mx_hosts[domain] = [mx_host_info[1] for mx_host_info in mx_hosts_info]
            elif connect_to_mx(domain):
                # The domain has no MX host: fall back on the domain name itself
                domains_mx_hosts[domain] = [domain]

            if domain in domains_mx_hosts:
                logging.debug("MX of domain %s : %s", domain, ','.join(domains_mx_hosts[domain]))
                valid_domains.append(domain)
                return (domain, domains_mx_hosts[domain])

            # No valid MX host found for this domain
            logging.debug("No valid MX of domain %s found", domain)
            invalid_domains[domain] = "No valid MX hosts found"
        except DNS.ServerError as err:
            logging.debug('Error getting MX servers of domain %s : %s', domain, err)
            invalid_domains[domain] = 'DNS server error getting MX hosts : %s' % err

    if options.raiseonerror:
        raise EmailInvalidDomain(mail, domain, invalid_domains[domain])
    return (domain, False)
|
||||||
|
|
||||||
|
def check_mx(mail):
    """MX check of an email address.

    Without options.verifyaddress, only check that the mail domain has at
    least one MX host. Otherwise, try to verify the address on each MX host
    of the domain until one of them accepts it.

    :param mail: the email address

    :return: True if the address passes the check, False otherwise (when
             options.raiseonerror is disabled)

    :raise EmailInvalidDomain: propagated from get_mail_domain_and_mx_hosts()
    :raise NoMXhostAvailable: if no MX host of the domain could be contacted
                              and options.raiseonerror is enabled
    :raise EmailRefused: if the address was refused and options.raiseonerror
                         is enabled
    """
    domain, mx_hosts = get_mail_domain_and_mx_hosts(mail)
    if not mx_hosts:
        return False

    if not options.verifyaddress:
        # No need to connect to an MX host: the domain has at least one
        return True

    if domain in mx_unavailable_domain:
        # Already known: no MX host of this domain can be contacted
        if options.raiseonerror:
            raise NoMXhostAvailable(mail, mx_hosts, mx_unavailable_domain[domain])
        return False

    # Check mail on MX hosts
    no_mx_available = True
    # Error met on each MX host (MX host => MXUnavailable exception)
    mx_unavailable_errors = {}
    for mx_host in mx_hosts:
        con = connect_to_mx(mx_host)
        if not con:
            # Stored as an exception (not a bare string) because
            # NoMXhostAvailable reads the error_msg attribute of each error
            mx_unavailable_errors[mx_host] = MXUnavailable(
                mail, mx_host, "%s : Fail to connect on MX host" % mx_host
            )
            continue
        no_mx_available = False
        try:
            # NOTE(review): the domain (not mx_host) is passed as the MX host
            # name, as in the original code -- confirm this is intended.
            if verify_mail_on_mx_host(domain, con, mail, accept_on_cnx_refused=options.acceptoncnxrefused):
                return True
        except EmailRefused:
            if options.raiseonerror:
                raise
            return False
        except MXUnavailable as err:
            # This MX host can't validate the address: try the next one
            mx_unavailable_errors[mx_host] = err

    if no_mx_available:
        # Remember the failure so the next addresses of this domain fail fast
        mx_unavailable_domain[domain] = mx_unavailable_errors
        if options.raiseonerror:
            raise NoMXhostAvailable(mail, mx_hosts, mx_unavailable_domain[domain])
    elif options.raiseonerror:
        raise EmailRefused(mail)
    return False
|
||||||
|
|
||||||
valid_mx = []
|
valid_mx = []
|
||||||
invalid_mx = []
|
invalid_mx = []
|
||||||
def verify_mx(mx_host, mail, check_mail=False):
|
|
||||||
if not check_mail and mx_host in valid_mx:
|
|
||||||
return True
|
|
||||||
elif not check_mail and mx_host in invalid_mx:
|
|
||||||
return False
|
|
||||||
|
|
||||||
def connect_to_mx(mx_host):
|
def connect_to_mx(mx_host):
|
||||||
|
""" Connect on a MX host and return the smtplib corresponding connection object """
|
||||||
if mx_host in invalid_mx:
|
if mx_host in invalid_mx:
|
||||||
return False
|
return False
|
||||||
try:
|
try:
|
||||||
|
@ -149,18 +252,26 @@ def connect_to_mx(mx_host):
|
||||||
invalid_mx.append(mx_host)
|
invalid_mx.append(mx_host)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
mx_refuse_check_mail = []
|
mx_refuse_check_mail = {}
|
||||||
def check_mail_on_mx(mx_host, smtp, mail, if_not_permit=False):
|
def verify_mail_on_mx_host(mx_host, smtp, mail, accept_on_cnx_refused=False):
|
||||||
|
""" Verify an email address on a specific MX host """
|
||||||
if mx_host in mx_refuse_check_mail:
|
if mx_host in mx_refuse_check_mail:
|
||||||
return if_not_permit
|
if accept_on_cnx_refused:
|
||||||
|
logging.debug('%s : MX host %s refused connection but consider email as validated', mail, mx_host)
|
||||||
|
return True
|
||||||
|
raise MXRefuseConnection(mail, mx_host, mx_refuse_check_mail[mx_host])
|
||||||
try:
|
try:
|
||||||
status, _ = smtp.helo()
|
status, msg = smtp.helo()
|
||||||
if status != 250:
|
if status != 250:
|
||||||
mx_refuse_check_mail.append(mx_host)
|
mx_refuse_check_mail[mx_host] = msg
|
||||||
return if_not_permit
|
if accept_on_cnx_refused:
|
||||||
|
logging.debug('%s : MX host %s refused connection but consider email as validated', mail, mx_host)
|
||||||
|
return True
|
||||||
|
raise MXRefuseConnection(mail, mx_host, msg)
|
||||||
|
|
||||||
if options.usesmtpvrfy:
|
if options.usesmtpvrfy:
|
||||||
(status, msg) = smtp.verify(mail)
|
(status, msg) = smtp.verify(mail)
|
||||||
|
logging.debug('%s : MX host %s return the code %s on VRFY command with the following message : %s', mail, mx_host, status, msg)
|
||||||
if status >= 250 and status < 260:
|
if status >= 250 and status < 260:
|
||||||
# Server normally returns a normalized email address
|
# Server normally returns a normalized email address
|
||||||
for word in msg.split(' '):
|
for word in msg.split(' '):
|
||||||
|
@ -170,20 +281,32 @@ def check_mail_on_mx(mx_host, smtp, mail, if_not_permit=False):
|
||||||
status, msg = smtp.rcpt(mail)
|
status, msg = smtp.rcpt(mail)
|
||||||
if status >= 400 and status < 500:
|
if status >= 400 and status < 500:
|
||||||
logging.debug('SMTP server return temporary error (code=%s) : %s', status, msg)
|
logging.debug('SMTP server return temporary error (code=%s) : %s', status, msg)
|
||||||
return not options.refuseontemporaryerror
|
if options.acceptontemporaryerror:
|
||||||
|
logging.debug('%s : MX host %s raise a temporary error but consider email as validated', mail, mx_host)
|
||||||
|
return True
|
||||||
|
raise TemporaryErrorOnMX(mail, mx_host, msg)
|
||||||
elif status != 250:
|
elif status != 250:
|
||||||
|
if options.raiseonerror:
|
||||||
|
raise EmailRefused(mail, mx_host)
|
||||||
return False
|
return False
|
||||||
|
logging.debug('%s : MX host %s accept email for this address with the following message : %s', mail, mx_host, msg)
|
||||||
return True
|
return True
|
||||||
except smtplib.SMTPServerDisconnected:
|
except smtplib.SMTPServerDisconnected:
|
||||||
# Server does not permit user verification
|
# Server does not permit user verification
|
||||||
mx_refuse_check_mail.append(mx_host)
|
mx_refuse_check_mail[mx_host] = "server disconnected during the exchange"
|
||||||
return if_not_permit
|
if accept_on_cnx_refused:
|
||||||
|
logging.debug('%s : MX host %s refused connection but consider email as validated', mail, mx_host)
|
||||||
|
return True
|
||||||
|
raise MXRefuseConnection(mail, mx_host, mx_refuse_check_mail[mx_host])
|
||||||
except smtplib.SMTPConnectError:
|
except smtplib.SMTPConnectError:
|
||||||
return False
|
raise MXUnavailable(mail, mx_host)
|
||||||
|
|
||||||
def mass_validate_email(mail, simple=False):
|
def mass_validate_email(mail, simple=False):
|
||||||
|
""" Validate an email address with mechanisms optimized for mass email address validation """
|
||||||
mail = clean_mail(mail)
|
mail = clean_mail(mail)
|
||||||
if not validate_email(mail):
|
if not validate_email(mail):
|
||||||
|
if options.raiseonerror:
|
||||||
|
raise EmailInvalidSyntax(mail)
|
||||||
return
|
return
|
||||||
elif simple:
|
elif simple:
|
||||||
return True
|
return True
|
||||||
|
@ -248,15 +371,15 @@ if __name__ == '__main__':
|
||||||
help="When MX check is enabled, enable the SMPT VRFY command usage"
|
help="When MX check is enabled, enable the SMPT VRFY command usage"
|
||||||
)
|
)
|
||||||
parser.add_option(
|
parser.add_option(
|
||||||
'--accept-mail-if-not-permit',
|
'--accept-email-on-cnx-refused',
|
||||||
action="store_false",
|
action="store_true",
|
||||||
dest="refusemailifnotpermit",
|
dest="acceptoncnxrefused",
|
||||||
help="When MX check is enabled, accept email address even if MX server refuse the SMTP connection (after HELO command)"
|
help="When MX check is enabled, accept email address even if MX server refuse the SMTP connection (after HELO command)"
|
||||||
)
|
)
|
||||||
parser.add_option(
|
parser.add_option(
|
||||||
'--accept-on-temporary-error',
|
'--accept-on-temporary-error',
|
||||||
action="store_false",
|
action="store_true",
|
||||||
dest="refuseontemporaryerror",
|
dest="acceptontemporaryerror",
|
||||||
help="When MX check is enabled, accept email address even if MX server return a temporary error (after trying to send an email to the checked address)"
|
help="When MX check is enabled, accept email address even if MX server return a temporary error (after trying to send an email to the checked address)"
|
||||||
)
|
)
|
||||||
parser.add_option(
|
parser.add_option(
|
||||||
|
@ -295,12 +418,13 @@ if __name__ == '__main__':
|
||||||
parser.error('You must specify emails address as arguments')
|
parser.error('You must specify emails address as arguments')
|
||||||
|
|
||||||
# Configure other options from command line arguments
|
# Configure other options from command line arguments
|
||||||
|
options.raiseonerror = True
|
||||||
options.debugsmtp = opts.debugsmtp
|
options.debugsmtp = opts.debugsmtp
|
||||||
options.checkmx = opts.checkmx or opts.verifyaddress or opts.usesmtpvrfy
|
options.checkmx = opts.checkmx or opts.verifyaddress or opts.usesmtpvrfy
|
||||||
options.verifyaddress = opts.verifyaddress
|
options.verifyaddress = opts.verifyaddress
|
||||||
options.usesmtpvrfy = opts.usesmtpvrfy
|
options.usesmtpvrfy = opts.usesmtpvrfy
|
||||||
options.refusemailifnotpermit = opts.refusemailifnotpermit
|
options.acceptoncnxrefused = opts.acceptoncnxrefused
|
||||||
options.refuseontemporaryerror = opts.refuseontemporaryerror
|
options.acceptontemporaryerror = opts.acceptontemporaryerror
|
||||||
|
|
||||||
if opts.progress:
|
if opts.progress:
|
||||||
from progressbar import ProgressBar, Percentage, Bar, RotatingMarker, SimpleProgress, ETA
|
from progressbar import ProgressBar, Percentage, Bar, RotatingMarker, SimpleProgress, ETA
|
||||||
|
@ -322,14 +446,17 @@ if __name__ == '__main__':
|
||||||
logging.info('Start emails addresses validation')
|
logging.info('Start emails addresses validation')
|
||||||
|
|
||||||
validated = []
|
validated = []
|
||||||
not_validated = []
|
not_validated = {}
|
||||||
for email in emails:
|
for email in emails:
|
||||||
|
try:
|
||||||
if mass_validate_email(email):
|
if mass_validate_email(email):
|
||||||
logging.info('Address %s is valid', email)
|
logging.info('Address %s is valid', email)
|
||||||
validated.append(email)
|
validated.append(email)
|
||||||
else:
|
else:
|
||||||
logging.info('Address %s is NOT valid', email)
|
logging.info('Address %s is NOT valid, but no exception raised : it is not supose to happen !', email)
|
||||||
not_validated.append(email)
|
not_validated[email] = EmailInvalid(email)
|
||||||
|
except EmailInvalid as err:
|
||||||
|
not_validated[email] = err
|
||||||
if opts.progress:
|
if opts.progress:
|
||||||
pbar_count += 1
|
pbar_count += 1
|
||||||
pbar.update(pbar_count)
|
pbar.update(pbar_count)
|
||||||
|
@ -338,6 +465,9 @@ if __name__ == '__main__':
|
||||||
pbar.finish()
|
pbar.finish()
|
||||||
|
|
||||||
if not_validated:
|
if not_validated:
|
||||||
logging.warning('%s on %s is NOT valid :\n- %s', len(not_validated), len(emails), '\n- '.join(not_validated))
|
logging.warning('%s on %s is NOT valid :\n- %s', len(not_validated), len(emails), '\n- '.join([str(not_validated[email]) for email in not_validated]))
|
||||||
else:
|
else:
|
||||||
logging.info('All %s emails addresses provided are valid.', len(emails))
|
logging.info('All %s emails addresses provided are valid.', len(emails))
|
||||||
|
|
||||||
|
# Adapt exit code on validation result
|
||||||
|
sys.exit(1 if not_validated else 0)
|
||||||
|
|
Loading…
Reference in a new issue