919 lines
36 KiB
Python
919 lines
36 KiB
Python
""" Opening hours helpers """
|
|
|
|
import datetime
|
|
import logging
|
|
import re
|
|
import time
|
|
|
|
# Module-level logger
log = logging.getLogger(__name__)

# French week day names; the functions below index this list with tm_wday (0 = lundi)
week_days = ["lundi", "mardi", "mercredi", "jeudi", "vendredi", "samedi", "dimanche"]
# strptime() format of the dates accepted in exceptional closures values
date_format = "%d/%m/%Y"
# Strict DD/MM/YYYY matcher, used to tell a range of dates from a time period
date_pattern = re.compile("^([0-9]{2})/([0-9]{2})/([0-9]{4})$")
# Time matcher: one or two digits, the letter "h", then optional two-digit minutes (ex: 9h, 10h30)
time_pattern = re.compile("^([0-9]{1,2})h([0-9]{2})?$")
# Cache of the computed non-working public days, keyed by year
_nonworking_french_public_days_of_the_year_cache = {}
|
|
|
|
|
|
def easter_date(year):
    """Return the Easter date of the given year as a datetime.date

    :param year: The year (int)
    :return: datetime.date of Easter for this year
    """
    century, rest = divmod(year, 100)
    c, d = divmod(3 * (century + 25), 4)
    e = (8 * (century + 11)) // 25
    f = (5 * century + rest) % 19
    g = (19 * f + c - e) % 30
    h = (f + 11 * g) // 319
    j, k = divmod(60 * (5 - d) + rest, 4)
    m = (2 * j - k - g + h) % 7
    # The quotient is the month, the remainder the zero-based day of this month
    month, day_index = divmod(g - h + m + 114, 31)
    return datetime.date(year, month, day_index + 1)
|
|
|
|
|
|
def nonworking_french_public_days_of_the_year(year=None):
    """Return the non-working french public days of a year

    The result is computed once per year and kept in the module-level cache; the cached
    dict itself is returned.

    :param year: The year (default: the current one)
    :return: Dict mapping the public day name (ex: "lundi_paques") to its datetime.date
    """
    if year is None:
        year = datetime.date.today().year

    try:
        return _nonworking_french_public_days_of_the_year_cache[year]
    except KeyError:
        pass

    easter = easter_date(year)

    def fixed(month, day):
        """Public day falling on the same date every year"""
        return datetime.date(year, month, day)

    def after_easter(days):
        """Public day defined by its number of days after Easter"""
        return easter + datetime.timedelta(days)

    public_days = {
        "1janvier": fixed(1, 1),
        "paques": easter,
        "lundi_paques": after_easter(1),
        "1mai": fixed(5, 1),
        "8mai": fixed(5, 8),
        "jeudi_ascension": after_easter(39),
        "pentecote": after_easter(49),
        "lundi_pentecote": after_easter(50),
        "14juillet": fixed(7, 14),
        "15aout": fixed(8, 15),
        "1novembre": fixed(11, 1),
        "11novembre": fixed(11, 11),
        "noel": fixed(12, 25),
        "saint_etienne": fixed(12, 26),
    }
    _nonworking_french_public_days_of_the_year_cache[year] = public_days
    return public_days
|
|
|
|
|
|
def parse_exceptional_closures(values):
    """Parse exceptional closures values

    Each value is a whitespace separated list of words, each word being one of:
      - a date (ex: 18/12/2017)
      - a range of dates (ex: 18/12/2017-20/12/2017), expanded to every day it includes
      - a time period (ex: 9h-10h30)

    :param values: Iterable of exceptional closure strings
    :return: List of dicts (one per value) with keys "days" (list of datetime.date) and
             "hours_periods" (list of dicts with "start" and "stop" datetime.time)
    :raises ValueError: On malformed word, on range whose end is not after its start or
                        on value without any day
    """
    one_day = datetime.timedelta(days=1)
    closures = []
    for value in values:
        days = []
        hours_periods = []
        # str.split() without argument never yields empty words
        for word in value.strip().split():
            parts = word.split("-")
            if len(parts) > 2:
                raise ValueError(f'Invalid number of part in this word: "{word}"')

            if len(parts) == 1:
                # Single date
                parsed = time.strptime(word, date_format)
                day = datetime.date(parsed.tm_year, parsed.tm_mon, parsed.tm_mday)
                if day not in days:
                    days.append(day)
                continue

            first, last = parts
            if date_pattern.match(first) and date_pattern.match(last):
                # Range of dates: add every day from the first to the last one (included)
                pstart = time.strptime(first, date_format)
                pstop = time.strptime(last, date_format)
                if pstop <= pstart:
                    raise ValueError(f"Day {last} <= {first}")

                day = datetime.date(pstart.tm_year, pstart.tm_mon, pstart.tm_mday)
                last_day = datetime.date(pstop.tm_year, pstop.tm_mon, pstop.tm_mday)
                while day <= last_day:
                    if day not in days:
                        days.append(day)
                    day += one_day
                continue

            # Anything else have to be a time period
            mstart = time_pattern.match(first)
            mstop = time_pattern.match(last)
            if mstart is None or mstop is None:
                raise ValueError(f'"{word}" is not a valid time period')
            # Minutes are optional in the pattern: default to 0
            hstart = datetime.time(int(mstart.group(1)), int(mstart.group(2) or 0))
            hstop = datetime.time(int(mstop.group(1)), int(mstop.group(2) or 0))
            if hstop <= hstart:
                raise ValueError(f"Time {last} <= {first}")
            hours_periods.append({"start": hstart, "stop": hstop})

        if not days:
            raise ValueError(f'No days found in value "{value}"')
        closures.append({"days": days, "hours_periods": hours_periods})
    return closures
