Compare commits
No commits in common. "01d71fef896f6919c442b2954954e3a36ccde51c" and "eb183b0d3b0af1fac2a62a27db39551d0e71cd6a" have entirely different histories.
01d71fef89
...
eb183b0d3b
13 changed files with 156 additions and 178 deletions
|
@ -3,10 +3,4 @@ disable=invalid-name,
|
||||||
locally-disabled,
|
locally-disabled,
|
||||||
too-many-arguments,
|
too-many-arguments,
|
||||||
too-many-branches,
|
too-many-branches,
|
||||||
too-many-locals,
|
|
||||||
too-many-return-statements,
|
|
||||||
too-many-nested-blocks,
|
|
||||||
too-many-instance-attributes,
|
|
||||||
too-many-lines,
|
|
||||||
line-too-long,
|
line-too-long,
|
||||||
duplicate-code,
|
|
||||||
|
|
|
@ -27,12 +27,6 @@ apt install libmariadb-dev
|
||||||
|
|
||||||
## Installation
|
## Installation
|
||||||
|
|
||||||
### Using pip
|
|
||||||
|
|
||||||
Just run `pip install git+https://gitea.zionetrix.net/bn8/python-mylib.git`
|
|
||||||
|
|
||||||
### From source
|
|
||||||
|
|
||||||
Just run `python setup.py install`
|
Just run `python setup.py install`
|
||||||
|
|
||||||
**Note:** This project could previously use as independent python files (not as module). This old version is keep in *legacy* git branch (not maintained).
|
**Note:** This project could previously use as independent python files (not as module). This old version is keep in *legacy* git branch (not maintained).
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
""" Some really common helper functions """
|
""" Some really common helper functions """
|
||||||
|
|
||||||
#
|
#
|
||||||
|
@ -6,8 +8,7 @@
|
||||||
|
|
||||||
|
|
||||||
def increment_prefix(prefix):
|
def increment_prefix(prefix):
|
||||||
""" Increment the given prefix with two spaces """
|
return "%s " % prefix if prefix else " "
|
||||||
return f'{prefix if prefix else " "} '
|
|
||||||
|
|
||||||
|
|
||||||
def pretty_format_value(value, encoding='utf8', prefix=None):
|
def pretty_format_value(value, encoding='utf8', prefix=None):
|
||||||
|
@ -17,12 +18,12 @@ def pretty_format_value(value, encoding='utf8', prefix=None):
|
||||||
if isinstance(value, list):
|
if isinstance(value, list):
|
||||||
return pretty_format_list(value, encoding=encoding, prefix=prefix)
|
return pretty_format_list(value, encoding=encoding, prefix=prefix)
|
||||||
if isinstance(value, bytes):
|
if isinstance(value, bytes):
|
||||||
return f"'{value.decode(encoding, errors='replace')}'"
|
return "'%s'" % value.decode(encoding, errors='replace')
|
||||||
if isinstance(value, str):
|
if isinstance(value, str):
|
||||||
return f"'{value}'"
|
return "'%s'" % value
|
||||||
if value is None:
|
if value is None:
|
||||||
return "None"
|
return "None"
|
||||||
return f'{value} ({type(value)})'
|
return "%s (%s)" % (str(value), type(value))
|
||||||
|
|
||||||
|
|
||||||
def pretty_format_value_in_list(value, encoding='utf8', prefix=None):
|
def pretty_format_value_in_list(value, encoding='utf8', prefix=None):
|
||||||
|
@ -49,11 +50,13 @@ def pretty_format_dict(value, encoding='utf8', prefix=None):
|
||||||
result = []
|
result = []
|
||||||
for key in sorted(value.keys()):
|
for key in sorted(value.keys()):
|
||||||
result.append(
|
result.append(
|
||||||
f'{prefix}- {key} : ' +
|
"%s- %s : %s" % (
|
||||||
pretty_format_value_in_list(
|
prefix, key,
|
||||||
value[key],
|
pretty_format_value_in_list(
|
||||||
encoding=encoding,
|
value[key],
|
||||||
prefix=prefix
|
encoding=encoding,
|
||||||
|
prefix=prefix
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
return "\n".join(result)
|
return "\n".join(result)
|
||||||
|
@ -65,11 +68,13 @@ def pretty_format_list(row, encoding='utf8', prefix=None):
|
||||||
result = []
|
result = []
|
||||||
for idx, values in enumerate(row):
|
for idx, values in enumerate(row):
|
||||||
result.append(
|
result.append(
|
||||||
f'{prefix}- #{idx} : ' +
|
"%s- #%s : %s" % (
|
||||||
pretty_format_value_in_list(
|
prefix, idx,
|
||||||
values,
|
pretty_format_value_in_list(
|
||||||
encoding=encoding,
|
values,
|
||||||
prefix=prefix
|
encoding=encoding,
|
||||||
|
prefix=prefix
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
return "\n".join(result)
|
return "\n".join(result)
|
||||||
|
|
|
@ -46,16 +46,15 @@ class EmailClient(ConfigurableObject): # pylint: disable=useless-object-inherit
|
||||||
'just_try': False,
|
'just_try': False,
|
||||||
}
|
}
|
||||||
|
|
||||||
templates = {}
|
templates = dict()
|
||||||
|
|
||||||
def __init__(self, templates=None, **kwargs):
|
def __init__(self, templates=None, **kwargs):
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
|
|
||||||
assert templates is None or isinstance(templates, dict)
|
assert templates is None or isinstance(templates, dict)
|
||||||
self.templates = templates if templates else {}
|
self.templates = templates if templates else dict()
|
||||||
|
|
||||||
# pylint: disable=arguments-differ,arguments-renamed
|
def configure(self, use_smtp=True, just_try=True, ** kwargs): # pylint: disable=arguments-differ
|
||||||
def configure(self, use_smtp=True, just_try=True, ** kwargs):
|
|
||||||
""" Configure options on registered mylib.Config object """
|
""" Configure options on registered mylib.Config object """
|
||||||
section = super().configure(**kwargs)
|
section = super().configure(**kwargs)
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,6 @@ import logging
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
import MySQLdb
|
import MySQLdb
|
||||||
from MySQLdb._exceptions import Error
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -33,7 +32,7 @@ class MyDB:
|
||||||
try:
|
try:
|
||||||
con = MySQLdb.connect(self.host, self.user, self.pwd, self.db)
|
con = MySQLdb.connect(self.host, self.user, self.pwd, self.db)
|
||||||
self.con = con
|
self.con = con
|
||||||
except Error:
|
except Exception:
|
||||||
log.fatal('Error connecting to MySQL server', exc_info=True)
|
log.fatal('Error connecting to MySQL server', exc_info=True)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
@ -44,7 +43,7 @@ class MyDB:
|
||||||
cursor.execute(sql)
|
cursor.execute(sql)
|
||||||
self.con.commit()
|
self.con.commit()
|
||||||
return True
|
return True
|
||||||
except Error:
|
except Exception:
|
||||||
log.error('Error during SQL request "%s"', sql, exc_info=True)
|
log.error('Error during SQL request "%s"', sql, exc_info=True)
|
||||||
self.con.rollback()
|
self.con.rollback()
|
||||||
return False
|
return False
|
||||||
|
@ -55,6 +54,6 @@ class MyDB:
|
||||||
try:
|
try:
|
||||||
cursor.execute(sql)
|
cursor.execute(sql)
|
||||||
return cursor.fetchall()
|
return cursor.fetchall()
|
||||||
except Error:
|
except Exception:
|
||||||
log.error('Error during SQL request "%s"', sql, exc_info=True)
|
log.error('Error during SQL request "%s"', sql, exc_info=True)
|
||||||
return False
|
return False
|
||||||
|
|
|
@ -16,7 +16,6 @@ time_pattern = re.compile('^([0-9]{1,2})h([0-9]{2})?$')
|
||||||
|
|
||||||
|
|
||||||
def easter_date(year):
|
def easter_date(year):
|
||||||
""" Compute easter date for the specified year """
|
|
||||||
a = year // 100
|
a = year // 100
|
||||||
b = year % 100
|
b = year % 100
|
||||||
c = (3 * (a + 25)) // 4
|
c = (3 * (a + 25)) // 4
|
||||||
|
@ -36,7 +35,6 @@ def easter_date(year):
|
||||||
|
|
||||||
|
|
||||||
def nonworking_french_public_days_of_the_year(year=None):
|
def nonworking_french_public_days_of_the_year(year=None):
|
||||||
""" Compute dict of nonworking french public days for the specified year """
|
|
||||||
if year is None:
|
if year is None:
|
||||||
year = datetime.date.today().year
|
year = datetime.date.today().year
|
||||||
dp = easter_date(year)
|
dp = easter_date(year)
|
||||||
|
@ -59,7 +57,6 @@ def nonworking_french_public_days_of_the_year(year=None):
|
||||||
|
|
||||||
|
|
||||||
def parse_exceptional_closures(values):
|
def parse_exceptional_closures(values):
|
||||||
""" Parse exceptional closures values """
|
|
||||||
exceptional_closures = []
|
exceptional_closures = []
|
||||||
for value in values:
|
for value in values:
|
||||||
days = []
|
days = []
|
||||||
|
@ -110,7 +107,6 @@ def parse_exceptional_closures(values):
|
||||||
|
|
||||||
|
|
||||||
def parse_normal_opening_hours(values):
|
def parse_normal_opening_hours(values):
|
||||||
""" Parse normal opening hours """
|
|
||||||
normal_opening_hours = []
|
normal_opening_hours = []
|
||||||
for value in values:
|
for value in values:
|
||||||
days = []
|
days = []
|
||||||
|
@ -165,7 +161,6 @@ def is_closed(
|
||||||
nonworking_public_holidays_values=None, exceptional_closure_on_nonworking_public_days=False,
|
nonworking_public_holidays_values=None, exceptional_closure_on_nonworking_public_days=False,
|
||||||
when=None, on_error='raise'
|
when=None, on_error='raise'
|
||||||
):
|
):
|
||||||
""" Check if closed """
|
|
||||||
if not when:
|
if not when:
|
||||||
when = datetime.datetime.now()
|
when = datetime.datetime.now()
|
||||||
when_date = when.date()
|
when_date = when.date()
|
||||||
|
|
108
mylib/oracle.py
108
mylib/oracle.py
|
@ -94,7 +94,7 @@ class OracleDB:
|
||||||
password=self._pwd,
|
password=self._pwd,
|
||||||
dsn=self._dsn
|
dsn=self._dsn
|
||||||
)
|
)
|
||||||
except cx_Oracle.Error as err:
|
except Exception as err:
|
||||||
log.fatal(
|
log.fatal(
|
||||||
'An error occured during Oracle database connection (%s@%s).',
|
'An error occured during Oracle database connection (%s@%s).',
|
||||||
self._user, self._dsn, exc_info=1
|
self._user, self._dsn, exc_info=1
|
||||||
|
@ -111,32 +111,6 @@ class OracleDB:
|
||||||
self._conn.close()
|
self._conn.close()
|
||||||
self._conn = None
|
self._conn = None
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _log_query(sql, params):
|
|
||||||
log.debug(
|
|
||||||
'Run SQL query "%s" %s',
|
|
||||||
sql,
|
|
||||||
"with params = {0}".format( # pylint: disable=consider-using-f-string
|
|
||||||
', '.join([
|
|
||||||
f'{key} = {value}'
|
|
||||||
for key, value in params.items()
|
|
||||||
]) if params else "without params"
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _log_query_exception(sql, params):
|
|
||||||
log.exception(
|
|
||||||
'Error during SQL query "%s" %s',
|
|
||||||
sql,
|
|
||||||
"with params = {0}".format( # pylint: disable=consider-using-f-string
|
|
||||||
', '.join([
|
|
||||||
f'{key} = {value}'
|
|
||||||
for key, value in params.items()
|
|
||||||
]) if params else "without params"
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
def doSQL(self, sql, params=None):
|
def doSQL(self, sql, params=None):
|
||||||
"""
|
"""
|
||||||
Run SQL query and commit changes (rollback on error)
|
Run SQL query and commit changes (rollback on error)
|
||||||
|
@ -152,7 +126,14 @@ class OracleDB:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._log_query(sql, params)
|
log.debug(
|
||||||
|
'Run SQL query "%s" %s',
|
||||||
|
sql,
|
||||||
|
"with params = %s" % ', '.join([
|
||||||
|
"%s = %s" % (key, value)
|
||||||
|
for key, value in params.items()
|
||||||
|
]) if params else "without params"
|
||||||
|
)
|
||||||
with self._conn.cursor() as cursor:
|
with self._conn.cursor() as cursor:
|
||||||
if isinstance(params, dict):
|
if isinstance(params, dict):
|
||||||
cursor.execute(sql, **params)
|
cursor.execute(sql, **params)
|
||||||
|
@ -160,8 +141,16 @@ class OracleDB:
|
||||||
cursor.execute(sql)
|
cursor.execute(sql)
|
||||||
self._conn.commit()
|
self._conn.commit()
|
||||||
return True
|
return True
|
||||||
except cx_Oracle.Error:
|
except Exception:
|
||||||
self._log_query_exception(sql, params)
|
log.error(
|
||||||
|
'Error during SQL query "%s" %s',
|
||||||
|
sql,
|
||||||
|
"with params = %s" % ', '.join([
|
||||||
|
"%s = %s" % (key, value)
|
||||||
|
for key, value in params.items()
|
||||||
|
]) if params else "without params",
|
||||||
|
exc_info=True
|
||||||
|
)
|
||||||
self._conn.rollback()
|
self._conn.rollback()
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -176,7 +165,14 @@ class OracleDB:
|
||||||
:rtype: list, bool
|
:rtype: list, bool
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
self._log_query(sql, params)
|
log.debug(
|
||||||
|
'Run SQL SELECT query "%s" %s',
|
||||||
|
sql,
|
||||||
|
"with params = %s" % ', '.join([
|
||||||
|
"%s = %s" % (key, value)
|
||||||
|
for key, value in params.items()
|
||||||
|
]) if params else "without params"
|
||||||
|
)
|
||||||
with self._conn.cursor() as cursor:
|
with self._conn.cursor() as cursor:
|
||||||
if isinstance(params, dict):
|
if isinstance(params, dict):
|
||||||
cursor.execute(sql, **params)
|
cursor.execute(sql, **params)
|
||||||
|
@ -187,8 +183,16 @@ class OracleDB:
|
||||||
)
|
)
|
||||||
results = cursor.fetchall()
|
results = cursor.fetchall()
|
||||||
return results
|
return results
|
||||||
except cx_Oracle.Error:
|
except Exception:
|
||||||
self._log_query_exception(sql, params)
|
log.error(
|
||||||
|
'Error during SQL query "%s" %s',
|
||||||
|
sql,
|
||||||
|
"with params = %s" % ', '.join([
|
||||||
|
"%s = %s" % (key, value)
|
||||||
|
for key, value in params.items()
|
||||||
|
]) if params else "without params",
|
||||||
|
exc_info=True
|
||||||
|
)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
#
|
#
|
||||||
|
@ -197,8 +201,7 @@ class OracleDB:
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def format_param(param):
|
def format_param(param):
|
||||||
""" Format SQL query parameter for prepared query """
|
return ':{0}'.format(param)
|
||||||
return f':{param}'
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _combine_params(cls, params, to_add=None, **kwargs):
|
def _combine_params(cls, params, to_add=None, **kwargs):
|
||||||
|
@ -229,7 +232,7 @@ class OracleDB:
|
||||||
:rtype: string, bool
|
:rtype: string, bool
|
||||||
"""
|
"""
|
||||||
if params is None:
|
if params is None:
|
||||||
params = {}
|
params = dict()
|
||||||
if where_op is None:
|
if where_op is None:
|
||||||
where_op = 'AND'
|
where_op = 'AND'
|
||||||
|
|
||||||
|
@ -246,7 +249,7 @@ class OracleDB:
|
||||||
sql2, params = cls._format_where_clauses(where_clause, params=params, where_op=where_op)
|
sql2, params = cls._format_where_clauses(where_clause, params=params, where_op=where_op)
|
||||||
sql_where_clauses.append(sql2)
|
sql_where_clauses.append(sql2)
|
||||||
return (
|
return (
|
||||||
f' {where_op} '.join(sql_where_clauses),
|
(" %s " % where_op).join(sql_where_clauses),
|
||||||
params
|
params
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -257,14 +260,14 @@ class OracleDB:
|
||||||
if field in params:
|
if field in params:
|
||||||
idx = 1
|
idx = 1
|
||||||
while param in params:
|
while param in params:
|
||||||
param = f'{field}_{idx}'
|
param = '%s_%d' % (field, idx)
|
||||||
idx += 1
|
idx += 1
|
||||||
cls._combine_params(params, {param: value})
|
cls._combine_params(params, {param: value})
|
||||||
sql_where_clauses.append(
|
sql_where_clauses.append(
|
||||||
f'"{field}" = {cls.format_param(param)}'
|
'"{field}" = {param}'.format(field=field, param=cls.format_param(param))
|
||||||
)
|
)
|
||||||
return (
|
return (
|
||||||
f' {where_op} '.join(sql_where_clauses),
|
(" %s " % where_op).join(sql_where_clauses),
|
||||||
params
|
params
|
||||||
)
|
)
|
||||||
raise OracleDBUnsupportedWHEREClauses(where_clauses)
|
raise OracleDBUnsupportedWHEREClauses(where_clauses)
|
||||||
|
@ -290,7 +293,7 @@ class OracleDB:
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _quote_table_name(table):
|
def _quote_table_name(table):
|
||||||
""" Quote table name """
|
""" Quote table name """
|
||||||
return '"{0}"'.format( # pylint: disable=consider-using-f-string
|
return '"{0}"'.format(
|
||||||
'"."'.join(
|
'"."'.join(
|
||||||
table.split('.')
|
table.split('.')
|
||||||
)
|
)
|
||||||
|
@ -298,7 +301,6 @@ class OracleDB:
|
||||||
|
|
||||||
def insert(self, table, values, just_try=False):
|
def insert(self, table, values, just_try=False):
|
||||||
""" Run INSERT SQL query """
|
""" Run INSERT SQL query """
|
||||||
# pylint: disable=consider-using-f-string
|
|
||||||
sql = 'INSERT INTO {0} ("{1}") VALUES ({2})'.format(
|
sql = 'INSERT INTO {0} ("{1}") VALUES ({2})'.format(
|
||||||
self._quote_table_name(table),
|
self._quote_table_name(table),
|
||||||
'", "'.join(values.keys()),
|
'", "'.join(values.keys()),
|
||||||
|
@ -320,11 +322,10 @@ class OracleDB:
|
||||||
|
|
||||||
def update(self, table, values, where_clauses, where_op=None, just_try=False):
|
def update(self, table, values, where_clauses, where_op=None, just_try=False):
|
||||||
""" Run UPDATE SQL query """
|
""" Run UPDATE SQL query """
|
||||||
# pylint: disable=consider-using-f-string
|
|
||||||
sql = 'UPDATE {0} SET {1}'.format(
|
sql = 'UPDATE {0} SET {1}'.format(
|
||||||
self._quote_table_name(table),
|
self._quote_table_name(table),
|
||||||
", ".join([
|
", ".join([
|
||||||
f'"{key}" = {self.format_param(key)}'
|
'"{0}" = {1}'.format(key, self.format_param(key))
|
||||||
for key in values
|
for key in values
|
||||||
])
|
])
|
||||||
)
|
)
|
||||||
|
@ -348,8 +349,8 @@ class OracleDB:
|
||||||
|
|
||||||
def delete(self, table, where_clauses, where_op='AND', just_try=False):
|
def delete(self, table, where_clauses, where_op='AND', just_try=False):
|
||||||
""" Run DELETE SQL query """
|
""" Run DELETE SQL query """
|
||||||
sql = f'DELETE FROM {self._quote_table_name(table)}'
|
sql = 'DELETE FROM {0}'.format(self._quote_table_name(table))
|
||||||
params = {}
|
params = dict()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op)
|
sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op)
|
||||||
|
@ -369,7 +370,8 @@ class OracleDB:
|
||||||
|
|
||||||
def truncate(self, table, just_try=False):
|
def truncate(self, table, just_try=False):
|
||||||
""" Run TRUNCATE SQL query """
|
""" Run TRUNCATE SQL query """
|
||||||
sql = f'TRUNCATE TABLE {self._quote_table_name(table)}'
|
|
||||||
|
sql = 'TRUNCATE TABLE {0}'.format(self._quote_table_name(table))
|
||||||
|
|
||||||
if just_try:
|
if just_try:
|
||||||
log.debug("Just-try mode: execute TRUNCATE query: %s", sql)
|
log.debug("Just-try mode: execute TRUNCATE query: %s", sql)
|
||||||
|
@ -387,12 +389,12 @@ class OracleDB:
|
||||||
if fields is None:
|
if fields is None:
|
||||||
sql += "*"
|
sql += "*"
|
||||||
elif isinstance(fields, str):
|
elif isinstance(fields, str):
|
||||||
sql += f'"{fields}"'
|
sql += '"{0}"'.format(fields)
|
||||||
else:
|
else:
|
||||||
sql += '"{0}"'.format('", "'.join(fields)) # pylint: disable=consider-using-f-string
|
sql += '"{0}"'.format('", "'.join(fields))
|
||||||
|
|
||||||
sql += f' FROM {self._quote_table_name(table)}'
|
sql += ' FROM {0}'.format(self._quote_table_name(table))
|
||||||
params = {}
|
params = dict()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op)
|
sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op)
|
||||||
|
@ -402,14 +404,14 @@ class OracleDB:
|
||||||
|
|
||||||
if order_by:
|
if order_by:
|
||||||
if isinstance(order_by, str):
|
if isinstance(order_by, str):
|
||||||
sql += f' ORDER BY {order_by}'
|
sql += ' ORDER BY {0}'.format(order_by)
|
||||||
elif (
|
elif (
|
||||||
isinstance(order_by, (list, tuple)) and len(order_by) == 2
|
isinstance(order_by, (list, tuple)) and len(order_by) == 2
|
||||||
and isinstance(order_by[0], str)
|
and isinstance(order_by[0], str)
|
||||||
and isinstance(order_by[1], str)
|
and isinstance(order_by[1], str)
|
||||||
and order_by[1].upper() in ('ASC', 'UPPER')
|
and order_by[1].upper() in ('ASC', 'UPPER')
|
||||||
):
|
):
|
||||||
sql += f' ORDER BY "{order_by[0]}" {order_by[1].upper()}'
|
sql += ' ORDER BY "{0}" {1}'.format(order_by[0], order_by[1].upper())
|
||||||
else:
|
else:
|
||||||
raise OracleDBInvalidOrderByClause(order_by)
|
raise OracleDBInvalidOrderByClause(order_by)
|
||||||
|
|
||||||
|
|
120
mylib/pgsql.py
120
mylib/pgsql.py
|
@ -102,7 +102,7 @@ class PgDB:
|
||||||
host=self._host,
|
host=self._host,
|
||||||
password=self._pwd
|
password=self._pwd
|
||||||
)
|
)
|
||||||
except psycopg2.Error as err:
|
except Exception as err:
|
||||||
log.fatal(
|
log.fatal(
|
||||||
'An error occured during Postgresql database connection (%s@%s, database=%s).',
|
'An error occured during Postgresql database connection (%s@%s, database=%s).',
|
||||||
self._user, self._host, self._db, exc_info=1
|
self._user, self._host, self._db, exc_info=1
|
||||||
|
@ -125,39 +125,13 @@ class PgDB:
|
||||||
try:
|
try:
|
||||||
self._conn.set_client_encoding(enc)
|
self._conn.set_client_encoding(enc)
|
||||||
return True
|
return True
|
||||||
except psycopg2.Error:
|
except Exception:
|
||||||
log.error(
|
log.error(
|
||||||
'An error occured setting Postgresql database connection encoding to "%s"',
|
'An error occured setting Postgresql database connection encoding to "%s"',
|
||||||
enc, exc_info=1
|
enc, exc_info=1
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _log_query(sql, params):
|
|
||||||
log.debug(
|
|
||||||
'Run SQL query "%s" %s',
|
|
||||||
sql,
|
|
||||||
"with params = {0}".format( # pylint: disable=consider-using-f-string
|
|
||||||
', '.join([
|
|
||||||
f'{key} = {value}'
|
|
||||||
for key, value in params.items()
|
|
||||||
]) if params else "without params"
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _log_query_exception(sql, params):
|
|
||||||
log.exception(
|
|
||||||
'Error during SQL query "%s" %s',
|
|
||||||
sql,
|
|
||||||
"with params = {0}".format( # pylint: disable=consider-using-f-string
|
|
||||||
', '.join([
|
|
||||||
f'{key} = {value}'
|
|
||||||
for key, value in params.items()
|
|
||||||
]) if params else "without params"
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
def doSQL(self, sql, params=None):
|
def doSQL(self, sql, params=None):
|
||||||
"""
|
"""
|
||||||
Run SQL query and commit changes (rollback on error)
|
Run SQL query and commit changes (rollback on error)
|
||||||
|
@ -174,15 +148,30 @@ class PgDB:
|
||||||
|
|
||||||
cursor = self._conn.cursor()
|
cursor = self._conn.cursor()
|
||||||
try:
|
try:
|
||||||
self._log_query(sql, params)
|
log.debug(
|
||||||
|
'Run SQL query "%s" %s',
|
||||||
|
sql,
|
||||||
|
"with params = %s" % ', '.join([
|
||||||
|
"%s = %s" % (key, value)
|
||||||
|
for key, value in params.items()
|
||||||
|
]) if params else "without params"
|
||||||
|
)
|
||||||
if params is None:
|
if params is None:
|
||||||
cursor.execute(sql)
|
cursor.execute(sql)
|
||||||
else:
|
else:
|
||||||
cursor.execute(sql, params)
|
cursor.execute(sql, params)
|
||||||
self._conn.commit()
|
self._conn.commit()
|
||||||
return True
|
return True
|
||||||
except psycopg2.Error:
|
except Exception:
|
||||||
self._log_query_exception(sql, params)
|
log.error(
|
||||||
|
'Error during SQL query "%s" %s',
|
||||||
|
sql,
|
||||||
|
"with params = %s" % ', '.join([
|
||||||
|
"%s = %s" % (key, value)
|
||||||
|
for key, value in params.items()
|
||||||
|
]) if params else "without params",
|
||||||
|
exc_info=True
|
||||||
|
)
|
||||||
self._conn.rollback()
|
self._conn.rollback()
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -198,29 +187,36 @@ class PgDB:
|
||||||
"""
|
"""
|
||||||
cursor = self._conn.cursor()
|
cursor = self._conn.cursor()
|
||||||
try:
|
try:
|
||||||
self._log_query(sql, params)
|
log.debug(
|
||||||
|
'Run SQL SELECT query "%s" %s',
|
||||||
|
sql,
|
||||||
|
"with params = %s" % ', '.join([
|
||||||
|
"%s = %s" % (key, value)
|
||||||
|
for key, value in params.items()
|
||||||
|
]) if params else "without params"
|
||||||
|
)
|
||||||
cursor.execute(sql, params)
|
cursor.execute(sql, params)
|
||||||
results = cursor.fetchall()
|
results = cursor.fetchall()
|
||||||
return results
|
return results
|
||||||
except psycopg2.Error:
|
except Exception:
|
||||||
self._log_query_exception(sql, params)
|
log.error(
|
||||||
|
'Error during SQL query "%s" %s',
|
||||||
|
sql,
|
||||||
|
"with params = %s" % ', '.join([
|
||||||
|
"%s = %s" % (key, value)
|
||||||
|
for key, value in params.items()
|
||||||
|
]) if params else "without params",
|
||||||
|
exc_info=True
|
||||||
|
)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _map_row_fields_by_index(fields, row):
|
|
||||||
return dict(
|
|
||||||
(field, row[idx])
|
|
||||||
for idx, field in enumerate(fields)
|
|
||||||
)
|
|
||||||
|
|
||||||
#
|
#
|
||||||
# SQL helpers
|
# SQL helpers
|
||||||
#
|
#
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def format_param(param):
|
def format_param(param):
|
||||||
""" Format SQL query parameter for prepared query """
|
return '%({0})s'.format(param)
|
||||||
return f'%({param})s'
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _combine_params(cls, params, to_add=None, **kwargs):
|
def _combine_params(cls, params, to_add=None, **kwargs):
|
||||||
|
@ -251,7 +247,7 @@ class PgDB:
|
||||||
:rtype: string, bool
|
:rtype: string, bool
|
||||||
"""
|
"""
|
||||||
if params is None:
|
if params is None:
|
||||||
params = {}
|
params = dict()
|
||||||
if where_op is None:
|
if where_op is None:
|
||||||
where_op = 'AND'
|
where_op = 'AND'
|
||||||
|
|
||||||
|
@ -268,7 +264,7 @@ class PgDB:
|
||||||
sql2, params = cls._format_where_clauses(where_clause, params=params, where_op=where_op)
|
sql2, params = cls._format_where_clauses(where_clause, params=params, where_op=where_op)
|
||||||
sql_where_clauses.append(sql2)
|
sql_where_clauses.append(sql2)
|
||||||
return (
|
return (
|
||||||
f' {where_op} '.join(sql_where_clauses),
|
(" %s " % where_op).join(sql_where_clauses),
|
||||||
params
|
params
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -279,14 +275,14 @@ class PgDB:
|
||||||
if field in params:
|
if field in params:
|
||||||
idx = 1
|
idx = 1
|
||||||
while param in params:
|
while param in params:
|
||||||
param = f'{field}_{idx}'
|
param = '%s_%d' % (field, idx)
|
||||||
idx += 1
|
idx += 1
|
||||||
cls._combine_params(params, {param: value})
|
cls._combine_params(params, {param: value})
|
||||||
sql_where_clauses.append(
|
sql_where_clauses.append(
|
||||||
f'"{field}" = {cls.format_param(param)}'
|
'"{field}" = {param}'.format(field=field, param=cls.format_param(param))
|
||||||
)
|
)
|
||||||
return (
|
return (
|
||||||
f' {where_op} '.join(sql_where_clauses),
|
(" %s " % where_op).join(sql_where_clauses),
|
||||||
params
|
params
|
||||||
)
|
)
|
||||||
raise PgDBUnsupportedWHEREClauses(where_clauses)
|
raise PgDBUnsupportedWHEREClauses(where_clauses)
|
||||||
|
@ -312,7 +308,7 @@ class PgDB:
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _quote_table_name(table):
|
def _quote_table_name(table):
|
||||||
""" Quote table name """
|
""" Quote table name """
|
||||||
return '"{0}"'.format( # pylint: disable=consider-using-f-string
|
return '"{0}"'.format(
|
||||||
'"."'.join(
|
'"."'.join(
|
||||||
table.split('.')
|
table.split('.')
|
||||||
)
|
)
|
||||||
|
@ -320,7 +316,6 @@ class PgDB:
|
||||||
|
|
||||||
def insert(self, table, values, just_try=False):
|
def insert(self, table, values, just_try=False):
|
||||||
""" Run INSERT SQL query """
|
""" Run INSERT SQL query """
|
||||||
# pylint: disable=consider-using-f-string
|
|
||||||
sql = 'INSERT INTO {0} ("{1}") VALUES ({2})'.format(
|
sql = 'INSERT INTO {0} ("{1}") VALUES ({2})'.format(
|
||||||
self._quote_table_name(table),
|
self._quote_table_name(table),
|
||||||
'", "'.join(values.keys()),
|
'", "'.join(values.keys()),
|
||||||
|
@ -342,11 +337,10 @@ class PgDB:
|
||||||
|
|
||||||
def update(self, table, values, where_clauses, where_op=None, just_try=False):
|
def update(self, table, values, where_clauses, where_op=None, just_try=False):
|
||||||
""" Run UPDATE SQL query """
|
""" Run UPDATE SQL query """
|
||||||
# pylint: disable=consider-using-f-string
|
|
||||||
sql = 'UPDATE {0} SET {1}'.format(
|
sql = 'UPDATE {0} SET {1}'.format(
|
||||||
self._quote_table_name(table),
|
self._quote_table_name(table),
|
||||||
", ".join([
|
", ".join([
|
||||||
f'"{key}" = {self.format_param(key)}'
|
'"{0}" = {1}'.format(key, self.format_param(key))
|
||||||
for key in values
|
for key in values
|
||||||
])
|
])
|
||||||
)
|
)
|
||||||
|
@ -370,8 +364,8 @@ class PgDB:
|
||||||
|
|
||||||
def delete(self, table, where_clauses, where_op='AND', just_try=False):
|
def delete(self, table, where_clauses, where_op='AND', just_try=False):
|
||||||
""" Run DELETE SQL query """
|
""" Run DELETE SQL query """
|
||||||
sql = f'DELETE FROM {self._quote_table_name(table)}'
|
sql = 'DELETE FROM {0}'.format(self._quote_table_name(table))
|
||||||
params = {}
|
params = dict()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op)
|
sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op)
|
||||||
|
@ -391,7 +385,8 @@ class PgDB:
|
||||||
|
|
||||||
def truncate(self, table, just_try=False):
|
def truncate(self, table, just_try=False):
|
||||||
""" Run TRUNCATE SQL query """
|
""" Run TRUNCATE SQL query """
|
||||||
sql = f'TRUNCATE TABLE {self._quote_table_name(table)}'
|
|
||||||
|
sql = 'TRUNCATE {0}'.format(self._quote_table_name(table))
|
||||||
|
|
||||||
if just_try:
|
if just_try:
|
||||||
log.debug("Just-try mode: execute TRUNCATE query: %s", sql)
|
log.debug("Just-try mode: execute TRUNCATE query: %s", sql)
|
||||||
|
@ -409,12 +404,12 @@ class PgDB:
|
||||||
if fields is None:
|
if fields is None:
|
||||||
sql += "*"
|
sql += "*"
|
||||||
elif isinstance(fields, str):
|
elif isinstance(fields, str):
|
||||||
sql += f'"{fields}"'
|
sql += '"{0}"'.format(fields)
|
||||||
else:
|
else:
|
||||||
sql += '"{0}"'.format('", "'.join(fields)) # pylint: disable=consider-using-f-string
|
sql += '"{0}"'.format('", "'.join(fields))
|
||||||
|
|
||||||
sql += f' FROM {self._quote_table_name(table)}'
|
sql += ' FROM {0}'.format(self._quote_table_name(table))
|
||||||
params = {}
|
params = dict()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op)
|
sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op)
|
||||||
|
@ -424,14 +419,14 @@ class PgDB:
|
||||||
|
|
||||||
if order_by:
|
if order_by:
|
||||||
if isinstance(order_by, str):
|
if isinstance(order_by, str):
|
||||||
sql += f' ORDER BY {order_by}'
|
sql += ' ORDER BY {0}'.format(order_by)
|
||||||
elif (
|
elif (
|
||||||
isinstance(order_by, (list, tuple)) and len(order_by) == 2
|
isinstance(order_by, (list, tuple)) and len(order_by) == 2
|
||||||
and isinstance(order_by[0], str)
|
and isinstance(order_by[0], str)
|
||||||
and isinstance(order_by[1], str)
|
and isinstance(order_by[1], str)
|
||||||
and order_by[1].upper() in ('ASC', 'UPPER')
|
and order_by[1].upper() in ('ASC', 'UPPER')
|
||||||
):
|
):
|
||||||
sql += f' ORDER BY "{order_by[0]}" {order_by[1].upper()}'
|
sql += ' ORDER BY "{0}" {1}'.format(order_by[0], order_by[1].upper())
|
||||||
else:
|
else:
|
||||||
raise PgDBInvalidOrderByClause(order_by)
|
raise PgDBInvalidOrderByClause(order_by)
|
||||||
|
|
||||||
|
@ -459,8 +454,7 @@ class PgDB:
|
||||||
elif isinstance(value, datetime.date):
|
elif isinstance(value, datetime.date):
|
||||||
value = cls._format_date(value)
|
value = cls._format_date(value)
|
||||||
|
|
||||||
# pylint: disable=consider-using-f-string
|
return "'%s'" % value.replace("'", "''")
|
||||||
return "'{0}'".format(value.replace("'", "''"))
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _format_datetime(cls, value):
|
def _format_datetime(cls, value):
|
||||||
|
|
|
@ -39,8 +39,7 @@ class SFTPClient(ConfigurableObject):
|
||||||
sftp_client = None
|
sftp_client = None
|
||||||
initial_directory = None
|
initial_directory = None
|
||||||
|
|
||||||
# pylint: disable=arguments-differ,arguments-renamed
|
def configure(self, just_try=True, ** kwargs): # pylint: disable=arguments-differ
|
||||||
def configure(self, just_try=True, **kwargs):
|
|
||||||
""" Configure options on registered mylib.Config object """
|
""" Configure options on registered mylib.Config object """
|
||||||
section = super().configure(**kwargs)
|
section = super().configure(**kwargs)
|
||||||
|
|
||||||
|
@ -68,7 +67,7 @@ class SFTPClient(ConfigurableObject):
|
||||||
if just_try:
|
if just_try:
|
||||||
section.add_option(
|
section.add_option(
|
||||||
BooleanOption, 'just_try', default=self._defaults['just_try'],
|
BooleanOption, 'just_try', default=self._defaults['just_try'],
|
||||||
comment='Just-try mode: do not really make change on remote SFTP host')
|
comment='Just-try mode: do not really send emails')
|
||||||
|
|
||||||
return section
|
return section
|
||||||
|
|
||||||
|
|
|
@ -42,8 +42,7 @@ class TelltaleFile:
|
||||||
try:
|
try:
|
||||||
os.utime(self.filepath, None)
|
os.utime(self.filepath, None)
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
# pylint: disable=consider-using-with
|
open(self.filepath, 'a').close()
|
||||||
open(self.filepath, 'a', encoding="utf-8").close()
|
|
||||||
|
|
||||||
def remove(self):
|
def remove(self):
|
||||||
""" Remove the telltale file """
|
""" Remove the telltale file """
|
||||||
|
|
|
@ -179,7 +179,7 @@ def test_not_isset():
|
||||||
|
|
||||||
section = config.add_section('my_section')
|
section = config.add_section('my_section')
|
||||||
assert isinstance(section, ConfigSection)
|
assert isinstance(section, ConfigSection)
|
||||||
section.add_option(StringOption, opt_name)
|
option = section.add_option(StringOption, opt_name)
|
||||||
|
|
||||||
assert not config.isset(section_name, opt_name)
|
assert not config.isset(section_name, opt_name)
|
||||||
|
|
||||||
|
@ -206,7 +206,7 @@ def test_get_default():
|
||||||
opt_name = 'my_option'
|
opt_name = 'my_option'
|
||||||
opt_default_value = 'value'
|
opt_default_value = 'value'
|
||||||
section = config.add_section('my_section')
|
section = config.add_section('my_section')
|
||||||
section.add_option(StringOption, opt_name, default=opt_default_value)
|
option = section.add_option(StringOption, opt_name, default=opt_default_value)
|
||||||
config.parse_arguments_options(argv=[], create=False)
|
config.parse_arguments_options(argv=[], create=False)
|
||||||
|
|
||||||
assert config.get(section_name, opt_name) == opt_default_value
|
assert config.get(section_name, opt_name) == opt_default_value
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
# pylint: disable=redefined-outer-name,missing-function-docstring,protected-access
|
# pylint: disable=redefined-outer-name,missing-function-docstring,protected-access
|
||||||
""" Tests on opening hours helpers """
|
""" Tests on opening hours helpers """
|
||||||
|
|
||||||
import cx_Oracle
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from mylib.oracle import OracleDB
|
from mylib.oracle import OracleDB
|
||||||
|
@ -21,7 +20,7 @@ class FakeCXOracleCursor:
|
||||||
def execute(self, sql, **params):
|
def execute(self, sql, **params):
|
||||||
assert self.opened
|
assert self.opened
|
||||||
if self.expected_exception:
|
if self.expected_exception:
|
||||||
raise cx_Oracle.Error("%s.execute(%s, %s): expected exception" % (self, sql, params))
|
raise Exception("%s.execute(%s, %s): expected exception" % (self, sql, params))
|
||||||
if self.expected_just_try and not sql.lower().startswith('select '):
|
if self.expected_just_try and not sql.lower().startswith('select '):
|
||||||
assert False, "%s.execute(%s, %s) may not be executed in just try mode" % (self, sql, params)
|
assert False, "%s.execute(%s, %s) may not be executed in just try mode" % (self, sql, params)
|
||||||
assert sql == self.expected_sql, "%s.execute(): Invalid SQL query:\n '%s'\nMay be:\n '%s'" % (self, sql, self.expected_sql)
|
assert sql == self.expected_sql, "%s.execute(): Invalid SQL query:\n '%s'\nMay be:\n '%s'" % (self, sql, self.expected_sql)
|
||||||
|
@ -50,7 +49,7 @@ class FakeCXOracle:
|
||||||
""" Fake cx_Oracle connection """
|
""" Fake cx_Oracle connection """
|
||||||
|
|
||||||
expected_sql = None
|
expected_sql = None
|
||||||
expected_params = {}
|
expected_params = dict()
|
||||||
expected_return = True
|
expected_return = True
|
||||||
expected_just_try = False
|
expected_just_try = False
|
||||||
expected_exception = False
|
expected_exception = False
|
||||||
|
@ -125,7 +124,7 @@ def fake_connected_just_try_oracledb(fake_just_try_oracledb):
|
||||||
return fake_just_try_oracledb
|
return fake_just_try_oracledb
|
||||||
|
|
||||||
|
|
||||||
def generate_mock_args(expected_args=(), expected_kwargs={}, expected_return=True): # pylint: disable=dangerous-default-value
|
def generate_mock_args(expected_args=(), expected_kwargs=dict(), expected_return=True): # pylint: disable=dangerous-default-value
|
||||||
def mock_args(*args, **kwargs):
|
def mock_args(*args, **kwargs):
|
||||||
assert args == expected_args, "Invalid call args:\n %s\nMay be:\n %s" % (args, expected_args)
|
assert args == expected_args, "Invalid call args:\n %s\nMay be:\n %s" % (args, expected_args)
|
||||||
assert kwargs == expected_kwargs, "Invalid call kwargs:\n %s\nMay be:\n %s" % (kwargs, expected_kwargs)
|
assert kwargs == expected_kwargs, "Invalid call kwargs:\n %s\nMay be:\n %s" % (kwargs, expected_kwargs)
|
||||||
|
@ -137,7 +136,7 @@ def mock_doSQL_just_try(self, sql, params=None): # pylint: disable=unused-argum
|
||||||
assert False, "doSQL() may not be executed in just try mode"
|
assert False, "doSQL() may not be executed in just try mode"
|
||||||
|
|
||||||
|
|
||||||
def generate_mock_doSQL(expected_sql, expected_params={}, expected_return=True): # pylint: disable=dangerous-default-value
|
def generate_mock_doSQL(expected_sql, expected_params=dict(), expected_return=True): # pylint: disable=dangerous-default-value
|
||||||
def mock_doSQL(self, sql, params=None): # pylint: disable=unused-argument
|
def mock_doSQL(self, sql, params=None): # pylint: disable=unused-argument
|
||||||
assert sql == expected_sql, "Invalid generated SQL query:\n '%s'\nMay be:\n '%s'" % (sql, expected_sql)
|
assert sql == expected_sql, "Invalid generated SQL query:\n '%s'\nMay be:\n '%s'" % (sql, expected_sql)
|
||||||
assert params == expected_params, "Invalid generated params:\n %s\nMay be:\n %s" % (params, expected_params)
|
assert params == expected_params, "Invalid generated params:\n %s\nMay be:\n %s" % (params, expected_params)
|
||||||
|
@ -178,7 +177,7 @@ def test_format_where_clauses_params_are_preserved():
|
||||||
|
|
||||||
|
|
||||||
def test_format_where_clauses_raw():
|
def test_format_where_clauses_raw():
|
||||||
assert OracleDB._format_where_clauses('test = test') == (('test = test'), {})
|
assert OracleDB._format_where_clauses('test = test') == (('test = test'), dict())
|
||||||
|
|
||||||
|
|
||||||
def test_format_where_clauses_tuple_clause_with_params():
|
def test_format_where_clauses_tuple_clause_with_params():
|
||||||
|
@ -241,7 +240,7 @@ def test_add_where_clauses_with_op():
|
||||||
where_clauses = ('test1=1', 'test2=2')
|
where_clauses = ('test1=1', 'test2=2')
|
||||||
assert OracleDB._add_where_clauses(sql, None, where_clauses, where_op='OR') == (
|
assert OracleDB._add_where_clauses(sql, None, where_clauses, where_op='OR') == (
|
||||||
sql + ' WHERE test1=1 OR test2=2',
|
sql + ' WHERE test1=1 OR test2=2',
|
||||||
{}
|
dict()
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
# pylint: disable=redefined-outer-name,missing-function-docstring,protected-access
|
# pylint: disable=redefined-outer-name,missing-function-docstring,protected-access
|
||||||
""" Tests on opening hours helpers """
|
""" Tests on opening hours helpers """
|
||||||
|
|
||||||
import psycopg2
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from mylib.pgsql import PgDB
|
from mylib.pgsql import PgDB
|
||||||
|
@ -19,7 +18,7 @@ class FakePsycopg2Cursor:
|
||||||
|
|
||||||
def execute(self, sql, params=None):
|
def execute(self, sql, params=None):
|
||||||
if self.expected_exception:
|
if self.expected_exception:
|
||||||
raise psycopg2.Error("%s.execute(%s, %s): expected exception" % (self, sql, params))
|
raise Exception("%s.execute(%s, %s): expected exception" % (self, sql, params))
|
||||||
if self.expected_just_try and not sql.lower().startswith('select '):
|
if self.expected_just_try and not sql.lower().startswith('select '):
|
||||||
assert False, "%s.execute(%s, %s) may not be executed in just try mode" % (self, sql, params)
|
assert False, "%s.execute(%s, %s) may not be executed in just try mode" % (self, sql, params)
|
||||||
assert sql == self.expected_sql, "%s.execute(): Invalid SQL query:\n '%s'\nMay be:\n '%s'" % (self, sql, self.expected_sql)
|
assert sql == self.expected_sql, "%s.execute(): Invalid SQL query:\n '%s'\nMay be:\n '%s'" % (self, sql, self.expected_sql)
|
||||||
|
@ -60,7 +59,7 @@ class FakePsycopg2:
|
||||||
self._check_just_try()
|
self._check_just_try()
|
||||||
assert len(arg) == 1 and isinstance(arg[0], str)
|
assert len(arg) == 1 and isinstance(arg[0], str)
|
||||||
if self.expected_exception:
|
if self.expected_exception:
|
||||||
raise psycopg2.Error("set_client_encoding(%s): Expected exception" % arg[0])
|
raise Exception("set_client_encoding(%s): Expected exception" % arg[0])
|
||||||
return self.expected_return
|
return self.expected_return
|
||||||
|
|
||||||
def cursor(self):
|
def cursor(self):
|
||||||
|
@ -122,7 +121,7 @@ def fake_connected_just_try_pgdb(fake_just_try_pgdb):
|
||||||
return fake_just_try_pgdb
|
return fake_just_try_pgdb
|
||||||
|
|
||||||
|
|
||||||
def generate_mock_args(expected_args=(), expected_kwargs={}, expected_return=True): # pylint: disable=dangerous-default-value
|
def generate_mock_args(expected_args=(), expected_kwargs=dict(), expected_return=True): # pylint: disable=dangerous-default-value
|
||||||
def mock_args(*args, **kwargs):
|
def mock_args(*args, **kwargs):
|
||||||
assert args == expected_args, "Invalid call args:\n %s\nMay be:\n %s" % (args, expected_args)
|
assert args == expected_args, "Invalid call args:\n %s\nMay be:\n %s" % (args, expected_args)
|
||||||
assert kwargs == expected_kwargs, "Invalid call kwargs:\n %s\nMay be:\n %s" % (kwargs, expected_kwargs)
|
assert kwargs == expected_kwargs, "Invalid call kwargs:\n %s\nMay be:\n %s" % (kwargs, expected_kwargs)
|
||||||
|
@ -134,7 +133,7 @@ def mock_doSQL_just_try(self, sql, params=None): # pylint: disable=unused-argum
|
||||||
assert False, "doSQL() may not be executed in just try mode"
|
assert False, "doSQL() may not be executed in just try mode"
|
||||||
|
|
||||||
|
|
||||||
def generate_mock_doSQL(expected_sql, expected_params={}, expected_return=True): # pylint: disable=dangerous-default-value
|
def generate_mock_doSQL(expected_sql, expected_params=dict(), expected_return=True): # pylint: disable=dangerous-default-value
|
||||||
def mock_doSQL(self, sql, params=None): # pylint: disable=unused-argument
|
def mock_doSQL(self, sql, params=None): # pylint: disable=unused-argument
|
||||||
assert sql == expected_sql, "Invalid generated SQL query:\n '%s'\nMay be:\n '%s'" % (sql, expected_sql)
|
assert sql == expected_sql, "Invalid generated SQL query:\n '%s'\nMay be:\n '%s'" % (sql, expected_sql)
|
||||||
assert params == expected_params, "Invalid generated params:\n %s\nMay be:\n %s" % (params, expected_params)
|
assert params == expected_params, "Invalid generated params:\n %s\nMay be:\n %s" % (params, expected_params)
|
||||||
|
@ -175,7 +174,7 @@ def test_format_where_clauses_params_are_preserved():
|
||||||
|
|
||||||
|
|
||||||
def test_format_where_clauses_raw():
|
def test_format_where_clauses_raw():
|
||||||
assert PgDB._format_where_clauses('test = test') == (('test = test'), {})
|
assert PgDB._format_where_clauses('test = test') == (('test = test'), dict())
|
||||||
|
|
||||||
|
|
||||||
def test_format_where_clauses_tuple_clause_with_params():
|
def test_format_where_clauses_tuple_clause_with_params():
|
||||||
|
@ -238,7 +237,7 @@ def test_add_where_clauses_with_op():
|
||||||
where_clauses = ('test1=1', 'test2=2')
|
where_clauses = ('test1=1', 'test2=2')
|
||||||
assert PgDB._add_where_clauses(sql, None, where_clauses, where_op='OR') == (
|
assert PgDB._add_where_clauses(sql, None, where_clauses, where_op='OR') == (
|
||||||
sql + ' WHERE test1=1 OR test2=2',
|
sql + ' WHERE test1=1 OR test2=2',
|
||||||
{}
|
dict()
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -315,7 +314,7 @@ def test_delete_just_try(mocker, test_pgdb):
|
||||||
def test_truncate(mocker, test_pgdb):
|
def test_truncate(mocker, test_pgdb):
|
||||||
mocker.patch(
|
mocker.patch(
|
||||||
'mylib.pgsql.PgDB.doSQL',
|
'mylib.pgsql.PgDB.doSQL',
|
||||||
generate_mock_doSQL('TRUNCATE TABLE "mytable"', None)
|
generate_mock_doSQL('TRUNCATE "mytable"', None)
|
||||||
)
|
)
|
||||||
|
|
||||||
assert test_pgdb.truncate('mytable')
|
assert test_pgdb.truncate('mytable')
|
||||||
|
|
Loading…
Reference in a new issue