Code cleaning

This commit is contained in:
Benjamin Renard 2023-01-06 22:13:28 +01:00
parent 815081d544
commit b0fd2b1c6d
9 changed files with 148 additions and 101 deletions

View File

@ -49,8 +49,8 @@ def pretty_format_dict(value, encoding='utf8', prefix=None):
result = []
for key in sorted(value.keys()):
result.append(
f'{prefix}- {key} : ' +
pretty_format_value_in_list(
f'{prefix}- {key} : '
+ pretty_format_value_in_list(
value[key],
encoding=encoding,
prefix=prefix
@ -65,8 +65,8 @@ def pretty_format_list(row, encoding='utf8', prefix=None):
result = []
for idx, values in enumerate(row):
result.append(
f'{prefix}- #{idx} : ' +
pretty_format_value_in_list(
f'{prefix}- #{idx} : '
+ pretty_format_value_in_list(
values,
encoding=encoding,
prefix=prefix

View File

@ -135,10 +135,10 @@ class EmailClient(ConfigurableObject): # pylint: disable=useless-object-inherit
msg['Date'] = email.utils.formatdate(None, True)
encoding = encoding if encoding else self._get_option('encoding')
if template:
assert template in self.templates, "Unknwon template %s" % template
assert template in self.templates, f'Unknwon template {template}'
# Handle subject from template
if not subject:
assert self.templates[template].get('subject'), 'No subject defined in template %s' % template
assert self.templates[template].get('subject'), f'No subject defined in template {template}'
msg['Subject'] = self.templates[template]['subject'].format(**template_vars)
# Put HTML part in last one to prefered it
@ -168,14 +168,18 @@ class EmailClient(ConfigurableObject): # pylint: disable=useless-object-inherit
part = MIMEBase('application', "octet-stream")
part.set_payload(fp.read())
encode_base64(part)
part.add_header('Content-Disposition', 'attachment; filename="%s"' % os.path.basename(filepath))
part.add_header(
'Content-Disposition',
f'attachment; filename="{os.path.basename(filepath)}"')
msg.attach(part)
if attachment_payloads:
for filename, payload in attachment_payloads:
part = MIMEBase('application', "octet-stream")
part.set_payload(payload)
encode_base64(part)
part.add_header('Content-Disposition', 'attachment; filename="%s"' % filename)
part.add_header(
'Content-Disposition',
f'attachment; filename="{filename}"')
msg.attach(part)
return msg

View File

@ -82,7 +82,7 @@ def parse_exceptional_closures(values):
pstart = time.strptime(parts[0], date_format)
pstop = time.strptime(parts[1], date_format)
if pstop <= pstart:
raise ValueError('Day %s <= %s' % (parts[1], parts[0]))
raise ValueError(f'Day {parts[1]} <= {parts[0]}')
date = datetime.date(pstart.tm_year, pstart.tm_mon, pstart.tm_mday)
stop_date = datetime.date(pstop.tm_year, pstop.tm_mon, pstop.tm_mday)
@ -95,16 +95,16 @@ def parse_exceptional_closures(values):
mstart = time_pattern.match(parts[0])
mstop = time_pattern.match(parts[1])
if not mstart or not mstop:
raise ValueError('"%s" is not a valid time period' % word)
raise ValueError(f'"{word}" is not a valid time period')
hstart = datetime.time(int(mstart.group(1)), int(mstart.group(2) or 0))
hstop = datetime.time(int(mstop.group(1)), int(mstop.group(2) or 0))
if hstop <= hstart:
raise ValueError('Time %s <= %s' % (parts[1], parts[0]))
raise ValueError(f'Time {parts[1]} <= {parts[0]}')
hours_periods.append({'start': hstart, 'stop': hstop})
else:
raise ValueError('Invalid number of part in this word: "%s"' % word)
raise ValueError(f'Invalid number of part in this word: "{word}"')
if not days:
raise ValueError('No days found in value "%s"' % value)
raise ValueError(f'No days found in value "{value}"')
exceptional_closures.append({'days': days, 'hours_periods': hours_periods})
return exceptional_closures
@ -123,7 +123,7 @@ def parse_normal_opening_hours(values):
if len(parts) == 1:
# ex: jeudi
if word not in week_days:
raise ValueError('"%s" is not a valid week day' % word)
raise ValueError(f'"{word}" is not a valid week day')
if word not in days:
days.append(word)
elif len(parts) == 2:
@ -131,7 +131,7 @@ def parse_normal_opening_hours(values):
if parts[0] in week_days and parts[1] in week_days:
# ex: lundi-jeudi
if week_days.index(parts[1]) <= week_days.index(parts[0]):
raise ValueError('"%s" is before "%s"' % (parts[1], parts[0]))
raise ValueError(f'"{parts[1]}" is before "{parts[0]}"')
started = False
for d in week_days:
if not started and d != parts[0]:
@ -146,16 +146,16 @@ def parse_normal_opening_hours(values):
mstart = time_pattern.match(parts[0])
mstop = time_pattern.match(parts[1])
if not mstart or not mstop:
raise ValueError('"%s" is not a valid time period' % word)
raise ValueError(f'"{word}" is not a valid time period')
hstart = datetime.time(int(mstart.group(1)), int(mstart.group(2) or 0))
hstop = datetime.time(int(mstop.group(1)), int(mstop.group(2) or 0))
if hstop <= hstart:
raise ValueError('Time %s <= %s' % (parts[1], parts[0]))
raise ValueError(f'Time {parts[1]} <= {parts[0]}')
hours_periods.append({'start': hstart, 'stop': hstop})
else:
raise ValueError('Invalid number of part in this word: "%s"' % word)
raise ValueError(f'Invalid number of part in this word: "{word}"')
if not days and not hours_periods:
raise ValueError('No days or hours period found in this value: "%s"' % value)
raise ValueError(f'No days or hours period found in this value: "{value}"')
normal_opening_hours.append({'days': days, 'hours_periods': hours_periods})
return normal_opening_hours
@ -173,18 +173,28 @@ def is_closed(
when_weekday = week_days[when.timetuple().tm_wday]
on_error_result = None
if on_error == 'closed':
on_error_result = {'closed': True, 'exceptional_closure': False, 'exceptional_closure_all_day': False}
on_error_result = {
'closed': True, 'exceptional_closure': False,
'exceptional_closure_all_day': False}
elif on_error == 'opened':
on_error_result = {'closed': False, 'exceptional_closure': False, 'exceptional_closure_all_day': False}
on_error_result = {
'closed': False, 'exceptional_closure': False,
'exceptional_closure_all_day': False}
log.debug("When = %s => date = %s / time = %s / week day = %s", when, when_date, when_time, when_weekday)
log.debug(
"When = %s => date = %s / time = %s / week day = %s",
when, when_date, when_time, when_weekday)
if nonworking_public_holidays_values:
log.debug("Nonworking public holidays: %s", nonworking_public_holidays_values)
nonworking_days = nonworking_french_public_days_of_the_year()
for day in nonworking_public_holidays_values:
if day in nonworking_days and when_date == nonworking_days[day]:
log.debug("Non working day: %s", day)
return {'closed': True, 'exceptional_closure': exceptional_closure_on_nonworking_public_days, 'exceptional_closure_all_day': exceptional_closure_on_nonworking_public_days}
return {
'closed': True,
'exceptional_closure': exceptional_closure_on_nonworking_public_days,
'exceptional_closure_all_day': exceptional_closure_on_nonworking_public_days
}
if exceptional_closures_values:
try:
@ -201,10 +211,14 @@ def is_closed(
continue
if not cl['hours_periods']:
# All day exceptional closure
return {'closed': True, 'exceptional_closure': True, 'exceptional_closure_all_day': True}
return {
'closed': True, 'exceptional_closure': True,
'exceptional_closure_all_day': True}
for hp in cl['hours_periods']:
if hp['start'] <= when_time <= hp['stop']:
return {'closed': True, 'exceptional_closure': True, 'exceptional_closure_all_day': False}
return {
'closed': True, 'exceptional_closure': True,
'exceptional_closure_all_day': False}
if normal_opening_hours_values:
try:
@ -221,12 +235,21 @@ def is_closed(
continue
if not oh['hours_periods']:
# All day opened
return {'closed': False, 'exceptional_closure': False, 'exceptional_closure_all_day': False}
return {
'closed': False, 'exceptional_closure': False,
'exceptional_closure_all_day': False}
for hp in oh['hours_periods']:
if hp['start'] <= when_time <= hp['stop']:
return {'closed': False, 'exceptional_closure': False, 'exceptional_closure_all_day': False}
return {
'closed': False, 'exceptional_closure': False,
'exceptional_closure_all_day': False}
log.debug("Not in normal opening hours => closed")
return {'closed': True, 'exceptional_closure': False, 'exceptional_closure_all_day': False}
return {
'closed': True, 'exceptional_closure': False,
'exceptional_closure_all_day': False}
# Not a nonworking day, not during exceptional closure and no normal opening hours defined => Opened
return {'closed': False, 'exceptional_closure': False, 'exceptional_closure_all_day': False}
# Not a nonworking day, not during exceptional closure and no normal opening
# hours defined => Opened
return {
'closed': False, 'exceptional_closure': False,
'exceptional_closure_all_day': False}

View File

@ -14,7 +14,7 @@ log = logging.getLogger(__name__)
def init_logging(options, name, report=None):
""" Initialize logging from calling script options """
logformat = '%(asctime)s - ' + name + ' - %(levelname)s - %(message)s'
logformat = f'%(asctime)s - {name} - %(levelname)s - %(message)s'
if options.debug:
loglevel = logging.DEBUG
elif options.verbose:
@ -64,7 +64,9 @@ def get_opts_parser(desc=None, just_try=False, just_one=False, progress=False, c
action="store",
type=str,
dest="logfile",
help="Log file path (default: %s)" % get_default_opt_value(config, default_config, 'logfile'),
help=(
'Log file path (default: '
f'{get_default_opt_value(config, default_config, "logfile")})'),
default=get_default_opt_value(config, default_config, 'logfile')
)
@ -109,7 +111,7 @@ def add_email_opts(parser, config=None):
default_config = dict(
smtp_host="127.0.0.1", smtp_port=25, smtp_ssl=False, smtp_tls=False, smtp_user=None,
smtp_password=None, smtp_debug=False, email_encoding=sys.getdefaultencoding(),
sender_name=getpass.getuser(), sender_email=getpass.getuser() + '@' + socket.gethostname(),
sender_name=getpass.getuser(), sender_email=f'{getpass.getuser()}@{socket.gethostname()}',
catch_all=False
)
@ -118,7 +120,9 @@ def add_email_opts(parser, config=None):
action="store",
type=str,
dest="email_smtp_host",
help="SMTP host (default: %s)" % get_default_opt_value(config, default_config, 'smtp_host'),
help=(
'SMTP host (default: '
f'{get_default_opt_value(config, default_config, "smtp_host")})'),
default=get_default_opt_value(config, default_config, 'smtp_host')
)
@ -127,7 +131,7 @@ def add_email_opts(parser, config=None):
action="store",
type=int,
dest="email_smtp_port",
help="SMTP port (default: %s)" % get_default_opt_value(config, default_config, 'smtp_port'),
help=f'SMTP port (default: {get_default_opt_value(config, default_config, "smtp_port")})',
default=get_default_opt_value(config, default_config, 'smtp_port')
)
@ -135,7 +139,7 @@ def add_email_opts(parser, config=None):
'--smtp-ssl',
action="store_true",
dest="email_smtp_ssl",
help="Use SSL (default: %s)" % get_default_opt_value(config, default_config, 'smtp_ssl'),
help=f'Use SSL (default: {get_default_opt_value(config, default_config, "smtp_ssl")})',
default=get_default_opt_value(config, default_config, 'smtp_ssl')
)
@ -143,7 +147,7 @@ def add_email_opts(parser, config=None):
'--smtp-tls',
action="store_true",
dest="email_smtp_tls",
help="Use TLS (default: %s)" % get_default_opt_value(config, default_config, 'smtp_tls'),
help=f'Use TLS (default: {get_default_opt_value(config, default_config, "smtp_tls")})',
default=get_default_opt_value(config, default_config, 'smtp_tls')
)
@ -152,7 +156,7 @@ def add_email_opts(parser, config=None):
action="store",
type=str,
dest="email_smtp_user",
help="SMTP username (default: %s)" % get_default_opt_value(config, default_config, 'smtp_user'),
help=f'SMTP username (default: {get_default_opt_value(config, default_config, "smtp_user")})',
default=get_default_opt_value(config, default_config, 'smtp_user')
)
@ -161,7 +165,7 @@ def add_email_opts(parser, config=None):
action="store",
type=str,
dest="email_smtp_password",
help="SMTP password (default: %s)" % get_default_opt_value(config, default_config, 'smtp_password'),
help=f'SMTP password (default: {get_default_opt_value(config, default_config, "smtp_password")})',
default=get_default_opt_value(config, default_config, 'smtp_password')
)
@ -169,7 +173,7 @@ def add_email_opts(parser, config=None):
'--smtp-debug',
action="store_true",
dest="email_smtp_debug",
help="Debug SMTP connection (default: %s)" % get_default_opt_value(config, default_config, 'smtp_debug'),
help=f'Debug SMTP connection (default: {get_default_opt_value(config, default_config, "smtp_debug")})',
default=get_default_opt_value(config, default_config, 'smtp_debug')
)
@ -178,7 +182,7 @@ def add_email_opts(parser, config=None):
action="store",
type=str,
dest="email_encoding",
help="SMTP encoding (default: %s)" % get_default_opt_value(config, default_config, 'email_encoding'),
help=f'SMTP encoding (default: {get_default_opt_value(config, default_config, "email_encoding")})',
default=get_default_opt_value(config, default_config, 'email_encoding')
)
@ -187,7 +191,7 @@ def add_email_opts(parser, config=None):
action="store",
type=str,
dest="email_sender_name",
help="Sender name (default: %s)" % get_default_opt_value(config, default_config, 'sender_name'),
help=f'Sender name (default: {get_default_opt_value(config, default_config, "sender_name")})',
default=get_default_opt_value(config, default_config, 'sender_name')
)
@ -196,7 +200,7 @@ def add_email_opts(parser, config=None):
action="store",
type=str,
dest="email_sender_email",
help="Sender email (default: %s)" % get_default_opt_value(config, default_config, 'sender_email'),
help=f'Sender email (default: {get_default_opt_value(config, default_config, "sender_email")})',
default=get_default_opt_value(config, default_config, 'sender_email')
)
@ -205,7 +209,9 @@ def add_email_opts(parser, config=None):
action="store",
type=str,
dest="email_catch_all",
help="Catch all sent email: specify catch recipient email address (default: %s)" % get_default_opt_value(config, default_config, 'catch_all'),
help=(
'Catch all sent email: specify catch recipient email address '
f'(default: {get_default_opt_value(config, default_config, "catch_all")})'),
default=get_default_opt_value(config, default_config, 'catch_all')
)