|
|
|
|
|
|
def parse_normal_opening_hours(values):
    """Parse normal opening hours values

    Each value is a whitespace separated list of words, each word being one of:
      - a week day (ex: jeudi)
      - a range of week days (ex: lundi-jeudi), expanded to every week day it includes
      - a time period (ex: 9h-10h30)

    :param values: Iterable of normal opening hours strings
    :return: List of dicts (one per value) with keys "days" (list of week day names) and
             "hours_periods" (sorted list of dicts with "start" and "stop" datetime.time),
             as ordered by sorted_opening_hours()
    :raises ValueError: On malformed word, on range whose end is not after its start or
                        on value providing neither day nor time period
    """
    opening_hours = []
    for value in values:
        days = []
        hours_periods = []
        # str.split() without argument never yields empty words
        for word in value.strip().split():
            parts = word.split("-")
            if len(parts) > 2:
                raise ValueError(f'Invalid number of part in this word: "{word}"')

            if len(parts) == 1:
                # Single week day
                if word not in week_days:
                    raise ValueError(f'"{word}" is not a valid week day')
                if word not in days:
                    days.append(word)
                continue

            first, last = parts
            if first in week_days and last in week_days:
                # Range of week days: add every day from the first to the last one (included)
                first_idx = week_days.index(first)
                last_idx = week_days.index(last)
                if last_idx <= first_idx:
                    raise ValueError(f'"{last}" is before "{first}"')
                for week_day in week_days[first_idx : last_idx + 1]:
                    if week_day not in days:
                        days.append(week_day)
                continue

            # Anything else have to be a time period
            mstart = time_pattern.match(first)
            mstop = time_pattern.match(last)
            if mstart is None or mstop is None:
                raise ValueError(f'"{word}" is not a valid time period')
            # Minutes are optional in the pattern: default to 0
            hstart = datetime.time(int(mstart.group(1)), int(mstart.group(2) or 0))
            hstop = datetime.time(int(mstop.group(1)), int(mstop.group(2) or 0))
            if hstop <= hstart:
                raise ValueError(f"Time {last} <= {first}")
            hours_periods.append({"start": hstart, "stop": hstop})

        if not (days or hours_periods):
            raise ValueError(f'No days or hours period found in this value: "{value}"')
        opening_hours.append({"days": days, "hours_periods": hours_periods})

    # Once everything is parsed, order the hours periods of each entry, then the entries
    for entry in opening_hours:
        entry["hours_periods"] = sorted_hours_periods(entry["hours_periods"])
    return sorted_opening_hours(opening_hours)
|
|
|
|
|
|
def sorted_hours_periods(hours_periods):
    """Return a new list of the hours periods ordered by start time, then by stop time

    :param hours_periods: Iterable of dicts with "start" and "stop" keys
    :return: New sorted list (the input is left untouched)
    """

    def period_bounds(period):
        return period["start"], period["stop"]

    ordered = list(hours_periods)
    ordered.sort(key=period_bounds)
    return ordered
|
|
|
|
|
|
def sorted_opening_hours(opening_hours):
    """Return a new list of the opening hours entries in a stable display order

    Entries are ordered by the week_days index of their first day, then by the start and
    the stop time of their first hours period. Entries without hours period are handled
    as covering the whole day.

    :param opening_hours: Iterable of dicts with "days" and "hours_periods" keys
    :return: New sorted list
    """

    def sort_key(entry):
        first_period = entry["hours_periods"][0] if entry["hours_periods"] else None
        return (
            # Entries without days come first. An int is required here: a None can't be
            # ordered against the int index of other entries (TypeError on Python 3).
            week_days.index(entry["days"][0]) if entry["days"] else -1,
            first_period["start"] if first_period else datetime.datetime.min.time(),
            first_period["stop"] if first_period else datetime.datetime.max.time(),
        )

    return sorted(opening_hours, key=sort_key)
|
|
|
|
|
|
def its_nonworking_day(nonworking_public_holidays_values, date=None):
    """Check if the date is one of the selected non-working public days

    :param nonworking_public_holidays_values: Iterable of public day names, as used as keys
        by nonworking_french_public_days_of_the_year() (ex: "1mai"). Unknown names are ignored.
    :param date: The datetime.date (or datetime.datetime) to check (default: today)
    :return: True if the date is one of the selected public days, False otherwise
    """
    if not nonworking_public_holidays_values:
        return False
    date = date if date else datetime.date.today()
    if isinstance(date, datetime.datetime):
        # A datetime never compares equal to a plain date: keep only its date part,
        # otherwise no public day could ever match
        date = date.date()
    log.debug("its_nonworking_day(%s): values=%s", date, nonworking_public_holidays_values)
    nonworking_days = nonworking_french_public_days_of_the_year(year=date.year)
    for day in nonworking_public_holidays_values:
        if nonworking_days.get(day) == date:
            log.debug("its_nonworking_day(%s): %s", date, day)
            return True
    return False
