#!/usr/bin/python
#
# Python module for mass validation of email addresses
#
# This module was inspired by (and uses) the validate_email library
# written by Syrus Akbary :
#
#   https://github.com/SyrusAkbary/validate_email
#
# Its main goal is to optimize mass validation by caching
# bad (or good) domains and MX servers.
#
# Author: Benjamin Renard
# Website: https://gogs.zionetrix.net/bn8/mass_validate_email
# Licence: LGPL

""" Mass email addresses validation tools """

import smtplib
import socket
import sys
import logging

import DNS
from validate_email import validate_email

# NOTE: this runs at import time and exits the interpreter when no DNS
# server can be discovered (behavior kept from the original module).
try:
    DNS.DiscoverNameServers()
except DNS.ServerError as err:
    logging.fatal("Error discovering DNS servers : %s", err)
    sys.exit(1)


# Exception

class EmailInvalid(Exception):
    """
    Generic invalid email exception

    Attributes:
      email: the email address being validated
      error_msg: human readable reason (without the email prefix)
    """

    def __init__(self, email, error_msg=None):
        self.email = email
        self.error_msg = error_msg or "Invalid email address"
        super(EmailInvalid, self).__init__("%s : %s" % (email, self.error_msg))


class EmailInvalidSyntax(EmailInvalid):
    """ Exception raised when an email address is invalid by syntax """

    def __init__(self, email):
        super(EmailInvalidSyntax, self).__init__(email, "Invalid email address syntax")


class EmailInvalidDomain(EmailInvalid):
    """
    Exception raised when an email address is from an invalid mail domain

    Attributes:
      domain: the mail domain of the address
      cause: the reason why the domain is considered invalid
    """

    def __init__(self, email, domain, cause):
        self.domain = domain
        self.cause = cause
        super(EmailInvalidDomain, self).__init__(email, "Invalid email domain : %s" % domain)


class NoMXhostAvailable(EmailInvalid):
    """
    Exception raised when an email address is from a mail domain without
    available MX host

    Attributes:
      mx_hosts: the MX hosts of the domain
      mx_hosts_error: dict mapping each MX host to the error met on it
    """

    def __init__(self, email, mx_hosts=None, mx_hosts_error=None):
        self.mx_hosts = mx_hosts
        self.mx_hosts_error = mx_hosts_error or {}
        if mx_hosts_error:
            # Values are normally EmailInvalid instances (with an error_msg
            # attribute), but fall back on str() so a plain message or any
            # other exception can not break the construction of this one.
            details = ', '.join([
                getattr(mx_hosts_error[host], 'error_msg', None) or str(mx_hosts_error[host])
                for host in mx_hosts_error
            ])
            super(NoMXhostAvailable, self).__init__(email, "No MX hosts available : %s" % details)
        else:
            super(NoMXhostAvailable, self).__init__(email, "No MX hosts available")


class EmailRefused(EmailInvalid):
    """
    Exception raised when an email address is refused by the MX host

    Attributes:
      mx_host: the MX host that refused the address (None if unknown)
      mx_hosts: historical alias of mx_host, kept for compatibility
    """

    def __init__(self, email, mx_host=None):
        self.mx_host = mx_host
        self.mx_hosts = mx_host
        if mx_host:
            super(EmailRefused, self).__init__(email, "MX host %s refused this email" % mx_host)
        else:
            super(EmailRefused, self).__init__(email, "MX hosts refused this email")


class MXUnavailable(EmailInvalid):
    """
    Exception raised when an MX host is not available to validate an email
    address

    Attributes:
      mx_host: the unavailable MX host
    """

    def __init__(self, email, mx_host, error_msg=None):
        self.mx_host = mx_host
        super(MXUnavailable, self).__init__(
            email,
            error_msg or "%s : MX host %s unavailable" % (email, mx_host)
        )


class TemporaryErrorOnMX(MXUnavailable):
    """
    Exception raised when an MX host raise a temporary error validating an
    email address
    """

    def __init__(self, email, mx_host, msg=None):
        self.msg = msg
        if msg:
            error_msg = "%s : temporary error occured on MX host %s : %s" % (email, mx_host, msg)
        else:
            error_msg = "%s : temporary error occured on MX host %s" % (email, mx_host)
        super(TemporaryErrorOnMX, self).__init__(email, mx_host, error_msg)


