#!/usr/bin/python
#
# Python module for mass-validating email addresses
#
# This module was inspired by (and uses) the validate_email library
# written by Syrus Akbary:
#
#   https://github.com/SyrusAkbary/validate_email
#
# Its main goal is to optimize mass validation by caching bad (or good)
# domains and MX servers.
#
# Author: Benjamin Renard
# Website: http://git.zionetrix.net/mass_validate_email
# Licence: LGPL
#
# Provenance: commit bc4cf9904e7ba3f4aac9cdf5c4904bb48b05417f
# ("Initial commit", Benjamin Renard, Thu May 30 00:45:50 2013 +0200).

import sys
import logging

from validate_email import validate_email
import smtplib
import socket
import DNS

try:
    DNS.DiscoverNameServers()
except DNS.ServerError as e:
    logging.fatal("Error discovering DNS servers : %s", e)
    sys.exit(1)


# options
class OptionsClass(object):
    """Runtime switches read by the validation functions of this module."""

    def __init__(self):
        # Not read anywhere in this module.
        self.debug = False
        # When true, connect_to_mx() turns on smtplib's debug output.
        self.debugsmtp = False
        # When true, mass_validate_email() also calls check_mx().
        self.checkmx = False
        # When true, check_mx() asks the MX servers about the address itself.
        self.verifyaddress = False
        # When true, check_mail_on_mx() tries SMTP VRFY before RCPT TO.
        self.usesmtpvrfy = False
        # When true, an address is refused if the server does not let us
        # verify it (check_mx() passes ifNotPermit = not this value).
        self.refusemailifnotpermit = True
        # When true, a 4xx answer to RCPT TO makes the address invalid.
        self.refuseontemporaryerror = True


options = OptionsClass()

# Evaluated once at import time; mass_validate_email() re-applies the same
# implication on every call so that later changes to options are honoured.
if options.verifyaddress:
    options.checkmx = True


def clean_mail(mail):
    """Return *mail* converted to a lower-cased, stripped string."""
    mail = str(mail).lower().strip()
    return mail


# Caches shared by every call (kept as plain lists/dict for compatibility):
# domain -> list of MX host names, plus domains known to be good / bad.
domain_mx = {}
valid_domain = []
invalid_domain = []


def _close_smtp(smtp):
    """End an SMTP session, tolerating a server that already went away."""
    try:
        smtp.quit()
    except (smtplib.SMTPException, socket.error):
        # QUIT could not be sent; still release the socket.
        smtp.close()


def _lookup_mx(dom):
    """Return the host names from DNS.mxlookup(dom); may be an empty list."""
    return [mx[1] for mx in DNS.mxlookup(dom)]


def _check_mx_only(dom):
    """Tell whether *dom* has an MX (or accepts SMTP itself), with caching."""
    if dom in valid_domain:
        return True
    if dom in invalid_domain:
        return False

    try:
        mx_hosts = _lookup_mx(dom)
    except DNS.ServerError as e:
        logging.debug('Error getting MX servers of domain %s : %s', dom, e)
        invalid_domain.append(dom)
        return False

    if not mx_hosts:
        # No MX record: fall back to the domain itself as mail host.
        con = connect_to_mx(dom)
        if not con:
            logging.debug("No valid MX of domain %s found", dom)
            invalid_domain.append(dom)
            return False
        # The connection was only a reachability probe; do not leak it.
        _close_smtp(con)
        mx_hosts = [dom]

    domain_mx[dom] = mx_hosts
    logging.debug("MX of domain %s : %s", dom, ','.join(mx_hosts))
    valid_domain.append(dom)
    return True


def _check_mx_and_address(dom, mail):
    """Ask the MX servers of *dom* whether they accept *mail*."""
    if dom in invalid_domain:
        return False

    if_not_permit = not options.refusemailifnotpermit

    if dom not in domain_mx:
        try:
            mx_hosts = _lookup_mx(dom)
        except DNS.ServerError as e:
            logging.debug('Error getting MX servers of domain %s : %s', dom, e)
            invalid_domain.append(dom)
            return False

        if mx_hosts:
            domain_mx[dom] = mx_hosts
            valid_domain.append(dom)
        else:
            # Directly check MX and mail
            con = connect_to_mx(dom)
            if not con:
                invalid_domain.append(dom)
                return False
            domain_mx[dom] = [dom]
            try:
                return check_mail_on_mx(dom, con, mail,
                                        ifNotPermit=if_not_permit)
            finally:
                _close_smtp(con)

    for mx in domain_mx[dom]:
        con = connect_to_mx(mx)
        if not con:
            continue
        try:
            # NOTE(review): the refusal cache of check_mail_on_mx() is keyed
            # by the domain here, not by the MX host; kept as in the original
            # - confirm whether `mx` was intended.
            if check_mail_on_mx(dom, con, mail, ifNotPermit=if_not_permit):
                return True
        finally:
            _close_smtp(con)
    return False


def check_mx(mail):
    """Validate the domain part of *mail* through its MX servers.

    Without options.verifyaddress only the existence of a usable MX (or of
    an SMTP service on the domain itself) is checked. With it, the MX
    servers are also asked whether they accept the address.

    Returns True or False. Results are cached in domain_mx, valid_domain
    and invalid_domain.
    """
    # Use the last '@': a quoted local part may itself contain one.
    dom = mail.rpartition('@')[2]
    if options.verifyaddress:
        return _check_mx_and_address(dom, mail)
    return _check_mx_only(dom)