View File

@ -29,49 +29,49 @@ def main(argv=None): # pylint: disable=too-many-locals,too-many-statements
init_logging(options, 'Test LDAP helpers')
now = datetime.datetime.now().replace(tzinfo=dateutil.tz.tzlocal())
print("Now = %s" % now)
print(f'Now = {now}')
datestring_now = format_datetime(now)
print("format_datetime : %s" % datestring_now)
print("format_datetime (from_timezone=utc) : %s" % format_datetime(now.replace(tzinfo=None), from_timezone=pytz.utc))
print("format_datetime (from_timezone=local) : %s" % format_datetime(now.replace(tzinfo=None), from_timezone=dateutil.tz.tzlocal()))
print("format_datetime (from_timezone='local') : %s" % format_datetime(now.replace(tzinfo=None), from_timezone='local'))
print("format_datetime (from_timezone=Paris) : %s" % format_datetime(now.replace(tzinfo=None), from_timezone='Europe/Paris'))
print("format_datetime (to_timezone=utc) : %s" % format_datetime(now, to_timezone=pytz.utc))
print("format_datetime (to_timezone=local) : %s" % format_datetime(now, to_timezone=dateutil.tz.tzlocal()))
print("format_datetime (to_timezone='local') : %s" % format_datetime(now, to_timezone='local'))
print("format_datetime (to_timezone=Tokyo) : %s" % format_datetime(now, to_timezone='Asia/Tokyo'))
print("format_datetime (naive=True) : %s" % format_datetime(now, naive=True))
print(f'format_datetime : {datestring_now}')
print(f'format_datetime (from_timezone=utc) : {format_datetime(now.replace(tzinfo=None), from_timezone=pytz.utc)}')
print(f'format_datetime (from_timezone=local) : {format_datetime(now.replace(tzinfo=None), from_timezone=dateutil.tz.tzlocal())}')
print(f'format_datetime (from_timezone=local) : {format_datetime(now.replace(tzinfo=None), from_timezone="local")}')
print(f'format_datetime (from_timezone=Paris) : {format_datetime(now.replace(tzinfo=None), from_timezone="Europe/Paris")}')
print(f'format_datetime (to_timezone=utc) : {format_datetime(now, to_timezone=pytz.utc)}')
print(f'format_datetime (to_timezone=local) : {format_datetime(now, to_timezone=dateutil.tz.tzlocal())}')
print(f'format_datetime (to_timezone=local) : {format_datetime(now, to_timezone="local")}')
print(f'format_datetime (to_timezone=Tokyo) : {format_datetime(now, to_timezone="Asia/Tokyo")}')
print(f'format_datetime (naive=True) : {format_datetime(now, naive=True)}')
print("format_date : %s" % format_date(now))
print("format_date (from_timezone=utc) : %s" % format_date(now.replace(tzinfo=None), from_timezone=pytz.utc))
print("format_date (from_timezone=local) : %s" % format_date(now.replace(tzinfo=None), from_timezone=dateutil.tz.tzlocal()))
print("format_date (from_timezone='local') : %s" % format_date(now.replace(tzinfo=None), from_timezone='local'))
print("format_date (from_timezone=Paris) : %s" % format_date(now.replace(tzinfo=None), from_timezone='Europe/Paris'))
print("format_date (to_timezone=utc) : %s" % format_date(now, to_timezone=pytz.utc))
print("format_date (to_timezone=local) : %s" % format_date(now, to_timezone=dateutil.tz.tzlocal()))
print("format_date (to_timezone='local') : %s" % format_date(now, to_timezone='local'))
print("format_date (to_timezone=Tokyo) : %s" % format_date(now, to_timezone='Asia/Tokyo'))
print("format_date (naive=True) : %s" % format_date(now, naive=True))
print(f'format_date : {format_date(now)}')
print(f'format_date (from_timezone=utc) : {format_date(now.replace(tzinfo=None), from_timezone=pytz.utc)}')
print(f'format_date (from_timezone=local) : {format_date(now.replace(tzinfo=None), from_timezone=dateutil.tz.tzlocal())}')
print(f'format_date (from_timezone=local) : {format_date(now.replace(tzinfo=None), from_timezone="local")}')
print(f'format_date (from_timezone=Paris) : {format_date(now.replace(tzinfo=None), from_timezone="Europe/Paris")}')
print(f'format_date (to_timezone=utc) : {format_date(now, to_timezone=pytz.utc)}')
print(f'format_date (to_timezone=local) : {format_date(now, to_timezone=dateutil.tz.tzlocal())}')
print(f'format_date (to_timezone=local) : {format_date(now, to_timezone="local")}')
print(f'format_date (to_timezone=Tokyo) : {format_date(now, to_timezone="Asia/Tokyo")}')
print(f'format_date (naive=True) : {format_date(now, naive=True)}')
print("parse_datetime : %s" % parse_datetime(datestring_now))
print("parse_datetime (default_timezone=utc) : %s" % parse_datetime(datestring_now[0:-1], default_timezone=pytz.utc))
print("parse_datetime (default_timezone=local) : %s" % parse_datetime(datestring_now[0:-1], default_timezone=dateutil.tz.tzlocal()))
print("parse_datetime (default_timezone='local') : %s" % parse_datetime(datestring_now[0:-1], default_timezone='local'))
print("parse_datetime (default_timezone=Paris) : %s" % parse_datetime(datestring_now[0:-1], default_timezone='Europe/Paris'))
print("parse_datetime (to_timezone=utc) : %s" % parse_datetime(datestring_now, to_timezone=pytz.utc))
print("parse_datetime (to_timezone=local) : %s" % parse_datetime(datestring_now, to_timezone=dateutil.tz.tzlocal()))
print("parse_datetime (to_timezone='local') : %s" % parse_datetime(datestring_now, to_timezone='local'))
print("parse_datetime (to_timezone=Tokyo) : %s" % parse_datetime(datestring_now, to_timezone='Asia/Tokyo'))
print("parse_datetime (naive=True) : %s" % parse_datetime(datestring_now, naive=True))
print(f'parse_datetime : {parse_datetime(datestring_now)}')
print(f'parse_datetime (default_timezone=utc) : {parse_datetime(datestring_now[0:-1], default_timezone=pytz.utc)}')
print(f'parse_datetime (default_timezone=local) : {parse_datetime(datestring_now[0:-1], default_timezone=dateutil.tz.tzlocal())}')
print(f'parse_datetime (default_timezone=local) : {parse_datetime(datestring_now[0:-1], default_timezone="local")}')
print(f'parse_datetime (default_timezone=Paris) : {parse_datetime(datestring_now[0:-1], default_timezone="Europe/Paris")}')
print(f'parse_datetime (to_timezone=utc) : {parse_datetime(datestring_now, to_timezone=pytz.utc)}')
print(f'parse_datetime (to_timezone=local) : {parse_datetime(datestring_now, to_timezone=dateutil.tz.tzlocal())}')
print(f'parse_datetime (to_timezone=local) : {parse_datetime(datestring_now, to_timezone="local")}')
print(f'parse_datetime (to_timezone=Tokyo) : {parse_datetime(datestring_now, to_timezone="Asia/Tokyo")}')
print(f'parse_datetime (naive=True) : {parse_datetime(datestring_now, naive=True)}')
print("parse_date : %s" % parse_date(datestring_now))
print("parse_date (default_timezone=utc) : %s" % parse_date(datestring_now[0:-1], default_timezone=pytz.utc))
print("parse_date (default_timezone=local) : %s" % parse_date(datestring_now[0:-1], default_timezone=dateutil.tz.tzlocal()))
print("parse_date (default_timezone='local') : %s" % parse_date(datestring_now[0:-1], default_timezone='local'))
print("parse_date (default_timezone=Paris) : %s" % parse_date(datestring_now[0:-1], default_timezone='Europe/Paris'))
print("parse_date (to_timezone=utc) : %s" % parse_date(datestring_now, to_timezone=pytz.utc))
print("parse_date (to_timezone=local) : %s" % parse_date(datestring_now, to_timezone=dateutil.tz.tzlocal()))
print("parse_date (to_timezone='local') : %s" % parse_date(datestring_now, to_timezone='local'))
print("parse_date (to_timezone=Tokyo) : %s" % parse_date(datestring_now, to_timezone='Asia/Tokyo'))
print("parse_date (naive=True) : %s" % parse_date(datestring_now, naive=True))
print(f'parse_date : {parse_date(datestring_now)}')
print(f'parse_date (default_timezone=utc) : {parse_date(datestring_now[0:-1], default_timezone=pytz.utc)}')
print(f'parse_date (default_timezone=local) : {parse_date(datestring_now[0:-1], default_timezone=dateutil.tz.tzlocal())}')
print(f'parse_date (default_timezone=local) : {parse_date(datestring_now[0:-1], default_timezone="local")}')
print(f'parse_date (default_timezone=Paris) : {parse_date(datestring_now[0:-1], default_timezone="Europe/Paris")}')
print(f'parse_date (to_timezone=utc) : {parse_date(datestring_now, to_timezone=pytz.utc)}')
print(f'parse_date (to_timezone=local) : {parse_date(datestring_now, to_timezone=dateutil.tz.tzlocal())}')
print(f'parse_date (to_timezone=local) : {parse_date(datestring_now, to_timezone="local")}')
print(f'parse_date (to_timezone=Tokyo) : {parse_date(datestring_now, to_timezone="Asia/Tokyo")}')
print(f'parse_date (naive=True) : {parse_date(datestring_now, naive=True)}')

