mass_validate_email/mass_validate_email.py

516 lines
18 KiB
Python
Executable File

#!/usr/bin/python
#
# Python module for mass validation of email addresses
#
# This module was inspired by (and uses) the validate_email library
# written by Syrus Akbary:
#
# https://github.com/SyrusAkbary/validate_email
#
# Its main goal is to optimize mass validation by caching
# bad (or good) domains and MX servers.
#
# Author: Benjamin Renard
# Website: https://gogs.zionetrix.net/bn8/mass_validate_email
# Licence: LGPL
""" Mass email addresses validation tools """
import smtplib
import socket
import sys
import logging
import DNS
from validate_email import validate_email
# Discover the DNS servers at import time; abort the process if the
# discovery fails.
try:
    DNS.DiscoverNameServers()
except DNS.ServerError as err:
    logging.fatal("Error discovering DNS servers : %s", err)
    sys.exit(1)
# Exception
class EmailInvalid(Exception):
    """ Base exception of all email validation failures

    Attributes:
      email: the email address that failed the validation
      error_msg: the failure reason ("Invalid email address" when no
                 reason is provided)

    The exception message is formatted as "<email> : <error_msg>".
    """

    def __init__(self, email, error_msg=None):
        if not error_msg:
            error_msg = "Invalid email address"
        self.email = email
        self.error_msg = error_msg
        message = "%s : %s" % (email, error_msg)
        super(EmailInvalid, self).__init__(message)
class EmailInvalidSyntax(EmailInvalid):
    """ Exception raised when the syntax of an email address is invalid """

    def __init__(self, email):
        reason = "Invalid email address syntax"
        super(EmailInvalidSyntax, self).__init__(email, reason)
class EmailInvalidDomain(EmailInvalid):
    """ Exception raised when an email address belongs to an invalid mail domain

    Attributes:
      domain: the invalid mail domain
      cause: the reason why the domain is considered as invalid
    """

    def __init__(self, email, domain, cause):
        self.domain, self.cause = domain, cause
        reason = "Invalid email domain : %s" % domain
        super(EmailInvalidDomain, self).__init__(email, reason)
class NoMXhostAvailable(EmailInvalid):
    """ Exception raised when an email address is from a mail domain without available MX host

    Attributes:
      mx_hosts: the MX hosts of the mail domain (if provided)
      mx_hosts_error: dict mapping each MX host to its unavailability
                      error (empty dict if not provided)
    """

    def __init__(self, email, mx_hosts=None, mx_hosts_error=None):
        self.mx_hosts = mx_hosts
        self.mx_hosts_error = mx_hosts_error or {}
        error_msg = "No MX hosts available"
        if self.mx_hosts_error:
            # Errors are usually EmailInvalid exceptions (with an error_msg
            # attribute), but plain strings are accepted too : fall back on
            # the text representation of the error in this case.
            details = [
                getattr(error, 'error_msg', None) or "%s" % (error,)
                for error in self.mx_hosts_error.values()
            ]
            error_msg = "%s : %s" % (error_msg, ', '.join(details))
        super(NoMXhostAvailable, self).__init__(email, error_msg)
class EmailRefused(EmailInvalid):
    """ Exception raised when an email address is refused by the MX host

    Attributes:
      mx_host: the MX host that refused the email address (None if unknown)
      mx_hosts: same value as mx_host (historical attribute name, kept for
                backward compatibility)
    """

    def __init__(self, email, mx_host=None):
        # Singular name, consistent with the MXUnavailable exception
        self.mx_host = mx_host
        # Historical attribute name : kept so existing callers still work
        self.mx_hosts = mx_host
        if mx_host:
            error_msg = "MX host %s refused this email" % mx_host
        else:
            error_msg = "MX hosts refused this email"
        super(EmailRefused, self).__init__(email, error_msg)