|
|
|
|
|
|
def its_exceptionally_closed(exceptional_closures_values, when=None, parse=True, all_day=False):
    """Check if it's exceptionally closed

    :param exceptional_closures_values: Exceptional closures, as raw strings (parse=True) or
        as already returned by parse_exceptional_closures() (parse=False)
    :param when: The datetime.datetime or datetime.date to check (default: now). With a
        plain date, only all day closures can match.
    :param parse: Set to False if exceptional_closures_values are already parsed
    :param all_day: If True, only consider closures covering the whole day
    :return: True if exceptionally closed, False otherwise
    :raises ValueError: If parsing of the exceptional closures failed
    """
    if not exceptional_closures_values:
        return False
    when = when if when else datetime.datetime.now()
    assert isinstance(when, (datetime.date, datetime.datetime))
    if isinstance(when, datetime.datetime):
        when_date, when_time = when.date(), when.time()
    else:
        # A plain date carries no time (and has no time() method): hours periods
        # can't be evaluated against it
        when_date, when_time = when, None
    exceptional_closures = (
        parse_exceptional_closures(exceptional_closures_values)
        if parse
        else exceptional_closures_values
    )
    log.debug("its_exceptionally_closed(%s): exceptional closures=%s", when, exceptional_closures)
    for cl in exceptional_closures:
        if when_date not in cl["days"]:
            log.debug(
                "its_exceptionally_closed(%s): %s not in days (%s)", when, when_date, cl["days"]
            )
            continue
        if not cl["hours_periods"]:
            # All day exceptional closure
            return True
        if all_day or when_time is None:
            # Wanted an all day closure (or no time to compare with), ignore it
            continue
        # Both bounds of the hours periods are inclusive
        if any(hp["start"] <= when_time <= hp["stop"] for hp in cl["hours_periods"]):
            return True
    return False
|
|
|
|
|
|
def get_exceptional_closures_hours(exceptional_closures_values, date=None, parse=True):
    """Get the exceptional closures hours periods of a day

    :param exceptional_closures_values: Exceptional closures, as raw strings (parse=True) or
        as already returned by parse_exceptional_closures() (parse=False)
    :param date: The datetime.date to look at (default: today)
    :param parse: Set to False if exceptional_closures_values are already parsed
    :return: Sorted list of dicts with "start" and "stop" datetime.time. An all day closure
             is returned as one single period from the minimal to the maximal time.
    """
    if not exceptional_closures_values:
        return []
    if not date:
        date = datetime.date.today()

    if parse:
        exceptional_closures = parse_exceptional_closures(exceptional_closures_values)
    else:
        exceptional_closures = exceptional_closures_values
    log.debug(
        "get_exceptional_closures_hours(%s): exceptional closures=%s", date, exceptional_closures
    )

    closures_hours = []
    for closure in exceptional_closures:
        if date not in closure["days"]:
            log.debug(
                "get_exceptional_closures_hours(%s): not in days (%s)", date, closure["days"]
            )
            continue

        if closure["hours_periods"]:
            closures_hours.extend(closure["hours_periods"])
            continue

        # A closure without hours period covers the whole day: nothing else matters
        log.debug(
            "get_exceptional_closures_hours(%s): it's exceptionally closed all the day", date
        )
        whole_day = {
            "start": datetime.datetime.min.time(),
            "stop": datetime.datetime.max.time(),
        }
        return [whole_day]

    log.debug(
        "get_exceptional_closures_hours(%s): exceptional closures hours=%s",
        date,
        closures_hours,
    )
    return sorted_hours_periods(closures_hours)
|
|
|
|
|
|
def its_normally_open(normal_opening_hours_values, when=None, parse=True, ignore_time=False):
    """Check if it's normally open

    :param normal_opening_hours_values: Normal opening hours, as raw strings (parse=True) or
        as already returned by parse_normal_opening_hours() (parse=False). If empty, it is
        considered as always opened.
    :param when: The moment to check (default: now)
    :param parse: Set to False if normal_opening_hours_values are already parsed
    :param ignore_time: If True, only the week day is checked
    :return: True if normally opened, False otherwise
    """
    if not when:
        when = datetime.datetime.now()

    if not normal_opening_hours_values:
        log.debug(
            "its_normally_open(%s): no normal opening hours defined, consider as opened", when
        )
        return True

    when_weekday = week_days[when.timetuple().tm_wday]
    log.debug("its_normally_open(%s): week day=%s", when, when_weekday)

    if parse:
        normal_opening_hours = parse_normal_opening_hours(normal_opening_hours_values)
    else:
        normal_opening_hours = normal_opening_hours_values
    log.debug("its_normally_open(%s): normal opening hours=%s", when, normal_opening_hours)

    for opening in normal_opening_hours:
        # An entry without days applies to every week day
        if opening["days"] and when_weekday not in opening["days"]:
            log.debug(
                "its_normally_open(%s): %s not in days (%s)", when, when_weekday, opening["days"]
            )
            continue
        # An entry without hours periods means opened all the day
        if not opening["hours_periods"] or ignore_time:
            return True
        # Both bounds of the hours periods are inclusive
        if any(hp["start"] <= when.time() <= hp["stop"] for hp in opening["hours_periods"]):
            return True

    log.debug("its_normally_open(%s): not in normal opening hours", when)
    return False
