Compare commits

..

No commits in common. "master" and "2024.3.1" have entirely different histories.

15 changed files with 367 additions and 1325 deletions

View file

@ -32,7 +32,6 @@ jobs:
runs-on: docker
container:
image: docker.io/brenard/debian-python-deb:latest
needs: build
steps:
- name: Download Debian & Python packages files
uses: actions/download-artifact@v3
@ -68,7 +67,6 @@ jobs:
runs-on: docker
container:
image: docker.io/brenard/aptly-publish:latest
needs: build
steps:
- name: "Download Debian package files"
uses: actions/download-artifact@v3

View file

@ -5,10 +5,26 @@ jobs:
tests:
runs-on: docker
container:
image: docker.io/brenard/mylib:dev-master
options: "--workdir /src"
image: docker.io/brenard/python-pre-commit:latest
steps:
- name: Check out repository code
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Install dependencies
env:
DEBIAN_FRONTEND: noninteractive
run: |
apt-get -qq update
apt-get -qq -y install --no-install-recommends \
build-essential \
python3 \
python3-dev \
libldap2-dev \
libsasl2-dev \
pkg-config \
libsystemd-dev \
libpq-dev \
libmariadb-dev
- name: Run tests.sh
run: ./tests.sh --no-venv
run: ./tests.sh

View file

@ -114,7 +114,6 @@ $GITDCH \
--release-notes ../../dist/release_notes.md \
--path ../../ \
--exclude "^CI: " \
--exclude "^Docker: " \
--exclude "^pre-commit: " \
--exclude "\.?woodpecker(\.yml)?" \
--exclude "build(\.sh)?" \

View file

@ -1,6 +1,5 @@
FROM brenard/mylib:latest
RUN apt-get remove -y python3-mylib && \
git clone https://gitea.zionetrix.net/bn8/python-mylib.git /src && \
pip install --break-system-packages /src[dev] && \
cd /src && \
pre-commit run --all-files
RUN apt-get remove -y python3-mylib
RUN python3 -m pip install -U git+https://gitea.zionetrix.net/bn8/python-mylib.git
RUN git clone https://gitea.zionetrix.net/bn8/python-mylib.git /usr/local/src/python-mylib && pip install /usr/local/src/python-mylib[dev]
RUN cd /usr/local/src/python-mylib && pre-commit run --all-files

View file