class MXUnavailable(EmailInvalid):
    """ Exception raised when an MX host can not be used to validate an email address

    Attributes:
      mx_host: the unavailable MX host
    """

    def __init__(self, email, mx_host, error_msg=None):
        self.mx_host = mx_host
        if not error_msg:
            # No specific reason provided : use the generic one
            error_msg = "%s : MX host %s unavailable" % (email, mx_host)
        super(MXUnavailable, self).__init__(email, error_msg)
class TemporaryErrorOnMX(MXUnavailable):
    """ Exception raised when an MX host returns a temporary error while validating an email address

    Attributes:
      msg: the message provided with the temporary error (if any)
    """

    def __init__(self, email, mx_host, msg=None):
        self.msg = msg
        if not msg:
            details = "%s : temporary error occured on MX host %s" % (email, mx_host)
        else:
            details = "%s : temporary error occured on MX host %s : %s" % (email, mx_host, msg)
        super(TemporaryErrorOnMX, self).__init__(email, mx_host, details)
class MXRefuseConnection(MXUnavailable):
    """ Exception raised when an MX host refuses the connection while validating an email address

    Attributes:
      msg: the message provided with the refusal (if any)
    """

    def __init__(self, email, mx_host, msg=None):
        self.msg = msg
        if not msg:
            details = "%s : MX host %s refuse connection" % (email, mx_host)
        else:
            details = "%s : MX host %s refuse connection : %s" % (email, mx_host, msg)
        super(MXRefuseConnection, self).__init__(email, mx_host, details)
# Options
class OptionsClass(object):
    """ Container of the validation options (all disabled by default) """

    # Debug mode flag (enabled by the --debug command line option)
    debug = False

    # Enable the smtplib debugging of the SMTP exchanges
    debugsmtp = False

    # Check the MX hosts of the domain of the email address
    checkmx = False

    # Connect to the MX hosts to verify the email address
    verifyaddress = False

    # Try the SMTP VRFY command while verifying an address on an MX host
    usesmtpvrfy = False

    # Consider an address as valid when the MX host refuses the connection
    acceptoncnxrefused = False

    # Consider an address as valid when the MX host returns a temporary error
    acceptontemporaryerror = False

    # Raise an exception on validation failure instead of returning a
    # false value
    raiseonerror = False


# Module-wide options instance, read by all the validation functions
options = OptionsClass()

# Address verification is done on MX hosts : it implies the MX check
if options.verifyaddress:
    options.checkmx = True
def clean_mail(mail):
    """ Normalize an email address : convert it to a lowercase string without leading and trailing whitespaces """
    as_text = str(mail)
    return as_text.lower().strip()
# Caches of the domains information collected during the validations

# MX hosts of the domains already resolved (domain => list of MX hosts)
domains_mx_hosts = dict()

# Domains known as valid
valid_domains = list()

# Domains known as invalid (domain => cause of the invalidity)
invalid_domains = dict()

