Browse Source

Initial commit

Benjamin Renard 8 years ago
commit
bc4cf9904e
1 changed files with 221 additions and 0 deletions
  1. 221 0
      mass_validate_email.py

+ 221 - 0
mass_validate_email.py

@@ -0,0 +1,221 @@
+#!/usr/bin/python
+#
+# Python module for mass validation of email addresses
+#
+# This module was inspired by (and uses) the validate_email library
+# written by Syrus Akbary:
+# 
+#    https://github.com/SyrusAkbary/validate_email
+#
+# Its main goal is to optimize mass validation by caching
+# bad (or good) domains and MX servers.
+#
+# Author: Benjamin Renard
+# Website: http://git.zionetrix.net/mass_validate_email
+# Licence: LGPL
+
+import sys
+import logging
+
+from validate_email import validate_email
+import smtplib
+import socket
+import DNS
+
+# Discover the DNS resolvers once, at import time. Without them no MX lookup
+# can work, so a failure here is logged and terminates the process.
+try:
+	DNS.DiscoverNameServers()
+except DNS.ServerError as err:
+	logging.fatal("Error discovering DNS servers : %s" % err)
+	sys.exit(1)
+
+# options
+class OptionsClass(object):
+	"""Container for the switches that drive the validation functions.
+
+	Every flag is a plain boolean attribute that can be changed at runtime
+	on the shared module-level instance.
+	"""
+
+	def __init__(self):
+		# debug         : general debug switch (not read anywhere in this file)
+		# debugsmtp     : turn on smtplib protocol tracing on new connections
+		self.debug = self.debugsmtp = False
+		# checkmx       : after the syntax check, also check the domain's MX
+		# verifyaddress : additionally ask the MX server about the address
+		# usesmtpvrfy   : try the SMTP VRFY command before falling back to RCPT
+		self.checkmx = self.verifyaddress = self.usesmtpvrfy = False
+		# refusemailifnotpermit  : reject the address when the server does
+		#                          not permit checking it
+		# refuseontemporaryerror : reject the address on a 4xx RCPT reply
+		self.refusemailifnotpermit = self.refuseontemporaryerror = True
+
+# Module-wide settings instance read by the validation functions below.
+options=OptionsClass()
+
+# Verifying an address implies checking the MX first.
+# NOTE(review): verifyaddress is False right after OptionsClass() is built,
+# so this statement only has an effect if the class defaults are changed;
+# it is not re-evaluated when the flag is set later at runtime.
+if options.verifyaddress:
+	options.checkmx=True
+
+def clean_mail(mail):
+	"""Return mail converted to str, lower-cased and stripped of surrounding whitespace."""
+	return str(mail).lower().strip()
+
+
+# Per-process caches shared by every call to check_mx():
+#   domain_mx      : domain -> list of mail server host names to try
+#   valid_domain   : domains for which a mail server was found
+#   invalid_domain : domains for which no usable mail server was found
+domain_mx={}
+valid_domain=[]
+invalid_domain=[]
+
+def _close_smtp(smtp):
+	"""End an SMTP session with QUIT, falling back to a plain close."""
+	try:
+		smtp.quit()
+	except (smtplib.SMTPException, socket.error):
+		# The server already went away (or never answered): just drop the socket.
+		smtp.close()
+
+def _lookup_mx(dom):
+	"""Resolve the MX records of dom and cache them.
+
+	Returns True when at least one MX record exists (the domain is then
+	recorded in domain_mx and valid_domain), False otherwise.
+	Lets DNS.ServerError propagate to the caller.
+	"""
+	mx_hosts = DNS.mxlookup(dom)
+	if not mx_hosts:
+		return False
+	# Each answer is a (preference, host) pair; only the host is kept.
+	domain_mx[dom] = [mx[1] for mx in mx_hosts]
+	logging.debug("MX of domain %s : %s" % (dom,','.join(domain_mx[dom])))
+	valid_domain.append(dom)
+	return True
+
+def _reject_domain(dom,error):
+	"""Record dom as invalid after a DNS failure and return False."""
+	logging.debug('Error getting MX servers of domain %s : %s' % (dom,error))
+	invalid_domain.append(dom)
+	return False
+
+def _check_mail_and_close(dom,con,mail):
+	"""Ask the connected server about mail, then always release the connection."""
+	try:
+		# NOTE(review): the domain, not the MX host name, is used as the key
+		# of the "server refuses checks" cache, as in the original code.
+		return check_mail_on_mx(dom,con,mail,ifNotPermit=(not options.refusemailifnotpermit))
+	finally:
+		_close_smtp(con)
+
+def _check_domain_only(dom):
+	"""MX check without address verification (options.verifyaddress is off)."""
+	if dom in valid_domain:
+		return True
+	if dom in invalid_domain:
+		return False
+	try:
+		if _lookup_mx(dom):
+			return True
+	except DNS.ServerError as e:
+		return _reject_domain(dom,e)
+	# No MX record: fall back to the domain itself acting as mail server.
+	con = connect_to_mx(dom)
+	if con:
+		# Only reachability matters here, so release the connection at once.
+		_close_smtp(con)
+		domain_mx[dom]=[dom]
+		logging.debug("MX of domain %s : %s" % (dom,','.join(domain_mx[dom])))
+		valid_domain.append(dom)
+		return True
+	logging.debug("No valid MX of domain %s found" % dom)
+	invalid_domain.append(dom)
+	return False
+
+def _check_domain_and_address(dom,mail):
+	"""MX check followed by verification of mail on the domain's servers."""
+	if dom in invalid_domain:
+		return False
+	if dom not in domain_mx:
+		try:
+			found = _lookup_mx(dom)
+		except DNS.ServerError as e:
+			return _reject_domain(dom,e)
+		if not found:
+			# Directly check MX and mail
+			con=connect_to_mx(dom)
+			if not con:
+				invalid_domain.append(dom)
+				return False
+			domain_mx[dom]=[dom]
+			return _check_mail_and_close(dom,con,mail)
+	# Try every known server in turn; the first one accepting the address wins.
+	for mx in domain_mx[dom]:
+		con = connect_to_mx(mx)
+		if not con:
+			continue
+		if _check_mail_and_close(dom,con,mail):
+			return True
+	return False
+
+def check_mx(mail):
+	"""Check that the domain of mail can receive mail.
+
+	With options.verifyaddress off, only the existence of a mail server for
+	the domain is checked (MX records, or the domain itself accepting SMTP
+	connections when it has no MX record). With it on, the address itself is
+	additionally submitted to the domain's servers via check_mail_on_mx().
+
+	Results are cached per domain in domain_mx, valid_domain and
+	invalid_domain. Every SMTP connection opened here is closed before
+	returning.
+
+	mail is expected to be already cleaned and syntactically valid.
+	Returns True or False.
+	"""
+	# The domain is whatever follows the last '@' (a quoted local part may
+	# itself contain '@'); without any '@' the whole string is used.
+	dom = mail.rpartition('@')[2]
+	if options.verifyaddress:
+		return _check_domain_and_address(dom,mail)
+	return _check_domain_only(dom)
+
+# Mail servers already known to accept / fail SMTP connections.
+valid_mx=[]
+invalid_mx=[]
+def verify_mx(mx,mail,check_mail=False):
+	"""Answer from the MX caches only.
+
+	Returns True when mx is cached as valid, False when it is cached as
+	invalid, and None when it is unknown or when check_mail is set.
+	The mail argument is not used.
+
+	NOTE(review): this looks like an unfinished stub; nothing in this file
+	calls it.
+	"""
+	if check_mail:
+		return None
+	if mx in valid_mx:
+		return True
+	if mx in invalid_mx:
+		return False
+	return None
+
+def connect_to_mx(mx):
+	"""Open an SMTP connection to the mail server mx.
+
+	Returns the connected smtplib.SMTP object when the server greets with
+	code 220, otherwise None. A server that fails is remembered in
+	invalid_mx and is not contacted again; a server that answers is recorded
+	(once) in valid_mx.
+
+	The caller owns the returned connection and is responsible for closing it.
+	"""
+	if mx in invalid_mx:
+		return None
+	smtp = None
+	try:
+		smtp = smtplib.SMTP(timeout=5)
+		# connect() reports the greeting instead of raising, so the reply
+		# code has to be checked explicitly: anything but 220 means the
+		# server is not accepting SMTP sessions.
+		code, msg = smtp.connect(mx)
+		if code == 220:
+			if options.debugsmtp:
+				smtp.set_debuglevel(True)
+			if mx not in valid_mx:
+				valid_mx.append(mx)
+			return smtp
+		logging.debug("MX server %s refused SMTP connection (code=%s) : %s" % (mx,code,msg))
+	except smtplib.SMTPConnectError:
+		logging.debug("MX server %s does not respond from SMTP" % mx)
+	except smtplib.SMTPServerDisconnected:
+		logging.debug("MX server %s unexpectedly closed connection" % mx)
+	except socket.gaierror:
+		logging.debug("Can't resolv MX server %s" % mx)
+	except socket.timeout:
+		logging.debug("Connection timeout to SMTP server %s" % mx)
+	except socket.error:
+		logging.debug("Connection error on SMTP server %s" % mx)
+	except Exception as e:
+		logging.error("Unknown error (%s) connecting to SMTP server %s : %s" % (type(e),mx,e))
+	# Failure path: release whatever was opened and remember the bad server.
+	if smtp is not None:
+		smtp.close()
+	invalid_mx.append(mx)
+	return None
+
+# Keys (the value passed as mx) of servers that did not permit the check.
+mx_refuse_check_mail=[]
+def check_mail_on_mx(mx,smtp,mail,ifNotPermit=False):
+	"""Ask an already connected SMTP server whether it accepts mail.
+
+	mx          : key used in the mx_refuse_check_mail cache
+	smtp        : connected smtplib.SMTP object
+	mail        : address to check
+	ifNotPermit : value returned when the server does not permit the check
+
+	Returns True when the address is accepted and False when it is refused.
+	On a 4xx reply to RCPT the result is "not options.refuseontemporaryerror".
+	"""
+	if mx in mx_refuse_check_mail:
+		return ifNotPermit
+	try:
+		helo_code = smtp.helo()[0]
+		if helo_code != 250:
+			mx_refuse_check_mail.append(mx)
+			return ifNotPermit
+
+		if options.usesmtpvrfy:
+			vrfy_code, vrfy_msg = smtp.verify(mail)
+			if 250 <= vrfy_code < 260:
+				# The server normally answers with a normalized email address.
+				if any(validate_email(word) for word in vrfy_msg.split(' ')):
+					return True
+		# VRFY disabled or inconclusive: probe with an empty-sender MAIL + RCPT.
+		smtp.mail('')
+		rcpt_code, rcpt_msg = smtp.rcpt(mail)
+		if 400 <= rcpt_code < 500:
+			logging.debug('SMTP server return temporary error (code=%s) : %s' % (rcpt_code,rcpt_msg))
+			return not options.refuseontemporaryerror
+		return rcpt_code == 250
+	except smtplib.SMTPServerDisconnected:
+		# The server does not permit verifying users
+		mx_refuse_check_mail.append(mx)
+		return ifNotPermit
+	except smtplib.SMTPConnectError:
+		return False
+
+def mass_validate_email(mail,simple=False):
+	"""Validate an email address according to the module options.
+
+	mail   : address to validate (it is cleaned with clean_mail() first)
+	simple : when True, only the syntax is checked, whatever the options say
+
+	The syntax is always checked with validate_email(). When
+	options.checkmx or options.verifyaddress is set (verifying an address
+	implies the MX check), the domain, and possibly the address, is then
+	checked with check_mx().
+
+	Returns True when the address passes every enabled check, else False.
+	"""
+	mail=clean_mail(mail)
+	if not validate_email(mail):
+		# Explicit False (not None) so the result is always a boolean.
+		return False
+	if simple:
+		return True
+	if options.checkmx or options.verifyaddress:
+		return check_mx(mail)
+	return True
+
+# Command-line demo: run the three validation levels on one address.
+if __name__=='__main__':
+
+	if len(sys.argv)!=2:
+		print("Usage : %s [email]" % sys.argv[0])
+		sys.exit(0)
+
+	logging.basicConfig(level=logging.DEBUG)
+	options.debugsmtp=True
+
+	mail=sys.argv[1]
+
+	# (option to switch on before the run, title, underline); the options
+	# accumulate from one run to the next.
+	runs = (
+		(None,
+		 "Simple syntax validation :",
+		 "=========================="),
+		('checkmx',
+		 "Syntax validation and domain MX check :",
+		 "======================================="),
+		('verifyaddress',
+		 "Syntax validation, domain MX check and validation of email address by SMTP server :",
+		 "==================================================================================="),
+	)
+	for flag, title, underline in runs:
+		if flag is not None:
+			setattr(options, flag, True)
+			print("\n\n")
+		print(title)
+		print(underline)
+		print("Return : %s" % mass_validate_email(mail))
+