class MXRefuseConnection(MXUnavailable):
    """
    Exception raised when an MX host refuse connection validating an email
    address
    """

    def __init__(self, email, mx_host, msg=None):
        self.msg = msg
        if msg:
            error_msg = "%s : MX host %s refuse connection : %s" % (email, mx_host, msg)
        else:
            error_msg = "%s : MX host %s refuse connection" % (email, mx_host)
        super(MXRefuseConnection, self).__init__(email, mx_host, error_msg)


# Options

class OptionsClass(object):
    """ Class used to defined validation options """

    # Enable debug mode
    debug = False
    # Enable smtplib debugging of the SMTP exchanges
    debugsmtp = False
    # Check that the mail domain has at least one MX host
    checkmx = False
    # Verify the address on the MX hosts (implies the MX check)
    verifyaddress = False
    # Try the SMTP VRFY command before falling back on MAIL/RCPT
    usesmtpvrfy = False
    # Accept the address when the MX host refuses the SMTP exchange
    acceptoncnxrefused = False
    # Accept the address when the MX host answers with a temporary error
    acceptontemporaryerror = False
    # Raise EmailInvalid exceptions instead of returning False
    raiseonerror = False


options = OptionsClass()

# Only effective for the default values at import time: options changed
# later are handled in mass_validate_email().
if options.verifyaddress:
    options.checkmx = True


def clean_mail(mail):
    """ Normalize an email address : string conversion, lower case, stripped """
    mail = str(mail).lower().strip()
    return mail


# Cache domain info

# Domains's MX hosts
domains_mx_hosts = {}

# List of valid domains
valid_domains = []

# List of invalid domains (with invalid cause)
invalid_domains = {}

# List of domain without available MX host (with unavailable cause)
mx_unavailable_domain = {}


def _close_smtp(smtp):
    """ Properly close an SMTP connection, ignoring errors of a dead peer """
    try:
        smtp.quit()
    except (smtplib.SMTPException, socket.error):
        # The server already dropped the connection : just release the socket
        smtp.close()


def get_mail_domain_and_mx_hosts(mail):
    """
    Retrieve domain name and its MX hosts from an email address

    Results are cached per domain (domains_mx_hosts / invalid_domains).

    Returns a tuple (domain, mx_hosts) where mx_hosts is the list of MX host
    names, or False if the domain is invalid.

    Raises EmailInvalidDomain if the domain is invalid and
    options.raiseonerror is enabled.
    """
    domain = mail[mail.find('@')+1:]
    if domain in domains_mx_hosts:
        return (domain, domains_mx_hosts[domain])
    if domain in invalid_domains:
        if options.raiseonerror:
            raise EmailInvalidDomain(mail, domain, invalid_domains[domain])
        return (domain, False)

    try:
        # Retrieve domain's MX hosts info
        mx_hosts_info = DNS.mxlookup(domain)
        if len(mx_hosts_info) > 0:
            domains_mx_hosts[domain] = [mx_host_info[1] for mx_host_info in mx_hosts_info]
            logging.debug("MX of domain %s : %s", domain, ','.join(domains_mx_hosts[domain]))
            valid_domains.append(domain)
            return (domain, domains_mx_hosts[domain])

        # If domain have no MX hosts, try on domain name it self
        con = connect_to_mx(domain)
        if con:
            # The connection was only a probe : do not leak it
            _close_smtp(con)
            domains_mx_hosts[domain] = [domain]
            logging.debug("MX of domain %s : %s", domain, ','.join(domains_mx_hosts[domain]))
            valid_domains.append(domain)
            return (domain, domains_mx_hosts[domain])

        # No valid MX host found for this domain
        logging.debug("No valid MX of domain %s found", domain)
        invalid_domains[domain] = "No valid MX hosts found"
    except DNS.ServerError as err:
        logging.debug('Error getting MX servers of domain %s : %s', domain, err)
        invalid_domains[domain] = 'DNS server error getting MX hosts : %s' % err

    if options.raiseonerror:
        raise EmailInvalidDomain(mail, domain, invalid_domains[domain])
    return (domain, False)