# Domains without any available MX host (domain => cause of the unavailability)
mx_unavailable_domain = dict()
def get_mail_domain_and_mx_hosts(mail):
    """ Retrieve the domain name of an email address and its MX hosts

    The result is cached : domains with MX hosts are stored in
    domains_mx_hosts (and valid_domains), the other ones in invalid_domains
    with the cause of the failure.

    If the domain has no MX record, the domain name itself is tried as
    SMTP server.

    Return a tuple (domain, mx_hosts) where mx_hosts is the list of the MX
    hosts of the domain, or False if no MX host was found.

    Raise EmailInvalidDomain instead of returning (domain, False) if
    options.raiseonerror is enabled.
    """
    # The domain is what follows the last '@' : the local part of an address
    # may itself contain a (quoted) '@'.
    domain = mail.rpartition('@')[2]
    if domain in domains_mx_hosts:
        return (domain, domains_mx_hosts[domain])
    if domain in invalid_domains:
        if options.raiseonerror:
            raise EmailInvalidDomain(mail, domain, invalid_domains[domain])
        return (domain, False)
    try:
        # Retrieve domain's MX hosts info
        mx_hosts_info = DNS.mxlookup(domain)
        if mx_hosts_info:
            domains_mx_hosts[domain] = [mx_host_info[1] for mx_host_info in mx_hosts_info]
            logging.debug("MX of domain %s : %s", domain, ','.join(domains_mx_hosts[domain]))
            valid_domains.append(domain)
            return (domain, domains_mx_hosts[domain])
        # If domain have no MX hosts, try on domain name it self
        smtp = connect_to_mx(domain)
        if smtp:
            # The connection was only a probe : close it to not leak a socket
            smtp.close()
            domains_mx_hosts[domain] = [domain]
            logging.debug("MX of domain %s : %s", domain, ','.join(domains_mx_hosts[domain]))
            valid_domains.append(domain)
            return (domain, domains_mx_hosts[domain])
        # No valid MX host found for this domain
        logging.debug("No valid MX of domain %s found", domain)
        invalid_domains[domain] = "No valid MX hosts found"
    except DNS.ServerError as err:
        logging.debug('Error getting MX servers of domain %s : %s', domain, err)
        invalid_domains[domain] = 'DNS server error getting MX hosts : %s' % err
    if options.raiseonerror:
        raise EmailInvalidDomain(mail, domain, invalid_domains[domain])
    return (domain, False)
def check_mx(mail):
    """ MX check of an email address

    Without options.verifyaddress, only check that the domain of the email
    address has at least one MX host. Otherwise, try to verify the address
    on each MX host of the domain until one of them accepts it.

    Return True if the email address is validated, False otherwise.

    If options.raiseonerror is enabled, raise instead of returning False :
      - NoMXhostAvailable if no MX host of the domain accepts connection
      - EmailRefused if the email address is refused or if no MX host
        validated it
    Exceptions raised by get_mail_domain_and_mx_hosts() are propagated.
    """
    domain, mx_hosts = get_mail_domain_and_mx_hosts(mail)
    if not mx_hosts:
        return False
    if not options.verifyaddress:
        # We don't have to connect on MX host : the domain have at least one MX host
        return True
    if domain in mx_unavailable_domain:
        if options.raiseonerror:
            raise NoMXhostAvailable(mail, mx_hosts, mx_unavailable_domain[domain])
        return False

    # Check mail on MX hosts
    no_mx_available = True
    # Errors by MX host : must be a dict as it is indexed by host name
    mx_unavailable_errors = {}
    for mx_host in mx_hosts:
        con = connect_to_mx(mx_host)
        if not con:
            # Store an exception (and not a string) : NoMXhostAvailable reads
            # the error_msg attribute of each stored error
            mx_unavailable_errors[mx_host] = MXUnavailable(
                mail, mx_host, "%s : Fail to connect on MX host" % mx_host)
            continue
        no_mx_available = False
        try:
            # Pass the MX host (not the domain) : it is the key of the refusal
            # cache and it is the host named in the logs and error messages
            if verify_mail_on_mx_host(mx_host, con, mail, accept_on_cnx_refused=options.acceptoncnxrefused):
                return True
        except EmailRefused:
            if options.raiseonerror:
                raise
            return False
        except MXUnavailable as err:
            mx_unavailable_errors[mx_host] = err
        finally:
            # Always release the SMTP connection, whatever the result is
            con.close()

    if no_mx_available:
        # Remember it to not retry to connect for the next addresses of this domain
        mx_unavailable_domain[domain] = mx_unavailable_errors
        if options.raiseonerror:
            raise NoMXhostAvailable(mail, mx_hosts, mx_unavailable_domain[domain])
    elif options.raiseonerror:
        raise EmailRefused(mail)
    return False
# MX hosts that already accepted a connection
valid_mx = []
# MX hosts on which the connection failed
invalid_mx = []