|
|
|
|
|
|
def its_opening_day(
    normal_opening_hours_values=None,
    exceptional_closures_values=None,
    nonworking_public_holidays_values=None,
    date=None,
    parse=True,
):
    """Check if a date is an opening day

    A day is an opening day if it is not a selected non-working public day, not
    exceptionally closed all the day and a normally opened week day.

    :param normal_opening_hours_values: Normal opening hours
    :param exceptional_closures_values: Exceptional closures
    :param nonworking_public_holidays_values: Names of the non-working public days
    :param date: The date to check (default: today)
    :param parse: Set to False if opening hours and exceptional closures are already parsed
    :return: True if it's an opening day, False otherwise
    """
    if not date:
        date = datetime.date.today()

    closed_all_day = its_nonworking_day(
        nonworking_public_holidays_values, date=date
    ) or its_exceptionally_closed(
        exceptional_closures_values, when=date, all_day=True, parse=parse
    )
    if closed_all_day:
        return False

    return its_normally_open(normal_opening_hours_values, when=date, parse=parse, ignore_time=True)
|
|
|
|
|
|
def is_closed(
    normal_opening_hours_values=None,
    exceptional_closures_values=None,
    nonworking_public_holidays_values=None,
    exceptional_closure_on_nonworking_public_days=False,
    when=None,
    on_error="raise",
):
    """Check if closed

    :param normal_opening_hours_values: Normal opening hours (raw strings)
    :param exceptional_closures_values: Exceptional closures (raw strings)
    :param nonworking_public_holidays_values: Names of the non-working public days
    :param exceptional_closure_on_nonworking_public_days: If True, a non-working public day
        is reported as an (all day) exceptional closure
    :param when: The datetime.datetime to check (default: now)
    :param on_error: What to do on parsing error: "closed" or "opened" to consider it as
        such, anything else to let the ValueError raise
    :return: Dict with boolean keys "closed", "exceptional_closure" and
             "exceptional_closure_all_day"
    :raises ValueError: On parsing error, unless on_error is "closed" or "opened"
    """
    if not when:
        when = datetime.datetime.now()
    when_date = when.date()
    when_time = when.time()
    when_weekday = week_days[when.timetuple().tm_wday]
    on_error_result = None
    if on_error == "closed":
        on_error_result = {
            "closed": True,
            "exceptional_closure": False,
            "exceptional_closure_all_day": False,
        }
    elif on_error == "opened":
        on_error_result = {
            "closed": False,
            "exceptional_closure": False,
            "exceptional_closure_all_day": False,
        }

    log.debug(
        "When = %s => date = %s / time = %s / week day = %s",
        when,
        when_date,
        when_time,
        when_weekday,
    )
    # Handle non-working days
    if its_nonworking_day(nonworking_public_holidays_values, date=when_date):
        return {
            "closed": True,
            "exceptional_closure": exceptional_closure_on_nonworking_public_days,
            "exceptional_closure_all_day": exceptional_closure_on_nonworking_public_days,
        }

    # Handle exceptional closures
    try:
        # Parse only once: the result is used by the two checks below
        exceptional_closures = (
            parse_exceptional_closures(exceptional_closures_values)
            if exceptional_closures_values
            else None
        )
        if its_exceptionally_closed(exceptional_closures, when=when, parse=False):
            return {
                "closed": True,
                "exceptional_closure": True,
                "exceptional_closure_all_day": its_exceptionally_closed(
                    exceptional_closures, when=when, all_day=True, parse=False
                ),
            }
    except ValueError:
        if on_error_result is None:
            log.error("Fail to parse exceptional closures", exc_info=True)
            # Bare raise: re-raise the caught exception as is, with its original traceback
            raise
        log.error("Fail to parse exceptional closures, consider as %s", on_error, exc_info=True)
        return on_error_result

    # Finally, handle normal opening hours
    try:
        return {
            "closed": not its_normally_open(normal_opening_hours_values, when=when),
            "exceptional_closure": False,
            "exceptional_closure_all_day": False,
        }
    except ValueError:
        if on_error_result is None:
            log.error("Fail to parse normal opening hours", exc_info=True)
            raise
        log.error("Fail to parse normal opening hours, consider as %s", on_error, exc_info=True)
        return on_error_result
