Compare commits
9 commits
Author | SHA1 | Date | |
---|---|---|---|
|
601857a2d4 | ||
|
4c51d0086f | ||
|
384e8a441d | ||
|
9e3faee819 | ||
|
296618a34e | ||
28103836ac | |||
3cf6a2682c | |||
eb87516e1a | |||
5dbdb0ffe6 |
15 changed files with 1327 additions and 369 deletions
|
@ -32,6 +32,7 @@ jobs:
|
|||
runs-on: docker
|
||||
container:
|
||||
image: docker.io/brenard/debian-python-deb:latest
|
||||
needs: build
|
||||
steps:
|
||||
- name: Download Debian & Python packages files
|
||||
uses: actions/download-artifact@v3
|
||||
|
@ -67,6 +68,7 @@ jobs:
|
|||
runs-on: docker
|
||||
container:
|
||||
image: docker.io/brenard/aptly-publish:latest
|
||||
needs: build
|
||||
steps:
|
||||
- name: "Download Debian package files"
|
||||
uses: actions/download-artifact@v3
|
||||
|
|
|
@ -5,26 +5,10 @@ jobs:
|
|||
tests:
|
||||
runs-on: docker
|
||||
container:
|
||||
image: docker.io/brenard/python-pre-commit:latest
|
||||
image: docker.io/brenard/mylib:dev-master
|
||||
options: "--workdir /src"
|
||||
steps:
|
||||
- name: Check out repository code
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
- name: Install dependencies
|
||||
env:
|
||||
DEBIAN_FRONTEND: noninteractive
|
||||
run: |
|
||||
apt-get -qq update
|
||||
apt-get -qq -y install --no-install-recommends \
|
||||
build-essential \
|
||||
python3 \
|
||||
python3-dev \
|
||||
libldap2-dev \
|
||||
libsasl2-dev \
|
||||
pkg-config \
|
||||
libsystemd-dev \
|
||||
libpq-dev \
|
||||
libmariadb-dev
|
||||
- name: Run tests.sh
|
||||
run: ./tests.sh
|
||||
run: ./tests.sh --no-venv
|
||||
|
|
1
build.sh
1
build.sh
|
@ -114,6 +114,7 @@ $GITDCH \
|
|||
--release-notes ../../dist/release_notes.md \
|
||||
--path ../../ \
|
||||
--exclude "^CI: " \
|
||||
--exclude "^Docker: " \
|
||||
--exclude "^pre-commit: " \
|
||||
--exclude "\.?woodpecker(\.yml)?" \
|
||||
--exclude "build(\.sh)?" \
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
FROM brenard/mylib:latest
|
||||
RUN apt-get remove -y python3-mylib
|
||||
RUN python3 -m pip install -U git+https://gitea.zionetrix.net/bn8/python-mylib.git
|
||||
RUN git clone https://gitea.zionetrix.net/bn8/python-mylib.git /usr/local/src/python-mylib && pip install /usr/local/src/python-mylib[dev]
|
||||
RUN cd /usr/local/src/python-mylib && pre-commit run --all-files
|
||||
RUN apt-get remove -y python3-mylib && \
|
||||
git clone https://gitea.zionetrix.net/bn8/python-mylib.git /src && \
|
||||
pip install --break-system-packages /src[dev] && \
|
||||
cd /src && \
|
||||
pre-commit run --all-files
|
||||
|
|
|
@ -1,5 +1,26 @@
|
|||
FROM debian:latest
|
||||
RUN echo "deb http://debian.zionetrix.net stable main" > /etc/apt/sources.list.d/zionetrix.list && apt-get -o Acquire::AllowInsecureRepositories=true -o Acquire::AllowDowngradeToInsecureRepositories=true update && apt-get -o APT::Get::AllowUnauthenticated=true install --yes zionetrix-archive-keyring && apt-get clean && rm -fr rm -rf /var/lib/apt/lists/*
|
||||
RUN apt-get update && apt-get upgrade -y && apt-get install -y python3-all python3-dev python3-pip python3-venv python3-mylib build-essential git libldap2-dev libsasl2-dev pkg-config libsystemd-dev libpq-dev libmariadb-dev wget unzip && apt-get clean && rm -fr rm -rf /var/lib/apt/lists/*
|
||||
RUN python3 -m pip install pylint pytest flake8 flake8-junit-report pylint-junit junitparser pre-commit
|
||||
RUN wget --no-verbose -O /opt/instantclient-basic-linux.x64-21.4.0.0.0dbru.zip https://download.oracle.com/otn_software/linux/instantclient/214000/instantclient-basic-linux.x64-21.4.0.0.0dbru.zip && unzip -qq -d /opt /opt/instantclient-basic-linux.x64-21.4.0.0.0dbru.zip && echo /opt/instantclient_* > /etc/ld.so.conf.d/oracle-instantclient.conf && ldconfig
|
||||
FROM node:16-bookworm-slim
|
||||
RUN echo "deb http://debian.zionetrix.net stable main" > /etc/apt/sources.list.d/zionetrix.list && \
|
||||
apt-get \
|
||||
-o Acquire::AllowInsecureRepositories=true \
|
||||
-o Acquire::AllowDowngradeToInsecureRepositories=true \
|
||||
update && \
|
||||
apt-get \
|
||||
-o APT::Get::AllowUnauthenticated=true \
|
||||
install --yes zionetrix-archive-keyring && \
|
||||
apt-get update && \
|
||||
apt-get upgrade -y && \
|
||||
apt-get install -y \
|
||||
python3-all python3-dev python3-pip python3-venv python3-mylib build-essential git \
|
||||
libldap2-dev libsasl2-dev \
|
||||
pkg-config libsystemd-dev \
|
||||
libpq-dev libmariadb-dev \
|
||||
wget unzip && \
|
||||
apt-get clean && \
|
||||
rm -fr rm -rf /var/lib/apt/lists/*
|
||||
RUN python3 -m pip install --break-system-packages pylint pytest flake8 flake8-junit-report pylint-junit junitparser pre-commit
|
||||
RUN wget --no-verbose \
|
||||
-O /opt/instantclient-basic-linux.x64-21.4.0.0.0dbru.zip \
|
||||
https://download.oracle.com/otn_software/linux/instantclient/214000/instantclient-basic-linux.x64-21.4.0.0.0dbru.zip && \
|
||||
unzip -qq -d /opt /opt/instantclient-basic-linux.x64-21.4.0.0.0dbru.zip && \
|
||||
echo /opt/instantclient_* > /etc/ld.so.conf.d/oracle-instantclient.conf && \
|
||||
ldconfig
|
||||
|
|
|
@ -6,7 +6,6 @@ import argparse
|
|||
import logging
|
||||
import os
|
||||
import re
|
||||
import stat
|
||||
import sys
|
||||
import textwrap
|
||||
import traceback
|
||||
|
@ -24,6 +23,7 @@ log = logging.getLogger(__name__)
|
|||
# Constants
|
||||
DEFAULT_ENCODING = "utf-8"
|
||||
DEFAULT_CONFIG_DIRPATH = os.path.expanduser("./")
|
||||
DEFAULT_CONFIG_FILE_MODE = 0o600
|
||||
DEFAULT_LOG_FORMAT = "%(asctime)s - %(module)s:%(lineno)d - %(levelname)s - %(message)s"
|
||||
DEFAULT_CONSOLE_LOG_FORMAT = DEFAULT_LOG_FORMAT
|
||||
DEFAULT_FILELOG_FORMAT = DEFAULT_LOG_FORMAT
|
||||
|
@ -249,12 +249,16 @@ class BaseOption: # pylint: disable=too-many-instance-attributes
|
|||
|
||||
def ask_value(self, set_it=True):
|
||||
"""
|
||||
Ask to user to enter value of this option and set or
|
||||
return it regarding set parameter
|
||||
Ask to user to enter value of this option and set it if set_it parameter is True
|
||||
|
||||
:param set_it: If True (default), option value will be updated with user input
|
||||
|
||||
:return: The configuration option value.
|
||||
:rtype: mixed
|
||||
"""
|
||||
value = self._ask_value()
|
||||
if set_it:
|
||||
return self.set(value)
|
||||
self.set(value)
|
||||
return value
|
||||
|
||||
|
||||
|
@ -510,8 +514,12 @@ class PasswordOption(StringOption):
|
|||
|
||||
def ask_value(self, set_it=True):
|
||||
"""
|
||||
Ask to user to enter value of this option and set or
|
||||
return it regarding set parameter
|
||||
Ask to user to enter value of this option and set it if set_it parameter is True
|
||||
|
||||
:param set_it: If True (default), option value will be updated with user input
|
||||
|
||||
:return: The configuration option value.
|
||||
:rtype: mixed
|
||||
"""
|
||||
value = self._ask_value()
|
||||
if set_it:
|
||||
|
@ -530,7 +538,7 @@ class PasswordOption(StringOption):
|
|||
use_keyring = False
|
||||
else:
|
||||
print("Invalid answer. Possible values: Y or N (case insensitive)")
|
||||
return self.set(value, use_keyring=use_keyring)
|
||||
self.set(value, use_keyring=use_keyring)
|
||||
return value
|
||||
|
||||
|
||||
|
@ -610,28 +618,32 @@ class ConfigSection:
|
|||
|
||||
:param set_it: If True (default), option value will be updated with user input
|
||||
|
||||
:return: If set_it is True, return True if valid value for each configuration
|
||||
option have been retrieved and set. If False, return a dict of configuration
|
||||
options and their value.
|
||||
:return: a dict of configuration options and their value.
|
||||
:rtype: bool of dict
|
||||
"""
|
||||
if self.comment:
|
||||
print(f"# {self.comment}")
|
||||
print(f"[{self.name}]\n")
|
||||
result = {}
|
||||
error = False
|
||||
for name, option in self.options.items():
|
||||
option_result = option.ask_value(set_it=set_it)
|
||||
if set_it:
|
||||
result[name] = option_result
|
||||
elif not option_result:
|
||||
error = True
|
||||
result[name] = option.ask_value(set_it=set_it)
|
||||
print()
|
||||
print()
|
||||
if set_it:
|
||||
return not error
|
||||
return result
|
||||
|
||||
def ask_value(self, option, set_it=True):
|
||||
"""
|
||||
Ask user to enter value for the specified configuration option of the section
|
||||
|
||||
:param options: The configuration option name
|
||||
:param set_it: If True (default), option value will be updated with user input
|
||||
|
||||
:return: The configuration option value.
|
||||
:rtype: mixed
|
||||
"""
|
||||
assert self.defined(option), f"Option {option} unknown"
|
||||
return self.options[option].ask_value(set_it=set_it)
|
||||
|
||||
|
||||
class RawWrappedTextHelpFormatter(argparse.RawDescriptionHelpFormatter):
|
||||
"""
|
||||
|
@ -667,6 +679,7 @@ class Config: # pylint: disable=too-many-instance-attributes
|
|||
config_file_env_variable=None,
|
||||
default_config_dirpath=None,
|
||||
default_config_filename=None,
|
||||
default_config_file_mode=None,
|
||||
):
|
||||
self.appname = appname
|
||||
self.shortname = shortname
|
||||
|
@ -682,6 +695,7 @@ class Config: # pylint: disable=too-many-instance-attributes
|
|||
self.config_file_env_variable = config_file_env_variable
|
||||
self.default_config_dirpath = default_config_dirpath
|
||||
self.default_config_filename = default_config_filename
|
||||
self.default_config_file_mode = default_config_file_mode or DEFAULT_CONFIG_FILE_MODE
|
||||
self.add_logging_sections()
|
||||
self._init_config_parser()
|
||||
|
||||
|
@ -882,7 +896,7 @@ class Config: # pylint: disable=too-many-instance-attributes
|
|||
fd.write("\n".join(lines).encode(self.encoding))
|
||||
|
||||
# Privacy!
|
||||
os.chmod(filepath, stat.S_IRUSR | stat.S_IWUSR)
|
||||
os.chmod(filepath, self.default_config_file_mode)
|
||||
except Exception: # pylint: disable=broad-except
|
||||
log.exception("Failed to write generated configuration file %s", filepath)
|
||||
return False
|
||||
|
@ -1073,8 +1087,7 @@ class Config: # pylint: disable=too-many-instance-attributes
|
|||
self._loaded()
|
||||
|
||||
if self.get_option("mylib_config_reconfigure", default=False):
|
||||
if self.ask_values(set_it=True) and self.save():
|
||||
sys.exit(0)
|
||||
self.ask_values(set_it=True)
|
||||
sys.exit(1)
|
||||
|
||||
return options
|
||||
|
@ -1100,27 +1113,32 @@ class Config: # pylint: disable=too-many-instance-attributes
|
|||
:param execute_callback: Sections's loaded callbacks will be finally executed
|
||||
(only if set_it is True, default: False)
|
||||
|
||||
:return: If set_it is True, return True if valid value for each configuration
|
||||
option have been retrieved and set. If False, return a dict of configuration
|
||||
section and their options value.
|
||||
:rtype: bool of dict
|
||||
:return: a dict of configuration section and their options value.
|
||||
:rtype: dict
|
||||
"""
|
||||
result = {}
|
||||
error = False
|
||||
for name, section in self.sections.items():
|
||||
section_result = section.ask_values(set_it=set_it)
|
||||
if not set_it:
|
||||
result[name] = section_result
|
||||
elif not section_result:
|
||||
error = True
|
||||
if set_it:
|
||||
if error:
|
||||
return False
|
||||
if execute_callback:
|
||||
result[name] = section.ask_values(set_it=set_it)
|
||||
|
||||
if set_it and execute_callback:
|
||||
self._loaded()
|
||||
return True
|
||||
|
||||
return result
|
||||
|
||||
def ask_value(self, section, option, set_it=True):
|
||||
"""
|
||||
Ask user to enter value for the specified configuration option
|
||||
|
||||
:param section: The configuration section name
|
||||
:param option: The configuration option name
|
||||
:param set_it: If True (default), option value will be updated with user input
|
||||
|
||||
:return: The configuration option value.
|
||||
:rtype: mixed
|
||||
"""
|
||||
assert self.defined(section, option), f"Unknown option {section}.{option}"
|
||||
return self.sections[section].ask_value(option, set_it=set_it)
|
||||
|
||||
def configure(self, argv=None, description=False):
|
||||
"""
|
||||
Entry point of a script you could use to created your configuration file
|
||||
|
|
215
mylib/email.py
215
mylib/email.py
|
@ -1,5 +1,6 @@
|
|||
""" Email client to forge and send emails """
|
||||
|
||||
import base64
|
||||
import email.utils
|
||||
import logging
|
||||
import os
|
||||
|
@ -9,6 +10,7 @@ from email.mime.base import MIMEBase
|
|||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.text import MIMEText
|
||||
|
||||
import magic
|
||||
from mako.template import Template as MakoTemplate
|
||||
|
||||
from mylib.config import (
|
||||
|
@ -22,6 +24,14 @@ from mylib.config import (
|
|||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def load_image_as_base64(path):
|
||||
"""Load image file as base64"""
|
||||
log.debug("Load image file '%s'", path)
|
||||
with open(path, "rb") as file_desc:
|
||||
data = file_desc.read()
|
||||
return f"data:{magic.from_buffer(data, mime=True)};base64, {base64.b64encode(data).decode()}"
|
||||
|
||||
|
||||
class EmailClient(
|
||||
ConfigurableObject
|
||||
): # pylint: disable=useless-object-inheritance,too-many-instance-attributes
|
||||
|
@ -165,7 +175,7 @@ class EmailClient(
|
|||
continue
|
||||
template_type = "text" if template_type == ".txt" else template_type[1:]
|
||||
if template_name not in self.templates:
|
||||
self.templates[template_name] = {}
|
||||
self.templates[template_name] = {"path": templates_path}
|
||||
log.debug("Load email template %s %s from %s", template_name, template_type, filepath)
|
||||
with open(filepath, encoding="utf8") as file_desc:
|
||||
self.templates[template_name][template_type] = MakoTemplate(
|
||||
|
@ -239,6 +249,7 @@ class EmailClient(
|
|||
msg["Date"] = email.utils.formatdate(None, True)
|
||||
encoding = encoding if encoding else self._get_option("encoding")
|
||||
if template:
|
||||
log.debug("Forge email from template %s", template)
|
||||
assert template in self.templates, f"Unknown template {template}"
|
||||
# Handle subject from template
|
||||
if not subject:
|
||||
|
@ -264,6 +275,9 @@ class EmailClient(
|
|||
)
|
||||
if self.templates[template].get("html"):
|
||||
if isinstance(self.templates[template]["html"], MakoTemplate):
|
||||
template_vars["load_image_as_base64"] = self.template_image_loader(
|
||||
self.templates[template].get("path")
|
||||
)
|
||||
parts.append((self.templates[template]["html"].render(**template_vars), "html"))
|
||||
else:
|
||||
parts.append((self.templates[template]["html"].format(**template_vars), "html"))
|
||||
|
@ -296,6 +310,19 @@ class EmailClient(
|
|||
msg.attach(part)
|
||||
return msg
|
||||
|
||||
@staticmethod
|
||||
def template_image_loader(directory_path):
|
||||
"""Return wrapper for the load_image_as_base64 function bind on the template directory"""
|
||||
|
||||
def _load_image_as_base64(path):
|
||||
return load_image_as_base64(
|
||||
os.path.join(directory_path, path)
|
||||
if directory_path and not os.path.isabs(path)
|
||||
else path
|
||||
)
|
||||
|
||||
return _load_image_as_base64
|
||||
|
||||
def send(
|
||||
self, recipients, msg=None, subject=None, just_try=None, cc=None, bcc=None, **forge_args
|
||||
):
|
||||
|
@ -406,189 +433,3 @@ class EmailClient(
|
|||
server.quit()
|
||||
|
||||
return not error
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Run tests
|
||||
import argparse
|
||||
import datetime
|
||||
import sys
|
||||
|
||||
# Options parser
|
||||
parser = argparse.ArgumentParser()
|
||||
|
||||
parser.add_argument(
|
||||
"-v", "--verbose", action="store_true", dest="verbose", help="Enable verbose mode"
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-d", "--debug", action="store_true", dest="debug", help="Enable debug mode"
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-l", "--log-file", action="store", type=str, dest="logfile", help="Log file path"
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-j", "--just-try", action="store_true", dest="just_try", help="Enable just-try mode"
|
||||
)
|
||||
|
||||
email_opts = parser.add_argument_group("Email options")
|
||||
|
||||
email_opts.add_argument(
|
||||
"-H", "--smtp-host", action="store", type=str, dest="email_smtp_host", help="SMTP host"
|
||||
)
|
||||
|
||||
email_opts.add_argument(
|
||||
"-P", "--smtp-port", action="store", type=int, dest="email_smtp_port", help="SMTP port"
|
||||
)
|
||||
|
||||
email_opts.add_argument(
|
||||
"-S", "--smtp-ssl", action="store_true", dest="email_smtp_ssl", help="Use SSL"
|
||||
)
|
||||
|
||||
email_opts.add_argument(
|
||||
"-T", "--smtp-tls", action="store_true", dest="email_smtp_tls", help="Use TLS"
|
||||
)
|
||||
|
||||
email_opts.add_argument(
|
||||
"-u", "--smtp-user", action="store", type=str, dest="email_smtp_user", help="SMTP username"
|
||||
)
|
||||
|
||||
email_opts.add_argument(
|
||||
"-p",
|
||||
"--smtp-password",
|
||||
action="store",
|
||||
type=str,
|
||||
dest="email_smtp_password",
|
||||
help="SMTP password",
|
||||
)
|
||||
|
||||
email_opts.add_argument(
|
||||
"-D",
|
||||
"--smtp-debug",
|
||||
action="store_true",
|
||||
dest="email_smtp_debug",
|
||||
help="Debug SMTP connection",
|
||||
)
|
||||
|
||||
email_opts.add_argument(
|
||||
"-e",
|
||||
"--email-encoding",
|
||||
action="store",
|
||||
type=str,
|
||||
dest="email_encoding",
|
||||
help="SMTP encoding",
|
||||
)
|
||||
|
||||
email_opts.add_argument(
|
||||
"-f",
|
||||
"--sender-name",
|
||||
action="store",
|
||||
type=str,
|
||||
dest="email_sender_name",
|
||||
help="Sender name",
|
||||
)
|
||||
|
||||
email_opts.add_argument(
|
||||
"-F",
|
||||
"--sender-email",
|
||||
action="store",
|
||||
type=str,
|
||||
dest="email_sender_email",
|
||||
help="Sender email",
|
||||
)
|
||||
|
||||
email_opts.add_argument(
|
||||
"-C",
|
||||
"--catch-all",
|
||||
action="store",
|
||||
type=str,
|
||||
dest="email_catch_all",
|
||||
help="Catch all sent email: specify catch recipient email address",
|
||||
)
|
||||
|
||||
test_opts = parser.add_argument_group("Test email options")
|
||||
|
||||
test_opts.add_argument(
|
||||
"-t",
|
||||
"--to",
|
||||
action="store",
|
||||
type=str,
|
||||
dest="test_to",
|
||||
help="Test email recipient",
|
||||
)
|
||||
|
||||
test_opts.add_argument(
|
||||
"-m",
|
||||
"--mako",
|
||||
action="store_true",
|
||||
dest="test_mako",
|
||||
help="Test mako templating",
|
||||
)
|
||||
|
||||
options = parser.parse_args()
|
||||
|
||||
if not options.test_to:
|
||||
parser.error("You must specify test email recipient using -t/--to parameter")
|
||||
sys.exit(1)
|
||||
|
||||
# Initialize logs
|
||||
logformat = "%(asctime)s - Test EmailClient - %(levelname)s - %(message)s"
|
||||
if options.debug:
|
||||
loglevel = logging.DEBUG
|
||||
elif options.verbose:
|
||||
loglevel = logging.INFO
|
||||
else:
|
||||
loglevel = logging.WARNING
|
||||
|
||||
if options.logfile:
|
||||
logging.basicConfig(filename=options.logfile, level=loglevel, format=logformat)
|
||||
else:
|
||||
logging.basicConfig(level=loglevel, format=logformat)
|
||||
|
||||
if options.email_smtp_user and not options.email_smtp_password:
|
||||
import getpass
|
||||
|
||||
options.email_smtp_password = getpass.getpass("Please enter SMTP password: ")
|
||||
|
||||
logging.info("Initialize Email client")
|
||||
email_client = EmailClient(
|
||||
smtp_host=options.email_smtp_host,
|
||||
smtp_port=options.email_smtp_port,
|
||||
smtp_ssl=options.email_smtp_ssl,
|
||||
smtp_tls=options.email_smtp_tls,
|
||||
smtp_user=options.email_smtp_user,
|
||||
smtp_password=options.email_smtp_password,
|
||||
smtp_debug=options.email_smtp_debug,
|
||||
sender_name=options.email_sender_name,
|
||||
sender_email=options.email_sender_email,
|
||||
catch_all_addr=options.email_catch_all,
|
||||
just_try=options.just_try,
|
||||
encoding=options.email_encoding,
|
||||
templates={
|
||||
"test": {
|
||||
"subject": "Test email",
|
||||
"text": (
|
||||
"Just a test email sent at {sent_date}."
|
||||
if not options.test_mako
|
||||
else MakoTemplate("Just a test email sent at ${sent_date | h}.") # nosec
|
||||
),
|
||||
"html": (
|
||||
"<strong>Just a test email.</strong> <small>(sent at {sent_date | h})</small>"
|
||||
if not options.test_mako
|
||||
else MakoTemplate( # nosec
|
||||
"<strong>Just a test email.</strong> "
|
||||
"<small>(sent at ${sent_date | h})</small>"
|
||||
)
|
||||
),
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
logging.info("Send a test email to %s", options.test_to)
|
||||
if email_client.send(options.test_to, template="test", sent_date=datetime.datetime.now()):
|
||||
logging.info("Test email sent")
|
||||
sys.exit(0)
|
||||
logging.error("Fail to send test email")
|
||||
sys.exit(1)
|
||||
|
|
|
@ -575,6 +575,7 @@ class LdapClient:
|
|||
warn=True,
|
||||
paged_search=False,
|
||||
pagesize=None,
|
||||
nocache=False,
|
||||
):
|
||||
"""
|
||||
Retrieve objects from LDAP
|
||||
|
@ -592,9 +593,11 @@ class LdapClient:
|
|||
(optional, default: False)
|
||||
:param pagesize: When using paged search, the page size
|
||||
(optional, default: see LdapServer.paged_search)
|
||||
:param nocache: If True, disable using cache
|
||||
"""
|
||||
if name in self._cached_objects:
|
||||
if name in self._cached_objects and not nocache:
|
||||
log.debug("Retrieved %s objects from cache", name)
|
||||
objects = self._cached_objects[name]
|
||||
else:
|
||||
assert self._conn or self.initialize()
|
||||
log.debug(
|
||||
|
@ -624,13 +627,11 @@ class LdapClient:
|
|||
if not obj_dn or not isinstance(obj_attrs, dict):
|
||||
continue
|
||||
objects[obj_dn] = self._get_obj(obj_dn, obj_attrs)
|
||||
if not nocache:
|
||||
self._cached_objects[name] = objects
|
||||
if not key_attr or key_attr == "dn":
|
||||
return self._cached_objects[name]
|
||||
return {
|
||||
self.get_attr(self._cached_objects[name][dn], key_attr): self._cached_objects[name][dn]
|
||||
for dn in self._cached_objects[name]
|
||||
}
|
||||
return objects
|
||||
return {self.get_attr(objects[dn], key_attr): objects[dn] for dn in objects}
|
||||
|
||||
def get_object(self, type_name, object_name, filterstr, basedn, attrs, warn=True):
|
||||
"""
|
||||
|
|
|
@ -11,6 +11,7 @@ week_days = ["lundi", "mardi", "mercredi", "jeudi", "vendredi", "samedi", "diman
|
|||
date_format = "%d/%m/%Y"
|
||||
date_pattern = re.compile("^([0-9]{2})/([0-9]{2})/([0-9]{4})$")
|
||||
time_pattern = re.compile("^([0-9]{1,2})h([0-9]{2})?$")
|
||||
_nonworking_french_public_days_of_the_year_cache = {}
|
||||
|
||||
|
||||
def easter_date(year):
|
||||
|
@ -37,8 +38,9 @@ def nonworking_french_public_days_of_the_year(year=None):
|
|||
"""Compute dict of nonworking french public days for the specified year"""
|
||||
if year is None:
|
||||
year = datetime.date.today().year
|
||||
if year not in _nonworking_french_public_days_of_the_year_cache:
|
||||
dp = easter_date(year)
|
||||
return {
|
||||
_nonworking_french_public_days_of_the_year_cache[year] = {
|
||||
"1janvier": datetime.date(year, 1, 1),
|
||||
"paques": dp,
|
||||
"lundi_paques": (dp + datetime.timedelta(1)),
|
||||
|
@ -54,6 +56,7 @@ def nonworking_french_public_days_of_the_year(year=None):
|
|||
"noel": datetime.date(year, 12, 25),
|
||||
"saint_etienne": datetime.date(year, 12, 26),
|
||||
}
|
||||
return _nonworking_french_public_days_of_the_year_cache[year]
|
||||
|
||||
|
||||
def parse_exceptional_closures(values):
|
||||
|
@ -155,7 +158,153 @@ def parse_normal_opening_hours(values):
|
|||
if not days and not hours_periods:
|
||||
raise ValueError(f'No days or hours period found in this value: "{value}"')
|
||||
normal_opening_hours.append({"days": days, "hours_periods": hours_periods})
|
||||
return normal_opening_hours
|
||||
for idx, noh in enumerate(normal_opening_hours):
|
||||
normal_opening_hours[idx]["hours_periods"] = sorted_hours_periods(noh["hours_periods"])
|
||||
return sorted_opening_hours(normal_opening_hours)
|
||||
|
||||
|
||||
def sorted_hours_periods(hours_periods):
|
||||
"""Sort hours periods"""
|
||||
return sorted(hours_periods, key=lambda hp: (hp["start"], hp["stop"]))
|
||||
|
||||
|
||||
def sorted_opening_hours(opening_hours):
|
||||
"""Sort opening hours"""
|
||||
return sorted(
|
||||
opening_hours,
|
||||
key=lambda x: (
|
||||
week_days.index(x["days"][0]) if x["days"] else None,
|
||||
x["hours_periods"][0]["start"] if x["hours_periods"] else datetime.datetime.min.time(),
|
||||
x["hours_periods"][0]["stop"] if x["hours_periods"] else datetime.datetime.max.time(),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def its_nonworking_day(nonworking_public_holidays_values, date=None):
|
||||
"""Check if is a non-working day"""
|
||||
if not nonworking_public_holidays_values:
|
||||
return False
|
||||
date = date if date else datetime.date.today()
|
||||
log.debug("its_nonworking_day(%s): values=%s", date, nonworking_public_holidays_values)
|
||||
nonworking_days = nonworking_french_public_days_of_the_year(year=date.year)
|
||||
for day in nonworking_public_holidays_values:
|
||||
if day in nonworking_days and nonworking_days[day] == date:
|
||||
log.debug("its_nonworking_day(%s): %s", date, day)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def its_exceptionally_closed(exceptional_closures_values, when=None, parse=True, all_day=False):
|
||||
"""Check if it's exceptionally closed"""
|
||||
if not exceptional_closures_values:
|
||||
return False
|
||||
when = when if when else datetime.datetime.now()
|
||||
assert isinstance(when, (datetime.date, datetime.datetime))
|
||||
when_date = when.date() if isinstance(when, datetime.datetime) else when
|
||||
exceptional_closures = (
|
||||
parse_exceptional_closures(exceptional_closures_values)
|
||||
if parse
|
||||
else exceptional_closures_values
|
||||
)
|
||||
log.debug("its_exceptionally_closed(%s): exceptional closures=%s", when, exceptional_closures)
|
||||
for cl in exceptional_closures:
|
||||
if when_date not in cl["days"]:
|
||||
log.debug(
|
||||
"its_exceptionally_closed(%s): %s not in days (%s)", when, when_date, cl["days"]
|
||||
)
|
||||
continue
|
||||
if not cl["hours_periods"]:
|
||||
# All day exceptional closure
|
||||
return True
|
||||
if all_day:
|
||||
# Wanted an all day closure, ignore it
|
||||
continue
|
||||
for hp in cl["hours_periods"]:
|
||||
if hp["start"] <= when.time() <= hp["stop"]:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def get_exceptional_closures_hours(exceptional_closures_values, date=None, parse=True):
|
||||
"""Get exceptional closures hours of the day"""
|
||||
if not exceptional_closures_values:
|
||||
return []
|
||||
date = date if date else datetime.date.today()
|
||||
exceptional_closures = (
|
||||
parse_exceptional_closures(exceptional_closures_values)
|
||||
if parse
|
||||
else exceptional_closures_values
|
||||
)
|
||||
log.debug(
|
||||
"get_exceptional_closures_hours(%s): exceptional closures=%s", date, exceptional_closures
|
||||
)
|
||||
exceptional_closures_hours = []
|
||||
for cl in exceptional_closures:
|
||||
if date not in cl["days"]:
|
||||
log.debug("get_exceptional_closures_hours(%s): not in days (%s)", date, cl["days"])
|
||||
continue
|
||||
if not cl["hours_periods"]:
|
||||
log.debug(
|
||||
"get_exceptional_closures_hours(%s): it's exceptionally closed all the day", date
|
||||
)
|
||||
return [
|
||||
{
|
||||
"start": datetime.datetime.min.time(),
|
||||
"stop": datetime.datetime.max.time(),
|
||||
}
|
||||
]
|
||||
exceptional_closures_hours.extend(cl["hours_periods"])
|
||||
log.debug(
|
||||
"get_exceptional_closures_hours(%s): exceptional closures hours=%s",
|
||||
date,
|
||||
exceptional_closures_hours,
|
||||
)
|
||||
return sorted_hours_periods(exceptional_closures_hours)
|
||||
|
||||
|
||||
def its_normally_open(normal_opening_hours_values, when=None, parse=True, ignore_time=False):
|
||||
"""Check if it's normally open"""
|
||||
when = when if when else datetime.datetime.now()
|
||||
if not normal_opening_hours_values:
|
||||
log.debug(
|
||||
"its_normally_open(%s): no normal opening hours defined, consider as opened", when
|
||||
)
|
||||
return True
|
||||
when_weekday = week_days[when.timetuple().tm_wday]
|
||||
log.debug("its_normally_open(%s): week day=%s", when, when_weekday)
|
||||
normal_opening_hours = (
|
||||
parse_normal_opening_hours(normal_opening_hours_values)
|
||||
if parse
|
||||
else normal_opening_hours_values
|
||||
)
|
||||
log.debug("its_normally_open(%s): normal opening hours=%s", when, normal_opening_hours)
|
||||
for oh in normal_opening_hours:
|
||||
if oh["days"] and when_weekday not in oh["days"]:
|
||||
log.debug("its_normally_open(%s): %s not in days (%s)", when, when_weekday, oh["days"])
|
||||
continue
|
||||
if not oh["hours_periods"] or ignore_time:
|
||||
return True
|
||||
for hp in oh["hours_periods"]:
|
||||
if hp["start"] <= when.time() <= hp["stop"]:
|
||||
return True
|
||||
log.debug("its_normally_open(%s): not in normal opening hours", when)
|
||||
return False
|
||||
|
||||
|
||||
def its_opening_day(
|
||||
normal_opening_hours_values=None,
|
||||
exceptional_closures_values=None,
|
||||
nonworking_public_holidays_values=None,
|
||||
date=None,
|
||||
parse=True,
|
||||
):
|
||||
"""Check if it's an opening day"""
|
||||
date = date if date else datetime.date.today()
|
||||
if its_nonworking_day(nonworking_public_holidays_values, date=date):
|
||||
return False
|
||||
if its_exceptionally_closed(exceptional_closures_values, when=date, all_day=True, parse=parse):
|
||||
return False
|
||||
return its_normally_open(normal_opening_hours_values, when=date, parse=parse, ignore_time=True)
|
||||
|
||||
|
||||
def is_closed(
|
||||
|
@ -193,76 +342,578 @@ def is_closed(
|
|||
when_time,
|
||||
when_weekday,
|
||||
)
|
||||
if nonworking_public_holidays_values:
|
||||
log.debug("Nonworking public holidays: %s", nonworking_public_holidays_values)
|
||||
nonworking_days = nonworking_french_public_days_of_the_year()
|
||||
for day in nonworking_public_holidays_values:
|
||||
if day in nonworking_days and when_date == nonworking_days[day]:
|
||||
log.debug("Non working day: %s", day)
|
||||
# Handle non-working days
|
||||
if its_nonworking_day(nonworking_public_holidays_values, date=when_date):
|
||||
return {
|
||||
"closed": True,
|
||||
"exceptional_closure": exceptional_closure_on_nonworking_public_days,
|
||||
"exceptional_closure_all_day": exceptional_closure_on_nonworking_public_days,
|
||||
}
|
||||
|
||||
if exceptional_closures_values:
|
||||
# Handle exceptional closures
|
||||
try:
|
||||
exceptional_closures = parse_exceptional_closures(exceptional_closures_values)
|
||||
log.debug("Exceptional closures: %s", exceptional_closures)
|
||||
if its_exceptionally_closed(exceptional_closures_values, when=when):
|
||||
return {
|
||||
"closed": True,
|
||||
"exceptional_closure": True,
|
||||
"exceptional_closure_all_day": its_exceptionally_closed(
|
||||
exceptional_closures_values, when=when, all_day=True
|
||||
),
|
||||
}
|
||||
except ValueError as e:
|
||||
log.error("Fail to parse exceptional closures, consider as closed", exc_info=True)
|
||||
if on_error_result is None:
|
||||
log.error("Fail to parse exceptional closures", exc_info=True)
|
||||
raise e from e
|
||||
log.error("Fail to parse exceptional closures, consider as %s", on_error, exc_info=True)
|
||||
return on_error_result
|
||||
for cl in exceptional_closures:
|
||||
if when_date not in cl["days"]:
|
||||
log.debug("when_date (%s) no in days (%s)", when_date, cl["days"])
|
||||
continue
|
||||
if not cl["hours_periods"]:
|
||||
# All day exceptional closure
|
||||
return {
|
||||
"closed": True,
|
||||
"exceptional_closure": True,
|
||||
"exceptional_closure_all_day": True,
|
||||
}
|
||||
for hp in cl["hours_periods"]:
|
||||
if hp["start"] <= when_time <= hp["stop"]:
|
||||
return {
|
||||
"closed": True,
|
||||
"exceptional_closure": True,
|
||||
"exceptional_closure_all_day": False,
|
||||
}
|
||||
|
||||
if normal_opening_hours_values:
|
||||
# Finally, handle normal opening hours
|
||||
try:
|
||||
normal_opening_hours = parse_normal_opening_hours(normal_opening_hours_values)
|
||||
log.debug("Normal opening hours: %s", normal_opening_hours)
|
||||
return {
|
||||
"closed": not its_normally_open(normal_opening_hours_values, when=when),
|
||||
"exceptional_closure": False,
|
||||
"exceptional_closure_all_day": False,
|
||||
}
|
||||
except ValueError as e: # pylint: disable=broad-except
|
||||
log.error("Fail to parse normal opening hours, consider as closed", exc_info=True)
|
||||
if on_error_result is None:
|
||||
log.error("Fail to parse normal opening hours", exc_info=True)
|
||||
raise e from e
|
||||
log.error("Fail to parse normal opening hours, consider as %s", on_error, exc_info=True)
|
||||
return on_error_result
|
||||
for oh in normal_opening_hours:
|
||||
if oh["days"] and when_weekday not in oh["days"]:
|
||||
log.debug("when_weekday (%s) no in days (%s)", when_weekday, oh["days"])
|
||||
continue
|
||||
if not oh["hours_periods"]:
|
||||
# All day opened
|
||||
return {
|
||||
"closed": False,
|
||||
"exceptional_closure": False,
|
||||
"exceptional_closure_all_day": False,
|
||||
}
|
||||
for hp in oh["hours_periods"]:
|
||||
if hp["start"] <= when_time <= hp["stop"]:
|
||||
return {
|
||||
"closed": False,
|
||||
"exceptional_closure": False,
|
||||
"exceptional_closure_all_day": False,
|
||||
}
|
||||
log.debug("Not in normal opening hours => closed")
|
||||
return {"closed": True, "exceptional_closure": False, "exceptional_closure_all_day": False}
|
||||
|
||||
# Not a nonworking day, not during exceptional closure and no normal opening
|
||||
# hours defined => Opened
|
||||
return {"closed": False, "exceptional_closure": False, "exceptional_closure_all_day": False}
|
||||
|
||||
def next_opening_date(
|
||||
normal_opening_hours_values=None,
|
||||
exceptional_closures_values=None,
|
||||
nonworking_public_holidays_values=None,
|
||||
date=None,
|
||||
max_anaylse_days=None,
|
||||
parse=True,
|
||||
):
|
||||
"""Search for the next opening day"""
|
||||
date = date if date else datetime.date.today()
|
||||
max_anaylse_days = max_anaylse_days if max_anaylse_days is not None else 30
|
||||
if parse:
|
||||
try:
|
||||
normal_opening_hours_values = (
|
||||
parse_normal_opening_hours(normal_opening_hours_values)
|
||||
if normal_opening_hours_values
|
||||
else None
|
||||
)
|
||||
exceptional_closures_values = (
|
||||
parse_exceptional_closures(exceptional_closures_values)
|
||||
if exceptional_closures_values
|
||||
else None
|
||||
)
|
||||
except ValueError: # pylint: disable=broad-except
|
||||
log.error(
|
||||
"next_opening_date(%s): fail to parse normal opening hours or exceptional closures",
|
||||
date,
|
||||
exc_info=True,
|
||||
)
|
||||
return False
|
||||
added_days = 0
|
||||
while added_days <= max_anaylse_days:
|
||||
test_date = date + datetime.timedelta(days=added_days)
|
||||
if its_opening_day(
|
||||
normal_opening_hours_values=normal_opening_hours_values,
|
||||
exceptional_closures_values=exceptional_closures_values,
|
||||
nonworking_public_holidays_values=nonworking_public_holidays_values,
|
||||
date=test_date,
|
||||
parse=False,
|
||||
):
|
||||
return test_date
|
||||
added_days += 1
|
||||
log.debug(
|
||||
"next_opening_date(%s): no opening day found in the next %d days", date, max_anaylse_days
|
||||
)
|
||||
return False
|
||||
|
||||
|
||||
def next_opening_hour(
|
||||
normal_opening_hours_values=None,
|
||||
exceptional_closures_values=None,
|
||||
nonworking_public_holidays_values=None,
|
||||
when=None,
|
||||
max_anaylse_days=None,
|
||||
parse=True,
|
||||
):
|
||||
"""Search for the next opening hour"""
|
||||
when = when if when else datetime.datetime.now()
|
||||
max_anaylse_days = max_anaylse_days if max_anaylse_days is not None else 30
|
||||
if parse:
|
||||
try:
|
||||
normal_opening_hours_values = (
|
||||
parse_normal_opening_hours(normal_opening_hours_values)
|
||||
if normal_opening_hours_values
|
||||
else None
|
||||
)
|
||||
exceptional_closures_values = (
|
||||
parse_exceptional_closures(exceptional_closures_values)
|
||||
if exceptional_closures_values
|
||||
else None
|
||||
)
|
||||
except ValueError: # pylint: disable=broad-except
|
||||
log.error(
|
||||
"next_opening_hour(%s): fail to parse normal opening hours or exceptional closures",
|
||||
when,
|
||||
exc_info=True,
|
||||
)
|
||||
return False
|
||||
date = next_opening_date(
|
||||
normal_opening_hours_values=normal_opening_hours_values,
|
||||
exceptional_closures_values=exceptional_closures_values,
|
||||
nonworking_public_holidays_values=nonworking_public_holidays_values,
|
||||
date=when.date(),
|
||||
max_anaylse_days=max_anaylse_days,
|
||||
parse=False,
|
||||
)
|
||||
if not date:
|
||||
log.debug(
|
||||
"next_opening_hour(%s): no opening day found in the next %d days",
|
||||
when,
|
||||
max_anaylse_days,
|
||||
)
|
||||
return False
|
||||
log.debug("next_opening_hour(%s): next opening date=%s", when, date)
|
||||
weekday = week_days[date.timetuple().tm_wday]
|
||||
log.debug("next_opening_hour(%s): next opening week day=%s", when, weekday)
|
||||
exceptional_closures_hours = get_exceptional_closures_hours(
|
||||
exceptional_closures_values, date=date, parse=False
|
||||
)
|
||||
log.debug(
|
||||
"next_opening_hour(%s): next opening day exceptional closures hours=%s",
|
||||
when,
|
||||
exceptional_closures_hours,
|
||||
)
|
||||
next_opening_datetime = None
|
||||
exceptionally_closed = False
|
||||
exceptionally_closed_all_day = False
|
||||
in_opening_hours = date != when.date()
|
||||
for oh in normal_opening_hours_values:
|
||||
if exceptionally_closed_all_day:
|
||||
break
|
||||
|
||||
if oh["days"] and weekday not in oh["days"]:
|
||||
log.debug("next_opening_hour(%s): %s not in days (%s)", when, weekday, oh["days"])
|
||||
continue
|
||||
|
||||
log.debug(
|
||||
"next_opening_hour(%s): %s in days (%s), handle opening hours %s",
|
||||
when,
|
||||
weekday,
|
||||
oh["days"],
|
||||
oh["hours_periods"],
|
||||
)
|
||||
|
||||
if not oh["hours_periods"]:
|
||||
log.debug(
|
||||
"next_opening_hour(%s): %s is an all day opening day, handle exceptional closures "
|
||||
"hours %s to find the minimal opening time",
|
||||
when,
|
||||
weekday,
|
||||
exceptional_closures_hours,
|
||||
)
|
||||
if date == when.date():
|
||||
in_opening_hours = True
|
||||
test_time = when.time() if when.date() == date else datetime.datetime.min.time()
|
||||
for cl in exceptional_closures_hours:
|
||||
if cl["start"] <= test_time < cl["stop"]:
|
||||
if cl["stop"] >= datetime.datetime.max.time():
|
||||
exceptionally_closed = True
|
||||
exceptionally_closed_all_day = True
|
||||
next_opening_datetime = None
|
||||
break
|
||||
test_time = cl["stop"]
|
||||
else:
|
||||
break
|
||||
if not exceptionally_closed_all_day:
|
||||
candidate_next_opening_datetime = datetime.datetime.combine(date, test_time)
|
||||
next_opening_datetime = (
|
||||
candidate_next_opening_datetime
|
||||
if not next_opening_datetime
|
||||
or candidate_next_opening_datetime < next_opening_datetime
|
||||
else next_opening_datetime
|
||||
)
|
||||
continue
|
||||
|
||||
log.debug(
|
||||
"next_opening_hour(%s): only opened during some hours periods (%s) on %s, find the "
|
||||
"minimal starting time",
|
||||
when,
|
||||
oh["hours_periods"],
|
||||
weekday,
|
||||
)
|
||||
test_time = datetime.datetime.max.time()
|
||||
for hp in oh["hours_periods"]:
|
||||
if date == when.date() and hp["stop"] < when.time():
|
||||
log.debug(
|
||||
"next_opening_hour(%s): ignore opening hours %s before specified when time %s",
|
||||
when,
|
||||
hp,
|
||||
when.time(),
|
||||
)
|
||||
continue
|
||||
if date == when.date() and hp["start"] <= when.time() < hp["stop"]:
|
||||
in_opening_hours = True
|
||||
if exceptional_closures_hours:
|
||||
log.debug(
|
||||
"next_opening_hour(%s): check if opening hours %s match with exceptional "
|
||||
"closure hours %s",
|
||||
when,
|
||||
hp,
|
||||
exceptional_closures_hours,
|
||||
)
|
||||
for cl in exceptional_closures_hours:
|
||||
if cl["start"] <= hp["start"] and cl["stop"] >= hp["stop"]:
|
||||
log.debug(
|
||||
"next_opening_hour(%s): opening hour %s is included in exceptional "
|
||||
"closure hours %s",
|
||||
when,
|
||||
hp,
|
||||
cl,
|
||||
)
|
||||
exceptionally_closed = True
|
||||
break
|
||||
if hp["start"] < cl["start"]:
|
||||
log.debug(
|
||||
"next_opening_hour(%s): opening hour %s start before closure hours %s",
|
||||
when,
|
||||
hp,
|
||||
cl,
|
||||
)
|
||||
test_time = hp["start"] if hp["start"] < test_time else test_time
|
||||
elif cl["stop"] >= hp["start"] and cl["stop"] < hp["stop"]:
|
||||
log.debug(
|
||||
"next_opening_hour(%s): opening hour %s end after closure hours %s",
|
||||
when,
|
||||
hp,
|
||||
cl,
|
||||
)
|
||||
test_time = cl["stop"] if cl["stop"] < test_time else test_time
|
||||
elif hp["start"] < test_time:
|
||||
log.debug(
|
||||
"next_opening_hour(%s): no exceptional closure hours, use opening hours start "
|
||||
"time %s",
|
||||
when,
|
||||
hp["start"],
|
||||
)
|
||||
test_time = hp["start"]
|
||||
|
||||
if test_time < datetime.datetime.max.time():
|
||||
if date == when.date() and test_time < when.time():
|
||||
test_time = when.time()
|
||||
candidate_next_opening_datetime = datetime.datetime.combine(date, test_time)
|
||||
next_opening_datetime = (
|
||||
candidate_next_opening_datetime
|
||||
if not next_opening_datetime
|
||||
or candidate_next_opening_datetime < next_opening_datetime
|
||||
else next_opening_datetime
|
||||
)
|
||||
|
||||
if not next_opening_datetime and (
|
||||
exceptionally_closed or (date == when.date() and not in_opening_hours)
|
||||
):
|
||||
new_max_anaylse_days = max_anaylse_days - (date - when.date()).days
|
||||
if new_max_anaylse_days > 0:
|
||||
log.debug(
|
||||
"next_opening_hour(%s): exceptionally closed on %s, try on following %d days",
|
||||
when,
|
||||
date,
|
||||
new_max_anaylse_days,
|
||||
)
|
||||
next_opening_datetime = next_opening_hour(
|
||||
normal_opening_hours_values=normal_opening_hours_values,
|
||||
exceptional_closures_values=exceptional_closures_values,
|
||||
nonworking_public_holidays_values=nonworking_public_holidays_values,
|
||||
when=datetime.datetime.combine(
|
||||
date + datetime.timedelta(days=1), datetime.datetime.min.time()
|
||||
),
|
||||
max_anaylse_days=new_max_anaylse_days,
|
||||
parse=False,
|
||||
)
|
||||
if not next_opening_datetime:
|
||||
log.debug(
|
||||
"next_opening_hour(%s): no opening hours found in next %d days", when, max_anaylse_days
|
||||
)
|
||||
return False
|
||||
log.debug("next_opening_hour(%s): next opening hours=%s", when, next_opening_datetime)
|
||||
return next_opening_datetime
|
||||
|
||||
|
||||
def previous_opening_date(
|
||||
normal_opening_hours_values=None,
|
||||
exceptional_closures_values=None,
|
||||
nonworking_public_holidays_values=None,
|
||||
date=None,
|
||||
max_anaylse_days=None,
|
||||
parse=True,
|
||||
):
|
||||
"""Search for the previous opening day"""
|
||||
date = date if date else datetime.date.today()
|
||||
max_anaylse_days = max_anaylse_days if max_anaylse_days is not None else 30
|
||||
if parse:
|
||||
try:
|
||||
normal_opening_hours_values = (
|
||||
parse_normal_opening_hours(normal_opening_hours_values)
|
||||
if normal_opening_hours_values
|
||||
else None
|
||||
)
|
||||
exceptional_closures_values = (
|
||||
parse_exceptional_closures(exceptional_closures_values)
|
||||
if exceptional_closures_values
|
||||
else None
|
||||
)
|
||||
except ValueError: # pylint: disable=broad-except
|
||||
log.error(
|
||||
"previous_opening_date(%s): fail to parse normal opening hours or exceptional "
|
||||
"closures",
|
||||
date,
|
||||
exc_info=True,
|
||||
)
|
||||
return False
|
||||
days = 0
|
||||
while days <= max_anaylse_days:
|
||||
test_date = date - datetime.timedelta(days=days)
|
||||
if its_opening_day(
|
||||
normal_opening_hours_values=normal_opening_hours_values,
|
||||
exceptional_closures_values=exceptional_closures_values,
|
||||
nonworking_public_holidays_values=nonworking_public_holidays_values,
|
||||
date=test_date,
|
||||
parse=False,
|
||||
):
|
||||
return test_date
|
||||
days += 1
|
||||
log.debug(
|
||||
"previous_opening_date(%s): no opening day found in the next %d days",
|
||||
date,
|
||||
max_anaylse_days,
|
||||
)
|
||||
return False
|
||||
|
||||
|
||||
def previous_opening_hour(
|
||||
normal_opening_hours_values=None,
|
||||
exceptional_closures_values=None,
|
||||
nonworking_public_holidays_values=None,
|
||||
when=None,
|
||||
max_anaylse_days=None,
|
||||
parse=True,
|
||||
):
|
||||
"""Search for the previous opening hour"""
|
||||
when = when if when else datetime.datetime.now()
|
||||
max_anaylse_days = max_anaylse_days if max_anaylse_days is not None else 30
|
||||
if parse:
|
||||
try:
|
||||
normal_opening_hours_values = (
|
||||
parse_normal_opening_hours(normal_opening_hours_values)
|
||||
if normal_opening_hours_values
|
||||
else None
|
||||
)
|
||||
exceptional_closures_values = (
|
||||
parse_exceptional_closures(exceptional_closures_values)
|
||||
if exceptional_closures_values
|
||||
else None
|
||||
)
|
||||
except ValueError: # pylint: disable=broad-except
|
||||
log.error(
|
||||
"previous_opening_hour(%s): fail to parse normal opening hours or exceptional "
|
||||
"closures",
|
||||
when,
|
||||
exc_info=True,
|
||||
)
|
||||
return False
|
||||
date = previous_opening_date(
|
||||
normal_opening_hours_values=normal_opening_hours_values,
|
||||
exceptional_closures_values=exceptional_closures_values,
|
||||
nonworking_public_holidays_values=nonworking_public_holidays_values,
|
||||
date=when.date(),
|
||||
max_anaylse_days=max_anaylse_days,
|
||||
parse=False,
|
||||
)
|
||||
if not date:
|
||||
log.debug(
|
||||
"previous_opening_hour(%s): no opening day found in the previous %d days",
|
||||
when,
|
||||
max_anaylse_days,
|
||||
)
|
||||
return False
|
||||
log.debug("previous_opening_hour(%s): previous opening date=%s", when, date)
|
||||
weekday = week_days[date.timetuple().tm_wday]
|
||||
log.debug("previous_opening_hour(%s): previous opening week day=%s", when, weekday)
|
||||
exceptional_closures_hours = get_exceptional_closures_hours(
|
||||
exceptional_closures_values, date=date, parse=False
|
||||
)
|
||||
log.debug(
|
||||
"previous_opening_hour(%s): previous opening day exceptional closures hours=%s",
|
||||
when,
|
||||
exceptional_closures_hours,
|
||||
)
|
||||
previous_opening_datetime = None
|
||||
exceptionally_closed = False
|
||||
exceptionally_closed_all_day = False
|
||||
in_opening_hours = date != when.date()
|
||||
for oh in reversed(normal_opening_hours_values):
|
||||
if exceptionally_closed_all_day:
|
||||
break
|
||||
|
||||
if oh["days"] and weekday not in oh["days"]:
|
||||
log.debug("previous_opening_hour(%s): %s not in days (%s)", when, weekday, oh["days"])
|
||||
continue
|
||||
|
||||
log.debug(
|
||||
"previous_opening_hour(%s): %s in days (%s), handle opening hours %s",
|
||||
when,
|
||||
weekday,
|
||||
oh["days"],
|
||||
oh["hours_periods"],
|
||||
)
|
||||
|
||||
if not oh["hours_periods"]:
|
||||
log.debug(
|
||||
"previous_opening_hour(%s): %s is an all day opening day, handle exceptional "
|
||||
"closures hours %s to find the maximal opening time",
|
||||
when,
|
||||
weekday,
|
||||
exceptional_closures_hours,
|
||||
)
|
||||
if date == when.date():
|
||||
in_opening_hours = True
|
||||
test_time = when.time() if when.date() == date else datetime.datetime.max.time()
|
||||
for cl in exceptional_closures_hours:
|
||||
if cl["start"] <= test_time < cl["stop"]:
|
||||
if cl["start"] <= datetime.datetime.min.time():
|
||||
exceptionally_closed = True
|
||||
exceptionally_closed_all_day = True
|
||||
previous_opening_datetime = None
|
||||
break
|
||||
test_time = cl["start"]
|
||||
else:
|
||||
break
|
||||
if not exceptionally_closed_all_day:
|
||||
candidate_previous_opening_datetime = datetime.datetime.combine(date, test_time)
|
||||
previous_opening_datetime = (
|
||||
candidate_previous_opening_datetime
|
||||
if not previous_opening_datetime
|
||||
or candidate_previous_opening_datetime > previous_opening_datetime
|
||||
else previous_opening_datetime
|
||||
)
|
||||
continue
|
||||
|
||||
log.debug(
|
||||
"previous_opening_hour(%s): only opened during some hours periods (%s) on %s, find the "
|
||||
"maximal opening time",
|
||||
when,
|
||||
oh["hours_periods"],
|
||||
weekday,
|
||||
)
|
||||
test_time = datetime.datetime.min.time()
|
||||
for hp in reversed(oh["hours_periods"]):
|
||||
if date == when.date() and hp["start"] > when.time():
|
||||
log.debug(
|
||||
"previous_opening_hour(%s): ignore opening hours %s starting before specified "
|
||||
"when time %s",
|
||||
when,
|
||||
hp,
|
||||
when.time(),
|
||||
)
|
||||
continue
|
||||
if date == when.date() and hp["start"] <= when.time() < hp["stop"]:
|
||||
in_opening_hours = True
|
||||
if exceptional_closures_hours:
|
||||
log.debug(
|
||||
"previous_opening_hour(%s): check if opening hours %s match with exceptional "
|
||||
"closure hours %s",
|
||||
when,
|
||||
hp,
|
||||
exceptional_closures_hours,
|
||||
)
|
||||
for cl in reversed(exceptional_closures_hours):
|
||||
if cl["start"] <= hp["start"] and cl["stop"] >= hp["stop"]:
|
||||
log.debug(
|
||||
"previous_opening_hour(%s): opening hour %s is included in exceptional "
|
||||
"closure hours %s",
|
||||
when,
|
||||
hp,
|
||||
cl,
|
||||
)
|
||||
exceptionally_closed = True
|
||||
break
|
||||
if cl["stop"] < hp["stop"]:
|
||||
log.debug(
|
||||
"previous_opening_hour(%s): opening hour %s end after closure hours %s",
|
||||
when,
|
||||
hp,
|
||||
cl,
|
||||
)
|
||||
test_time = hp["stop"] if hp["stop"] > test_time else test_time
|
||||
elif cl["start"] > hp["stop"]:
|
||||
log.debug(
|
||||
"previous_opening_hour(%s): opening hour %s start before closure hours "
|
||||
"%s",
|
||||
when,
|
||||
hp,
|
||||
cl,
|
||||
)
|
||||
test_time = hp["stop"] if hp["stop"] > test_time else test_time
|
||||
elif cl["stop"] >= hp["stop"] and cl["start"] > hp["start"]:
|
||||
log.debug(
|
||||
"previous_opening_hour(%s): opening hour %s start before closure hours "
|
||||
"%s",
|
||||
when,
|
||||
hp,
|
||||
cl,
|
||||
)
|
||||
test_time = cl["start"] if cl["start"] > test_time else test_time
|
||||
elif hp["stop"] > test_time:
|
||||
log.debug(
|
||||
"previous_opening_hour(%s): no exceptional closure hours, use opening hours "
|
||||
"stop time %s",
|
||||
when,
|
||||
hp["stop"],
|
||||
)
|
||||
test_time = hp["stop"]
|
||||
|
||||
if test_time > datetime.datetime.min.time():
|
||||
if date == when.date() and test_time > when.time():
|
||||
test_time = when.time()
|
||||
candidate_previous_opening_datetime = datetime.datetime.combine(date, test_time)
|
||||
previous_opening_datetime = (
|
||||
candidate_previous_opening_datetime
|
||||
if not previous_opening_datetime
|
||||
or candidate_previous_opening_datetime > previous_opening_datetime
|
||||
else previous_opening_datetime
|
||||
)
|
||||
|
||||
if not previous_opening_datetime and (
|
||||
exceptionally_closed or (date == when.date() and not in_opening_hours)
|
||||
):
|
||||
new_max_anaylse_days = max_anaylse_days - (when.date() - date).days
|
||||
if new_max_anaylse_days > 0:
|
||||
log.debug(
|
||||
"previous_opening_hour(%s): exceptionally closed on %s, try on previous %d days",
|
||||
when,
|
||||
date,
|
||||
new_max_anaylse_days,
|
||||
)
|
||||
previous_opening_datetime = previous_opening_hour(
|
||||
normal_opening_hours_values=normal_opening_hours_values,
|
||||
exceptional_closures_values=exceptional_closures_values,
|
||||
nonworking_public_holidays_values=nonworking_public_holidays_values,
|
||||
when=datetime.datetime.combine(
|
||||
date - datetime.timedelta(days=1), datetime.datetime.max.time()
|
||||
),
|
||||
max_anaylse_days=new_max_anaylse_days,
|
||||
parse=False,
|
||||
)
|
||||
if not previous_opening_datetime:
|
||||
log.debug(
|
||||
"previous_opening_hour(%s): no opening hours found in previous %d days",
|
||||
when,
|
||||
max_anaylse_days,
|
||||
)
|
||||
return False
|
||||
log.debug(
|
||||
"previous_opening_hour(%s): previous opening hours=%s", when, previous_opening_datetime
|
||||
)
|
||||
return previous_opening_datetime
|
||||
|
|
|
@ -132,7 +132,7 @@ class Report(ConfigurableObject): # pylint: disable=useless-object-inheritance
|
|||
" email_client"
|
||||
)
|
||||
content = self.get_content()
|
||||
if not content:
|
||||
if not content and not self._attachment_files and not self._attachment_payloads:
|
||||
log.debug("Report is empty, do not send it")
|
||||
return True
|
||||
msg = email_client.forge_message(
|
||||
|
|
62
mylib/scripts/email_templates/header.svg
Normal file
62
mylib/scripts/email_templates/header.svg
Normal file
|
@ -0,0 +1,62 @@
|
|||
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
|
||||
<!-- Created with Inkscape (http://www.inkscape.org/) -->
|
||||
|
||||
<svg
|
||||
width="92.738403mm"
|
||||
height="17.141003mm"
|
||||
viewBox="0 0 92.738403 17.141003"
|
||||
version="1.1"
|
||||
id="svg5"
|
||||
inkscape:version="1.2.2 (b0a8486541, 2022-12-01)"
|
||||
sodipodi:docname="header.svg"
|
||||
xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
|
||||
xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
|
||||
xmlns="http://www.w3.org/2000/svg"
|
||||
xmlns:svg="http://www.w3.org/2000/svg">
|
||||
<sodipodi:namedview
|
||||
id="namedview7"
|
||||
pagecolor="#ffffff"
|
||||
bordercolor="#666666"
|
||||
borderopacity="1.0"
|
||||
inkscape:showpageshadow="2"
|
||||
inkscape:pageopacity="0.0"
|
||||
inkscape:pagecheckerboard="0"
|
||||
inkscape:deskcolor="#d1d1d1"
|
||||
inkscape:document-units="mm"
|
||||
showgrid="false"
|
||||
inkscape:zoom="0.84096521"
|
||||
inkscape:cx="92.156012"
|
||||
inkscape:cy="315.11411"
|
||||
inkscape:window-width="1920"
|
||||
inkscape:window-height="1011"
|
||||
inkscape:window-x="0"
|
||||
inkscape:window-y="32"
|
||||
inkscape:window-maximized="1"
|
||||
inkscape:current-layer="layer1" />
|
||||
<defs
|
||||
id="defs2" />
|
||||
<g
|
||||
inkscape:label="Calque 1"
|
||||
inkscape:groupmode="layer"
|
||||
id="layer1"
|
||||
transform="translate(-44.730637,-68.858589)">
|
||||
<rect
|
||||
style="fill:none;stroke:#004787;stroke-width:2;stroke-linecap:round;stroke-linejoin:round;stroke-dasharray:none;stroke-dashoffset:0;stroke-opacity:1"
|
||||
id="rect234"
|
||||
width="90.738403"
|
||||
height="15.141003"
|
||||
x="45.730637"
|
||||
y="69.858589" />
|
||||
<text
|
||||
xml:space="preserve"
|
||||
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:5.29167px;line-height:125%;font-family:'DejaVu Sans Mono';-inkscape-font-specification:'DejaVu Sans Mono';letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;stroke-width:0.264583px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
|
||||
x="82.664459"
|
||||
y="79.623909"
|
||||
id="text2144"><tspan
|
||||
sodipodi:role="line"
|
||||
id="tspan2142"
|
||||
style="fill:#004787;fill-opacity:1;stroke-width:0.264583px"
|
||||
x="82.664459"
|
||||
y="79.623909">Header</tspan></text>
|
||||
</g>
|
||||
</svg>
|
After Width: | Height: | Size: 2.2 KiB |
|
@ -1 +1,2 @@
|
|||
<strong>Just a test email.</strong> <small>(sent at ${sent_date})</small>
|
||||
<img src="${load_image_as_base64('header.svg')}" style="display: block" />
|
||||
<p><strong>Just a test email.</strong> <small>(sent at ${sent_date})</small></p>
|
||||
|
|
|
@ -38,19 +38,10 @@ def main(argv=None): # pylint: disable=too-many-locals,too-many-statements
|
|||
"-T",
|
||||
"--template",
|
||||
action="store_true",
|
||||
dest="template",
|
||||
help="Template name to send (default: test)",
|
||||
default="test",
|
||||
)
|
||||
|
||||
test_opts.add_argument(
|
||||
"-m",
|
||||
"--mako",
|
||||
action="store_true",
|
||||
dest="test_mako",
|
||||
help="Test mako templating",
|
||||
)
|
||||
|
||||
test_opts.add_argument(
|
||||
"--cc",
|
||||
action="store",
|
||||
|
@ -93,7 +84,7 @@ def main(argv=None): # pylint: disable=too-many-locals,too-many-statements
|
|||
options.test_to,
|
||||
cc=options.test_cc,
|
||||
bcc=options.test_bcc,
|
||||
template="test",
|
||||
template=options.template,
|
||||
sent_date=datetime.datetime.now(),
|
||||
):
|
||||
log.info("Test email sent")
|
||||
|
|
1
setup.py
1
setup.py
|
@ -22,6 +22,7 @@ extras_require = {
|
|||
"pytz",
|
||||
],
|
||||
"email": [
|
||||
"python-magic",
|
||||
"mako",
|
||||
],
|
||||
"pgsql": [
|
||||
|
|
|
@ -182,27 +182,96 @@ def test_parse_normal_opening_hours_multiple_periods():
|
|||
]
|
||||
|
||||
|
||||
#
|
||||
# Tests on is_closed
|
||||
#
|
||||
|
||||
|
||||
exceptional_closures = [
|
||||
"22/09/2017",
|
||||
"20/09/2017-22/09/2017",
|
||||
"20/09/2017-22/09/2017 18/09/2017",
|
||||
"25/11/2017",
|
||||
"26/11/2017 9h30-12h30",
|
||||
def test_parse_normal_opening_hours_is_sorted():
|
||||
assert opening_hours.parse_normal_opening_hours(
|
||||
[
|
||||
"samedi 9h30-18h",
|
||||
"lundi-vendredi 14h-18h 9h30-12h30",
|
||||
"samedi 9h30-12h",
|
||||
"dimanche 9h30-12h",
|
||||
]
|
||||
) == [
|
||||
{
|
||||
"days": ["lundi", "mardi", "mercredi", "jeudi", "vendredi"],
|
||||
"hours_periods": [
|
||||
{"start": datetime.time(9, 30), "stop": datetime.time(12, 30)},
|
||||
{"start": datetime.time(14, 0), "stop": datetime.time(18, 0)},
|
||||
],
|
||||
},
|
||||
{
|
||||
"days": ["samedi"],
|
||||
"hours_periods": [
|
||||
{"start": datetime.time(9, 30), "stop": datetime.time(12, 0)},
|
||||
],
|
||||
},
|
||||
{
|
||||
"days": ["samedi"],
|
||||
"hours_periods": [
|
||||
{"start": datetime.time(9, 30), "stop": datetime.time(18, 0)},
|
||||
],
|
||||
},
|
||||
{
|
||||
"days": ["dimanche"],
|
||||
"hours_periods": [
|
||||
{"start": datetime.time(9, 30), "stop": datetime.time(12, 0)},
|
||||
],
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
#
|
||||
# Tests on normal opening hours
|
||||
#
|
||||
normal_opening_hours = [
|
||||
"lundi-mardi jeudi 9h30-12h30 14h-16h30",
|
||||
"mercredi vendredi 9h30-12h30 14h-17h",
|
||||
"samedi",
|
||||
]
|
||||
normally_opened_datetime = datetime.datetime(2024, 3, 1, 10, 15)
|
||||
normally_opened_all_day_datetime = datetime.datetime(2024, 4, 6, 10, 15)
|
||||
normally_closed_datetime = datetime.datetime(2017, 3, 1, 20, 15)
|
||||
normally_closed_all_day_datetime = datetime.datetime(2024, 4, 7, 20, 15)
|
||||
|
||||
|
||||
def test_its_normally_open():
|
||||
assert opening_hours.its_normally_open(normal_opening_hours, when=normally_opened_datetime)
|
||||
|
||||
|
||||
def test_its_normally_open_all_day():
|
||||
assert opening_hours.its_normally_open(
|
||||
normal_opening_hours, when=normally_opened_all_day_datetime
|
||||
)
|
||||
|
||||
|
||||
def test_its_normally_closed():
|
||||
assert not opening_hours.its_normally_open(normal_opening_hours, when=normally_closed_datetime)
|
||||
|
||||
|
||||
def test_its_normally_closed_all_day():
|
||||
assert not opening_hours.its_normally_open(
|
||||
normal_opening_hours, when=normally_closed_all_day_datetime
|
||||
)
|
||||
|
||||
|
||||
def test_its_normally_open_ignore_time():
|
||||
assert opening_hours.its_normally_open(
|
||||
normal_opening_hours, when=normally_closed_datetime.date(), ignore_time=True
|
||||
)
|
||||
|
||||
|
||||
def test_its_normally_closed_ignore_time():
|
||||
assert not opening_hours.its_normally_open(
|
||||
normal_opening_hours, when=normally_closed_all_day_datetime.date(), ignore_time=True
|
||||
)
|
||||
|
||||
|
||||
#
|
||||
# Tests on non working days
|
||||
#
|
||||
nonworking_public_holidays = [
|
||||
"1janvier",
|
||||
"paques",
|
||||
"lundi_paques",
|
||||
"1mai",
|
||||
"8mai",
|
||||
"jeudi_ascension",
|
||||
"lundi_pentecote",
|
||||
|
@ -212,6 +281,120 @@ nonworking_public_holidays = [
|
|||
"11novembre",
|
||||
"noel",
|
||||
]
|
||||
nonworking_date = datetime.date(2017, 1, 1)
|
||||
not_included_nonworking_date = datetime.date(2017, 5, 1)
|
||||
not_nonworking_date = datetime.date(2017, 5, 2)
|
||||
|
||||
|
||||
def test_its_nonworking_day():
|
||||
assert (
|
||||
opening_hours.its_nonworking_day(nonworking_public_holidays, date=nonworking_date) is True
|
||||
)
|
||||
|
||||
|
||||
def test_its_not_nonworking_day():
|
||||
assert (
|
||||
opening_hours.its_nonworking_day(
|
||||
nonworking_public_holidays,
|
||||
date=not_nonworking_date,
|
||||
)
|
||||
is False
|
||||
)
|
||||
|
||||
|
||||
def test_its_not_included_nonworking_day():
|
||||
assert (
|
||||
opening_hours.its_nonworking_day(
|
||||
nonworking_public_holidays,
|
||||
date=not_included_nonworking_date,
|
||||
)
|
||||
is False
|
||||
)
|
||||
|
||||
|
||||
#
|
||||
# Tests in exceptional closures
|
||||
#
|
||||
exceptional_closures = [
|
||||
"22/09/2017",
|
||||
"20/09/2017-22/09/2017",
|
||||
"20/09/2017-22/09/2017 18/09/2017",
|
||||
"25/11/2017",
|
||||
"26/11/2017 9h30-12h30",
|
||||
"27/11/2017 17h-18h 9h30-12h30",
|
||||
]
|
||||
exceptional_closure_all_day_date = datetime.date(2017, 9, 22)
|
||||
exceptional_closure_all_day_datetime = datetime.datetime.combine(
|
||||
exceptional_closure_all_day_date, datetime.time(20, 15)
|
||||
)
|
||||
exceptional_closure_datetime = datetime.datetime(2017, 11, 26, 10, 30)
|
||||
exceptional_closure_datetime_hours_period = {
|
||||
"start": datetime.time(9, 30),
|
||||
"stop": datetime.time(12, 30),
|
||||
}
|
||||
not_exceptional_closure_date = datetime.date(2019, 9, 22)
|
||||
|
||||
|
||||
def test_its_exceptionally_closed():
|
||||
assert (
|
||||
opening_hours.its_exceptionally_closed(
|
||||
exceptional_closures, when=exceptional_closure_all_day_datetime
|
||||
)
|
||||
is True
|
||||
)
|
||||
|
||||
|
||||
def test_its_not_exceptionally_closed():
|
||||
assert (
|
||||
opening_hours.its_exceptionally_closed(
|
||||
exceptional_closures, when=not_exceptional_closure_date
|
||||
)
|
||||
is False
|
||||
)
|
||||
|
||||
|
||||
def test_its_exceptionally_closed_all_day():
|
||||
assert (
|
||||
opening_hours.its_exceptionally_closed(
|
||||
exceptional_closures, when=exceptional_closure_all_day_datetime, all_day=True
|
||||
)
|
||||
is True
|
||||
)
|
||||
|
||||
|
||||
def test_its_not_exceptionally_closed_all_day():
|
||||
assert (
|
||||
opening_hours.its_exceptionally_closed(
|
||||
exceptional_closures, when=exceptional_closure_datetime, all_day=True
|
||||
)
|
||||
is False
|
||||
)
|
||||
|
||||
|
||||
def test_get_exceptional_closures_hours():
|
||||
assert opening_hours.get_exceptional_closures_hours(
|
||||
exceptional_closures, date=exceptional_closure_datetime.date()
|
||||
) == [exceptional_closure_datetime_hours_period]
|
||||
|
||||
|
||||
def test_get_exceptional_closures_hours_all_day():
|
||||
assert opening_hours.get_exceptional_closures_hours(
|
||||
exceptional_closures, date=exceptional_closure_all_day_date
|
||||
) == [{"start": datetime.datetime.min.time(), "stop": datetime.datetime.max.time()}]
|
||||
|
||||
|
||||
def test_get_exceptional_closures_hours_is_sorted():
|
||||
assert opening_hours.get_exceptional_closures_hours(
|
||||
["27/11/2017 17h-18h 9h30-12h30"], date=datetime.date(2017, 11, 27)
|
||||
) == [
|
||||
{"start": datetime.time(9, 30), "stop": datetime.time(12, 30)},
|
||||
{"start": datetime.time(17, 0), "stop": datetime.time(18, 0)},
|
||||
]
|
||||
|
||||
|
||||
#
|
||||
# Tests on is_closed
|
||||
#
|
||||
|
||||
|
||||
def test_is_closed_when_normaly_closed_by_hour():
|
||||
|
@ -255,7 +438,7 @@ def test_is_closed_when_normaly_closed_by_day():
|
|||
normal_opening_hours_values=normal_opening_hours,
|
||||
exceptional_closures_values=exceptional_closures,
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2017, 5, 6, 14, 15),
|
||||
when=datetime.datetime(2017, 5, 7, 14, 15),
|
||||
) == {"closed": True, "exceptional_closure": False, "exceptional_closure_all_day": False}
|
||||
|
||||
|
||||
|
@ -300,3 +483,203 @@ def test_nonworking_french_public_days_of_the_year():
|
|||
"noel": datetime.date(2021, 12, 25),
|
||||
"saint_etienne": datetime.date(2021, 12, 26),
|
||||
}
|
||||
|
||||
|
||||
def test_next_opening_date():
|
||||
assert opening_hours.next_opening_date(
|
||||
normal_opening_hours_values=normal_opening_hours,
|
||||
exceptional_closures_values=exceptional_closures,
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
date=datetime.date(2021, 4, 4),
|
||||
) == datetime.date(2021, 4, 6)
|
||||
|
||||
|
||||
def test_next_opening_hour():
|
||||
assert opening_hours.next_opening_hour(
|
||||
normal_opening_hours_values=normal_opening_hours,
|
||||
exceptional_closures_values=exceptional_closures,
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2021, 4, 4, 10, 30),
|
||||
) == datetime.datetime(2021, 4, 6, 9, 30)
|
||||
|
||||
|
||||
def test_next_opening_hour_with_exceptionnal_closure_hours():
|
||||
assert opening_hours.next_opening_hour(
|
||||
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
|
||||
exceptional_closures_values=["06/04/2021 9h-13h 14h-16h"],
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2021, 4, 4, 10, 30),
|
||||
) == datetime.datetime(2021, 4, 6, 16, 0)
|
||||
|
||||
|
||||
def test_next_opening_hour_with_exceptionnal_closure_day():
|
||||
assert opening_hours.next_opening_hour(
|
||||
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
|
||||
exceptional_closures_values=["06/04/2021"],
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2021, 4, 4, 10, 30),
|
||||
) == datetime.datetime(2021, 4, 7, 9, 0)
|
||||
|
||||
|
||||
def test_next_opening_hour_with_overlapsed_opening_hours():
|
||||
assert opening_hours.next_opening_hour(
|
||||
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h", "mardi 8h-19h"],
|
||||
exceptional_closures_values=["06/04/2021 9h-13h 14h-16h"],
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2021, 4, 4, 10, 30),
|
||||
) == datetime.datetime(2021, 4, 6, 8, 0)
|
||||
|
||||
|
||||
def test_next_opening_hour_with_too_large_exceptionnal_closure_days():
|
||||
assert (
|
||||
opening_hours.next_opening_hour(
|
||||
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
|
||||
exceptional_closures_values=["06/04/2021-16-04/2021"],
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2021, 4, 4, 10, 30),
|
||||
max_anaylse_days=10,
|
||||
)
|
||||
is False
|
||||
)
|
||||
|
||||
|
||||
def test_next_opening_hour_on_opened_moment():
|
||||
assert opening_hours.next_opening_hour(
|
||||
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
|
||||
exceptional_closures_values=[],
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2021, 4, 6, 10, 30),
|
||||
) == datetime.datetime(2021, 4, 6, 10, 30)
|
||||
|
||||
|
||||
def test_next_opening_hour_on_same_day():
|
||||
assert opening_hours.next_opening_hour(
|
||||
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
|
||||
exceptional_closures_values=[],
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2021, 4, 6, 13, 0),
|
||||
) == datetime.datetime(2021, 4, 6, 14, 0)
|
||||
assert opening_hours.next_opening_hour(
|
||||
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
|
||||
exceptional_closures_values=[],
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2021, 4, 6, 16, 0),
|
||||
) == datetime.datetime(2021, 4, 6, 16, 0)
|
||||
assert opening_hours.next_opening_hour(
|
||||
normal_opening_hours_values=["lundi-vendredi"],
|
||||
exceptional_closures_values=[],
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2021, 4, 6, 16, 0),
|
||||
) == datetime.datetime(2021, 4, 6, 16, 0)
|
||||
|
||||
|
||||
def test_next_opening_hour_on_opened_day_but_too_late():
|
||||
assert opening_hours.next_opening_hour(
|
||||
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
|
||||
exceptional_closures_values=[],
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2021, 4, 6, 23, 0),
|
||||
) == datetime.datetime(2021, 4, 7, 9, 0)
|
||||
|
||||
|
||||
def test_previous_opening_date():
|
||||
assert opening_hours.previous_opening_date(
|
||||
normal_opening_hours_values=["lundi-vendredi 9h-18h"],
|
||||
exceptional_closures_values=[],
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
date=datetime.date(2024, 4, 1),
|
||||
) == datetime.date(2024, 3, 29)
|
||||
|
||||
|
||||
def test_previous_opening_hour():
|
||||
assert opening_hours.previous_opening_hour(
|
||||
normal_opening_hours_values=["lundi-vendredi 9h-18h"],
|
||||
exceptional_closures_values=[],
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2024, 4, 1, 10, 30),
|
||||
) == datetime.datetime(2024, 3, 29, 18, 0)
|
||||
|
||||
|
||||
def test_previous_opening_hour_with_exceptionnal_closure_hours():
|
||||
assert opening_hours.previous_opening_hour(
|
||||
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
|
||||
exceptional_closures_values=["29/03/2024 14h-18h"],
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2024, 4, 1, 10, 30),
|
||||
) == datetime.datetime(2024, 3, 29, 12, 0)
|
||||
assert opening_hours.previous_opening_hour(
|
||||
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
|
||||
exceptional_closures_values=["29/03/2024 16h-18h"],
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2024, 4, 1, 10, 30),
|
||||
) == datetime.datetime(2024, 3, 29, 16, 0)
|
||||
|
||||
|
||||
def test_previous_opening_hour_with_exceptionnal_closure_day():
|
||||
assert opening_hours.previous_opening_hour(
|
||||
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
|
||||
exceptional_closures_values=["29/03/2024"],
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2024, 4, 1, 10, 30),
|
||||
) == datetime.datetime(2024, 3, 28, 18, 0)
|
||||
|
||||
|
||||
def test_previous_opening_hour_with_overlapsed_opening_hours():
|
||||
assert opening_hours.previous_opening_hour(
|
||||
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h", "mardi 8h-19h"],
|
||||
exceptional_closures_values=[],
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2024, 4, 3, 8, 30),
|
||||
) == datetime.datetime(2024, 4, 2, 19, 0)
|
||||
|
||||
|
||||
def test_previous_opening_hour_with_too_large_exceptionnal_closure_days():
|
||||
assert (
|
||||
opening_hours.previous_opening_hour(
|
||||
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
|
||||
exceptional_closures_values=["06/03/2024-16-04/2024"],
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2024, 4, 17, 8, 30),
|
||||
max_anaylse_days=10,
|
||||
)
|
||||
is False
|
||||
)
|
||||
|
||||
|
||||
def test_previous_opening_hour_on_opened_moment():
|
||||
assert opening_hours.previous_opening_hour(
|
||||
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
|
||||
exceptional_closures_values=[],
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2024, 4, 5, 10, 30),
|
||||
) == datetime.datetime(2024, 4, 5, 10, 30)
|
||||
|
||||
|
||||
def test_previous_opening_hour_on_same_day():
|
||||
assert opening_hours.previous_opening_hour(
|
||||
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
|
||||
exceptional_closures_values=[],
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2024, 4, 5, 13, 0),
|
||||
) == datetime.datetime(2024, 4, 5, 12, 0)
|
||||
assert opening_hours.previous_opening_hour(
|
||||
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
|
||||
exceptional_closures_values=[],
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2024, 4, 5, 16, 0),
|
||||
) == datetime.datetime(2024, 4, 5, 16, 0)
|
||||
assert opening_hours.previous_opening_hour(
|
||||
normal_opening_hours_values=["lundi-vendredi"],
|
||||
exceptional_closures_values=[],
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2024, 4, 5, 16, 0),
|
||||
) == datetime.datetime(2024, 4, 5, 16, 0)
|
||||
|
||||
|
||||
def test_previous_opening_hour_on_opened_day_but_too_early():
|
||||
assert opening_hours.previous_opening_hour(
|
||||
normal_opening_hours_values=["lundi-vendredi 9h-12h 14h-18h"],
|
||||
exceptional_closures_values=[],
|
||||
nonworking_public_holidays_values=nonworking_public_holidays,
|
||||
when=datetime.datetime(2024, 4, 5, 8, 0),
|
||||
) == datetime.datetime(2024, 4, 4, 18, 0)
|
||||
|
|
Loading…
Reference in a new issue