def connect_to_mx(mx_host):
    """ Connect on a MX host and return the smtplib corresponding connection object

    Return the connected smtplib.SMTP object on success. The caller is
    responsible for closing it.

    On failure, the host is added to invalid_mx and a false value is
    returned : False if the host was already known as invalid (no connection
    attempt is made in this case), None otherwise.
    """
    if mx_host in invalid_mx:
        return False
    try:
        smtp = smtplib.SMTP(timeout=5)
        # SMTP.connect() does not raise on a bad greeting : it returns the
        # reply code. Handle it like the SMTP constructor does when a host is
        # provided : anything else than 220 is a connection failure.
        code, msg = smtp.connect(mx_host)
        if code != 220:
            smtp.close()
            raise smtplib.SMTPConnectError(code, msg)
        if options.debugsmtp:
            smtp.set_debuglevel(True)
        if mx_host not in valid_mx:
            valid_mx.append(mx_host)
        return smtp
    except smtplib.SMTPConnectError:
        logging.debug("MX server %s does not respond from SMTP", mx_host)
    except smtplib.SMTPServerDisconnected:
        logging.debug("MX server %s unexpectedly closed connection", mx_host)
    except socket.gaierror:
        logging.debug("Can't resolv MX server %s", mx_host)
    except socket.timeout:
        logging.debug("Connection timeout to SMTP server %s", mx_host)
    except socket.error:
        logging.debug("Connection error on SMTP server %s", mx_host)
    except Exception:
        logging.error("Unknown error connecting to SMTP server %s", mx_host, exc_info=True)
    invalid_mx.append(mx_host)
    return None
# MX hosts that refuse to check email addresses (MX host => refusal message)
mx_refuse_check_mail = dict()


def verify_mail_on_mx_host(mx_host, smtp, mail, accept_on_cnx_refused=False):
    """ Verify an email address on a specific MX host

    Parameters:
      mx_host: name of the MX host (used as cache key and in messages)
      smtp: smtplib connection object on this MX host
      mail: the email address to verify
      accept_on_cnx_refused: if True, consider the email address as
                             validated when the MX host refuses the check

    Return True if the email address is considered as validated, False if
    it is refused and options.raiseonerror is disabled.

    Raise :
      - MXRefuseConnection if the MX host refuses the check (bad HELO reply
        or disconnection) and accept_on_cnx_refused is False
      - TemporaryErrorOnMX on a 4xx reply to RCPT, unless
        options.acceptontemporaryerror is enabled
      - EmailRefused on any other non-250 reply to RCPT if
        options.raiseonerror is enabled
      - MXUnavailable on smtplib.SMTPConnectError
    """

    def on_check_refused(reason):
        """ Accept the address or raise MXRefuseConnection, depending on accept_on_cnx_refused """
        if not accept_on_cnx_refused:
            raise MXRefuseConnection(mail, mx_host, reason)
        logging.debug('%s : MX host %s refused connection but consider email as validated', mail, mx_host)
        return True

    # This MX host is already known to refuse the check
    if mx_host in mx_refuse_check_mail:
        return on_check_refused(mx_refuse_check_mail[mx_host])

    try:
        code, reply = smtp.helo()
        if code != 250:
            mx_refuse_check_mail[mx_host] = reply
            return on_check_refused(reply)

        if options.usesmtpvrfy:
            code, reply = smtp.verify(mail)
            logging.debug('%s : MX host %s return the code %s on VRFY command with the following message : %s', mail, mx_host, code, reply)
            if 250 <= code < 260:
                # On success, the server normally returns a normalized email
                # address : look for a valid one in its reply
                if any(validate_email(token) for token in reply.split(' ')):
                    return True

        # Fallback : simulate the sending of an email to this address
        smtp.mail('')
        code, reply = smtp.rcpt(mail)

        if 400 <= code < 500:
            logging.debug('SMTP server return temporary error (code=%s) : %s', code, reply)
            if not options.acceptontemporaryerror:
                raise TemporaryErrorOnMX(mail, mx_host, reply)
            logging.debug('%s : MX host %s raise a temporary error but consider email as validated', mail, mx_host)
            return True

        if code != 250:
            if options.raiseonerror:
                raise EmailRefused(mail, mx_host)
            return False

        logging.debug('%s : MX host %s accept email for this address with the following message : %s', mail, mx_host, reply)
        return True
    except smtplib.SMTPServerDisconnected:
        # The server does not permit to verify the user
        mx_refuse_check_mail[mx_host] = "server disconnected during the exchange"
        return on_check_refused(mx_refuse_check_mail[mx_host])
    except smtplib.SMTPConnectError:
        raise MXUnavailable(mail, mx_host)
