From afe54819bbe4369d0491ee9fd8adfff13ef37312 Mon Sep 17 00:00:00 2001 From: Benjamin Renard Date: Wed, 6 Oct 2021 21:33:25 +0200 Subject: [PATCH] Improve PgDB class and add unit tests --- mylib/pgsql.py | 405 ++++++++++++++++++++++++++++++++------------ tests/test_pgsql.py | 385 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 686 insertions(+), 104 deletions(-) create mode 100644 tests/test_pgsql.py diff --git a/mylib/pgsql.py b/mylib/pgsql.py index b0b2c09..9b7313f 100644 --- a/mylib/pgsql.py +++ b/mylib/pgsql.py @@ -11,6 +11,45 @@ import psycopg2 log = logging.getLogger(__name__) +# +# Exceptions +# + +class PgDBException(Exception): + """ That is the base exception class for all the other exceptions provided by this module. """ + + def __init__(self, error, *args, **kwargs): + for arg, value in kwargs.items(): + setattr(self, arg, value) + super().__init__(error.format(*args, **kwargs)) + + +class PgDBDuplicatedSQLParameter(PgDBException, KeyError): + """ + Raised when trying to set a SQL query parameter + and an other parameter with the same name is already set + """ + + def __init__(self, parameter_name): + super().__init__( + "Duplicated SQL parameter '{parameter_name}'", + parameter_name=parameter_name + ) + + +class PgDBUnsupportedWHEREClauses(PgDBException, TypeError): + """ + Raised when trying to execute query with unsupported + WHERE clauses provided + """ + + def __init__(self, where_clauses): + super().__init__( + "Unsupported WHERE clauses: {where_clauses}", + where_clauses=where_clauses + ) + + class PgDB: """ PostgreSQL client """ @@ -62,13 +101,24 @@ class PgDB: self.con.set_client_encoding(enc) return True except Exception: - log.error('An error occured setting Postgresql database connection encoding to "%s"', enc, exc_info=1) + log.error( + 'An error occured setting Postgresql database connection encoding to "%s"', + enc, exc_info=1 + ) return False def doSQL(self, sql, params=None): - """ Run SELECT SQL query and return result as dict """ + """ + Run SQL query and commit changes (rollback on error) + + :param sql: The SQL query + :param params: The SQL query's parameters as dict (optional) + + :return: True on success, False otherwise + :rtype: bool + """ if self.just_try: - log.debug(u"Just-try mode : do not really execute SQL query '%s'", sql) + log.debug("Just-try mode : do not really execute SQL query '%s'", sql) return True cursor = self.con.cursor() @@ -80,134 +130,281 @@ class PgDB: self.con.commit() return True except Exception: - log.error(u'Error during SQL request "%s"', sql.decode('utf-8', 'ignore'), exc_info=1) + log.error( + 'Error during SQL request "%s" %s', + sql, + "with params = %s" % ', '.join([ + "%s = %s" % (key, value) + for key, value in params.items() + ]) if params else "without params", + exc_info=True + ) self.con.rollback() return False - def doSelect(self, sql, params): - """ Run SELECT SQL query and return result as dict """ + def doSelect(self, sql, params=None): + """ + Run SELECT SQL query and return list of selected rows as dict + + :param sql: The SQL query + :param params: The SQL query's parameters as dict (optional) + + :return: List of selected rows as dict on success, False otherwise + :rtype: list, bool + """ cursor = self.con.cursor() try: cursor.execute(sql, params) results = cursor.fetchall() return results except Exception: - log.error(u'Error during SQL request "%s"', sql.decode('utf-8', 'ignore'), exc_info=1) - return False + log.error( + 'Error during SQL request "%s" %s', + sql, + "with params = %s" % ', '.join([ + "%s = %s" % (key, value) + for key, value in params.items() + ]) if params else "without params", + exc_info=True + ) + return False + # # SQL helpers # - def _quote_value(self, value): + + @classmethod + def _combine_params(cls, params, to_add=None, **kwargs): + if to_add: + assert isinstance(to_add, dict), "to_add must be a dict or None" + params = cls._combine_params(params, **to_add) + + for param, value in kwargs.items(): + if param in params: + raise PgDBDuplicatedSQLParameter(param) + params[param] = value + return params + + @classmethod + def _format_where_clauses(cls, where_clauses, params=None, where_op=None): + """ + Format WHERE clauses + + :param where_clauses: The WHERE clauses. Could be: + - a raw SQL WHERE clause as string + - a tuple of two elements: a raw WHERE clause and its parameters as dict + - a dict of WHERE clauses with field name as key and WHERE clause value as value + - a list of any of previous valid WHERE clauses + :param params: Dict of other already set SQL query parameters (optional) + :param where_op: SQL operator used to combine WHERE clauses together (optional, default: AND) + + :return: A tuple of two elements: raw SQL WHERE combined clauses and parameters on success + :rtype: string, bool + """ + if params is None: + params = dict() + if where_op is None: + where_op = 'AND' + + if isinstance(where_clauses, str): + return (where_clauses, params) + + if isinstance(where_clauses, tuple) and len(where_clauses) == 2 and isinstance(where_clauses[1], dict): + cls._combine_params(params, where_clauses[1]) + return (where_clauses[0], params) + + if isinstance(where_clauses, (list, tuple)): + sql_where_clauses = [] + for where_clause in where_clauses: + sql2, params = cls._format_where_clauses(where_clause, params=params, where_op=where_op) + sql_where_clauses.append(sql2) + return ( + (" %s " % where_op).join(sql_where_clauses), + params + ) + + if isinstance(where_clauses, dict): + cls._combine_params(params, where_clauses) + return ( + (" %s " % where_op).join([ + '"{0}" = %({0})s'.format(key) + for key in where_clauses + ]), + params + ) + raise PgDBUnsupportedWHEREClauses(where_clauses) + + @classmethod + def _add_where_clauses(cls, sql, params, where_clauses, where_op=None): + """ + Add WHERE clauses to an SQL query + + :param sql: The SQL query to complete + :param params: The dict of parameters of the SQL query to complete + :param where_clauses: The WHERE clause (see _format_where_clauses()) + :param where_op: SQL operator used to combine WHERE clauses together (optional, default: see _format_where_clauses()) + + :return: + :rtype: A tuple of two elements: raw SQL WHERE combined clauses and parameters + """ + if where_clauses: + sql_where, params = cls._format_where_clauses(where_clauses, params=params, where_op=where_op) + sql += " WHERE " + sql_where + return (sql, params) + + + def insert(self, table, values, just_try=False): + """ Run INSERT SQL query """ + sql = 'INSERT INTO "%s" ("%s") VALUES (%s)' % ( + table, + '", "'.join(values.keys()), + ", ".join([ + '%({0})s'.format(key) + for key in values + ]) + ) + + if just_try: + log.debug("Just-try mode : execute INSERT query : %s", sql) + return True + + log.debug(sql) + if not self.doSQL(sql, params=values): + log.error("Fail to execute INSERT query (SQL : %s)", sql) + return False + return True + + def update(self, table, values, where_clauses, where_op=None, just_try=False): + """ Run UPDATE SQL query """ + sql = 'UPDATE "%s" SET %s' % ( + table, + ", ".join([ + '"{0}" = %({0})s'.format(key) + for key in values + ]) + ) + params = values + + try: + sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) + except (PgDBDuplicatedSQLParameter, PgDBUnsupportedWHEREClauses): + log.error('Fail to add WHERE clauses', exc_info=True) + return False + + if just_try: + log.debug("Just-try mode : execute UPDATE query : %s", sql) + return True + + log.debug(sql) + if not self.doSQL(sql, params=params): + log.error("Fail to execute UPDATE query (SQL : %s)", sql) + return False + return True + + def delete(self, table, where_clauses, where_op='AND', just_try=False): + """ Run DELETE SQL query """ + sql = 'DELETE FROM "%s"' % table + params = dict() + + try: + sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) + except (PgDBDuplicatedSQLParameter, PgDBUnsupportedWHEREClauses): + log.error('Fail to add WHERE clauses', exc_info=True) + return False + + if just_try: + log.debug("Just-try mode : execute UPDATE query : %s", sql) + return True + + log.debug(sql) + if not self.doSQL(sql, params=params): + log.error("Fail to execute UPDATE query (SQL : %s)", sql) + return False + return True + + def truncate(self, table, just_try=False): + """ Run TRUNCATE SQL query """ + + sql = 'TRUNCATE "%s"' % table + + if just_try: + log.debug("Just-try mode : execute TRUNCATE query : %s", sql) + return True + + log.debug(sql) + if not self.doSQL(sql): + log.error("Fail to execute TRUNCATE query (SQL : %s)", sql) + return False + return True + + def select(self, table, where_clauses=None, fields=None, where_op='AND', order_by=None, just_try=False): + """ Run SELECT SQL query """ + sql = "SELECT " + if fields is None: + sql += "*" + elif isinstance(fields, str): + sql += '"%s"' % fields + else: + sql += '"' + '", "'.join(fields) + '"' + + sql += ' FROM "%s"' % table + params = dict() + + try: + sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) + except (PgDBDuplicatedSQLParameter, PgDBUnsupportedWHEREClauses): + log.error('Fail to add WHERE clauses', exc_info=True) + return False + + if order_by: + sql += ' ORDER BY %s' % order_by + + if just_try: + log.debug("Just-try mode : execute SELECT query : %s", sql) + return just_try + + return self.doSelect(sql, params=params) + + + # + # Depreated helpers + # + + @classmethod + def _quote_value(cls, value): """ Quote a value for SQL query """ + if value is None: + return 'NULL' + if isinstance(value, (int, float)): return str(value) if isinstance(value, datetime.datetime): - value = self._format_datetime(value) + value = cls._format_datetime(value) elif isinstance(value, datetime.date): - value = self._format_date(value) + value = cls._format_date(value) - return u"'%s'" % value.replace(u"'", u"''") + return "'%s'" % value.replace("'", "''") - def _format_where_clauses(self, where_clauses, where_op=u'AND'): - """ Format WHERE clauses """ - if isinstance(where_clauses, str): - return where_clauses - if isinstance(where_clauses, list): - return (u" %s " % where_op).join(where_clauses) - if isinstance(where_clauses, dict): - return (u" %s " % where_op).join(map(lambda x: "%s=%s" % (x, self._quote_value(where_clauses[x])), where_clauses)) - log.error('Unsupported where clauses type %s', type(where_clauses)) - return False - - def _format_datetime(self, value): + @classmethod + def _format_datetime(cls, value): """ Format datetime object as string """ assert isinstance(value, datetime.datetime) - return value.strftime(self.datetime_format) + return value.strftime(cls.datetime_format) - def _format_date(self, value): + @classmethod + def _format_date(cls, value): """ Format date object as string """ assert isinstance(value, (datetime.date, datetime.datetime)) - return value.strftime(self.date_format) + return value.strftime(cls.date_format) - def time2datetime(self, time): + @classmethod + def time2datetime(cls, time): """ Convert timestamp to datetime string """ - return self._format_datetime(datetime.datetime.fromtimestamp(int(time))) + return cls._format_datetime(datetime.datetime.fromtimestamp(int(time))) - def time2date(self, time): + @classmethod + def time2date(cls, time): """ Convert timestamp to date string """ - return self._format_date(datetime.date.fromtimestamp(int(time))) - - def insert(self, table, values, just_try=False): - """ Run INSERT SQL query """ - sql = u"INSERT INTO %s (%s) VALUES (%s)" % (table, u', '.join(values.keys()), u", ".join(map(lambda x: self._quote_value(values[x]), values))) - - if just_try: - log.debug(u"Just-try mode : execute INSERT query : %s", sql) - return True - - log.debug(sql) - if not self.doSQL(sql): - log.error(u"Fail to execute INSERT query (SQL : %s)", sql) - return False - return True - - def update(self, table, values, where_clauses, where_op=u'AND', just_try=False): - """ Run UPDATE SQL query """ - where = self._format_where_clauses(where_clauses, where_op=where_op) - if not where: - return False - - sql = u"UPDATE %s SET %s WHERE %s" % (table, u", ".join(map(lambda x: "%s=%s" % (x, self._quote_value(values[x])), values)), where) - - if just_try: - log.debug(u"Just-try mode : execute UPDATE query : %s", sql) - return True - - log.debug(sql) - if not self.doSQL(sql): - log.error(u"Fail to execute UPDATE query (SQL : %s)", sql) - return False - return True - - def delete(self, table, where_clauses, where_op=u'AND', just_try=False): - """ Run DELETE SQL query """ - where = self._format_where_clauses(where_clauses, where_op=where_op) - if not where: - return False - - sql = u"DELETE FROM %s WHERE %s" % (table, where) - - if just_try: - log.debug(u"Just-try mode : execute DELETE query : %s", sql) - return True - - log.debug(sql) - if not self.doSQL(sql): - log.error(u"Fail to execute DELETE query (SQL : %s)", sql) - return False - return True - - def select(self, table, where_clauses=None, fields=None, where_op=u'AND', order_by=None): - """ Run SELECT SQL query """ - sql = u"SELECT " - if fields is None: - sql += "*" - elif isinstance(fields, str): - sql += fields - else: - sql += u", ".join(fields) - - sql += u" FROM " + table - if where_clauses: - where = self._format_where_clauses(where_clauses, where_op=where_op) - if not where: - return False - - sql += u" WHERE " + where - - if order_by: - sql += u"ORDER %s" % order_by - - return self.doSelect(sql) + return cls._format_date(datetime.date.fromtimestamp(int(time))) diff --git a/tests/test_pgsql.py b/tests/test_pgsql.py new file mode 100644 index 0000000..708739e --- /dev/null +++ b/tests/test_pgsql.py @@ -0,0 +1,385 @@ +""" Tests on opening hours helpers """ + +import pytest + +from mylib.pgsql import PgDB + +class FakePsycopg2Cursor: + + def __init__(self, expected_sql, expected_params, expected_return, expected_just_try, expected_exception): + self.expected_sql = expected_sql + self.expected_params = expected_params + self.expected_return = expected_return + self.expected_just_try = expected_just_try + self.expected_exception = expected_exception + + def execute(self, sql, params=None): + if self.expected_exception: + raise Exception("%s.execute(%s, %s): expected exception" % (self, sql, params)) + if self.expected_just_try and not sql.lower().startswith('select '): + assert False, "%s.execute(%s, %s) may not be executed in just try mode" % (self, sql, params) + assert sql == self.expected_sql, "%s.execute(): Invalid SQL query:\n '%s'\nMay be:\n '%s'" % (self, sql, self.expected_sql) + assert params == self.expected_params, "%s.execute(): Invalid params:\n %s\nMay be:\n %s" % (self, params, self.expected_params) + return self.expected_return + + def fetchall(self): + return self.expected_return + + def __repr__(self): + return "FakePsycopg2Cursor(%s, %s, %s, %s)" % ( + self.expected_sql, self.expected_params, + self.expected_return, self.expected_just_try + ) + + +class FakePsycopg2: + + expected_sql = None + expected_params = None + expected_return = True + expected_just_try = False + expected_exception = False + just_try = False + + def __init__(self, **kwargs): + allowed_kwargs = dict(dbname=str, user=str, password=(str, None), host=str) + for arg, value in kwargs.items(): + assert arg in allowed_kwargs, "Invalid arg %s='%s'" % (arg, value) + assert isinstance(value, allowed_kwargs[arg]), "Arg %s not a %s (%s)" % (arg, allowed_kwargs[arg], type(value)) + setattr(self, arg, value) + + def close(self): + return self.expected_return + + def set_client_encoding(self, *arg): + self._check_just_try() + assert len(arg) == 1 and isinstance(arg[0], str) + if self.expected_exception: + raise Exception("set_client_encoding(%s): Expected exception", arg[0]) + return self.expected_return + + def cursor(self): + return FakePsycopg2Cursor( + self.expected_sql, self.expected_params, + self.expected_return, self.expected_just_try or self.just_try, + self.expected_exception + ) + + def commit(self): + self._check_just_try() + return self.expected_return + + def rollback(self): + self._check_just_try() + return self.expected_return + + def _check_just_try(self): + if self.just_try: + assert false, "May not be executed in just try mode" + +def fake_psycopg2_connect(**kwargs): + return FakePsycopg2(**kwargs) + +def fake_psycopg2_connect_just_try(**kwargs): + con = FakePsycopg2(**kwargs) + con.just_try = True + return con + +@pytest.fixture +def test_pgdb(): + return PgDB('127.0.0.1', 'user', 'password', 'dbname') + +@pytest.fixture +def fake_pgdb(mocker): + mocker.patch('psycopg2.connect', fake_psycopg2_connect) + return PgDB('127.0.0.1', 'user', 'password', 'dbname') + +@pytest.fixture +def fake_just_try_pgdb(mocker): + mocker.patch('psycopg2.connect', fake_psycopg2_connect_just_try) + return PgDB('127.0.0.1', 'user', 'password', 'dbname', just_try=True) + +@pytest.fixture +def fake_connected_pgdb(fake_pgdb): + fake_pgdb.connect() + return fake_pgdb + +@pytest.fixture +def fake_connected_just_try_pgdb(fake_just_try_pgdb): + fake_just_try_pgdb.connect() + return fake_just_try_pgdb + +def generate_mock_args(expected_args=(), expected_kwargs=dict(), expected_return=True): + def mock_args(*args, **kwargs): + assert args == expected_args, "Invalid call args:\n %s\nMay be:\n %s" % (args, expected_args) + assert kwargs == expected_kwargs, "Invalid call kwargs:\n %s\nMay be:\n %s" % (kwargs, expected_kwargs) + return expected_return + return mock_args + +def mock_doSQL_just_try(self, sql, params=None): + assert False, "doSQL() may not be executed in just try mode" + +def generate_mock_doSQL(expected_sql, expected_params=dict(), expected_return=True): + def mock_doSQL(self, sql, params=None): + assert sql == expected_sql, "Invalid generated SQL query:\n '%s'\nMay be:\n '%s'" % (sql, expected_sql) + assert params == expected_params, "Invalid generated params:\n %s\nMay be:\n %s" % (params, expected_params) + return expected_return + return mock_doSQL + +# PgDB.doSelect() have same expected parameters as PgDB.doSQL() +generate_mock_doSelect = generate_mock_doSQL +mock_doSelect_just_try = mock_doSQL_just_try + +# +# Test on PgDB helper methods +# +def test_combine_params_with_to_add_parameter(): + assert PgDB._combine_params(dict(test1=1), dict(test2=2)) == dict( + test1=1, test2=2 + ) + +def test_combine_params_with_kargs(): + assert PgDB._combine_params(dict(test1=1), test2=2) == dict( + test1=1, test2=2 + ) + +def test_combine_params_with_kargs_and_to_add_parameter(): + assert PgDB._combine_params(dict(test1=1), dict(test2=2), test3=3) == dict( + test1=1, test2=2, test3=3 + ) + +def test_format_where_clauses_params_are_preserved(): + args=('test = test', dict(test1=1)) + assert PgDB._format_where_clauses(*args) == args + +def test_format_where_clauses_raw(): + assert PgDB._format_where_clauses('test = test') == (('test = test'), dict()) + +def test_format_where_clauses_tuple_clause_with_params(): + where_clauses = ( + 'test1 = %(test1)s AND test2 = %(test2)s', + dict(test1=1, test2=2) + ) + assert PgDB._format_where_clauses(where_clauses) == where_clauses + +def test_format_where_clauses_dict(): + where_clauses = dict(test1=1, test2=2) + assert PgDB._format_where_clauses(where_clauses) == ( + '"test1" = %(test1)s AND "test2" = %(test2)s', + where_clauses + ) + +def test_format_where_clauses_combined_types(): + where_clauses = ( + 'test1 = 1', + ('test2 LIKE %(test2)s', dict(test2=2)), + dict(test3=3, test4=4) + ) + assert PgDB._format_where_clauses(where_clauses) == ( + 'test1 = 1 AND test2 LIKE %(test2)s AND "test3" = %(test3)s AND "test4" = %(test4)s', + dict(test2=2, test3=3, test4=4) + ) + +def test_format_where_clauses_with_where_op(): + where_clauses = dict(test1=1, test2=2) + assert PgDB._format_where_clauses(where_clauses, where_op='OR') == ( + '"test1" = %(test1)s OR "test2" = %(test2)s', + where_clauses + ) + +def test_add_where_clauses(): + sql="SELECT * FROM table" + where_clauses = dict(test1=1, test2=2) + assert PgDB._add_where_clauses(sql, None, where_clauses) == ( + sql + ' WHERE "test1" = %(test1)s AND "test2" = %(test2)s', + where_clauses + ) + +def test_add_where_clauses_preserved_params(): + sql="SELECT * FROM table" + where_clauses = dict(test1=1, test2=2) + params = dict(fake1=1) + assert PgDB._add_where_clauses(sql, params.copy(), where_clauses) == ( + sql + ' WHERE "test1" = %(test1)s AND "test2" = %(test2)s', + dict(**where_clauses, **params) + ) + +def test_add_where_clauses_with_op(): + sql="SELECT * FROM table" + where_clauses = ('test1=1', 'test2=2') + assert PgDB._add_where_clauses(sql, None, where_clauses, where_op='OR') == ( + sql + ' WHERE test1=1 OR test2=2', + dict() + ) + +def test_insert(mocker, test_pgdb): + values = dict(test1=1, test2=2) + mocker.patch( + 'mylib.pgsql.PgDB.doSQL', + generate_mock_doSQL( + 'INSERT INTO "mytable" ("test1", "test2") VALUES (%(test1)s, %(test2)s)', + values + ) + ) + + assert test_pgdb.insert('mytable', values) + +def test_insert_just_try(mocker, test_pgdb): + mocker.patch('mylib.pgsql.PgDB.doSQL', mock_doSQL_just_try) + assert test_pgdb.insert('mytable', dict(test1=1, test2=2), just_try=True) + +def test_update(mocker, test_pgdb): + values = dict(test1=1, test2=2) + where_clauses = dict(test3=3, test4=4) + mocker.patch( + 'mylib.pgsql.PgDB.doSQL', + generate_mock_doSQL( + 'UPDATE "mytable" SET "test1" = %(test1)s, "test2" = %(test2)s WHERE "test3" = %(test3)s AND "test4" = %(test4)s', + dict(**values, **where_clauses) + ) + ) + + assert test_pgdb.update('mytable', values, where_clauses) + +def test_update_just_try(mocker, test_pgdb): + mocker.patch('mylib.pgsql.PgDB.doSQL', mock_doSQL_just_try) + assert test_pgdb.update('mytable', dict(test1=1, test2=2), None, just_try=True) + +def test_delete(mocker, test_pgdb): + where_clauses = dict(test1=1, test2=2) + mocker.patch( + 'mylib.pgsql.PgDB.doSQL', + generate_mock_doSQL( + 'DELETE FROM "mytable" WHERE "test1" = %(test1)s AND "test2" = %(test2)s', + where_clauses + ) + ) + + assert test_pgdb.delete('mytable', where_clauses) + +def test_delete_just_try(mocker, test_pgdb): + mocker.patch('mylib.pgsql.PgDB.doSQL', mock_doSQL_just_try) + assert test_pgdb.delete('mytable', None, just_try=True) + +def test_truncate(mocker, test_pgdb): + mocker.patch( + 'mylib.pgsql.PgDB.doSQL', + generate_mock_doSQL('TRUNCATE "mytable"', None) + ) + + assert test_pgdb.truncate('mytable') + +def test_truncate_just_try(mocker, test_pgdb): + mocker.patch('mylib.pgsql.PgDB.doSQL', mock_doSelect_just_try) + assert test_pgdb.truncate('mytable', just_try=True) + +def test_select(mocker, test_pgdb): + fields = ('field1', 'field2') + where_clauses = dict(test3=3, test4=4) + expected_return = [ + dict(field1=1, field2=2), + dict(field1=2, field2=3), + ] + order_by = "field1, DESC" + mocker.patch( + 'mylib.pgsql.PgDB.doSelect', + generate_mock_doSQL( + 'SELECT "field1", "field2" FROM "mytable" WHERE "test3" = %(test3)s AND "test4" = %(test4)s ORDER BY ' + order_by, + where_clauses, expected_return + ) + ) + + assert test_pgdb.select('mytable', where_clauses, fields, order_by=order_by) == expected_return + +def test_select_without_field_and_order_by(mocker, test_pgdb): + mocker.patch( + 'mylib.pgsql.PgDB.doSelect', + generate_mock_doSQL( + 'SELECT * FROM "mytable"' + ) + ) + + assert test_pgdb.select('mytable') + +def test_select_just_try(mocker, test_pgdb): + mocker.patch('mylib.pgsql.PgDB.doSQL', mock_doSelect_just_try) + assert test_pgdb.select('mytable', None, None, just_try=True) + +# +# Tests on main methods +# + +def test_connect(mocker, test_pgdb): + expected_kwargs = dict( + dbname=test_pgdb.db, + user=test_pgdb.user, + host=test_pgdb.host, + password=test_pgdb.pwd + ) + def mock_psycopg2_connect(**kwargs): + assert expected_kwargs == kwargs + + mocker.patch( + 'psycopg2.connect', + generate_mock_args( + expected_kwargs=expected_kwargs + ) + ) + + assert test_pgdb.connect() + +def test_close(fake_pgdb): + assert fake_pgdb.close() is None + +def test_close_connected(fake_connected_pgdb): + assert fake_connected_pgdb.close() is None + +def test_setEncoding(fake_connected_pgdb): + assert fake_connected_pgdb.setEncoding('utf8') + +def test_setEncoding_not_connected(fake_pgdb): + assert fake_pgdb.setEncoding('utf8') is False + +def test_setEncoding_on_exception(fake_connected_pgdb): + fake_connected_pgdb.con.expected_exception = True + assert fake_connected_pgdb.setEncoding('utf8') is False + +def test_doSQL(fake_connected_pgdb): + fake_connected_pgdb.con.expected_sql = 'DELETE FROM table WHERE test1 = %(test1)s' + fake_connected_pgdb.con.expected_params = dict(test1=1) + fake_connected_pgdb.doSQL(fake_connected_pgdb.con.expected_sql, fake_connected_pgdb.con.expected_params) + +def test_doSQL_without_params(fake_connected_pgdb): + fake_connected_pgdb.con.expected_sql = 'DELETE FROM table' + fake_connected_pgdb.doSQL(fake_connected_pgdb.con.expected_sql) + +def test_doSQL_just_try(fake_connected_just_try_pgdb): + assert fake_connected_just_try_pgdb.doSQL('DELETE FROM table') + +def test_doSQL_on_exception(fake_connected_pgdb): + fake_connected_pgdb.con.expected_exception = True + assert fake_connected_pgdb.doSQL('DELETE FROM table') is False + +def test_doSelect(fake_connected_pgdb): + fake_connected_pgdb.con.expected_sql = 'SELECT * FROM table WHERE test1 = %(test1)s' + fake_connected_pgdb.con.expected_params = dict(test1=1) + fake_connected_pgdb.con.expected_return = [dict(test1=1)] + assert fake_connected_pgdb.doSelect(fake_connected_pgdb.con.expected_sql, fake_connected_pgdb.con.expected_params) == fake_connected_pgdb.con.expected_return + +def test_doSelect_without_params(fake_connected_pgdb): + fake_connected_pgdb.con.expected_sql = 'SELECT * FROM table' + fake_connected_pgdb.con.expected_return = [dict(test1=1)] + assert fake_connected_pgdb.doSelect(fake_connected_pgdb.con.expected_sql) == fake_connected_pgdb.con.expected_return + +def test_doSelect_on_exception(fake_connected_pgdb): + fake_connected_pgdb.con.expected_exception = True + assert fake_connected_pgdb.doSelect('SELECT * FROM table') is False + +def test_doSelect_just_try(fake_connected_just_try_pgdb): + fake_connected_just_try_pgdb.con.expected_sql = 'SELECT * FROM table WHERE test1 = %(test1)s' + fake_connected_just_try_pgdb.con.expected_params = dict(test1=1) + fake_connected_just_try_pgdb.con.expected_return = [dict(test1=1)] + assert fake_connected_just_try_pgdb.doSelect( + fake_connected_just_try_pgdb.con.expected_sql, + fake_connected_just_try_pgdb.con.expected_params + ) == fake_connected_just_try_pgdb.con.expected_return