From da63f533be2ac60526a2f595e7649ff757bc885b Mon Sep 17 00:00:00 2001 From: Benjamin Renard Date: Sat, 7 Jan 2023 02:19:18 +0100 Subject: [PATCH] Abstract common DB methods in mysql.db.DB class and use it as base to implement PgDB, OracleDB and MyDB --- mylib/db.py | 392 ++++++++++++++++++++++++++++++++++++++ mylib/mysql.py | 117 ++++++++---- mylib/oracle.py | 331 ++------------------------------ mylib/pgsql.py | 334 ++------------------------------ tests/test_mysql.py | 455 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 954 insertions(+), 675 deletions(-) create mode 100644 mylib/db.py create mode 100644 tests/test_mysql.py diff --git a/mylib/db.py b/mylib/db.py new file mode 100644 index 0000000..227208a --- /dev/null +++ b/mylib/db.py @@ -0,0 +1,392 @@ +# -*- coding: utf-8 -*- + +""" Basic SQL DB client """ + +import logging + + +log = logging.getLogger(__name__) + + +# +# Exceptions +# + +class DBException(Exception): + """ That is the base exception class for all the other exceptions provided by this module. """ + + def __init__(self, error, *args, **kwargs): + for arg, value in kwargs.items(): + setattr(self, arg, value) + super().__init__(error.format(*args, **kwargs)) + + +class DBNotImplemented(DBException, RuntimeError): + """ + Raised when calling a method not implemented in child class + """ + + def __init__(self, method, class_name): + super().__init__( + "The method {method} is not yet implemented in class {class_name}", + method=method, class_name=class_name + ) + + +class DBFailToConnect(DBException, RuntimeError): + """ + Raised on connecting error occurred + """ + + def __init__(self, uri): + super().__init__( + "An error occured during database connection ({uri})", + uri=uri + ) + + +class DBDuplicatedSQLParameter(DBException, KeyError): + """ + Raised when trying to set a SQL query parameter + and an other parameter with the same name is already set + """ + + def __init__(self, parameter_name): + super().__init__( + "Duplicated SQL parameter '{parameter_name}'", + parameter_name=parameter_name + ) + + +class DBUnsupportedWHEREClauses(DBException, TypeError): + """ + Raised when trying to execute query with unsupported + WHERE clauses provided + """ + + def __init__(self, where_clauses): + super().__init__( + "Unsupported WHERE clauses: {where_clauses}", + where_clauses=where_clauses + ) + + +class DBInvalidOrderByClause(DBException, TypeError): + """ + Raised when trying to select on table with invalid + ORDER BY clause provided + """ + + def __init__(self, order_by): + super().__init__( + "Invalid ORDER BY clause: {order_by}. Must be a string or a list of two values (ordering field name and direction)", + order_by=order_by + ) + + +class DB: + """ Database client """ + + just_try = False + + def __init__(self, just_try=False, **kwargs): + self.just_try = just_try + self._conn = None + for arg, value in kwargs.items(): + setattr(self, f'_{arg}', value) + + def connect(self, exit_on_error=True): + """ Connect to DB server """ + raise DBNotImplemented('connect', self.__class__.__name__) + + def close(self): + """ Close connection with DB server (if opened) """ + if self._conn: + self._conn.close() + self._conn = None + + @staticmethod + def _log_query(sql, params): + log.debug( + 'Run SQL query "%s" %s', + sql, + "with params = {0}".format( # pylint: disable=consider-using-f-string + ', '.join([ + f'{key} = {value}' + for key, value in params.items() + ]) if params else "without params" + ) + ) + + @staticmethod + def _log_query_exception(sql, params): + log.exception( + 'Error during SQL query "%s" %s', + sql, + "with params = {0}".format( # pylint: disable=consider-using-f-string + ', '.join([ + f'{key} = {value}' + for key, value in params.items() + ]) if params else "without params" + ) + ) + + def doSQL(self, sql, params=None): + """ + Run SQL query and commit changes (rollback on error) + + :param sql: The SQL query + :param params: The SQL query's parameters as dict (optional) + + :return: True on success, False otherwise + :rtype: bool + """ + raise DBNotImplemented('doSQL', self.__class__.__name__) + + def doSelect(self, sql, params=None): + """ + Run SELECT SQL query and return list of selected rows as dict + + :param sql: The SQL query + :param params: The SQL query's parameters as dict (optional) + + :return: List of selected rows as dict on success, False otherwise + :rtype: list, bool + """ + raise DBNotImplemented('doSelect', self.__class__.__name__) + + # + # SQL helpers + # + + @staticmethod + def _quote_table_name(table): + """ Quote table name """ + return '"{0}"'.format( # pylint: disable=consider-using-f-string + '"."'.join( + table.split('.') + ) + ) + + @staticmethod + def _quote_field_name(field): + """ Quote table name """ + return f'"{field}"' + + @staticmethod + def format_param(param): + """ Format SQL query parameter for prepared query """ + return f'%({param})s' + + @classmethod + def _combine_params(cls, params, to_add=None, **kwargs): + if to_add: + assert isinstance(to_add, dict), "to_add must be a dict or None" + params = cls._combine_params(params, **to_add) + + for param, value in kwargs.items(): + if param in params: + raise DBDuplicatedSQLParameter(param) + params[param] = value + return params + + @classmethod + def _format_where_clauses(cls, where_clauses, params=None, where_op=None): + """ + Format WHERE clauses + + :param where_clauses: The WHERE clauses. Could be: + - a raw SQL WHERE clause as string + - a tuple of two elements: a raw WHERE clause and its parameters as dict + - a dict of WHERE clauses with field name as key and WHERE clause value as value + - a list of any of previous valid WHERE clauses + :param params: Dict of other already set SQL query parameters (optional) + :param where_op: SQL operator used to combine WHERE clauses together (optional, default: AND) + + :return: A tuple of two elements: raw SQL WHERE combined clauses and parameters on success + :rtype: string, bool + """ + if params is None: + params = {} + if where_op is None: + where_op = 'AND' + + if isinstance(where_clauses, str): + return (where_clauses, params) + + if isinstance(where_clauses, tuple) and len(where_clauses) == 2 and isinstance(where_clauses[1], dict): + cls._combine_params(params, where_clauses[1]) + return (where_clauses[0], params) + + if isinstance(where_clauses, (list, tuple)): + sql_where_clauses = [] + for where_clause in where_clauses: + sql2, params = cls._format_where_clauses(where_clause, params=params, where_op=where_op) + sql_where_clauses.append(sql2) + return ( + f' {where_op} '.join(sql_where_clauses), + params + ) + + if isinstance(where_clauses, dict): + sql_where_clauses = [] + for field, value in where_clauses.items(): + param = field + if field in params: + idx = 1 + while param in params: + param = f'{field}_{idx}' + idx += 1 + cls._combine_params(params, {param: value}) + sql_where_clauses.append( + f'{cls._quote_field_name(field)} = {cls.format_param(param)}' + ) + return ( + f' {where_op} '.join(sql_where_clauses), + params + ) + raise DBUnsupportedWHEREClauses(where_clauses) + + @classmethod + def _add_where_clauses(cls, sql, params, where_clauses, where_op=None): + """ + Add WHERE clauses to an SQL query + + :param sql: The SQL query to complete + :param params: The dict of parameters of the SQL query to complete + :param where_clauses: The WHERE clause (see _format_where_clauses()) + :param where_op: SQL operator used to combine WHERE clauses together (optional, default: see _format_where_clauses()) + + :return: + :rtype: A tuple of two elements: raw SQL WHERE combined clauses and parameters + """ + if where_clauses: + sql_where, params = cls._format_where_clauses(where_clauses, params=params, where_op=where_op) + sql += " WHERE " + sql_where + return (sql, params) + + def insert(self, table, values, just_try=False): + """ Run INSERT SQL query """ + # pylint: disable=consider-using-f-string + sql = 'INSERT INTO {0} ({1}) VALUES ({2})'.format( + self._quote_table_name(table), + ', '.join([ + self._quote_field_name(field) + for field in values.keys() + ]), + ", ".join([ + self.format_param(key) + for key in values + ]) + ) + + if just_try: + log.debug("Just-try mode: execute INSERT query: %s", sql) + return True + + log.debug(sql) + if not self.doSQL(sql, params=values): + log.error("Fail to execute INSERT query (SQL: %s)", sql) + return False + return True + + def update(self, table, values, where_clauses, where_op=None, just_try=False): + """ Run UPDATE SQL query """ + # pylint: disable=consider-using-f-string + sql = 'UPDATE {0} SET {1}'.format( + self._quote_table_name(table), + ", ".join([ + f'{self._quote_field_name(key)} = {self.format_param(key)}' + for key in values + ]) + ) + params = values + + try: + sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) + except (DBDuplicatedSQLParameter, DBUnsupportedWHEREClauses): + log.error('Fail to add WHERE clauses', exc_info=True) + return False + + if just_try: + log.debug("Just-try mode: execute UPDATE query: %s", sql) + return True + + log.debug(sql) + if not self.doSQL(sql, params=params): + log.error("Fail to execute UPDATE query (SQL: %s)", sql) + return False + return True + + def delete(self, table, where_clauses, where_op='AND', just_try=False): + """ Run DELETE SQL query """ + sql = f'DELETE FROM {self._quote_table_name(table)}' + params = {} + + try: + sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) + except (DBDuplicatedSQLParameter, DBUnsupportedWHEREClauses): + log.error('Fail to add WHERE clauses', exc_info=True) + return False + + if just_try: + log.debug("Just-try mode: execute UPDATE query: %s", sql) + return True + + log.debug(sql) + if not self.doSQL(sql, params=params): + log.error("Fail to execute UPDATE query (SQL: %s)", sql) + return False + return True + + def truncate(self, table, just_try=False): + """ Run TRUNCATE SQL query """ + sql = f'TRUNCATE TABLE {self._quote_table_name(table)}' + + if just_try: + log.debug("Just-try mode: execute TRUNCATE query: %s", sql) + return True + + log.debug(sql) + if not self.doSQL(sql): + log.error("Fail to execute TRUNCATE query (SQL: %s)", sql) + return False + return True + + def select(self, table, where_clauses=None, fields=None, where_op='AND', order_by=None, just_try=False): + """ Run SELECT SQL query """ + sql = "SELECT " + if fields is None: + sql += "*" + elif isinstance(fields, str): + sql += f'{self._quote_field_name(fields)}' + else: + sql += ', '.join([self._quote_field_name(field) for field in fields]) + + sql += f' FROM {self._quote_table_name(table)}' + params = {} + + try: + sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) + except (DBDuplicatedSQLParameter, DBUnsupportedWHEREClauses): + log.error('Fail to add WHERE clauses', exc_info=True) + return False + + if order_by: + if isinstance(order_by, str): + sql += f' ORDER BY {order_by}' + elif ( + isinstance(order_by, (list, tuple)) and len(order_by) == 2 + and isinstance(order_by[0], str) + and isinstance(order_by[1], str) + and order_by[1].upper() in ('ASC', 'UPPER') + ): + sql += f' ORDER BY "{order_by[0]}" {order_by[1].upper()}' + else: + raise DBInvalidOrderByClause(order_by) + + if just_try: + log.debug("Just-try mode: execute SELECT query : %s", sql) + return just_try + + return self.doSelect(sql, params=params) diff --git a/mylib/mysql.py b/mylib/mysql.py index eb918b7..0823fcd 100644 --- a/mylib/mysql.py +++ b/mylib/mysql.py @@ -8,53 +8,106 @@ import sys import MySQLdb from MySQLdb._exceptions import Error +from mylib.db import DB +from mylib.db import DBFailToConnect + + log = logging.getLogger(__name__) -class MyDB: +class MyDB(DB): """ MySQL client """ - host = "" - user = "" - pwd = "" - db = "" + _host = None + _user = None + _pwd = None + _db = None - con = 0 + def __init__(self, host, user, pwd, db, charset=None, **kwargs): + self._host = host + self._user = user + self._pwd = pwd + self._db = db + self._charset = charset if charset else 'utf8' + super().__init__(**kwargs) - def __init__(self, host, user, pwd, db): - self.host = host - self.user = user - self.pwd = pwd - self.db = db - - def connect(self): + def connect(self, exit_on_error=True): """ Connect to MySQL server """ - if self.con == 0: + if self._conn is None: try: - con = MySQLdb.connect(self.host, self.user, self.pwd, self.db) - self.con = con - except Error: - log.fatal('Error connecting to MySQL server', exc_info=True) - sys.exit(1) + self._conn = MySQLdb.connect( + host=self._host, user=self._user, passwd=self._pwd, + db=self._db, charset=self._charset, use_unicode=True) + except Error as err: + log.fatal( + 'An error occured during MySQL database connection (%s@%s:%s).', + self._user, self._host, self._db, exc_info=1 + ) + if exit_on_error: + sys.exit(1) + else: + raise DBFailToConnect(f'{self._user}@{self._host}:{self._db}') from err + return True - def doSQL(self, sql): - """ Run INSERT/UPDATE/DELETE/... SQL query """ - cursor = self.con.cursor() + def doSQL(self, sql, params=None): + """ + Run SQL query and commit changes (rollback on error) + + :param sql: The SQL query + :param params: The SQL query's parameters as dict (optional) + + :return: True on success, False otherwise + :rtype: bool + """ + if self.just_try: + log.debug("Just-try mode : do not really execute SQL query '%s'", sql) + return True + cursor = self._conn.cursor() try: - cursor.execute(sql) - self.con.commit() + self._log_query(sql, params) + cursor.execute(sql, params) + self._conn.commit() return True except Error: - log.error('Error during SQL request "%s"', sql, exc_info=True) - self.con.rollback() + self._log_query_exception(sql, params) + self._conn.rollback() return False - def doSelect(self, sql): - """ Run SELECT SQL query and return result as dict """ - cursor = self.con.cursor() + def doSelect(self, sql, params=None): + """ + Run SELECT SQL query and return list of selected rows as dict + + :param sql: The SQL query + :param params: The SQL query's parameters as dict (optional) + + :return: List of selected rows as dict on success, False otherwise + :rtype: list, bool + """ try: - cursor.execute(sql) - return cursor.fetchall() + self._log_query(sql, params) + cursor = self._conn.cursor() + cursor.execute(sql, params) + return [ + dict( + (field[0], row[idx]) + for idx, field in enumerate(cursor.description) + ) + for row in cursor.fetchall() + ] except Error: - log.error('Error during SQL request "%s"', sql, exc_info=True) + self._log_query_exception(sql, params) return False + + @staticmethod + def _quote_table_name(table): + """ Quote table name """ + return '`{0}`'.format( # pylint: disable=consider-using-f-string + '`.`'.join( + table.split('.') + ) + ) + + @staticmethod + def _quote_field_name(field): + """ Quote table name """ + return f'`{field}`' diff --git a/mylib/oracle.py b/mylib/oracle.py index b6e66e4..12feebc 100644 --- a/mylib/oracle.py +++ b/mylib/oracle.py @@ -7,82 +7,24 @@ import sys import cx_Oracle +from mylib.db import DB +from mylib.db import DBFailToConnect + log = logging.getLogger(__name__) -# -# Exceptions -# - -class OracleDBException(Exception): - """ That is the base exception class for all the other exceptions provided by this module. """ - - def __init__(self, error, *args, **kwargs): - for arg, value in kwargs.items(): - setattr(self, arg, value) - super().__init__(error.format(*args, **kwargs)) - - -class OracleDBFailToConnect(OracleDBException, RuntimeError): - """ - Raised on connecting error occurred - """ - - def __init__(self, dsn, user): - super().__init__( - "An error occured during Oracle database connection ({user}@{dsn})", - user=user, dsn=dsn - ) - - -class OracleDBDuplicatedSQLParameter(OracleDBException, KeyError): - """ - Raised when trying to set a SQL query parameter - and an other parameter with the same name is already set - """ - - def __init__(self, parameter_name): - super().__init__( - "Duplicated SQL parameter '{parameter_name}'", - parameter_name=parameter_name - ) - - -class OracleDBUnsupportedWHEREClauses(OracleDBException, TypeError): - """ - Raised when trying to execute query with unsupported - WHERE clauses provided - """ - - def __init__(self, where_clauses): - super().__init__( - "Unsupported WHERE clauses: {where_clauses}", - where_clauses=where_clauses - ) - - -class OracleDBInvalidOrderByClause(OracleDBException, TypeError): - """ - Raised when trying to select on table with invalid - ORDER BY clause provided - """ - - def __init__(self, order_by): - super().__init__( - "Invalid ORDER BY clause: {order_by}. Must be a string or a list of two values (ordering field name and direction)", - order_by=order_by - ) - - -class OracleDB: +class OracleDB(DB): """ Oracle client """ - def __init__(self, dsn, user, pwd, just_try=False): + _dsn = None + _user = None + _pwd = None + + def __init__(self, dsn, user, pwd, **kwargs): self._dsn = dsn self._user = user self._pwd = pwd - self._conn = None - self.just_try = just_try + super().__init__(**kwargs) def connect(self, exit_on_error=True): """ Connect to Oracle server """ @@ -102,41 +44,9 @@ class OracleDB: if exit_on_error: sys.exit(1) else: - raise OracleDBFailToConnect(self._dsn, self._user) from err + raise DBFailToConnect(f'{self._user}@{self._dsn}') from err return True - def close(self): - """ Close connection with Oracle server (if opened) """ - if self._conn: - self._conn.close() - self._conn = None - - @staticmethod - def _log_query(sql, params): - log.debug( - 'Run SQL query "%s" %s', - sql, - "with params = {0}".format( # pylint: disable=consider-using-f-string - ', '.join([ - f'{key} = {value}' - for key, value in params.items() - ]) if params else "without params" - ) - ) - - @staticmethod - def _log_query_exception(sql, params): - log.exception( - 'Error during SQL query "%s" %s', - sql, - "with params = {0}".format( # pylint: disable=consider-using-f-string - ', '.join([ - f'{key} = {value}' - for key, value in params.items() - ]) if params else "without params" - ) - ) - def doSQL(self, sql, params=None): """ Run SQL query and commit changes (rollback on error) @@ -199,222 +109,3 @@ class OracleDB: def format_param(param): """ Format SQL query parameter for prepared query """ return f':{param}' - - @classmethod - def _combine_params(cls, params, to_add=None, **kwargs): - if to_add: - assert isinstance(to_add, dict), "to_add must be a dict or None" - params = cls._combine_params(params, **to_add) - - for param, value in kwargs.items(): - if param in params: - raise OracleDBDuplicatedSQLParameter(param) - params[param] = value - return params - - @classmethod - def _format_where_clauses(cls, where_clauses, params=None, where_op=None): - """ - Format WHERE clauses - - :param where_clauses: The WHERE clauses. Could be: - - a raw SQL WHERE clause as string - - a tuple of two elements: a raw WHERE clause and its parameters as dict - - a dict of WHERE clauses with field name as key and WHERE clause value as value - - a list of any of previous valid WHERE clauses - :param params: Dict of other already set SQL query parameters (optional) - :param where_op: SQL operator used to combine WHERE clauses together (optional, default: AND) - - :return: A tuple of two elements: raw SQL WHERE combined clauses and parameters on success - :rtype: string, bool - """ - if params is None: - params = {} - if where_op is None: - where_op = 'AND' - - if isinstance(where_clauses, str): - return (where_clauses, params) - - if isinstance(where_clauses, tuple) and len(where_clauses) == 2 and isinstance(where_clauses[1], dict): - cls._combine_params(params, where_clauses[1]) - return (where_clauses[0], params) - - if isinstance(where_clauses, (list, tuple)): - sql_where_clauses = [] - for where_clause in where_clauses: - sql2, params = cls._format_where_clauses(where_clause, params=params, where_op=where_op) - sql_where_clauses.append(sql2) - return ( - f' {where_op} '.join(sql_where_clauses), - params - ) - - if isinstance(where_clauses, dict): - sql_where_clauses = [] - for field, value in where_clauses.items(): - param = field - if field in params: - idx = 1 - while param in params: - param = f'{field}_{idx}' - idx += 1 - cls._combine_params(params, {param: value}) - sql_where_clauses.append( - f'"{field}" = {cls.format_param(param)}' - ) - return ( - f' {where_op} '.join(sql_where_clauses), - params - ) - raise OracleDBUnsupportedWHEREClauses(where_clauses) - - @classmethod - def _add_where_clauses(cls, sql, params, where_clauses, where_op=None): - """ - Add WHERE clauses to an SQL query - - :param sql: The SQL query to complete - :param params: The dict of parameters of the SQL query to complete - :param where_clauses: The WHERE clause (see _format_where_clauses()) - :param where_op: SQL operator used to combine WHERE clauses together (optional, default: see _format_where_clauses()) - - :return: - :rtype: A tuple of two elements: raw SQL WHERE combined clauses and parameters - """ - if where_clauses: - sql_where, params = cls._format_where_clauses(where_clauses, params=params, where_op=where_op) - sql += " WHERE " + sql_where - return (sql, params) - - @staticmethod - def _quote_table_name(table): - """ Quote table name """ - return '"{0}"'.format( # pylint: disable=consider-using-f-string - '"."'.join( - table.split('.') - ) - ) - - def insert(self, table, values, just_try=False): - """ Run INSERT SQL query """ - # pylint: disable=consider-using-f-string - sql = 'INSERT INTO {0} ("{1}") VALUES ({2})'.format( - self._quote_table_name(table), - '", "'.join(values.keys()), - ", ".join([ - self.format_param(key) - for key in values - ]) - ) - - if just_try: - log.debug("Just-try mode: execute INSERT query: %s", sql) - return True - - log.debug(sql) - if not self.doSQL(sql, params=values): - log.error("Fail to execute INSERT query (SQL: %s)", sql) - return False - return True - - def update(self, table, values, where_clauses, where_op=None, just_try=False): - """ Run UPDATE SQL query """ - # pylint: disable=consider-using-f-string - sql = 'UPDATE {0} SET {1}'.format( - self._quote_table_name(table), - ", ".join([ - f'"{key}" = {self.format_param(key)}' - for key in values - ]) - ) - params = values - - try: - sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) - except (OracleDBDuplicatedSQLParameter, OracleDBUnsupportedWHEREClauses): - log.error('Fail to add WHERE clauses', exc_info=True) - return False - - if just_try: - log.debug("Just-try mode: execute UPDATE query: %s", sql) - return True - - log.debug(sql) - if not self.doSQL(sql, params=params): - log.error("Fail to execute UPDATE query (SQL: %s)", sql) - return False - return True - - def delete(self, table, where_clauses, where_op='AND', just_try=False): - """ Run DELETE SQL query """ - sql = f'DELETE FROM {self._quote_table_name(table)}' - params = {} - - try: - sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) - except (OracleDBDuplicatedSQLParameter, OracleDBUnsupportedWHEREClauses): - log.error('Fail to add WHERE clauses', exc_info=True) - return False - - if just_try: - log.debug("Just-try mode: execute UPDATE query: %s", sql) - return True - - log.debug(sql) - if not self.doSQL(sql, params=params): - log.error("Fail to execute UPDATE query (SQL: %s)", sql) - return False - return True - - def truncate(self, table, just_try=False): - """ Run TRUNCATE SQL query """ - sql = f'TRUNCATE TABLE {self._quote_table_name(table)}' - - if just_try: - log.debug("Just-try mode: execute TRUNCATE query: %s", sql) - return True - - log.debug(sql) - if not self.doSQL(sql): - log.error("Fail to execute TRUNCATE query (SQL: %s)", sql) - return False - return True - - def select(self, table, where_clauses=None, fields=None, where_op='AND', order_by=None, just_try=False): - """ Run SELECT SQL query """ - sql = "SELECT " - if fields is None: - sql += "*" - elif isinstance(fields, str): - sql += f'"{fields}"' - else: - sql += '"{0}"'.format('", "'.join(fields)) # pylint: disable=consider-using-f-string - - sql += f' FROM {self._quote_table_name(table)}' - params = {} - - try: - sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) - except (OracleDBDuplicatedSQLParameter, OracleDBUnsupportedWHEREClauses): - log.error('Fail to add WHERE clauses', exc_info=True) - return False - - if order_by: - if isinstance(order_by, str): - sql += f' ORDER BY {order_by}' - elif ( - isinstance(order_by, (list, tuple)) and len(order_by) == 2 - and isinstance(order_by[0], str) - and isinstance(order_by[1], str) - and order_by[1].upper() in ('ASC', 'UPPER') - ): - sql += f' ORDER BY "{order_by[0]}" {order_by[1].upper()}' - else: - raise OracleDBInvalidOrderByClause(order_by) - - if just_try: - log.debug("Just-try mode: execute SELECT query : %s", sql) - return just_try - - return self.doSelect(sql, params=params) diff --git a/mylib/pgsql.py b/mylib/pgsql.py index 02f5785..882e8a2 100644 --- a/mylib/pgsql.py +++ b/mylib/pgsql.py @@ -8,86 +8,28 @@ import sys import psycopg2 +from mylib.db import DB, DBFailToConnect + log = logging.getLogger(__name__) -# -# Exceptions -# - -class PgDBException(Exception): - """ That is the base exception class for all the other exceptions provided by this module. """ - - def __init__(self, error, *args, **kwargs): - for arg, value in kwargs.items(): - setattr(self, arg, value) - super().__init__(error.format(*args, **kwargs)) - - -class PgDBFailToConnect(PgDBException, RuntimeError): - """ - Raised on connecting error occurred - """ - - def __init__(self, host, user, db): - super().__init__( - "An error occured during Postgresql database connection ({user}@{host}, database={db})", - user=user, host=host, db=db - ) - - -class PgDBDuplicatedSQLParameter(PgDBException, KeyError): - """ - Raised when trying to set a SQL query parameter - and an other parameter with the same name is already set - """ - - def __init__(self, parameter_name): - super().__init__( - "Duplicated SQL parameter '{parameter_name}'", - parameter_name=parameter_name - ) - - -class PgDBUnsupportedWHEREClauses(PgDBException, TypeError): - """ - Raised when trying to execute query with unsupported - WHERE clauses provided - """ - - def __init__(self, where_clauses): - super().__init__( - "Unsupported WHERE clauses: {where_clauses}", - where_clauses=where_clauses - ) - - -class PgDBInvalidOrderByClause(PgDBException, TypeError): - """ - Raised when trying to select on table with invalid - ORDER BY clause provided - """ - - def __init__(self, order_by): - super().__init__( - "Invalid ORDER BY clause: {order_by}. Must be a string or a list of two values (ordering field name and direction)", - order_by=order_by - ) - - -class PgDB: +class PgDB(DB): """ PostgreSQL client """ + _host = None + _user = None + _pwd = None + _db = None + date_format = '%Y-%m-%d' datetime_format = '%Y-%m-%d %H:%M:%S' - def __init__(self, host, user, pwd, db, just_try=False): + def __init__(self, host, user, pwd, db, **kwargs): self._host = host self._user = user self._pwd = pwd self._db = db - self._conn = None - self.just_try = just_try + super().__init__(**kwargs) def connect(self, exit_on_error=True): """ Connect to PostgreSQL server """ @@ -110,7 +52,7 @@ class PgDB: if exit_on_error: sys.exit(1) else: - raise PgDBFailToConnect(self._host, self._user, self._db) from err + raise DBFailToConnect(f'{self._user}@{self._host}:{self._db}') from err return True def close(self): @@ -132,32 +74,6 @@ class PgDB: ) return False - @staticmethod - def _log_query(sql, params): - log.debug( - 'Run SQL query "%s" %s', - sql, - "with params = {0}".format( # pylint: disable=consider-using-f-string - ', '.join([ - f'{key} = {value}' - for key, value in params.items() - ]) if params else "without params" - ) - ) - - @staticmethod - def _log_query_exception(sql, params): - log.exception( - 'Error during SQL query "%s" %s', - sql, - "with params = {0}".format( # pylint: disable=consider-using-f-string - ', '.join([ - f'{key} = {value}' - for key, value in params.items() - ]) if params else "without params" - ) - ) - def doSQL(self, sql, params=None): """ Run SQL query and commit changes (rollback on error) @@ -213,234 +129,6 @@ class PgDB: for idx, field in enumerate(fields) ) - # - # SQL helpers - # - - @staticmethod - def format_param(param): - """ Format SQL query parameter for prepared query """ - return f'%({param})s' - - @classmethod - def _combine_params(cls, params, to_add=None, **kwargs): - if to_add: - assert isinstance(to_add, dict), "to_add must be a dict or None" - params = cls._combine_params(params, **to_add) - - for param, value in kwargs.items(): - if param in params: - raise PgDBDuplicatedSQLParameter(param) - params[param] = value - return params - - @classmethod - def _format_where_clauses(cls, where_clauses, params=None, where_op=None): - """ - Format WHERE clauses - - :param where_clauses: The WHERE clauses. Could be: - - a raw SQL WHERE clause as string - - a tuple of two elements: a raw WHERE clause and its parameters as dict - - a dict of WHERE clauses with field name as key and WHERE clause value as value - - a list of any of previous valid WHERE clauses - :param params: Dict of other already set SQL query parameters (optional) - :param where_op: SQL operator used to combine WHERE clauses together (optional, default: AND) - - :return: A tuple of two elements: raw SQL WHERE combined clauses and parameters on success - :rtype: string, bool - """ - if params is None: - params = {} - if where_op is None: - where_op = 'AND' - - if isinstance(where_clauses, str): - return (where_clauses, params) - - if isinstance(where_clauses, tuple) and len(where_clauses) == 2 and isinstance(where_clauses[1], dict): - cls._combine_params(params, where_clauses[1]) - return (where_clauses[0], params) - - if isinstance(where_clauses, (list, tuple)): - sql_where_clauses = [] - for where_clause in where_clauses: - sql2, params = cls._format_where_clauses(where_clause, params=params, where_op=where_op) - sql_where_clauses.append(sql2) - return ( - f' {where_op} '.join(sql_where_clauses), - params - ) - - if isinstance(where_clauses, dict): - sql_where_clauses = [] - for field, value in where_clauses.items(): - param = field - if field in params: - idx = 1 - while param in params: - param = f'{field}_{idx}' - idx += 1 - cls._combine_params(params, {param: value}) - sql_where_clauses.append( - f'"{field}" = {cls.format_param(param)}' - ) - return ( - f' {where_op} '.join(sql_where_clauses), - params - ) - raise PgDBUnsupportedWHEREClauses(where_clauses) - - @classmethod - def _add_where_clauses(cls, sql, params, where_clauses, where_op=None): - """ - Add WHERE clauses to an SQL query - - :param sql: The SQL query to complete - :param params: The dict of parameters of the SQL query to complete - :param where_clauses: The WHERE clause (see _format_where_clauses()) - :param where_op: SQL operator used to combine WHERE clauses together (optional, default: see _format_where_clauses()) - - :return: - :rtype: A tuple of two elements: raw SQL WHERE combined clauses and parameters - """ - if where_clauses: - sql_where, params = cls._format_where_clauses(where_clauses, params=params, where_op=where_op) - sql += " WHERE " + sql_where - return (sql, params) - - @staticmethod - def _quote_table_name(table): - """ Quote table name """ - return '"{0}"'.format( # pylint: disable=consider-using-f-string - '"."'.join( - table.split('.') - ) - ) - - def insert(self, table, values, just_try=False): - """ Run INSERT SQL query """ - # pylint: disable=consider-using-f-string - sql = 'INSERT INTO {0} ("{1}") VALUES ({2})'.format( - self._quote_table_name(table), - '", "'.join(values.keys()), - ", ".join([ - self.format_param(key) - for key in values - ]) - ) - - if just_try: - log.debug("Just-try mode: execute INSERT query: %s", sql) - return True - - log.debug(sql) - if not self.doSQL(sql, params=values): - log.error("Fail to execute INSERT query (SQL: %s)", sql) - return False - return True - - def update(self, table, values, where_clauses, where_op=None, just_try=False): - """ Run UPDATE SQL query """ - # pylint: disable=consider-using-f-string - sql = 'UPDATE {0} SET {1}'.format( - self._quote_table_name(table), - ", ".join([ - f'"{key}" = {self.format_param(key)}' - for key in values - ]) - ) - params = values - - try: - sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) - except (PgDBDuplicatedSQLParameter, PgDBUnsupportedWHEREClauses): - log.error('Fail to add WHERE clauses', exc_info=True) - return False - - if just_try: - log.debug("Just-try mode: execute UPDATE query: %s", sql) - return True - - log.debug(sql) - if not self.doSQL(sql, params=params): - log.error("Fail to execute UPDATE query (SQL: %s)", sql) - return False - return True - - def delete(self, table, where_clauses, where_op='AND', just_try=False): - """ Run DELETE SQL query """ - sql = f'DELETE FROM {self._quote_table_name(table)}' - params = {} - - try: - sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) - except (PgDBDuplicatedSQLParameter, PgDBUnsupportedWHEREClauses): - log.error('Fail to add WHERE clauses', exc_info=True) - return False - - if just_try: - log.debug("Just-try mode: execute UPDATE query: %s", sql) - return True - - log.debug(sql) - if not self.doSQL(sql, params=params): - log.error("Fail to execute UPDATE query (SQL: %s)", sql) - return False - return True - - def truncate(self, table, just_try=False): - """ Run TRUNCATE SQL query """ - sql = f'TRUNCATE TABLE {self._quote_table_name(table)}' - - if just_try: - log.debug("Just-try mode: execute TRUNCATE query: %s", sql) - return True - - log.debug(sql) - if not self.doSQL(sql): - log.error("Fail to execute TRUNCATE query (SQL: %s)", sql) - return False - return True - - def select(self, table, where_clauses=None, fields=None, where_op='AND', order_by=None, just_try=False): - """ Run SELECT SQL query """ - sql = "SELECT " - if fields is None: - sql += "*" - elif isinstance(fields, str): - sql += f'"{fields}"' - else: - sql += '"{0}"'.format('", "'.join(fields)) # pylint: disable=consider-using-f-string - - sql += f' FROM {self._quote_table_name(table)}' - params = {} - - try: - sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) - except (PgDBDuplicatedSQLParameter, PgDBUnsupportedWHEREClauses): - log.error('Fail to add WHERE clauses', exc_info=True) - return False - - if order_by: - if isinstance(order_by, str): - sql += f' ORDER BY {order_by}' - elif ( - isinstance(order_by, (list, tuple)) and len(order_by) == 2 - and isinstance(order_by[0], str) - and isinstance(order_by[1], str) - and order_by[1].upper() in ('ASC', 'UPPER') - ): - sql += f' ORDER BY "{order_by[0]}" {order_by[1].upper()}' - else: - raise PgDBInvalidOrderByClause(order_by) - - if just_try: - log.debug("Just-try mode: execute SELECT query : %s", sql) - return just_try - - return self.doSelect(sql, params=params) - # # Depreated helpers # diff --git a/tests/test_mysql.py b/tests/test_mysql.py new file mode 100644 index 0000000..889071f --- /dev/null +++ b/tests/test_mysql.py @@ -0,0 +1,455 @@ +# pylint: disable=redefined-outer-name,missing-function-docstring,protected-access +""" Tests on opening hours helpers """ + +import pytest + +from MySQLdb._exceptions import Error + +from mylib.mysql import MyDB + + +class FakeMySQLdbCursor: + """ Fake MySQLdb cursor """ + + def __init__(self, expected_sql, expected_params, expected_return, expected_just_try, expected_exception): + self.expected_sql = expected_sql + self.expected_params = expected_params + self.expected_return = expected_return + self.expected_just_try = expected_just_try + self.expected_exception = expected_exception + + def execute(self, sql, params=None): + if self.expected_exception: + raise Error(f'{self}.execute({sql}, {params}): expected exception') + if self.expected_just_try and not sql.lower().startswith('select '): + assert False, f'{self}.execute({sql}, {params}) may not be executed in just try mode' + # pylint: disable=consider-using-f-string + assert sql == self.expected_sql, "%s.execute(): Invalid SQL query:\n '%s'\nMay be:\n '%s'" % (self, sql, self.expected_sql) + # pylint: disable=consider-using-f-string + assert params == self.expected_params, "%s.execute(): Invalid params:\n %s\nMay be:\n %s" % (self, params, self.expected_params) + return self.expected_return + + @property + def description(self): + assert self.expected_return + assert isinstance(self.expected_return, list) + assert isinstance(self.expected_return[0], dict) + return [(field, 1, 2, 3) for field in self.expected_return[0].keys()] + + def fetchall(self): + if isinstance(self.expected_return, list): + return ( + list(row.values()) + if isinstance(row, dict) else row + for row in self.expected_return + ) + return self.expected_return + + def __repr__(self): + return ( + f'FakeMySQLdbCursor({self.expected_sql}, {self.expected_params}, ' + f'{self.expected_return}, {self.expected_just_try})' + ) + + +class FakeMySQLdb: + """ Fake MySQLdb connection """ + + expected_sql = None + expected_params = None + expected_return = True + expected_just_try = False + expected_exception = False + just_try = False + + def __init__(self, **kwargs): + allowed_kwargs = dict(db=str, user=str, passwd=(str, None), host=str, charset=str, use_unicode=bool) + for arg, value in kwargs.items(): + assert arg in allowed_kwargs, f'Invalid arg {arg}="{value}"' + assert isinstance(value, allowed_kwargs[arg]), \ + f'Arg {arg} not a {allowed_kwargs[arg]} ({type(value)})' + setattr(self, arg, value) + + def close(self): + return self.expected_return + + def cursor(self): + return FakeMySQLdbCursor( + self.expected_sql, self.expected_params, + self.expected_return, self.expected_just_try or self.just_try, + self.expected_exception + ) + + def commit(self): + self._check_just_try() + return self.expected_return + + def rollback(self): + self._check_just_try() + return self.expected_return + + def _check_just_try(self): + if self.just_try: + assert False, "May not be executed in just try mode" + + +def fake_mysqldb_connect(**kwargs): + return FakeMySQLdb(**kwargs) + + +def fake_mysqldb_connect_just_try(**kwargs): + con = FakeMySQLdb(**kwargs) + con.just_try = True + return con + + +@pytest.fixture +def test_mydb(): + return MyDB('127.0.0.1', 'user', 'password', 'dbname') + + +@pytest.fixture +def fake_mydb(mocker): + mocker.patch('MySQLdb.connect', fake_mysqldb_connect) + return MyDB('127.0.0.1', 'user', 'password', 'dbname') + + +@pytest.fixture +def fake_just_try_mydb(mocker): + mocker.patch('MySQLdb.connect', fake_mysqldb_connect_just_try) + return MyDB('127.0.0.1', 'user', 'password', 'dbname', just_try=True) + + +@pytest.fixture +def fake_connected_mydb(fake_mydb): + fake_mydb.connect() + return fake_mydb + + +@pytest.fixture +def fake_connected_just_try_mydb(fake_just_try_mydb): + fake_just_try_mydb.connect() + return fake_just_try_mydb + + +def generate_mock_args(expected_args=(), expected_kwargs={}, expected_return=True): # pylint: disable=dangerous-default-value + def mock_args(*args, **kwargs): + # pylint: disable=consider-using-f-string + assert args == expected_args, "Invalid call args:\n %s\nMay be:\n %s" % (args, expected_args) + # pylint: disable=consider-using-f-string + assert kwargs == expected_kwargs, "Invalid call kwargs:\n %s\nMay be:\n %s" % (kwargs, expected_kwargs) + return expected_return + return mock_args + + +def mock_doSQL_just_try(self, sql, params=None): # pylint: disable=unused-argument + assert False, "doSQL() may not be executed in just try mode" + + +def generate_mock_doSQL(expected_sql, expected_params={}, expected_return=True): # pylint: disable=dangerous-default-value + def mock_doSQL(self, sql, params=None): # pylint: disable=unused-argument + # pylint: disable=consider-using-f-string + assert sql == expected_sql, "Invalid generated SQL query:\n '%s'\nMay be:\n '%s'" % (sql, expected_sql) + # pylint: disable=consider-using-f-string + assert params == expected_params, "Invalid generated params:\n %s\nMay be:\n %s" % (params, expected_params) + return expected_return + return mock_doSQL + + +# MyDB.doSelect() have same expected parameters as MyDB.doSQL() +generate_mock_doSelect = generate_mock_doSQL +mock_doSelect_just_try = mock_doSQL_just_try + +# +# Test on MyDB helper methods +# + + +def test_combine_params_with_to_add_parameter(): + assert MyDB._combine_params(dict(test1=1), dict(test2=2)) == dict( + test1=1, test2=2 + ) + + +def test_combine_params_with_kargs(): + assert MyDB._combine_params(dict(test1=1), test2=2) == dict( + test1=1, test2=2 + ) + + +def test_combine_params_with_kargs_and_to_add_parameter(): + assert MyDB._combine_params(dict(test1=1), dict(test2=2), test3=3) == dict( + test1=1, test2=2, test3=3 + ) + + +def test_format_where_clauses_params_are_preserved(): + args = ('test = test', dict(test1=1)) + assert MyDB._format_where_clauses(*args) == args + + +def test_format_where_clauses_raw(): + assert MyDB._format_where_clauses('test = test') == (('test = test'), {}) + + +def test_format_where_clauses_tuple_clause_with_params(): + where_clauses = ( + 'test1 = %(test1)s AND test2 = %(test2)s', + dict(test1=1, test2=2) + ) + assert MyDB._format_where_clauses(where_clauses) == where_clauses + + +def test_format_where_clauses_dict(): + where_clauses = dict(test1=1, test2=2) + assert MyDB._format_where_clauses(where_clauses) == ( + '`test1` = %(test1)s AND `test2` = %(test2)s', + where_clauses + ) + + +def test_format_where_clauses_combined_types(): + where_clauses = ( + 'test1 = 1', + ('test2 LIKE %(test2)s', dict(test2=2)), + dict(test3=3, test4=4) + ) + assert MyDB._format_where_clauses(where_clauses) == ( + 'test1 = 1 AND test2 LIKE %(test2)s AND `test3` = %(test3)s AND `test4` = %(test4)s', + dict(test2=2, test3=3, test4=4) + ) + + +def test_format_where_clauses_with_where_op(): + where_clauses = dict(test1=1, test2=2) + assert MyDB._format_where_clauses(where_clauses, where_op='OR') == ( + '`test1` = %(test1)s OR `test2` = %(test2)s', + where_clauses + ) + + +def test_add_where_clauses(): + sql = "SELECT * FROM table" + where_clauses = dict(test1=1, test2=2) + assert MyDB._add_where_clauses(sql, None, where_clauses) == ( + sql + ' WHERE `test1` = %(test1)s AND `test2` = %(test2)s', + where_clauses + ) + + +def test_add_where_clauses_preserved_params(): + sql = "SELECT * FROM table" + where_clauses = dict(test1=1, test2=2) + params = dict(fake1=1) + assert MyDB._add_where_clauses(sql, params.copy(), where_clauses) == ( + sql + ' WHERE `test1` = %(test1)s AND `test2` = %(test2)s', + dict(**where_clauses, **params) + ) + + +def test_add_where_clauses_with_op(): + sql = "SELECT * FROM table" + where_clauses = ('test1=1', 'test2=2') + assert MyDB._add_where_clauses(sql, None, where_clauses, where_op='OR') == ( + sql + ' WHERE test1=1 OR test2=2', + {} + ) + + +def test_add_where_clauses_with_duplicated_field(): + sql = "UPDATE table SET test1=%(test1)s" + params = dict(test1='new_value') + where_clauses = dict(test1='where_value') + assert MyDB._add_where_clauses(sql, params, where_clauses) == ( + sql + ' WHERE `test1` = %(test1_1)s', + dict(test1='new_value', test1_1='where_value') + ) + + +def test_quote_table_name(): + assert MyDB._quote_table_name("mytable") == '`mytable`' + assert MyDB._quote_table_name("myschema.mytable") == '`myschema`.`mytable`' + + +def test_insert(mocker, test_mydb): + values = dict(test1=1, test2=2) + mocker.patch( + 'mylib.mysql.MyDB.doSQL', + generate_mock_doSQL( + 'INSERT INTO `mytable` (`test1`, `test2`) VALUES (%(test1)s, %(test2)s)', + values + ) + ) + + assert test_mydb.insert('mytable', values) + + +def test_insert_just_try(mocker, test_mydb): + mocker.patch('mylib.mysql.MyDB.doSQL', mock_doSQL_just_try) + assert test_mydb.insert('mytable', dict(test1=1, test2=2), just_try=True) + + +def test_update(mocker, test_mydb): + values = dict(test1=1, test2=2) + where_clauses = dict(test3=3, test4=4) + mocker.patch( + 'mylib.mysql.MyDB.doSQL', + generate_mock_doSQL( + 'UPDATE `mytable` SET `test1` = %(test1)s, `test2` = %(test2)s WHERE `test3` = %(test3)s AND `test4` = %(test4)s', + dict(**values, **where_clauses) + ) + ) + + assert test_mydb.update('mytable', values, where_clauses) + + +def test_update_just_try(mocker, test_mydb): + mocker.patch('mylib.mysql.MyDB.doSQL', mock_doSQL_just_try) + assert test_mydb.update('mytable', dict(test1=1, test2=2), None, just_try=True) + + +def test_delete(mocker, test_mydb): + where_clauses = dict(test1=1, test2=2) + mocker.patch( + 'mylib.mysql.MyDB.doSQL', + generate_mock_doSQL( + 'DELETE FROM `mytable` WHERE `test1` = %(test1)s AND `test2` = %(test2)s', + where_clauses + ) + ) + + assert test_mydb.delete('mytable', where_clauses) + + +def test_delete_just_try(mocker, test_mydb): + mocker.patch('mylib.mysql.MyDB.doSQL', mock_doSQL_just_try) + assert test_mydb.delete('mytable', None, just_try=True) + + +def test_truncate(mocker, test_mydb): + mocker.patch( + 'mylib.mysql.MyDB.doSQL', + generate_mock_doSQL('TRUNCATE TABLE `mytable`', None) + ) + + assert test_mydb.truncate('mytable') + + +def test_truncate_just_try(mocker, test_mydb): + mocker.patch('mylib.mysql.MyDB.doSQL', mock_doSelect_just_try) + assert test_mydb.truncate('mytable', just_try=True) + + +def test_select(mocker, test_mydb): + fields = ('field1', 'field2') + where_clauses = dict(test3=3, test4=4) + expected_return = [ + dict(field1=1, field2=2), + dict(field1=2, field2=3), + ] + order_by = "field1, DESC" + mocker.patch( + 'mylib.mysql.MyDB.doSelect', + generate_mock_doSQL( + 'SELECT `field1`, `field2` FROM `mytable` WHERE `test3` = %(test3)s AND `test4` = %(test4)s ORDER BY ' + order_by, + where_clauses, expected_return + ) + ) + + assert test_mydb.select('mytable', where_clauses, fields, order_by=order_by) == expected_return + + +def test_select_without_field_and_order_by(mocker, test_mydb): + mocker.patch( + 'mylib.mysql.MyDB.doSelect', + generate_mock_doSQL( + 'SELECT * FROM `mytable`' + ) + ) + + assert test_mydb.select('mytable') + + +def test_select_just_try(mocker, test_mydb): + mocker.patch('mylib.mysql.MyDB.doSQL', mock_doSelect_just_try) + assert test_mydb.select('mytable', None, None, just_try=True) + +# +# Tests on main methods +# + + +def test_connect(mocker, test_mydb): + expected_kwargs = dict( + db=test_mydb._db, + user=test_mydb._user, + host=test_mydb._host, + passwd=test_mydb._pwd, + charset=test_mydb._charset, + use_unicode=True, + ) + + mocker.patch( + 'MySQLdb.connect', + generate_mock_args( + expected_kwargs=expected_kwargs + ) + ) + + assert test_mydb.connect() + + +def test_close(fake_mydb): + assert fake_mydb.close() is None + + +def test_close_connected(fake_connected_mydb): + assert fake_connected_mydb.close() is None + + +def test_doSQL(fake_connected_mydb): + fake_connected_mydb._conn.expected_sql = 'DELETE FROM table WHERE test1 = %(test1)s' + fake_connected_mydb._conn.expected_params = dict(test1=1) + fake_connected_mydb.doSQL(fake_connected_mydb._conn.expected_sql, fake_connected_mydb._conn.expected_params) + + +def test_doSQL_without_params(fake_connected_mydb): + fake_connected_mydb._conn.expected_sql = 'DELETE FROM table' + fake_connected_mydb.doSQL(fake_connected_mydb._conn.expected_sql) + + +def test_doSQL_just_try(fake_connected_just_try_mydb): + assert fake_connected_just_try_mydb.doSQL('DELETE FROM table') + + +def test_doSQL_on_exception(fake_connected_mydb): + fake_connected_mydb._conn.expected_exception = True + assert fake_connected_mydb.doSQL('DELETE FROM table') is False + + +def test_doSelect(fake_connected_mydb): + fake_connected_mydb._conn.expected_sql = 'SELECT * FROM table WHERE test1 = %(test1)s' + fake_connected_mydb._conn.expected_params = dict(test1=1) + fake_connected_mydb._conn.expected_return = [dict(test1=1)] + assert fake_connected_mydb.doSelect(fake_connected_mydb._conn.expected_sql, fake_connected_mydb._conn.expected_params) == fake_connected_mydb._conn.expected_return + + +def test_doSelect_without_params(fake_connected_mydb): + fake_connected_mydb._conn.expected_sql = 'SELECT * FROM table' + fake_connected_mydb._conn.expected_return = [dict(test1=1)] + assert fake_connected_mydb.doSelect(fake_connected_mydb._conn.expected_sql) == fake_connected_mydb._conn.expected_return + + +def test_doSelect_on_exception(fake_connected_mydb): + fake_connected_mydb._conn.expected_exception = True + assert fake_connected_mydb.doSelect('SELECT * FROM table') is False + + +def test_doSelect_just_try(fake_connected_just_try_mydb): + fake_connected_just_try_mydb._conn.expected_sql = 'SELECT * FROM table WHERE test1 = %(test1)s' + fake_connected_just_try_mydb._conn.expected_params = dict(test1=1) + fake_connected_just_try_mydb._conn.expected_return = [dict(test1=1)] + assert fake_connected_just_try_mydb.doSelect( + fake_connected_just_try_mydb._conn.expected_sql, + fake_connected_just_try_mydb._conn.expected_params + ) == fake_connected_just_try_mydb._conn.expected_return