def check_mx(mail):
    """
    MX check of an email address

    Without options.verifyaddress, only check that the domain has at least
    one MX host. Otherwise, try to verify the address on each MX host of the
    domain until one of them accepts or refuses it.

    Returns True if the address is considered valid, False otherwise.

    Raises (only if options.raiseonerror is enabled) EmailInvalidDomain,
    NoMXhostAvailable or EmailRefused.
    """
    domain, mx_hosts = get_mail_domain_and_mx_hosts(mail)
    if not mx_hosts:
        return False

    if not options.verifyaddress:
        # We don't have to connect on MX host : the domain has at least
        # one MX host at this point
        return True

    if domain in mx_unavailable_domain:
        if options.raiseonerror:
            raise NoMXhostAvailable(mail, mx_hosts, mx_unavailable_domain[domain])
        return False

    # Check mail on MX hosts
    no_mx_available = True
    # Errors by MX host (a dict : it is indexed by host name and consumed
    # as such by NoMXhostAvailable)
    mx_unavailable_errors = {}
    for mx_host in mx_hosts:
        con = connect_to_mx(mx_host)
        if not con:
            mx_unavailable_errors[mx_host] = MXUnavailable(
                mail, mx_host,
                "%s : Fail to connect on MX host" % mx_host
            )
            continue
        no_mx_available = False
        try:
            if verify_mail_on_mx_host(mx_host, con, mail,
                                      accept_on_cnx_refused=options.acceptoncnxrefused):
                return True
        except EmailRefused:
            if options.raiseonerror:
                raise
            return False
        except MXUnavailable as err:
            mx_unavailable_errors[mx_host] = err
        finally:
            # Always release the connection, whatever the verification result
            _close_smtp(con)

    if no_mx_available:
        # Remember it to avoid retrying all MX hosts for the next addresses
        mx_unavailable_domain[domain] = mx_unavailable_errors
        if options.raiseonerror:
            raise NoMXhostAvailable(mail, mx_hosts, mx_unavailable_domain[domain])
    elif options.raiseonerror:
        raise EmailRefused(mail)
    return False


# Cache of MX hosts already successfully / unsuccessfully contacted
valid_mx = []
invalid_mx = []


def connect_to_mx(mx_host):
    """
    Connect on a MX host and return the smtplib corresponding connection
    object

    Returns the connected smtplib.SMTP object, False if the host is already
    known as unreachable, or None if the connection failed (the host is then
    remembered as unreachable). The caller is responsible for closing the
    returned connection.
    """
    if mx_host in invalid_mx:
        return False

    smtp = smtplib.SMTP(timeout=5)
    if options.debugsmtp:
        smtp.set_debuglevel(True)
    try:
        smtp.connect(mx_host)
        if mx_host not in valid_mx:
            valid_mx.append(mx_host)
        return smtp
    except smtplib.SMTPConnectError:
        logging.debug("MX server %s does not respond from SMTP", mx_host)
    except smtplib.SMTPServerDisconnected:
        logging.debug("MX server %s unexpectedly closed connection", mx_host)
    except socket.gaierror:
        logging.debug("Can't resolv MX server %s", mx_host)
    except socket.timeout:
        logging.debug("Connection timeout to SMTP server %s", mx_host)
    except socket.error:
        logging.debug("Connection error on SMTP server %s", mx_host)
    except Exception:
        logging.error("Unknown error connecting to SMTP server %s", mx_host, exc_info=True)

    # Connection failed : release the socket (if any) and remember the host
    smtp.close()
    invalid_mx.append(mx_host)
    return None


# Cache of MX hosts refusing the verification exchange (with the cause)
mx_refuse_check_mail = {}


