diff --git a/mass_validate_email.py b/mass_validate_email.py index 3340381..46d19bf 100755 --- a/mass_validate_email.py +++ b/mass_validate_email.py @@ -14,6 +14,8 @@ # Website: https://gogs.zionetrix.net/bn8/mass_validate_email # Licence: LGPL +""" Mass email addresses validation tools """ + import smtplib import socket import sys @@ -28,17 +30,99 @@ except DNS.ServerError, err: logging.fatal("Error discovering DNS servers : %s", err) sys.exit(1) -# options -class OptionsClass(object): +# Exception +class EmailInvalid(Exception): + """ Generic invalid email exception """ - def __init__(self): - self.debug = False - self.debugsmtp = False - self.checkmx = False - self.verifyaddress = False - self.usesmtpvrfy = False - self.refusemailifnotpermit = True - self.refuseontemporaryerror = True + def __init__(self, email, error_msg=None): + self.email = email + self.error_msg = error_msg or "Invalid email address" + super(EmailInvalid, self).__init__("%s : %s" % (email, self.error_msg)) + + +class EmailInvalidSyntax(EmailInvalid): + """ Exception raised when an email address is invalid by syntax """ + + def __init__(self, email): + super(EmailInvalidSyntax, self).__init__(email, "Invalid email address syntax") + + +class EmailInvalidDomain(EmailInvalid): + """ Exceptiond raise when an email address is from an invalid mail domain """ + + def __init__(self, email, domain, cause): + self.domain = domain + self.cause = cause + super(EmailInvalidDomain, self).__init__(email, "Invalid email domain : %s" % domain) + + +class NoMXhostAvailable(EmailInvalid): + """ Exception raised when an email address is from a mail domain without available MX host """ + + def __init__(self, email, mx_hosts=None, mx_hosts_error=None): + self.mx_hosts = mx_hosts + self.mx_hosts_error = mx_hosts_error or {} + if mx_hosts_error: + super(NoMXhostAvailable, self).__init__(email, "No MX hosts available : %s" % ', '.join([mx_hosts_error[host].error_msg for host in mx_hosts_error])) + else: + super(NoMXhostAvailable, self).__init__(email, "No MX hosts available") + + +class EmailRefused(EmailInvalid): + """ Exception raised when an email address is refused by the MX host """ + + def __init__(self, email, mx_host=None): + self.mx_hosts = mx_host + if mx_host: + super(EmailRefused, self).__init__(email, "MX host %s refused this email" % mx_host) + else: + super(EmailRefused, self).__init__(email, "MX hosts refused this email") + + +class MXUnavailable(EmailInvalid): + """ Exception raised when an MX host is not available to validate an email address """ + + def __init__(self, email, mx_host, error_msg=None): + self.mx_host = mx_host + super(MXUnavailable, self).__init__(email, error_msg or "%s : MX host %s unavailable" % (email, mx_host)) + + +class TemporaryErrorOnMX(MXUnavailable): + """ Exception raised when an MX host raise a temporary error validating an email address """ + + def __init__(self, email, mx_host, msg=None): + self.msg = msg + if msg: + error_msg = "%s : temporary error occured on MX host %s : %s" % (email, mx_host, msg) + else: + error_msg = "%s : temporary error occured on MX host %s" % (email, mx_host) + super(TemporaryErrorOnMX, self).__init__(email, mx_host, error_msg) + + +class MXRefuseConnection(MXUnavailable): + """ Exception raised when an MX host refuse connection validating an email address """ + + def __init__(self, email, mx_host, msg=None): + self.msg = msg + if msg: + error_msg = "%s : MX host %s refuse connection : %s" % (email, mx_host, msg) + else: + error_msg = "%s : MX host %s refuse connection" % (email, mx_host) + super(MXRefuseConnection, self).__init__(email, mx_host, error_msg) + + +# Options +class OptionsClass(object): + """ Class used to defined validation options """ + + debug = False + debugsmtp = False + checkmx = False + verifyaddress = False + usesmtpvrfy = False + acceptoncnxrefused = False + acceptontemporaryerror = False + raiseonerror = False options = OptionsClass() @@ -49,82 +133,101 @@ def clean_mail(mail): mail = str(mail).lower().strip() return mail +# Cache domain info + +# Domains's MX hosts +domains_mx_hosts = {} + +# List of valid domains +valid_domains = [] + +# List of invalid domains (with invalid cause) +invalid_domains = {} + +# List of domain without available MX host (with unavailable cause) +mx_unavailable_domain = {} + +def get_mail_domain_and_mx_hosts(mail): + """ Retreive domain name and it's MX hosts from an email address """ + domain = mail[mail.find('@')+1:] + if domain in domains_mx_hosts: + return (domain, domains_mx_hosts[domain]) + if domain in invalid_domains: + if options.raiseonerror: + raise EmailInvalidDomain(mail, domain, invalid_domains[domain]) + return (domain, False) + try: + # Retreive domain's MX hosts info + mx_hosts_info = DNS.mxlookup(domain) + if len(mx_hosts_info) > 0: + domains_mx_hosts[domain] = [mx_host_info[1] for mx_host_info in mx_hosts_info] + logging.debug("MX of domain %s : %s", domain, ','.join(domains_mx_hosts[domain])) + valid_domains.append(domain) + return (domain, domains_mx_hosts[domain]) + + # If domain have no MX hosts, try on domain name it self + if connect_to_mx(domain): + domains_mx_hosts[domain] = [domain] + logging.debug("MX of domain %s : %s", domain, ','.join(domains_mx_hosts[domain])) + valid_domains.append(domain) + return (domain, domains_mx_hosts[domain]) + + # No valid MX host found for this domain + logging.debug("No valid MX of domain %s found", domain) + invalid_domains[domain] = "No valid MX hosts found" + except DNS.ServerError, err: + logging.debug('Error getting MX servers of domain %s : %s', domain, err) + invalid_domains[domain] = 'DNS server error getting MX hosts : %s' % err + + if options.raiseonerror: + raise EmailInvalidDomain(mail, domain, invalid_domains[domain]) + return (domain, False) -domain_mx = {} -valid_domain = [] -invalid_domain = [] def check_mx(mail): - dom = mail[mail.find('@')+1:] + """ MX check of an email address """ + domain, mx_hosts = get_mail_domain_and_mx_hosts(mail) + if not mx_hosts: + return False + if not options.verifyaddress: - if dom in valid_domain: - return True - elif dom in invalid_domain: + # We don't have to connect on MX host : just check if domain have at least on MX host + return bool(mx_hosts) + + if domain in mx_unavailable_domain: + if options.raiseonerror: + raise NoMXhostAvailable(mail, mx_hosts, mx_unavailable_domain[domain]) + return False + + # Check mail on MX hosts + no_mx_available = True + mx_unavailable_errors = [] + for mx_host in mx_hosts: + con = connect_to_mx(mx_host) + if not con: + mx_unavailable_errors[mx_host] = "%s : Fail to connect on MX host" % mx_host + continue + no_mx_available = False + try: + if verify_mail_on_mx_host(domain, con, mail, accept_on_cnx_refused=options.acceptoncnxrefused): + return True + except EmailRefused: + if options.raiseonerror: + raise return False - else: - try: - mx_hosts = DNS.mxlookup(dom) - if len(mx_hosts) > 0: - domain_mx[dom] = [] - for mx_host in mx_hosts: - domain_mx[dom].append(mx_host[1]) - logging.debug("MX of domain %s : %s", dom, ','.join(domain_mx[dom])) - valid_domain.append(dom) - return True - elif connect_to_mx(dom): - domain_mx[dom] = [dom] - logging.debug("MX of domain %s : %s", dom, ','.join(domain_mx[dom])) - valid_domain.append(dom) - return True - else: - logging.debug("No valid MX of domain %s found", dom) - invalid_domain.append(dom) - return False - except DNS.ServerError, err: - logging.debug('Error getting MX servers of domain %s : %s', dom, err) - invalid_domain.append(dom) - return False - else: - if dom in invalid_domain: - return False - if dom in domain_mx: - for mx_host in domain_mx[dom]: - con = connect_to_mx(mx_host) - if not con: - continue - if check_mail_on_mx(dom, con, mail, if_not_permit=(not options.refusemailifnotpermit)): - return True - return False - else: - try: - mx_hosts = DNS.mxlookup(dom) - if len(mx_hosts) > 0: - domain_mx[dom] = [] - for mx_host in mx_hosts: - domain_mx[dom].append(mx_host[1]) - valid_domain.append(dom) - return check_mx(mail) - else: - # Directly check MX and mail - con = connect_to_mx(dom) - if not con: - invalid_domain.append(dom) - return False - domain_mx[dom] = [dom] - return check_mail_on_mx(dom, con, mail, if_not_permit=(not options.refusemailifnotpermit)) - except DNS.ServerError, err: - logging.debug('Error getting MX servers of domain %s : %s', dom, err) - invalid_domain.append(dom) - return False + except MXUnavailable as err: + mx_unavailable_errors[mx_host] = err + if no_mx_available: + mx_unavailable_domain[domain] = mx_unavailable_errors + if options.raiseonerror: + raise NoMXhostAvailable(mail, mx_hosts, mx_unavailable_domain[domain]) + elif options.raiseonerror: + raise EmailRefused(mail) + return False valid_mx = [] invalid_mx = [] -def verify_mx(mx_host, mail, check_mail=False): - if not check_mail and mx_host in valid_mx: - return True - elif not check_mail and mx_host in invalid_mx: - return False - def connect_to_mx(mx_host): + """ Connect on a MX host and return the smtplib corresponding connection object """ if mx_host in invalid_mx: return False try: @@ -149,18 +252,26 @@ def connect_to_mx(mx_host): invalid_mx.append(mx_host) return None -mx_refuse_check_mail = [] -def check_mail_on_mx(mx_host, smtp, mail, if_not_permit=False): +mx_refuse_check_mail = {} +def verify_mail_on_mx_host(mx_host, smtp, mail, accept_on_cnx_refused=False): + """ Verify an email address on a specific MX host """ if mx_host in mx_refuse_check_mail: - return if_not_permit + if accept_on_cnx_refused: + logging.debug('%s : MX host %s refused connection but consider email as validated', mail, mx_host) + return True + raise MXRefuseConnection(mail, mx_host, mx_refuse_check_mail[mx_host]) try: - status, _ = smtp.helo() + status, msg = smtp.helo() if status != 250: - mx_refuse_check_mail.append(mx_host) - return if_not_permit + mx_refuse_check_mail[mx_host] = msg + if accept_on_cnx_refused: + logging.debug('%s : MX host %s refused connection but consider email as validated', mail, mx_host) + return True + raise MXRefuseConnection(mail, mx_host, msg) if options.usesmtpvrfy: (status, msg) = smtp.verify(mail) + logging.debug('%s : MX host %s return the code %s on VRFY command with the following message : %s', mail, mx_host, status, msg) if status >= 250 and status < 260: # Server normaly return an normalize email address for word in msg.split(' '): @@ -170,20 +281,32 @@ def check_mail_on_mx(mx_host, smtp, mail, if_not_permit=False): status, msg = smtp.rcpt(mail) if status >= 400 and status < 500: logging.debug('SMTP server return temporary error (code=%s) : %s', status, msg) - return not options.refuseontemporaryerror + if options.acceptontemporaryerror: + logging.debug('%s : MX host %s raise a temporary error but consider email as validated', mail, mx_host) + return True + raise TemporaryErrorOnMX(mail, mx_host, msg) elif status != 250: + if options.raiseonerror: + raise EmailRefused(mail, mx_host) return False + logging.debug('%s : MX host %s accept email for this address with the following message : %s', mail, mx_host, msg) return True except smtplib.SMTPServerDisconnected: # Server not permits verify user - mx_refuse_check_mail.append(mx_host) - return if_not_permit + mx_refuse_check_mail[mx_host] = "server disconnected during the exchange" + if accept_on_cnx_refused: + logging.debug('%s : MX host %s refused connection but consider email as validated', mail, mx_host) + return True + raise MXRefuseConnection(mail, mx_host, mx_refuse_check_mail[mx_host]) except smtplib.SMTPConnectError: - return False + raise MXUnavailable(mail, mx_host) def mass_validate_email(mail, simple=False): + """ Validate an email address with mecanisms optimized for mass email addresses validation """ mail = clean_mail(mail) if not validate_email(mail): + if options.raiseonerror: + raise EmailInvalidSyntax(mail) return elif simple: return True @@ -248,15 +371,15 @@ if __name__ == '__main__': help="When MX check is enabled, enable the SMPT VRFY command usage" ) parser.add_option( - '--accept-mail-if-not-permit', - action="store_false", - dest="refusemailifnotpermit", + '--accept-email-on-cnx-refused', + action="store_true", + dest="acceptoncnxrefused", help="When MX check is enabled, accept email address even if MX server refuse the SMTP connection (after HELO command)" ) parser.add_option( '--accept-on-temporary-error', - action="store_false", - dest="refuseontemporaryerror", + action="store_true", + dest="acceptontemporaryerror", help="When MX check is enabled, accept email address even if MX server return a temporary error (after trying to send an email to the checked address)" ) parser.add_option( @@ -295,12 +418,13 @@ if __name__ == '__main__': parser.error('You must specify emails address as arguments') # Configure other options from command line arguments + options.raiseonerror = True options.debugsmtp = opts.debugsmtp options.checkmx = opts.checkmx or opts.verifyaddress or opts.usesmtpvrfy options.verifyaddress = opts.verifyaddress options.usesmtpvrfy = opts.usesmtpvrfy - options.refusemailifnotpermit = opts.refusemailifnotpermit - options.refuseontemporaryerror = opts.refuseontemporaryerror + options.acceptoncnxrefused = opts.acceptoncnxrefused + options.acceptontemporaryerror = opts.acceptontemporaryerror if opts.progress: from progressbar import ProgressBar, Percentage, Bar, RotatingMarker, SimpleProgress, ETA @@ -322,14 +446,17 @@ if __name__ == '__main__': logging.info('Start emails addresses validation') validated = [] - not_validated = [] + not_validated = {} for email in emails: - if mass_validate_email(email): - logging.info('Address %s is valid', email) - validated.append(email) - else: - logging.info('Address %s is NOT valid', email) - not_validated.append(email) + try: + if mass_validate_email(email): + logging.info('Address %s is valid', email) + validated.append(email) + else: + logging.info('Address %s is NOT valid, but no exception raised : it is not supose to happen !', email) + not_validated[email] = EmailInvalid(email) + except EmailInvalid as err: + not_validated[email] = err if opts.progress: pbar_count += 1 pbar.update(pbar_count) @@ -338,6 +465,9 @@ if __name__ == '__main__': pbar.finish() if not_validated: - logging.warning('%s on %s is NOT valid :\n- %s', len(not_validated), len(emails), '\n- '.join(not_validated)) + logging.warning('%s on %s is NOT valid :\n- %s', len(not_validated), len(emails), '\n- '.join([str(not_validated[email]) for email in not_validated])) else: logging.info('All %s emails addresses provided are valid.', len(emails)) + + # Adapt exit code on validation result + sys.exit(1 if not_validated else 0)