|
- #!/usr/bin/python
- #
- # Python module to mass validating email address
- #
- # This module was inspired by (and use) validate_email library
- # write by Syrus Akbary :
- #
- # https://github.com/SyrusAkbary/validate_email
- #
- # This main goal is to optimize mass validating using cache of
- # bad (or good) domain or MX server.
- #
- # Author: Benjamin Renard
- # Website: https://gogs.zionetrix.net/bn8/mass_validate_email
- # Licence: LGPL
- """ Mass email addresses validation tools """
- import smtplib
- import socket
- import sys
- import logging
- import DNS
- from validate_email import validate_email
- try:
- DNS.DiscoverNameServers()
- except DNS.ServerError, err:
- logging.fatal("Error discovering DNS servers : %s", err)
- sys.exit(1)
- # Exception
- class EmailInvalid(Exception):
- """ Generic invalid email exception """
- def __init__(self, email, error_msg=None):
- self.email = email
- self.error_msg = error_msg or "Invalid email address"
- super(EmailInvalid, self).__init__("%s : %s" % (email, self.error_msg))
- class EmailInvalidSyntax(EmailInvalid):
- """ Exception raised when an email address is invalid by syntax """
- def __init__(self, email):
- super(EmailInvalidSyntax, self).__init__(email, "Invalid email address syntax")
- class EmailInvalidDomain(EmailInvalid):
- """ Exceptiond raise when an email address is from an invalid mail domain """
- def __init__(self, email, domain, cause):
- self.domain = domain
- self.cause = cause
- super(EmailInvalidDomain, self).__init__(email, "Invalid email domain : %s" % domain)
- class NoMXhostAvailable(EmailInvalid):
- """ Exception raised when an email address is from a mail domain without available MX host """
- def __init__(self, email, mx_hosts=None, mx_hosts_error=None):
- self.mx_hosts = mx_hosts
- self.mx_hosts_error = mx_hosts_error or {}
- if mx_hosts_error:
- super(NoMXhostAvailable, self).__init__(email, "No MX hosts available : %s" % ', '.join([mx_hosts_error[host].error_msg for host in mx_hosts_error]))
- else:
- super(NoMXhostAvailable, self).__init__(email, "No MX hosts available")
- class EmailRefused(EmailInvalid):
- """ Exception raised when an email address is refused by the MX host """
- def __init__(self, email, mx_host=None):
- self.mx_hosts = mx_host
- if mx_host:
- super(EmailRefused, self).__init__(email, "MX host %s refused this email" % mx_host)
- else:
- super(EmailRefused, self).__init__(email, "MX hosts refused this email")
- class MXUnavailable(EmailInvalid):
- """ Exception raised when an MX host is not available to validate an email address """
- def __init__(self, email, mx_host, error_msg=None):
- self.mx_host = mx_host
- super(MXUnavailable, self).__init__(email, error_msg or "%s : MX host %s unavailable" % (email, mx_host))
- class TemporaryErrorOnMX(MXUnavailable):
- """ Exception raised when an MX host raise a temporary error validating an email address """
- def __init__(self, email, mx_host, msg=None):
- self.msg = msg
- if msg:
- error_msg = "%s : temporary error occured on MX host %s : %s" % (email, mx_host, msg)
- else:
- error_msg = "%s : temporary error occured on MX host %s" % (email, mx_host)
- super(TemporaryErrorOnMX, self).__init__(email, mx_host, error_msg)
- class MXRefuseConnection(MXUnavailable):
- """ Exception raised when an MX host refuse connection validating an email address """
- def __init__(self, email, mx_host, msg=None):
- self.msg = msg
- if msg:
- error_msg = "%s : MX host %s refuse connection : %s" % (email, mx_host, msg)
- else:
- error_msg = "%s : MX host %s refuse connection" % (email, mx_host)
- super(MXRefuseConnection, self).__init__(email, mx_host, error_msg)
- # Options
- class OptionsClass(object):
- """ Class used to defined validation options """
- debug = False
- debugsmtp = False
- checkmx = False
- verifyaddress = False
- usesmtpvrfy = False
- acceptoncnxrefused = False
- acceptontemporaryerror = False
- raiseonerror = False
- options = OptionsClass()
- if options.verifyaddress:
- options.checkmx = True
- def clean_mail(mail):
- mail = str(mail).lower().strip()
- return mail
- # Cache domain info
- # Domains's MX hosts
- domains_mx_hosts = {}
- # List of valid domains
- valid_domains = []
- # List of invalid domains (with invalid cause)
- invalid_domains = {}
- # List of domain without available MX host (with unavailable cause)
- mx_unavailable_domain = {}
- def get_mail_domain_and_mx_hosts(mail):
- """ Retreive domain name and it's MX hosts from an email address """
- domain = mail[mail.find('@')+1:]
- if domain in domains_mx_hosts:
- return (domain, domains_mx_hosts[domain])
- if domain in invalid_domains:
- if options.raiseonerror:
- raise EmailInvalidDomain(mail, domain, invalid_domains[domain])
- return (domain, False)
- try:
- # Retreive domain's MX hosts info
- mx_hosts_info = DNS.mxlookup(domain)
- if len(mx_hosts_info) > 0:
- domains_mx_hosts[domain] = [mx_host_info[1] for mx_host_info in mx_hosts_info]
- logging.debug("MX of domain %s : %s", domain, ','.join(domains_mx_hosts[domain]))
- valid_domains.append(domain)
- return (domain, domains_mx_hosts[domain])
- # If domain have no MX hosts, try on domain name it self
- if connect_to_mx(domain):
- domains_mx_hosts[domain] = [domain]
- logging.debug("MX of domain %s : %s", domain, ','.join(domains_mx_hosts[domain]))
- valid_domains.append(domain)
- return (domain, domains_mx_hosts[domain])
- # No valid MX host found for this domain
- logging.debug("No valid MX of domain %s found", domain)
- invalid_domains[domain] = "No valid MX hosts found"
- except DNS.ServerError, err:
- logging.debug('Error getting MX servers of domain %s : %s', domain, err)
- invalid_domains[domain] = 'DNS server error getting MX hosts : %s' % err
- if options.raiseonerror:
- raise EmailInvalidDomain(mail, domain, invalid_domains[domain])
- return (domain, False)
- def check_mx(mail):
- """ MX check of an email address """
- domain, mx_hosts = get_mail_domain_and_mx_hosts(mail)
- if not mx_hosts:
- return False
- if not options.verifyaddress:
- # We don't have to connect on MX host : just check if domain have at least on MX host
- return bool(mx_hosts)
- if domain in mx_unavailable_domain:
- if options.raiseonerror:
- raise NoMXhostAvailable(mail, mx_hosts, mx_unavailable_domain[domain])
- return False
- # Check mail on MX hosts
- no_mx_available = True
- mx_unavailable_errors = []
- for mx_host in mx_hosts:
- con = connect_to_mx(mx_host)
- if not con:
- mx_unavailable_errors[mx_host] = "%s : Fail to connect on MX host" % mx_host
- continue
- no_mx_available = False
- try:
- if verify_mail_on_mx_host(domain, con, mail, accept_on_cnx_refused=options.acceptoncnxrefused):
- return True
- except EmailRefused:
- if options.raiseonerror:
- raise
- return False
- except MXUnavailable as err:
- mx_unavailable_errors[mx_host] = err
- if no_mx_available:
- mx_unavailable_domain[domain] = mx_unavailable_errors
- if options.raiseonerror:
- raise NoMXhostAvailable(mail, mx_hosts, mx_unavailable_domain[domain])
- elif options.raiseonerror:
- raise EmailRefused(mail)
- return False
- valid_mx = []
- invalid_mx = []
- def connect_to_mx(mx_host):
- """ Connect on a MX host and return the smtplib corresponding connection object """
- if mx_host in invalid_mx:
- return False
- try:
- smtp = smtplib.SMTP(timeout=5)
- smtp.connect(mx_host)
- if options.debugsmtp:
- smtp.set_debuglevel(True)
- valid_mx.append(mx_host)
- return smtp
- except smtplib.SMTPConnectError:
- logging.debug("MX server %s does not respond from SMTP", mx_host)
- except smtplib.SMTPServerDisconnected:
- logging.debug("MX server %s unexpectedly closed connection", mx_host)
- except socket.gaierror:
- logging.debug("Can't resolv MX server %s", mx_host)
- except socket.timeout:
- logging.debug("Connection timeout to SMTP server %s", mx_host)
- except socket.error:
- logging.debug("Connection error on SMTP server %s", mx_host)
- except Exception:
- logging.error("Unknown error connecting to SMTP server %s", mx_host, exc_info=True)
- invalid_mx.append(mx_host)
- return None
- mx_refuse_check_mail = {}
- def verify_mail_on_mx_host(mx_host, smtp, mail, accept_on_cnx_refused=False):
- """ Verify an email address on a specific MX host """
- if mx_host in mx_refuse_check_mail:
- if accept_on_cnx_refused:
- logging.debug('%s : MX host %s refused connection but consider email as validated', mail, mx_host)
- return True
- raise MXRefuseConnection(mail, mx_host, mx_refuse_check_mail[mx_host])
- try:
- status, msg = smtp.helo()
- if status != 250:
- mx_refuse_check_mail[mx_host] = msg
- if accept_on_cnx_refused:
- logging.debug('%s : MX host %s refused connection but consider email as validated', mail, mx_host)
- return True
- raise MXRefuseConnection(mail, mx_host, msg)
- if options.usesmtpvrfy:
- (status, msg) = smtp.verify(mail)
- logging.debug('%s : MX host %s return the code %s on VRFY command with the following message : %s', mail, mx_host, status, msg)
- if status >= 250 and status < 260:
- # Server normaly return an normalize email address
- for word in msg.split(' '):
- if validate_email(word):
- return True
- smtp.mail('')
- status, msg = smtp.rcpt(mail)
- if status >= 400 and status < 500:
- logging.debug('SMTP server return temporary error (code=%s) : %s', status, msg)
- if options.acceptontemporaryerror:
- logging.debug('%s : MX host %s raise a temporary error but consider email as validated', mail, mx_host)
- return True
- raise TemporaryErrorOnMX(mail, mx_host, msg)
- elif status != 250:
- if options.raiseonerror:
- raise EmailRefused(mail, mx_host)
- return False
- logging.debug('%s : MX host %s accept email for this address with the following message : %s', mail, mx_host, msg)
- return True
- except smtplib.SMTPServerDisconnected:
- # Server not permits verify user
- mx_refuse_check_mail[mx_host] = "server disconnected during the exchange"
- if accept_on_cnx_refused:
- logging.debug('%s : MX host %s refused connection but consider email as validated', mail, mx_host)
- return True
- raise MXRefuseConnection(mail, mx_host, mx_refuse_check_mail[mx_host])
- except smtplib.SMTPConnectError:
- raise MXUnavailable(mail, mx_host)
- def mass_validate_email(mail, simple=False):
- """ Validate an email address with mecanisms optimized for mass email addresses validation """
- mail = clean_mail(mail)
- if not validate_email(mail):
- if options.raiseonerror:
- raise EmailInvalidSyntax(mail)
- return
- elif simple:
- return True
- elif options.checkmx:
- return check_mx(mail)
- else:
- return True
- if __name__ == '__main__':
- from optparse import OptionParser, OptionGroup
- # Default options
- default_output_delimiter = ";"
- default_output_quotechar = '"'
- parser = OptionParser()
- # options
- parser.add_option(
- '-v',
- '--verbose',
- action="store_true",
- dest="verbose",
- help='Enable verbose mode'
- )
- parser.add_option(
- '-d',
- '--debug',
- action="store_true",
- dest="debug",
- help='Enable debug mode'
- )
- parser.add_option(
- '-p',
- '--progress',
- action='store_true',
- dest='progress',
- help='Enable progress bar',
- default=False
- )
- parser.add_option(
- '-D',
- '--debug-smtp',
- action="store_true",
- dest="debugsmtp",
- help='Enabled SMTP exchange debuging'
- )
- parser.add_option(
- '-m',
- '--mx',
- action="store_true",
- dest="checkmx",
- help='Enable MX check'
- )
- parser.add_option(
- '-V',
- '--verify',
- action="store_true",
- dest="verifyaddress",
- help="Enable email address verification on MX server. If this option is enabled, MX check is also automatically enabled."
- )
- parser.add_option(
- '--use-smtp-vrfy',
- action="store_true",
- dest="usesmtpvrfy",
- help="When MX check is enabled, enable the SMPT VRFY command usage"
- )
- parser.add_option(
- '--accept-email-on-cnx-refused',
- action="store_true",
- dest="acceptoncnxrefused",
- help="When MX check is enabled, accept email address even if MX server refuse the SMTP connection (after HELO command)"
- )
- parser.add_option(
- '--accept-on-temporary-error',
- action="store_true",
- dest="acceptontemporaryerror",
- help="When MX check is enabled, accept email address even if MX server return a temporary error (after trying to send an email to the checked address)"
- )
- parser.add_option(
- '-f',
- '--from-file',
- action="store",
- type='string',
- dest="fromfile",
- help="Read emails addresses to validate from from"
- )
- output_opts = OptionGroup(parser, u"Output options")
- output_opts.add_option(
- '-o',
- '--output-file',
- action="store",
- type='string',
- dest="output_file",
- help="Write emails addresses validation result as a CSV file"
- )
- output_opts.add_option(
- '--delimiter',
- action='store',
- type='string',
- dest='output_delimiter',
- help="CSV ouput file delimiter (Default: %s)" % default_output_delimiter,
- default=default_output_delimiter
- )
- output_opts.add_option(
- '--quotechar',
- action='store',
- type='string',
- dest='output_quotechar',
- help="CSV ouput file quote character (Default: %s)" % default_output_quotechar,
- default=default_output_quotechar
- )
- parser.add_option_group(output_opts)
- (opts, emails) = parser.parse_args()
- # Enable and configure logging
- if opts.debug:
- logging_level = logging.DEBUG
- options.debug = True
- elif opts.verbose:
- logging_level = logging.INFO
- else:
- logging_level = logging.WARNING
- logging.basicConfig(level=logging_level, format='%(asctime)s - %(levelname)s - %(message)s')
- # If fromfile options if setted, load emails
- if opts.fromfile:
- logging.info('Load emails addresses from %s', opts.fromfile)
- with open(opts.fromfile, 'r') as fd:
- for line in fd.readlines():
- email = line.strip()
- if email not in emails:
- emails.append(email)
- # Check at leat one email is provided
- if not emails:
- parser.error('You must specify emails address as arguments')
- # If output is enabled, import csv library
- if opts.output_file:
- import csv
- # Configure other options from command line arguments
- options.raiseonerror = True
- options.debugsmtp = opts.debugsmtp
- options.checkmx = opts.checkmx or opts.verifyaddress or opts.usesmtpvrfy
- options.verifyaddress = opts.verifyaddress
- options.usesmtpvrfy = opts.usesmtpvrfy
- options.acceptoncnxrefused = opts.acceptoncnxrefused
- options.acceptontemporaryerror = opts.acceptontemporaryerror
- if opts.progress:
- from progressbar import ProgressBar, Percentage, Bar, RotatingMarker, SimpleProgress, ETA
- pbar = ProgressBar(
- widgets=[
- 'Validating emails addresses : ',
- Percentage(),
- ' ',
- Bar(marker=RotatingMarker()),
- ' ',
- SimpleProgress(),
- ETA()
- ],
- maxval=len(emails)
- ).start()
- pbar_count = 0
- else:
- logging.info('Start emails addresses validation')
- validated = []
- not_validated = {}
- for email in emails:
- try:
- if mass_validate_email(email):
- logging.info('Address %s is valid', email)
- validated.append(email)
- else:
- logging.info('Address %s is NOT valid, but no exception raised : it is not supose to happen !', email)
- not_validated[email] = EmailInvalid(email)
- except EmailInvalid as err:
- not_validated[email] = err
- if opts.progress:
- pbar_count += 1
- pbar.update(pbar_count)
- if opts.progress:
- pbar.finish()
- if not_validated:
- logging.warning('%s on %s is NOT valid :\n- %s', len(not_validated), len(emails), '\n- '.join([str(not_validated[email]) for email in not_validated]))
- else:
- logging.info('All %s emails addresses provided are valid.', len(emails))
- if opts.output_file:
- logging.info('Write emails validation result to file %s', opts.output_file)
- with open(opts.output_file, 'w') as fd:
- csv_output = csv.writer(fd, delimiter=opts.output_delimiter, quotechar=opts.output_quotechar)
- for email in not_validated:
- csv_output.writerow([email, not_validated[email].error_msg])
- # Adapt exit code on validation result
- sys.exit(1 if not_validated else 0)
|