@ -1,26 +1,5 @@
FROM node:16-bookworm-slim
RUN echo "deb http://debian.zionetrix.net stable main" > /etc/apt/sources.list.d/zionetrix.list && \
apt-get \
-o Acquire::AllowInsecureRepositories=true \
-o Acquire::AllowDowngradeToInsecureRepositories=true \
update && \
apt-get \
-o APT::Get::AllowUnauthenticated=true \
install --yes zionetrix-archive-keyring && \
apt-get update && \
apt-get upgrade -y && \
apt-get install -y \
python3-all python3-dev python3-pip python3-venv python3-mylib build-essential git \
libldap2-dev libsasl2-dev \
pkg-config libsystemd-dev \
libpq-dev libmariadb-dev \
wget unzip && \
apt-get clean && \
rm -fr rm -rf /var/lib/apt/lists/*
RUN python3 -m pip install --break-system-packages pylint pytest flake8 flake8-junit-report pylint-junit junitparser pre-commit
RUN wget --no-verbose \
-O /opt/instantclient-basic-linux.x64-21.4.0.0.0dbru.zip \
https://download.oracle.com/otn_software/linux/instantclient/214000/instantclient-basic-linux.x64-21.4.0.0.0dbru.zip && \
unzip -qq -d /opt /opt/instantclient-basic-linux.x64-21.4.0.0.0dbru.zip && \
echo /opt/instantclient_* > /etc/ld.so.conf.d/oracle-instantclient.conf && \
ldconfig
FROM debian:latest
RUN echo "deb http://debian.zionetrix.net stable main" > /etc/apt/sources.list.d/zionetrix.list && apt-get -o Acquire::AllowInsecureRepositories=true -o Acquire::AllowDowngradeToInsecureRepositories=true update && apt-get -o APT::Get::AllowUnauthenticated=true install --yes zionetrix-archive-keyring && apt-get clean && rm -fr rm -rf /var/lib/apt/lists/*
RUN apt-get update && apt-get upgrade -y && apt-get install -y python3-all python3-dev python3-pip python3-venv python3-mylib build-essential git libldap2-dev libsasl2-dev pkg-config libsystemd-dev libpq-dev libmariadb-dev wget unzip && apt-get clean && rm -fr rm -rf /var/lib/apt/lists/*
RUN python3 -m pip install pylint pytest flake8 flake8-junit-report pylint-junit junitparser pre-commit
RUN wget --no-verbose -O /opt/instantclient-basic-linux.x64-21.4.0.0.0dbru.zip https://download.oracle.com/otn_software/linux/instantclient/214000/instantclient-basic-linux.x64-21.4.0.0.0dbru.zip && unzip -qq -d /opt /opt/instantclient-basic-linux.x64-21.4.0.0.0dbru.zip && echo /opt/instantclient_* > /etc/ld.so.conf.d/oracle-instantclient.conf && ldconfig

View file

@ -6,6 +6,7 @@ import argparse
import logging
import os
import re
import stat
import sys
import textwrap
import traceback
@ -23,7 +24,6 @@ log = logging.getLogger(__name__)
# Constants
DEFAULT_ENCODING = "utf-8"
DEFAULT_CONFIG_DIRPATH = os.path.expanduser("./")
DEFAULT_CONFIG_FILE_MODE = 0o600
DEFAULT_LOG_FORMAT = "%(asctime)s - %(module)s:%(lineno)d - %(levelname)s - %(message)s"
DEFAULT_CONSOLE_LOG_FORMAT = DEFAULT_LOG_FORMAT
DEFAULT_FILELOG_FORMAT = DEFAULT_LOG_FORMAT
@ -249,16 +249,12 @@ class BaseOption: # pylint: disable=too-many-instance-attributes
def ask_value(self, set_it=True):
"""
Ask to user to enter value of this option and set it if set_it parameter is True
:param set_it: If True (default), option value will be updated with user input
:return: The configuration option value.
:rtype: mixed
Ask to user to enter value of this option and set or
return it regarding set parameter
"""
value = self._ask_value()
if set_it:
self.set(value)
return self.set(value)
return value
@ -514,12 +510,8 @@ class PasswordOption(StringOption):
def ask_value(self, set_it=True):
"""
Ask to user to enter value of this option and set it if set_it parameter is True
:param set_it: If True (default), option value will be updated with user input
:return: The configuration option value.
:rtype: mixed
Ask to user to enter value of this option and set or
return it regarding set parameter
"""
value = self._ask_value()
if set_it:
@ -538,7 +530,7 @@ class PasswordOption(StringOption):
use_keyring = False
else:
print("Invalid answer. Possible values: Y or N (case insensitive)")
self.set(value, use_keyring=use_keyring)
return self.set(value, use_keyring=use_keyring)
return value
@ -618,32 +610,28 @@ class ConfigSection:
:param set_it: If True (default), option value will be updated with user input
:return: a dict of configuration options and their value.
:return: If set_it is True, return True if valid value for each configuration
option have been retrieved and set. If False, return a dict of configuration
options and their value.
:rtype: bool of dict
"""
if self.comment:
print(f"# {self.comment}")
print(f"[{self.name}]\n")
result = {}
error = False
for name, option in self.options.items():
result[name] = option.ask_value(set_it=set_it)
option_result = option.ask_value(set_it=set_it)
if set_it:
result[name] = option_result
elif not option_result:
error = True
print()
print()
if set_it:
return not error
return result
def ask_value(self, option, set_it=True):
"""
Ask user to enter value for the specified configuration option of the section
:param options: The configuration option name
:param set_it: If True (default), option value will be updated with user input
:return: The configuration option value.
:rtype: mixed
"""
assert self.defined(option), f"Option {option} unknown"
return self.options[option].ask_value(set_it=set_it)
class RawWrappedTextHelpFormatter(argparse.RawDescriptionHelpFormatter):
"""
@ -679,7 +667,6 @@ class Config: # pylint: disable=too-many-instance-attributes
config_file_env_variable=None,
default_config_dirpath=None,
default_config_filename=None,
default_config_file_mode=None,
):
self.appname = appname
self.shortname = shortname
@ -695,7 +682,6 @@ class Config: # pylint: disable=too-many-instance-attributes
self.config_file_env_variable = config_file_env_variable
self.default_config_dirpath = default_config_dirpath
self.default_config_filename = default_config_filename
self.default_config_file_mode = default_config_file_mode or DEFAULT_CONFIG_FILE_MODE
self.add_logging_sections()
self._init_config_parser()
@ -896,7 +882,7 @@ class Config: # pylint: disable=too-many-instance-attributes
fd.write("\n".join(lines).encode(self.encoding))
# Privacy!
os.chmod(filepath, self.default_config_file_mode)
os.chmod(filepath, stat.S_IRUSR | stat.S_IWUSR)
except Exception: # pylint: disable=broad-except
log.exception("Failed to write generated configuration file %s", filepath)
return False
@ -1087,7 +1073,8 @@ class Config: # pylint: disable=too-many-instance-attributes
self._loaded()
if self.get_option("mylib_config_reconfigure", default=False):
self.ask_values(set_it=True)
if self.ask_values(set_it=True) and self.save():
sys.exit(0)
sys.exit(1)
return options
@ -1113,32 +1100,27 @@ class Config: # pylint: disable=too-many-instance-attributes
:param execute_callback: Sections's loaded callbacks will be finally executed
(only if set_it is True, default: False)
:return: a dict of configuration section and their options value.
:rtype: dict
:return: If set_it is True, return True if valid value for each configuration
option have been retrieved and set. If False, return a dict of configuration
section and their options value.
:rtype: bool of dict
"""
result = {}
error = False
for name, section in self.sections.items():
result[name] = section.ask_values(set_it=set_it)
if set_it and execute_callback:
self._loaded()
section_result = section.ask_values(set_it=set_it)
if not set_it:
result[name] = section_result
elif not section_result:
error = True
if set_it:
if error:
return False
if execute_callback:
self._loaded()
return True
return result
def ask_value(self, section, option, set_it=True):
"""
Ask user to enter value for the specified configuration option
:param section: The configuration section name
:param option: The configuration option name
:param set_it: If True (default), option value will be updated with user input
:return: The configuration option value.
:rtype: mixed
"""
assert self.defined(section, option), f"Unknown option {section}.{option}"
return self.sections[section].ask_value(option, set_it=set_it)
def configure(self, argv=None, description=False):
"""
Entry point of a script you could use to created your configuration file

View file

@ -1,6 +1,5 @@
""" Email client to forge and send emails """
import base64
import email.utils
import logging
import os
@ -10,7 +9,6 @@ from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import magic
from mako.template import Template as MakoTemplate
from mylib.config import (
@ -24,14 +22,6 @@ from mylib.config import (
log = logging.getLogger(__name__)
def load_image_as_base64(path):
"""Load image file as base64"""
log.debug("Load image file '%s'", path)
with open(path, "rb") as file_desc:
data = file_desc.read()
return f"data:{magic.from_buffer(data, mime=True)};base64, {base64.b64encode(data).decode()}"
class EmailClient(
ConfigurableObject
): # pylint: disable=useless-object-inheritance,too-many-instance-attributes
@ -175,7 +165,7 @@ class EmailClient(
continue
template_type = "text" if template_type == ".txt" else template_type[1:]
if template_name not in self.templates:
self.templates[template_name] = {"path": templates_path}
self.templates[template_name] = {}
log.debug("Load email template %s %s from %s", template_name, template_type, filepath)
with open(filepath, encoding="utf8") as file_desc:
self.templates[template_name][template_type] = MakoTemplate(
@ -249,7 +239,6 @@ class EmailClient(
msg["Date"] = email.utils.formatdate(None, True)
encoding = encoding if encoding else self._get_option("encoding")
if template:
log.debug("Forge email from template %s", template)
assert template in self.templates, f"Unknown template {template}"
# Handle subject from template
if not subject:
@ -275,9 +264,6 @@ class EmailClient(
)
if self.templates[template].get("html"):
if isinstance(self.templates[template]["html"], MakoTemplate):
template_vars["load_image_as_base64"] = self.template_image_loader(
self.templates[template].get("path")
)
parts.append((self.templates[template]["html"].render(**template_vars), "html"))
else:
parts.append((self.templates[template]["html"].format(**template_vars), "html"))
@ -310,19 +296,6 @@ class EmailClient(
msg.attach(part)
return msg
@staticmethod
def template_image_loader(directory_path):
"""Return wrapper for the load_image_as_base64 function bind on the template directory"""
def _load_image_as_base64(path):
return load_image_as_base64(
os.path.join(directory_path, path)
if directory_path and not os.path.isabs(path)
else path
)
return _load_image_as_base64
def send(
self, recipients, msg=None, subject=None, just_try=None, cc=None, bcc=None, **forge_args
):
@ -433,3 +406,189 @@ class EmailClient(
server.quit()
return not error
if __name__ == "__main__":
# Run tests
import argparse
import datetime
import sys
# Options parser
parser = argparse.ArgumentParser()
parser.add_argument(
"-v", "--verbose", action="store_true", dest="verbose", help="Enable verbose mode"
)
parser.add_argument(
"-d", "--debug", action="store_true", dest="debug", help="Enable debug mode"
)
parser.add_argument(
"-l", "--log-file", action="store", type=str, dest="logfile", help="Log file path"
)
parser.add_argument(
"-j", "--just-try", action="store_true", dest="just_try", help="Enable just-try mode"
)
email_opts = parser.add_argument_group("Email options")
email_opts.add_argument(
"-H", "--smtp-host", action="store", type=str, dest="email_smtp_host", help="SMTP host"
)
email_opts.add_argument(
"-P", "--smtp-port", action="store", type=int, dest="email_smtp_port", help="SMTP port"
)
email_opts.add_argument(
"-S", "--smtp-ssl", action="store_true", dest="email_smtp_ssl", help="Use SSL"
)
email_opts.add_argument(
"-T", "--smtp-tls", action="store_true", dest="email_smtp_tls", help="Use TLS"
)
email_opts.add_argument(
"-u", "--smtp-user", action="store", type=str, dest="email_smtp_user", help="SMTP username"
)
email_opts.add_argument(
"-p",
"--smtp-password",
action="store",
type=str,
dest="email_smtp_password",
help="SMTP password",
)
email_opts.add_argument(
"-D",
"--smtp-debug",
action="store_true",
dest="email_smtp_debug",
help="Debug SMTP connection",
)
email_opts.add_argument(
"-e",
"--email-encoding",
action="store",
type=str,
dest="email_encoding",
help="SMTP encoding",
)
email_opts.add_argument(
"-f",
"--sender-name",
action="store",
type=str,
dest="email_sender_name",
help="Sender name",
)
email_opts.add_argument(
"-F",
"--sender-email",
action="store",
type=str,
dest="email_sender_email",
help="Sender email",
)
email_opts.add_argument(
"-C",
"--catch-all",
action="store",
type=str,
dest="email_catch_all",
help="Catch all sent email: specify catch recipient email address",
)
test_opts = parser.add_argument_group("Test email options")
test_opts.add_argument(
"-t",
"--to",
action="store",
type=str,
dest="test_to",
help="Test email recipient",
)
test_opts.add_argument(
"-m",
"--mako",
action="store_true",
dest="test_mako",
help="Test mako templating",
)
options = parser.parse_args()
if not options.test_to:
parser.error("You must specify test email recipient using -t/--to parameter")
sys.exit(1)
# Initialize logs
logformat = "%(asctime)s - Test EmailClient - %(levelname)s - %(message)s"
if options.debug:
loglevel = logging.DEBUG
elif options.verbose:
loglevel = logging.INFO
else:
loglevel = logging.WARNING
if options.logfile:
logging.basicConfig(filename=options.logfile, level=loglevel, format=logformat)
else:
logging.basicConfig(level=loglevel, format=logformat)
if options.email_smtp_user and not options.email_smtp_password:
import getpass
options.email_smtp_password = getpass.getpass("Please enter SMTP password: ")
logging.info("Initialize Email client")
email_client = EmailClient(
smtp_host=options.email_smtp_host,
smtp_port=options.email_smtp_port,
smtp_ssl=options.email_smtp_ssl,
smtp_tls=options.email_smtp_tls,
smtp_user=options.email_smtp_user,
smtp_password=options.email_smtp_password,
smtp_debug=options.email_smtp_debug,
sender_name=options.email_sender_name,
sender_email=options.email_sender_email,
catch_all_addr=options.email_catch_all,
just_try=options.just_try,
encoding=options.email_encoding,
templates={
"test": {
"subject": "Test email",
"text": (
"Just a test email sent at {sent_date}."
if not options.test_mako
else MakoTemplate("Just a test email sent at ${sent_date | h}.") # nosec
),
"html": (
"<strong>Just a test email.</strong> <small>(sent at {sent_date | h})</small>"
if not options.test_mako
else MakoTemplate( # nosec
"<strong>Just a test email.</strong> "
"<small>(sent at ${sent_date | h})</small>"
)
),
}
},
)
logging.info("Send a test email to %s", options.test_to)
if email_client.send(options.test_to, template="test", sent_date=datetime.datetime.now()):
logging.info("Test email sent")
sys.exit(0)
logging.error("Fail to send test email")
sys.exit(1)

View file

@ -575,7 +575,6 @@ class LdapClient:
warn=True,
paged_search=False,
pagesize=None,
nocache=False,
):
"""
Retrieve objects from LDAP
@ -593,11 +592,9 @@ class LdapClient:
(optional, default: False)
:param pagesize: When using paged search, the page size
(optional, default: see LdapServer.paged_search)
:param nocache: If True, disable using cache
"""
if name in self._cached_objects and not nocache:
if name in self._cached_objects:
log.debug("Retrieved %s objects from cache", name)
objects = self._cached_objects[name]
else:
assert self._conn or self.initialize()
log.debug(
@ -627,11 +624,13 @@ class LdapClient:
if not obj_dn or not isinstance(obj_attrs, dict):
continue
objects[obj_dn] = self._get_obj(obj_dn, obj_attrs)
if not nocache:
self._cached_objects[name] = objects
self._cached_objects[name] = objects
if not key_attr or key_attr == "dn":
return objects
return {self.get_attr(objects[dn], key_attr): objects[dn] for dn in objects}
return self._cached_objects[name]
return {
self.get_attr(self._cached_objects[name][dn], key_attr): self._cached_objects[name][dn]
for dn in self._cached_objects[name]
}
def get_object(self, type_name, object_name, filterstr, basedn, attrs, warn=True):
"""

View file

@ -11,7 +11,6 @@ week_days = ["lundi", "mardi", "mercredi", "jeudi", "vendredi", "samedi", "diman
date_format = "%d/%m/%Y"
date_pattern = re.compile("^([0-9]{2})/([0-9]{2})/([0-9]{4})$")
time_pattern = re.compile("^([0-9]{1,2})h([0-9]{2})?$")
_nonworking_french_public_days_of_the_year_cache = {}
def easter_date(year):
@ -38,25 +37,23 @@ def nonworking_french_public_days_of_the_year(year=None):
"""Compute dict of nonworking french public days for the specified year"""
if year is None:
year = datetime.date.today().year
if year not in _nonworking_french_public_days_of_the_year_cache:
dp = easter_date(year)
_nonworking_french_public_days_of_the_year_cache[year] = {
"1janvier": datetime.date(year, 1, 1),
"paques": dp,
"lundi_paques": (dp + datetime.timedelta(1)),
"1mai": datetime.date(year, 5, 1),
"8mai": datetime.date(year, 5, 8),
"jeudi_ascension": (dp + datetime.timedelta(39)),
"pentecote": (dp + datetime.timedelta(49)),
"lundi_pentecote": (dp + datetime.timedelta(50)),
"14juillet": datetime.date(year, 7, 14),
"15aout": datetime.date(year, 8, 15),
"1novembre": datetime.date(year, 11, 1),
"11novembre": datetime.date(year, 11, 11),
"noel": datetime.date(year, 12, 25),
"saint_etienne": datetime.date(year, 12, 26),
}
return _nonworking_french_public_days_of_the_year_cache[year]
dp = easter_date(year)
return {
"1janvier": datetime.date(year, 1, 1),
"paques": dp,
"lundi_paques": (dp + datetime.timedelta(1)),
"1mai": datetime.date(year, 5, 1),
"8mai": datetime.date(year, 5, 8),
"jeudi_ascension": (dp + datetime.timedelta(39)),
"pentecote": (dp + datetime.timedelta(49)),
"lundi_pentecote": (dp + datetime.timedelta(50)),
"14juillet": datetime.date(year, 7, 14),
"15aout": datetime.date(year, 8, 15),
"1novembre": datetime.date(year, 11, 1),
"11novembre": datetime.date(year, 11, 11),
"noel": datetime.date(year, 12, 25),
"saint_etienne": datetime.date(year, 12, 26),
}
def parse_exceptional_closures(values):
@ -158,153 +155,7 @@ def parse_normal_opening_hours(values):
if not days and not hours_periods:
raise ValueError(f'No days or hours period found in this value: "{value}"')
normal_opening_hours.append({"days": days, "hours_periods": hours_periods})
for idx, noh in enumerate(normal_opening_hours):
normal_opening_hours[idx]["hours_periods"] = sorted_hours_periods(noh["hours_periods"])
return sorted_opening_hours(normal_opening_hours)
def sorted_hours_periods(hours_periods):
"""Sort hours periods"""
return sorted(hours_periods, key=lambda hp: (hp["start"], hp["stop"]))
def sorted_opening_hours(opening_hours):
"""Sort opening hours"""
return sorted(
opening_hours,
key=lambda x: (
week_days.index(x["days"][0]) if x["days"] else None,
x["hours_periods"][0]["start"] if x["hours_periods"] else datetime.datetime.min.time(),
x["hours_periods"][0]["stop"] if x["hours_periods"] else datetime.datetime.max.time(),
),
)
def its_nonworking_day(nonworking_public_holidays_values, date=None):
"""Check if is a non-working day"""
if not nonworking_public_holidays_values:
return False
date = date if date else datetime.date.today()
log.debug("its_nonworking_day(%s): values=%s", date, nonworking_public_holidays_values)
nonworking_days = nonworking_french_public_days_of_the_year(year=date.year)
for day in nonworking_public_holidays_values:
if day in nonworking_days and nonworking_days[day] == date:
log.debug("its_nonworking_day(%s): %s", date, day)
return True
return False
def its_exceptionally_closed(exceptional_closures_values, when=None, parse=True, all_day=False):
"""Check if it's exceptionally closed"""
if not exceptional_closures_values:
return False
when = when if when else datetime.datetime.now()
assert isinstance(when, (datetime.date, datetime.datetime))
when_date = when.date() if isinstance(when, datetime.datetime) else when
exceptional_closures = (
parse_exceptional_closures(exceptional_closures_values)
if parse
else exceptional_closures_values
)
log.debug("its_exceptionally_closed(%s): exceptional closures=%s", when, exceptional_closures)
for cl in exceptional_closures:
if when_date not in cl["days"]:
log.debug(
"its_exceptionally_closed(%s): %s not in days (%s)", when, when_date, cl["days"]
)
continue
if not cl["hours_periods"]:
# All day exceptional closure
return True
if all_day:
# Wanted an all day closure, ignore it
continue
for hp in cl["hours_periods"]:
if hp["start"] <= when.time() <= hp["stop"]:
return True
return False
def get_exceptional_closures_hours(exceptional_closures_values, date=None, parse=True):
"""Get exceptional closures hours of the day"""
if not exceptional_closures_values:
return []
date = date if date else datetime.date.today()
exceptional_closures = (
parse_exceptional_closures(exceptional_closures_values)
if parse
else exceptional_closures_values
)
log.debug(
"get_exceptional_closures_hours(%s): exceptional closures=%s", date, exceptional_closures
)
exceptional_closures_hours = []
for cl in exceptional_closures:
if date not in cl["days"]:
log.debug("get_exceptional_closures_hours(%s): not in days (%s)", date, cl["days"])
continue
if not cl["hours_periods"]:
log.debug(
"get_exceptional_closures_hours(%s): it's exceptionally closed all the day", date
)
return [
{
"start": datetime.datetime.min.time(),
"stop": datetime.datetime.max.time(),
}
]
exceptional_closures_hours.extend(cl["hours_periods"])
log.debug(
"get_exceptional_closures_hours(%s): exceptional closures hours=%s",
date,
exceptional_closures_hours,
)
return sorted_hours_periods(exceptional_closures_hours)
def its_normally_open(normal_opening_hours_values, when=None, parse=True, ignore_time=False):
"""Check if it's normally open"""
when = when if when else datetime.datetime.now()
if not normal_opening_hours_values:
log.debug(
"its_normally_open(%s): no normal opening hours defined, consider as opened", when
)
return True
when_weekday = week_days[when.timetuple().tm_wday]
log.debug("its_normally_open(%s): week day=%s", when, when_weekday)
normal_opening_hours = (
parse_normal_opening_hours(normal_opening_hours_values)
if parse
else normal_opening_hours_values
)
log.debug("its_normally_open(%s): normal opening hours=%s", when, normal_opening_hours)
for oh in normal_opening_hours:
if oh["days"] and when_weekday not in oh["days"]:
log.debug("its_normally_open(%s): %s not in days (%s)", when, when_weekday, oh["days"])
continue
if not oh["hours_periods"] or ignore_time:
return True
for hp in oh["hours_periods"]:
if hp["start"] <= when.time() <= hp["stop"]:
return True
log.debug("its_normally_open(%s): not in normal opening hours", when)
return False
def its_opening_day(
normal_opening_hours_values=None,
exceptional_closures_values=None,
nonworking_public_holidays_values=None,
date=None,
parse=True,
):
"""Check if it's an opening day"""
date = date if date else datetime.date.today()
if its_nonworking_day(nonworking_public_holidays_values, date=date):
return False
if its_exceptionally_closed(exceptional_closures_values, when=date, all_day=True, parse=parse):
return False
return its_normally_open(normal_opening_hours_values, when=date, parse=parse, ignore_time=True)
return normal_opening_hours
def is_closed(
@ -342,578 +193,76 @@ def is_closed(
when_time,
when_weekday,
)
# Handle non-working days
if its_nonworking_day(nonworking_public_holidays_values, date=when_date):
return {
"closed": True,
"exceptional_closure": exceptional_closure_on_nonworking_public_days,
"exceptional_closure_all_day": exceptional_closure_on_nonworking_public_days,
}
if nonworking_public_holidays_values:
log.debug("Nonworking public holidays: %s", nonworking_public_holidays_values)
nonworking_days = nonworking_french_public_days_of_the_year()
for day in nonworking_public_holidays_values:
if day in nonworking_days and when_date == nonworking_days[day]:
log.debug("Non working day: %s", day)
return {
"closed": True,
"exceptional_closure": exceptional_closure_on_nonworking_public_days,
"exceptional_closure_all_day": exceptional_closure_on_nonworking_public_days,
}
# Handle exceptional closures
try:
if its_exceptionally_closed(exceptional_closures_values, when=when):
return {
"closed": True,
"exceptional_closure": True,
"exceptional_closure_all_day": its_exceptionally_closed(
exceptional_closures_values, when=when, all_day=True
),
}
except ValueError as e:
if on_error_result is None:
log.error("Fail to parse exceptional closures", exc_info=True)
raise e from e
log.error("Fail to parse exceptional closures, consider as %s", on_error, exc_info=True)
return on_error_result
# Finally, handle normal opening hours
try:
return {
"closed": not its_normally_open(normal_opening_hours_values, when=when),
"exceptional_closure": False,
"exceptional_closure_all_day": False,
}
except ValueError as e: # pylint: disable=broad-except
if on_error_result is None:
log.error("Fail to parse normal opening hours", exc_info=True)
raise e from e
log.error("Fail to parse normal opening hours, consider as %s", on_error, exc_info=True)
return on_error_result
def next_opening_date(
normal_opening_hours_values=None,
exceptional_closures_values=None,
nonworking_public_holidays_values=None,
date=None,
max_anaylse_days=None,
parse=True,
):
"""Search for the next opening day"""
date = date if date else datetime.date.today()
max_anaylse_days = max_anaylse_days if max_anaylse_days is not None else 30
if parse:
if exceptional_closures_values:
try:
normal_opening_hours_values = (
parse_normal_opening_hours(normal_opening_hours_values)
if normal_opening_hours_values
else None
)
exceptional_closures_values = (
parse_exceptional_closures(exceptional_closures_values)
if exceptional_closures_values
else None
)
except ValueError: # pylint: disable=broad-except
log.error(
"next_opening_date(%s): fail to parse normal opening hours or exceptional closures",
date,
exc_info=True,
)
return False
added_days = 0
while added_days <= max_anaylse_days:
test_date = date + datetime.timedelta(days=added_days)
if its_opening_day(
normal_opening_hours_values=normal_opening_hours_values,
exceptional_closures_values=exceptional_closures_values,
nonworking_public_holidays_values=nonworking_public_holidays_values,
date=test_date,
parse=False,
):
return test_date
added_days += 1
log.debug(
"next_opening_date(%s): no opening day found in the next %d days", date, max_anaylse_days
)
return False
def next_opening_hour(
normal_opening_hours_values=None,
exceptional_closures_values=None,
nonworking_public_holidays_values=None,
when=None,
max_anaylse_days=None,
parse=True,
):
"""Search for the next opening hour"""
when = when if when else datetime.datetime.now()
max_anaylse_days = max_anaylse_days if max_anaylse_days is not None else 30
if parse:
try:
normal_opening_hours_values = (
parse_normal_opening_hours(normal_opening_hours_values)
if normal_opening_hours_values
else None
)
exceptional_closures_values = (
parse_exceptional_closures(exceptional_closures_values)
if exceptional_closures_values
else None
)
except ValueError: # pylint: disable=broad-except
log.error(
"next_opening_hour(%s): fail to parse normal opening hours or exceptional closures",
when,
exc_info=True,
)
return False
date = next_opening_date(
normal_opening_hours_values=normal_opening_hours_values,
exceptional_closures_values=exceptional_closures_values,
nonworking_public_holidays_values=nonworking_public_holidays_values,
date=when.date(),
max_anaylse_days=max_anaylse_days,
parse=False,
)
if not date:
log.debug(
"next_opening_hour(%s): no opening day found in the next %d days",
when,
max_anaylse_days,
)
return False
log.debug("next_opening_hour(%s): next opening date=%s", when, date)
weekday = week_days[date.timetuple().tm_wday]
log.debug("next_opening_hour(%s): next opening week day=%s", when, weekday)
exceptional_closures_hours = get_exceptional_closures_hours(
exceptional_closures_values, date=date, parse=False
)
log.debug(
"next_opening_hour(%s): next opening day exceptional closures hours=%s",
when,
exceptional_closures_hours,
)
next_opening_datetime = None
exceptionally_closed = False
exceptionally_closed_all_day = False
in_opening_hours = date != when.date()
for oh in normal_opening_hours_values:
if exceptionally_closed_all_day:
break
if oh["days"] and weekday not in oh["days"]:
log.debug("next_opening_hour(%s): %s not in days (%s)", when, weekday, oh["days"])
continue
log.debug(
"next_opening_hour(%s): %s in days (%s), handle opening hours %s",
when,
weekday,
oh["days"],
oh["hours_periods"],
)
if not oh["hours_periods"]:
log.debug(
"next_opening_hour(%s): %s is an all day opening day, handle exceptional closures "
"hours %s to find the minimal opening time",
when,
weekday,
exceptional_closures_hours,
)
if date == when.date():
in_opening_hours = True
test_time = when.time() if when.date() == date else datetime.datetime.min.time()
for cl in exceptional_closures_hours:
if cl["start"] <= test_time < cl["stop"]:
if cl["stop"] >= datetime.datetime.max.time():
exceptionally_closed = True
exceptionally_closed_all_day = True
next_opening_datetime = None
break
test_time = cl["stop"]
else:
break
if not exceptionally_closed_all_day:
candidate_next_opening_datetime = datetime.datetime.combine(date, test_time)
next_opening_datetime = (
candidate_next_opening_datetime
if not next_opening_datetime
or candidate_next_opening_datetime < next_opening_datetime
else next_opening_datetime
)
continue
log.debug(
"next_opening_hour(%s): only opened during some hours periods (%s) on %s, find the "
"minimal starting time",
when,
oh["hours_periods"],
weekday,
)
test_time = datetime.datetime.max.time()
for hp in oh["hours_periods"]:
if date == when.date() and hp["stop"] < when.time():
log.debug(
"next_opening_hour(%s): ignore opening hours %s before specified when time %s",
when,
hp,
when.time(),
)
exceptional_closures = parse_exceptional_closures(exceptional_closures_values)
log.debug("Exceptional closures: %s", exceptional_closures)
except ValueError as e:
log.error("Fail to parse exceptional closures, consider as closed", exc_info=True)
if on_error_result is None:
raise e from e
return on_error_result
for cl in exceptional_closures:
if when_date not in cl["days"]:
log.debug("when_date (%s) no in days (%s)", when_date, cl["days"])
continue
if date == when.date() and hp["start"] <= when.time() < hp["stop"]:
in_opening_hours = True
if exceptional_closures_hours:
log.debug(
"next_opening_hour(%s): check if opening hours %s match with exceptional "
"closure hours %s",
when,
hp,
exceptional_closures_hours,
)
for cl in exceptional_closures_hours:
if cl["start"] <= hp["start"] and cl["stop"] >= hp["stop"]:
log.debug(
"next_opening_hour(%s): opening hour %s is included in exceptional "
"closure hours %s",
when,
hp,
cl,
)
exceptionally_closed = True
break
if hp["start"] < cl["start"]:
log.debug(
"next_opening_hour(%s): opening hour %s start before closure hours %s",
when,
hp,
cl,
)
test_time = hp["start"] if hp["start"] < test_time else test_time
elif cl["stop"] >= hp["start"] and cl["stop"] < hp["stop"]:
log.debug(
"next_opening_hour(%s): opening hour %s end after closure hours %s",
when,
hp,
cl,
)
test_time = cl["stop"] if cl["stop"] < test_time else test_time
elif hp["start"] < test_time:
log.debug(
"next_opening_hour(%s): no exceptional closure hours, use opening hours start "
"time %s",
when,
hp["start"],
)
test_time = hp["start"]
if not cl["hours_periods"]:
# All day exceptional closure
return {
"closed": True,
"exceptional_closure": True,
"exceptional_closure_all_day": True,
}
for hp in cl["hours_periods"]:
if hp["start"] <= when_time <= hp["stop"]:
return {
"closed": True,
"exceptional_closure": True,
"exceptional_closure_all_day": False,
}
if test_time < datetime.datetime.max.time():
if date == when.date() and test_time < when.time():
test_time = when.time()
candidate_next_opening_datetime = datetime.datetime.combine(date, test_time)
next_opening_datetime = (
candidate_next_opening_datetime
if not next_opening_datetime
or candidate_next_opening_datetime < next_opening_datetime
else next_opening_datetime
)
if not next_opening_datetime and (
exceptionally_closed or (date == when.date() and not in_opening_hours)
):
new_max_anaylse_days = max_anaylse_days - (date - when.date()).days
if new_max_anaylse_days > 0:
log.debug(
"next_opening_hour(%s): exceptionally closed on %s, try on following %d days",
when,
date,
new_max_anaylse_days,
)
next_opening_datetime = next_opening_hour(
normal_opening_hours_values=normal_opening_hours_values,
exceptional_closures_values=exceptional_closures_values,
nonworking_public_holidays_values=nonworking_public_holidays_values,
when=datetime.datetime.combine(
date + datetime.timedelta(days=1), datetime.datetime.min.time()
),
max_anaylse_days=new_max_anaylse_days,
parse=False,
)
if not next_opening_datetime:
log.debug(
"next_opening_hour(%s): no opening hours found in next %d days", when, max_anaylse_days
)
return False
log.debug("next_opening_hour(%s): next opening hours=%s", when, next_opening_datetime)
return next_opening_datetime
def previous_opening_date(
normal_opening_hours_values=None,
exceptional_closures_values=None,
nonworking_public_holidays_values=None,
date=None,
max_anaylse_days=None,
parse=True,
):
"""Search for the previous opening day"""
date = date if date else datetime.date.today()
max_anaylse_days = max_anaylse_days if max_anaylse_days is not None else 30
if parse:
if normal_opening_hours_values:
try:
normal_opening_hours_values = (
parse_normal_opening_hours(normal_opening_hours_values)
if normal_opening_hours_values
else None
)
exceptional_closures_values = (
parse_exceptional_closures(exceptional_closures_values)
if exceptional_closures_values
else None
)
except ValueError: # pylint: disable=broad-except
log.error(
"previous_opening_date(%s): fail to parse normal opening hours or exceptional "
"closures",
date,
exc_info=True,
)
return False
days = 0
while days <= max_anaylse_days:
test_date = date - datetime.timedelta(days=days)
if its_opening_day(
normal_opening_hours_values=normal_opening_hours_values,
exceptional_closures_values=exceptional_closures_values,
nonworking_public_holidays_values=nonworking_public_holidays_values,
date=test_date,
parse=False,
):
return test_date
days += 1
log.debug(
"previous_opening_date(%s): no opening day found in the next %d days",
date,
max_anaylse_days,
)
return False
def previous_opening_hour(
normal_opening_hours_values=None,
exceptional_closures_values=None,
nonworking_public_holidays_values=None,
when=None,
max_anaylse_days=None,
parse=True,
):
"""Search for the previous opening hour"""
when = when if when else datetime.datetime.now()
max_anaylse_days = max_anaylse_days if max_anaylse_days is not None else 30
if parse:
try:
normal_opening_hours_values = (
parse_normal_opening_hours(normal_opening_hours_values)
if normal_opening_hours_values
else None
)
exceptional_closures_values = (
parse_exceptional_closures(exceptional_closures_values)
if exceptional_closures_values
else None
)
except ValueError: # pylint: disable=broad-except
log.error(
"previous_opening_hour(%s): fail to parse normal opening hours or exceptional "
"closures",
when,
exc_info=True,
)
return False
date = previous_opening_date(
normal_opening_hours_values=normal_opening_hours_values,
exceptional_closures_values=exceptional_closures_values,
nonworking_public_holidays_values=nonworking_public_holidays_values,
date=when.date(),
max_anaylse_days=max_anaylse_days,
parse=False,
)
if not date:
log.debug(
"previous_opening_hour(%s): no opening day found in the previous %d days",
when,
max_anaylse_days,
)
return False
log.debug("previous_opening_hour(%s): previous opening date=%s", when, date)
weekday = week_days[date.timetuple().tm_wday]
log.debug("previous_opening_hour(%s): previous opening week day=%s", when, weekday)
exceptional_closures_hours = get_exceptional_closures_hours(
exceptional_closures_values, date=date, parse=False
)
log.debug(
"previous_opening_hour(%s): previous opening day exceptional closures hours=%s",
when,
exceptional_closures_hours,
)
previous_opening_datetime = None
exceptionally_closed = False
exceptionally_closed_all_day = False
in_opening_hours = date != when.date()
for oh in reversed(normal_opening_hours_values):
if exceptionally_closed_all_day:
break
if oh["days"] and weekday not in oh["days"]:
log.debug("previous_opening_hour(%s): %s not in days (%s)", when, weekday, oh["days"])
continue
log.debug(
"previous_opening_hour(%s): %s in days (%s), handle opening hours %s",
when,
weekday,
oh["days"],
oh["hours_periods"],
)
if not oh["hours_periods"]:
log.debug(
"previous_opening_hour(%s): %s is an all day opening day, handle exceptional "
"closures hours %s to find the maximal opening time",
when,
weekday,
exceptional_closures_hours,
)
if date == when.date():
in_opening_hours = True
test_time = when.time() if when.date() == date else datetime.datetime.max.time()
for cl in exceptional_closures_hours:
if cl["start"] <= test_time < cl["stop"]:
if cl["start"] <= datetime.datetime.min.time():
exceptionally_closed = True
exceptionally_closed_all_day = True
previous_opening_datetime = None
break
test_time = cl["start"]
else:
break
if not exceptionally_closed_all_day:
candidate_previous_opening_datetime = datetime.datetime.combine(date, test_time)
previous_opening_datetime = (
candidate_previous_opening_datetime
if not previous_opening_datetime
or candidate_previous_opening_datetime > previous_opening_datetime
else previous_opening_datetime
)
continue
log.debug(
"previous_opening_hour(%s): only opened during some hours periods (%s) on %s, find the "
"maximal opening time",
when,
oh["hours_periods"],
weekday,
)
test_time = datetime.datetime.min.time()
for hp in reversed(oh["hours_periods"]):
if date == when.date() and hp["start"] > when.time():
log.debug(
"previous_opening_hour(%s): ignore opening hours %s starting before specified "
"when time %s",
when,
hp,
when.time(),
)
normal_opening_hours = parse_normal_opening_hours(normal_opening_hours_values)
log.debug("Normal opening hours: %s", normal_opening_hours)
except ValueError as e: # pylint: disable=broad-except
log.error("Fail to parse normal opening hours, consider as closed", exc_info=True)
if on_error_result is None:
raise e from e
return on_error_result
for oh in normal_opening_hours:
if oh["days"] and when_weekday not in oh["days"]:
log.debug("when_weekday (%s) no in days (%s)", when_weekday, oh["days"])
continue
if date == when.date() and hp["start"] <= when.time() < hp["stop"]:
in_opening_hours = True
if exceptional_closures_hours:
log.debug(
"previous_opening_hour(%s): check if opening hours %s match with exceptional "
"closure hours %s",
when,
hp,
exceptional_closures_hours,
)
for cl in reversed(exceptional_closures_hours):
if cl["start"] <= hp["start"] and cl["stop"] >= hp["stop"]:
log.debug(
"previous_opening_hour(%s): opening hour %s is included in exceptional "
"closure hours %s",
when,
hp,
cl,
)
exceptionally_closed = True
break
if cl["stop"] < hp["stop"]:
log.debug(
"previous_opening_hour(%s): opening hour %s end after closure hours %s",
when,
hp,
cl,
)
test_time = hp["stop"] if hp["stop"] > test_time else test_time
elif cl["start"] > hp["stop"]:
log.debug(
"previous_opening_hour(%s): opening hour %s start before closure hours "
"%s",
when,
hp,
cl,
)
test_time = hp["stop"] if hp["stop"] > test_time else test_time
elif cl["stop"] >= hp["stop"] and cl["start"] > hp["start"]:
log.debug(
"previous_opening_hour(%s): opening hour %s start before closure hours "
"%s",
when,
hp,
cl,
)
test_time = cl["start"] if cl["start"] > test_time else test_time
elif hp["stop"] > test_time:
log.debug(
"previous_opening_hour(%s): no exceptional closure hours, use opening hours "
"stop time %s",
when,
hp["stop"],
)
test_time = hp["stop"]
if not oh["hours_periods"]:
# All day opened
return {
"closed": False,
"exceptional_closure": False,
"exceptional_closure_all_day": False,
}
for hp in oh["hours_periods"]:
if hp["start"] <= when_time <= hp["stop"]:
return {
"closed": False,
"exceptional_closure": False,
"exceptional_closure_all_day": False,
}
log.debug("Not in normal opening hours => closed")
return {"closed": True, "exceptional_closure": False, "exceptional_closure_all_day": False}
if test_time > datetime.datetime.min.time():
if date == when.date() and test_time > when.time():
test_time = when.time()
candidate_previous_opening_datetime = datetime.datetime.combine(date, test_time)
previous_opening_datetime = (
candidate_previous_opening_datetime
if not previous_opening_datetime
or candidate_previous_opening_datetime > previous_opening_datetime
else previous_opening_datetime
)
if not previous_opening_datetime and (
exceptionally_closed or (date == when.date() and not in_opening_hours)
):
new_max_anaylse_days = max_anaylse_days - (when.date() - date).days
if new_max_anaylse_days > 0:
log.debug(
"previous_opening_hour(%s): exceptionally closed on %s, try on previous %d days",
when,
date,
new_max_anaylse_days,
)
previous_opening_datetime = previous_opening_hour(
normal_opening_hours_values=normal_opening_hours_values,
exceptional_closures_values=exceptional_closures_values,
nonworking_public_holidays_values=nonworking_public_holidays_values,
when=datetime.datetime.combine(
date - datetime.timedelta(days=1), datetime.datetime.max.time()
),
max_anaylse_days=new_max_anaylse_days,
parse=False,
)
if not previous_opening_datetime:
log.debug(
"previous_opening_hour(%s): no opening hours found in previous %d days",
when,
max_anaylse_days,
)
return False
log.debug(
"previous_opening_hour(%s): previous opening hours=%s", when, previous_opening_datetime
)
return previous_opening_datetime
# Not a nonworking day, not during exceptional closure and no normal opening
# hours defined => Opened
return {"closed": False, "exceptional_closure": False, "exceptional_closure_all_day": False}

View file

@ -132,7 +132,7 @@ class Report(ConfigurableObject): # pylint: disable=useless-object-inheritance
" email_client"
)
content = self.get_content()
if not content and not self._attachment_files and not self._attachment_payloads:
if not content:
log.debug("Report is empty, do not send it")
return True
msg = email_client.forge_message(

View file

@ -1,62 +0,0 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!-- Created with Inkscape (http://www.inkscape.org/) -->
<svg
width="92.738403mm"
height="17.141003mm"
viewBox="0 0 92.738403 17.141003"
version="1.1"
id="svg5"
inkscape:version="1.2.2 (b0a8486541, 2022-12-01)"
sodipodi:docname="header.svg"
xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
xmlns="http://www.w3.org/2000/svg"
xmlns:svg="http://www.w3.org/2000/svg">
<sodipodi:namedview
id="namedview7"
pagecolor="#ffffff"
bordercolor="#666666"
borderopacity="1.0"
inkscape:showpageshadow="2"
inkscape:pageopacity="0.0"
inkscape:pagecheckerboard="0"
inkscape:deskcolor="#d1d1d1"
inkscape:document-units="mm"
showgrid="false"
inkscape:zoom="0.84096521"
inkscape:cx="92.156012"
inkscape:cy="315.11411"
inkscape:window-width="1920"
inkscape:window-height="1011"
inkscape:window-x="0"
inkscape:window-y="32"
inkscape:window-maximized="1"
inkscape:current-layer="layer1" />
<defs
id="defs2" />
<g
inkscape:label="Calque 1"
inkscape:groupmode="layer"
id="layer1"
transform="translate(-44.730637,-68.858589)">
<rect
style="fill:none;stroke:#004787;stroke-width:2;stroke-linecap:round;stroke-linejoin:round;stroke-dasharray:none;stroke-dashoffset:0;stroke-opacity:1"
id="rect234"
width="90.738403"
height="15.141003"
x="45.730637"
y="69.858589" />
<text
xml:space="preserve"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:5.29167px;line-height:125%;font-family:'DejaVu Sans Mono';-inkscape-font-specification:'DejaVu Sans Mono';letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;stroke-width:0.264583px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
x="82.664459"
y="79.623909"
id="text2144"><tspan
sodipodi:role="line"
id="tspan2142"
style="fill:#004787;fill-opacity:1;stroke-width:0.264583px"
x="82.664459"
y="79.623909">Header</tspan></text>
</g>
</svg>

Before

Width:  |  Height:  |  Size: 2.2 KiB

View file

@ -1,2 +1 @@
<img src="${load_image_as_base64('header.svg')}" style="display: block" />
<p><strong>Just a test email.</strong> <small>(sent at ${sent_date})</small></p>
<strong>Just a test email.</strong> <small>(sent at ${sent_date})</small>

View file

@ -38,10 +38,19 @@ def main(argv=None): # pylint: disable=too-many-locals,too-many-statements
"-T",
"--template",
action="store_true",
dest="template",
help="Template name to send (default: test)",
default="test",
)
test_opts.add_argument(
"-m",
"--mako",
action="store_true",
dest="test_mako",
help="Test mako templating",
)
test_opts.add_argument(
"--cc",
action="store",
@ -84,7 +93,7 @@ def main(argv=None): # pylint: disable=too-many-locals,too-many-statements
options.test_to,
cc=options.test_cc,
bcc=options.test_bcc,
template=options.template,
template="test",
sent_date=datetime.datetime.now(),
):
log.info("Test email sent")

View file

@ -22,7 +22,6 @@ extras_require = {
"pytz",
],
"email": [
"python-magic",
"mako",
],
"pgsql": [

View file

@ -182,96 +182,27 @@ def test_parse_normal_opening_hours_multiple_periods():
]
def test_parse_normal_opening_hours_is_sorted():
assert opening_hours.parse_normal_opening_hours(
[
"samedi 9h30-18h",
"lundi-vendredi 14h-18h 9h30-12h30",
"samedi 9h30-12h",
"dimanche 9h30-12h",
]
) == [
{
"days": ["lundi", "mardi", "mercredi", "jeudi", "vendredi"],
"hours_periods": [
{"start": datetime.time(9, 30), "stop": datetime.time(12, 30)},
{"start": datetime.time(14, 0), "stop": datetime.time(18, 0)},
],
},
{
"days": ["samedi"],
"hours_periods": [
{"start": datetime.time(9, 30), "stop": datetime.time(12, 0)},
],
},
{
"days": ["samedi"],
"hours_periods": [
{"start": datetime.time(9, 30), "stop": datetime.time(18, 0)},
],
},
{
"days": ["dimanche"],
"hours_periods": [
{"start": datetime.time(9, 30), "stop": datetime.time(12, 0)},
],
},
]
#
# Tests on is_closed
#
#
# Tests on normal opening hours
#
exceptional_closures = [
"22/09/2017",
"20/09/2017-22/09/2017",
"20/09/2017-22/09/2017 18/09/2017",
"25/11/2017",
"26/11/2017 9h30-12h30",
]
normal_opening_hours = [
"lundi-mardi jeudi 9h30-12h30 14h-16h30",
"mercredi vendredi 9h30-12h30 14h-17h",
"samedi",
]
normally_opened_datetime = datetime.datetime(2024, 3, 1, 10, 15)
normally_opened_all_day_datetime = datetime.datetime(2024, 4, 6, 10, 15)
normally_closed_datetime = datetime.datetime(2017, 3, 1, 20, 15)
normally_closed_all_day_datetime = datetime.datetime(2024, 4, 7, 20, 15)
def test_its_normally_open():
assert opening_hours.its_normally_open(normal_opening_hours, when=normally_opened_datetime)
def test_its_normally_open_all_day():
assert opening_hours.its_normally_open(
normal_opening_hours, when=normally_opened_all_day_datetime
)
def test_its_normally_closed():
assert not opening_hours.its_normally_open(normal_opening_hours, when=normally_closed_datetime)
def test_its_normally_closed_all_day():
assert not opening_hours.its_normally_open(
normal_opening_hours, when=normally_closed_all_day_datetime
)
def test_its_normally_open_ignore_time():
assert opening_hours.its_normally_open(
normal_opening_hours, when=normally_closed_datetime.date(), ignore_time=True
)
def test_its_normally_closed_ignore_time():
assert not opening_hours.its_normally_open(
normal_opening_hours, when=normally_closed_all_day_datetime.date(), ignore_time=True
)
#
# Tests on non working days
#
nonworking_public_holidays = [
"1janvier",
"paques",
"lundi_paques",
"1mai",
"8mai",
"jeudi_ascension",
"lundi_pentecote",
@ -281,120 +212,6 @@ nonworking_public_holidays = [
"11novembre",
"noel",
]
nonworking_date = datetime.date(2017, 1, 1)
not_included_nonworking_date = datetime.date(2017, 5, 1)
not_nonworking_date = datetime.date(2017, 5, 2)
def test_its_nonworking_day():
assert (
opening_hours.its_nonworking_day(nonworking_public_holidays, date=nonworking_date) is True
)
def test_its_not_nonworking_day():
assert (
opening_hours.its_nonworking_day(
nonworking_public_holidays,
date=not_nonworking_date,
)
is False
)
def test_its_not_included_nonworking_day():
assert (
opening_hours.its_nonworking_day(
nonworking_public_holidays,
date=not_included_nonworking_date,
)
is False
)
#
# Tests in exceptional closures
#
exceptional_closures = [
"22/09/2017",
"20/09/2017-22/09/2017",
"20/09/2017-22/09/2017 18/09/2017",
"25/11/2017",
"26/11/2017 9h30-12h30",
"27/11/2017 17h-18h 9h30-12h30",
]
exceptional_closure_all_day_date = datetime.date(2017, 9, 22)
exceptional_closure_all_day_datetime = datetime.datetime.combine(
exceptional_closure_all_day_date, datetime.time(20, 15)
)
exceptional_closure_datetime = datetime.datetime(2017, 11, 26, 10, 30)
exceptional_closure_datetime_hours_period = {
"start": datetime.time(9, 30),
"stop": datetime.time(12, 30),
}
not_exceptional_closure_date = datetime.date(2019, 9, 22)
def test_its_exceptionally_closed():
assert (
opening_hours.its_exceptionally_closed(
exceptional_closures, when=exceptional_closure_all_day_datetime
)
is True
)
def test_its_not_exceptionally_closed():
assert (
opening_hours.its_exceptionally_closed(
exceptional_closures, when=not_exceptional_closure_date
)
is False
)
def test_its_exceptionally_closed_all_day():
assert (
opening_hours.its_exceptionally_closed(
exceptional_closures, when=exceptional_closure_all_day_datetime, all_day=True
)
is True
)
def test_its_not_exceptionally_closed_all_day():
assert (
opening_hours.its_exceptionally_closed(
exceptional_closures, when=exceptional_closure_datetime, all_day=True
)
is False
)
def test_get_exceptional_closures_hours():
assert opening_hours.get_exceptional_closures_hours(
exceptional_closures, date=exceptional_closure_datetime.date()
) == [exceptional_closure_datetime_hours_period]
def test_get_exceptional_closures_hours_all_day():
assert opening_hours.get_exceptional_closures_hours(
exceptional_closures, date=exceptional_closure_all_day_date
) == [{"start": datetime.datetime.min.time(), "stop": datetime.datetime.max.time()}]
def test_get_exceptional_closures_hours_is_sorted():
assert opening_hours.get_exceptional_closures_hours(
["27/11/2017 17h-18h 9h30-12h30"], date=datetime.date(2017, 11, 27)
) == [
{"start": datetime.time(9, 30), "stop": datetime.time(12, 30)},
{"start": datetime.time(17, 0), "stop": datetime.time(18, 0)},
]
#
# Tests on is_closed
#
def test_is_closed_when_normaly_closed_by_hour():
@ -438,7 +255,7 @@ def test_is_closed_when_normaly_closed_by_day():
normal_opening_hours_values=normal_opening_hours,
exceptional_closures_values=exceptional_closures,
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2017, 5, 7, 14, 15),
when=datetime.datetime(2017, 5, 6, 14, 15),
) == {"closed": True, "exceptional_closure": False, "exceptional_closure_all_day": False}
@ -483,203 +300,3 @@ def test_nonworking_french_public_days_of_the_year():
"noel": datetime.date(2021, 12, 25),
"saint_etienne": datetime.date(2021, 12, 26),
}
def test_next_opening_date():
assert opening_hours.next_opening_date(
normal_opening_hours_values=normal_opening_hours,
exceptional_closures_values=exceptional_closures,
nonworking_public_holidays_values=nonworking_public_holidays,
date=datetime.date(2021, 4, 4),
) == datetime.date(2021, 4, 6)
def test_next_opening_hour():
assert opening_hours.next_opening_hour(
normal_opening_hours_values=normal_opening_hours,
exceptional_closures_values=exceptional_closures,
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2021, 4, 4, 10, 30),
) == datetime.datetime(2021, 4, 6, 9, 30)
def test_next_opening_hour_with_exceptionnal_closure_hours():
assert opening_hours.next_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=["06/04/2021 9h-13h 14h-16h"],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2021, 4, 4, 10, 30),
) == datetime.datetime(2021, 4, 6, 16, 0)
def test_next_opening_hour_with_exceptionnal_closure_day():
assert opening_hours.next_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=["06/04/2021"],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2021, 4, 4, 10, 30),
) == datetime.datetime(2021, 4, 7, 9, 0)
def test_next_opening_hour_with_overlapsed_opening_hours():
assert opening_hours.next_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h", "mardi 8h-19h"],
exceptional_closures_values=["06/04/2021 9h-13h 14h-16h"],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2021, 4, 4, 10, 30),
) == datetime.datetime(2021, 4, 6, 8, 0)
def test_next_opening_hour_with_too_large_exceptionnal_closure_days():
assert (
opening_hours.next_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=["06/04/2021-16-04/2021"],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2021, 4, 4, 10, 30),
max_anaylse_days=10,
)
is False
)
def test_next_opening_hour_on_opened_moment():
assert opening_hours.next_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2021, 4, 6, 10, 30),
) == datetime.datetime(2021, 4, 6, 10, 30)
def test_next_opening_hour_on_same_day():
assert opening_hours.next_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2021, 4, 6, 13, 0),
) == datetime.datetime(2021, 4, 6, 14, 0)
assert opening_hours.next_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2021, 4, 6, 16, 0),
) == datetime.datetime(2021, 4, 6, 16, 0)
assert opening_hours.next_opening_hour(
normal_opening_hours_values=["lundi-vendredi"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2021, 4, 6, 16, 0),
) == datetime.datetime(2021, 4, 6, 16, 0)
def test_next_opening_hour_on_opened_day_but_too_late():
assert opening_hours.next_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2021, 4, 6, 23, 0),
) == datetime.datetime(2021, 4, 7, 9, 0)
def test_previous_opening_date():
assert opening_hours.previous_opening_date(
normal_opening_hours_values=["lundi-vendredi 9h-18h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
date=datetime.date(2024, 4, 1),
) == datetime.date(2024, 3, 29)
def test_previous_opening_hour():
assert opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-18h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 1, 10, 30),
) == datetime.datetime(2024, 3, 29, 18, 0)
def test_previous_opening_hour_with_exceptionnal_closure_hours():
assert opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=["29/03/2024 14h-18h"],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 1, 10, 30),
) == datetime.datetime(2024, 3, 29, 12, 0)
assert opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=["29/03/2024 16h-18h"],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 1, 10, 30),
) == datetime.datetime(2024, 3, 29, 16, 0)
def test_previous_opening_hour_with_exceptionnal_closure_day():
assert opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=["29/03/2024"],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 1, 10, 30),
) == datetime.datetime(2024, 3, 28, 18, 0)
def test_previous_opening_hour_with_overlapsed_opening_hours():
assert opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h", "mardi 8h-19h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 3, 8, 30),
) == datetime.datetime(2024, 4, 2, 19, 0)
def test_previous_opening_hour_with_too_large_exceptionnal_closure_days():
assert (
opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=["06/03/2024-16-04/2024"],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 17, 8, 30),
max_anaylse_days=10,
)
is False
)
def test_previous_opening_hour_on_opened_moment():
assert opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 5, 10, 30),
) == datetime.datetime(2024, 4, 5, 10, 30)
def test_previous_opening_hour_on_same_day():
assert opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 5, 13, 0),
) == datetime.datetime(2024, 4, 5, 12, 0)
assert opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 5, 16, 0),
) == datetime.datetime(2024, 4, 5, 16, 0)
assert opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 5, 16, 0),
) == datetime.datetime(2024, 4, 5, 16, 0)
def test_previous_opening_hour_on_opened_day_but_too_early():
assert opening_hours.previous_opening_hour(
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
exceptional_closures_values=[],
nonworking_public_holidays_values=nonworking_public_holidays,
when=datetime.datetime(2024, 4, 5, 8, 0),
) == datetime.datetime(2024, 4, 4, 18, 0)