#
# This module was inspired by (and uses) the validate_email library
# written by Syrus Akbary:
#
#   https://github.com/SyrusAkbary/validate_email
#
# Its main goal is to optimize mass validation by caching
# bad (or good) domains and MX servers.
#
# Author: Benjamin Renard
# Website: https://gogs.zionetrix.net/bn8/mass_validate_email
# Licence: LGPL

import smtplib
import socket
import sys
import logging

import DNS
from validate_email import validate_email

# Name servers are discovered once, at import time; the module is unusable
# without them, so a failure here aborts the process.
try:
    DNS.DiscoverNameServers()
except DNS.ServerError as err:
    logging.fatal("Error discovering DNS servers : %s", err)
    sys.exit(1)


class OptionsClass(object):
    """Runtime switches read by the validation functions of this module."""

    def __init__(self):
        # Set to True by the command line front-end when --debug is given
        self.debug = False
        # Trace the SMTP dialog (smtplib debug level)
        self.debugsmtp = False
        # Check that the domain of the address has a reachable MX
        self.checkmx = False
        # Ask the MX server whether the address exists (implies checkmx)
        self.verifyaddress = False
        # Try the SMTP VRFY command before falling back on RCPT TO
        self.usesmtpvrfy = False
        # Refuse the address when the server does not permit the verification
        self.refusemailifnotpermit = True
        # Refuse the address when the server answers with a temporary error
        self.refuseontemporaryerror = True


options = OptionsClass()

# With the defaults above this is a no-op; the "verifyaddress implies
# checkmx" rule is enforced at call time in mass_validate_email().
if options.verifyaddress:
    options.checkmx = True


def clean_mail(mail):
    """Return *mail* as a lower-cased string without surrounding whitespace."""
    mail = str(mail).lower().strip()
    return mail


# Caches shared by all validations done during the life of the process
domain_mx = {}        # domain -> list of MX host names to contact
valid_domain = []     # domains known to have at least one usable MX
invalid_domain = []   # domains known to have no usable MX
valid_mx = []         # MX hosts that already accepted an SMTP connection
invalid_mx = []       # MX hosts that failed to accept an SMTP connection
mx_refuse_check_mail = []  # hosts that do not permit address verification


def _remember(cache, item):
    """Append *item* to the list *cache* unless it is already there."""
    if item not in cache:
        cache.append(item)


def _domain_of(mail):
    """Return the part of *mail* after its last '@' (or *mail* if no '@')."""
    return mail.rpartition('@')[2]


def _lookup_mx_hosts(dom):
    """Return the MX host names of *dom* (may be empty).

    Lets DNS.ServerError raised by the lookup propagate.
    """
    return [mx_host[1] for mx_host in DNS.mxlookup(dom)]


def _close_smtp(smtp):
    """Politely end an SMTP session, falling back on a plain close."""
    try:
        smtp.quit()
    except (smtplib.SMTPException, socket.error):
        smtp.close()


def _reject_domain(dom, err):
    """Log a failed MX lookup, cache *dom* as invalid and return False."""
    logging.debug('Error getting MX servers of domain %s : %s', dom, err)
    _remember(invalid_domain, dom)
    return False


def _check_mail_and_close(dom, con, mail):
    """Verify *mail* on the open connection *con*, then close the connection.

    NOTE(review): as in the original code, the domain (not the MX host
    name) is passed as first argument of check_mail_on_mx(), so a refusal
    is cached for the whole domain - confirm this is intended.
    """
    try:
        return check_mail_on_mx(
            dom, con, mail,
            if_not_permit=(not options.refusemailifnotpermit)
        )
    finally:
        _close_smtp(con)


def _check_domain(dom):
    """Return True if *dom* has a usable MX (domain-only check, cached)."""
    if dom in valid_domain:
        return True
    try:
        mx_hosts = _lookup_mx_hosts(dom)
    except DNS.ServerError as err:
        return _reject_domain(dom, err)
    if not mx_hosts:
        # No MX record: accept the domain itself if it answers SMTP
        con = connect_to_mx(dom)
        if not con:
            logging.debug("No valid MX of domain %s found", dom)
            _remember(invalid_domain, dom)
            return False
        # The connection was only a probe: do not leave it open
        _close_smtp(con)
        mx_hosts = [dom]
    domain_mx[dom] = mx_hosts
    logging.debug("MX of domain %s : %s", dom, ','.join(mx_hosts))
    _remember(valid_domain, dom)
    return True


def _check_address(dom, mail):
    """Return True if one MX of *dom* accepts the address *mail*."""
    if dom not in domain_mx:
        try:
            mx_hosts = _lookup_mx_hosts(dom)
        except DNS.ServerError as err:
            return _reject_domain(dom, err)
        if mx_hosts:
            domain_mx[dom] = mx_hosts
            _remember(valid_domain, dom)
        else:
            # No MX record: directly check the domain itself and the mail
            con = connect_to_mx(dom)
            if not con:
                _remember(invalid_domain, dom)
                return False
            domain_mx[dom] = [dom]
            return _check_mail_and_close(dom, con, mail)
    for mx_host in domain_mx[dom]:
        con = connect_to_mx(mx_host)
        if not con:
            continue
        if _check_mail_and_close(dom, con, mail):
            return True
    return False


