Improve test mode and fix some pylint warnings

This commit is contained in:
Benjamin Renard 2019-09-09 17:12:42 +02:00
parent bc4cf9904e
commit d5c2363b8c
2 changed files with 301 additions and 176 deletions

3
.gitignore vendored Normal file
View file

@ -0,0 +1,3 @@
*~
*.pyc
.*.swp

View file

@ -11,211 +11,333 @@
# bad (or good) domain or MX server. # bad (or good) domain or MX server.
# #
# Author: Benjamin Renard # Author: Benjamin Renard
# Website: http://git.zionetrix.net/mass_validate_email # Website: https://gogs.zionetrix.net/bn8/mass_validate_email
# Licence: LGPL # Licence: LGPL
import smtplib
import socket
import sys import sys
import logging import logging
from validate_email import validate_email
import smtplib
import socket
import DNS import DNS
from validate_email import validate_email
try: try:
DNS.DiscoverNameServers() DNS.DiscoverNameServers()
except DNS.ServerError, e: except DNS.ServerError, err:
logging.fatal("Error discovering DNS servers : %s" % e) logging.fatal("Error discovering DNS servers : %s", err)
sys.exit(1) sys.exit(1)
# options # options
class OptionsClass(object): class OptionsClass(object):
def __init__(self): def __init__(self):
self.debug = False self.debug = False
self.debugsmtp = False self.debugsmtp = False
self.checkmx = False self.checkmx = False
self.verifyaddress = False self.verifyaddress = False
self.usesmtpvrfy = False self.usesmtpvrfy = False
self.refusemailifnotpermit = True self.refusemailifnotpermit = True
self.refuseontemporaryerror = True self.refuseontemporaryerror = True
options=OptionsClass() options = OptionsClass()
if options.verifyaddress: if options.verifyaddress:
options.checkmx=True options.checkmx = True
def clean_mail(mail): def clean_mail(mail):
mail=str(mail).lower().strip() mail = str(mail).lower().strip()
return mail return mail
domain_mx={} domain_mx = {}
valid_domain=[] valid_domain = []
invalid_domain=[] invalid_domain = []
def check_mx(mail): def check_mx(mail):
dom = mail[mail.find('@')+1:] dom = mail[mail.find('@')+1:]
if not options.verifyaddress: if not options.verifyaddress:
if dom in valid_domain: if dom in valid_domain:
return True return True
elif dom in invalid_domain: elif dom in invalid_domain:
return False return False
else: else:
try: try:
mx_hosts = DNS.mxlookup(dom) mx_hosts = DNS.mxlookup(dom)
if len(mx_hosts)>0: if len(mx_hosts) > 0:
domain_mx[dom]=[] domain_mx[dom] = []
for mx in mx_hosts: for mx_host in mx_hosts:
domain_mx[dom].append(mx[1]) domain_mx[dom].append(mx_host[1])
logging.debug("MX of domain %s : %s" % (dom,','.join(domain_mx[dom]))) logging.debug("MX of domain %s : %s", dom, ','.join(domain_mx[dom]))
valid_domain.append(dom) valid_domain.append(dom)
return True return True
elif connect_to_mx(dom): elif connect_to_mx(dom):
domain_mx[dom]=[dom] domain_mx[dom] = [dom]
logging.debug("MX of domain %s : %s" % (dom,','.join(domain_mx[dom]))) logging.debug("MX of domain %s : %s", dom, ','.join(domain_mx[dom]))
valid_domain.append(dom) valid_domain.append(dom)
return True return True
else: else:
logging.debug("No valid MX of domain %s found" % dom) logging.debug("No valid MX of domain %s found", dom)
invalid_domain.append(dom) invalid_domain.append(dom)
return False return False
except DNS.ServerError, e: except DNS.ServerError, err:
logging.debug('Error getting MX servers of domain %s : %s' % (dom,e)) logging.debug('Error getting MX servers of domain %s : %s', dom, err)
invalid_domain.append(dom) invalid_domain.append(dom)
return False return False
else: else:
if dom in invalid_domain: if dom in invalid_domain:
return False return False
if dom in domain_mx: if dom in domain_mx:
for mx in domain_mx[dom]: for mx_host in domain_mx[dom]:
con = connect_to_mx(mx) con = connect_to_mx(mx_host)
if not con: if not con:
continue continue
if check_mail_on_mx(dom,con,mail,ifNotPermit=(not options.refusemailifnotpermit)): if check_mail_on_mx(dom, con, mail, if_not_permit=(not options.refusemailifnotpermit)):
return True return True
return False return False
else: else:
try: try:
mx_hosts = DNS.mxlookup(dom) mx_hosts = DNS.mxlookup(dom)
if len(mx_hosts)>0: if len(mx_hosts) > 0:
domain_mx[dom]=[] domain_mx[dom] = []
for mx in mx_hosts: for mx_host in mx_hosts:
domain_mx[dom].append(mx[1]) domain_mx[dom].append(mx_host[1])
valid_domain.append(dom) valid_domain.append(dom)
return check_mx(mail) return check_mx(mail)
else: else:
# Directly check MX and mail # Directly check MX and mail
con=connect_to_mx(dom) con = connect_to_mx(dom)
if not con: if not con:
invalid_domain.append(dom) invalid_domain.append(dom)
return False return False
domain_mx[dom]=[dom] domain_mx[dom] = [dom]
return check_mail_on_mx(dom,con,mail,ifNotPermit=(not options.refusemailifnotpermit)) return check_mail_on_mx(dom, con, mail, if_not_permit=(not options.refusemailifnotpermit))
except DNS.ServerError, e: except DNS.ServerError, err:
logging.debug('Error getting MX servers of domain %s : %s' % (dom,e)) logging.debug('Error getting MX servers of domain %s : %s', dom, err)
invalid_domain.append(dom) invalid_domain.append(dom)
return False return False
valid_mx=[] valid_mx = []
invalid_mx=[] invalid_mx = []
def verify_mx(mx,mail,check_mail=False): def verify_mx(mx_host, mail, check_mail=False):
if not check_mail and mx in valid_mx: if not check_mail and mx_host in valid_mx:
return True return True
elif not check_mail and mx in invalid_mx: elif not check_mail and mx_host in invalid_mx:
return False return False
def connect_to_mx(mx): def connect_to_mx(mx_host):
if mx in invalid_mx: if mx_host in invalid_mx:
return False return False
try: try:
smtp = smtplib.SMTP(timeout=5) smtp = smtplib.SMTP(timeout=5)
smtp.connect(mx) smtp.connect(mx_host)
if options.debugsmtp: if options.debugsmtp:
smtp.set_debuglevel(True) smtp.set_debuglevel(True)
valid_mx.append(mx) valid_mx.append(mx_host)
return smtp return smtp
except smtplib.SMTPConnectError: except smtplib.SMTPConnectError:
logging.debug("MX server %s does not respond from SMTP" % mx) logging.debug("MX server %s does not respond from SMTP", mx_host)
except smtplib.SMTPServerDisconnected: except smtplib.SMTPServerDisconnected:
logging.debug("MX server %s unexpectedly closed connection" % mx) logging.debug("MX server %s unexpectedly closed connection", mx_host)
except socket.gaierror: except socket.gaierror:
logging.debug("Can't resolv MX server %s" % mx) logging.debug("Can't resolv MX server %s", mx_host)
except socket.timeout: except socket.timeout:
logging.debug("Connection timeout to SMTP server %s" % mx) logging.debug("Connection timeout to SMTP server %s", mx_host)
except socket.error: except socket.error:
logging.debug("Connection error on SMTP server %s" % mx) logging.debug("Connection error on SMTP server %s", mx_host)
except Exception, e: except Exception:
logging.error("Unknown error (%s) connecting to SMTP server %s : %s" % (type(e),mx,e)) logging.error("Unknown error connecting to SMTP server %s", mx_host, exc_info=True)
invalid_mx.append(mx) invalid_mx.append(mx_host)
return None return None
mx_refuse_check_mail=[] mx_refuse_check_mail = []
def check_mail_on_mx(mx,smtp,mail,ifNotPermit=False): def check_mail_on_mx(mx_host, smtp, mail, if_not_permit=False):
if mx in mx_refuse_check_mail: if mx_host in mx_refuse_check_mail:
return ifNotPermit return if_not_permit
try: try:
status, _ = smtp.helo() status, _ = smtp.helo()
if status != 250: if status != 250:
mx_refuse_check_mail.append(mx) mx_refuse_check_mail.append(mx_host)
return ifNotPermit return if_not_permit
if options.usesmtpvrfy: if options.usesmtpvrfy:
(status, msg) = smtp.verify(mail) (status, msg) = smtp.verify(mail)
if status >= 250 and status < 260: if status >= 250 and status < 260:
# Server normaly return an normalize email address # Server normaly return an normalize email address
for word in msg.split(' '): for word in msg.split(' '):
if validate_email(word): if validate_email(word):
return True return True
smtp.mail('') smtp.mail('')
status, msg = smtp.rcpt(mail) status, msg = smtp.rcpt(mail)
if status >= 400 and status < 500: if status >= 400 and status < 500:
logging.debug('SMTP server return temporary error (code=%s) : %s' % (status,msg)) logging.debug('SMTP server return temporary error (code=%s) : %s', status, msg)
return not options.refuseontemporaryerror return not options.refuseontemporaryerror
elif status != 250: elif status != 250:
return False return False
return True return True
except smtplib.SMTPServerDisconnected: except smtplib.SMTPServerDisconnected:
# Server not permits verify user # Server not permits verify user
mx_refuse_check_mail.append(mx) mx_refuse_check_mail.append(mx_host)
return ifNotPermit return if_not_permit
except smtplib.SMTPConnectError: except smtplib.SMTPConnectError:
return False return False
def mass_validate_email(mail,simple=False): def mass_validate_email(mail, simple=False):
mail=clean_mail(mail) mail = clean_mail(mail)
if not validate_email(mail): if not validate_email(mail):
return return
elif simple: elif simple:
return True return True
elif options.checkmx: elif options.checkmx:
return check_mx(mail) return check_mx(mail)
else: else:
return True return True
if __name__=='__main__': if __name__ == '__main__':
if len(sys.argv)!=2: from optparse import OptionParser
print "Usage : %s [email]" % sys.argv[0]
sys.exit(0)
logging.basicConfig(level=logging.DEBUG) parser = OptionParser()
options.debugsmtp=True # options
parser.add_option(
'-v',
'--verbose',
action="store_true",
dest="verbose",
help='Enable verbose mode'
)
parser.add_option(
'-d',
'--debug',
action="store_true",
dest="debug",
help='Enable debug mode'
)
parser.add_option(
'-p',
'--progress',
action='store_true',
dest='progress',
help='Enable progress bar',
default=False
)
parser.add_option(
'-D',
'--debug-smtp',
action="store_true",
dest="debugsmtp",
help='Enabled SMTP exchange debuging'
)
parser.add_option(
'-m',
'--mx',
action="store_true",
dest="checkmx",
help='Enable MX check'
)
parser.add_option(
'-V',
'--verify',
action="store_true",
dest="verifyaddress",
help="Enable email address verification on MX server. If this option is enabled, MX check is also automatically enabled."
)
parser.add_option(
'--use-smtp-vrfy',
action="store_true",
dest="usesmtpvrfy",
help="When MX check is enabled, enable the SMPT VRFY command usage"
)
parser.add_option(
'--accept-mail-if-not-permit',
action="store_false",
dest="refusemailifnotpermit",
help="When MX check is enabled, accept email address even if MX server refuse the SMTP connection (after HELO command)"
)
parser.add_option(
'--accept-on-temporary-error',
action="store_false",
dest="refuseontemporaryerror",
help="When MX check is enabled, accept email address even if MX server return a temporary error (after trying to send an email to the checked address)"
)
parser.add_option(
'-f',
'--from-file',
action="store",
type='string',
dest="fromfile",
help="Read emails addresses to validate from from"
)
mail=sys.argv[1] (opts, emails) = parser.parse_args()
print "Simple syntax validation :" # Enable and configure logging
print "==========================" if opts.debug:
print "Return : %s" % mass_validate_email(mail) logging_level = logging.DEBUG
options.debug = True
elif opts.verbose:
logging_level = logging.INFO
else:
logging_level = logging.WARNING
options.checkmx=True logging.basicConfig(level=logging_level, format='%(asctime)s - %(levelname)s - %(message)s')
print "\n\n"
print "Syntax validation and domain MX check :"
print "======================================="
print "Return : %s" % mass_validate_email(mail)
options.verifyaddress=True # If fromfile options if setted, load emails
print "\n\n" if opts.fromfile:
print "Syntax validation, domain MX check and validation of email address by SMTP server :" logging.info('Load emails addresses from %s', opts.fromfile)
print "===================================================================================" with open(opts.fromfile, 'r') as fd:
print "Return : %s" % mass_validate_email(mail) for line in fd.readlines():
email = line.strip()
if email not in emails:
emails.append(email)
# Check at leat one email is provided
if not emails:
parser.error('You must specify emails address as arguments')
# Configure other options from command line arguments
options.debugsmtp = opts.debugsmtp
options.checkmx = opts.checkmx
options.verifyaddress = opts.verifyaddress
options.usesmtpvrfy = opts.usesmtpvrfy
options.refusemailifnotpermit = opts.refusemailifnotpermit
options.refuseontemporaryerror = opts.refuseontemporaryerror
if opts.progress:
from progressbar import ProgressBar, Percentage, Bar, RotatingMarker, SimpleProgress, ETA
pbar = ProgressBar(
widgets=[
'Validating emails addresses : ',
Percentage(),
' ',
Bar(marker=RotatingMarker()),
' ',
SimpleProgress(),
ETA()
],
maxval=len(emails)
).start()
pbar_count = 0
else:
logging.info('Start emails addresses validation')
validated = []
not_validated = []
for email in emails:
if mass_validate_email(email):
logging.info('Address %s is valid', email)
validated.append(email)
else:
logging.info('Address %s is NOT valid', email)
not_validated.append(email)
if opts.progress:
pbar_count += 1
pbar.update(pbar_count)
if opts.progress:
pbar.finish()
if not_validated:
logging.warning('%s on %s is NOT valid :\n- %s', len(not_validated), len(emails), '\n- '.join(not_validated))
else:
logging.info('All %s emails addresses provided are valid.', len(emails))