def verify_mail_on_mx_host(mx_host, smtp, mail, accept_on_cnx_refused=False):
    """
    Verify an email address on a specific MX host

    Parameters:
      mx_host: name of the MX host (used for caching and messages)
      smtp: smtplib.SMTP object already connected on this MX host
      mail: the email address to verify
      accept_on_cnx_refused: consider the address as valid if the MX host
                             refuses the exchange

    Returns True if the address is accepted, False if it is refused and
    options.raiseonerror is disabled.

    Raises MXRefuseConnection, TemporaryErrorOnMX or MXUnavailable if the MX
    host can not be used to verify the address, and EmailRefused if the
    address is refused and options.raiseonerror is enabled.
    """
    if mx_host in mx_refuse_check_mail:
        if accept_on_cnx_refused:
            logging.debug('%s : MX host %s refused connection but consider email as validated', mail, mx_host)
            return True
        raise MXRefuseConnection(mail, mx_host, mx_refuse_check_mail[mx_host])

    try:
        status, msg = smtp.helo()
        if status != 250:
            mx_refuse_check_mail[mx_host] = msg
            if accept_on_cnx_refused:
                logging.debug('%s : MX host %s refused connection but consider email as validated', mail, mx_host)
                return True
            raise MXRefuseConnection(mail, mx_host, msg)

        if options.usesmtpvrfy:
            (status, msg) = smtp.verify(mail)
            logging.debug('%s : MX host %s return the code %s on VRFY command with the following message : %s', mail, mx_host, status, msg)
            if status >= 250 and status < 260:
                # Server normaly return an normalize email address
                for word in msg.split(' '):
                    if validate_email(word):
                        return True

        # VRFY not used or not conclusive : simulate the start of a delivery
        smtp.mail('')
        status, msg = smtp.rcpt(mail)
        if status >= 400 and status < 500:
            logging.debug('SMTP server return temporary error (code=%s) : %s', status, msg)
            if options.acceptontemporaryerror:
                logging.debug('%s : MX host %s raise a temporary error but consider email as validated', mail, mx_host)
                return True
            raise TemporaryErrorOnMX(mail, mx_host, msg)
        elif status != 250:
            if options.raiseonerror:
                raise EmailRefused(mail, mx_host)
            return False

        logging.debug('%s : MX host %s accept email for this address with the following message : %s', mail, mx_host, msg)
        return True
    except smtplib.SMTPServerDisconnected:
        # Server not permits verify user
        mx_refuse_check_mail[mx_host] = "server disconnected during the exchange"
        if accept_on_cnx_refused:
            logging.debug('%s : MX host %s refused connection but consider email as validated', mail, mx_host)
            return True
        raise MXRefuseConnection(mail, mx_host, mx_refuse_check_mail[mx_host])
    except smtplib.SMTPConnectError:
        raise MXUnavailable(mail, mx_host)


def mass_validate_email(mail, simple=False):
    """
    Validate an email address with mecanisms optimized for mass email
    addresses validation

    Parameters:
      mail: the email address to validate
      simple: if True, only check the syntax of the address

    Returns True if the address is considered valid, False otherwise.

    Raises an EmailInvalid (sub)exception instead of returning False if
    options.raiseonerror is enabled.
    """
    mail = clean_mail(mail)
    if not validate_email(mail):
        if options.raiseonerror:
            raise EmailInvalidSyntax(mail)
        return False
    if simple:
        return True
    # Address verification implies the MX check, even if only
    # options.verifyaddress was enabled after the module import
    if options.checkmx or options.verifyaddress:
        return check_mx(mail)
    return True