|
|
|
|
|
|
def next_opening_date(
    normal_opening_hours_values=None,
    exceptional_closures_values=None,
    nonworking_public_holidays_values=None,
    date=None,
    max_anaylse_days=None,
    parse=True,
):
    """Search for the next opening day, starting from (and including) the given date

    :param normal_opening_hours_values: Normal opening hours
    :param exceptional_closures_values: Exceptional closures
    :param nonworking_public_holidays_values: Names of the non-working public days
    :param date: The date to start from (default: today)
    :param max_anaylse_days: Maximum number of following days to analyse (default: 30)
    :param parse: Set to False if opening hours and exceptional closures are already parsed
    :return: The datetime.date of the next opening day, or False if none was found or on
             parsing error
    """
    if not date:
        date = datetime.date.today()
    if max_anaylse_days is None:
        max_anaylse_days = 30

    if parse:
        try:
            if normal_opening_hours_values:
                normal_opening_hours_values = parse_normal_opening_hours(
                    normal_opening_hours_values
                )
            else:
                normal_opening_hours_values = None
            if exceptional_closures_values:
                exceptional_closures_values = parse_exceptional_closures(
                    exceptional_closures_values
                )
            else:
                exceptional_closures_values = None
        except ValueError:  # pylint: disable=broad-except
            log.error(
                "next_opening_date(%s): fail to parse normal opening hours or exceptional closures",
                date,
                exc_info=True,
            )
            return False

    # Try the specified date first, then the following days one by one
    offset = 0
    while offset <= max_anaylse_days:
        candidate = date + datetime.timedelta(days=offset)
        opened = its_opening_day(
            normal_opening_hours_values=normal_opening_hours_values,
            exceptional_closures_values=exceptional_closures_values,
            nonworking_public_holidays_values=nonworking_public_holidays_values,
            date=candidate,
            parse=False,
        )
        if opened:
            return candidate
        offset += 1

    log.debug(
        "next_opening_date(%s): no opening day found in the next %d days", date, max_anaylse_days
    )
    return False