def mass_validate_email(mail, simple=False):
    """ Validate an email address with mechanisms optimized for mass email addresses validation

    The email address is first normalized with clean_mail(), then its
    syntax is checked. The MX check is run only if options.checkmx is
    enabled and simple is False.

    Return True if the email address is validated, the result of
    check_mx() when the MX check is run, or None if the syntax of the
    address is invalid (EmailInvalidSyntax is raised instead if
    options.raiseonerror is enabled).
    """
    mail = clean_mail(mail)

    # Syntax check : always done
    if not validate_email(mail):
        if options.raiseonerror:
            raise EmailInvalidSyntax(mail)
        return None

    # Syntax-only validation requested, or MX check disabled
    if simple or not options.checkmx:
        return True

    return check_mx(mail)
# Command line interface : validate the email addresses provided as arguments
# (and/or read from a file), log the result and optionally write the invalid
# addresses in a CSV file. Exit with 1 if at least one address is invalid,
# 0 otherwise.
if __name__ == '__main__':
    from optparse import OptionParser, OptionGroup

    # Default options
    default_output_delimiter = ";"
    default_output_quotechar = '"'

    parser = OptionParser()

    # options
    parser.add_option(
        '-v',
        '--verbose',
        action="store_true",
        dest="verbose",
        help='Enable verbose mode'
    )
    parser.add_option(
        '-d',
        '--debug',
        action="store_true",
        dest="debug",
        help='Enable debug mode'
    )
    parser.add_option(
        '-p',
        '--progress',
        action='store_true',
        dest='progress',
        help='Enable progress bar',
        default=False
    )
    parser.add_option(
        '-D',
        '--debug-smtp',
        action="store_true",
        dest="debugsmtp",
        help='Enabled SMTP exchange debuging'
    )
    parser.add_option(
        '-m',
        '--mx',
        action="store_true",
        dest="checkmx",
        help='Enable MX check'
    )
    parser.add_option(
        '-V',
        '--verify',
        action="store_true",
        dest="verifyaddress",
        help="Enable email address verification on MX server. If this option is enabled, MX check is also automatically enabled."
    )
    parser.add_option(
        '--use-smtp-vrfy',
        action="store_true",
        dest="usesmtpvrfy",
        help="When MX check is enabled, enable the SMPT VRFY command usage"
    )
    parser.add_option(
        '--accept-email-on-cnx-refused',
        action="store_true",
        dest="acceptoncnxrefused",
        help="When MX check is enabled, accept email address even if MX server refuse the SMTP connection (after HELO command)"
    )
    parser.add_option(
        '--accept-on-temporary-error',
        action="store_true",
        dest="acceptontemporaryerror",
        help="When MX check is enabled, accept email address even if MX server return a temporary error (after trying to send an email to the checked address)"
    )
    parser.add_option(
        '-f',
        '--from-file',
        action="store",
        type='string',
        dest="fromfile",
        help="Read emails addresses to validate from this file"
    )

    output_opts = OptionGroup(parser, u"Output options")
    output_opts.add_option(
        '-o',
        '--output-file',
        action="store",
        type='string',
        dest="output_file",
        help="Write emails addresses validation result as a CSV file"
    )
    output_opts.add_option(
        '--delimiter',
        action='store',
        type='string',
        dest='output_delimiter',
        help="CSV ouput file delimiter (Default: %s)" % default_output_delimiter,
        default=default_output_delimiter
    )
    output_opts.add_option(
        '--quotechar',
        action='store',
        type='string',
        dest='output_quotechar',
        help="CSV ouput file quote character (Default: %s)" % default_output_quotechar,
        default=default_output_quotechar
    )
    parser.add_option_group(output_opts)

    (opts, emails) = parser.parse_args()

    # Enable and configure logging
    if opts.debug:
        logging_level = logging.DEBUG
        options.debug = True
    elif opts.verbose:
        logging_level = logging.INFO
    else:
        logging_level = logging.WARNING
    logging.basicConfig(level=logging_level, format='%(asctime)s - %(levelname)s - %(message)s')

    # If the fromfile option is set, load emails from this file
    if opts.fromfile:
        logging.info('Load emails addresses from %s', opts.fromfile)
        # Addresses already known : a set keeps the duplicates detection
        # cheap on large files (the emails list keeps the original order)
        known_emails = set(emails)
        with open(opts.fromfile, 'r') as fd:
            for line in fd:
                email = line.strip()
                # Ignore blank lines : they are not email addresses
                if not email or email in known_emails:
                    continue
                known_emails.add(email)
                emails.append(email)

    # Check at least one email is provided
    if not emails:
        parser.error('You must specify emails address as arguments')

    # If output is enabled, import csv library
    if opts.output_file:
        import csv

    # Configure other options from command line arguments.
    # Exceptions are always raised on error here : they carry the reason of
    # the failure reported to the user.
    options.raiseonerror = True
    options.debugsmtp = opts.debugsmtp
    options.checkmx = opts.checkmx or opts.verifyaddress or opts.usesmtpvrfy
    options.verifyaddress = opts.verifyaddress
    options.usesmtpvrfy = opts.usesmtpvrfy
    options.acceptoncnxrefused = opts.acceptoncnxrefused
    options.acceptontemporaryerror = opts.acceptontemporaryerror

    if opts.progress:
        # Imported only on demand : the progressbar library is optional
        from progressbar import ProgressBar, Percentage, Bar, RotatingMarker, SimpleProgress, ETA
        pbar = ProgressBar(
            widgets=[
                'Validating emails addresses : ',
                Percentage(),
                ' ',
                Bar(marker=RotatingMarker()),
                ' ',
                SimpleProgress(),
                ETA()
            ],
            maxval=len(emails)
        ).start()
        pbar_count = 0
    else:
        logging.info('Start emails addresses validation')

    # Valid addresses
    validated = []
    # Invalid addresses (email => EmailInvalid exception)
    not_validated = {}
    for email in emails:
        try:
            if mass_validate_email(email):
                logging.info('Address %s is valid', email)
                validated.append(email)
            else:
                logging.info('Address %s is NOT valid, but no exception raised : it is not supose to happen !', email)
                not_validated[email] = EmailInvalid(email)
        except EmailInvalid as err:
            not_validated[email] = err
        if opts.progress:
            pbar_count += 1
            pbar.update(pbar_count)

    if opts.progress:
        pbar.finish()

    if not_validated:
        logging.warning('%s on %s is NOT valid :\n- %s', len(not_validated), len(emails), '\n- '.join([str(not_validated[email]) for email in not_validated]))
    else:
        logging.info('All %s emails addresses provided are valid.', len(emails))

    if opts.output_file:
        logging.info('Write emails validation result to file %s', opts.output_file)
        with open(opts.output_file, 'w') as fd:
            csv_output = csv.writer(fd, delimiter=opts.output_delimiter, quotechar=opts.output_quotechar)
            # Only the invalid addresses are written, with the failure reason
            for email in not_validated:
                csv_output.writerow([email, not_validated[email].error_msg])

    # Adapt exit code on validation result
    sys.exit(1 if not_validated else 0)