From ad04357c6ba35907e392297a083a966a82e228f3 Mon Sep 17 00:00:00 2001 From: Benjamin Renard Date: Fri, 6 Jan 2023 19:36:14 +0100 Subject: [PATCH] Code cleaning --- .pylintrc | 6 +++ mylib/__init__.py | 35 ++++++------ mylib/email.py | 7 +-- mylib/mysql.py | 7 +-- mylib/opening_hours.py | 5 ++ mylib/oracle.py | 108 ++++++++++++++++++------------------- mylib/pgsql.py | 120 +++++++++++++++++++++-------------------- mylib/sftp.py | 5 +- mylib/telltale.py | 3 +- tests/test_config.py | 4 +- tests/test_oracle.py | 13 ++--- tests/test_pgsql.py | 15 +++--- 12 files changed, 172 insertions(+), 156 deletions(-) diff --git a/.pylintrc b/.pylintrc index a2fa3ea..14f7ccf 100644 --- a/.pylintrc +++ b/.pylintrc @@ -3,4 +3,10 @@ disable=invalid-name, locally-disabled, too-many-arguments, too-many-branches, + too-many-locals, + too-many-return-statements, + too-many-nested-blocks, + too-many-instance-attributes, + too-many-lines, line-too-long, + duplicate-code, diff --git a/mylib/__init__.py b/mylib/__init__.py index 557fde6..68f8086 100644 --- a/mylib/__init__.py +++ b/mylib/__init__.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- - """ Some really common helper functions """ # @@ -8,7 +6,8 @@ def increment_prefix(prefix): - return "%s " % prefix if prefix else " " + """ Increment the given prefix with two spaces """ + return f'{prefix if prefix else " "} ' def pretty_format_value(value, encoding='utf8', prefix=None): @@ -18,12 +17,12 @@ def pretty_format_value(value, encoding='utf8', prefix=None): if isinstance(value, list): return pretty_format_list(value, encoding=encoding, prefix=prefix) if isinstance(value, bytes): - return "'%s'" % value.decode(encoding, errors='replace') + return f"'{value.decode(encoding, errors='replace')}'" if isinstance(value, str): - return "'%s'" % value + return f"'{value}'" if value is None: return "None" - return "%s (%s)" % (str(value), type(value)) + return f'{value} ({type(value)})' def pretty_format_value_in_list(value, encoding='utf8', prefix=None): @@ -50,13 +49,11 @@ def pretty_format_dict(value, encoding='utf8', prefix=None): result = [] for key in sorted(value.keys()): result.append( - "%s- %s : %s" % ( - prefix, key, - pretty_format_value_in_list( - value[key], - encoding=encoding, - prefix=prefix - ) + f'{prefix}- {key} : ' + + pretty_format_value_in_list( + value[key], + encoding=encoding, + prefix=prefix ) ) return "\n".join(result) @@ -68,13 +65,11 @@ def pretty_format_list(row, encoding='utf8', prefix=None): result = [] for idx, values in enumerate(row): result.append( - "%s- #%s : %s" % ( - prefix, idx, - pretty_format_value_in_list( - values, - encoding=encoding, - prefix=prefix - ) + f'{prefix}- #{idx} : ' + + pretty_format_value_in_list( + values, + encoding=encoding, + prefix=prefix ) ) return "\n".join(result) diff --git a/mylib/email.py b/mylib/email.py index 1c91e87..2deab40 100644 --- a/mylib/email.py +++ b/mylib/email.py @@ -46,15 +46,16 @@ class EmailClient(ConfigurableObject): # pylint: disable=useless-object-inherit 'just_try': False, } - templates = dict() + templates = {} def __init__(self, templates=None, **kwargs): super().__init__(**kwargs) assert templates is None or isinstance(templates, dict) - self.templates = templates if templates else dict() + self.templates = templates if templates else {} - def configure(self, use_smtp=True, just_try=True, ** kwargs): # pylint: disable=arguments-differ + # pylint: disable=arguments-differ,arguments-renamed + def configure(self, use_smtp=True, just_try=True, ** kwargs): """ Configure options on registered mylib.Config object """ section = super().configure(**kwargs) diff --git a/mylib/mysql.py b/mylib/mysql.py index db85d90..eb918b7 100644 --- a/mylib/mysql.py +++ b/mylib/mysql.py @@ -6,6 +6,7 @@ import logging import sys import MySQLdb +from MySQLdb._exceptions import Error log = logging.getLogger(__name__) @@ -32,7 +33,7 @@ class MyDB: try: con = MySQLdb.connect(self.host, self.user, self.pwd, self.db) self.con = con - except Exception: + except Error: log.fatal('Error connecting to MySQL server', exc_info=True) sys.exit(1) @@ -43,7 +44,7 @@ class MyDB: cursor.execute(sql) self.con.commit() return True - except Exception: + except Error: log.error('Error during SQL request "%s"', sql, exc_info=True) self.con.rollback() return False @@ -54,6 +55,6 @@ class MyDB: try: cursor.execute(sql) return cursor.fetchall() - except Exception: + except Error: log.error('Error during SQL request "%s"', sql, exc_info=True) return False diff --git a/mylib/opening_hours.py b/mylib/opening_hours.py index a7806a7..f96d5db 100644 --- a/mylib/opening_hours.py +++ b/mylib/opening_hours.py @@ -16,6 +16,7 @@ time_pattern = re.compile('^([0-9]{1,2})h([0-9]{2})?$') def easter_date(year): + """ Compute easter date for the specified year """ a = year // 100 b = year % 100 c = (3 * (a + 25)) // 4 @@ -35,6 +36,7 @@ def easter_date(year): def nonworking_french_public_days_of_the_year(year=None): + """ Compute dict of nonworking french public days for the specified year """ if year is None: year = datetime.date.today().year dp = easter_date(year) @@ -57,6 +59,7 @@ def nonworking_french_public_days_of_the_year(year=None): def parse_exceptional_closures(values): + """ Parse exceptional closures values """ exceptional_closures = [] for value in values: days = [] @@ -107,6 +110,7 @@ def parse_exceptional_closures(values): def parse_normal_opening_hours(values): + """ Parse normal opening hours """ normal_opening_hours = [] for value in values: days = [] @@ -161,6 +165,7 @@ def is_closed( nonworking_public_holidays_values=None, exceptional_closure_on_nonworking_public_days=False, when=None, on_error='raise' ): + """ Check if closed """ if not when: when = datetime.datetime.now() when_date = when.date() diff --git a/mylib/oracle.py b/mylib/oracle.py index 2599253..b6e66e4 100644 --- a/mylib/oracle.py +++ b/mylib/oracle.py @@ -94,7 +94,7 @@ class OracleDB: password=self._pwd, dsn=self._dsn ) - except Exception as err: + except cx_Oracle.Error as err: log.fatal( 'An error occured during Oracle database connection (%s@%s).', self._user, self._dsn, exc_info=1 @@ -111,6 +111,32 @@ class OracleDB: self._conn.close() self._conn = None + @staticmethod + def _log_query(sql, params): + log.debug( + 'Run SQL query "%s" %s', + sql, + "with params = {0}".format( # pylint: disable=consider-using-f-string + ', '.join([ + f'{key} = {value}' + for key, value in params.items() + ]) if params else "without params" + ) + ) + + @staticmethod + def _log_query_exception(sql, params): + log.exception( + 'Error during SQL query "%s" %s', + sql, + "with params = {0}".format( # pylint: disable=consider-using-f-string + ', '.join([ + f'{key} = {value}' + for key, value in params.items() + ]) if params else "without params" + ) + ) + def doSQL(self, sql, params=None): """ Run SQL query and commit changes (rollback on error) @@ -126,14 +152,7 @@ class OracleDB: return True try: - log.debug( - 'Run SQL query "%s" %s', - sql, - "with params = %s" % ', '.join([ - "%s = %s" % (key, value) - for key, value in params.items() - ]) if params else "without params" - ) + self._log_query(sql, params) with self._conn.cursor() as cursor: if isinstance(params, dict): cursor.execute(sql, **params) @@ -141,16 +160,8 @@ class OracleDB: cursor.execute(sql) self._conn.commit() return True - except Exception: - log.error( - 'Error during SQL query "%s" %s', - sql, - "with params = %s" % ', '.join([ - "%s = %s" % (key, value) - for key, value in params.items() - ]) if params else "without params", - exc_info=True - ) + except cx_Oracle.Error: + self._log_query_exception(sql, params) self._conn.rollback() return False @@ -165,14 +176,7 @@ class OracleDB: :rtype: list, bool """ try: - log.debug( - 'Run SQL SELECT query "%s" %s', - sql, - "with params = %s" % ', '.join([ - "%s = %s" % (key, value) - for key, value in params.items() - ]) if params else "without params" - ) + self._log_query(sql, params) with self._conn.cursor() as cursor: if isinstance(params, dict): cursor.execute(sql, **params) @@ -183,16 +187,8 @@ class OracleDB: ) results = cursor.fetchall() return results - except Exception: - log.error( - 'Error during SQL query "%s" %s', - sql, - "with params = %s" % ', '.join([ - "%s = %s" % (key, value) - for key, value in params.items() - ]) if params else "without params", - exc_info=True - ) + except cx_Oracle.Error: + self._log_query_exception(sql, params) return False # @@ -201,7 +197,8 @@ class OracleDB: @staticmethod def format_param(param): - return ':{0}'.format(param) + """ Format SQL query parameter for prepared query """ + return f':{param}' @classmethod def _combine_params(cls, params, to_add=None, **kwargs): @@ -232,7 +229,7 @@ class OracleDB: :rtype: string, bool """ if params is None: - params = dict() + params = {} if where_op is None: where_op = 'AND' @@ -249,7 +246,7 @@ class OracleDB: sql2, params = cls._format_where_clauses(where_clause, params=params, where_op=where_op) sql_where_clauses.append(sql2) return ( - (" %s " % where_op).join(sql_where_clauses), + f' {where_op} '.join(sql_where_clauses), params ) @@ -260,14 +257,14 @@ class OracleDB: if field in params: idx = 1 while param in params: - param = '%s_%d' % (field, idx) + param = f'{field}_{idx}' idx += 1 cls._combine_params(params, {param: value}) sql_where_clauses.append( - '"{field}" = {param}'.format(field=field, param=cls.format_param(param)) + f'"{field}" = {cls.format_param(param)}' ) return ( - (" %s " % where_op).join(sql_where_clauses), + f' {where_op} '.join(sql_where_clauses), params ) raise OracleDBUnsupportedWHEREClauses(where_clauses) @@ -293,7 +290,7 @@ class OracleDB: @staticmethod def _quote_table_name(table): """ Quote table name """ - return '"{0}"'.format( + return '"{0}"'.format( # pylint: disable=consider-using-f-string '"."'.join( table.split('.') ) @@ -301,6 +298,7 @@ class OracleDB: def insert(self, table, values, just_try=False): """ Run INSERT SQL query """ + # pylint: disable=consider-using-f-string sql = 'INSERT INTO {0} ("{1}") VALUES ({2})'.format( self._quote_table_name(table), '", "'.join(values.keys()), @@ -322,10 +320,11 @@ class OracleDB: def update(self, table, values, where_clauses, where_op=None, just_try=False): """ Run UPDATE SQL query """ + # pylint: disable=consider-using-f-string sql = 'UPDATE {0} SET {1}'.format( self._quote_table_name(table), ", ".join([ - '"{0}" = {1}'.format(key, self.format_param(key)) + f'"{key}" = {self.format_param(key)}' for key in values ]) ) @@ -349,8 +348,8 @@ class OracleDB: def delete(self, table, where_clauses, where_op='AND', just_try=False): """ Run DELETE SQL query """ - sql = 'DELETE FROM {0}'.format(self._quote_table_name(table)) - params = dict() + sql = f'DELETE FROM {self._quote_table_name(table)}' + params = {} try: sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) @@ -370,8 +369,7 @@ class OracleDB: def truncate(self, table, just_try=False): """ Run TRUNCATE SQL query """ - - sql = 'TRUNCATE TABLE {0}'.format(self._quote_table_name(table)) + sql = f'TRUNCATE TABLE {self._quote_table_name(table)}' if just_try: log.debug("Just-try mode: execute TRUNCATE query: %s", sql) @@ -389,12 +387,12 @@ class OracleDB: if fields is None: sql += "*" elif isinstance(fields, str): - sql += '"{0}"'.format(fields) + sql += f'"{fields}"' else: - sql += '"{0}"'.format('", "'.join(fields)) + sql += '"{0}"'.format('", "'.join(fields)) # pylint: disable=consider-using-f-string - sql += ' FROM {0}'.format(self._quote_table_name(table)) - params = dict() + sql += f' FROM {self._quote_table_name(table)}' + params = {} try: sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) @@ -404,14 +402,14 @@ class OracleDB: if order_by: if isinstance(order_by, str): - sql += ' ORDER BY {0}'.format(order_by) + sql += f' ORDER BY {order_by}' elif ( isinstance(order_by, (list, tuple)) and len(order_by) == 2 and isinstance(order_by[0], str) and isinstance(order_by[1], str) and order_by[1].upper() in ('ASC', 'UPPER') ): - sql += ' ORDER BY "{0}" {1}'.format(order_by[0], order_by[1].upper()) + sql += f' ORDER BY "{order_by[0]}" {order_by[1].upper()}' else: raise OracleDBInvalidOrderByClause(order_by) diff --git a/mylib/pgsql.py b/mylib/pgsql.py index f37e904..02f5785 100644 --- a/mylib/pgsql.py +++ b/mylib/pgsql.py @@ -102,7 +102,7 @@ class PgDB: host=self._host, password=self._pwd ) - except Exception as err: + except psycopg2.Error as err: log.fatal( 'An error occured during Postgresql database connection (%s@%s, database=%s).', self._user, self._host, self._db, exc_info=1 @@ -125,13 +125,39 @@ class PgDB: try: self._conn.set_client_encoding(enc) return True - except Exception: + except psycopg2.Error: log.error( 'An error occured setting Postgresql database connection encoding to "%s"', enc, exc_info=1 ) return False + @staticmethod + def _log_query(sql, params): + log.debug( + 'Run SQL query "%s" %s', + sql, + "with params = {0}".format( # pylint: disable=consider-using-f-string + ', '.join([ + f'{key} = {value}' + for key, value in params.items() + ]) if params else "without params" + ) + ) + + @staticmethod + def _log_query_exception(sql, params): + log.exception( + 'Error during SQL query "%s" %s', + sql, + "with params = {0}".format( # pylint: disable=consider-using-f-string + ', '.join([ + f'{key} = {value}' + for key, value in params.items() + ]) if params else "without params" + ) + ) + def doSQL(self, sql, params=None): """ Run SQL query and commit changes (rollback on error) @@ -148,30 +174,15 @@ class PgDB: cursor = self._conn.cursor() try: - log.debug( - 'Run SQL query "%s" %s', - sql, - "with params = %s" % ', '.join([ - "%s = %s" % (key, value) - for key, value in params.items() - ]) if params else "without params" - ) + self._log_query(sql, params) if params is None: cursor.execute(sql) else: cursor.execute(sql, params) self._conn.commit() return True - except Exception: - log.error( - 'Error during SQL query "%s" %s', - sql, - "with params = %s" % ', '.join([ - "%s = %s" % (key, value) - for key, value in params.items() - ]) if params else "without params", - exc_info=True - ) + except psycopg2.Error: + self._log_query_exception(sql, params) self._conn.rollback() return False @@ -187,36 +198,29 @@ class PgDB: """ cursor = self._conn.cursor() try: - log.debug( - 'Run SQL SELECT query "%s" %s', - sql, - "with params = %s" % ', '.join([ - "%s = %s" % (key, value) - for key, value in params.items() - ]) if params else "without params" - ) + self._log_query(sql, params) cursor.execute(sql, params) results = cursor.fetchall() return results - except Exception: - log.error( - 'Error during SQL query "%s" %s', - sql, - "with params = %s" % ', '.join([ - "%s = %s" % (key, value) - for key, value in params.items() - ]) if params else "without params", - exc_info=True - ) + except psycopg2.Error: + self._log_query_exception(sql, params) return False + @staticmethod + def _map_row_fields_by_index(fields, row): + return dict( + (field, row[idx]) + for idx, field in enumerate(fields) + ) + # # SQL helpers # @staticmethod def format_param(param): - return '%({0})s'.format(param) + """ Format SQL query parameter for prepared query """ + return f'%({param})s' @classmethod def _combine_params(cls, params, to_add=None, **kwargs): @@ -247,7 +251,7 @@ class PgDB: :rtype: string, bool """ if params is None: - params = dict() + params = {} if where_op is None: where_op = 'AND' @@ -264,7 +268,7 @@ class PgDB: sql2, params = cls._format_where_clauses(where_clause, params=params, where_op=where_op) sql_where_clauses.append(sql2) return ( - (" %s " % where_op).join(sql_where_clauses), + f' {where_op} '.join(sql_where_clauses), params ) @@ -275,14 +279,14 @@ class PgDB: if field in params: idx = 1 while param in params: - param = '%s_%d' % (field, idx) + param = f'{field}_{idx}' idx += 1 cls._combine_params(params, {param: value}) sql_where_clauses.append( - '"{field}" = {param}'.format(field=field, param=cls.format_param(param)) + f'"{field}" = {cls.format_param(param)}' ) return ( - (" %s " % where_op).join(sql_where_clauses), + f' {where_op} '.join(sql_where_clauses), params ) raise PgDBUnsupportedWHEREClauses(where_clauses) @@ -308,7 +312,7 @@ class PgDB: @staticmethod def _quote_table_name(table): """ Quote table name """ - return '"{0}"'.format( + return '"{0}"'.format( # pylint: disable=consider-using-f-string '"."'.join( table.split('.') ) @@ -316,6 +320,7 @@ class PgDB: def insert(self, table, values, just_try=False): """ Run INSERT SQL query """ + # pylint: disable=consider-using-f-string sql = 'INSERT INTO {0} ("{1}") VALUES ({2})'.format( self._quote_table_name(table), '", "'.join(values.keys()), @@ -337,10 +342,11 @@ class PgDB: def update(self, table, values, where_clauses, where_op=None, just_try=False): """ Run UPDATE SQL query """ + # pylint: disable=consider-using-f-string sql = 'UPDATE {0} SET {1}'.format( self._quote_table_name(table), ", ".join([ - '"{0}" = {1}'.format(key, self.format_param(key)) + f'"{key}" = {self.format_param(key)}' for key in values ]) ) @@ -364,8 +370,8 @@ class PgDB: def delete(self, table, where_clauses, where_op='AND', just_try=False): """ Run DELETE SQL query """ - sql = 'DELETE FROM {0}'.format(self._quote_table_name(table)) - params = dict() + sql = f'DELETE FROM {self._quote_table_name(table)}' + params = {} try: sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) @@ -385,8 +391,7 @@ class PgDB: def truncate(self, table, just_try=False): """ Run TRUNCATE SQL query """ - - sql = 'TRUNCATE {0}'.format(self._quote_table_name(table)) + sql = f'TRUNCATE TABLE {self._quote_table_name(table)}' if just_try: log.debug("Just-try mode: execute TRUNCATE query: %s", sql) @@ -404,12 +409,12 @@ class PgDB: if fields is None: sql += "*" elif isinstance(fields, str): - sql += '"{0}"'.format(fields) + sql += f'"{fields}"' else: - sql += '"{0}"'.format('", "'.join(fields)) + sql += '"{0}"'.format('", "'.join(fields)) # pylint: disable=consider-using-f-string - sql += ' FROM {0}'.format(self._quote_table_name(table)) - params = dict() + sql += f' FROM {self._quote_table_name(table)}' + params = {} try: sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) @@ -419,14 +424,14 @@ class PgDB: if order_by: if isinstance(order_by, str): - sql += ' ORDER BY {0}'.format(order_by) + sql += f' ORDER BY {order_by}' elif ( isinstance(order_by, (list, tuple)) and len(order_by) == 2 and isinstance(order_by[0], str) and isinstance(order_by[1], str) and order_by[1].upper() in ('ASC', 'UPPER') ): - sql += ' ORDER BY "{0}" {1}'.format(order_by[0], order_by[1].upper()) + sql += f' ORDER BY "{order_by[0]}" {order_by[1].upper()}' else: raise PgDBInvalidOrderByClause(order_by) @@ -454,7 +459,8 @@ class PgDB: elif isinstance(value, datetime.date): value = cls._format_date(value) - return "'%s'" % value.replace("'", "''") + # pylint: disable=consider-using-f-string + return "'{0}'".format(value.replace("'", "''")) @classmethod def _format_datetime(cls, value): diff --git a/mylib/sftp.py b/mylib/sftp.py index 5187251..b32912f 100644 --- a/mylib/sftp.py +++ b/mylib/sftp.py @@ -39,7 +39,8 @@ class SFTPClient(ConfigurableObject): sftp_client = None initial_directory = None - def configure(self, just_try=True, ** kwargs): # pylint: disable=arguments-differ + # pylint: disable=arguments-differ,arguments-renamed + def configure(self, just_try=True, **kwargs): """ Configure options on registered mylib.Config object """ section = super().configure(**kwargs) @@ -67,7 +68,7 @@ class SFTPClient(ConfigurableObject): if just_try: section.add_option( BooleanOption, 'just_try', default=self._defaults['just_try'], - comment='Just-try mode: do not really send emails') + comment='Just-try mode: do not really make change on remote SFTP host') return section diff --git a/mylib/telltale.py b/mylib/telltale.py index 69422f7..b29148c 100644 --- a/mylib/telltale.py +++ b/mylib/telltale.py @@ -42,7 +42,8 @@ class TelltaleFile: try: os.utime(self.filepath, None) except FileNotFoundError: - open(self.filepath, 'a').close() + # pylint: disable=consider-using-with + open(self.filepath, 'a', encoding="utf-8").close() def remove(self): """ Remove the telltale file """ diff --git a/tests/test_config.py b/tests/test_config.py index 4018f7b..e2fabc4 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -179,7 +179,7 @@ def test_not_isset(): section = config.add_section('my_section') assert isinstance(section, ConfigSection) - option = section.add_option(StringOption, opt_name) + section.add_option(StringOption, opt_name) assert not config.isset(section_name, opt_name) @@ -206,7 +206,7 @@ def test_get_default(): opt_name = 'my_option' opt_default_value = 'value' section = config.add_section('my_section') - option = section.add_option(StringOption, opt_name, default=opt_default_value) + section.add_option(StringOption, opt_name, default=opt_default_value) config.parse_arguments_options(argv=[], create=False) assert config.get(section_name, opt_name) == opt_default_value diff --git a/tests/test_oracle.py b/tests/test_oracle.py index a1169e9..8ee3887 100644 --- a/tests/test_oracle.py +++ b/tests/test_oracle.py @@ -1,6 +1,7 @@ # pylint: disable=redefined-outer-name,missing-function-docstring,protected-access """ Tests on opening hours helpers """ +import cx_Oracle import pytest from mylib.oracle import OracleDB @@ -20,7 +21,7 @@ class FakeCXOracleCursor: def execute(self, sql, **params): assert self.opened if self.expected_exception: - raise Exception("%s.execute(%s, %s): expected exception" % (self, sql, params)) + raise cx_Oracle.Error("%s.execute(%s, %s): expected exception" % (self, sql, params)) if self.expected_just_try and not sql.lower().startswith('select '): assert False, "%s.execute(%s, %s) may not be executed in just try mode" % (self, sql, params) assert sql == self.expected_sql, "%s.execute(): Invalid SQL query:\n '%s'\nMay be:\n '%s'" % (self, sql, self.expected_sql) @@ -49,7 +50,7 @@ class FakeCXOracle: """ Fake cx_Oracle connection """ expected_sql = None - expected_params = dict() + expected_params = {} expected_return = True expected_just_try = False expected_exception = False @@ -124,7 +125,7 @@ def fake_connected_just_try_oracledb(fake_just_try_oracledb): return fake_just_try_oracledb -def generate_mock_args(expected_args=(), expected_kwargs=dict(), expected_return=True): # pylint: disable=dangerous-default-value +def generate_mock_args(expected_args=(), expected_kwargs={}, expected_return=True): # pylint: disable=dangerous-default-value def mock_args(*args, **kwargs): assert args == expected_args, "Invalid call args:\n %s\nMay be:\n %s" % (args, expected_args) assert kwargs == expected_kwargs, "Invalid call kwargs:\n %s\nMay be:\n %s" % (kwargs, expected_kwargs) @@ -136,7 +137,7 @@ def mock_doSQL_just_try(self, sql, params=None): # pylint: disable=unused-argum assert False, "doSQL() may not be executed in just try mode" -def generate_mock_doSQL(expected_sql, expected_params=dict(), expected_return=True): # pylint: disable=dangerous-default-value +def generate_mock_doSQL(expected_sql, expected_params={}, expected_return=True): # pylint: disable=dangerous-default-value def mock_doSQL(self, sql, params=None): # pylint: disable=unused-argument assert sql == expected_sql, "Invalid generated SQL query:\n '%s'\nMay be:\n '%s'" % (sql, expected_sql) assert params == expected_params, "Invalid generated params:\n %s\nMay be:\n %s" % (params, expected_params) @@ -177,7 +178,7 @@ def test_format_where_clauses_params_are_preserved(): def test_format_where_clauses_raw(): - assert OracleDB._format_where_clauses('test = test') == (('test = test'), dict()) + assert OracleDB._format_where_clauses('test = test') == (('test = test'), {}) def test_format_where_clauses_tuple_clause_with_params(): @@ -240,7 +241,7 @@ def test_add_where_clauses_with_op(): where_clauses = ('test1=1', 'test2=2') assert OracleDB._add_where_clauses(sql, None, where_clauses, where_op='OR') == ( sql + ' WHERE test1=1 OR test2=2', - dict() + {} ) diff --git a/tests/test_pgsql.py b/tests/test_pgsql.py index c2d719f..87e8265 100644 --- a/tests/test_pgsql.py +++ b/tests/test_pgsql.py @@ -1,6 +1,7 @@ # pylint: disable=redefined-outer-name,missing-function-docstring,protected-access """ Tests on opening hours helpers """ +import psycopg2 import pytest from mylib.pgsql import PgDB @@ -18,7 +19,7 @@ class FakePsycopg2Cursor: def execute(self, sql, params=None): if self.expected_exception: - raise Exception("%s.execute(%s, %s): expected exception" % (self, sql, params)) + raise psycopg2.Error("%s.execute(%s, %s): expected exception" % (self, sql, params)) if self.expected_just_try and not sql.lower().startswith('select '): assert False, "%s.execute(%s, %s) may not be executed in just try mode" % (self, sql, params) assert sql == self.expected_sql, "%s.execute(): Invalid SQL query:\n '%s'\nMay be:\n '%s'" % (self, sql, self.expected_sql) @@ -59,7 +60,7 @@ class FakePsycopg2: self._check_just_try() assert len(arg) == 1 and isinstance(arg[0], str) if self.expected_exception: - raise Exception("set_client_encoding(%s): Expected exception" % arg[0]) + raise psycopg2.Error("set_client_encoding(%s): Expected exception" % arg[0]) return self.expected_return def cursor(self): @@ -121,7 +122,7 @@ def fake_connected_just_try_pgdb(fake_just_try_pgdb): return fake_just_try_pgdb -def generate_mock_args(expected_args=(), expected_kwargs=dict(), expected_return=True): # pylint: disable=dangerous-default-value +def generate_mock_args(expected_args=(), expected_kwargs={}, expected_return=True): # pylint: disable=dangerous-default-value def mock_args(*args, **kwargs): assert args == expected_args, "Invalid call args:\n %s\nMay be:\n %s" % (args, expected_args) assert kwargs == expected_kwargs, "Invalid call kwargs:\n %s\nMay be:\n %s" % (kwargs, expected_kwargs) @@ -133,7 +134,7 @@ def mock_doSQL_just_try(self, sql, params=None): # pylint: disable=unused-argum assert False, "doSQL() may not be executed in just try mode" -def generate_mock_doSQL(expected_sql, expected_params=dict(), expected_return=True): # pylint: disable=dangerous-default-value +def generate_mock_doSQL(expected_sql, expected_params={}, expected_return=True): # pylint: disable=dangerous-default-value def mock_doSQL(self, sql, params=None): # pylint: disable=unused-argument assert sql == expected_sql, "Invalid generated SQL query:\n '%s'\nMay be:\n '%s'" % (sql, expected_sql) assert params == expected_params, "Invalid generated params:\n %s\nMay be:\n %s" % (params, expected_params) @@ -174,7 +175,7 @@ def test_format_where_clauses_params_are_preserved(): def test_format_where_clauses_raw(): - assert PgDB._format_where_clauses('test = test') == (('test = test'), dict()) + assert PgDB._format_where_clauses('test = test') == (('test = test'), {}) def test_format_where_clauses_tuple_clause_with_params(): @@ -237,7 +238,7 @@ def test_add_where_clauses_with_op(): where_clauses = ('test1=1', 'test2=2') assert PgDB._add_where_clauses(sql, None, where_clauses, where_op='OR') == ( sql + ' WHERE test1=1 OR test2=2', - dict() + {} ) @@ -314,7 +315,7 @@ def test_delete_just_try(mocker, test_pgdb): def test_truncate(mocker, test_pgdb): mocker.patch( 'mylib.pgsql.PgDB.doSQL', - generate_mock_doSQL('TRUNCATE "mytable"', None) + generate_mock_doSQL('TRUNCATE TABLE "mytable"', None) ) assert test_pgdb.truncate('mytable')