|
|
|
|
|
|
def next_opening_hour(
    normal_opening_hours_values=None,
    exceptional_closures_values=None,
    nonworking_public_holidays_values=None,
    when=None,
    max_anaylse_days=None,
    parse=True,
):
    """Search for the next opening hour

    :param normal_opening_hours_values: Normal opening hours. If empty, every day is
        considered as opened all the day.
    :param exceptional_closures_values: Exceptional closures
    :param nonworking_public_holidays_values: Names of the non-working public days
    :param when: The datetime.datetime to start from (default: now)
    :param max_anaylse_days: Maximum number of following days to analyse (default: 30)
    :param parse: Set to False if opening hours and exceptional closures are already parsed
    :return: The datetime.datetime of the next opening, or False if none was found or on
             parsing error
    """
    when = when if when else datetime.datetime.now()
    max_anaylse_days = max_anaylse_days if max_anaylse_days is not None else 30
    if parse:
        try:
            normal_opening_hours_values = (
                parse_normal_opening_hours(normal_opening_hours_values)
                if normal_opening_hours_values
                else None
            )
            exceptional_closures_values = (
                parse_exceptional_closures(exceptional_closures_values)
                if exceptional_closures_values
                else None
            )
        except ValueError:  # pylint: disable=broad-except
            log.error(
                "next_opening_hour(%s): fail to parse normal opening hours or exceptional closures",
                when,
                exc_info=True,
            )
            return False
    date = next_opening_date(
        normal_opening_hours_values=normal_opening_hours_values,
        exceptional_closures_values=exceptional_closures_values,
        nonworking_public_holidays_values=nonworking_public_holidays_values,
        date=when.date(),
        max_anaylse_days=max_anaylse_days,
        parse=False,
    )
    if not date:
        log.debug(
            "next_opening_hour(%s): no opening day found in the next %d days",
            when,
            max_anaylse_days,
        )
        return False
    log.debug("next_opening_hour(%s): next opening date=%s", when, date)
    weekday = week_days[date.timetuple().tm_wday]
    log.debug("next_opening_hour(%s): next opening week day=%s", when, weekday)
    exceptional_closures_hours = get_exceptional_closures_hours(
        exceptional_closures_values, date=date, parse=False
    )
    log.debug(
        "next_opening_hour(%s): next opening day exceptional closures hours=%s",
        when,
        exceptional_closures_hours,
    )
    next_opening_datetime = None
    exceptionally_closed = False
    exceptionally_closed_all_day = False
    # On a later day than when, any time of the day is after when
    in_opening_hours = date != when.date()
    # Without normal opening hours, its_normally_open() considers every day as opened:
    # handle it here as one single rule without days nor hours periods (all day, every
    # day) instead of iterating on None or on an empty list
    opening_hours = normal_opening_hours_values or [{"days": [], "hours_periods": []}]
    for oh in opening_hours:
        if exceptionally_closed_all_day:
            break

        if oh["days"] and weekday not in oh["days"]:
            log.debug("next_opening_hour(%s): %s not in days (%s)", when, weekday, oh["days"])
            continue

        log.debug(
            "next_opening_hour(%s): %s in days (%s), handle opening hours %s",
            when,
            weekday,
            oh["days"],
            oh["hours_periods"],
        )

        if not oh["hours_periods"]:
            log.debug(
                "next_opening_hour(%s): %s is an all day opening day, handle exceptional closures "
                "hours %s to find the minimal opening time",
                when,
                weekday,
                exceptional_closures_hours,
            )
            if date == when.date():
                in_opening_hours = True
            test_time = when.time() if when.date() == date else datetime.datetime.min.time()
            # Push the candidate time after the exceptional closures containing it
            # NOTE(review): the scan stops on the first closure not containing test_time, so
            # a later closure containing it is not considered - confirm this is intended
            for cl in exceptional_closures_hours:
                if cl["start"] <= test_time < cl["stop"]:
                    if cl["stop"] >= datetime.datetime.max.time():
                        # Closed until the end of the day
                        exceptionally_closed = True
                        exceptionally_closed_all_day = True
                        next_opening_datetime = None
                        break
                    test_time = cl["stop"]
                else:
                    break
            if not exceptionally_closed_all_day:
                # Keep the earliest candidate
                candidate_next_opening_datetime = datetime.datetime.combine(date, test_time)
                next_opening_datetime = (
                    candidate_next_opening_datetime
                    if not next_opening_datetime
                    or candidate_next_opening_datetime < next_opening_datetime
                    else next_opening_datetime
                )
            continue

        log.debug(
            "next_opening_hour(%s): only opened during some hours periods (%s) on %s, find the "
            "minimal starting time",
            when,
            oh["hours_periods"],
            weekday,
        )
        # The maximal time is used as "nothing found yet" marker
        test_time = datetime.datetime.max.time()
        for hp in oh["hours_periods"]:
            if date == when.date() and hp["stop"] < when.time():
                log.debug(
                    "next_opening_hour(%s): ignore opening hours %s before specified when time %s",
                    when,
                    hp,
                    when.time(),
                )
                continue
            if date == when.date() and hp["start"] <= when.time() < hp["stop"]:
                in_opening_hours = True
            if exceptional_closures_hours:
                log.debug(
                    "next_opening_hour(%s): check if opening hours %s match with exceptional "
                    "closure hours %s",
                    when,
                    hp,
                    exceptional_closures_hours,
                )
                for cl in exceptional_closures_hours:
                    if cl["start"] <= hp["start"] and cl["stop"] >= hp["stop"]:
                        log.debug(
                            "next_opening_hour(%s): opening hour %s is included in exceptional "
                            "closure hours %s",
                            when,
                            hp,
                            cl,
                        )
                        exceptionally_closed = True
                        break
                    if hp["start"] < cl["start"]:
                        log.debug(
                            "next_opening_hour(%s): opening hour %s start before closure hours %s",
                            when,
                            hp,
                            cl,
                        )
                        test_time = hp["start"] if hp["start"] < test_time else test_time
                    elif cl["stop"] >= hp["start"] and cl["stop"] < hp["stop"]:
                        log.debug(
                            "next_opening_hour(%s): opening hour %s end after closure hours %s",
                            when,
                            hp,
                            cl,
                        )
                        test_time = cl["stop"] if cl["stop"] < test_time else test_time
            elif hp["start"] < test_time:
                log.debug(
                    "next_opening_hour(%s): no exceptional closure hours, use opening hours start "
                    "time %s",
                    when,
                    hp["start"],
                )
                test_time = hp["start"]

        if test_time < datetime.datetime.max.time():
            # Never return a time before the specified when
            if date == when.date() and test_time < when.time():
                test_time = when.time()
            # Keep the earliest candidate
            candidate_next_opening_datetime = datetime.datetime.combine(date, test_time)
            next_opening_datetime = (
                candidate_next_opening_datetime
                if not next_opening_datetime
                or candidate_next_opening_datetime < next_opening_datetime
                else next_opening_datetime
            )

    if not next_opening_datetime and (
        exceptionally_closed or (date == when.date() and not in_opening_hours)
    ):
        # Nothing usable on this day: retry from the beginning of the following day with
        # the remaining number of days to analyse
        new_max_anaylse_days = max_anaylse_days - (date - when.date()).days
        if new_max_anaylse_days > 0:
            log.debug(
                "next_opening_hour(%s): exceptionally closed on %s, try on following %d days",
                when,
                date,
                new_max_anaylse_days,
            )
            next_opening_datetime = next_opening_hour(
                normal_opening_hours_values=normal_opening_hours_values,
                exceptional_closures_values=exceptional_closures_values,
                nonworking_public_holidays_values=nonworking_public_holidays_values,
                when=datetime.datetime.combine(
                    date + datetime.timedelta(days=1), datetime.datetime.min.time()
                ),
                max_anaylse_days=new_max_anaylse_days,
                parse=False,
            )
    if not next_opening_datetime:
        log.debug(
            "next_opening_hour(%s): no opening hours found in next %d days", when, max_anaylse_days
        )
        return False
    log.debug("next_opening_hour(%s): next opening hours=%s", when, next_opening_datetime)
    return next_opening_datetime