def check_mx(mail):
    """Check the domain of *mail* and, if enabled, the address itself.

    When options.verifyaddress is False only the existence of a usable MX
    for the domain is checked. Otherwise each known MX of the domain is
    asked about the address until one accepts it. Results are cached in
    the module-level lists/dict.

    :param mail: cleaned email address
    :return: True if the check succeeded, False otherwise
    """
    dom = _domain_of(mail)
    if dom in invalid_domain:
        return False
    if not options.verifyaddress:
        return _check_domain(dom)
    return _check_address(dom, mail)


def verify_mx(mx_host, mail, check_mail=False):
    """Return the cached state of *mx_host* when *check_mail* is False.

    :return: True if the host is in valid_mx, False if it is in
             invalid_mx, None when the host is unknown or when
             *check_mail* is True (*mail* is not used).
    """
    if not check_mail and mx_host in valid_mx:
        return True
    elif not check_mail and mx_host in invalid_mx:
        return False
    return None


def connect_to_mx(mx_host):
    """Open an SMTP connection to *mx_host*.

    :return: a connected smtplib.SMTP object (the caller must close it),
             or None when the host is cached as invalid or the connection
             failed. A failing host is added to invalid_mx.
    """
    if mx_host in invalid_mx:
        return None
    smtp = None
    try:
        smtp = smtplib.SMTP(timeout=5)
        # Unlike the SMTP(host) constructor, connect() does not raise on a
        # bad greeting: it returns the reply code, which must be checked.
        code, greeting = smtp.connect(mx_host)
        if code != 220:
            raise smtplib.SMTPConnectError(code, greeting)
        if options.debugsmtp:
            smtp.set_debuglevel(True)
        _remember(valid_mx, mx_host)
        return smtp
    except smtplib.SMTPConnectError:
        logging.debug("MX server %s does not respond from SMTP", mx_host)
    except smtplib.SMTPServerDisconnected:
        logging.debug("MX server %s unexpectedly closed connection", mx_host)
    except socket.gaierror:
        logging.debug("Can't resolv MX server %s", mx_host)
    except socket.timeout:
        logging.debug("Connection timeout to SMTP server %s", mx_host)
    except socket.error:
        logging.debug("Connection error on SMTP server %s", mx_host)
    except Exception:
        logging.error("Unknown error connecting to SMTP server %s", mx_host, exc_info=True)
    if smtp is not None:
        # Do not leak a half-open socket
        smtp.close()
    _remember(invalid_mx, mx_host)
    return None


def check_mail_on_mx(mx_host, smtp, mail, if_not_permit=False):
    """Ask the server behind *smtp* whether it accepts the address *mail*.

    :param mx_host: key used to cache servers refusing the verification
    :param smtp: connected smtplib.SMTP object
    :param mail: email address to verify
    :param if_not_permit: value returned when the server does not permit
                          the verification
    :return: True if the address is accepted, False if it is refused;
             *if_not_permit* when verification is not permitted;
             ``not options.refuseontemporaryerror`` on a 4xx reply.
    """
    if mx_host in mx_refuse_check_mail:
        return if_not_permit
    try:
        status, _ = smtp.helo()
        if status != 250:
            _remember(mx_refuse_check_mail, mx_host)
            return if_not_permit

        if options.usesmtpvrfy:
            (status, msg) = smtp.verify(mail)
            if 250 <= status < 260:
                if not isinstance(msg, str):
                    # Python 3 smtplib returns the reply text as bytes
                    msg = msg.decode('utf-8', 'replace')
                # Server normally returns a normalized email address
                for word in msg.split(' '):
                    if validate_email(word):
                        return True
        # VRFY disabled or inconclusive: simulate the start of a delivery
        smtp.mail('')
        status, msg = smtp.rcpt(mail)
        if 400 <= status < 500:
            logging.debug('SMTP server return temporary error (code=%s) : %s', status, msg)
            return not options.refuseontemporaryerror
        # 251 ("user not local; will forward") is also a success reply
        return status in (250, 251)
    except smtplib.SMTPServerDisconnected:
        # Server does not permit to verify the user
        _remember(mx_refuse_check_mail, mx_host)
        return if_not_permit
    except smtplib.SMTPConnectError:
        return False


def mass_validate_email(mail, simple=False):
    """Validate the email address *mail* according to the module options.

    :param mail: email address (cleaned with clean_mail() before use)
    :param simple: if True, only the syntax is validated
    :return: True if the address passed every enabled check, else False
    """
    mail = clean_mail(mail)
    if not validate_email(mail):
        return False
    if simple:
        return True
    # Address verification needs the MX of the domain: it implies checkmx
    if options.checkmx or options.verifyaddress:
        return check_mx(mail)
    return True


