From bd4845f7802079b2bb7d311fe0bcc966bf605247 Mon Sep 17 00:00:00 2001 From: Benjamin Renard Date: Wed, 19 May 2021 18:07:42 +0200 Subject: [PATCH] Switch to module style lib, make python3 compatibility and clean code --- .gitignore | 2 + MyDB.py | 59 ------- Pbar.py | 123 ------------- PgDB.py | 179 ------------------- README.md | 33 ++++ mylib/__init__.py | 0 EmailClient.py => mylib/email.py | 2 +- LdapServer.py => mylib/ldap.py | 55 +----- mylib/mysql.py | 58 +++++++ mylib/opening_hours.py | 217 +++++++++++++++++++++++ mylib/pbar.py | 53 ++++++ mylib/pgsql.py | 203 ++++++++++++++++++++++ Report.py => mylib/report.py | 11 +- mylib/scripts/email_test.py | 78 +++++++++ mylib/scripts/helpers.py | 190 ++++++++++++++++++++ mylib/scripts/ldap_test.py | 74 ++++++++ mylib/scripts/pbar_test.py | 43 +++++ mylib/scripts/report_test.py | 48 ++++++ opening_hours.py | 207 ---------------------- setup.py | 50 ++++++ tests/test_opening_hours.py | 286 +++++++++++++++++++++++++++++++ tests/tests_opening_hours.py | 53 ------ 22 files changed, 1343 insertions(+), 681 deletions(-) delete mode 100644 MyDB.py delete mode 100644 Pbar.py delete mode 100644 PgDB.py create mode 100644 README.md create mode 100644 mylib/__init__.py rename EmailClient.py => mylib/email.py (99%) rename LdapServer.py => mylib/ldap.py (79%) create mode 100644 mylib/mysql.py create mode 100644 mylib/opening_hours.py create mode 100644 mylib/pbar.py create mode 100644 mylib/pgsql.py rename Report.py => mylib/report.py (87%) create mode 100644 mylib/scripts/email_test.py create mode 100644 mylib/scripts/helpers.py create mode 100644 mylib/scripts/ldap_test.py create mode 100644 mylib/scripts/pbar_test.py create mode 100644 mylib/scripts/report_test.py delete mode 100644 opening_hours.py create mode 100644 setup.py create mode 100644 tests/test_opening_hours.py delete mode 100644 tests/tests_opening_hours.py diff --git a/.gitignore b/.gitignore index f3d74a9..95c2611 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ *.pyc *~ +mylib.egg-info +venv* diff --git a/MyDB.py b/MyDB.py deleted file mode 100644 index 09a7ddf..0000000 --- a/MyDB.py +++ /dev/null @@ -1,59 +0,0 @@ -#!/usr/bin/python - -import MySQLdb -import logging -import sys - -class MyDB(object): - - host = "" - user = "" - pwd = "" - db = "" - - con = 0 - - def __init__(self,host,user,pwd,db): - self.host = host - self.user = user - self.pwd = pwd - self.db = db - - def connect(self): - if self.con == 0: - try: - con = MySQLdb.connect(self.host,self.user,self.pwd,self.db) - self.con = con - except Exception, e: - logging.fatal(e) - sys.exit(1) - - def doSQL(self,sql): - cursor = self.con.cursor() - try: - cursor.execute(sql) - self.con.commit() - return True - except Exception, e: - logging.error('Erreur durant la requete sql %s : %s' % (sql,e)) - self.con.rollback() - return False - - def doSelect(self,sql): - cursor = self.con.cursor() - try: - cursor.execute(sql) - results = cursor.fetchall() - return results - ret=[] - t=0 - for row in results: - c=0 - for field in row: - ret[t][c]=field - c=c+1 - t=t+1 - return ret - except Exception, e: - logging.error('Erreur durant la requete sql %s : %s' % (sql,e)) - return False diff --git a/Pbar.py b/Pbar.py deleted file mode 100644 index 6ff7d1d..0000000 --- a/Pbar.py +++ /dev/null @@ -1,123 +0,0 @@ -#!/usr/bin/python -# coding: utf8 - -""" Progress bar """ - -import logging -import progressbar - -class Pbar(object): # pylint: disable=useless-object-inheritance - """ - Progress bar - - This class abstract a progress bar that could be enable/disable by - configuration/script parameters. - """ - - __pbar = None - __count = None - - def __init__(self, name, maxval, enabled=True): - if enabled and maxval: - self.__count = 0 - self.__pbar = progressbar.ProgressBar( - widgets=[ - name + ': ', - progressbar.Percentage(), - ' ', - progressbar.Bar(), - ' ', - progressbar.SimpleProgress(), - progressbar.ETA() - ], - maxval=maxval - ).start() - else: - logging.info(name) - - def increment(self, step=None): - """ - Increment the progress bar - - :param step: The step (optional, default: 1) - """ - if self.__pbar: - self.__count += step if step else 1 - self.__pbar.update(self.__count) - - def finish(self): - """ Finish the progress bar """ - if self.__pbar: - self.__pbar.finish() - - -if __name__ == '__main__': - # Run tests - import time - import argparse - - default_max_val = 10 - - # Options parser - parser = argparse.ArgumentParser() - - parser.add_argument( - '-v', '--verbose', - action="store_true", - dest="verbose", - help="Enable verbose mode" - ) - - parser.add_argument( - '-d', '--debug', - action="store_true", - dest="debug", - help="Enable debug mode" - ) - - parser.add_argument( - '-l', '--log-file', - action="store", - type=str, - dest="logfile", - help="Log file path" - ) - - parser.add_argument( - '-p', '--progress', - action="store_true", - dest="progress", - help="Enable progress bar" - ) - - parser.add_argument( - '-C', '--count', - action="store", - type=int, - dest="count", - help="Progress bar max value (default: %s)" % default_max_val, - default=default_max_val - ) - - options = parser.parse_args() - - # Initialize logs - logformat = '%(asctime)s - Test Pbar - %(levelname)s - %(message)s' - if options.debug: - loglevel = logging.DEBUG - elif options.verbose: - loglevel = logging.INFO - else: - loglevel = logging.WARNING - - if options.logfile: - logging.basicConfig(filename=options.logfile, level=loglevel, format=logformat) - else: - logging.basicConfig(level=loglevel, format=logformat) - - pbar = Pbar('Test', options.count, enabled=options.progress) - - for idx in range(0, options.count): # pylint: disable=unused-variable - pbar.increment() - time.sleep(0.3) - pbar.finish() diff --git a/PgDB.py b/PgDB.py deleted file mode 100644 index 6fcc570..0000000 --- a/PgDB.py +++ /dev/null @@ -1,179 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -import psycopg2 -import logging -import sys -import traceback -import datetime - -class PgDB(object): - - host = "" - user = "" - pwd = "" - db = "" - - con = 0 - - date_format = '%Y-%m-%d' - datetime_format = '%Y-%m-%d %H:%M:%S' - - def __init__(self,host,user,pwd,db): - self.host = host - self.user = user - self.pwd = pwd - self.db = db - - def connect(self): - if self.con == 0: - try: - con = psycopg2.connect("dbname='%s' user='%s' host='%s' password='%s'" % (self.db,self.user,self.host,self.pwd)) - self.con = con - except Exception: - logging.fatal('An error occured during Postgresql database connection.', exc_info=1) - sys.exit(1) - - def close(self): - if self.con: - self.con.close() - - def setEncoding(self,enc): - if self.con: - try: - self.con.set_client_encoding(enc) - return True - except Exception: - logging.error('An error occured setting Postgresql database connection encoding to "%s"', enc, exc_info=1) - return False - - def doSQL(self,sql,params=None): - cursor = self.con.cursor() - try: - if params is None: - cursor.execute(sql) - else: - cursor.execute(sql,params) - self.con.commit() - return True - except Exception: - logging.error(u'Error during SQL request "%s"', sql.decode('utf-8', 'ignore'), exc_info=1) - self.con.rollback() - return False - - def doSelect(self,sql): - cursor = self.con.cursor() - try: - cursor.execute(sql) - results = cursor.fetchall() - return results - except Exception: - logging.error(u'Error during SQL request "%s"', sql.decode('utf-8', 'ignore'), exc_info=1) - return False - - # - # SQL helpers - # - def _quote_value(self, value): - if isinstance(value, int) or isinstance(value, float): - return unicode(value) - - if isinstance(value, str): - value = unicode(value) - elif isinstance(value, datetime.datetime): - value = unicode(self._format_datetime(value)) - elif isinstance(value, datetime.date): - value = unicode(self._format_date(value)) - - return u"'%s'" % value.replace(u"'",u"''") - - def _format_where_clauses(self, where_clauses, where_op=u'AND'): - if isinstance(where_clauses, str): - return where_clauses - elif isinstance(where_clauses, list): - return (u" %s " % where_op).join(where_clauses) - elif isinstance(where_clauses, dict): - return (u" %s " % where_op).join(map(lambda x: "%s=%s" % (x, self._quote_value(where_clauses[x])), where_clauses)) - logging.error('Unsupported where clauses type %s', type(where_clauses)) - return False - - def _format_datetime(self, datetime): - return datetime.strftime(self.datetime_format) - - def _format_date(self, date): - return date.strftime(self.date_format) - - def time2datetime(self, time): - return self._format_datetime(datetime.fromtimestamp(int(time))) - - def time2date(self, time): - return self._format_date(datetime.fromtimestamp(int(time))) - - def insert(self, table, values, just_try=False): - sql=u"INSERT INTO %s (%s) VALUES (%s)" % (table, u', '.join(values.keys()), u", ".join(map(lambda x: self._quote_value(values[x]), values))) - - if just_try: - logging.debug(u"Just-try mode : execute INSERT query : %s", sql) - return True - - logging.debug(sql) - if not self.doSQL(sql): - logging.error(u"Fail to execute INSERT query (SQL : %s)" % sql) - return False - return True - - def update(self, table, values, where_clauses, where_op=u'AND', just_try=False): - where=self._format_where_clauses(where_clauses, where_op=where_op) - if not where: - return False - - sql=u"UPDATE %s SET %s WHERE %s" % (table, u", ".join(map(lambda x: "%s=%s" % (x, self._quote_value(values[x])), values)), where) - - if just_try: - logging.debug(u"Just-try mode : execute UPDATE query : %s", sql) - return True - - logging.debug(sql) - if not self.doSQL(sql): - logging.error(u"Fail to execute UPDATE query (SQL : %s)", sql) - return False - return True - - def delete(self, table, where_clauses, where_op=u'AND', just_try=False): - where=self._format_where_clauses(where_clauses, where_op=where_op) - if not where: - return False - - sql=u"DELETE FROM %s WHERE %s" % (table, where) - - if just_try: - logging.debug(u"Just-try mode : execute DELETE query : %s", sql) - return True - - logging.debug(sql) - if not self.doSQL(sql): - logging.error(u"Fail to execute DELETE query (SQL : %s)", sql) - return False - return True - - def select(self, table, where_clauses=None, fields=None, where_op=u'AND', order_by=None, just_try=False): - sql = u"SELECT " - if fields is None: - sql += "*" - elif isinstance(fields, str) or isinstance(fields, unicode): - sql += fields - else: - sql += u", ".join(fields) - - sql += u" FROM " + table - if where_clauses: - where=self._format_where_clauses(where_clauses, where_op=where_op) - if not where: - return False - - sql += u" WHERE " + where - - if order_by: - sql += u"ORDER %s" % order_by - - return self.doSelect(sql) diff --git a/README.md b/README.md new file mode 100644 index 0000000..7da20b3 --- /dev/null +++ b/README.md @@ -0,0 +1,33 @@ +# Python MyLib + +Just a set of helpers small libs to make common tasks easier in my script development. + +## Installation + +Just run `python setup.py install` + +**Note:** This project could previously use as independent python files (not as module). This old version is keep in *legacy* git branch (not maintained). + +## Include libs + +* **mylib.email.EmailClient:** An email client to forge (eventually using template) and send email via a SMTP server +* **mylib.ldap.LdapServer:** A small lib to make requesting LDAP server easier. It's also provide some helper functions to deal with LDAP date string. +* **mylib.mysql.MyDB:** An extra small lib to remember me how to interact with MySQL/MariaDB database +* **mylib.pgsql.PgDB:** An small lib to remember me how to interact with PostgreSQL database. **Warning:** The insert/update/delete/select methods demonstrate how to forge raw SQL request, but **it's a bad idea**: Prefer using prepared query. +* **mylib.opening_hours:** A set of helper functions to deal with french opening hours (including normal opening hours, exceptional closure and nonworking public holidays). +* **mylib.pbar.Pbar:** A small lib for progress bar +* **mylib.report.Report:** A small lib to implement logging based email report send at exit + +To know how to use these libs, you can take look on *mylib.scripts* content or in *tests* directory. + +## Copyright + +Copyright (c) 2013-2021 Benjamin Renard + +## License + +This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License version 3 as published by the Free Software Foundation. + +This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. diff --git a/mylib/__init__.py b/mylib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/EmailClient.py b/mylib/email.py similarity index 99% rename from EmailClient.py rename to mylib/email.py index ecc9e6e..4c90952 100644 --- a/EmailClient.py +++ b/mylib/email.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -""" Email client """ +""" Email helpers """ import logging import os diff --git a/LdapServer.py b/mylib/ldap.py similarity index 79% rename from LdapServer.py rename to mylib/ldap.py index 63a5afc..fdb3908 100644 --- a/LdapServer.py +++ b/mylib/ldap.py @@ -31,7 +31,7 @@ class LdapServer(object): # pylint: disable=useless-object-inheritance if logger: self.logger = logger else: - self.logger = logging.getLogger() + self.logger = logging.getLogger(__name__) def _error(self,error,level=logging.WARNING): if self.raiseOnError: @@ -401,56 +401,3 @@ def format_date(value, from_timezone=None, to_timezone=None, naive=None): """ assert isinstance(value, datetime.date), 'First parameter must be an datetime.date object (not %s)' % type(value) return format_datetime(datetime.datetime.combine(value, datetime.datetime.min.time()), from_timezone, to_timezone, naive) - -# -# Tests -# -if __name__ == '__main__': - now = datetime.datetime.now().replace(tzinfo=dateutil.tz.tzlocal()) - print("Now = %s" % now) - - datestring_now = format_datetime(now) - print("format_datetime : %s" % datestring_now) - print("format_datetime (from_timezone=utc) : %s" % format_datetime(now.replace(tzinfo=None), from_timezone=pytz.utc)) - print("format_datetime (from_timezone=local) : %s" % format_datetime(now.replace(tzinfo=None), from_timezone=dateutil.tz.tzlocal())) - print("format_datetime (from_timezone='local') : %s" % format_datetime(now.replace(tzinfo=None), from_timezone='local')) - print("format_datetime (from_timezone=Paris) : %s" % format_datetime(now.replace(tzinfo=None), from_timezone='Europe/Paris')) - print("format_datetime (to_timezone=utc) : %s" % format_datetime(now, to_timezone=pytz.utc)) - print("format_datetime (to_timezone=local) : %s" % format_datetime(now, to_timezone=dateutil.tz.tzlocal())) - print("format_datetime (to_timezone='local') : %s" % format_datetime(now, to_timezone='local')) - print("format_datetime (to_timezone=Tokyo) : %s" % format_datetime(now, to_timezone='Asia/Tokyo')) - print("format_datetime (naive=True) : %s" % format_datetime(now, naive=True)) - - print("format_date : %s" % format_date(now)) - print("format_date (from_timezone=utc) : %s" % format_date(now.replace(tzinfo=None), from_timezone=pytz.utc)) - print("format_date (from_timezone=local) : %s" % format_date(now.replace(tzinfo=None), from_timezone=dateutil.tz.tzlocal())) - print("format_date (from_timezone='local') : %s" % format_date(now.replace(tzinfo=None), from_timezone='local')) - print("format_date (from_timezone=Paris) : %s" % format_date(now.replace(tzinfo=None), from_timezone='Europe/Paris')) - print("format_date (to_timezone=utc) : %s" % format_date(now, to_timezone=pytz.utc)) - print("format_date (to_timezone=local) : %s" % format_date(now, to_timezone=dateutil.tz.tzlocal())) - print("format_date (to_timezone='local') : %s" % format_date(now, to_timezone='local')) - print("format_date (to_timezone=Tokyo) : %s" % format_date(now, to_timezone='Asia/Tokyo')) - print("format_date (naive=True) : %s" % format_date(now, naive=True)) - - - print("parse_datetime : %s" % parse_datetime(datestring_now)) - print("parse_datetime (default_timezone=utc) : %s" % parse_datetime(datestring_now[0:-1], default_timezone=pytz.utc)) - print("parse_datetime (default_timezone=local) : %s" % parse_datetime(datestring_now[0:-1], default_timezone=dateutil.tz.tzlocal())) - print("parse_datetime (default_timezone='local') : %s" % parse_datetime(datestring_now[0:-1], default_timezone='local')) - print("parse_datetime (default_timezone=Paris) : %s" % parse_datetime(datestring_now[0:-1], default_timezone='Europe/Paris')) - print("parse_datetime (to_timezone=utc) : %s" % parse_datetime(datestring_now, to_timezone=pytz.utc)) - print("parse_datetime (to_timezone=local) : %s" % parse_datetime(datestring_now, to_timezone=dateutil.tz.tzlocal())) - print("parse_datetime (to_timezone='local') : %s" % parse_datetime(datestring_now, to_timezone='local')) - print("parse_datetime (to_timezone=Tokyo) : %s" % parse_datetime(datestring_now, to_timezone='Asia/Tokyo')) - print("parse_datetime (naive=True) : %s" % parse_datetime(datestring_now, naive=True)) - - print("parse_date : %s" % parse_date(datestring_now)) - print("parse_date (default_timezone=utc) : %s" % parse_date(datestring_now[0:-1], default_timezone=pytz.utc)) - print("parse_date (default_timezone=local) : %s" % parse_date(datestring_now[0:-1], default_timezone=dateutil.tz.tzlocal())) - print("parse_date (default_timezone='local') : %s" % parse_date(datestring_now[0:-1], default_timezone='local')) - print("parse_date (default_timezone=Paris) : %s" % parse_date(datestring_now[0:-1], default_timezone='Europe/Paris')) - print("parse_date (to_timezone=utc) : %s" % parse_date(datestring_now, to_timezone=pytz.utc)) - print("parse_date (to_timezone=local) : %s" % parse_date(datestring_now, to_timezone=dateutil.tz.tzlocal())) - print("parse_date (to_timezone='local') : %s" % parse_date(datestring_now, to_timezone='local')) - print("parse_date (to_timezone=Tokyo) : %s" % parse_date(datestring_now, to_timezone='Asia/Tokyo')) - print("parse_date (naive=True) : %s" % parse_date(datestring_now, naive=True)) diff --git a/mylib/mysql.py b/mylib/mysql.py new file mode 100644 index 0000000..0108afc --- /dev/null +++ b/mylib/mysql.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- + +""" MySQL client """ + +import logging +import sys + +import MySQLdb + +log = logging.getLogger(__name__) + +class MyDB: + """ MySQL client """ + + host = "" + user = "" + pwd = "" + db = "" + + con = 0 + + def __init__(self, host, user, pwd, db): + self.host = host + self.user = user + self.pwd = pwd + self.db = db + + def connect(self): + """ Connect to MySQL server """ + if self.con == 0: + try: + con = MySQLdb.connect(self.host, self.user, self.pwd, self.db) + self.con = con + except Exception: + log.fatal('Error connecting to MySQL server', exc_info=True) + sys.exit(1) + + def doSQL(self,sql): + """ Run INSERT/UPDATE/DELETE/... SQL query """ + cursor = self.con.cursor() + try: + cursor.execute(sql) + self.con.commit() + return True + except Exception: + log.error('Error during SQL request "%s"', sql, exc_info=True) + self.con.rollback() + return False + + def doSelect(self, sql): + """ Run SELECT SQL query and return result as dict """ + cursor = self.con.cursor() + try: + cursor.execute(sql) + return cursor.fetchall() + except Exception: + log.error('Error during SQL request "%s"', sql, exc_info=True) + return False diff --git a/mylib/opening_hours.py b/mylib/opening_hours.py new file mode 100644 index 0000000..02781bf --- /dev/null +++ b/mylib/opening_hours.py @@ -0,0 +1,217 @@ +import datetime +import re +import time +import logging + +log = logging.getLogger(__name__) + +week_days = ['lundi', 'mardi', 'mercredi', 'jeudi', 'vendredi', 'samedi', 'dimanche'] +date_format = '%d/%m/%Y' +date_pattern = re.compile('^([0-9]{2})/([0-9]{2})/([0-9]{4})$') +time_pattern = re.compile('^([0-9]{1,2})h([0-9]{2})?$') + +def easter_date(year): + a = year//100 + b = year%100 + c = (3*(a+25))//4 + d = (3*(a+25))%4 + e = (8*(a+11))//25 + f = (5*a+b)%19 + g = (19*f+c-e)%30 + h = (f+11*g)//319 + j = (60*(5-d)+b)//4 + k = (60*(5-d)+b)%4 + m = (2*j-k-g+h)%7 + n = (g-h+m+114)//31 + p = (g-h+m+114)%31 + day = p+1 + month = n + return datetime.date(year, month, day) + +def nonworking_french_public_days_of_the_year(year=None): + if year is None: + year=datetime.date.today().year + dp=easter_date(year) + return { + '1janvier': datetime.date(year, 1, 1), + 'paques': dp, + 'lundi_paques': (dp+datetime.timedelta(1)), + '1mai': datetime.date(year, 5, 1), + '8mai': datetime.date(year, 5, 8), + 'jeudi_ascension': (dp+datetime.timedelta(39)), + 'pentecote': (dp+datetime.timedelta(49)), + 'lundi_pentecote': (dp+datetime.timedelta(50)), + '14juillet': datetime.date(year, 7, 14), + '15aout': datetime.date(year, 8, 15), + '1novembre': datetime.date(year, 11, 1), + '11novembre': datetime.date(year, 11, 11), + 'noel': datetime.date(year, 12, 25), + 'saint_etienne': datetime.date(year, 12, 26), + } + +def parse_exceptional_closures(values): + exceptional_closures=[] + for value in values: + days = [] + hours_periods = [] + words = value.strip().split() + for word in words: + if not word: + continue + parts = word.split('-') + if len(parts) == 1: + # ex : 31/02/2017 + ptime = time.strptime(word, date_format) + date = datetime.date(ptime.tm_year, ptime.tm_mon, ptime.tm_mday) + if date not in days: + days.append(date) + elif len(parts) == 2: + # ex : 18/12/2017-20/12/2017 ou 9h-10h30 + if date_pattern.match(parts[0]) and date_pattern.match(parts[1]): + # ex : 18/12/2017-20/12/2017 + pstart = time.strptime(parts[0], date_format) + pstop = time.strptime(parts[1], date_format) + if pstop <= pstart: + raise ValueError('Day %s <= %s' % (parts[1], parts[0])) + + date = datetime.date(pstart.tm_year, pstart.tm_mon, pstart.tm_mday) + stop_date = datetime.date(pstop.tm_year, pstop.tm_mon, pstop.tm_mday) + while date <= stop_date: + if date not in days: + days.append(date) + date += datetime.timedelta(days=1) + else: + # ex : 9h-10h30 + mstart = time_pattern.match(parts[0]) + mstop = time_pattern.match(parts[1]) + if not mstart or not mstop: + raise ValueError('"%s" is not a valid time period' % word) + hstart = datetime.time(int(mstart.group(1)), int(mstart.group(2) or 0)) + hstop = datetime.time(int(mstop.group(1)), int(mstop.group(2) or 0)) + if hstop <= hstart: + raise ValueError('Time %s <= %s' % (parts[1], parts[0])) + hours_periods.append({'start': hstart, 'stop': hstop}) + else: + raise ValueError('Invalid number of part in this word : "%s"' % word) + if not days: + raise ValueError('No days found in value "%s"' % value) + exceptional_closures.append({'days': days, 'hours_periods': hours_periods}) + return exceptional_closures + + +def parse_normal_opening_hours(values): + normal_opening_hours=[] + for value in values: + days=[] + hours_periods=[] + words=value.strip().split() + for word in words: + if word=='': + continue + parts=word.split('-') + if len(parts)==1: + # ex : jeudi + if word not in week_days: + raise ValueError('"%s" is not a valid week day' % word) + if word not in days: + days.append(word) + elif len(parts)==2: + # ex : lundi-jeudi ou 9h-10h30 + if parts[0] in week_days and parts[1] in week_days: + # ex : lundi-jeudi + if week_days.index(parts[1]) <= week_days.index(parts[0]): + raise ValueError('"%s" is before "%s"' % (parts[1],parts[0])) + started=False + for d in week_days: + if not started and d!=parts[0]: + continue + started=True + if d not in days: + days.append(d) + if d==parts[1]: + break + else: + #ex : 9h-10h30 + mstart=time_pattern.match(parts[0]) + mstop=time_pattern.match(parts[1]) + if not mstart or not mstop: + raise ValueError('"%s" is not a valid time period' % word) + hstart=datetime.time(int(mstart.group(1)), int(mstart.group(2) or 0)) + hstop=datetime.time(int(mstop.group(1)), int(mstop.group(2) or 0)) + if hstop<=hstart: + raise ValueError('Time %s <= %s' % (parts[1],parts[0])) + hours_periods.append({'start': hstart, 'stop': hstop}) + else: + raise ValueError('Invalid number of part in this word : "%s"' % word) + if not days and not hours_periods: + raise ValueError('No days or hours period found in this value : "%s"' % value) + normal_opening_hours.append({'days': days, 'hours_periods': hours_periods}) + return normal_opening_hours + +def is_closed(normal_opening_hours_values=None, exceptional_closures_values=None, + nonworking_public_holidays_values=None, exceptional_closure_on_nonworking_public_days=False, + when=None, on_error='raise'): + if not when: + when = datetime.datetime.now() + when_date=when.date() + when_time=when.time() + when_weekday=week_days[when.timetuple().tm_wday] + on_error_result=None + if on_error=='closed': + on_error_result={'closed': True, 'exceptional_closure': False, 'exceptional_closure_all_day': False} + elif on_error=='opened': + on_error_result={'closed': False, 'exceptional_closure': False, 'exceptional_closure_all_day': False} + + log.debug("When = %s => date = %s / time = %s / week day = %s", when, when_date, when_time, when_weekday) + if nonworking_public_holidays_values: + log.debug("Nonworking public holidays: %s", nonworking_public_holidays_values) + nonworking_days=nonworking_french_public_days_of_the_year() + for day in nonworking_public_holidays_values: + if day in nonworking_days and when_date==nonworking_days[day]: + log.debug("Non working day: %s", day) + return {'closed': True, 'exceptional_closure': exceptional_closure_on_nonworking_public_days, 'exceptional_closure_all_day': exceptional_closure_on_nonworking_public_days} + + if len(exceptional_closures_values)>0: + try: + exceptional_closures=parse_exceptional_closures(exceptional_closures_values) + log.debug('Exceptional closures: %s', exceptional_closures) + except Exception as e: + log.error("Fail to parse exceptional closures, consider as closed", exc_info=True) + if on_error_result is None: + raise e from e + return on_error_result + for cl in exceptional_closures: + if when_date not in cl['days']: + log.debug("when_date (%s) no in days (%s)", when_date, cl['days']) + continue + if not cl['hours_periods']: + # All day exceptional closure + return {'closed': True, 'exceptional_closure': True, 'exceptional_closure_all_day': True} + for hp in cl['hours_periods']: + if hp['start'] <= when_time <= hp['stop']: + return {'closed': True, 'exceptional_closure': True, 'exceptional_closure_all_day': False} + + if normal_opening_hours_values: + try: + normal_opening_hours=parse_normal_opening_hours(normal_opening_hours_values) + log.debug('Normal opening hours: %s', normal_opening_hours) + except Exception: + log.error("Fail to parse normal opening hours, consider as closed", exc_info=True) + if on_error_result is None: + raise e from e + return on_error_result + for oh in normal_opening_hours: + if oh['days'] and when_weekday not in oh['days']: + log.debug("when_weekday (%s) no in days (%s)", when_weekday, oh['days']) + continue + if not oh['hours_periods']: + # All day opened + return {'closed': False, 'exceptional_closure': False, 'exceptional_closure_all_day': False} + for hp in oh['hours_periods']: + if hp['start'] <= when_time <= hp['stop']: + return {'closed': False, 'exceptional_closure': False, 'exceptional_closure_all_day': False} + log.debug("Not in normal opening hours => closed") + return {'closed': True, 'exceptional_closure': False, 'exceptional_closure_all_day': False} + + # Not a nonworking day, not during exceptional closure and no normal opening hours defined => Opened + return {'closed': False, 'exceptional_closure': False, 'exceptional_closure_all_day': False} diff --git a/mylib/pbar.py b/mylib/pbar.py new file mode 100644 index 0000000..13dd30c --- /dev/null +++ b/mylib/pbar.py @@ -0,0 +1,53 @@ +# coding: utf8 + +""" Progress bar """ + +import logging +import progressbar + + +log = logging.getLogger(__name__) + +class Pbar(object): # pylint: disable=useless-object-inheritance + """ + Progress bar + + This class abstract a progress bar that could be enable/disable by + configuration/script parameters. + """ + + __pbar = None + __count = None + + def __init__(self, name, maxval, enabled=True): + if enabled and maxval: + self.__count = 0 + self.__pbar = progressbar.ProgressBar( + widgets=[ + name + ': ', + progressbar.Percentage(), + ' ', + progressbar.Bar(), + ' ', + progressbar.SimpleProgress(), + progressbar.ETA() + ], + maxval=maxval + ).start() + else: + log.info(name) + + def increment(self, step=None): + """ + Increment the progress bar + + :param step: The step (optional, default: 1) + """ + if self.__pbar: + self.__count += step if step else 1 + self.__pbar.update(self.__count) + + def finish(self): + """ Finish the progress bar """ + if self.__pbar: + self.__pbar.finish() diff --git a/mylib/pgsql.py b/mylib/pgsql.py new file mode 100644 index 0000000..2d0aa57 --- /dev/null +++ b/mylib/pgsql.py @@ -0,0 +1,203 @@ +# -*- coding: utf-8 -*- + +""" PostgreSQL client """ + +import datetime +import logging +import sys + +import psycopg2 + +log = logging.getLogger(__name__) + +class PgDB: + """ PostgreSQL client """ + + host = "" + user = "" + pwd = "" + db = "" + + con = 0 + + date_format = '%Y-%m-%d' + datetime_format = '%Y-%m-%d %H:%M:%S' + + def __init__(self, host, user, pwd, db, just_try=False): + self.host = host + self.user = user + self.pwd = pwd + self.db = db + self.just_try = just_try + + def connect(self): + """ Connect to PostgreSQL server """ + if self.con == 0: + try: + con = psycopg2.connect("dbname='%s' user='%s' host='%s' password='%s'" % (self.db,self.user,self.host,self.pwd)) + self.con = con + except Exception: + log.fatal('An error occured during Postgresql database connection.', exc_info=1) + sys.exit(1) + + def close(self): + """ Close connection with PostgreSQL server (if opened) """ + if self.con: + self.con.close() + + def setEncoding(self, enc): + """ Set connection encoding """ + if self.con: + try: + self.con.set_client_encoding(enc) + return True + except Exception: + log.error('An error occured setting Postgresql database connection encoding to "%s"', enc, exc_info=1) + return False + + def doSQL(self, sql, params=None): + """ Run SELECT SQL query and return result as dict """ + if self.just_try: + log.debug(u"Just-try mode : do not really execute SQL query '%s'", sql) + return True + + cursor = self.con.cursor() + try: + if params is None: + cursor.execute(sql) + else: + cursor.execute(sql, params) + self.con.commit() + return True + except Exception: + log.error(u'Error during SQL request "%s"', sql.decode('utf-8', 'ignore'), exc_info=1) + self.con.rollback() + return False + + def doSelect(self, sql): + """ Run SELECT SQL query and return result as dict """ + cursor = self.con.cursor() + try: + cursor.execute(sql) + results = cursor.fetchall() + return results + except Exception: + log.error(u'Error during SQL request "%s"', sql.decode('utf-8', 'ignore'), exc_info=1) + return False + + # + # SQL helpers + # + def _quote_value(self, value): + """ Quote a value for SQL query """ + if isinstance(value, (int, float)): + return str(value) + + if isinstance(value, datetime.datetime): + value = self._format_datetime(value) + elif isinstance(value, datetime.date): + value = self._format_date(value) + + return u"'%s'" % value.replace(u"'", u"''") + + def _format_where_clauses(self, where_clauses, where_op=u'AND'): + """ Format WHERE clauses """ + if isinstance(where_clauses, str): + return where_clauses + if isinstance(where_clauses, list): + return (u" %s " % where_op).join(where_clauses) + if isinstance(where_clauses, dict): + return (u" %s " % where_op).join(map(lambda x: "%s=%s" % (x, self._quote_value(where_clauses[x])), where_clauses)) + log.error('Unsupported where clauses type %s', type(where_clauses)) + return False + + def _format_datetime(self, value): + """ Format datetime object as string """ + assert isinstance(value, datetime.datetime) + return value.strftime(self.datetime_format) + + def _format_date(self, value): + """ Format date object as string """ + assert isinstance(value, (datetime.date, datetime.datetime)) + return value.strftime(self.date_format) + + def time2datetime(self, time): + """ Convert timestamp to datetime string """ + return self._format_datetime(datetime.datetime.fromtimestamp(int(time))) + + def time2date(self, time): + """ Convert timestamp to date string """ + return self._format_date(datetime.date.fromtimestamp(int(time))) + + def insert(self, table, values, just_try=False): + """ Run INSERT SQL query """ + sql=u"INSERT INTO %s (%s) VALUES (%s)" % (table, u', '.join(values.keys()), u", ".join(map(lambda x: self._quote_value(values[x]), values))) + + if just_try: + log.debug(u"Just-try mode : execute INSERT query : %s", sql) + return True + + log.debug(sql) + if not self.doSQL(sql): + log.error(u"Fail to execute INSERT query (SQL : %s)", sql) + return False + return True + + def update(self, table, values, where_clauses, where_op=u'AND', just_try=False): + """ Run UPDATE SQL query """ + where=self._format_where_clauses(where_clauses, where_op=where_op) + if not where: + return False + + sql=u"UPDATE %s SET %s WHERE %s" % (table, u", ".join(map(lambda x: "%s=%s" % (x, self._quote_value(values[x])), values)), where) + + if just_try: + log.debug(u"Just-try mode : execute UPDATE query : %s", sql) + return True + + log.debug(sql) + if not self.doSQL(sql): + log.error(u"Fail to execute UPDATE query (SQL : %s)", sql) + return False + return True + + def delete(self, table, where_clauses, where_op=u'AND', just_try=False): + """ Run DELETE SQL query """ + where=self._format_where_clauses(where_clauses, where_op=where_op) + if not where: + return False + + sql=u"DELETE FROM %s WHERE %s" % (table, where) + + if just_try: + log.debug(u"Just-try mode : execute DELETE query : %s", sql) + return True + + log.debug(sql) + if not self.doSQL(sql): + log.error(u"Fail to execute DELETE query (SQL : %s)", sql) + return False + return True + + def select(self, table, where_clauses=None, fields=None, where_op=u'AND', order_by=None): + """ Run SELECT SQL query """ + sql = u"SELECT " + if fields is None: + sql += "*" + elif isinstance(fields, str): + sql += fields + else: + sql += u", ".join(fields) + + sql += u" FROM " + table + if where_clauses: + where=self._format_where_clauses(where_clauses, where_op=where_op) + if not where: + return False + + sql += u" WHERE " + where + + if order_by: + sql += u"ORDER %s" % order_by + + return self.doSelect(sql) diff --git a/Report.py b/mylib/report.py similarity index 87% rename from Report.py rename to mylib/report.py index b8f71ea..1bb35ce 100644 --- a/Report.py +++ b/mylib/report.py @@ -1,4 +1,3 @@ -#!/usr/bin/python # coding: utf8 """ Report """ @@ -7,6 +6,8 @@ import atexit import logging +log = logging.getLogger(__name__) + class Report(object): # pylint: disable=useless-object-inheritance """ Logging report """ @@ -43,13 +44,13 @@ class Report(object): # pylint: disable=useless-object-inheritance def send(self, subject=None, rcpt_to=None, email_client=None, just_try=False): """ Send report using an EmailClient """ if not self.rcpt_to and not rcpt_to: - logging.debug('No report recipient, do not send report') + log.debug('No report recipient, do not send report') return True assert self.subject or subject, "You must provide report subject using Report.__init__ or Report.send" assert self.email_client or email_client, "You must provide email client using Report.__init__ or Report.send" content = self.get_content() if not content: - logging.debug('Report is empty, do not send it') + log.debug('Report is empty, do not send it') return True msg = email_client.forge_message( self.rcpt_to or rcpt_to, @@ -57,9 +58,9 @@ class Report(object): # pylint: disable=useless-object-inheritance text_body=content ) if email_client.send(self.rcpt_to or rcpt_to, msg=msg, just_try=just_try): - logging.debug('Report sent to %s', self.rcpt_to or rcpt_to) + log.debug('Report sent to %s', self.rcpt_to or rcpt_to) return True - logging.error('Fail to send report to %s', self.rcpt_to or rcpt_to) + log.error('Fail to send report to %s', self.rcpt_to or rcpt_to) return False def send_at_exit(self, **kwargs): diff --git a/mylib/scripts/email_test.py b/mylib/scripts/email_test.py new file mode 100644 index 0000000..a2237c9 --- /dev/null +++ b/mylib/scripts/email_test.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- + +""" Test Email client """ +import datetime +import logging +import sys + +import argparse +import getpass + +from mylib.scripts.helpers import get_opts_parser, add_email_opts +from mylib.scripts.helpers import init_logging, init_email_client + + +log = logging.getLogger('mylib.scripts.email_test') + +def main(argv=None): #pylint: disable=too-many-locals,too-many-statements + """ Script main """ + if argv is None: + argv = sys.argv[1:] + + # Options parser + parser = get_opts_parser(just_try=True) + add_email_opts(parser) + + test_opts = parser.add_argument_group('Test email options') + + test_opts.add_argument( + '-t', '--to', + action="store", + type=str, + dest="test_to", + help="Test email recipient", + ) + + test_opts.add_argument( + '-m', '--mako', + action="store_true", + dest="test_mako", + help="Test mako templating", + ) + + options = parser.parse_args() + + if not options.test_to: + parser.error('You must specify test email recipient using -t/--to parameter') + sys.exit(1) + + # Initialize logs + init_logging(options, 'Test EmailClient') + + if options.email_smtp_user and not options.email_smtp_password: + options.email_smtp_password = getpass.getpass('Please enter SMTP password: ') + + log.info('Initialize Email client') + email_client = init_email_client( + options, + templates=dict( + test=dict( + subject="Test email", + text=( + "Just a test email sent at {sent_date}." if not options.test_mako else + MakoTemplate("Just a test email sent at ${sent_date}.") + ), + html=( + "Just a test email. (sent at {sent_date})" if not options.test_mako else + MakoTemplate("Just a test email. (sent at ${sent_date})") + ) + ) + ) + ) + + log.info('Send a test email to %s', options.test_to) + if email_client.send(options.test_to, template='test', sent_date=datetime.datetime.now()): + log.info('Test email sent') + sys.exit(0) + log.error('Fail to send test email') + sys.exit(1) diff --git a/mylib/scripts/helpers.py b/mylib/scripts/helpers.py new file mode 100644 index 0000000..0c9e7a7 --- /dev/null +++ b/mylib/scripts/helpers.py @@ -0,0 +1,190 @@ +# coding: utf8 + +""" Scripts helpers """ + +import argparse +import logging + +from mylib.email import EmailClient + +log = logging.getLogger(__name__) + +def init_logging(options, name, report=None): + """ Initialize logs """ + logformat = '%(asctime)s - ' + name + ' - %(levelname)s - %(message)s' + if options.debug: + loglevel = logging.DEBUG + elif options.verbose: + loglevel = logging.INFO + else: + loglevel = logging.WARNING + + handlers = [] + if options.logfile: + handlers.append(logging.FileHandler(options.logfile)) + if not options.logfile or options.console: + handlers.append(logging.StreamHandler()) + if report: + handlers.append(report.get_handler()) + logging.basicConfig(level=loglevel, format=logformat, handlers=handlers) + + +def get_opts_parser(just_try=False, progress=False): + """ Retrieve options parser """ + parser = argparse.ArgumentParser() + + parser.add_argument( + '-v', '--verbose', + action="store_true", + dest="verbose", + help="Enable verbose mode" + ) + + parser.add_argument( + '-d', '--debug', + action="store_true", + dest="debug", + help="Enable debug mode" + ) + + parser.add_argument( + '-l', '--log-file', + action="store", + type=str, + dest="logfile", + help="Log file path" + ) + + parser.add_argument( + '-C', '--console', + action="store_true", + dest="console", + help="Always log on console (even if log file is configured)" + ) + + if just_try: + parser.add_argument( + '-j', '--just-try', + action="store_true", + dest="just_try", + help="Enable just-try mode" + ) + + if progress: + parser.add_argument( + '-p', '--progress', + action="store_true", + dest="progress", + help="Enable progress bar" + ) + + return parser + + +def add_email_opts(parser): + """ Add email options """ + email_opts = parser.add_argument_group('Email options') + + email_opts.add_argument( + '-H', '--smtp-host', + action="store", + type=str, + dest="email_smtp_host", + help="SMTP host" + ) + + email_opts.add_argument( + '-P', '--smtp-port', + action="store", + type=int, + dest="email_smtp_port", + help="SMTP port" + ) + + email_opts.add_argument( + '-S', '--smtp-ssl', + action="store_true", + dest="email_smtp_ssl", + help="Use SSL" + ) + + email_opts.add_argument( + '-T', '--smtp-tls', + action="store_true", + dest="email_smtp_tls", + help="Use TLS" + ) + + email_opts.add_argument( + '-u', '--smtp-user', + action="store", + type=str, + dest="email_smtp_user", + help="SMTP username" + ) + + email_opts.add_argument( + '-p', '--smtp-password', + action="store", + type=str, + dest="email_smtp_password", + help="SMTP password" + ) + + email_opts.add_argument( + '-D', '--smtp-debug', + action="store_true", + dest="email_smtp_debug", + help="Debug SMTP connection" + ) + + email_opts.add_argument( + '-e', '--email-encoding', + action="store", + type=str, + dest="email_encoding", + help="SMTP encoding" + ) + + email_opts.add_argument( + '-f', '--sender-name', + action="store", + type=str, + dest="email_sender_name", + help="Sender name" + ) + + email_opts.add_argument( + '-F', '--sender-email', + action="store", + type=str, + dest="email_sender_email", + help="Sender email" + ) + + email_opts.add_argument( + '-c', '--catch-all', + action="store", + type=str, + dest="email_catch_all", + help="Catch all sent email: specify catch recipient email address" + ) + + +def init_email_client(options, **kwargs): + log.info('Initialize Email client') + return EmailClient( + smtp_host=options.email_smtp_host, + smtp_port=options.email_smtp_port, + smtp_ssl=options.email_smtp_ssl, + smtp_tls=options.email_smtp_tls, + smtp_user=options.email_smtp_user, + smtp_password=options.email_smtp_password, + smtp_debug=options.email_smtp_debug, + sender_name=options.email_sender_name, + sender_email=options.email_sender_email, + catch_all_addr=options.email_catch_all, + just_try=options.just_try, + encoding=options.email_encoding, + **kwargs + ) diff --git a/mylib/scripts/ldap_test.py b/mylib/scripts/ldap_test.py new file mode 100644 index 0000000..145fbb8 --- /dev/null +++ b/mylib/scripts/ldap_test.py @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- + +""" Test LDAP """ +import datetime +import logging +import sys + +import dateutil.tz +import pytz + +from mylib.ldap import format_datetime,format_date, parse_datetime, parse_date +from mylib.scripts.helpers import get_opts_parser +from mylib.scripts.helpers import init_logging + + +log = logging.getLogger('mylib.scripts.ldap_test') + +def main(argv=None): #pylint: disable=too-many-locals,too-many-statements + """ Script main """ + if argv is None: + argv = sys.argv[1:] + + # Options parser + parser = get_opts_parser(just_try=True) + options = parser.parse_args() + + now = datetime.datetime.now().replace(tzinfo=dateutil.tz.tzlocal()) + print("Now = %s" % now) + + datestring_now = format_datetime(now) + print("format_datetime : %s" % datestring_now) + print("format_datetime (from_timezone=utc) : %s" % format_datetime(now.replace(tzinfo=None), from_timezone=pytz.utc)) + print("format_datetime (from_timezone=local) : %s" % format_datetime(now.replace(tzinfo=None), from_timezone=dateutil.tz.tzlocal())) + print("format_datetime (from_timezone='local') : %s" % format_datetime(now.replace(tzinfo=None), from_timezone='local')) + print("format_datetime (from_timezone=Paris) : %s" % format_datetime(now.replace(tzinfo=None), from_timezone='Europe/Paris')) + print("format_datetime (to_timezone=utc) : %s" % format_datetime(now, to_timezone=pytz.utc)) + print("format_datetime (to_timezone=local) : %s" % format_datetime(now, to_timezone=dateutil.tz.tzlocal())) + print("format_datetime (to_timezone='local') : %s" % format_datetime(now, to_timezone='local')) + print("format_datetime (to_timezone=Tokyo) : %s" % format_datetime(now, to_timezone='Asia/Tokyo')) + print("format_datetime (naive=True) : %s" % format_datetime(now, naive=True)) + + print("format_date : %s" % format_date(now)) + print("format_date (from_timezone=utc) : %s" % format_date(now.replace(tzinfo=None), from_timezone=pytz.utc)) + print("format_date (from_timezone=local) : %s" % format_date(now.replace(tzinfo=None), from_timezone=dateutil.tz.tzlocal())) + print("format_date (from_timezone='local') : %s" % format_date(now.replace(tzinfo=None), from_timezone='local')) + print("format_date (from_timezone=Paris) : %s" % format_date(now.replace(tzinfo=None), from_timezone='Europe/Paris')) + print("format_date (to_timezone=utc) : %s" % format_date(now, to_timezone=pytz.utc)) + print("format_date (to_timezone=local) : %s" % format_date(now, to_timezone=dateutil.tz.tzlocal())) + print("format_date (to_timezone='local') : %s" % format_date(now, to_timezone='local')) + print("format_date (to_timezone=Tokyo) : %s" % format_date(now, to_timezone='Asia/Tokyo')) + print("format_date (naive=True) : %s" % format_date(now, naive=True)) + + + print("parse_datetime : %s" % parse_datetime(datestring_now)) + print("parse_datetime (default_timezone=utc) : %s" % parse_datetime(datestring_now[0:-1], default_timezone=pytz.utc)) + print("parse_datetime (default_timezone=local) : %s" % parse_datetime(datestring_now[0:-1], default_timezone=dateutil.tz.tzlocal())) + print("parse_datetime (default_timezone='local') : %s" % parse_datetime(datestring_now[0:-1], default_timezone='local')) + print("parse_datetime (default_timezone=Paris) : %s" % parse_datetime(datestring_now[0:-1], default_timezone='Europe/Paris')) + print("parse_datetime (to_timezone=utc) : %s" % parse_datetime(datestring_now, to_timezone=pytz.utc)) + print("parse_datetime (to_timezone=local) : %s" % parse_datetime(datestring_now, to_timezone=dateutil.tz.tzlocal())) + print("parse_datetime (to_timezone='local') : %s" % parse_datetime(datestring_now, to_timezone='local')) + print("parse_datetime (to_timezone=Tokyo) : %s" % parse_datetime(datestring_now, to_timezone='Asia/Tokyo')) + print("parse_datetime (naive=True) : %s" % parse_datetime(datestring_now, naive=True)) + + print("parse_date : %s" % parse_date(datestring_now)) + print("parse_date (default_timezone=utc) : %s" % parse_date(datestring_now[0:-1], default_timezone=pytz.utc)) + print("parse_date (default_timezone=local) : %s" % parse_date(datestring_now[0:-1], default_timezone=dateutil.tz.tzlocal())) + print("parse_date (default_timezone='local') : %s" % parse_date(datestring_now[0:-1], default_timezone='local')) + print("parse_date (default_timezone=Paris) : %s" % parse_date(datestring_now[0:-1], default_timezone='Europe/Paris')) + print("parse_date (to_timezone=utc) : %s" % parse_date(datestring_now, to_timezone=pytz.utc)) + print("parse_date (to_timezone=local) : %s" % parse_date(datestring_now, to_timezone=dateutil.tz.tzlocal())) + print("parse_date (to_timezone='local') : %s" % parse_date(datestring_now, to_timezone='local')) + print("parse_date (to_timezone=Tokyo) : %s" % parse_date(datestring_now, to_timezone='Asia/Tokyo')) + print("parse_date (naive=True) : %s" % parse_date(datestring_now, naive=True)) diff --git a/mylib/scripts/pbar_test.py b/mylib/scripts/pbar_test.py new file mode 100644 index 0000000..703924d --- /dev/null +++ b/mylib/scripts/pbar_test.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- + +""" Test Progress bar """ +import logging +import time +import sys + +from mylib.pbar import Pbar +from mylib.scripts.helpers import get_opts_parser +from mylib.scripts.helpers import init_logging + + +log = logging.getLogger('mylib.scripts.pbar_test') + +def main(argv=None): #pylint: disable=too-many-locals,too-many-statements + """ Script main """ + if argv is None: + argv = sys.argv[1:] + + # Options parser + default_max_val = 10 + parser = get_opts_parser(progress=True) + + parser.add_argument( + '-c', '--count', + action="store", + type=int, + dest="count", + help="Progress bar max value (default: %s)" % default_max_val, + default=default_max_val + ) + + options = parser.parse_args() + + # Initialize logs + init_logging(options, 'Test Pbar') + + pbar = Pbar('Test', options.count, enabled=options.progress) + + for idx in range(0, options.count): # pylint: disable=unused-variable + pbar.increment() + time.sleep(0.3) + pbar.finish() diff --git a/mylib/scripts/report_test.py b/mylib/scripts/report_test.py new file mode 100644 index 0000000..48f78ad --- /dev/null +++ b/mylib/scripts/report_test.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- + +""" Test report """ +import logging +import sys + +from mylib.report import Report +from mylib.scripts.helpers import get_opts_parser, add_email_opts +from mylib.scripts.helpers import init_logging, init_email_client + + +log = logging.getLogger('mylib.scripts.report_test') + +def main(argv=None): #pylint: disable=too-many-locals,too-many-statements + """ Script main """ + if argv is None: + argv = sys.argv[1:] + + # Options parser + parser = get_opts_parser(just_try=True) + add_email_opts(parser) + + report_opts = parser.add_argument_group('Report options') + + report_opts.add_argument( + '-t', '--to', + action="store", + type=str, + dest="report_rcpt", + help="Send report to this email" + ) + + options = parser.parse_args() + + if not options.report_rcpt: + parser.error("You must specify a report recipient using -t/--to parameter") + + # Initialize logs + report = Report(rcpt_to=options.report_rcpt, subject='Test report') + init_logging(options, 'Test Report', report=report) + + email_client = init_email_client(options) + report.send_at_exit(email_client=email_client) + + logging.debug('Test debug message') + logging.info('Test info message') + logging.warning('Test warning message') + logging.error('Test error message') diff --git a/opening_hours.py b/opening_hours.py deleted file mode 100644 index cd50341..0000000 --- a/opening_hours.py +++ /dev/null @@ -1,207 +0,0 @@ -import datetime, re, time, logging - -week_days=['lundi', 'mardi', 'mercredi', 'jeudi', 'vendredi', 'samedi', 'dimanche'] -def easter_date(year): - a=year//100 - b=year%100 - c=(3*(a+25))//4 - d=(3*(a+25))%4 - e=(8*(a+11))//25 - f=(5*a+b)%19 - g=(19*f+c-e)%30 - h=(f+11*g)//319 - j=(60*(5-d)+b)//4 - k=(60*(5-d)+b)%4 - m=(2*j-k-g+h)%7 - n=(g-h+m+114)//31 - p=(g-h+m+114)%31 - day=p+1 - month=n - return datetime.date(year, month, day) - -def nonworking_french_public_days_of_the_year(year=None): - if year is None: - year=datetime.date.today().year - dp=easter_date(year) - return { - '1janvier': datetime.date(year, 1, 1), - 'paques': dp, - 'lundi_paques': (dp+datetime.timedelta(1)), - '1mai': datetime.date(year, 5, 1), - '8mai': datetime.date(year, 5, 8), - 'jeudi_ascension': (dp+datetime.timedelta(39)), - 'pentecote': (dp+datetime.timedelta(49)), - 'lundi_pentecote': (dp+datetime.timedelta(50)), - '14juillet': datetime.date(year, 7, 14), - '15aout': datetime.date(year, 8, 15), - '1novembre': datetime.date(year, 11, 1), - '11novembre': datetime.date(year, 11, 11), - 'noel': datetime.date(year, 12, 25), - 'saint_etienne': datetime.date(year, 12, 26), - } - -def parse_exceptional_closures(values): - exceptional_closures=[] - date_pattern=re.compile('^([0-9]{2})/([0-9]{2})/([0-9]{4})$') - time_pattern=re.compile('^([0-9]{1,2})h([0-9]{2})?$') - for value in values: - days=[] - hours_periods=[] - words=value.strip().split() - for word in words: - if word=='': - continue - parts=word.split('-') - if len(parts)==1: - # ex : 31/02/2017 - ptime=time.strptime(word,'%d/%m/%Y') - date=datetime.date(ptime.tm_year, ptime.tm_mon, ptime.tm_mday) - if date not in days: - days.append(date) - elif len(parts)==2: - # ex : 18/12/2017-20/12/2017 ou 9h-10h30 - if date_pattern.match(parts[0]) and date_pattern.match(parts[1]): - # ex : 18/12/2017-20/12/2017 - pstart=time.strptime(parts[0],'%d/%m/%Y') - pstop=time.strptime(parts[1],'%d/%m/%Y') - if pstop<=pstart: - raise ValueError('Day %s <= %s' % (parts[1],parts[0])) - - date=datetime.date(pstart.tm_year, pstart.tm_mon, pstart.tm_mday) - stop_date=datetime.date(pstop.tm_year, pstart.tm_mon, pstart.tm_mday) - while date<=stop_date: - if date not in days: - days.append(date) - date+=datetime.timedelta(days=1) - else: - # ex : 9h-10h30 - mstart=time_pattern.match(parts[0]) - mstop=time_pattern.match(parts[1]) - if not mstart or not mstop: - raise ValueError('"%s" is not a valid time period' % word) - hstart=datetime.time(int(mstart.group(1)), int(mstart.group(2) or 0)) - hstop=datetime.time(int(mstop.group(1)), int(mstop.group(2) or 0)) - if hstop<=hstart: - raise ValueError('Time %s <= %s' % (parts[1],parts[0])) - hours_periods.append({'start': hstart, 'stop': hstop}) - else: - raise ValueError('Invalid number of part in this word : "%s"' % word) - if not days: - raise ValueError('No days found in value "%s"' % word) - exceptional_closures.append({'days': days, 'hours_periods': hours_periods}) - return exceptional_closures - - -def parse_normal_opening_hours(values): - normal_opening_hours=[] - time_pattern=re.compile('^([0-9]{1,2})h([0-9]{2})?$') - for value in values: - days=[] - hours_periods=[] - words=value.strip().split() - for word in words: - if word=='': - continue - parts=word.split('-') - if len(parts)==1: - # ex : jeudi - if word not in week_days: - raise ValueError('"%s" is not a valid week day' % word) - if word not in days: - days.append(word) - elif len(parts)==2: - # ex : lundi-jeudi ou 9h-10h30 - if parts[0] in week_days and parts[1] in week_days: - # ex : lundi-jeudi - if week_days.index(parts[1]) <= week_days.index(parts[0]): - raise ValueError('"%s" is before "%s"' % (parts[1],parts[0])) - started=False - for d in week_days: - if not started and d!=parts[0]: - continue - started=True - if d not in days: - days.append(d) - if d==parts[1]: - break - else: - #ex : 9h-10h30 - mstart=time_pattern.match(parts[0]) - mstop=time_pattern.match(parts[1]) - if not mstart or not mstop: - raise ValueError('"%s" is not a valid time period' % word) - hstart=datetime.time(int(mstart.group(1)), int(mstart.group(2) or 0)) - hstop=datetime.time(int(mstop.group(1)), int(mstop.group(2) or 0)) - if hstop<=hstart: - raise ValueError('Time %s <= %s' % (parts[1],parts[0])) - hours_periods.append({'start': hstart, 'stop': hstop}) - else: - raise ValueError('Invalid number of part in this word : "%s"' % word) - if not days and not hours_periods: - raise ValueError('No days or hours period found in this value : "%s"' % value) - normal_opening_hours.append({'days': days, 'hours_periods': hours_periods}) - return normal_opening_hours - -def is_closed(normal_opening_hours_values=[],exceptional_closures_values=[],nonworking_public_holidays_values=[], when=datetime.datetime.now(), on_error='raise', exceptional_closure_on_nonworking_public_days=False): - when_date=when.date() - when_time=when.time() - when_weekday=week_days[when.timetuple().tm_wday] - on_error_result=None - if on_error=='closed': - on_error_result={'closed': True, 'exceptional_closure': False, 'exceptional_closure_all_day': False} - elif on_error=='opened': - on_error_result={'closed': False, 'exceptional_closure': False, 'exceptional_closure_all_day': False} - - logging.debug("%s => %s / %s / %s" % (when, when_date, when_time, when_weekday)) - if len(nonworking_public_holidays_values)>0: - logging.debug("Nonworking public holidays : %s" % nonworking_public_holidays_values) - nonworking_days=nonworking_french_public_days_of_the_year() - for day in nonworking_public_holidays_values: - if day in nonworking_days and when_date==nonworking_days[day]: - logging.debug("Non working day : %s" % day) - return {'closed': True, 'exceptional_closure': exceptional_closure_on_nonworking_public_days, 'exceptional_closure_all_day': exceptional_closure_on_nonworking_public_days} - - if len(exceptional_closures_values)>0: - try: - exceptional_closures=parse_exceptional_closures(exceptional_closures_values) - logging.debug('Exceptional closures : %s' % exceptional_closures) - except Exception, e: - logging.error("%s => Not closed by default" % e) - if on_error_result is None: - raise e - return on_error_result - for cl in exceptional_closures: - if when_date not in cl['days']: - logging.debug("when_date (%s) no in days (%s)" % (when_date,cl['days'])) - continue - if not cl['hours_periods']: - # All day exceptional closure - return {'closed': True, 'exceptional_closure': True, 'exceptional_closure_all_day': True} - for hp in cl['hours_periods']: - if hp['start']<=when_time and hp['stop']>= when_time: - return {'closed': True, 'exceptional_closure': True, 'exceptional_closure_all_day': False} - - if len(normal_opening_hours_values)>0: - try: - normal_opening_hours=parse_normal_opening_hours(normal_opening_hours_values) - logging.debug('Normal opening hours : %s' % normal_opening_hours) - except Exception, e: - logging.error("%s => Not closed by default" % e) - if on_error_result is None: - raise e - return on_error_result - for oh in normal_opening_hours: - if oh['days'] and when_weekday not in oh['days']: - logging.debug("when_weekday (%s) no in days (%s)" % (when_weekday,oh['days'])) - continue - if not oh['hours_periods']: - # All day opened - return {'closed': False, 'exceptional_closure': False, 'exceptional_closure_all_day': False} - for hp in oh['hours_periods']: - if hp['start']<=when_time and hp['stop']>= when_time: - return {'closed': False, 'exceptional_closure': False, 'exceptional_closure_all_day': False} - logging.debug("Not in normal opening hours => closed") - return {'closed': True, 'exceptional_closure': False, 'exceptional_closure_all_day': False} - - # Not a nonworking day, not during exceptional closure and no normal opening hours defined => Opened - return {'closed': False, 'exceptional_closure': False, 'exceptional_closure_all_day': False} diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..795344a --- /dev/null +++ b/setup.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import os +from setuptools import find_packages +from setuptools import setup + + +here = os.path.abspath(os.path.dirname(__file__)) +with open(os.path.join(here, 'README.md')) as f: + README = f.read() + +setup( + name="mylib", + version='0.0', + long_description=README, + classifiers=[ + 'Programming Language :: Python', + ], + install_requires=[ + 'email', + 'mako', + 'mysqlclient', + 'progressbar', + 'psycopg2', + 'python-dateutil', + 'python-ldap', + 'pytz', + ], + extras_require={ + 'dev': [ + 'pytest', + 'pylint', + ], + }, + author='Benjamin Renard', + author_email='brenard@zionetrix.net', + url='https://gogs.zionetrix.net/bn8/python-mylib', + packages=find_packages(), + include_package_data=True, + zip_safe=False, + entry_points={ + 'console_scripts': [ + 'mylib-test-email = mylib.scripts.email_test:main', + 'mylib-test-pbar = mylib.scripts.pbar_test:main', + 'mylib-test-report = mylib.scripts.report_test:main', + 'mylib-test-ldap = mylib.scripts.ldap_test:main', + ], + }, +) diff --git a/tests/test_opening_hours.py b/tests/test_opening_hours.py new file mode 100644 index 0000000..f55563d --- /dev/null +++ b/tests/test_opening_hours.py @@ -0,0 +1,286 @@ +""" Tests on opening hours helpers """ + +import datetime +import pytest + +from mylib import opening_hours + +# +# Test on parse_exceptional_closures() +# + + +def test_parse_exceptional_closures_one_day_without_time_period(): + assert opening_hours.parse_exceptional_closures(["22/09/2017"]) == [{'days': [datetime.date(2017, 9, 22)], 'hours_periods': []}] + + +def test_parse_exceptional_closures_one_day_with_time_period(): + assert opening_hours.parse_exceptional_closures(["26/11/2017 9h30-12h30"]) == [ + { + 'days': [datetime.date(2017, 11, 26)], + 'hours_periods': [{'start': datetime.time(9, 30), 'stop': datetime.time(12, 30)}] + } + ] + + +def test_parse_exceptional_closures_one_day_with_multiple_time_periods(): + assert opening_hours.parse_exceptional_closures(["26/11/2017 9h30-12h30 14h-18h"]) == [ + { + 'days': [datetime.date(2017, 11, 26)], + 'hours_periods': [ + {'start': datetime.time(9, 30), 'stop': datetime.time(12, 30)}, + {'start': datetime.time(14, 0), 'stop': datetime.time(18, 0)}, + ] + } + ] + + +def test_parse_exceptional_closures_full_days_period(): + assert opening_hours.parse_exceptional_closures(["20/09/2017-22/09/2017"]) == [ + { + 'days': [datetime.date(2017, 9, 20), datetime.date(2017, 9, 21), datetime.date(2017, 9, 22)], + 'hours_periods': [] + } + ] + +def test_parse_exceptional_closures_invalid_days_period(): + with pytest.raises(ValueError): + opening_hours.parse_exceptional_closures(["22/09/2017-21/09/2017"]) + + +def test_parse_exceptional_closures_days_period_with_time_period(): + assert opening_hours.parse_exceptional_closures(["20/09/2017-22/09/2017 9h-12h"]) == [ + { + 'days': [datetime.date(2017, 9, 20), datetime.date(2017, 9, 21), datetime.date(2017, 9, 22)], + 'hours_periods': [{'start': datetime.time(9, 0), 'stop': datetime.time(12, 0)}] + } + ] + + +def test_parse_exceptional_closures_time_period_without_days(): + with pytest.raises(ValueError): + opening_hours.parse_exceptional_closures(["9h-12h"]) + + +def test_parse_exceptional_closures_invalid_time_period(): + with pytest.raises(ValueError): + opening_hours.parse_exceptional_closures(["20/09/2017 9h-8h"]) + + +def test_parse_exceptional_closures_multiple_periods(): + assert opening_hours.parse_exceptional_closures(["20/09/2017 25/11/2017-26/11/2017 9h30-12h30 14h-18h"]) == [ + { + 'days': [ + datetime.date(2017, 9, 20), + datetime.date(2017, 11, 25), + datetime.date(2017, 11, 26), + ], + 'hours_periods': [ + {'start': datetime.time(9, 30), 'stop': datetime.time(12, 30)}, + {'start': datetime.time(14, 0), 'stop': datetime.time(18, 0)}, + ] + } + ] + +# +# Tests on parse_normal_opening_hours() +# + + +def test_parse_normal_opening_hours_one_day(): + assert opening_hours.parse_normal_opening_hours(["jeudi"]) == [{'days': ["jeudi"], 'hours_periods': []}] + + +def test_parse_normal_opening_hours_multiple_days(): + assert opening_hours.parse_normal_opening_hours(["lundi jeudi"]) == [{'days': ["lundi", "jeudi"], 'hours_periods': []}] + + +def test_parse_normal_opening_hours_invalid_day(): + with pytest.raises(ValueError): + opening_hours.parse_exceptional_closures(["invalid"]) + + +def test_parse_normal_opening_hours_one_days_period(): + assert opening_hours.parse_normal_opening_hours(["lundi-jeudi"]) == [ + {'days': ["lundi", "mardi", "mercredi", "jeudi"], 'hours_periods': []} + ] + + +def test_parse_normal_opening_hours_one_day_with_one_time_period(): + assert opening_hours.parse_normal_opening_hours(["jeudi 9h-12h"]) == [ + {'days': ["jeudi"], 'hours_periods': [{'start': datetime.time(9, 0), 'stop': datetime.time(12, 0)}]}] + + +def test_parse_normal_opening_hours_invalid_days_period(): + with pytest.raises(ValueError): + opening_hours.parse_normal_opening_hours(["jeudi-mardi"]) + with pytest.raises(ValueError): + opening_hours.parse_normal_opening_hours(["lundi-mardi-mercredi"]) + + +def test_parse_normal_opening_hours_one_time_period(): + assert opening_hours.parse_normal_opening_hours(["9h-18h30"]) == [ + {'days': [], 'hours_periods': [{'start': datetime.time(9, 0), 'stop': datetime.time(18, 30)}]} + ] + + +def test_parse_normal_opening_hours_invalid_time_period(): + with pytest.raises(ValueError): + opening_hours.parse_normal_opening_hours(["12h-10h"]) + + +def test_parse_normal_opening_hours_multiple_periods(): + assert opening_hours.parse_normal_opening_hours(["lundi-vendredi 9h30-12h30 14h-18h", "samedi 9h30-18h", "dimanche 9h30-12h"]) == [ + { + 'days': ['lundi', 'mardi', 'mercredi', 'jeudi', 'vendredi'], + 'hours_periods': [ + {'start': datetime.time(9, 30), 'stop': datetime.time(12, 30)}, + {'start': datetime.time(14, 0), 'stop': datetime.time(18, 0)}, + ] + }, + { + 'days': ['samedi'], + 'hours_periods': [ + {'start': datetime.time(9, 30), 'stop': datetime.time(18, 0)}, + ] + }, + { + 'days': ['dimanche'], + 'hours_periods': [ + {'start': datetime.time(9, 30), 'stop': datetime.time(12, 0)}, + ] + }, + ] + +# +# Tests on is_closed +# + +exceptional_closures = ["22/09/2017", "20/09/2017-22/09/2017", "20/09/2017-22/09/2017 18/09/2017", "25/11/2017", "26/11/2017 9h30-12h30"] +normal_opening_hours = ["lundi-mardi jeudi 9h30-12h30 14h-16h30", "mercredi vendredi 9h30-12h30 14h-17h"] +nonworking_public_holidays = [ + '1janvier', + 'paques', + 'lundi_paques', + '1mai', + '8mai', + 'jeudi_ascension', + 'lundi_pentecote', + '14juillet', + '15aout', + '1novembre', + '11novembre', + 'noel', +] + + +def test_is_closed_when_normaly_closed_by_hour(): + assert opening_hours.is_closed( + normal_opening_hours_values=normal_opening_hours, + exceptional_closures_values=exceptional_closures, + nonworking_public_holidays_values=nonworking_public_holidays, + when=datetime.datetime(2017, 5, 1, 20, 15) + ) == { + 'closed': True, + 'exceptional_closure': False, + 'exceptional_closure_all_day': False + } + + +def test_is_closed_on_exceptional_closure_full_day(): + assert opening_hours.is_closed( + normal_opening_hours_values=normal_opening_hours, + exceptional_closures_values=exceptional_closures, + nonworking_public_holidays_values=nonworking_public_holidays, + when=datetime.datetime(2017, 9, 22, 14, 15) + ) == { + 'closed': True, + 'exceptional_closure': True, + 'exceptional_closure_all_day': True + } + + +def test_is_closed_on_exceptional_closure_day(): + assert opening_hours.is_closed( + normal_opening_hours_values=normal_opening_hours, + exceptional_closures_values=exceptional_closures, + nonworking_public_holidays_values=nonworking_public_holidays, + when=datetime.datetime(2017, 11, 26, 10, 30) + ) == { + 'closed': True, + 'exceptional_closure': True, + 'exceptional_closure_all_day': False + } + + +def test_is_closed_on_nonworking_public_holidays(): + assert opening_hours.is_closed( + normal_opening_hours_values=normal_opening_hours, + exceptional_closures_values=exceptional_closures, + nonworking_public_holidays_values=nonworking_public_holidays, + when=datetime.datetime(2017, 1, 1, 10, 30) + ) == { + 'closed': True, + 'exceptional_closure': False, + 'exceptional_closure_all_day': False + } + + +def test_is_closed_when_normaly_closed_by_day(): + assert opening_hours.is_closed( + normal_opening_hours_values=normal_opening_hours, + exceptional_closures_values=exceptional_closures, + nonworking_public_holidays_values=nonworking_public_holidays, + when=datetime.datetime(2017, 5, 6, 14, 15) + ) == { + 'closed': True, + 'exceptional_closure': False, + 'exceptional_closure_all_day': False + } + + +def test_is_closed_when_normaly_opened(): + assert opening_hours.is_closed( + normal_opening_hours_values=normal_opening_hours, + exceptional_closures_values=exceptional_closures, + nonworking_public_holidays_values=nonworking_public_holidays, + when=datetime.datetime(2017, 5, 2, 15, 15) + ) == { + 'closed': False, + 'exceptional_closure': False, + 'exceptional_closure_all_day': False + } + + +def test_easter_date(): + assert opening_hours.easter_date(2010) == datetime.date(2010, 4, 4) + assert opening_hours.easter_date(2011) == datetime.date(2011, 4, 24) + assert opening_hours.easter_date(2012) == datetime.date(2012, 4, 8) + assert opening_hours.easter_date(2013) == datetime.date(2013, 3, 31) + assert opening_hours.easter_date(2014) == datetime.date(2014, 4, 20) + assert opening_hours.easter_date(2015) == datetime.date(2015, 4, 5) + assert opening_hours.easter_date(2016) == datetime.date(2016, 3, 27) + assert opening_hours.easter_date(2017) == datetime.date(2017, 4, 16) + assert opening_hours.easter_date(2018) == datetime.date(2018, 4, 1) + assert opening_hours.easter_date(2019) == datetime.date(2019, 4, 21) + assert opening_hours.easter_date(2020) == datetime.date(2020, 4, 12) + assert opening_hours.easter_date(2021) == datetime.date(2021, 4, 4) + + +def test_nonworking_french_public_days_of_the_year(): + assert opening_hours.nonworking_french_public_days_of_the_year(2021) == { + '1janvier': datetime.date(2021, 1, 1), + 'paques': datetime.date(2021, 4, 4), + 'lundi_paques': datetime.date(2021, 4, 5), + '1mai': datetime.date(2021, 5, 1), + '8mai': datetime.date(2021, 5, 8), + 'jeudi_ascension': datetime.date(2021, 5, 13), + 'pentecote': datetime.date(2021, 5, 23), + 'lundi_pentecote': datetime.date(2021, 5, 24), + '14juillet': datetime.date(2021, 7, 14), + '15aout': datetime.date(2021, 8, 15), + '1novembre': datetime.date(2021, 11, 1), + '11novembre': datetime.date(2021, 11, 11), + 'noel': datetime.date(2021, 12, 25), + 'saint_etienne': datetime.date(2021, 12, 26) + } diff --git a/tests/tests_opening_hours.py b/tests/tests_opening_hours.py deleted file mode 100644 index 5b6efef..0000000 --- a/tests/tests_opening_hours.py +++ /dev/null @@ -1,53 +0,0 @@ -import os, sys -sys.path.insert(0,os.path.dirname(os.path.dirname(os.path.abspath( __file__ )))) -import opening_hours -import datetime, logging - -debug=True - -if debug: - logging.basicConfig(level=logging.DEBUG) -else: - logging.basicConfig(level=logging.INFO) - -exceptional_closures=["22/09/2017", "20/09/2017-22/09/2017", "20/09/2017-22/09/2017 18/09/2017","25/11/2017", "26/11/2017 9h30-12h30"] -print "Raw exceptional closures value : %s" % exceptional_closures -print "Parsed exceptional closures : %s" % opening_hours.parse_exceptional_closures(exceptional_closures) - -normal_opening_hours=["lundi-mardi jeudi 9h30-12h30 14h-16h30", "mercredi vendredi 9h30-12h30 14h-17h"] -print "Raw normal opening hours : %s" % normal_opening_hours -print "Parsed normal opening hours : %s" % opening_hours.parse_normal_opening_hours(normal_opening_hours) - -nonworking_public_holidays=[ - '1janvier', - 'paques', - 'lundi_paques', - '1mai', - '8mai', - 'jeudi_ascension', - 'lundi_pentecote', - '14juillet', - '15aout', - '1novembre', - '11novembre', - 'noel', -] -print "Raw nonworking_public_holidays values : %s" % nonworking_public_holidays - -print "Is closed (now) : %s" % opening_hours.is_closed(normal_opening_hours,exceptional_closures,nonworking_public_holidays) - -tests=[ - { 'date_time': datetime.datetime(2017, 5, 1, 20, 15), 'result': {'exceptional_closure': False, 'closed': True, 'exceptional_closure_all_day': False} }, - { 'date_time': datetime.datetime(2017, 5, 2, 15, 15), 'result': {'exceptional_closure': False, 'closed': False, 'exceptional_closure_all_day': False} }, - { 'date_time': datetime.datetime(2017, 12, 25, 20, 15), 'result': {'exceptional_closure': False, 'closed': True, 'exceptional_closure_all_day': False} }, - { 'date_time': datetime.datetime(2017, 9, 22, 15, 15), 'result': {'exceptional_closure': True, 'closed': True, 'exceptional_closure_all_day': True} }, - { 'date_time': datetime.datetime(2017, 11, 25, 15, 15), 'result': {'exceptional_closure': True, 'closed': True, 'exceptional_closure_all_day': True} }, - { 'date_time': datetime.datetime(2017, 11, 26, 11, 15), 'result': {'exceptional_closure': True, 'closed': True, 'exceptional_closure_all_day': False} }, -] -for test in tests: - result=opening_hours.is_closed(normal_opening_hours,exceptional_closures,nonworking_public_holidays, test['date_time']) - if result == test['result']: - status='OK' - else: - status='ERROR' - print "Is closed (%s) : %s => %s" % (test['date_time'].isoformat(), result, status)