|
|
|
|
|
|
def previous_opening_date(
    normal_opening_hours_values=None,
    exceptional_closures_values=None,
    nonworking_public_holidays_values=None,
    date=None,
    max_anaylse_days=None,
    parse=True,
):
    """Search for the previous opening day, starting from (and including) the given date

    :param normal_opening_hours_values: Normal opening hours
    :param exceptional_closures_values: Exceptional closures
    :param nonworking_public_holidays_values: Names of the non-working public days
    :param date: The date to start from (default: today)
    :param max_anaylse_days: Maximum number of previous days to analyse (default: 30)
    :param parse: Set to False if opening hours and exceptional closures are already parsed
    :return: The datetime.date of the previous opening day, or False if none was found or
             on parsing error
    """
    date = date if date else datetime.date.today()
    max_anaylse_days = max_anaylse_days if max_anaylse_days is not None else 30
    if parse:
        try:
            normal_opening_hours_values = (
                parse_normal_opening_hours(normal_opening_hours_values)
                if normal_opening_hours_values
                else None
            )
            exceptional_closures_values = (
                parse_exceptional_closures(exceptional_closures_values)
                if exceptional_closures_values
                else None
            )
        except ValueError:  # pylint: disable=broad-except
            log.error(
                "previous_opening_date(%s): fail to parse normal opening hours or exceptional "
                "closures",
                date,
                exc_info=True,
            )
            return False
    # Try the specified date first, then the previous days one by one
    days = 0
    while days <= max_anaylse_days:
        test_date = date - datetime.timedelta(days=days)
        if its_opening_day(
            normal_opening_hours_values=normal_opening_hours_values,
            exceptional_closures_values=exceptional_closures_values,
            nonworking_public_holidays_values=nonworking_public_holidays_values,
            date=test_date,
            parse=False,
        ):
            return test_date
        days += 1
    # The search goes backward: the message talks about the previous days
    log.debug(
        "previous_opening_date(%s): no opening day found in the previous %d days",
        date,
        max_anaylse_days,
    )
    return False