View File

@ -27,7 +27,7 @@ def main(argv=None): # pylint: disable=too-many-locals,too-many-statements
action="store",
type=int,
dest="count",
help="Progress bar max value (default: %s)" % default_max_val,
help=f'Progress bar max value (default: {default_max_val})',
default=default_max_val
)

View File

@ -1,4 +1,5 @@
# pylint: disable=redefined-outer-name,missing-function-docstring,protected-access,global-statement
# pylint: disable=global-variable-not-assigned
""" Tests on config lib """
import logging

View File

@ -21,10 +21,12 @@ class FakeCXOracleCursor:
def execute(self, sql, **params):
assert self.opened
if self.expected_exception:
raise cx_Oracle.Error("%s.execute(%s, %s): expected exception" % (self, sql, params))
raise cx_Oracle.Error(f'{self}.execute({sql}, {params}): expected exception')
if self.expected_just_try and not sql.lower().startswith('select '):
assert False, "%s.execute(%s, %s) may not be executed in just try mode" % (self, sql, params)
assert False, f'{self}.execute({sql}, {params}) may not be executed in just try mode'
# pylint: disable=consider-using-f-string
assert sql == self.expected_sql, "%s.execute(): Invalid SQL query:\n '%s'\nMay be:\n '%s'" % (self, sql, self.expected_sql)
# pylint: disable=consider-using-f-string
assert params == self.expected_params, "%s.execute(): Invalid params:\n %s\nMay be:\n %s" % (self, params, self.expected_params)
return self.expected_return
@ -40,9 +42,9 @@ class FakeCXOracleCursor:
self.opened = False
def __repr__(self):
return "FakeCXOracleCursor(%s, %s, %s, %s)" % (
self.expected_sql, self.expected_params,
self.expected_return, self.expected_just_try
return (
f'FakeCXOracleCursor({self.expected_sql}, {self.expected_params}, '
f'{self.expected_return}, {self.expected_just_try})'
)
@ -59,8 +61,8 @@ class FakeCXOracle:
def __init__(self, **kwargs):
allowed_kwargs = dict(dsn=str, user=str, password=(str, None))
for arg, value in kwargs.items():
assert arg in allowed_kwargs, "Invalid arg %s='%s'" % (arg, value)
assert isinstance(value, allowed_kwargs[arg]), "Arg %s not a %s (%s)" % (arg, allowed_kwargs[arg], type(value))
assert arg in allowed_kwargs, f"Invalid arg {arg}='{value}'"
assert isinstance(value, allowed_kwargs[arg]), f"Arg {arg} not a {allowed_kwargs[arg]} ({type(value)})"
setattr(self, arg, value)
def close(self):
@ -127,7 +129,9 @@ def fake_connected_just_try_oracledb(fake_just_try_oracledb):
def generate_mock_args(expected_args=(), expected_kwargs={}, expected_return=True): # pylint: disable=dangerous-default-value
def mock_args(*args, **kwargs):
# pylint: disable=consider-using-f-string
assert args == expected_args, "Invalid call args:\n %s\nMay be:\n %s" % (args, expected_args)
# pylint: disable=consider-using-f-string
assert kwargs == expected_kwargs, "Invalid call kwargs:\n %s\nMay be:\n %s" % (kwargs, expected_kwargs)
return expected_return
return mock_args
@ -139,7 +143,9 @@ def mock_doSQL_just_try(self, sql, params=None): # pylint: disable=unused-argum
def generate_mock_doSQL(expected_sql, expected_params={}, expected_return=True): # pylint: disable=dangerous-default-value
def mock_doSQL(self, sql, params=None): # pylint: disable=unused-argument
# pylint: disable=consider-using-f-string
assert sql == expected_sql, "Invalid generated SQL query:\n '%s'\nMay be:\n '%s'" % (sql, expected_sql)
# pylint: disable=consider-using-f-string
assert params == expected_params, "Invalid generated params:\n %s\nMay be:\n %s" % (params, expected_params)
return expected_return
return mock_doSQL

View File

@ -19,10 +19,12 @@ class FakePsycopg2Cursor:
def execute(self, sql, params=None):
if self.expected_exception:
raise psycopg2.Error("%s.execute(%s, %s): expected exception" % (self, sql, params))
raise psycopg2.Error(f'{self}.execute({sql}, {params}): expected exception')
if self.expected_just_try and not sql.lower().startswith('select '):
assert False, "%s.execute(%s, %s) may not be executed in just try mode" % (self, sql, params)
assert False, f'{self}.execute({sql}, {params}) may not be executed in just try mode'
# pylint: disable=consider-using-f-string
assert sql == self.expected_sql, "%s.execute(): Invalid SQL query:\n '%s'\nMay be:\n '%s'" % (self, sql, self.expected_sql)
# pylint: disable=consider-using-f-string
assert params == self.expected_params, "%s.execute(): Invalid params:\n %s\nMay be:\n %s" % (self, params, self.expected_params)
return self.expected_return
@ -30,9 +32,9 @@ class FakePsycopg2Cursor:
return self.expected_return
def __repr__(self):
return "FakePsycopg2Cursor(%s, %s, %s, %s)" % (
self.expected_sql, self.expected_params,
self.expected_return, self.expected_just_try
return (
f'FakePsycopg2Cursor({self.expected_sql}, {self.expected_params}, '
f'{self.expected_return}, {self.expected_just_try})'
)
@ -49,8 +51,9 @@ class FakePsycopg2:
def __init__(self, **kwargs):
allowed_kwargs = dict(dbname=str, user=str, password=(str, None), host=str)
for arg, value in kwargs.items():
assert arg in allowed_kwargs, "Invalid arg %s='%s'" % (arg, value)
assert isinstance(value, allowed_kwargs[arg]), "Arg %s not a %s (%s)" % (arg, allowed_kwargs[arg], type(value))
assert arg in allowed_kwargs, f'Invalid arg {arg}="{value}"'
assert isinstance(value, allowed_kwargs[arg]), \
f'Arg {arg} not a {allowed_kwargs[arg]} ({type(value)})'
setattr(self, arg, value)
def close(self):
@ -60,7 +63,7 @@ class FakePsycopg2:
self._check_just_try()
assert len(arg) == 1 and isinstance(arg[0], str)
if self.expected_exception:
raise psycopg2.Error("set_client_encoding(%s): Expected exception" % arg[0])
raise psycopg2.Error(f'set_client_encoding({arg[0]}): Expected exception')
return self.expected_return
def cursor(self):
@ -124,7 +127,9 @@ def fake_connected_just_try_pgdb(fake_just_try_pgdb):
def generate_mock_args(expected_args=(), expected_kwargs={}, expected_return=True): # pylint: disable=dangerous-default-value
def mock_args(*args, **kwargs):
# pylint: disable=consider-using-f-string
assert args == expected_args, "Invalid call args:\n %s\nMay be:\n %s" % (args, expected_args)
# pylint: disable=consider-using-f-string
assert kwargs == expected_kwargs, "Invalid call kwargs:\n %s\nMay be:\n %s" % (kwargs, expected_kwargs)
return expected_return
return mock_args
@ -136,7 +141,9 @@ def mock_doSQL_just_try(self, sql, params=None): # pylint: disable=unused-argum
def generate_mock_doSQL(expected_sql, expected_params={}, expected_return=True): # pylint: disable=dangerous-default-value
def mock_doSQL(self, sql, params=None): # pylint: disable=unused-argument
# pylint: disable=consider-using-f-string
assert sql == expected_sql, "Invalid generated SQL query:\n '%s'\nMay be:\n '%s'" % (sql, expected_sql)
# pylint: disable=consider-using-f-string
assert params == expected_params, "Invalid generated params:\n %s\nMay be:\n %s" % (params, expected_params)
return expected_return
return mock_doSQL