def main():
    """ Command line entry point : validate addresses and exit with 1 if any is invalid """
    from optparse import OptionParser, OptionGroup

    # Default options
    default_output_delimiter = ";"
    default_output_quotechar = '"'

    parser = OptionParser()

    # options
    parser.add_option(
        '-v', '--verbose',
        action="store_true",
        dest="verbose",
        help='Enable verbose mode'
    )
    parser.add_option(
        '-d', '--debug',
        action="store_true",
        dest="debug",
        help='Enable debug mode'
    )
    parser.add_option(
        '-p', '--progress',
        action='store_true',
        dest='progress',
        help='Enable progress bar',
        default=False
    )
    parser.add_option(
        '-D', '--debug-smtp',
        action="store_true",
        dest="debugsmtp",
        help='Enabled SMTP exchange debuging'
    )
    parser.add_option(
        '-m', '--mx',
        action="store_true",
        dest="checkmx",
        help='Enable MX check'
    )
    parser.add_option(
        '-V', '--verify',
        action="store_true",
        dest="verifyaddress",
        help="Enable email address verification on MX server. If this option is enabled, MX check is also automatically enabled."
    )
    parser.add_option(
        '--use-smtp-vrfy',
        action="store_true",
        dest="usesmtpvrfy",
        help="When MX check is enabled, enable the SMPT VRFY command usage"
    )
    parser.add_option(
        '--accept-email-on-cnx-refused',
        action="store_true",
        dest="acceptoncnxrefused",
        help="When MX check is enabled, accept email address even if MX server refuse the SMTP connection (after HELO command)"
    )
    parser.add_option(
        '--accept-on-temporary-error',
        action="store_true",
        dest="acceptontemporaryerror",
        help="When MX check is enabled, accept email address even if MX server return a temporary error (after trying to send an email to the checked address)"
    )
    parser.add_option(
        '-f', '--from-file',
        action="store",
        type='string',
        dest="fromfile",
        help="Read emails addresses to validate from from"
    )

    output_opts = OptionGroup(parser, u"Output options")
    output_opts.add_option(
        '-o', '--output-file',
        action="store",
        type='string',
        dest="output_file",
        help="Write emails addresses validation result as a CSV file"
    )
    output_opts.add_option(
        '--delimiter',
        action='store',
        type='string',
        dest='output_delimiter',
        help="CSV ouput file delimiter (Default: %s)" % default_output_delimiter,
        default=default_output_delimiter
    )
    output_opts.add_option(
        '--quotechar',
        action='store',
        type='string',
        dest='output_quotechar',
        help="CSV ouput file quote character (Default: %s)" % default_output_quotechar,
        default=default_output_quotechar
    )
    parser.add_option_group(output_opts)

    (opts, emails) = parser.parse_args()

    # Enable and configure logging
    if opts.debug:
        logging_level = logging.DEBUG
        options.debug = True
    elif opts.verbose:
        logging_level = logging.INFO
    else:
        logging_level = logging.WARNING
    logging.basicConfig(level=logging_level, format='%(asctime)s - %(levelname)s - %(message)s')

    # If fromfile options if setted, load emails
    if opts.fromfile:
        logging.info('Load emails addresses from %s', opts.fromfile)
        # Set used for O(1) duplicate detection on big files
        known_emails = set(emails)
        with open(opts.fromfile, 'r') as fd:
            for line in fd:
                email = line.strip()
                # Skip blank lines and addresses already provided
                if email and email not in known_emails:
                    known_emails.add(email)
                    emails.append(email)

    # Check at leat one email is provided
    if not emails:
        parser.error('You must specify emails address as arguments')

    # If output is enabled, import csv library
    if opts.output_file:
        import csv

    # Configure other options from command line arguments
    options.raiseonerror = True
    options.debugsmtp = opts.debugsmtp
    options.checkmx = opts.checkmx or opts.verifyaddress or opts.usesmtpvrfy
    options.verifyaddress = opts.verifyaddress
    options.usesmtpvrfy = opts.usesmtpvrfy
    options.acceptoncnxrefused = opts.acceptoncnxrefused
    options.acceptontemporaryerror = opts.acceptontemporaryerror

    if opts.progress:
        from progressbar import ProgressBar, Percentage, Bar, RotatingMarker, SimpleProgress, ETA
        pbar = ProgressBar(
            widgets=[
                'Validating emails addresses : ',
                Percentage(),
                ' ',
                Bar(marker=RotatingMarker()),
                ' ',
                SimpleProgress(),
                ETA()
            ],
            maxval=len(emails)
        ).start()
        pbar_count = 0
    else:
        logging.info('Start emails addresses validation')

    validated = []
    not_validated = {}
    for email in emails:
        try:
            if mass_validate_email(email):
                logging.info('Address %s is valid', email)
                validated.append(email)
            else:
                logging.info('Address %s is NOT valid, but no exception raised : it is not supose to happen !', email)
                not_validated[email] = EmailInvalid(email)
        except EmailInvalid as err:
            not_validated[email] = err
        if opts.progress:
            pbar_count += 1
            pbar.update(pbar_count)

    if opts.progress:
        pbar.finish()

    if not_validated:
        logging.warning(
            '%s on %s is NOT valid :\n- %s',
            len(not_validated), len(emails),
            '\n- '.join([str(not_validated[email]) for email in not_validated])
        )
    else:
        logging.info('All %s emails addresses provided are valid.', len(emails))

    if opts.output_file:
        logging.info('Write emails validation result to file %s', opts.output_file)
        with open(opts.output_file, 'w') as fd:
            csv_output = csv.writer(fd, delimiter=opts.output_delimiter, quotechar=opts.output_quotechar)
            for email in not_validated:
                csv_output.writerow([email, not_validated[email].error_msg])

    # Adapt exit code on validation result
    sys.exit(1 if not_validated else 0)


if __name__ == '__main__':
    main()