def _build_option_parser():
    """Return the optparse parser of the command line front-end."""
    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option(
        '-v',
        '--verbose',
        action="store_true",
        dest="verbose",
        default=False,
        help='Enable verbose mode'
    )
    parser.add_option(
        '-d',
        '--debug',
        action="store_true",
        dest="debug",
        default=False,
        help='Enable debug mode'
    )
    parser.add_option(
        '-p',
        '--progress',
        action='store_true',
        dest='progress',
        help='Enable progress bar',
        default=False
    )
    parser.add_option(
        '-D',
        '--debug-smtp',
        action="store_true",
        dest="debugsmtp",
        default=False,
        help='Enabled SMTP exchange debuging'
    )
    parser.add_option(
        '-m',
        '--mx',
        action="store_true",
        dest="checkmx",
        default=False,
        help='Enable MX check'
    )
    parser.add_option(
        '-V',
        '--verify',
        action="store_true",
        dest="verifyaddress",
        default=False,
        help="Enable email address verification on MX server. If this option is enabled, MX check is also automatically enabled."
    )
    parser.add_option(
        '--use-smtp-vrfy',
        action="store_true",
        dest="usesmtpvrfy",
        default=False,
        help="When MX check is enabled, enable the SMTP VRFY command usage"
    )
    # The two options below are "store_false": without an explicit
    # default=True optparse would yield None, which silently turned the
    # refusal into an acceptance once copied into the module options.
    parser.add_option(
        '--accept-mail-if-not-permit',
        action="store_false",
        dest="refusemailifnotpermit",
        default=True,
        help="When MX check is enabled, accept email address even if MX server refuse the SMTP connection (after HELO command)"
    )
    parser.add_option(
        '--accept-on-temporary-error',
        action="store_false",
        dest="refuseontemporaryerror",
        default=True,
        help="When MX check is enabled, accept email address even if MX server return a temporary error (after trying to send an email to the checked address)"
    )
    parser.add_option(
        '-f',
        '--from-file',
        action="store",
        type='string',
        dest="fromfile",
        help="Read emails addresses to validate from file"
    )
    return parser


def _load_emails_from_file(path, emails):
    """Append to *emails* the new, non-empty addresses found in *path*.

    One address is expected per line. Lets IOError propagate.
    """
    seen = set(emails)
    with open(path, 'r') as fd:
        for line in fd:
            address = line.strip()
            # Blank lines are not addresses
            if address and address not in seen:
                seen.add(address)
                emails.append(address)


def _start_progress_bar(count):
    """Return a started progress bar for *count* addresses."""
    from progressbar import ProgressBar, Percentage, Bar, RotatingMarker, SimpleProgress, ETA

    return ProgressBar(
        widgets=[
            'Validating emails addresses : ',
            Percentage(),
            ' ',
            Bar(marker=RotatingMarker()),
            ' ',
            SimpleProgress(),
            ' ',
            ETA()
        ],
        maxval=count
    ).start()


def main(argv=None):
    """Command line front-end: validate addresses given as arguments/file.

    :param argv: argument list (defaults to sys.argv[1:])
    """
    parser = _build_option_parser()
    (opts, emails) = parser.parse_args(argv)

    # Enable and configure logging
    if opts.debug:
        logging_level = logging.DEBUG
        options.debug = True
    elif opts.verbose:
        logging_level = logging.INFO
    else:
        logging_level = logging.WARNING

    logging.basicConfig(level=logging_level, format='%(asctime)s - %(levelname)s - %(message)s')

    # If the fromfile option is set, load emails from the file
    if opts.fromfile:
        logging.info('Load emails addresses from %s', opts.fromfile)
        try:
            _load_emails_from_file(opts.fromfile, emails)
        except IOError as err:
            parser.error('Can not read emails addresses from %s : %s' % (opts.fromfile, err))

    # Check at least one email is provided
    if not emails:
        parser.error('You must specify emails address as arguments')

    # Configure other options from command line arguments
    options.debugsmtp = opts.debugsmtp
    # As promised by the help of --verify, it also enables the MX check
    options.checkmx = opts.checkmx or opts.verifyaddress
    options.verifyaddress = opts.verifyaddress
    options.usesmtpvrfy = opts.usesmtpvrfy
    options.refusemailifnotpermit = opts.refusemailifnotpermit
    options.refuseontemporaryerror = opts.refuseontemporaryerror

    pbar = None
    if opts.progress:
        pbar = _start_progress_bar(len(emails))
    else:
        logging.info('Start emails addresses validation')

    validated = []
    not_validated = []
    for count, address in enumerate(emails, 1):
        if mass_validate_email(address):
            logging.info('Address %s is valid', address)
            validated.append(address)
        else:
            logging.info('Address %s is NOT valid', address)
            not_validated.append(address)
        if pbar is not None:
            pbar.update(count)

    if pbar is not None:
        pbar.finish()

    if not_validated:
        logging.warning('%s on %s is NOT valid :\n- %s', len(not_validated), len(emails), '\n- '.join(not_validated))
    else:
        logging.info('All %s emails addresses provided are valid.', len(emails))


if __name__ == '__main__':
    main()