|
|
|
|
|
|
def previous_opening_hour(
    normal_opening_hours_values=None,
    exceptional_closures_values=None,
    nonworking_public_holidays_values=None,
    when=None,
    max_anaylse_days=None,
    parse=True,
):
    """Search for the previous opening hour

    :param normal_opening_hours_values: Normal opening hours. If empty, every day is
        considered as opened all the day.
    :param exceptional_closures_values: Exceptional closures
    :param nonworking_public_holidays_values: Names of the non-working public days
    :param when: The datetime.datetime to start from (default: now)
    :param max_anaylse_days: Maximum number of previous days to analyse (default: 30)
    :param parse: Set to False if opening hours and exceptional closures are already parsed
    :return: The datetime.datetime of the previous opening, or False if none was found or
             on parsing error
    """
    when = when if when else datetime.datetime.now()
    max_anaylse_days = max_anaylse_days if max_anaylse_days is not None else 30
    if parse:
        try:
            normal_opening_hours_values = (
                parse_normal_opening_hours(normal_opening_hours_values)
                if normal_opening_hours_values
                else None
            )
            exceptional_closures_values = (
                parse_exceptional_closures(exceptional_closures_values)
                if exceptional_closures_values
                else None
            )
        except ValueError:  # pylint: disable=broad-except
            log.error(
                "previous_opening_hour(%s): fail to parse normal opening hours or exceptional "
                "closures",
                when,
                exc_info=True,
            )
            return False
    date = previous_opening_date(
        normal_opening_hours_values=normal_opening_hours_values,
        exceptional_closures_values=exceptional_closures_values,
        nonworking_public_holidays_values=nonworking_public_holidays_values,
        date=when.date(),
        max_anaylse_days=max_anaylse_days,
        parse=False,
    )
    if not date:
        log.debug(
            "previous_opening_hour(%s): no opening day found in the previous %d days",
            when,
            max_anaylse_days,
        )
        return False
    log.debug("previous_opening_hour(%s): previous opening date=%s", when, date)
    weekday = week_days[date.timetuple().tm_wday]
    log.debug("previous_opening_hour(%s): previous opening week day=%s", when, weekday)
    exceptional_closures_hours = get_exceptional_closures_hours(
        exceptional_closures_values, date=date, parse=False
    )
    log.debug(
        "previous_opening_hour(%s): previous opening day exceptional closures hours=%s",
        when,
        exceptional_closures_hours,
    )
    previous_opening_datetime = None
    exceptionally_closed = False
    exceptionally_closed_all_day = False
    # On an earlier day than when, any time of the day is before when
    in_opening_hours = date != when.date()
    # Without normal opening hours, its_normally_open() considers every day as opened:
    # handle it here as one single rule without days nor hours periods (all day, every
    # day) instead of calling reversed() on None or iterating on an empty list
    opening_hours = normal_opening_hours_values or [{"days": [], "hours_periods": []}]
    for oh in reversed(opening_hours):
        if exceptionally_closed_all_day:
            break

        if oh["days"] and weekday not in oh["days"]:
            log.debug("previous_opening_hour(%s): %s not in days (%s)", when, weekday, oh["days"])
            continue

        log.debug(
            "previous_opening_hour(%s): %s in days (%s), handle opening hours %s",
            when,
            weekday,
            oh["days"],
            oh["hours_periods"],
        )

        if not oh["hours_periods"]:
            log.debug(
                "previous_opening_hour(%s): %s is an all day opening day, handle exceptional "
                "closures hours %s to find the maximal opening time",
                when,
                weekday,
                exceptional_closures_hours,
            )
            if date == when.date():
                in_opening_hours = True
            test_time = when.time() if when.date() == date else datetime.datetime.max.time()
            # Pull the candidate time before the exceptional closures containing it
            # NOTE(review): the scan stops on the first closure not containing test_time, so
            # a later closure containing it is not considered - confirm this is intended
            for cl in exceptional_closures_hours:
                if cl["start"] <= test_time < cl["stop"]:
                    if cl["start"] <= datetime.datetime.min.time():
                        # Closed since the beginning of the day
                        exceptionally_closed = True
                        exceptionally_closed_all_day = True
                        previous_opening_datetime = None
                        break
                    test_time = cl["start"]
                else:
                    break
            if not exceptionally_closed_all_day:
                # Keep the latest candidate
                candidate_previous_opening_datetime = datetime.datetime.combine(date, test_time)
                previous_opening_datetime = (
                    candidate_previous_opening_datetime
                    if not previous_opening_datetime
                    or candidate_previous_opening_datetime > previous_opening_datetime
                    else previous_opening_datetime
                )
            continue

        log.debug(
            "previous_opening_hour(%s): only opened during some hours periods (%s) on %s, find the "
            "maximal opening time",
            when,
            oh["hours_periods"],
            weekday,
        )
        # The minimal time is used as "nothing found yet" marker
        test_time = datetime.datetime.min.time()
        for hp in reversed(oh["hours_periods"]):
            if date == when.date() and hp["start"] > when.time():
                log.debug(
                    "previous_opening_hour(%s): ignore opening hours %s starting before specified "
                    "when time %s",
                    when,
                    hp,
                    when.time(),
                )
                continue
            if date == when.date() and hp["start"] <= when.time() < hp["stop"]:
                in_opening_hours = True
            if exceptional_closures_hours:
                log.debug(
                    "previous_opening_hour(%s): check if opening hours %s match with exceptional "
                    "closure hours %s",
                    when,
                    hp,
                    exceptional_closures_hours,
                )
                for cl in reversed(exceptional_closures_hours):
                    if cl["start"] <= hp["start"] and cl["stop"] >= hp["stop"]:
                        log.debug(
                            "previous_opening_hour(%s): opening hour %s is included in exceptional "
                            "closure hours %s",
                            when,
                            hp,
                            cl,
                        )
                        exceptionally_closed = True
                        break
                    if cl["stop"] < hp["stop"]:
                        log.debug(
                            "previous_opening_hour(%s): opening hour %s end after closure hours %s",
                            when,
                            hp,
                            cl,
                        )
                        test_time = hp["stop"] if hp["stop"] > test_time else test_time
                    elif cl["start"] > hp["stop"]:
                        log.debug(
                            "previous_opening_hour(%s): opening hour %s start before closure hours "
                            "%s",
                            when,
                            hp,
                            cl,
                        )
                        test_time = hp["stop"] if hp["stop"] > test_time else test_time
                    elif cl["stop"] >= hp["stop"] and cl["start"] > hp["start"]:
                        log.debug(
                            "previous_opening_hour(%s): opening hour %s start before closure hours "
                            "%s",
                            when,
                            hp,
                            cl,
                        )
                        test_time = cl["start"] if cl["start"] > test_time else test_time
            elif hp["stop"] > test_time:
                log.debug(
                    "previous_opening_hour(%s): no exceptional closure hours, use opening hours "
                    "stop time %s",
                    when,
                    hp["stop"],
                )
                test_time = hp["stop"]

        if test_time > datetime.datetime.min.time():
            # Never return a time after the specified when
            if date == when.date() and test_time > when.time():
                test_time = when.time()
            # Keep the latest candidate
            candidate_previous_opening_datetime = datetime.datetime.combine(date, test_time)
            previous_opening_datetime = (
                candidate_previous_opening_datetime
                if not previous_opening_datetime
                or candidate_previous_opening_datetime > previous_opening_datetime
                else previous_opening_datetime
            )

    if not previous_opening_datetime and (
        exceptionally_closed or (date == when.date() and not in_opening_hours)
    ):
        # Nothing usable on this day: retry from the end of the previous day with the
        # remaining number of days to analyse
        new_max_anaylse_days = max_anaylse_days - (when.date() - date).days
        if new_max_anaylse_days > 0:
            log.debug(
                "previous_opening_hour(%s): exceptionally closed on %s, try on previous %d days",
                when,
                date,
                new_max_anaylse_days,
            )
            previous_opening_datetime = previous_opening_hour(
                normal_opening_hours_values=normal_opening_hours_values,
                exceptional_closures_values=exceptional_closures_values,
                nonworking_public_holidays_values=nonworking_public_holidays_values,
                when=datetime.datetime.combine(
                    date - datetime.timedelta(days=1), datetime.datetime.max.time()
                ),
                max_anaylse_days=new_max_anaylse_days,
                parse=False,
            )
    if not previous_opening_datetime:
        log.debug(
            "previous_opening_hour(%s): no opening hours found in previous %d days",
            when,
            max_anaylse_days,
        )
        return False
    log.debug(
        "previous_opening_hour(%s): previous opening hours=%s", when, previous_opening_datetime
    )
    return previous_opening_datetime
|