Compare commits

...

22 commits

Author | SHA1 | Message | CI status | Date
Benjamin Renard | 296618a34e | config: allow to customize default config file mode | Run tests (push): failing after 1m0s | 2024-04-16 10:39:55 +02:00
Benjamin Renard | 28103836ac | config: Add ask_value() helper to section & config objects | Run tests (push): failing after 57s | 2024-04-13 18:30:15 +02:00
Benjamin Renard | 3cf6a2682c | opening_hours: global rework to make module more atomic and add somes new helper methods | Run tests (push): failing after 1m35s | 2024-04-01 19:37:51 +02:00
Benjamin Renard | eb87516e1a | Docker: upgrade images and based them on node:16-bookworm-slim to allow to use it with Forgejo Actions | Run tests (push): successful in 2m15s | 2024-03-15 12:01:45 +01:00
Benjamin Renard | 5dbdb0ffe6 | CI: add missing publish jobs dependency on build job | - | 2024-03-15 11:34:07 +01:00
Benjamin Renard | b45819428d | Switch from Woodpecker CI to Forgejo Actions | Run tests (push): successful in 3m44s | 2024-03-15 10:46:33 +01:00
Benjamin Renard | 85caf81ac2 | Introduce some new pre-commit hooks | - | 2024-03-15 10:23:21 +01:00
Benjamin Renard | 09c422efe2 | Fix including test email template | woodpecker pipeline: successful | 2024-03-01 16:34:45 +01:00
Benjamin Renard | e368521a96 | config: add _set_option() method to ConfigurableObject | - | 2024-01-03 15:12:52 +01:00
Benjamin Renard | 25cdf9d4dc | config: Add logging sections in __init__() to allow to set their default values | - | 2023-12-19 18:36:09 +01:00
Benjamin Renard | 4962b16099 | config: fix configure() method to validate configuration only if -V/--validate parameter is provided | - | 2023-12-15 13:41:53 +01:00
Benjamin Renard | 371d194728 | PgDB: fix doSelect() method to retreive list of dicts instead of list of lists. | - | 2023-12-15 12:12:48 +01:00
Benjamin Renard | dcaec24ea4 | PgDB / MyDB / OracleDB: add limit parameter to select() method | - | 2023-12-15 11:35:43 +01:00
Benjamin Renard | 2736fc30ae | report: add add_logging_handler & send_at_exit parameters | - | 2023-12-14 21:41:16 +01:00
Benjamin Renard | 73795d27b8 | config: add default_config_filename parameter | - | 2023-12-14 21:25:00 +01:00
Benjamin Renard | 07ab4490d2 | config: Add OctalOption | - | 2023-12-14 17:24:59 +01:00
Benjamin Renard | 68c2103c58 | config: add console log_level parameter | woodpecker pipeline: successful | 2023-10-30 14:04:43 +01:00
Benjamin Renard | 0064fa979c | config: fix python 3.9 compatibility | woodpecker pipeline: successful | 2023-10-27 13:43:55 +02:00
Benjamin Renard | b92a814577 | config: Add logfile feature | woodpecker pipeline: failed | 2023-10-27 13:36:32 +02:00
Benjamin Renard | 8a0a65465d | Update pre-commit config and fix some pylint & bandit warnings | - | 2023-10-27 13:35:59 +02:00
Benjamin Renard | 8e0e75f30e | setup.py: use README.md file as long_description (required by latest stdeb lib) | - | 2023-07-10 16:04:56 +02:00
Benjamin Renard | 14d82fe796 | build.sh: exclude pre-commit commits on computing Debian changelog | - | 2023-07-10 14:19:41 +02:00
33 changed files with 1950 additions and 545 deletions


@@ -0,0 +1,86 @@
---
name: Build and publish Debian & Python packages
on: ["create"]
jobs:
  build:
    runs-on: docker
    container:
      image: docker.io/brenard/debian-python-deb:latest
    steps:
      - name: Check out repository code
        uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - name: Build Debian & Python package
        env:
          MAINTAINER_NAME: ${{ vars.MAINTAINER_NAME }}
          MAINTAINER_EMAIL: ${{ vars.MAINTAINER_EMAIL }}
          DEBIAN_CODENAME: ${{ vars.DEBIAN_CODENAME }}
        run: |
          echo "${{ secrets.GPG_KEY }}"|base64 -d|gpg --import
          ./build.sh
          rm -fr deb_dist/mylib-*
      - name: Upload Debian & Python package files
        uses: actions/upload-artifact@v3
        with:
          name: dist
          path: |
            dist
            deb_dist
  publish-forgejo:
    runs-on: docker
    container:
      image: docker.io/brenard/debian-python-deb:latest
    needs: build
    steps:
      - name: Download Debian & Python packages files
        uses: actions/download-artifact@v3
        with:
          name: dist
      - name: Create the release
        id: create-release
        shell: bash
        run: |
          mkdir release
          mv dist/*.whl dist/*.tar.gz release/
          mv deb_dist/*.deb release/
          md5sum release/* > md5sum.txt
          sha512sum release/* > sha512sum.txt
          mv md5sum.txt sha512sum.txt release/
          {
            echo 'release_note<<EOF'
            cat dist/release_notes.md
            echo 'EOF'
          } >> "$GITHUB_OUTPUT"
      - name: Publish release on Forgejo
        uses: actions/forgejo-release@v1
        with:
          direction: upload
          url: https://gitea.zionetrix.net
          token: ${{ secrets.forgejo_token }}
          release-dir: release
          release-notes: ${{ steps.create-release.outputs.release_note }}
  publish-aptly:
    runs-on: docker
    container:
      image: docker.io/brenard/aptly-publish:latest
    needs: build
    steps:
      - name: "Download Debian package files"
        uses: actions/download-artifact@v3
        with:
          name: dist
      - name: "Publish Debian package on Aptly repository"
        uses: https://gitea.zionetrix.net/bn8/aptly-publish@master
        with:
          api_url: ${{ vars.apt_api_url }}
          api_username: ${{ vars.apt_api_username }}
          api_password: ${{ secrets.apt_api_password }}
          repo_name: ${{ vars.apt_repo_name }}
          path: "deb_dist"
          source_name: ${{ vars.apt_source_name }}
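The "Create the release" step passes a multiline value between jobs through `$GITHUB_OUTPUT` using the `name<<DELIMITER … DELIMITER` framing that Forgejo/GitHub Actions require for values containing newlines. A minimal sketch of what that step writes, simulated against a temporary file instead of a real runner (the release-notes content is made up for illustration):

```python
import tempfile

# A value with newlines cannot use the simple "name=value" output syntax;
# it must be framed as "name<<DELIMITER ... DELIMITER", as the workflow does.
release_notes = "## mylib 0.x\n- change one\n- change two"

with tempfile.NamedTemporaryFile("w+", delete=False) as github_output:
    github_output.write("release_note<<EOF\n")
    github_output.write(release_notes + "\n")
    github_output.write("EOF\n")
```

Later steps then read the value back via `steps.create-release.outputs.release_note`.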


@@ -0,0 +1,14 @@
---
name: Run tests
on: [push]
jobs:
  tests:
    runs-on: docker
    container:
      image: docker.io/brenard/mylib:dev-master
      options: "--workdir /src"
    steps:
      - name: Check out repository code
        uses: actions/checkout@v4
      - name: Run tests.sh
        run: ./tests.sh --no-venv


@@ -1,45 +1,71 @@
 # Pre-commit hooks to run tests and ensure code is cleaned.
 # See https://pre-commit.com for more information
 ---
 repos:
-  - repo: https://github.com/asottile/pyupgrade
-    rev: v3.3.1
+  - repo: https://github.com/astral-sh/ruff-pre-commit
+    rev: v0.1.6
     hooks:
-      - id: pyupgrade
-        args: ['--keep-percent-format', '--py37-plus']
-  - repo: https://github.com/psf/black
-    rev: 22.12.0
+      - id: ruff
+        args: ["--fix"]
+  - repo: https://github.com/asottile/pyupgrade
+    rev: v3.15.0
     hooks:
-      - id: black
-        args: ['--target-version', 'py37', '--line-length', '100']
-  - repo: https://github.com/PyCQA/isort
-    rev: 5.11.5
+      - id: pyupgrade
+        args: ["--keep-percent-format", "--py37-plus"]
+  - repo: https://github.com/psf/black
+    rev: 23.11.0
     hooks:
-      - id: isort
-        args: ['--profile', 'black', '--line-length', '100']
-  - repo: https://github.com/PyCQA/flake8
-    rev: 6.0.0
+      - id: black
+        args: ["--target-version", "py37", "--line-length", "100"]
+  - repo: https://github.com/PyCQA/isort
+    rev: 5.12.0
     hooks:
-      - id: flake8
-        args: ['--max-line-length=100']
-  - repo: local
+      - id: isort
+        args: ["--profile", "black", "--line-length", "100"]
+  - repo: https://github.com/PyCQA/flake8
+    rev: 6.1.0
     hooks:
-      - id: pylint
-        name: pylint
-        entry: pylint --extension-pkg-whitelist=cx_Oracle
-        language: system
-        types: [python]
-        require_serial: true
-  - repo: https://github.com/Lucas-C/pre-commit-hooks-bandit
-    rev: v1.0.5
+      - id: flake8
+        args: ["--max-line-length=100"]
+  - repo: https://github.com/codespell-project/codespell
+    rev: v2.2.2
     hooks:
-      - id: python-bandit-vulnerability-check
-        name: bandit
-        args: [--skip, "B101", --recursive, mylib]
-  - repo: local
+      - id: codespell
+        args:
+          - --ignore-words-list=exten
+          - --skip="./.*,*.csv,*.json,*.ini,*.subject,*.txt,*.html,*.log,*.conf"
+          - --quiet-level=2
+          - --ignore-regex=.*codespell-ignore$
+          # - --write-changes # Uncomment to write changes
+        exclude_types: [csv, json]
+  - repo: https://github.com/adrienverge/yamllint
+    rev: v1.32.0
     hooks:
-      - id: pytest
-        name: pytest
-        entry: python3 -m pytest tests
-        language: system
-        types: [python]
-        pass_filenames: false
+      - id: yamllint
+        ignore: .github/
+  - repo: https://github.com/pre-commit/mirrors-prettier
+    rev: v2.7.1
+    hooks:
+      - id: prettier
+        args: ["--print-width", "100"]
+  - repo: local
+    hooks:
+      - id: pylint
+        name: pylint
+        entry: ./.pre-commit-pylint --extension-pkg-whitelist=cx_Oracle
+        language: system
+        types: [python]
+        require_serial: true
+  - repo: https://github.com/PyCQA/bandit
+    rev: 1.7.5
+    hooks:
+      - id: bandit
+        args: [--skip, "B101", --recursive, "mylib"]
+  - repo: local
+    hooks:
+      - id: pytest
+        name: pytest
+        entry: ./.pre-commit-pytest tests
+        language: system
+        types: [python]
+        pass_filenames: false

.pre-commit-pylint (new executable file, 21 lines)

@@ -0,0 +1,21 @@
#!/bin/bash
PWD=$(pwd)
if [ -d "$PWD/venv" ]
then
	echo "Run pylint inside venv ($PWD/venv)..."
	[ ! -e "$PWD/venv/bin/pylint" ] && "$PWD/venv/bin/python" -m pip install pylint
	"$PWD/venv/bin/pylint" "$@"
	exit $?
elif [ -e "$PWD/pyproject.toml" ]
then
	echo "Run pylint using poetry..."
	poetry run pylint --version > /dev/null 2>&1 || poetry run python -m pip install pylint
	poetry run pylint "$@"
	exit $?
else
	echo "Run pylint at system scope..."
	pylint "$@"
	exit $?
fi

.pre-commit-pytest (new executable file, 21 lines)

@@ -0,0 +1,21 @@
#!/bin/bash
PWD=$(pwd)
if [ -d "$PWD/venv" ]
then
	echo "Run pytest inside venv ($PWD/venv)..."
	[ ! -e "$PWD/venv/bin/pytest" ] && "$PWD/venv/bin/python" -m pip install pytest
	"$PWD/venv/bin/pytest" "$@"
	exit $?
elif [ -e "$PWD/pyproject.toml" ]
then
	echo "Run pytest using poetry..."
	poetry run pytest --version > /dev/null 2>&1 || poetry run python -m pip install pytest
	poetry run pytest "$@"
	exit $?
else
	echo "Run pytest at system scope..."
	pytest "$@"
	exit $?
fi
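Both wrapper scripts share the same environment-detection order: a `venv/` directory wins, then a `pyproject.toml` (poetry), then the system-wide install. A condensed sketch of that dispatch logic in Python (the `pick_runner` name is illustrative, not part of mylib):

```python
from pathlib import Path


def pick_runner(project_dir):
    """Mirror the wrappers' detection order: venv/ dir, then pyproject.toml, then system."""
    project = Path(project_dir)
    if (project / "venv").is_dir():
        return "venv"
    if (project / "pyproject.toml").exists():
        return "poetry"
    return "system"
```

A project directory containing both a `venv/` and a `pyproject.toml` runs in the venv, matching the scripts' `if`/`elif` ordering.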


@ -1,63 +0,0 @@
clone:
git:
image: woodpeckerci/plugin-git
tags: true
pipeline:
test:
image: brenard/mylib:dev-master
commands:
- ./tests.sh --no-venv
build:
image: brenard/debian-python-deb
when:
event: tag
commands:
- echo "$GPG_KEY"|base64 -d|gpg --import
- ./build.sh --quiet
- rm -fr deb_dist/mylib-*
secrets: [ maintainer_name, maintainer_email, gpg_key, debian_codename ]
publish-dryrun:
group: publish
image: alpine
when:
event: tag
commands:
- ls dist/*
- ls deb_dist/*
publish-gitea:
group: publish
image: plugins/gitea-release
when:
event: tag
settings:
api_key:
from_secret: gitea_token
base_url: https://gitea.zionetrix.net
note: dist/release_notes.md
files:
- dist/*
- deb_dist/*.deb
checksum:
- md5
- sha512
publish-apt:
group: publish
image: brenard/aptly-publish
when:
event: tag
settings:
api_url:
from_secret: apt_api_url
api_username:
from_secret: apt_api_username
api_password:
from_secret: apt_api_password
repo_name:
from_secret: apt_repo_name
path: deb_dist
source_name: mylib


@@ -35,35 +35,35 @@ Just run `pip install git+https://gitea.zionetrix.net/bn8/python-mylib.git`

 Just run `python setup.py install`

-**Note:** This project could previously use as independent python files (not as module). This old version is keep in *legacy* git branch (not maintained).
+**Note:** This project could previously use as independent python files (not as module). This old version is keep in _legacy_ git branch (not maintained).

 ## Include libs

-* **mylib.email.EmailClient:** An email client to forge (eventually using template) and send email via a SMTP server
-* **mylib.ldap.LdapServer:** A small lib to make requesting LDAP server easier. It's also provide some helper functions to deal with LDAP date string.
-* **mylib.mysql.MyDB:** An extra small lib to remember me how to interact with MySQL/MariaDB database
-* **mylib.pgsql.PgDB:** An small lib to remember me how to interact with PostgreSQL database. **Warning:** The insert/update/delete/select methods demonstrate how to forge raw SQL request, but **it's a bad idea**: Prefer using prepared query.
-* **mylib.opening_hours:** A set of helper functions to deal with french opening hours (including normal opening hours, exceptional closure and nonworking public holidays).
-* **mylib.pbar.Pbar:** A small lib for progress bar
-* **mylib.report.Report:** A small lib to implement logging based email report send at exit
+- **mylib.email.EmailClient:** An email client to forge (eventually using template) and send email via a SMTP server
+- **mylib.ldap.LdapServer:** A small lib to make requesting LDAP server easier. It's also provide some helper functions to deal with LDAP date string.
+- **mylib.mysql.MyDB:** An extra small lib to remember me how to interact with MySQL/MariaDB database
+- **mylib.pgsql.PgDB:** An small lib to remember me how to interact with PostgreSQL database. **Warning:** The insert/update/delete/select methods demonstrate how to forge raw SQL request, but **it's a bad idea**: Prefer using prepared query.
+- **mylib.opening_hours:** A set of helper functions to deal with french opening hours (including normal opening hours, exceptional closure and nonworking public holidays).
+- **mylib.pbar.Pbar:** A small lib for progress bar
+- **mylib.report.Report:** A small lib to implement logging based email report send at exit

-To know how to use these libs, you can take a look on *mylib.scripts* content or in *tests* directory.
+To know how to use these libs, you can take a look on _mylib.scripts_ content or in _tests_ directory.

 ## Code Style

-[pylint](https://pypi.org/project/pylint/) is used to check for errors and enforces a coding standard, using thoses parameters:
+[pylint](https://pypi.org/project/pylint/) is used to check for errors and enforces a coding standard, using those parameters:

 ```bash
 pylint --extension-pkg-whitelist=cx_Oracle
 ```

-[flake8](https://pypi.org/project/flake8/) is also used to check for errors and enforces a coding standard, using thoses parameters:
+[flake8](https://pypi.org/project/flake8/) is also used to check for errors and enforces a coding standard, using those parameters:

 ```bash
 flake8 --max-line-length=100
 ```

-[black](https://pypi.org/project/black/) is used to format the code, using thoses parameters:
+[black](https://pypi.org/project/black/) is used to format the code, using those parameters:

 ```bash
 black --target-version py37 --line-length 100
@@ -83,7 +83,6 @@ pyupgrade --keep-percent-format --py37-plus

 **Note:** There is `.pre-commit-config.yaml` to use [pre-commit](https://pre-commit.com/) to automatically run these tools before commits. After cloning the repository, execute `pre-commit install` to install the git hook.

 ## Copyright

 Copyright (c) 2013-2021 Benjamin Renard <brenard@zionetrix.net>


@@ -81,7 +81,7 @@ cd deb_dist/mylib-$VERSION

 if [ -z "$DEBIAN_CODENAME" ]
 then
-    echo "Retreive debian codename using lsb_release..."
+    echo "Retrieve debian codename using lsb_release..."
     DEBIAN_CODENAME=$( lsb_release -c -s )
     [ $( lsb_release -r -s ) -ge 9 ] && DEBIAN_CODENAME="${DEBIAN_CODENAME}-ee"
 else
@@ -114,6 +114,8 @@ $GITDCH \
     --release-notes ../../dist/release_notes.md \
     --path ../../ \
     --exclude "^CI: " \
+    --exclude "^Docker: " \
+    --exclude "^pre-commit: " \
     --exclude "\.?woodpecker(\.yml)?" \
     --exclude "build(\.sh)?" \
     --exclude "tests(\.sh)?" \


@@ -1,5 +1,6 @@
 FROM brenard/mylib:latest
-RUN apt-get remove -y python3-mylib
-RUN python3 -m pip install -U git+https://gitea.zionetrix.net/bn8/python-mylib.git
-RUN git clone https://gitea.zionetrix.net/bn8/python-mylib.git /usr/local/src/python-mylib && pip install /usr/local/src/python-mylib[dev]
-RUN cd /usr/local/src/python-mylib && pre-commit run --all-files
+RUN apt-get remove -y python3-mylib && \
+    git clone https://gitea.zionetrix.net/bn8/python-mylib.git /src && \
+    pip install --break-system-packages /src[dev] && \
+    cd /src && \
+    pre-commit run --all-files


@@ -1,5 +1,26 @@
-FROM debian:latest
-RUN echo "deb http://debian.zionetrix.net stable main" > /etc/apt/sources.list.d/zionetrix.list && apt-get -o Acquire::AllowInsecureRepositories=true -o Acquire::AllowDowngradeToInsecureRepositories=true update && apt-get -o APT::Get::AllowUnauthenticated=true install --yes zionetrix-archive-keyring && apt-get clean && rm -fr rm -rf /var/lib/apt/lists/*
-RUN apt-get update && apt-get upgrade -y && apt-get install -y python3-all python3-dev python3-pip python3-venv python3-mylib build-essential git libldap2-dev libsasl2-dev pkg-config libsystemd-dev libpq-dev libmariadb-dev wget unzip && apt-get clean && rm -fr rm -rf /var/lib/apt/lists/*
-RUN python3 -m pip install pylint pytest flake8 flake8-junit-report pylint-junit junitparser pre-commit
-RUN wget --no-verbose -O /opt/instantclient-basic-linux.x64-21.4.0.0.0dbru.zip https://download.oracle.com/otn_software/linux/instantclient/214000/instantclient-basic-linux.x64-21.4.0.0.0dbru.zip && unzip -qq -d /opt /opt/instantclient-basic-linux.x64-21.4.0.0.0dbru.zip && echo /opt/instantclient_* > /etc/ld.so.conf.d/oracle-instantclient.conf && ldconfig
+FROM node:16-bookworm-slim
+RUN echo "deb http://debian.zionetrix.net stable main" > /etc/apt/sources.list.d/zionetrix.list && \
+    apt-get \
+        -o Acquire::AllowInsecureRepositories=true \
+        -o Acquire::AllowDowngradeToInsecureRepositories=true \
+        update && \
+    apt-get \
+        -o APT::Get::AllowUnauthenticated=true \
+        install --yes zionetrix-archive-keyring && \
+    apt-get update && \
+    apt-get upgrade -y && \
+    apt-get install -y \
+        python3-all python3-dev python3-pip python3-venv python3-mylib build-essential git \
+        libldap2-dev libsasl2-dev \
+        pkg-config libsystemd-dev \
+        libpq-dev libmariadb-dev \
+        wget unzip && \
+    apt-get clean && \
+    rm -rf /var/lib/apt/lists/*
+RUN python3 -m pip install --break-system-packages pylint pytest flake8 flake8-junit-report pylint-junit junitparser pre-commit
+RUN wget --no-verbose \
+        -O /opt/instantclient-basic-linux.x64-21.4.0.0.0dbru.zip \
+        https://download.oracle.com/otn_software/linux/instantclient/214000/instantclient-basic-linux.x64-21.4.0.0.0dbru.zip && \
+    unzip -qq -d /opt /opt/instantclient-basic-linux.x64-21.4.0.0.0dbru.zip && \
+    echo /opt/instantclient_* > /etc/ld.so.conf.d/oracle-instantclient.conf && \
+    ldconfig


@@ -1,7 +1,7 @@
 """ Some really common helper functions """

 #
-# Pretty formating helpers
+# Pretty formatting helpers
 #
@@ -11,7 +11,7 @@ def increment_prefix(prefix):

 def pretty_format_value(value, encoding="utf8", prefix=None):
-    """Returned pretty formated value to display"""
+    """Returned pretty formatted value to display"""
     if isinstance(value, dict):
         return pretty_format_dict(value, encoding=encoding, prefix=prefix)
     if isinstance(value, list):
@@ -27,10 +27,10 @@ def pretty_format_value(value, encoding="utf8", prefix=None):

 def pretty_format_value_in_list(value, encoding="utf8", prefix=None):
     """
-    Returned pretty formated value to display in list
+    Returned pretty formatted value to display in list

     That method will prefix value with line return and incremented prefix
-    if pretty formated value contains line return.
+    if pretty formatted value contains line return.
     """
     prefix = prefix if prefix else ""
     value = pretty_format_value(value, encoding, prefix)
@@ -41,7 +41,7 @@ def pretty_format_value_in_list(value, encoding="utf8", prefix=None):

 def pretty_format_dict(value, encoding="utf8", prefix=None):
-    """Returned pretty formated dict to display"""
+    """Returned pretty formatted dict to display"""
     prefix = prefix if prefix else ""
     result = []
     for key in sorted(value.keys()):
@@ -53,7 +53,7 @@ def pretty_format_dict(value, encoding="utf8", prefix=None):

 def pretty_format_list(row, encoding="utf8", prefix=None):
-    """Returned pretty formated list to display"""
+    """Returned pretty formatted list to display"""
     prefix = prefix if prefix else ""
     result = []
     for idx, values in enumerate(row):


@@ -6,7 +6,6 @@ import argparse
 import logging
 import os
 import re
-import stat
 import sys
 import textwrap
 import traceback
@@ -24,7 +23,14 @@ log = logging.getLogger(__name__)
 # Constants
 DEFAULT_ENCODING = "utf-8"
 DEFAULT_CONFIG_DIRPATH = os.path.expanduser("./")
-DEFAULT_CONSOLE_LOG_FORMAT = "%(asctime)s - %(module)s:%(lineno)d - %(levelname)s - %(message)s"
+DEFAULT_CONFIG_FILE_MODE = 0o600
+DEFAULT_LOG_FORMAT = "%(asctime)s - %(module)s:%(lineno)d - %(levelname)s - %(message)s"
+DEFAULT_CONSOLE_LOG_FORMAT = DEFAULT_LOG_FORMAT
+DEFAULT_FILELOG_FORMAT = DEFAULT_LOG_FORMAT


 class ConfigException(BaseException):
     """Configuration exception"""


 class BaseOption:  # pylint: disable=too-many-instance-attributes
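The new `DEFAULT_CONFIG_FILE_MODE = 0o600` constant replaces the `stat.S_IRUSR | stat.S_IWUSR` expression previously passed to `os.chmod()` in `save()`, which is why `import stat` is dropped. The two spellings denote the same mode bits, as a quick check shows:

```python
import stat

# 0o600 (owner read+write only) is exactly stat.S_IRUSR | stat.S_IWUSR,
# so the default file permissions applied by save() are unchanged.
old_mode = stat.S_IRUSR | stat.S_IWUSR
new_mode = 0o600
print(oct(old_mode), oct(new_mode))  # 0o600 0o600
```

The octal literal just reads more naturally as a configurable default than the composed `stat` flags.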
@@ -161,12 +167,12 @@ class BaseOption:  # pylint: disable=too-many-instance-attributes
         args = [self.parser_argument_name]
         if self.short_arg:
             args.append(self.short_arg)
-        kwargs = dict(
-            action=self.parser_action,
-            dest=self.parser_dest,
-            help=self.parser_help,
-            default=self.default,
-        )
+        kwargs = {
+            "action": self.parser_action,
+            "dest": self.parser_dest,
+            "help": self.parser_help,
+            "default": self.default,
+        }
         if self.parser_type:  # pylint: disable=using-constant-test
             kwargs["type"] = self.parser_type
@@ -243,12 +249,16 @@ class BaseOption:  # pylint: disable=too-many-instance-attributes
     def ask_value(self, set_it=True):
         """
-        Ask to user to enter value of this option and set or
-        return it regarding set parameter
+        Ask to user to enter value of this option and set it if set_it parameter is True
+
+        :param set_it: If True (default), option value will be updated with user input
+
+        :return: The configuration option value.
+        :rtype: mixed
         """
         value = self._ask_value()
         if set_it:
-            return self.set(value)
+            self.set(value)
+        return value
@@ -379,6 +389,59 @@ class IntegerOption(BaseOption):
             print("Invalid answer. Must a integer value")


+class OctalOption(BaseOption):
+    """Octal configuration option class"""
+
+    @staticmethod
+    def octal(value):
+        """Convert configuration octal string as integer"""
+        return int(str(value), 8)
+
+    @staticmethod
+    def octal_string(value):
+        """Convert integer to configuration octal string"""
+        return oct(value)[2:]
+
+    @property
+    def _from_config(self):
+        """Get option value from ConfigParser"""
+        return self.octal(self.config.config_parser.getint(self.section.name, self.name))
+
+    def to_config(self, value=None):
+        """Format value as stored in configuration file"""
+        value = value if value is not None else self.get()
+        return self.octal_string(value) if value is not None else ""
+
+    @property
+    def parser_type(self):
+        return self.octal
+
+    @property
+    def parser_help(self):
+        """Get option help message in arguments parser options"""
+        if self.arg_help and self.default is not None:
+            # pylint: disable=consider-using-f-string
+            return "{} (Default: {})".format(
+                self.arg_help,
+                re.sub(r"%([^%])", r"%%\1", self.octal_string(self._default_in_config)),
+            )
+        if self.arg_help:
+            return self.arg_help
+        return None
+
+    def _ask_value(self, prompt=None, **kwargs):
+        """Ask to user to enter value of this option and return it"""
+        default_value = kwargs.pop("default_value", self.get())
+        while True:
+            value = super()._ask_value(prompt, default_value=default_value, **kwargs)
+            if value in ["", None, default_value]:
+                return default_value
+            try:
+                return self.octal(value)
+            except ValueError:
+                print("Invalid answer. Must an octal value")
+

 class PasswordOption(StringOption):
     """Password configuration option class"""
@@ -406,7 +469,7 @@ class PasswordOption(StringOption):
             service_name = self._keyring_service_name
             username = self._keyring_username
-            log.debug("Retreive password %s for username=%s from keyring", service_name, username)
+            log.debug("Retrieve password %s for username=%s from keyring", service_name, username)
             value = keyring.get_password(service_name, username)
             if value is None:
@@ -451,8 +514,12 @@ class PasswordOption(StringOption):
     def ask_value(self, set_it=True):
         """
-        Ask to user to enter value of this option and set or
-        return it regarding set parameter
+        Ask to user to enter value of this option and set it if set_it parameter is True
+
+        :param set_it: If True (default), option value will be updated with user input
+
+        :return: The configuration option value.
+        :rtype: mixed
         """
         value = self._ask_value()
         if set_it:
@@ -471,7 +538,7 @@ class PasswordOption(StringOption):
                     use_keyring = False
                 else:
                     print("Invalid answer. Possible values: Y or N (case insensitive)")
-            return self.set(value, use_keyring=use_keyring)
+            self.set(value, use_keyring=use_keyring)
+            return value
@@ -551,28 +618,32 @@ class ConfigSection:

         :param set_it: If True (default), option value will be updated with user input

-        :return: If set_it is True, return True if valid value for each configuration
-                 option have been retrieved and set. If False, return a dict of configuration
-                 options and their value.
-        :rtype: bool of dict
+        :return: a dict of configuration options and their value.
+        :rtype: dict
         """
         if self.comment:
             print(f"# {self.comment}")
         print(f"[{self.name}]\n")
         result = {}
-        error = False
         for name, option in self.options.items():
-            option_result = option.ask_value(set_it=set_it)
-            if set_it:
-                result[name] = option_result
-            elif not option_result:
-                error = True
+            result[name] = option.ask_value(set_it=set_it)
             print()
         print()
-        if set_it:
-            return not error
         return result

+    def ask_value(self, option, set_it=True):
+        """
+        Ask user to enter value for the specified configuration option of the section
+
+        :param options: The configuration option name
+        :param set_it: If True (default), option value will be updated with user input
+
+        :return: The configuration option value.
+        :rtype: mixed
+        """
+        assert self.defined(option), f"Option {option} unknown"
+        return self.options[option].ask_value(set_it=set_it)


 class RawWrappedTextHelpFormatter(argparse.RawDescriptionHelpFormatter):
     """
@@ -607,6 +678,8 @@ class Config:  # pylint: disable=too-many-instance-attributes
         encoding=None,
         config_file_env_variable=None,
         default_config_dirpath=None,
+        default_config_filename=None,
+        default_config_file_mode=None,
     ):
         self.appname = appname
         self.shortname = shortname
@@ -621,8 +694,68 @@ class Config:  # pylint: disable=too-many-instance-attributes
         self._filepath = None
         self.config_file_env_variable = config_file_env_variable
         self.default_config_dirpath = default_config_dirpath
+        self.default_config_filename = default_config_filename
+        self.default_config_file_mode = default_config_file_mode or DEFAULT_CONFIG_FILE_MODE
+        self.add_logging_sections()
         self._init_config_parser()
+    def add_logging_sections(self):
+        """Add logging sections"""
+        console_section = self.add_section("console", comment="Console logging", order=998)
+        console_section.add_option(
+            BooleanOption,
+            "enabled",
+            default=False,
+            arg="--console",
+            short_arg="-C",
+            comment="Enable/disable console log",
+        )
+        console_section.add_option(
+            BooleanOption,
+            "force_stderr",
+            default=False,
+            arg="--console-stderr",
+            comment="Force console log on stderr",
+        )
+        console_section.add_option(
+            StringOption,
+            "log_format",
+            default=DEFAULT_CONSOLE_LOG_FORMAT,
+            arg="--console-log-format",
+            comment="Console log format",
+        )
+        console_section.add_option(
+            StringOption,
+            "log_level",
+            comment=(
+                "Console log level limit : by default, all logged messages (according to main log "
+                "level) will be logged to the console, but you can set a minimal level if you "
+                # logging.getLevelNamesMapping() not available in python 3.9
+                # pylint: disable=protected-access
+                f"want. Possible values: {', '.join(logging._nameToLevel)}."
+            ),
+        )
+
+        logfile_section = self.add_section("logfile", comment="Logging file", order=999)
+        logfile_section.add_option(StringOption, "path", comment="File log path")
+        logfile_section.add_option(
+            StringOption,
+            "format",
+            default=DEFAULT_FILELOG_FORMAT,
+            comment="File log format",
+        )
+        logfile_section.add_option(
+            StringOption,
+            "level",
+            comment=(
+                "File log level limit : by default, all logged messages (according to main log "
+                "level) will be logged to the log file, but you can set a minimal level if you "
+                # logging.getLevelNamesMapping() not available in python 3.9
+                # pylint: disable=protected-access
+                f"want. Possible values: {', '.join(logging._nameToLevel)}."
+            ),
+        )

     def add_section(self, name, loaded_callback=None, **kwargs):
         """
         Add section
@@ -638,7 +771,7 @@ class Config:  # pylint: disable=too-many-instance-attributes
         self.sections[name] = ConfigSection(self, name, **kwargs)
         if loaded_callback:
             self._loaded_callbacks.append(loaded_callback)
-            # If configuration is already loaded, execute callback immediatly
+            # If configuration is already loaded, execute callback immediately
             if self._filepath or self.options:
                 self._loaded()
         return self.sections[name]
@@ -737,7 +870,7 @@ class Config:  # pylint: disable=too-many-instance-attributes
                 self._loaded_callbacks_executed.append(callback)
         return not error

-    def save(self, filepath=None):
+    def save(self, filepath=None, reload=True):
         """Save configuration file"""
         filepath = filepath if filepath else self._filepath
         assert filepath, "Configuration filepath is not set or provided"
@@ -763,11 +896,12 @@ class Config:  # pylint: disable=too-many-instance-attributes
                 fd.write("\n".join(lines).encode(self.encoding))
             # Privacy!
-            os.chmod(filepath, stat.S_IRUSR | stat.S_IWUSR)
+            os.chmod(filepath, self.default_config_file_mode)
         except Exception:  # pylint: disable=broad-except
             log.exception("Failed to write generated configuration file %s", filepath)
             return False
-        self.load_file(filepath)
+        if reload:
+            return self.load_file(filepath)
         return True

     @property
@@ -820,30 +954,6 @@ class Config:  # pylint: disable=too-many-instance-attributes
             "-v", "--verbose", action="store_true", help="Show verbose messages"
         )

-        section = self.add_section("console", comment="Console logging")
-        section.add_option(
-            BooleanOption,
-            "enabled",
-            default=False,
-            arg="--console",
-            short_arg="-C",
-            comment="Enable/disable console log",
-        )
-        section.add_option(
-            BooleanOption,
-            "force_stderr",
-            default=False,
-            arg="--console-stderr",
-            comment="Force console log on stderr",
-        )
-        section.add_option(
-            StringOption,
-            "log_format",
-            default=DEFAULT_CONSOLE_LOG_FORMAT,
-            arg="--console-log-format",
-            comment="Console log format",
-        )
-
         self.add_options_to_parser(self.options_parser)
         return self.options_parser
@@ -924,29 +1034,60 @@ class Config:  # pylint: disable=too-many-instance-attributes
                 self.set(*opt_info)

         if self.get("console", "enabled"):
-            stdout_console_handler = logging.StreamHandler(
-                sys.stderr if self.get("console", "force_stderr") else sys.stdout
-            )
-            stdout_console_handler.addFilter(StdoutInfoFilter())
-            stdout_console_handler.setLevel(logging.DEBUG)
+            console_log_level = (
+                # logging.getLevelNamesMapping() not available in python 3.9
+                # pylint: disable=protected-access
+                logging._nameToLevel.get(self.get("console", "log_level"))
+                if self.get("console", "log_level")
+                else logging.DEBUG
+            )
+            if console_log_level < logging.WARNING:
+                stdout_console_handler = logging.StreamHandler(
+                    sys.stderr if self.get("console", "force_stderr") else sys.stdout
+                )
+                stdout_console_handler.addFilter(StdoutInfoFilter())
+                stdout_console_handler.setLevel(console_log_level)

             stderr_console_handler = logging.StreamHandler(sys.stderr)
-            stderr_console_handler.setLevel(logging.WARNING)
+            stderr_console_handler.setLevel(
+                console_log_level if console_log_level > logging.WARNING else logging.WARNING
+            )

             if self.get("console", "log_format"):
                 console_formater = logging.Formatter(self.get("console", "log_format"))
-                stdout_console_handler.setFormatter(console_formater)
+                if console_log_level < logging.WARNING:
+                    stdout_console_handler.setFormatter(console_formater)
                 stderr_console_handler.setFormatter(console_formater)

-            logging.getLogger().addHandler(stdout_console_handler)
+            if console_log_level < logging.WARNING:
+                logging.getLogger().addHandler(stdout_console_handler)
             logging.getLogger().addHandler(stderr_console_handler)

+        if self.get("logfile", "path"):
+            logfile_handler = logging.FileHandler(self.get("logfile", "path"))
+            logfile_level = (
+                # logging.getLevelNamesMapping() not available in python 3.9
+                # pylint: disable=protected-access
+                logging._nameToLevel.get(self.get("logfile", "level"))
+                if self.get("logfile", "level")
+                else logging.DEBUG
+            )
+            if logfile_level is None:
+                log.fatal("Invalid log file level specified (%s)", self.get("logfile", "level"))
+                sys.exit(1)
+            logfile_handler.setLevel(logfile_level)
+            if self.get("logfile", "format"):
+                logfile_formater = logging.Formatter(self.get("logfile", "format"))
+                logfile_handler.setFormatter(logfile_formater)
+            logging.getLogger().addHandler(logfile_handler)
+
         if execute_callback:
             self._loaded()

         if self.get_option("mylib_config_reconfigure", default=False):
-            if self.ask_values(set_it=True) and self.save():
-                sys.exit(0)
+            self.ask_values(set_it=True)
             sys.exit(1)

         return options
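The console log_level handling above turns a configured level name into a numeric threshold via `logging._nameToLevel` (a private mapping, used because the public `logging.getLevelNamesMapping()` only exists in newer Python versions), then routes records below WARNING to stdout and the rest to stderr. A minimal sketch of that split with a hypothetical fixed "INFO" setting standing in for the `Config` lookup:

```python
import logging
import sys

# Resolve the configured level name to a numeric level; .get() returns None
# for unknown names, which the real code treats as a fatal configuration error.
console_log_level = logging._nameToLevel.get("INFO")  # pylint: disable=protected-access

root = logging.getLogger("sketch")
root.setLevel(logging.DEBUG)

if console_log_level < logging.WARNING:
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(console_log_level)
    # The real setup also adds a filter (StdoutInfoFilter) capping this
    # handler below WARNING so records are not duplicated on both streams.
    root.addHandler(stdout_handler)

stderr_handler = logging.StreamHandler(sys.stderr)
stderr_handler.setLevel(max(console_log_level, logging.WARNING))
root.addHandler(stderr_handler)

root.info("goes to stdout")
root.warning("goes to stderr")
```

The stdout handler is only created at all when the threshold is below WARNING, which is why every reference to it above is guarded by the same condition.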
@ -972,27 +1113,32 @@ class Config: # pylint: disable=too-many-instance-attributes
:param execute_callback: Sections's loaded callbacks will be finally executed
(only if set_it is True, default: False)
:return: If set_it is True, return True if valid value for each configuration
option have been retrieved and set. If False, return a dict of configuration
section and their options value.
:rtype: bool of dict
:return: a dict of configuration section and their options value.
:rtype: dict
"""
result = {}
error = False
for name, section in self.sections.items():
section_result = section.ask_values(set_it=set_it)
if not set_it:
result[name] = section_result
elif not section_result:
error = True
if set_it:
if error:
return False
if execute_callback:
self._loaded()
return True
result[name] = section.ask_values(set_it=set_it)
if set_it and execute_callback:
self._loaded()
return result
def ask_value(self, section, option, set_it=True):
"""
Ask user to enter value for the specified configuration option
:param section: The configuration section name
:param option: The configuration option name
:param set_it: If True (default), option value will be updated with user input
:return: The configuration option value.
:rtype: mixed
"""
assert self.defined(section, option), f"Unknown option {section}.{option}"
return self.sections[section].ask_value(option, set_it=set_it)
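The ask_value()/ask_values() helpers are only shown here through their docstrings; a minimal standalone sketch of the prompting pattern (hypothetical names, not the library's implementation) could look like:

```python
def ask_value(options, option, set_it=True, input_func=input):
    """Prompt for an option value, keeping the current one on empty input.

    options is a plain dict standing in for a configuration section;
    input_func is injectable so the helper can be tested without a TTY.
    """
    current = options.get(option)
    raw = input_func(f"{option} [{current}]: ").strip()
    value = raw if raw else current
    if set_it:
        options[option] = value
    return value
```

For example, `ask_value(cfg, "smtp_host", input_func=lambda _: "mail.example.org")` updates `cfg` in place and returns the new value, while an empty answer keeps the current one.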
def configure(self, argv=None, description=False):
"""
Entry point of a script you could use to create your configuration file
@ -1027,41 +1173,49 @@ class Config: # pylint: disable=too-many-instance-attributes
dest="validate",
help=(
"Validate configuration: initialize application to test if provided parameters"
" works.\n\nNote: Validation will occured after configuration file creation or"
" works.\n\nNote: Validation will occurred after configuration file creation or"
" update. On error, re-run with -O/--overwrite parameter to fix it."
),
)
options = self.parse_arguments_options(argv, create=False, execute_callback=False)
def validate():
"""Validate configuration file"""
print("Validate your configuration...")
try:
if self.load_file(options.config):
print("Your configuration seem valid.")
else:
print("Error(s) occurred validating your configuration. See logs for details.")
sys.exit(1)
except Exception: # pylint: disable=broad-except
print(
"Exception occurred validating your configuration:\n"
f"{traceback.format_exc()}"
"\n\nSee logs for details."
)
sys.exit(2)
if os.path.exists(options.config) and not options.overwrite:
print(f"Configuration file {options.config} already exists")
print(
f"Configuration file {options.config} already exists. "
"Use -O/--overwrite parameter to overwrite it."
)
if options.validate:
validate()
sys.exit(0)
sys.exit(1)
if options.interactive:
self.ask_values(set_it=True)
if self.save(options.config):
if self.save(options.config, reload=False):
print(f"Configuration file {options.config} created.")
if options.validate:
print("Validate your configuration...")
try:
if self._loaded():
print("Your configuration seem valid.")
else:
print(
"Error(s) occurred validating your configuration. See logs for details."
)
sys.exit(1)
except Exception: # pylint: disable=broad-except
print(
"Exception occurred validating your configuration:\n"
f"{traceback.format_exc()}"
"\n\nSee logs for details."
)
sys.exit(2)
validate()
else:
print(f"Error occured creating configuration file {options.config}")
print(f"Error occurred creating configuration file {options.config}")
sys.exit(1)
sys.exit(0)
@ -1082,9 +1236,11 @@ class Config: # pylint: disable=too-many-instance-attributes
return self._filepath
if self.config_file_env_variable and os.environ.get(self.config_file_env_variable):
return os.environ.get(self.config_file_env_variable)
return os.path.join(
self.config_dir, f"{self.shortname}.ini" if self.shortname else "config.ini"
)
if self.default_config_filename:
filename = self.default_config_filename
else:
filename = f"{self.shortname}.ini" if self.shortname else "config.ini"
return os.path.join(self.config_dir, filename)
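The filename resolution above follows a simple precedence: explicit default_config_filename first, then <shortname>.ini, then config.ini. A self-contained sketch mirroring that precedence:

```python
import os


def default_config_filepath(config_dir, shortname=None, default_config_filename=None):
    """Resolve the default configuration file path: explicit filename
    first, then <shortname>.ini, then config.ini."""
    if default_config_filename:
        filename = default_config_filename
    else:
        filename = f"{shortname}.ini" if shortname else "config.ini"
    return os.path.join(config_dir, filename)
```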
class ConfigurableObject:
@ -1132,7 +1288,7 @@ class ConfigurableObject:
elif self._config_name:
self._options_prefix = self._config_name + "_"
else:
raise Exception(f"No configuration name defined for {__name__}")
raise ConfigException(f"No configuration name defined for {__name__}")
if config:
self._config = config
@ -1141,10 +1297,10 @@ class ConfigurableObject:
elif self._config_name:
self._config_section = self._config_name
else:
raise Exception(f"No configuration name defined for {__name__}")
raise ConfigException(f"No configuration name defined for {__name__}")
def _get_option(self, option, default=None, required=False):
"""Retreive option value"""
"""Retrieve option value"""
if self._kwargs and option in self._kwargs:
return self._kwargs[option]
@ -1158,9 +1314,13 @@ class ConfigurableObject:
return default if default is not None else self._defaults.get(option)
def _set_option(self, option, value):
"""Set option value"""
self._kwargs[option] = value
def set_default(self, option, default_value):
"""Set option default value"""
assert option in self._defaults, f"Unkown option {option}"
assert option in self._defaults, f"Unknown option {option}"
self._defaults[option] = default_value
def set_defaults(self, **default_values):
@ -1252,7 +1412,7 @@ class ConfigurableObject:
return True
# If Config provided, use its get_option() method to obtain a global just_try parameter
# value with a defaut to False, otherwise always false
# value with a default to False, otherwise always false
return self._config.get_option("just_try", default=False) if self._config else False
@ -1274,7 +1434,7 @@ class ConfigSectionAsDictWrapper:
self.__section.set(key, value)
def __delitem__(self, key):
raise Exception("Deleting a configuration option is not supported")
raise ConfigException("Deleting a configuration option is not supported")
# pylint: disable=too-few-public-methods

View file

@ -38,7 +38,7 @@ class DBFailToConnect(DBException, RuntimeError):
"""
def __init__(self, uri):
super().__init__("An error occured during database connection ({uri})", uri=uri)
super().__init__("An error occurred during database connection ({uri})", uri=uri)
class DBDuplicatedSQLParameter(DBException, KeyError):
@ -77,6 +77,19 @@ class DBInvalidOrderByClause(DBException, TypeError):
)
class DBInvalidLimitClause(DBException, TypeError):
"""
Raised when trying to select on table with invalid
LIMIT clause provided
"""
def __init__(self, limit):
super().__init__(
"Invalid LIMIT clause: {limit}. Must be a non-zero positive integer.",
limit=limit,
)
class DB:
"""Database client"""
@ -340,7 +353,14 @@ class DB:
return True
def select(
self, table, where_clauses=None, fields=None, where_op="AND", order_by=None, just_try=False
self,
table,
where_clauses=None,
fields=None,
where_op="AND",
order_by=None,
limit=None,
just_try=False,
):
"""Run SELECT SQL query"""
sql = "SELECT "
@ -374,6 +394,16 @@ class DB:
else:
raise DBInvalidOrderByClause(order_by)
if limit:
if not isinstance(limit, int):
try:
limit = int(limit)
except ValueError as err:
raise DBInvalidLimitClause(limit) from err
if limit <= 0:
raise DBInvalidLimitClause(limit)
sql += f" LIMIT {limit}"
if just_try:
log.debug("Just-try mode: execute SELECT query : %s", sql)
return just_try
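The new limit parameter validation can be factored into a standalone helper; this sketch raises a plain ValueError instead of DBInvalidLimitClause so it runs without the module:

```python
def validate_limit(limit):
    """Coerce limit to int and require a non-zero positive value,
    mirroring the checks added to DB.select() above."""
    if not isinstance(limit, int):
        try:
            limit = int(limit)
        except ValueError as err:
            raise ValueError(f"Invalid LIMIT clause: {limit}") from err
    if limit <= 0:
        raise ValueError(f"Invalid LIMIT clause: {limit}")
    return limit
```

Note that string values like `"5"` are accepted and coerced, while `"abc"` or `0` are rejected, matching the behavior of the diff above.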

View file

@ -239,7 +239,7 @@ class EmailClient(
msg["Date"] = email.utils.formatdate(None, True)
encoding = encoding if encoding else self._get_option("encoding")
if template:
assert template in self.templates, f"Unknwon template {template}"
assert template in self.templates, f"Unknown template {template}"
# Handle subject from template
if not subject:
assert self.templates[template].get(
@ -251,7 +251,7 @@ class EmailClient(
else self.templates[template]["subject"].format(**template_vars)
)
# Put HTML part in last one to prefered it
# Put the HTML part last so that it is preferred
parts = []
if self.templates[template].get("text"):
if isinstance(self.templates[template]["text"], MakoTemplate):
@ -322,7 +322,7 @@ class EmailClient(
catch_addr = self._get_option("catch_all_addr")
if catch_addr:
log.debug(
"Catch email originaly send to %s (CC:%s, BCC:%s) to %s",
"Catch email originally send to %s (CC:%s, BCC:%s) to %s",
", ".join(recipients),
", ".join(cc) if isinstance(cc, list) else cc,
", ".join(bcc) if isinstance(bcc, list) else bcc,
@ -566,15 +566,15 @@ if __name__ == "__main__":
catch_all_addr=options.email_catch_all,
just_try=options.just_try,
encoding=options.email_encoding,
templates=dict(
test=dict(
subject="Test email",
text=(
templates={
"test": {
"subject": "Test email",
"text": (
"Just a test email sent at {sent_date}."
if not options.test_mako
else MakoTemplate("Just a test email sent at ${sent_date | h}.") # nosec
),
html=(
"html": (
"<strong>Just a test email.</strong> <small>(sent at {sent_date | h})</small>"
if not options.test_mako
else MakoTemplate( # nosec
@ -582,8 +582,8 @@ if __name__ == "__main__":
"<small>(sent at ${sent_date | h})</small>"
)
),
)
),
}
},
)
logging.info("Send a test email to %s", options.test_to)

View file

@ -120,7 +120,7 @@ class LdapServer:
return ldap.SCOPE_ONELEVEL # pylint: disable=no-member
if scope == "sub":
return ldap.SCOPE_SUBTREE # pylint: disable=no-member
raise Exception(f'Unknown LDAP scope "{scope}"')
raise LdapServerException(f'Unknown LDAP scope "{scope}"')
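As a dependency-free illustration of the scope mapping, using the integer values python-ldap assigns to these constants (0/1/2, an assumption of this sketch, which also raises ValueError instead of LdapServerException):

```python
# Stand-ins for ldap.SCOPE_BASE / ldap.SCOPE_ONELEVEL / ldap.SCOPE_SUBTREE
SCOPES = {"base": 0, "one": 1, "sub": 2}


def get_scope(scope):
    """Map a scope name to its LDAP scope constant, raising on unknown names."""
    try:
        return SCOPES[scope]
    except KeyError:
        raise ValueError(f'Unknown LDAP scope "{scope}"') from None
```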
def search(self, basedn, filterstr=None, attrs=None, sizelimit=None, scope=None):
"""Run a search on LDAP server"""
@ -211,7 +211,7 @@ class LdapServer:
result_page_control = rctrl
break
# If PagedResultsControl answer not detected, paged serach
# If PagedResultsControl answer not detected, paged search
if not result_page_control:
self._error(
"LdapServer - Server ignores RFC2696 control, paged search can not works",
@ -238,7 +238,7 @@ class LdapServer:
page_control.cookie = result_page_control.cookie
self.logger.debug(
"LdapServer - Paged search end: %d object(s) retreived in %d page(s) of %d object(s)",
"LdapServer - Paged search end: %d object(s) retrieved in %d page(s) of %d object(s)",
len(ret),
pages_count,
pagesize,
@ -379,12 +379,12 @@ class LdapServer:
@staticmethod
def get_dn(obj):
"""Retreive an on object DN from its entry in LDAP search result"""
"""Retrieve an on object DN from its entry in LDAP search result"""
return obj[0][0]
@staticmethod
def get_attr(obj, attr, all_values=None, default=None, decode=False):
"""Retreive an on object attribute value(s) from the object entry in LDAP search result"""
"""Retrieve an on object attribute value(s) from the object entry in LDAP search result"""
if attr not in obj:
for k in obj:
if k.lower() == attr.lower():
@ -399,18 +399,16 @@ class LdapServer:
return default
class LdapServerException(BaseException):
class LdapException(BaseException):
"""Generic LDAP exception"""
class LdapServerException(LdapException):
"""Generic exception raised by LdapServer"""
def __init__(self, msg):
BaseException.__init__(self, msg)
class LdapClientException(LdapServerException):
"""Generic exception raised by LdapServer"""
def __init__(self, msg):
LdapServerException.__init__(self, msg)
class LdapClientException(LdapException):
"""Generic exception raised by LdapClient"""
class LdapClient:
@ -439,7 +437,7 @@ class LdapClient:
self.initialize()
def _get_option(self, option, default=None, required=False):
"""Retreive option value"""
"""Retrieve option value"""
if self._options and hasattr(self._options, self._options_prefix + option):
return getattr(self._options, self._options_prefix + option)
@ -502,7 +500,7 @@ class LdapClient:
self.config = loaded_config
uri = self._get_option("uri", required=True)
binddn = self._get_option("binddn")
log.info("Connect to LDAP server %s as %s", uri, binddn if binddn else "annonymous")
log.info("Connect to LDAP server %s as %s", uri, binddn if binddn else "anonymous")
self._conn = LdapServer(
uri,
dn=binddn,
@ -541,7 +539,7 @@ class LdapClient:
:param dn: The object DN
:param attrs: The object attributes as return by python-ldap search
"""
obj = dict(dn=dn)
obj = {"dn": dn}
for attr in attrs:
obj[attr] = [self.decode(v) for v in self._conn.get_attr(attrs, attr, all_values=True)]
return obj
@ -555,7 +553,7 @@ class LdapClient:
:param attr: The attribute name
:param all_values: If True, all values of the attribute will be
returned instead of the first value only
(optinal, default: False)
(optional, default: False)
"""
if attr not in obj:
for k in obj:
@ -584,7 +582,7 @@ class LdapClient:
:param name: The object type name
:param filterstr: The LDAP filter to use to search objects on LDAP directory
:param basedn: The base DN of the search
:param attrs: The list of attribute names to retreive
:param attrs: The list of attribute names to retrieve
:param key_attr: The attribute name or 'dn' to use as key in result
(optional, if leave to None, the result will be a list)
:param warn: If True, a warning message will be logged if no object is found
@ -596,7 +594,7 @@ class LdapClient:
(optional, default: see LdapServer.paged_search)
"""
if name in self._cached_objects:
log.debug("Retreived %s objects from cache", name)
log.debug("Retrieved %s objects from cache", name)
else:
assert self._conn or self.initialize()
log.debug(
@ -645,7 +643,7 @@ class LdapClient:
:param object_name: The object name (only use in log messages)
:param filterstr: The LDAP filter to use to search the object on LDAP directory
:param basedn: The base DN of the search
:param attrs: The list of attribute names to retreive
:param attrs: The list of attribute names to retrieve
:param warn: If True, a warning message will be logged if no object is found
in LDAP directory (otherwise, it will be just a debug message)
(optional, default: True)
@ -857,7 +855,7 @@ class LdapClient:
Update an object
:param ldap_obj: The original LDAP object
:param changes: The changes to make on LDAP object (as formated by get_changes() method)
:param changes: The changes to make on LDAP object (as formatted by get_changes() method)
:param protected_attrs: An optional list of protected attributes
:param rdn_attr: The LDAP object RDN attribute (to detect renaming, default: auto-detected)
:param relax: Enable relax modification server control (optional, default: false)
@ -917,7 +915,7 @@ class LdapClient:
# Otherwise, update object DN
ldap_obj["dn"] = new_dn
else:
log.debug("%s: No change detected on RDN attibute %s", ldap_obj["dn"], rdn_attr)
log.debug("%s: No change detected on RDN attribute %s", ldap_obj["dn"], rdn_attr)
try:
if self._just_try:
@ -1018,7 +1016,7 @@ def parse_datetime(value, to_timezone=None, default_timezone=None, naive=None):
elif isinstance(default_timezone, datetime.tzinfo):
date = date.replace(tzinfo=default_timezone)
else:
raise Exception("It's not supposed to happen!")
raise LdapException("It's not supposed to happen!")
elif naive:
return date.replace(tzinfo=None)
if to_timezone:
@ -1078,7 +1076,7 @@ def format_datetime(value, from_timezone=None, to_timezone=None, naive=None):
elif isinstance(from_timezone, datetime.tzinfo):
from_value = value.replace(tzinfo=from_timezone)
else:
raise Exception("It's not supposed to happen!")
raise LdapException("It's not supposed to happen!")
elif naive:
from_value = value.replace(tzinfo=pytz.utc)
else:
@ -1105,7 +1103,7 @@ def format_date(value, from_timezone=None, to_timezone=None, naive=True):
(optional, default : server local timezone)
:param to_timezone: The timezone used in LDAP (optional, default : UTC)
:param naive: Use naive datetime : do not handle timezone conversion before
formating and return datetime as UTC (because LDAP required a
formatting and return datetime as UTC (because LDAP required a
timezone)
"""
assert isinstance(

View file

@ -29,7 +29,7 @@ Mapping configuration
'join': '[glue]', # If present, sources values will be join using the "glue"
# Alternative mapping
'or': { [map configuration] } # If this mapping case does not retreive any value, try to
'or': { [map configuration] } # If this mapping case does not retrieve any value, try to
# get value(s) with this other mapping configuration
},
'[dst key 2]': {

View file

@ -41,7 +41,7 @@ class MyDB(DB):
)
except Error as err:
log.fatal(
"An error occured during MySQL database connection (%s@%s:%s).",
"An error occurred during MySQL database connection (%s@%s:%s).",
self._user,
self._host,
self._db,

View file

@ -11,6 +11,7 @@ week_days = ["lundi", "mardi", "mercredi", "jeudi", "vendredi", "samedi", "diman
date_format = "%d/%m/%Y"
date_pattern = re.compile("^([0-9]{2})/([0-9]{2})/([0-9]{4})$")
time_pattern = re.compile("^([0-9]{1,2})h([0-9]{2})?$")
_nonworking_french_public_days_of_the_year_cache = {}
def easter_date(year):
@ -37,23 +38,25 @@ def nonworking_french_public_days_of_the_year(year=None):
"""Compute dict of nonworking french public days for the specified year"""
if year is None:
year = datetime.date.today().year
dp = easter_date(year)
return {
"1janvier": datetime.date(year, 1, 1),
"paques": dp,
"lundi_paques": (dp + datetime.timedelta(1)),
"1mai": datetime.date(year, 5, 1),
"8mai": datetime.date(year, 5, 8),
"jeudi_ascension": (dp + datetime.timedelta(39)),
"pentecote": (dp + datetime.timedelta(49)),
"lundi_pentecote": (dp + datetime.timedelta(50)),
"14juillet": datetime.date(year, 7, 14),
"15aout": datetime.date(year, 8, 15),
"1novembre": datetime.date(year, 11, 1),
"11novembre": datetime.date(year, 11, 11),
"noel": datetime.date(year, 12, 25),
"saint_etienne": datetime.date(year, 12, 26),
}
if year not in _nonworking_french_public_days_of_the_year_cache:
dp = easter_date(year)
_nonworking_french_public_days_of_the_year_cache[year] = {
"1janvier": datetime.date(year, 1, 1),
"paques": dp,
"lundi_paques": (dp + datetime.timedelta(1)),
"1mai": datetime.date(year, 5, 1),
"8mai": datetime.date(year, 5, 8),
"jeudi_ascension": (dp + datetime.timedelta(39)),
"pentecote": (dp + datetime.timedelta(49)),
"lundi_pentecote": (dp + datetime.timedelta(50)),
"14juillet": datetime.date(year, 7, 14),
"15aout": datetime.date(year, 8, 15),
"1novembre": datetime.date(year, 11, 1),
"11novembre": datetime.date(year, 11, 11),
"noel": datetime.date(year, 12, 25),
"saint_etienne": datetime.date(year, 12, 26),
}
return _nonworking_french_public_days_of_the_year_cache[year]
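The module-level dict above memoizes the per-year result; the same effect can be obtained with functools.lru_cache on the expensive part. A self-contained sketch using the standard Anonymous Gregorian Easter algorithm (Meeus/Jones/Butcher), which the easter_date() helper above presumably implements:

```python
import datetime
import functools


@functools.lru_cache(maxsize=None)
def easter_date(year):
    """Easter Sunday (Gregorian calendar) via the Anonymous algorithm."""
    a = year % 19
    b, c = divmod(year, 100)
    d, e = divmod(b, 4)
    f = (b + 8) // 25
    g = (b - f + 1) // 3
    h = (19 * a + b - d - g + 15) % 30
    i, k = divmod(c, 4)
    l = (32 + 2 * e + 2 * i - h - k) % 7  # noqa: E741
    m = (a + 11 * h + 22 * l) // 451
    month, rem = divmod(h + l - 7 * m + 114, 31)
    return datetime.date(year, month, rem + 1)
```

Repeated calls for the same year hit the cache, so a holidays dict derived from it is only computed once per year, as with the explicit cache dict above.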
def parse_exceptional_closures(values):
@ -155,7 +158,153 @@ def parse_normal_opening_hours(values):
if not days and not hours_periods:
raise ValueError(f'No days or hours period found in this value: "{value}"')
normal_opening_hours.append({"days": days, "hours_periods": hours_periods})
return normal_opening_hours
for idx, noh in enumerate(normal_opening_hours):
normal_opening_hours[idx]["hours_periods"] = sorted_hours_periods(noh["hours_periods"])
return sorted_opening_hours(normal_opening_hours)
def sorted_hours_periods(hours_periods):
"""Sort hours periods"""
return sorted(hours_periods, key=lambda hp: (hp["start"], hp["stop"]))
def sorted_opening_hours(opening_hours):
"""Sort opening hours"""
return sorted(
opening_hours,
key=lambda x: (
week_days.index(x["days"][0]) if x["days"] else None,
x["hours_periods"][0]["start"] if x["hours_periods"] else datetime.datetime.min.time(),
x["hours_periods"][0]["stop"] if x["hours_periods"] else datetime.datetime.max.time(),
),
)
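The sort helpers order hour periods by start time, then stop time. A quick standalone check of that ordering (helper re-declared so the snippet is self-contained):

```python
import datetime


def sorted_hours_periods(hours_periods):
    """Order hour periods by start time, then stop time."""
    return sorted(hours_periods, key=lambda hp: (hp["start"], hp["stop"]))


periods = [
    {"start": datetime.time(14, 0), "stop": datetime.time(18, 0)},
    {"start": datetime.time(9, 0), "stop": datetime.time(12, 0)},
    {"start": datetime.time(9, 0), "stop": datetime.time(10, 0)},
]
ordered = sorted_hours_periods(periods)
# 9h-10h sorts before 9h-12h because the stop time breaks the tie
```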
def its_nonworking_day(nonworking_public_holidays_values, date=None):
"""Check if is a non-working day"""
if not nonworking_public_holidays_values:
return False
date = date if date else datetime.date.today()
log.debug("its_nonworking_day(%s): values=%s", date, nonworking_public_holidays_values)
nonworking_days = nonworking_french_public_days_of_the_year(year=date.year)
for day in nonworking_public_holidays_values:
if day in nonworking_days and nonworking_days[day] == date:
log.debug("its_nonworking_day(%s): %s", date, day)
return True
return False
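The check above matches the enabled holiday names against the per-year dict; a simplified standalone version restricted to a subset of the fixed-date holidays (movable feasts omitted for brevity):

```python
import datetime

# Subset of the fixed-date French public holidays used above, as (month, day)
FIXED_HOLIDAYS = {
    "1janvier": (1, 1),
    "1mai": (5, 1),
    "14juillet": (7, 14),
    "noel": (12, 25),
}


def its_nonworking_day(enabled, date):
    """True if date falls on one of the enabled fixed-date holidays."""
    return any(
        (date.month, date.day) == FIXED_HOLIDAYS[name]
        for name in enabled
        if name in FIXED_HOLIDAYS
    )
```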
def its_exceptionally_closed(exceptional_closures_values, when=None, parse=True, all_day=False):
"""Check if it's exceptionally closed"""
if not exceptional_closures_values:
return False
when = when if when else datetime.datetime.now()
assert isinstance(when, (datetime.date, datetime.datetime))
when_date = when.date() if isinstance(when, datetime.datetime) else when
exceptional_closures = (
parse_exceptional_closures(exceptional_closures_values)
if parse
else exceptional_closures_values
)
log.debug("its_exceptionally_closed(%s): exceptional closures=%s", when, exceptional_closures)
for cl in exceptional_closures:
if when_date not in cl["days"]:
log.debug(
"its_exceptionally_closed(%s): %s not in days (%s)", when, when_date, cl["days"]
)
continue
if not cl["hours_periods"]:
# All day exceptional closure
return True
if all_day:
# Wanted an all day closure, ignore it
continue
for hp in cl["hours_periods"]:
if hp["start"] <= when.time() <= hp["stop"]:
return True
return False
def get_exceptional_closures_hours(exceptional_closures_values, date=None, parse=True):
"""Get exceptional closures hours of the day"""
if not exceptional_closures_values:
return []
date = date if date else datetime.date.today()
exceptional_closures = (
parse_exceptional_closures(exceptional_closures_values)
if parse
else exceptional_closures_values
)
log.debug(
"get_exceptional_closures_hours(%s): exceptional closures=%s", date, exceptional_closures
)
exceptional_closures_hours = []
for cl in exceptional_closures:
if date not in cl["days"]:
log.debug("get_exceptional_closures_hours(%s): not in days (%s)", date, cl["days"])
continue
if not cl["hours_periods"]:
log.debug(
"get_exceptional_closures_hours(%s): it's exceptionally closed all the day", date
)
return [
{
"start": datetime.datetime.min.time(),
"stop": datetime.datetime.max.time(),
}
]
exceptional_closures_hours.extend(cl["hours_periods"])
log.debug(
"get_exceptional_closures_hours(%s): exceptional closures hours=%s",
date,
exceptional_closures_hours,
)
return sorted_hours_periods(exceptional_closures_hours)
def its_normally_open(normal_opening_hours_values, when=None, parse=True, ignore_time=False):
"""Check if it's normally open"""
when = when if when else datetime.datetime.now()
if not normal_opening_hours_values:
log.debug(
"its_normally_open(%s): no normal opening hours defined, consider as opened", when
)
return True
when_weekday = week_days[when.timetuple().tm_wday]
log.debug("its_normally_open(%s): week day=%s", when, when_weekday)
normal_opening_hours = (
parse_normal_opening_hours(normal_opening_hours_values)
if parse
else normal_opening_hours_values
)
log.debug("its_normally_open(%s): normal opening hours=%s", when, normal_opening_hours)
for oh in normal_opening_hours:
if oh["days"] and when_weekday not in oh["days"]:
log.debug("its_normally_open(%s): %s not in days (%s)", when, when_weekday, oh["days"])
continue
if not oh["hours_periods"] or ignore_time:
return True
for hp in oh["hours_periods"]:
if hp["start"] <= when.time() <= hp["stop"]:
return True
log.debug("its_normally_open(%s): not in normal opening hours", when)
return False
def its_opening_day(
normal_opening_hours_values=None,
exceptional_closures_values=None,
nonworking_public_holidays_values=None,
date=None,
parse=True,
):
"""Check if it's an opening day"""
date = date if date else datetime.date.today()
if its_nonworking_day(nonworking_public_holidays_values, date=date):
return False
if its_exceptionally_closed(exceptional_closures_values, when=date, all_day=True, parse=parse):
return False
return its_normally_open(normal_opening_hours_values, when=date, parse=parse, ignore_time=True)
def is_closed(
@ -193,76 +342,578 @@ def is_closed(
when_time,
when_weekday,
)
if nonworking_public_holidays_values:
log.debug("Nonworking public holidays: %s", nonworking_public_holidays_values)
nonworking_days = nonworking_french_public_days_of_the_year()
for day in nonworking_public_holidays_values:
if day in nonworking_days and when_date == nonworking_days[day]:
log.debug("Non working day: %s", day)
return {
"closed": True,
"exceptional_closure": exceptional_closure_on_nonworking_public_days,
"exceptional_closure_all_day": exceptional_closure_on_nonworking_public_days,
}
# Handle non-working days
if its_nonworking_day(nonworking_public_holidays_values, date=when_date):
return {
"closed": True,
"exceptional_closure": exceptional_closure_on_nonworking_public_days,
"exceptional_closure_all_day": exceptional_closure_on_nonworking_public_days,
}
if exceptional_closures_values:
# Handle exceptional closures
try:
if its_exceptionally_closed(exceptional_closures_values, when=when):
return {
"closed": True,
"exceptional_closure": True,
"exceptional_closure_all_day": its_exceptionally_closed(
exceptional_closures_values, when=when, all_day=True
),
}
except ValueError as e:
if on_error_result is None:
log.error("Fail to parse exceptional closures", exc_info=True)
raise e from e
log.error("Fail to parse exceptional closures, consider as %s", on_error, exc_info=True)
return on_error_result
# Finally, handle normal opening hours
try:
return {
"closed": not its_normally_open(normal_opening_hours_values, when=when),
"exceptional_closure": False,
"exceptional_closure_all_day": False,
}
except ValueError as e: # pylint: disable=broad-except
if on_error_result is None:
log.error("Fail to parse normal opening hours", exc_info=True)
raise e from e
log.error("Fail to parse normal opening hours, consider as %s", on_error, exc_info=True)
return on_error_result
def next_opening_date(
normal_opening_hours_values=None,
exceptional_closures_values=None,
nonworking_public_holidays_values=None,
date=None,
max_anaylse_days=None,
parse=True,
):
"""Search for the next opening day"""
date = date if date else datetime.date.today()
max_anaylse_days = max_anaylse_days if max_anaylse_days is not None else 30
if parse:
try:
exceptional_closures = parse_exceptional_closures(exceptional_closures_values)
log.debug("Exceptional closures: %s", exceptional_closures)
except ValueError as e:
log.error("Fail to parse exceptional closures, consider as closed", exc_info=True)
if on_error_result is None:
raise e from e
return on_error_result
for cl in exceptional_closures:
if when_date not in cl["days"]:
log.debug("when_date (%s) no in days (%s)", when_date, cl["days"])
continue
if not cl["hours_periods"]:
# All day exceptional closure
return {
"closed": True,
"exceptional_closure": True,
"exceptional_closure_all_day": True,
}
for hp in cl["hours_periods"]:
if hp["start"] <= when_time <= hp["stop"]:
return {
"closed": True,
"exceptional_closure": True,
"exceptional_closure_all_day": False,
}
normal_opening_hours_values = (
parse_normal_opening_hours(normal_opening_hours_values)
if normal_opening_hours_values
else None
)
exceptional_closures_values = (
parse_exceptional_closures(exceptional_closures_values)
if exceptional_closures_values
else None
)
except ValueError: # pylint: disable=broad-except
log.error(
"next_opening_date(%s): fail to parse normal opening hours or exceptional closures",
date,
exc_info=True,
)
return False
added_days = 0
while added_days <= max_anaylse_days:
test_date = date + datetime.timedelta(days=added_days)
if its_opening_day(
normal_opening_hours_values=normal_opening_hours_values,
exceptional_closures_values=exceptional_closures_values,
nonworking_public_holidays_values=nonworking_public_holidays_values,
date=test_date,
parse=False,
):
return test_date
added_days += 1
log.debug(
"next_opening_date(%s): no opening day found in the next %d days", date, max_anaylse_days
)
return False
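next_opening_date() scans forward one day at a time until its_opening_day() accepts a date or max_anaylse_days is exhausted. The same strategy in isolation, with an arbitrary predicate (returning None rather than False on failure):

```python
import datetime


def scan_for_date(predicate, start, max_days=30):
    """Day-by-day forward scan, the same strategy as next_opening_date():
    return the first date accepted by predicate, or None after max_days."""
    for offset in range(max_days + 1):
        candidate = start + datetime.timedelta(days=offset)
        if predicate(candidate):
            return candidate
    return None


# Example: first Monday on or after 2024-04-16 (a Tuesday)
next_monday = scan_for_date(lambda d: d.weekday() == 0, datetime.date(2024, 4, 16))
```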
if normal_opening_hours_values:
def next_opening_hour(
normal_opening_hours_values=None,
exceptional_closures_values=None,
nonworking_public_holidays_values=None,
when=None,
max_anaylse_days=None,
parse=True,
):
"""Search for the next opening hour"""
when = when if when else datetime.datetime.now()
max_anaylse_days = max_anaylse_days if max_anaylse_days is not None else 30
if parse:
try:
normal_opening_hours = parse_normal_opening_hours(normal_opening_hours_values)
log.debug("Normal opening hours: %s", normal_opening_hours)
except ValueError as e: # pylint: disable=broad-except
log.error("Fail to parse normal opening hours, consider as closed", exc_info=True)
if on_error_result is None:
raise e from e
return on_error_result
for oh in normal_opening_hours:
if oh["days"] and when_weekday not in oh["days"]:
log.debug("when_weekday (%s) no in days (%s)", when_weekday, oh["days"])
continue
if not oh["hours_periods"]:
# All day opened
return {
"closed": False,
"exceptional_closure": False,
"exceptional_closure_all_day": False,
}
for hp in oh["hours_periods"]:
if hp["start"] <= when_time <= hp["stop"]:
return {
"closed": False,
"exceptional_closure": False,
"exceptional_closure_all_day": False,
}
log.debug("Not in normal opening hours => closed")
return {"closed": True, "exceptional_closure": False, "exceptional_closure_all_day": False}
normal_opening_hours_values = (
parse_normal_opening_hours(normal_opening_hours_values)
if normal_opening_hours_values
else None
)
exceptional_closures_values = (
parse_exceptional_closures(exceptional_closures_values)
if exceptional_closures_values
else None
)
except ValueError: # pylint: disable=broad-except
log.error(
"next_opening_hour(%s): fail to parse normal opening hours or exceptional closures",
when,
exc_info=True,
)
return False
date = next_opening_date(
normal_opening_hours_values=normal_opening_hours_values,
exceptional_closures_values=exceptional_closures_values,
nonworking_public_holidays_values=nonworking_public_holidays_values,
date=when.date(),
max_anaylse_days=max_anaylse_days,
parse=False,
)
if not date:
log.debug(
"next_opening_hour(%s): no opening day found in the next %d days",
when,
max_anaylse_days,
)
return False
log.debug("next_opening_hour(%s): next opening date=%s", when, date)
weekday = week_days[date.timetuple().tm_wday]
log.debug("next_opening_hour(%s): next opening week day=%s", when, weekday)
exceptional_closures_hours = get_exceptional_closures_hours(
exceptional_closures_values, date=date, parse=False
)
log.debug(
"next_opening_hour(%s): next opening day exceptional closures hours=%s",
when,
exceptional_closures_hours,
)
next_opening_datetime = None
exceptionally_closed = False
exceptionally_closed_all_day = False
in_opening_hours = date != when.date()
for oh in normal_opening_hours_values:
if exceptionally_closed_all_day:
break
# Not a nonworking day, not during exceptional closure and no normal opening
# hours defined => Opened
return {"closed": False, "exceptional_closure": False, "exceptional_closure_all_day": False}
if oh["days"] and weekday not in oh["days"]:
log.debug("next_opening_hour(%s): %s not in days (%s)", when, weekday, oh["days"])
continue
log.debug(
"next_opening_hour(%s): %s in days (%s), handle opening hours %s",
when,
weekday,
oh["days"],
oh["hours_periods"],
)
if not oh["hours_periods"]:
log.debug(
"next_opening_hour(%s): %s is an all day opening day, handle exceptional closures "
"hours %s to find the minimal opening time",
when,
weekday,
exceptional_closures_hours,
)
if date == when.date():
in_opening_hours = True
test_time = when.time() if when.date() == date else datetime.datetime.min.time()
for cl in exceptional_closures_hours:
if cl["start"] <= test_time < cl["stop"]:
if cl["stop"] >= datetime.datetime.max.time():
exceptionally_closed = True
exceptionally_closed_all_day = True
next_opening_datetime = None
break
test_time = cl["stop"]
else:
break
if not exceptionally_closed_all_day:
candidate_next_opening_datetime = datetime.datetime.combine(date, test_time)
next_opening_datetime = (
candidate_next_opening_datetime
if not next_opening_datetime
or candidate_next_opening_datetime < next_opening_datetime
else next_opening_datetime
)
continue
log.debug(
"next_opening_hour(%s): only opened during some hours periods (%s) on %s, find the "
"minimal starting time",
when,
oh["hours_periods"],
weekday,
)
test_time = datetime.datetime.max.time()
for hp in oh["hours_periods"]:
if date == when.date() and hp["stop"] < when.time():
log.debug(
"next_opening_hour(%s): ignore opening hours %s before specified when time %s",
when,
hp,
when.time(),
)
continue
if date == when.date() and hp["start"] <= when.time() < hp["stop"]:
in_opening_hours = True
if exceptional_closures_hours:
log.debug(
"next_opening_hour(%s): check if opening hours %s match with exceptional "
"closure hours %s",
when,
hp,
exceptional_closures_hours,
)
for cl in exceptional_closures_hours:
if cl["start"] <= hp["start"] and cl["stop"] >= hp["stop"]:
log.debug(
"next_opening_hour(%s): opening hour %s is included in exceptional "
"closure hours %s",
when,
hp,
cl,
)
exceptionally_closed = True
break
if hp["start"] < cl["start"]:
log.debug(
"next_opening_hour(%s): opening hour %s start before closure hours %s",
when,
hp,
cl,
)
test_time = hp["start"] if hp["start"] < test_time else test_time
elif cl["stop"] >= hp["start"] and cl["stop"] < hp["stop"]:
log.debug(
"next_opening_hour(%s): opening hour %s end after closure hours %s",
when,
hp,
cl,
)
test_time = cl["stop"] if cl["stop"] < test_time else test_time
elif hp["start"] < test_time:
log.debug(
"next_opening_hour(%s): no exceptional closure hours, use opening hours start "
"time %s",
when,
hp["start"],
)
test_time = hp["start"]
if test_time < datetime.datetime.max.time():
if date == when.date() and test_time < when.time():
test_time = when.time()
candidate_next_opening_datetime = datetime.datetime.combine(date, test_time)
next_opening_datetime = (
candidate_next_opening_datetime
if not next_opening_datetime
or candidate_next_opening_datetime < next_opening_datetime
else next_opening_datetime
)
if not next_opening_datetime and (
exceptionally_closed or (date == when.date() and not in_opening_hours)
):
new_max_anaylse_days = max_anaylse_days - (date - when.date()).days
if new_max_anaylse_days > 0:
log.debug(
"next_opening_hour(%s): exceptionally closed on %s, try on following %d days",
when,
date,
new_max_anaylse_days,
)
next_opening_datetime = next_opening_hour(
normal_opening_hours_values=normal_opening_hours_values,
exceptional_closures_values=exceptional_closures_values,
nonworking_public_holidays_values=nonworking_public_holidays_values,
when=datetime.datetime.combine(
date + datetime.timedelta(days=1), datetime.datetime.min.time()
),
max_anaylse_days=new_max_anaylse_days,
parse=False,
)
if not next_opening_datetime:
log.debug(
"next_opening_hour(%s): no opening hours found in next %d days", when, max_anaylse_days
)
return False
log.debug("next_opening_hour(%s): next opening hours=%s", when, next_opening_datetime)
return next_opening_datetime
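When the day it settles on turns out to be exceptionally closed, next_opening_hour() retries from midnight of the following day with a reduced analysis budget, and returns False once that budget is spent. The forward scan can be sketched in isolation as follows (a simplified illustration, not the module's API: next_open_datetime and opening_time_on are hypothetical stand-ins for the per-day analysis done above):

```python
import datetime

def next_open_datetime(when, opening_time_on, max_analyse_days=30):
    """Sketch of the forward day scan behind next_opening_hour(): try the
    'when' day first, then midnight of each following day, giving up (and
    returning False, as the module does) once the budget is spent.
    opening_time_on(date, not_before) is a hypothetical stand-in for the
    per-day opening/closure analysis; it returns a time or None."""
    for offset in range(max_analyse_days + 1):
        date = when.date() + datetime.timedelta(days=offset)
        # On the first day, never propose a time earlier than `when`
        not_before = when.time() if offset == 0 else datetime.datetime.min.time()
        t = opening_time_on(date, not_before)
        if t is not None:
            return datetime.datetime.combine(date, t)
    return False

# Example stand-in: open on weekdays from 9h30 to 18h
def opening_time_on(date, not_before):
    if date.weekday() >= 5:  # closed on weekends
        return None
    start = datetime.time(9, 30)
    if not_before <= start:
        return start
    return not_before if not_before < datetime.time(18) else None
```

Called on a Saturday morning, the sketch skips the weekend and lands on Monday's opening time; called during opening hours, it returns `when` itself.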
def previous_opening_date(
normal_opening_hours_values=None,
exceptional_closures_values=None,
nonworking_public_holidays_values=None,
date=None,
max_anaylse_days=None,
parse=True,
):
"""Search for the previous opening day"""
date = date if date else datetime.date.today()
max_anaylse_days = max_anaylse_days if max_anaylse_days is not None else 30
if parse:
try:
normal_opening_hours_values = (
parse_normal_opening_hours(normal_opening_hours_values)
if normal_opening_hours_values
else None
)
exceptional_closures_values = (
parse_exceptional_closures(exceptional_closures_values)
if exceptional_closures_values
else None
)
except ValueError: # pylint: disable=broad-except
log.error(
"previous_opening_date(%s): fail to parse normal opening hours or exceptional "
"closures",
date,
exc_info=True,
)
return False
days = 0
while days <= max_anaylse_days:
test_date = date - datetime.timedelta(days=days)
if its_opening_day(
normal_opening_hours_values=normal_opening_hours_values,
exceptional_closures_values=exceptional_closures_values,
nonworking_public_holidays_values=nonworking_public_holidays_values,
date=test_date,
parse=False,
):
return test_date
days += 1
log.debug(
"previous_opening_date(%s): no opening day found in the previous %d days",
date,
max_anaylse_days,
)
return False
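previous_opening_date() is a plain backward scan: it tests the given date, then one day earlier at a time, until its_opening_day() matches or the analysis budget is exhausted. That pattern can be sketched self-contained (is_opening_day below is a hypothetical stand-in predicate, not the module's function):

```python
import datetime

def previous_open_date(date, is_opening_day, max_analyse_days=30):
    # Walk backwards from `date`, one day at a time, and return the first
    # day the predicate accepts; return False (as the module does) when
    # the analysis budget is exhausted without a match.
    for days in range(max_analyse_days + 1):
        test_date = date - datetime.timedelta(days=days)
        if is_opening_day(test_date):
            return test_date
    return False

# Example stand-in predicate: closed on Sundays only
def is_opening_day(d):
    return d.weekday() != 6
```

Note that the scan includes the starting date itself, so an already-open day is returned unchanged.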
def previous_opening_hour(
normal_opening_hours_values=None,
exceptional_closures_values=None,
nonworking_public_holidays_values=None,
when=None,
max_anaylse_days=None,
parse=True,
):
"""Search for the previous opening hour"""
when = when if when else datetime.datetime.now()
max_anaylse_days = max_anaylse_days if max_anaylse_days is not None else 30
if parse:
try:
normal_opening_hours_values = (
parse_normal_opening_hours(normal_opening_hours_values)
if normal_opening_hours_values
else None
)
exceptional_closures_values = (
parse_exceptional_closures(exceptional_closures_values)
if exceptional_closures_values
else None
)
except ValueError: # pylint: disable=broad-except
log.error(
"previous_opening_hour(%s): fail to parse normal opening hours or exceptional "
"closures",
when,
exc_info=True,
)
return False
date = previous_opening_date(
normal_opening_hours_values=normal_opening_hours_values,
exceptional_closures_values=exceptional_closures_values,
nonworking_public_holidays_values=nonworking_public_holidays_values,
date=when.date(),
max_anaylse_days=max_anaylse_days,
parse=False,
)
if not date:
log.debug(
"previous_opening_hour(%s): no opening day found in the previous %d days",
when,
max_anaylse_days,
)
return False
log.debug("previous_opening_hour(%s): previous opening date=%s", when, date)
weekday = week_days[date.timetuple().tm_wday]
log.debug("previous_opening_hour(%s): previous opening week day=%s", when, weekday)
exceptional_closures_hours = get_exceptional_closures_hours(
exceptional_closures_values, date=date, parse=False
)
log.debug(
"previous_opening_hour(%s): previous opening day exceptional closures hours=%s",
when,
exceptional_closures_hours,
)
previous_opening_datetime = None
exceptionally_closed = False
exceptionally_closed_all_day = False
in_opening_hours = date != when.date()
for oh in reversed(normal_opening_hours_values):
if exceptionally_closed_all_day:
break
if oh["days"] and weekday not in oh["days"]:
log.debug("previous_opening_hour(%s): %s not in days (%s)", when, weekday, oh["days"])
continue
log.debug(
"previous_opening_hour(%s): %s in days (%s), handle opening hours %s",
when,
weekday,
oh["days"],
oh["hours_periods"],
)
if not oh["hours_periods"]:
log.debug(
"previous_opening_hour(%s): %s is an all day opening day, handle exceptional "
"closures hours %s to find the maximal opening time",
when,
weekday,
exceptional_closures_hours,
)
if date == when.date():
in_opening_hours = True
test_time = when.time() if when.date() == date else datetime.datetime.max.time()
for cl in exceptional_closures_hours:
if cl["start"] <= test_time < cl["stop"]:
if cl["start"] <= datetime.datetime.min.time():
exceptionally_closed = True
exceptionally_closed_all_day = True
previous_opening_datetime = None
break
test_time = cl["start"]
else:
break
if not exceptionally_closed_all_day:
candidate_previous_opening_datetime = datetime.datetime.combine(date, test_time)
previous_opening_datetime = (
candidate_previous_opening_datetime
if not previous_opening_datetime
or candidate_previous_opening_datetime > previous_opening_datetime
else previous_opening_datetime
)
continue
log.debug(
"previous_opening_hour(%s): only opened during some hours periods (%s) on %s, find the "
"maximal opening time",
when,
oh["hours_periods"],
weekday,
)
test_time = datetime.datetime.min.time()
for hp in reversed(oh["hours_periods"]):
if date == when.date() and hp["start"] > when.time():
log.debug(
"previous_opening_hour(%s): ignore opening hours %s starting after specified "
"when time %s",
when,
hp,
when.time(),
)
continue
if date == when.date() and hp["start"] <= when.time() < hp["stop"]:
in_opening_hours = True
if exceptional_closures_hours:
log.debug(
"previous_opening_hour(%s): check if opening hours %s match with exceptional "
"closure hours %s",
when,
hp,
exceptional_closures_hours,
)
for cl in reversed(exceptional_closures_hours):
if cl["start"] <= hp["start"] and cl["stop"] >= hp["stop"]:
log.debug(
"previous_opening_hour(%s): opening hour %s is included in exceptional "
"closure hours %s",
when,
hp,
cl,
)
exceptionally_closed = True
break
if cl["stop"] < hp["stop"]:
log.debug(
"previous_opening_hour(%s): opening hour %s end after closure hours %s",
when,
hp,
cl,
)
test_time = hp["stop"] if hp["stop"] > test_time else test_time
elif cl["start"] > hp["stop"]:
log.debug(
"previous_opening_hour(%s): opening hour %s ends before closure hours %s "
"start",
when,
hp,
cl,
)
test_time = hp["stop"] if hp["stop"] > test_time else test_time
elif cl["stop"] >= hp["stop"] and cl["start"] > hp["start"]:
log.debug(
"previous_opening_hour(%s): opening hour %s start before closure hours "
"%s",
when,
hp,
cl,
)
test_time = cl["start"] if cl["start"] > test_time else test_time
elif hp["stop"] > test_time:
log.debug(
"previous_opening_hour(%s): no exceptional closure hours, use opening hours "
"stop time %s",
when,
hp["stop"],
)
test_time = hp["stop"]
if test_time > datetime.datetime.min.time():
if date == when.date() and test_time > when.time():
test_time = when.time()
candidate_previous_opening_datetime = datetime.datetime.combine(date, test_time)
previous_opening_datetime = (
candidate_previous_opening_datetime
if not previous_opening_datetime
or candidate_previous_opening_datetime > previous_opening_datetime
else previous_opening_datetime
)
if not previous_opening_datetime and (
exceptionally_closed or (date == when.date() and not in_opening_hours)
):
new_max_anaylse_days = max_anaylse_days - (when.date() - date).days
if new_max_anaylse_days > 0:
log.debug(
"previous_opening_hour(%s): exceptionally closed on %s, try on previous %d days",
when,
date,
new_max_anaylse_days,
)
previous_opening_datetime = previous_opening_hour(
normal_opening_hours_values=normal_opening_hours_values,
exceptional_closures_values=exceptional_closures_values,
nonworking_public_holidays_values=nonworking_public_holidays_values,
when=datetime.datetime.combine(
date - datetime.timedelta(days=1), datetime.datetime.max.time()
),
max_anaylse_days=new_max_anaylse_days,
parse=False,
)
if not previous_opening_datetime:
log.debug(
"previous_opening_hour(%s): no opening hours found in previous %d days",
when,
max_anaylse_days,
)
return False
log.debug(
"previous_opening_hour(%s): previous opening hours=%s", when, previous_opening_datetime
)
return previous_opening_datetime
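Throughout previous_opening_hour(), each opening rule yields a candidate datetime on the chosen day (clamped to the `when` time while scanning the current day), and the latest candidate wins; next_opening_hour() is the mirror image with the earliest. That selection step can be sketched on its own (a simplified illustration; best_previous_candidate and its arguments are hypothetical):

```python
import datetime

def best_previous_candidate(date, candidate_times, not_after=None):
    # Combine each candidate time with `date`, clamp it to `not_after`
    # (the `when` time when scanning the current day), and keep the
    # latest resulting datetime; None if there is no candidate at all.
    best = None
    for t in candidate_times:
        if not_after is not None and t > not_after:
            t = not_after
        candidate = datetime.datetime.combine(date, t)
        if best is None or candidate > best:
            best = candidate
    return best
```

With candidates 12h30 and 18h00 and `not_after=17h00`, the 18h00 candidate is clamped and 17h00 wins.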


@@ -31,7 +31,7 @@ class OracleDB(DB):
self._conn = cx_Oracle.connect(user=self._user, password=self._pwd, dsn=self._dsn)
except cx_Oracle.Error as err:
log.fatal(
"An error occured during Oracle database connection (%s@%s).",
"An error occurred during Oracle database connection (%s@%s).",
self._user,
self._dsn,
exc_info=1,


@@ -5,6 +5,7 @@ import logging
import sys
import psycopg2
from psycopg2.extras import RealDictCursor
from mylib.db import DB, DBFailToConnect
@@ -44,7 +45,7 @@ class PgDB(DB):
)
except psycopg2.Error as err:
log.fatal(
"An error occured during Postgresql database connection (%s@%s, database=%s).",
"An error occurred during Postgresql database connection (%s@%s, database=%s).",
self._user,
self._host,
self._db,
@@ -70,7 +71,7 @@ class PgDB(DB):
return True
except psycopg2.Error:
log.error(
'An error occured setting Postgresql database connection encoding to "%s"',
'An error occurred setting Postgresql database connection encoding to "%s"',
enc,
exc_info=1,
)
@@ -114,22 +115,18 @@ class PgDB(DB):
:return: List of selected rows as dict on success, False otherwise
:rtype: list, bool
"""
cursor = self._conn.cursor()
cursor = self._conn.cursor(cursor_factory=RealDictCursor)
try:
self._log_query(sql, params)
cursor.execute(sql, params)
results = cursor.fetchall()
return results
return list(map(dict, results))
except psycopg2.Error:
self._log_query_exception(sql, params)
return False
@staticmethod
def _map_row_fields_by_index(fields, row):
return {field: row[idx] for idx, field in enumerate(fields)}
#
# Depreated helpers
# Deprecated helpers
#
@classmethod


@@ -27,9 +27,18 @@ class Report(ConfigurableObject): # pylint: disable=useless-object-inheritance
formatter = None
email_client = None
def __init__(self, email_client=None, initialize=True, **kwargs):
def __init__(
self,
email_client=None,
add_logging_handler=False,
send_at_exit=None,
initialize=True,
**kwargs,
):
super().__init__(**kwargs)
self.email_client = email_client
self.add_logging_handler = add_logging_handler
self._send_at_exit = send_at_exit
self._attachment_files = []
self._attachment_payloads = []
@@ -81,8 +90,13 @@ class Report(ConfigurableObject): # pylint: disable=useless-object-inheritance
self.formatter = logging.Formatter(self._get_option("logformat"))
self.handler.setFormatter(self.formatter)
if self.add_logging_handler:
logging.getLogger().addHandler(self.handler)
if self._send_at_exit:
self.send_at_exit()
def get_handler(self):
"""Retreive logging handler"""
"""Retrieve logging handler"""
return self.handler
def write(self, msg):


@@ -31,7 +31,7 @@ def init_logging(options, name, report=None):
def get_default_opt_value(config, default_config, key):
"""Retreive default option value from config or default config dictionaries"""
"""Retrieve default option value from config or default config dictionaries"""
if config and key in config:
return config[key]
return default_config.get(key)


@@ -47,7 +47,7 @@ def main(argv=None): # pylint: disable=too-many-locals,too-many-statements
sftp.connect()
atexit.register(sftp.close)
log.debug("Create tempory file")
log.debug("Create temporary file")
test_content = b"Juste un test."
tmp_dir = tempfile.TemporaryDirectory() # pylint: disable=consider-using-with
tmp_file = os.path.join(


@@ -116,13 +116,13 @@ class SFTPClient(ConfigurableObject):
if self.initial_directory:
log.debug("Initial remote directory: '%s'", self.initial_directory)
else:
log.debug("Fail to retreive remote directory, use empty string instead")
log.debug("Fail to retrieve remote directory, use empty string instead")
self.initial_directory = ""
def get_file(self, remote_filepath, local_filepath):
"""Retrieve a file from SFTP server"""
self.connect()
log.debug("Retreive file '%s' to '%s'", remote_filepath, local_filepath)
log.debug("Retrieve file '%s' to '%s'", remote_filepath, local_filepath)
return self.sftp_client.get(remote_filepath, local_filepath) is None
def open_file(self, remote_filepath, mode="r"):


@@ -35,7 +35,7 @@ class TelltaleFile:
@property
def last_update(self):
"""Retreive last update datetime of the telltall file"""
"""Retrieve last update datetime of the telltale file"""
try:
return datetime.datetime.fromtimestamp(os.stat(self.filepath).st_mtime)
except FileNotFoundError:


@@ -45,10 +45,14 @@ for extra, deps in extras_require.items():
version = "0.1"
with open("README.md", encoding="utf-8") as fd:
long_description = fd.read()
setup(
name="mylib",
version=version,
description="A set of helpers small libs to make common tasks easier in my script development",
long_description=long_description,
classifiers=[
"Programming Language :: Python",
],
@@ -59,6 +63,13 @@ setup(
url="https://gogs.zionetrix.net/bn8/python-mylib",
packages=find_packages(),
include_package_data=True,
package_data={
"": [
"scripts/email_templates/*.subject",
"scripts/email_templates/*.txt",
"scripts/email_templates/*.html",
],
},
zip_safe=False,
entry_points={
"console_scripts": [


@@ -32,7 +32,7 @@ do
set -x
;;
*)
usage "Unkown parameter '$OPT'"
usage "Unknown parameter '$OPT'"
esac
let idx=idx+1
done


@@ -10,7 +10,7 @@ import pytest
from mylib.config import BooleanOption, Config, ConfigSection, StringOption
runned = {}
tested = {}
def test_config_init_default_args():
@@ -58,24 +58,24 @@ def test_add_section_with_callback():
config = Config("Test app")
name = "test_section"
global runned
runned["test_add_section_with_callback"] = False
global tested
tested["test_add_section_with_callback"] = False
def test_callback(loaded_config):
global runned
global tested
assert loaded_config == config
assert runned["test_add_section_with_callback"] is False
runned["test_add_section_with_callback"] = True
assert tested["test_add_section_with_callback"] is False
tested["test_add_section_with_callback"] = True
section = config.add_section(name, loaded_callback=test_callback)
assert isinstance(section, ConfigSection)
assert test_callback in config._loaded_callbacks
assert runned["test_add_section_with_callback"] is False
assert tested["test_add_section_with_callback"] is False
config.parse_arguments_options(argv=[], create=False)
assert runned["test_add_section_with_callback"] is True
assert tested["test_add_section_with_callback"] is True
assert test_callback in config._loaded_callbacks_executed
# Try to execute again to verify callback is not runned again
# Try to execute again to verify callback is not run again
config._loaded()
@@ -84,21 +84,21 @@ def test_add_section_with_callback_already_loaded():
name = "test_section"
config.parse_arguments_options(argv=[], create=False)
global runned
runned["test_add_section_with_callback_already_loaded"] = False
global tested
tested["test_add_section_with_callback_already_loaded"] = False
def test_callback(loaded_config):
global runned
global tested
assert loaded_config == config
assert runned["test_add_section_with_callback_already_loaded"] is False
runned["test_add_section_with_callback_already_loaded"] = True
assert tested["test_add_section_with_callback_already_loaded"] is False
tested["test_add_section_with_callback_already_loaded"] = True
section = config.add_section(name, loaded_callback=test_callback)
assert isinstance(section, ConfigSection)
assert runned["test_add_section_with_callback_already_loaded"] is True
assert tested["test_add_section_with_callback_already_loaded"] is True
assert test_callback in config._loaded_callbacks
assert test_callback in config._loaded_callbacks_executed
# Try to execute again to verify callback is not runned again
# Try to execute again to verify callback is not run again
config._loaded()
@@ -126,14 +126,14 @@ def test_add_option_custom_args():
section = config.add_section("my_section")
assert isinstance(section, ConfigSection)
name = "my_option"
kwargs = dict(
default="default value",
comment="my comment",
no_arg=True,
arg="--my-option",
short_arg="-M",
arg_help="My help",
)
kwargs = {
"default": "default value",
"comment": "my comment",
"no_arg": True,
"arg": "--my-option",
"short_arg": "-M",
"arg_help": "My help",
}
option = section.add_option(StringOption, name, **kwargs)
assert isinstance(option, StringOption)
assert name in section.options and section.options[name] == option


@@ -74,9 +74,14 @@ class FakeMySQLdb:
just_try = False
def __init__(self, **kwargs):
allowed_kwargs = dict(
db=str, user=str, passwd=(str, None), host=str, charset=str, use_unicode=bool
)
allowed_kwargs = {
"db": str,
"user": str,
"passwd": (str, None),
"host": str,
"charset": str,
"use_unicode": bool,
}
for arg, value in kwargs.items():
assert arg in allowed_kwargs, f'Invalid arg {arg}="{value}"'
assert isinstance(
@@ -200,21 +205,23 @@ mock_doSelect_just_try = mock_doSQL_just_try
def test_combine_params_with_to_add_parameter():
assert MyDB._combine_params(dict(test1=1), dict(test2=2)) == dict(test1=1, test2=2)
assert MyDB._combine_params({"test1": 1}, {"test2": 2}) == {"test1": 1, "test2": 2}
def test_combine_params_with_kargs():
assert MyDB._combine_params(dict(test1=1), test2=2) == dict(test1=1, test2=2)
assert MyDB._combine_params({"test1": 1}, test2=2) == {"test1": 1, "test2": 2}
def test_combine_params_with_kargs_and_to_add_parameter():
assert MyDB._combine_params(dict(test1=1), dict(test2=2), test3=3) == dict(
test1=1, test2=2, test3=3
)
assert MyDB._combine_params({"test1": 1}, {"test2": 2}, test3=3) == {
"test1": 1,
"test2": 2,
"test3": 3,
}
def test_format_where_clauses_params_are_preserved():
args = ("test = test", dict(test1=1))
args = ("test = test", {"test1": 1})
assert MyDB._format_where_clauses(*args) == args
@@ -223,12 +230,12 @@ def test_format_where_clauses_raw():
def test_format_where_clauses_tuple_clause_with_params():
where_clauses = ("test1 = %(test1)s AND test2 = %(test2)s", dict(test1=1, test2=2))
where_clauses = ("test1 = %(test1)s AND test2 = %(test2)s", {"test1": 1, "test2": 2})
assert MyDB._format_where_clauses(where_clauses) == where_clauses
def test_format_where_clauses_dict():
where_clauses = dict(test1=1, test2=2)
where_clauses = {"test1": 1, "test2": 2}
assert MyDB._format_where_clauses(where_clauses) == (
"`test1` = %(test1)s AND `test2` = %(test2)s",
where_clauses,
@@ -236,15 +243,15 @@ def test_format_where_clauses_dict():
def test_format_where_clauses_combined_types():
where_clauses = ("test1 = 1", ("test2 LIKE %(test2)s", dict(test2=2)), dict(test3=3, test4=4))
where_clauses = ("test1 = 1", ("test2 LIKE %(test2)s", {"test2": 2}), {"test3": 3, "test4": 4})
assert MyDB._format_where_clauses(where_clauses) == (
"test1 = 1 AND test2 LIKE %(test2)s AND `test3` = %(test3)s AND `test4` = %(test4)s",
dict(test2=2, test3=3, test4=4),
{"test2": 2, "test3": 3, "test4": 4},
)
def test_format_where_clauses_with_where_op():
where_clauses = dict(test1=1, test2=2)
where_clauses = {"test1": 1, "test2": 2}
assert MyDB._format_where_clauses(where_clauses, where_op="OR") == (
"`test1` = %(test1)s OR `test2` = %(test2)s",
where_clauses,
@@ -253,7 +260,7 @@ def test_format_where_clauses_with_where_op():
def test_add_where_clauses():
sql = "SELECT * FROM table"
where_clauses = dict(test1=1, test2=2)
where_clauses = {"test1": 1, "test2": 2}
assert MyDB._add_where_clauses(sql, None, where_clauses) == (
sql + " WHERE `test1` = %(test1)s AND `test2` = %(test2)s",
where_clauses,
@@ -262,11 +269,11 @@ def test_add_where_clauses():
def test_add_where_clauses_preserved_params():
sql = "SELECT * FROM table"
where_clauses = dict(test1=1, test2=2)
params = dict(fake1=1)
where_clauses = {"test1": 1, "test2": 2}
params = {"fake1": 1}
assert MyDB._add_where_clauses(sql, params.copy(), where_clauses) == (
sql + " WHERE `test1` = %(test1)s AND `test2` = %(test2)s",
dict(**where_clauses, **params),
{**where_clauses, **params},
)
@@ -281,11 +288,11 @@ def test_add_where_clauses_with_op():
def test_add_where_clauses_with_duplicated_field():
sql = "UPDATE table SET test1=%(test1)s"
params = dict(test1="new_value")
where_clauses = dict(test1="where_value")
params = {"test1": "new_value"}
where_clauses = {"test1": "where_value"}
assert MyDB._add_where_clauses(sql, params, where_clauses) == (
sql + " WHERE `test1` = %(test1_1)s",
dict(test1="new_value", test1_1="where_value"),
{"test1": "new_value", "test1_1": "where_value"},
)
@@ -295,7 +302,7 @@ def test_quote_table_name():
def test_insert(mocker, test_mydb):
values = dict(test1=1, test2=2)
values = {"test1": 1, "test2": 2}
mocker.patch(
"mylib.mysql.MyDB.doSQL",
generate_mock_doSQL(
@@ -308,18 +315,18 @@ def test_insert_just_try(mocker, test_mydb):
def test_insert_just_try(mocker, test_mydb):
mocker.patch("mylib.mysql.MyDB.doSQL", mock_doSQL_just_try)
assert test_mydb.insert("mytable", dict(test1=1, test2=2), just_try=True)
assert test_mydb.insert("mytable", {"test1": 1, "test2": 2}, just_try=True)
def test_update(mocker, test_mydb):
values = dict(test1=1, test2=2)
where_clauses = dict(test3=3, test4=4)
values = {"test1": 1, "test2": 2}
where_clauses = {"test3": 3, "test4": 4}
mocker.patch(
"mylib.mysql.MyDB.doSQL",
generate_mock_doSQL(
"UPDATE `mytable` SET `test1` = %(test1)s, `test2` = %(test2)s WHERE `test3` ="
" %(test3)s AND `test4` = %(test4)s",
dict(**values, **where_clauses),
{**values, **where_clauses},
),
)
@@ -328,11 +335,11 @@ def test_update_just_try(mocker, test_mydb):
def test_update_just_try(mocker, test_mydb):
mocker.patch("mylib.mysql.MyDB.doSQL", mock_doSQL_just_try)
assert test_mydb.update("mytable", dict(test1=1, test2=2), None, just_try=True)
assert test_mydb.update("mytable", {"test1": 1, "test2": 2}, None, just_try=True)
def test_delete(mocker, test_mydb):
where_clauses = dict(test1=1, test2=2)
where_clauses = {"test1": 1, "test2": 2}
mocker.patch(
"mylib.mysql.MyDB.doSQL",
generate_mock_doSQL(
@@ -361,23 +368,27 @@ def test_truncate_just_try(mocker, test_mydb):
def test_select(mocker, test_mydb):
fields = ("field1", "field2")
where_clauses = dict(test3=3, test4=4)
where_clauses = {"test3": 3, "test4": 4}
expected_return = [
dict(field1=1, field2=2),
dict(field1=2, field2=3),
{"field1": 1, "field2": 2},
{"field1": 2, "field2": 3},
]
order_by = "field1, DESC"
limit = 10
mocker.patch(
"mylib.mysql.MyDB.doSelect",
generate_mock_doSQL(
"SELECT `field1`, `field2` FROM `mytable` WHERE `test3` = %(test3)s AND `test4` ="
" %(test4)s ORDER BY " + order_by,
" %(test4)s ORDER BY " + order_by + " LIMIT " + str(limit), # nosec: B608
where_clauses,
expected_return,
),
)
assert test_mydb.select("mytable", where_clauses, fields, order_by=order_by) == expected_return
assert (
test_mydb.select("mytable", where_clauses, fields, order_by=order_by, limit=limit)
== expected_return
)
def test_select_without_field_and_order_by(mocker, test_mydb):
@@ -397,14 +408,14 @@ def test_select_just_try(mocker, test_mydb):
def test_connect(mocker, test_mydb):
expected_kwargs = dict(
db=test_mydb._db,
user=test_mydb._user,
host=test_mydb._host,
passwd=test_mydb._pwd,
charset=test_mydb._charset,
use_unicode=True,
)
expected_kwargs = {
"db": test_mydb._db,
"user": test_mydb._user,
"host": test_mydb._host,
"passwd": test_mydb._pwd,
"charset": test_mydb._charset,
"use_unicode": True,
}
mocker.patch("MySQLdb.connect", generate_mock_args(expected_kwargs=expected_kwargs))
@@ -421,7 +432,7 @@ def test_close_connected(fake_connected_mydb):
def test_doSQL(fake_connected_mydb):
fake_connected_mydb._conn.expected_sql = "DELETE FROM table WHERE test1 = %(test1)s"
fake_connected_mydb._conn.expected_params = dict(test1=1)
fake_connected_mydb._conn.expected_params = {"test1": 1}
fake_connected_mydb.doSQL(
fake_connected_mydb._conn.expected_sql, fake_connected_mydb._conn.expected_params
)
@@ -443,8 +454,8 @@ def test_doSQL_on_exception(fake_connected_mydb):
def test_doSelect(fake_connected_mydb):
fake_connected_mydb._conn.expected_sql = "SELECT * FROM table WHERE test1 = %(test1)s"
fake_connected_mydb._conn.expected_params = dict(test1=1)
fake_connected_mydb._conn.expected_return = [dict(test1=1)]
fake_connected_mydb._conn.expected_params = {"test1": 1}
fake_connected_mydb._conn.expected_return = [{"test1": 1}]
assert (
fake_connected_mydb.doSelect(
fake_connected_mydb._conn.expected_sql, fake_connected_mydb._conn.expected_params
@@ -455,7 +466,7 @@ def test_doSelect_without_params(fake_connected_mydb):
def test_doSelect_without_params(fake_connected_mydb):
fake_connected_mydb._conn.expected_sql = "SELECT * FROM table"
fake_connected_mydb._conn.expected_return = [dict(test1=1)]
fake_connected_mydb._conn.expected_return = [{"test1": 1}]
assert (
fake_connected_mydb.doSelect(fake_connected_mydb._conn.expected_sql)
== fake_connected_mydb._conn.expected_return
@@ -469,8 +480,8 @@ def test_doSelect_just_try(fake_connected_just_try_mydb):
def test_doSelect_just_try(fake_connected_just_try_mydb):
fake_connected_just_try_mydb._conn.expected_sql = "SELECT * FROM table WHERE test1 = %(test1)s"
fake_connected_just_try_mydb._conn.expected_params = dict(test1=1)
fake_connected_just_try_mydb._conn.expected_return = [dict(test1=1)]
fake_connected_just_try_mydb._conn.expected_params = {"test1": 1}
fake_connected_just_try_mydb._conn.expected_return = [{"test1": 1}]
assert (
fake_connected_just_try_mydb.doSelect(
fake_connected_just_try_mydb._conn.expected_sql,


@@ -182,27 +182,96 @@ def test_parse_normal_opening_hours_multiple_periods():
]
#
# Tests on is_closed
#
def test_parse_normal_opening_hours_is_sorted():
assert opening_hours.parse_normal_opening_hours(
[
"samedi 9h30-18h",
"lundi-vendredi 14h-18h 9h30-12h30",
"samedi 9h30-12h",
"dimanche 9h30-12h",
]
) == [
{
"days": ["lundi", "mardi", "mercredi", "jeudi", "vendredi"],
"hours_periods": [
{"start": datetime.time(9, 30), "stop": datetime.time(12, 30)},
{"start": datetime.time(14, 0), "stop": datetime.time(18, 0)},
],
},
{
"days": ["samedi"],
"hours_periods": [
{"start": datetime.time(9, 30), "stop": datetime.time(12, 0)},
],
},
{
"days": ["samedi"],
"hours_periods": [
{"start": datetime.time(9, 30), "stop": datetime.time(18, 0)},
],
},
{
"days": ["dimanche"],
"hours_periods": [
{"start": datetime.time(9, 30), "stop": datetime.time(12, 0)},
],
},
]
exceptional_closures = [
"22/09/2017",
"20/09/2017-22/09/2017",
"20/09/2017-22/09/2017 18/09/2017",
"25/11/2017",
"26/11/2017 9h30-12h30",
]
#
# Tests on normal opening hours
#
normal_opening_hours = [
"lundi-mardi jeudi 9h30-12h30 14h-16h30",
"mercredi vendredi 9h30-12h30 14h-17h",
"samedi",
]
normally_opened_datetime = datetime.datetime(2024, 3, 1, 10, 15)
normally_opened_all_day_datetime = datetime.datetime(2024, 4, 6, 10, 15)
normally_closed_datetime = datetime.datetime(2017, 3, 1, 20, 15)
normally_closed_all_day_datetime = datetime.datetime(2024, 4, 7, 20, 15)
def test_its_normally_open():
assert opening_hours.its_normally_open(normal_opening_hours, when=normally_opened_datetime)
def test_its_normally_open_all_day():
assert opening_hours.its_normally_open(
normal_opening_hours, when=normally_opened_all_day_datetime
)
def test_its_normally_closed():
assert not opening_hours.its_normally_open(normal_opening_hours, when=normally_closed_datetime)
def test_its_normally_closed_all_day():
assert not opening_hours.its_normally_open(
normal_opening_hours, when=normally_closed_all_day_datetime
)
def test_its_normally_open_ignore_time():
assert opening_hours.its_normally_open(
normal_opening_hours, when=normally_closed_datetime.date(), ignore_time=True
)
def test_its_normally_closed_ignore_time():
assert not opening_hours.its_normally_open(
normal_opening_hours, when=normally_closed_all_day_datetime.date(), ignore_time=True
)
#
# Tests on non working days
#
nonworking_public_holidays = [
"1janvier",
"paques",
"lundi_paques",
"1mai",
"8mai",
"jeudi_ascension",
"lundi_pentecote",
@@ -212,6 +281,120 @@ nonworking_public_holidays = [
"11novembre",
"noel",
]
nonworking_date = datetime.date(2017, 1, 1)
not_included_nonworking_date = datetime.date(2017, 5, 1)
not_nonworking_date = datetime.date(2017, 5, 2)
def test_its_nonworking_day():
assert (
opening_hours.its_nonworking_day(nonworking_public_holidays, date=nonworking_date) is True
)
def test_its_not_nonworking_day():
assert (
opening_hours.its_nonworking_day(
nonworking_public_holidays,
date=not_nonworking_date,
)
is False
)
def test_its_not_included_nonworking_day():
assert (
opening_hours.its_nonworking_day(
nonworking_public_holidays,
date=not_included_nonworking_date,
)
is False
)
#
# Tests in exceptional closures
#
exceptional_closures = [
"22/09/2017",
"20/09/2017-22/09/2017",
"20/09/2017-22/09/2017 18/09/2017",
"25/11/2017",
"26/11/2017 9h30-12h30",
"27/11/2017 17h-18h 9h30-12h30",
]
exceptional_closure_all_day_date = datetime.date(2017, 9, 22)
exceptional_closure_all_day_datetime = datetime.datetime.combine(
exceptional_closure_all_day_date, datetime.time(20, 15)
)
exceptional_closure_datetime = datetime.datetime(2017, 11, 26, 10, 30)
exceptional_closure_datetime_hours_period = {
"start": datetime.time(9, 30),
"stop": datetime.time(12, 30),
}
not_exceptional_closure_date = datetime.date(2019, 9, 22)
def test_its_exceptionally_closed():
assert (
opening_hours.its_exceptionally_closed(
exceptional_closures, when=exceptional_closure_all_day_datetime
)
is True
)
def test_its_not_exceptionally_closed():
assert (
opening_hours.its_exceptionally_closed(
exceptional_closures, when=not_exceptional_closure_date
)
is False
)
def test_its_exceptionally_closed_all_day():
assert (
opening_hours.its_exceptionally_closed(
exceptional_closures, when=exceptional_closure_all_day_datetime, all_day=True
)
is True
)
def test_its_not_exceptionally_closed_all_day():
assert (
opening_hours.its_exceptionally_closed(
exceptional_closures, when=exceptional_closure_datetime, all_day=True
)
is False
)
def test_get_exceptional_closures_hours():
assert opening_hours.get_exceptional_closures_hours(
exceptional_closures, date=exceptional_closure_datetime.date()
) == [exceptional_closure_datetime_hours_period]
def test_get_exceptional_closures_hours_all_day():
assert opening_hours.get_exceptional_closures_hours(
exceptional_closures, date=exceptional_closure_all_day_date
) == [{"start": datetime.datetime.min.time(), "stop": datetime.datetime.max.time()}]
def test_get_exceptional_closures_hours_is_sorted():
assert opening_hours.get_exceptional_closures_hours(
["27/11/2017 17h-18h 9h30-12h30"], date=datetime.date(2017, 11, 27)
) == [
{"start": datetime.time(9, 30), "stop": datetime.time(12, 30)},
{"start": datetime.time(17, 0), "stop": datetime.time(18, 0)},
]
#
# Tests on is_closed
#
def test_is_closed_when_normaly_closed_by_hour():
@@ -255,7 +438,7 @@ def test_is_closed_when_normaly_closed_by_day():
normal_opening_hours_values=normal_opening_hours,
exceptional_closures_values=exceptional_closures,
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2017, 5, 6, 14, 15),
when=datetime.datetime(2017, 5, 7, 14, 15),
) == {"closed": True, "exceptional_closure": False, "exceptional_closure_all_day": False}
@@ -300,3 +483,203 @@ def test_nonworking_french_public_days_of_the_year():
"noel": datetime.date(2021, 12, 25),
"saint_etienne": datetime.date(2021, 12, 26),
}
def test_next_opening_date():
assert opening_hours.next_opening_date(
normal_opening_hours_values=normal_opening_hours,
exceptional_closures_values=exceptional_closures,
nonworking_public_holidays_values=nonworking_public_holidays,
date=datetime.date(2021, 4, 4),
) == datetime.date(2021, 4, 6)
def test_next_opening_hour():
assert opening_hours.next_opening_hour(
normal_opening_hours_values=normal_opening_hours,
exceptional_closures_values=exceptional_closures,
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2021, 4, 4, 10, 30),
) == datetime.datetime(2021, 4, 6, 9, 30)
def test_next_opening_hour_with_exceptionnal_closure_hours():
assert opening_hours.next_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=["06/04/2021 9h-13h 14h-16h"],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2021, 4, 4, 10, 30),
) == datetime.datetime(2021, 4, 6, 16, 0)
def test_next_opening_hour_with_exceptionnal_closure_day():
assert opening_hours.next_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=["06/04/2021"],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2021, 4, 4, 10, 30),
) == datetime.datetime(2021, 4, 7, 9, 0)
def test_next_opening_hour_with_overlapsed_opening_hours():
assert opening_hours.next_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h", "mardi 8h-19h"],
exceptional_closures_values=["06/04/2021 9h-13h 14h-16h"],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2021, 4, 4, 10, 30),
) == datetime.datetime(2021, 4, 6, 8, 0)
def test_next_opening_hour_with_too_large_exceptionnal_closure_days():
assert (
opening_hours.next_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=["06/04/2021-16-04/2021"],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2021, 4, 4, 10, 30),
max_anaylse_days=10,
)
is False
)
def test_next_opening_hour_on_opened_moment():
assert opening_hours.next_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2021, 4, 6, 10, 30),
) == datetime.datetime(2021, 4, 6, 10, 30)
def test_next_opening_hour_on_same_day():
assert opening_hours.next_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2021, 4, 6, 13, 0),
) == datetime.datetime(2021, 4, 6, 14, 0)
assert opening_hours.next_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2021, 4, 6, 16, 0),
) == datetime.datetime(2021, 4, 6, 16, 0)
assert opening_hours.next_opening_hour(
normal_opening_hours_values=["lundi-vendredi"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2021, 4, 6, 16, 0),
) == datetime.datetime(2021, 4, 6, 16, 0)
def test_next_opening_hour_on_opened_day_but_too_late():
assert opening_hours.next_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2021, 4, 6, 23, 0),
) == datetime.datetime(2021, 4, 7, 9, 0)
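The forward-scanning behaviour these tests exercise can be sketched independently of the module's string parsing, exceptional closures, and public-holiday handling. The version below is a simplified sketch under those assumptions: it takes a pre-parsed weekly schedule keyed by weekday (0 = Monday) and scans day by day up to a bounded window, returning `False` when nothing opens within it, mirroring the `max_anaylse_days` cutoff above:

```python
import datetime


def next_opening_hour(weekly_hours, when, max_analyse_days=30):
    """Return the next open datetime at or after `when`, or False.

    weekly_hours maps weekday (0=Monday) to a sorted list of
    (start, stop) datetime.time pairs. Closures/holidays are ignored
    in this sketch.
    """
    for offset in range(max_analyse_days):
        day = when.date() + datetime.timedelta(days=offset)
        for start, stop in weekly_hours.get(day.weekday(), []):
            candidate = datetime.datetime.combine(day, start)
            if offset == 0:
                if when.time() >= stop:
                    continue  # this period is already over today
                # Already inside the period: the answer is `when` itself
                candidate = max(candidate, when)
            return candidate
    return False  # nothing open within the analysed window
```

On a Sunday input it jumps to Monday's first period; called at 13h on an open day it returns the 14h reopening; called during an open period it returns the input unchanged, matching the semantics asserted in the tests.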
def test_previous_opening_date():
assert opening_hours.previous_opening_date(
normal_opening_hours_values=["lundi-vendredi 9h-18h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
date=datetime.date(2024, 4, 1),
) == datetime.date(2024, 3, 29)
def test_previous_opening_hour():
assert opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-18h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 1, 10, 30),
) == datetime.datetime(2024, 3, 29, 18, 0)
def test_previous_opening_hour_with_exceptionnal_closure_hours():
assert opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=["29/03/2024 14h-18h"],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 1, 10, 30),
) == datetime.datetime(2024, 3, 29, 12, 0)
assert opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=["29/03/2024 16h-18h"],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 1, 10, 30),
) == datetime.datetime(2024, 3, 29, 16, 0)
def test_previous_opening_hour_with_exceptionnal_closure_day():
assert opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=["29/03/2024"],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 1, 10, 30),
) == datetime.datetime(2024, 3, 28, 18, 0)
def test_previous_opening_hour_with_overlapsed_opening_hours():
assert opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h", "mardi 8h-19h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 3, 8, 30),
) == datetime.datetime(2024, 4, 2, 19, 0)
def test_previous_opening_hour_with_too_large_exceptionnal_closure_days():
assert (
opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=["06/03/2024-16-04/2024"],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 17, 8, 30),
max_anaylse_days=10,
)
is False
)
def test_previous_opening_hour_on_opened_moment():
assert opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 5, 10, 30),
) == datetime.datetime(2024, 4, 5, 10, 30)
def test_previous_opening_hour_on_same_day():
assert opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 5, 13, 0),
) == datetime.datetime(2024, 4, 5, 12, 0)
assert opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 5, 16, 0),
) == datetime.datetime(2024, 4, 5, 16, 0)
assert opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 5, 16, 0),
) == datetime.datetime(2024, 4, 5, 16, 0)
def test_previous_opening_hour_on_opened_day_but_too_early():
assert opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 5, 8, 0),
) == datetime.datetime(2024, 4, 4, 18, 0)


@@ -73,7 +73,7 @@ class FakeCXOracle:
just_try = False
def __init__(self, **kwargs):
allowed_kwargs = dict(dsn=str, user=str, password=(str, None))
allowed_kwargs = {"dsn": str, "user": str, "password": (str, None)}
for arg, value in kwargs.items():
assert arg in allowed_kwargs, f"Invalid arg {arg}='{value}'"
assert isinstance(
@@ -197,21 +197,23 @@ mock_doSelect_just_try = mock_doSQL_just_try
def test_combine_params_with_to_add_parameter():
assert OracleDB._combine_params(dict(test1=1), dict(test2=2)) == dict(test1=1, test2=2)
assert OracleDB._combine_params({"test1": 1}, {"test2": 2}) == {"test1": 1, "test2": 2}
def test_combine_params_with_kargs():
assert OracleDB._combine_params(dict(test1=1), test2=2) == dict(test1=1, test2=2)
assert OracleDB._combine_params({"test1": 1}, test2=2) == {"test1": 1, "test2": 2}
def test_combine_params_with_kargs_and_to_add_parameter():
assert OracleDB._combine_params(dict(test1=1), dict(test2=2), test3=3) == dict(
test1=1, test2=2, test3=3
)
assert OracleDB._combine_params({"test1": 1}, {"test2": 2}, test3=3) == {
"test1": 1,
"test2": 2,
"test3": 3,
}
def test_format_where_clauses_params_are_preserved():
args = ("test = test", dict(test1=1))
args = ("test = test", {"test1": 1})
assert OracleDB._format_where_clauses(*args) == args
@@ -220,12 +222,12 @@ def test_format_where_clauses_raw():
def test_format_where_clauses_tuple_clause_with_params():
where_clauses = ("test1 = :test1 AND test2 = :test2", dict(test1=1, test2=2))
where_clauses = ("test1 = :test1 AND test2 = :test2", {"test1": 1, "test2": 2})
assert OracleDB._format_where_clauses(where_clauses) == where_clauses
def test_format_where_clauses_dict():
where_clauses = dict(test1=1, test2=2)
where_clauses = {"test1": 1, "test2": 2}
assert OracleDB._format_where_clauses(where_clauses) == (
'"test1" = :test1 AND "test2" = :test2',
where_clauses,
@@ -233,15 +235,15 @@ def test_format_where_clauses_dict():
def test_format_where_clauses_combined_types():
where_clauses = ("test1 = 1", ("test2 LIKE :test2", dict(test2=2)), dict(test3=3, test4=4))
where_clauses = ("test1 = 1", ("test2 LIKE :test2", {"test2": 2}), {"test3": 3, "test4": 4})
assert OracleDB._format_where_clauses(where_clauses) == (
'test1 = 1 AND test2 LIKE :test2 AND "test3" = :test3 AND "test4" = :test4',
dict(test2=2, test3=3, test4=4),
{"test2": 2, "test3": 3, "test4": 4},
)
def test_format_where_clauses_with_where_op():
where_clauses = dict(test1=1, test2=2)
where_clauses = {"test1": 1, "test2": 2}
assert OracleDB._format_where_clauses(where_clauses, where_op="OR") == (
'"test1" = :test1 OR "test2" = :test2',
where_clauses,
@@ -250,7 +252,7 @@ def test_format_where_clauses_with_where_op():
def test_add_where_clauses():
sql = "SELECT * FROM table"
where_clauses = dict(test1=1, test2=2)
where_clauses = {"test1": 1, "test2": 2}
assert OracleDB._add_where_clauses(sql, None, where_clauses) == (
sql + ' WHERE "test1" = :test1 AND "test2" = :test2',
where_clauses,
@@ -259,11 +261,11 @@ def test_add_where_clauses():
def test_add_where_clauses_preserved_params():
sql = "SELECT * FROM table"
where_clauses = dict(test1=1, test2=2)
params = dict(fake1=1)
where_clauses = {"test1": 1, "test2": 2}
params = {"fake1": 1}
assert OracleDB._add_where_clauses(sql, params.copy(), where_clauses) == (
sql + ' WHERE "test1" = :test1 AND "test2" = :test2',
dict(**where_clauses, **params),
{**where_clauses, **params},
)
@@ -278,11 +280,11 @@ def test_add_where_clauses_with_op():
def test_add_where_clauses_with_duplicated_field():
sql = "UPDATE table SET test1=:test1"
params = dict(test1="new_value")
where_clauses = dict(test1="where_value")
params = {"test1": "new_value"}
where_clauses = {"test1": "where_value"}
assert OracleDB._add_where_clauses(sql, params, where_clauses) == (
sql + ' WHERE "test1" = :test1_1',
dict(test1="new_value", test1_1="where_value"),
{"test1": "new_value", "test1_1": "where_value"},
)
@@ -292,7 +294,7 @@ def test_quote_table_name():
def test_insert(mocker, test_oracledb):
values = dict(test1=1, test2=2)
values = {"test1": 1, "test2": 2}
mocker.patch(
"mylib.oracle.OracleDB.doSQL",
generate_mock_doSQL(
@@ -305,18 +307,18 @@ def test_insert(mocker, test_oracledb):
def test_insert_just_try(mocker, test_oracledb):
mocker.patch("mylib.oracle.OracleDB.doSQL", mock_doSQL_just_try)
assert test_oracledb.insert("mytable", dict(test1=1, test2=2), just_try=True)
assert test_oracledb.insert("mytable", {"test1": 1, "test2": 2}, just_try=True)
def test_update(mocker, test_oracledb):
values = dict(test1=1, test2=2)
where_clauses = dict(test3=3, test4=4)
values = {"test1": 1, "test2": 2}
where_clauses = {"test3": 3, "test4": 4}
mocker.patch(
"mylib.oracle.OracleDB.doSQL",
generate_mock_doSQL(
'UPDATE "mytable" SET "test1" = :test1, "test2" = :test2 WHERE "test3" = :test3 AND'
' "test4" = :test4',
dict(**values, **where_clauses),
{**values, **where_clauses},
),
)
@@ -325,11 +327,11 @@ def test_update(mocker, test_oracledb):
def test_update_just_try(mocker, test_oracledb):
mocker.patch("mylib.oracle.OracleDB.doSQL", mock_doSQL_just_try)
assert test_oracledb.update("mytable", dict(test1=1, test2=2), None, just_try=True)
assert test_oracledb.update("mytable", {"test1": 1, "test2": 2}, None, just_try=True)
def test_delete(mocker, test_oracledb):
where_clauses = dict(test1=1, test2=2)
where_clauses = {"test1": 1, "test2": 2}
mocker.patch(
"mylib.oracle.OracleDB.doSQL",
generate_mock_doSQL(
@@ -360,24 +362,26 @@ def test_truncate_just_try(mocker, test_oracledb):
def test_select(mocker, test_oracledb):
fields = ("field1", "field2")
where_clauses = dict(test3=3, test4=4)
where_clauses = {"test3": 3, "test4": 4}
expected_return = [
dict(field1=1, field2=2),
dict(field1=2, field2=3),
{"field1": 1, "field2": 2},
{"field1": 2, "field2": 3},
]
order_by = "field1, DESC"
limit = 10
mocker.patch(
"mylib.oracle.OracleDB.doSelect",
generate_mock_doSQL(
'SELECT "field1", "field2" FROM "mytable" WHERE "test3" = :test3 AND "test4" = :test4'
" ORDER BY " + order_by,
" ORDER BY " + order_by + " LIMIT " + str(limit), # nosec: B608
where_clauses,
expected_return,
),
)
assert (
test_oracledb.select("mytable", where_clauses, fields, order_by=order_by) == expected_return
test_oracledb.select("mytable", where_clauses, fields, order_by=order_by, limit=limit)
== expected_return
)
@@ -398,9 +402,11 @@ def test_select_just_try(mocker, test_oracledb):
def test_connect(mocker, test_oracledb):
expected_kwargs = dict(
dsn=test_oracledb._dsn, user=test_oracledb._user, password=test_oracledb._pwd
)
expected_kwargs = {
"dsn": test_oracledb._dsn,
"user": test_oracledb._user,
"password": test_oracledb._pwd,
}
mocker.patch("cx_Oracle.connect", generate_mock_args(expected_kwargs=expected_kwargs))
@@ -417,7 +423,7 @@ def test_close_connected(fake_connected_oracledb):
def test_doSQL(fake_connected_oracledb):
fake_connected_oracledb._conn.expected_sql = "DELETE FROM table WHERE test1 = :test1"
fake_connected_oracledb._conn.expected_params = dict(test1=1)
fake_connected_oracledb._conn.expected_params = {"test1": 1}
fake_connected_oracledb.doSQL(
fake_connected_oracledb._conn.expected_sql, fake_connected_oracledb._conn.expected_params
)
@@ -439,8 +445,8 @@ def test_doSQL_on_exception(fake_connected_oracledb):
def test_doSelect(fake_connected_oracledb):
fake_connected_oracledb._conn.expected_sql = "SELECT * FROM table WHERE test1 = :test1"
fake_connected_oracledb._conn.expected_params = dict(test1=1)
fake_connected_oracledb._conn.expected_return = [dict(test1=1)]
fake_connected_oracledb._conn.expected_params = {"test1": 1}
fake_connected_oracledb._conn.expected_return = [{"test1": 1}]
assert (
fake_connected_oracledb.doSelect(
fake_connected_oracledb._conn.expected_sql,
@@ -452,7 +458,7 @@ def test_doSelect(fake_connected_oracledb):
def test_doSelect_without_params(fake_connected_oracledb):
fake_connected_oracledb._conn.expected_sql = "SELECT * FROM table"
fake_connected_oracledb._conn.expected_return = [dict(test1=1)]
fake_connected_oracledb._conn.expected_return = [{"test1": 1}]
assert (
fake_connected_oracledb.doSelect(fake_connected_oracledb._conn.expected_sql)
== fake_connected_oracledb._conn.expected_return
@@ -466,8 +472,8 @@ def test_doSelect_just_try(fake_connected_just_try_oracledb):
def test_doSelect_just_try(fake_connected_just_try_oracledb):
fake_connected_just_try_oracledb._conn.expected_sql = "SELECT * FROM table WHERE test1 = :test1"
fake_connected_just_try_oracledb._conn.expected_params = dict(test1=1)
fake_connected_just_try_oracledb._conn.expected_return = [dict(test1=1)]
fake_connected_just_try_oracledb._conn.expected_params = {"test1": 1}
fake_connected_just_try_oracledb._conn.expected_return = [{"test1": 1}]
assert (
fake_connected_just_try_oracledb.doSelect(
fake_connected_just_try_oracledb._conn.expected_sql,


@@ -3,6 +3,7 @@
import psycopg2
import pytest
from psycopg2.extras import RealDictCursor
from mylib.pgsql import PgDB
@@ -57,13 +58,14 @@ class FakePsycopg2:
expected_sql = None
expected_params = None
expected_cursor_factory = None
expected_return = True
expected_just_try = False
expected_exception = False
just_try = False
def __init__(self, **kwargs):
allowed_kwargs = dict(dbname=str, user=str, password=(str, None), host=str)
allowed_kwargs = {"dbname": str, "user": str, "password": (str, None), "host": str}
for arg, value in kwargs.items():
assert arg in allowed_kwargs, f'Invalid arg {arg}="{value}"'
assert isinstance(
@@ -81,7 +83,8 @@ class FakePsycopg2:
raise psycopg2.Error(f"set_client_encoding({arg[0]}): Expected exception")
return self.expected_return
def cursor(self):
def cursor(self, cursor_factory=None):
assert cursor_factory is self.expected_cursor_factory
return FakePsycopg2Cursor(
self.expected_sql,
self.expected_params,
@@ -194,21 +197,23 @@ mock_doSelect_just_try = mock_doSQL_just_try
def test_combine_params_with_to_add_parameter():
assert PgDB._combine_params(dict(test1=1), dict(test2=2)) == dict(test1=1, test2=2)
assert PgDB._combine_params({"test1": 1}, {"test2": 2}) == {"test1": 1, "test2": 2}
def test_combine_params_with_kargs():
assert PgDB._combine_params(dict(test1=1), test2=2) == dict(test1=1, test2=2)
assert PgDB._combine_params({"test1": 1}, test2=2) == {"test1": 1, "test2": 2}
def test_combine_params_with_kargs_and_to_add_parameter():
assert PgDB._combine_params(dict(test1=1), dict(test2=2), test3=3) == dict(
test1=1, test2=2, test3=3
)
assert PgDB._combine_params({"test1": 1}, {"test2": 2}, test3=3) == {
"test1": 1,
"test2": 2,
"test3": 3,
}
def test_format_where_clauses_params_are_preserved():
args = ("test = test", dict(test1=1))
args = ("test = test", {"test1": 1})
assert PgDB._format_where_clauses(*args) == args
@@ -217,12 +222,12 @@ def test_format_where_clauses_raw():
def test_format_where_clauses_tuple_clause_with_params():
where_clauses = ("test1 = %(test1)s AND test2 = %(test2)s", dict(test1=1, test2=2))
where_clauses = ("test1 = %(test1)s AND test2 = %(test2)s", {"test1": 1, "test2": 2})
assert PgDB._format_where_clauses(where_clauses) == where_clauses
def test_format_where_clauses_dict():
where_clauses = dict(test1=1, test2=2)
where_clauses = {"test1": 1, "test2": 2}
assert PgDB._format_where_clauses(where_clauses) == (
'"test1" = %(test1)s AND "test2" = %(test2)s',
where_clauses,
@@ -230,15 +235,15 @@ def test_format_where_clauses_dict():
def test_format_where_clauses_combined_types():
where_clauses = ("test1 = 1", ("test2 LIKE %(test2)s", dict(test2=2)), dict(test3=3, test4=4))
where_clauses = ("test1 = 1", ("test2 LIKE %(test2)s", {"test2": 2}), {"test3": 3, "test4": 4})
assert PgDB._format_where_clauses(where_clauses) == (
'test1 = 1 AND test2 LIKE %(test2)s AND "test3" = %(test3)s AND "test4" = %(test4)s',
dict(test2=2, test3=3, test4=4),
{"test2": 2, "test3": 3, "test4": 4},
)
def test_format_where_clauses_with_where_op():
where_clauses = dict(test1=1, test2=2)
where_clauses = {"test1": 1, "test2": 2}
assert PgDB._format_where_clauses(where_clauses, where_op="OR") == (
'"test1" = %(test1)s OR "test2" = %(test2)s',
where_clauses,
@@ -247,7 +252,7 @@ def test_format_where_clauses_with_where_op():
def test_add_where_clauses():
sql = "SELECT * FROM table"
where_clauses = dict(test1=1, test2=2)
where_clauses = {"test1": 1, "test2": 2}
assert PgDB._add_where_clauses(sql, None, where_clauses) == (
sql + ' WHERE "test1" = %(test1)s AND "test2" = %(test2)s',
where_clauses,
@@ -256,11 +261,11 @@ def test_add_where_clauses():
def test_add_where_clauses_preserved_params():
sql = "SELECT * FROM table"
where_clauses = dict(test1=1, test2=2)
params = dict(fake1=1)
where_clauses = {"test1": 1, "test2": 2}
params = {"fake1": 1}
assert PgDB._add_where_clauses(sql, params.copy(), where_clauses) == (
sql + ' WHERE "test1" = %(test1)s AND "test2" = %(test2)s',
dict(**where_clauses, **params),
{**where_clauses, **params},
)
@@ -275,11 +280,11 @@ def test_add_where_clauses_with_op():
def test_add_where_clauses_with_duplicated_field():
sql = "UPDATE table SET test1=%(test1)s"
params = dict(test1="new_value")
where_clauses = dict(test1="where_value")
params = {"test1": "new_value"}
where_clauses = {"test1": "where_value"}
assert PgDB._add_where_clauses(sql, params, where_clauses) == (
sql + ' WHERE "test1" = %(test1_1)s',
dict(test1="new_value", test1_1="where_value"),
{"test1": "new_value", "test1_1": "where_value"},
)
@@ -289,7 +294,7 @@ def test_quote_table_name():
def test_insert(mocker, test_pgdb):
values = dict(test1=1, test2=2)
values = {"test1": 1, "test2": 2}
mocker.patch(
"mylib.pgsql.PgDB.doSQL",
generate_mock_doSQL(
@@ -302,18 +307,18 @@ def test_insert(mocker, test_pgdb):
def test_insert_just_try(mocker, test_pgdb):
mocker.patch("mylib.pgsql.PgDB.doSQL", mock_doSQL_just_try)
assert test_pgdb.insert("mytable", dict(test1=1, test2=2), just_try=True)
assert test_pgdb.insert("mytable", {"test1": 1, "test2": 2}, just_try=True)
def test_update(mocker, test_pgdb):
values = dict(test1=1, test2=2)
where_clauses = dict(test3=3, test4=4)
values = {"test1": 1, "test2": 2}
where_clauses = {"test3": 3, "test4": 4}
mocker.patch(
"mylib.pgsql.PgDB.doSQL",
generate_mock_doSQL(
'UPDATE "mytable" SET "test1" = %(test1)s, "test2" = %(test2)s WHERE "test3" ='
' %(test3)s AND "test4" = %(test4)s',
dict(**values, **where_clauses),
{**values, **where_clauses},
),
)
@@ -322,11 +327,11 @@ def test_update(mocker, test_pgdb):
def test_update_just_try(mocker, test_pgdb):
mocker.patch("mylib.pgsql.PgDB.doSQL", mock_doSQL_just_try)
assert test_pgdb.update("mytable", dict(test1=1, test2=2), None, just_try=True)
assert test_pgdb.update("mytable", {"test1": 1, "test2": 2}, None, just_try=True)
def test_delete(mocker, test_pgdb):
where_clauses = dict(test1=1, test2=2)
where_clauses = {"test1": 1, "test2": 2}
mocker.patch(
"mylib.pgsql.PgDB.doSQL",
generate_mock_doSQL(
@@ -355,23 +360,27 @@ def test_truncate_just_try(mocker, test_pgdb):
def test_select(mocker, test_pgdb):
fields = ("field1", "field2")
where_clauses = dict(test3=3, test4=4)
where_clauses = {"test3": 3, "test4": 4}
expected_return = [
dict(field1=1, field2=2),
dict(field1=2, field2=3),
{"field1": 1, "field2": 2},
{"field1": 2, "field2": 3},
]
order_by = "field1, DESC"
limit = 10
mocker.patch(
"mylib.pgsql.PgDB.doSelect",
generate_mock_doSQL(
'SELECT "field1", "field2" FROM "mytable" WHERE "test3" = %(test3)s AND "test4" ='
" %(test4)s ORDER BY " + order_by,
" %(test4)s ORDER BY " + order_by + " LIMIT " + str(limit), # nosec: B608
where_clauses,
expected_return,
),
)
assert test_pgdb.select("mytable", where_clauses, fields, order_by=order_by) == expected_return
assert (
test_pgdb.select("mytable", where_clauses, fields, order_by=order_by, limit=limit)
== expected_return
)
def test_select_without_field_and_order_by(mocker, test_pgdb):
@@ -391,9 +400,12 @@ def test_select_just_try(mocker, test_pgdb):
def test_connect(mocker, test_pgdb):
expected_kwargs = dict(
dbname=test_pgdb._db, user=test_pgdb._user, host=test_pgdb._host, password=test_pgdb._pwd
)
expected_kwargs = {
"dbname": test_pgdb._db,
"user": test_pgdb._user,
"host": test_pgdb._host,
"password": test_pgdb._pwd,
}
mocker.patch("psycopg2.connect", generate_mock_args(expected_kwargs=expected_kwargs))
@@ -423,7 +435,7 @@ def test_setEncoding_on_exception(fake_connected_pgdb):
def test_doSQL(fake_connected_pgdb):
fake_connected_pgdb._conn.expected_sql = "DELETE FROM table WHERE test1 = %(test1)s"
fake_connected_pgdb._conn.expected_params = dict(test1=1)
fake_connected_pgdb._conn.expected_params = {"test1": 1}
fake_connected_pgdb.doSQL(
fake_connected_pgdb._conn.expected_sql, fake_connected_pgdb._conn.expected_params
)
@@ -445,8 +457,9 @@ def test_doSQL_on_exception(fake_connected_pgdb):
def test_doSelect(fake_connected_pgdb):
fake_connected_pgdb._conn.expected_sql = "SELECT * FROM table WHERE test1 = %(test1)s"
fake_connected_pgdb._conn.expected_params = dict(test1=1)
fake_connected_pgdb._conn.expected_return = [dict(test1=1)]
fake_connected_pgdb._conn.expected_params = {"test1": 1}
fake_connected_pgdb._conn.expected_cursor_factory = RealDictCursor
fake_connected_pgdb._conn.expected_return = [{"test1": 1}]
assert (
fake_connected_pgdb.doSelect(
fake_connected_pgdb._conn.expected_sql, fake_connected_pgdb._conn.expected_params
@@ -457,7 +470,8 @@ def test_doSelect(fake_connected_pgdb):
def test_doSelect_without_params(fake_connected_pgdb):
fake_connected_pgdb._conn.expected_sql = "SELECT * FROM table"
fake_connected_pgdb._conn.expected_return = [dict(test1=1)]
fake_connected_pgdb._conn.expected_cursor_factory = RealDictCursor
fake_connected_pgdb._conn.expected_return = [{"test1": 1}]
assert (
fake_connected_pgdb.doSelect(fake_connected_pgdb._conn.expected_sql)
== fake_connected_pgdb._conn.expected_return
@@ -465,14 +479,16 @@ def test_doSelect_without_params(fake_connected_pgdb):
def test_doSelect_on_exception(fake_connected_pgdb):
fake_connected_pgdb._conn.expected_cursor_factory = RealDictCursor
fake_connected_pgdb._conn.expected_exception = True
assert fake_connected_pgdb.doSelect("SELECT * FROM table") is False
def test_doSelect_just_try(fake_connected_just_try_pgdb):
fake_connected_just_try_pgdb._conn.expected_sql = "SELECT * FROM table WHERE test1 = %(test1)s"
fake_connected_just_try_pgdb._conn.expected_params = dict(test1=1)
fake_connected_just_try_pgdb._conn.expected_return = [dict(test1=1)]
fake_connected_just_try_pgdb._conn.expected_params = {"test1": 1}
fake_connected_just_try_pgdb._conn.expected_cursor_factory = RealDictCursor
fake_connected_just_try_pgdb._conn.expected_return = [{"test1": 1}]
assert (
fake_connected_just_try_pgdb.doSelect(
fake_connected_just_try_pgdb._conn.expected_sql,


@@ -25,12 +25,12 @@ def test_create_telltale_file(tmp_path):
def test_create_telltale_file_with_filepath_and_invalid_dirpath():
with pytest.raises(AssertionError):
TelltaleFile(filepath="/tmp/test", dirpath="/var/tmp")
TelltaleFile(filepath="/tmp/test", dirpath="/var/tmp") # nosec: B108
def test_create_telltale_file_with_filepath_and_invalid_filename():
with pytest.raises(AssertionError):
TelltaleFile(filepath="/tmp/test", filename="other")
TelltaleFile(filepath="/tmp/test", filename="other") # nosec: B108
def test_remove_telltale_file(tmp_path):