# MX hosts that connect_to_mx() reached / failed to reach.
valid_mx = []
invalid_mx = []


def verify_mx(mx, mail, check_mail=False):
    """Answer from the MX caches only; unfinished in this revision.

    Returns True/False when check_mail is false and *mx* is already cached
    in valid_mx/invalid_mx, and None in every other case. *mail* is unused.
    """
    if not check_mail and mx in valid_mx:
        return True
    elif not check_mail and mx in invalid_mx:
        return False
    return None


def connect_to_mx(mx):
    """Open an SMTP connection to *mx* (5 second timeout).

    Returns the connected smtplib.SMTP object on success; the caller is
    responsible for closing it. Returns False when *mx* is already cached
    as unreachable and None when the connection attempt fails, in which
    case *mx* is added to invalid_mx.
    """
    if mx in invalid_mx:
        return False

    smtp = None
    try:
        smtp = smtplib.SMTP(timeout=5)
        smtp.connect(mx)
        if options.debugsmtp:
            smtp.set_debuglevel(True)
        if mx not in valid_mx:
            valid_mx.append(mx)
        return smtp
    except smtplib.SMTPConnectError:
        logging.debug("MX server %s does not respond from SMTP", mx)
    except smtplib.SMTPServerDisconnected:
        logging.debug("MX server %s unexpectedly closed connection", mx)
    except socket.gaierror:
        logging.debug("Can't resolv MX server %s", mx)
    except socket.timeout:
        logging.debug("Connection timeout to SMTP server %s", mx)
    except socket.error:
        logging.debug("Connection error on SMTP server %s", mx)
    except Exception as e:
        logging.error("Unknown error (%s) connecting to SMTP server %s : %s",
                      type(e), mx, e)

    # Failure path: release whatever was opened and remember the bad host.
    if smtp is not None:
        smtp.close()
    invalid_mx.append(mx)
    return None


# Keys for which the server refused to let us verify addresses.
mx_refuse_check_mail = []


def check_mail_on_mx(mx, smtp, mail, ifNotPermit=False):
    """Ask the server behind *smtp* whether it accepts *mail*.

    mx          -- cache key used in mx_refuse_check_mail
    smtp        -- connected smtplib.SMTP object (not closed here)
    mail        -- address to check
    ifNotPermit -- value returned when the server does not permit the check

    Returns True when VRFY yields a valid address (if options.usesmtpvrfy)
    or RCPT TO answers 250, False on any other final answer, and
    `not options.refuseontemporaryerror` on a 4xx answer. Exceptions other
    than SMTPServerDisconnected and SMTPConnectError are not caught.
    """
    if mx in mx_refuse_check_mail:
        return ifNotPermit
    try:
        status, _ = smtp.helo()
        if status != 250:
            mx_refuse_check_mail.append(mx)
            return ifNotPermit

        if options.usesmtpvrfy:
            status, msg = smtp.verify(mail)
            if 250 <= status < 260:
                # smtplib hands back bytes on Python 3; split needs text.
                if not isinstance(msg, str):
                    msg = msg.decode('utf-8', 'replace')
                # Server normally returns a normalized email address
                for word in msg.split(' '):
                    if validate_email(word):
                        return True

        # VRFY unused or inconclusive: probe with an empty sender + RCPT TO.
        smtp.mail('')
        status, msg = smtp.rcpt(mail)
        if 400 <= status < 500:
            logging.debug('SMTP server return temporary error (code=%s) : %s',
                          status, msg)
            return not options.refuseontemporaryerror
        elif status != 250:
            return False
        return True
    except smtplib.SMTPServerDisconnected:
        # Server does not permit verifying users
        mx_refuse_check_mail.append(mx)
        return ifNotPermit
    except smtplib.SMTPConnectError:
        return False


def mass_validate_email(mail, simple=False):
    """Validate *mail* according to the module-level options.

    Returns None when the cleaned address fails validate_email(), True when
    only the syntax is checked (simple=True, or neither options.checkmx nor
    options.verifyaddress is set), and otherwise the result of check_mx().
    """
    mail = clean_mail(mail)
    if not validate_email(mail):
        return None
    if simple:
        return True
    # verifyaddress implies checkmx, even when it is set after import.
    if options.checkmx or options.verifyaddress:
        return check_mx(mail)
    return True


if __name__ == '__main__':

    if len(sys.argv) != 2:
        print("Usage : %s [email]" % sys.argv[0])
        sys.exit(0)

    logging.basicConfig(level=logging.DEBUG)
    options.debugsmtp = True

    mail = sys.argv[1]

    print("Simple syntax validation :")
    print("==========================")
    print("Return : %s" % mass_validate_email(mail))

    options.checkmx = True
    print("\n\n")
    print("Syntax validation and domain MX check :")
    print("=======================================")
    print("Return : %s" % mass_validate_email(mail))

    options.verifyaddress = True
    print("\n\n")
    print("Syntax validation, domain MX check and validation of email address by SMTP server :")
    print("===================================================================================")
    print("Return : %s" % mass_validate_email(mail))