# pylint: disable=redefined-outer-name,missing-function-docstring,protected-access
"""Tests on the PgDB PostgreSQL client (mylib.pgsql)"""

import psycopg2
import pytest
from psycopg2.extras import RealDictCursor

from mylib.pgsql import PgDB


class FakePsycopg2Cursor:
    """
    Fake Psycopg2 cursor

    Checks that the SQL query and parameters passed to execute() match the expected ones and
    hands back the configured return value.
    """

    def __init__(
        self, expected_sql, expected_params, expected_return, expected_just_try, expected_exception
    ):
        self.expected_sql = expected_sql
        self.expected_params = expected_params
        self.expected_return = expected_return
        self.expected_just_try = expected_just_try
        self.expected_exception = expected_exception

    def execute(self, sql, params=None):
        """
        Validate the executed query against the expectations

        :raises psycopg2.Error: if an exception is expected
        :raises AssertionError: on a non-SELECT query in just-try mode, or on unexpected
                                SQL query / parameters
        """
        if self.expected_exception:
            raise psycopg2.Error(f"{self}.execute({sql}, {params}): expected exception")
        # Only SELECT queries are tolerated in just-try mode. Raise explicitly (rather than
        # `assert False`) so the guard still triggers when assertions are disabled.
        if self.expected_just_try and not sql.lower().startswith("select "):
            raise AssertionError(
                f"{self}.execute({sql}, {params}) may not be executed in just try mode"
            )
        assert (
            sql == self.expected_sql
        ), f"{self}.execute(): Invalid SQL query:\n '{sql}'\nMay be:\n '{self.expected_sql}'"
        assert (
            params == self.expected_params
        ), f"{self}.execute(): Invalid params:\n {params}\nMay be:\n {self.expected_params}"
        return self.expected_return

    def fetchall(self):
        """Return the configured result"""
        return self.expected_return

    def __repr__(self):
        return (
            f"FakePsycopg2Cursor({self.expected_sql}, {self.expected_params}, "
            f"{self.expected_return}, {self.expected_just_try})"
        )


class FakePsycopg2:
    """
    Fake Psycopg2 connection

    Tests tune its behavior by setting the expected_* attributes on the instance.
    """

    expected_sql = None
    expected_params = None
    expected_cursor_factory = None
    expected_return = True
    expected_just_try = False
    expected_exception = False
    just_try = False

    def __init__(self, **kwargs):
        """Validate the connection keyword arguments and store them as attributes"""
        # type(None) (and not None) is required here: isinstance() only accepts types, so
        # a None password would otherwise raise a TypeError instead of being accepted.
        allowed_kwargs = {"dbname": str, "user": str, "password": (str, type(None)), "host": str}
        for arg, value in kwargs.items():
            assert arg in allowed_kwargs, f'Invalid arg {arg}="{value}"'
            assert isinstance(
                value, allowed_kwargs[arg]
            ), f"Arg {arg} not a {allowed_kwargs[arg]} ({type(value)})"
            setattr(self, arg, value)

    def close(self):
        return self.expected_return

    def set_client_encoding(self, *arg):
        """
        Fake encoding change: expects exactly one string argument

        :raises psycopg2.Error: if an exception is expected
        """
        self._check_just_try()
        assert len(arg) == 1 and isinstance(arg[0], str)
        if self.expected_exception:
            raise psycopg2.Error(f"set_client_encoding({arg[0]}): Expected exception")
        return self.expected_return

    def cursor(self, cursor_factory=None):
        """Check the requested cursor factory and return a fake cursor sharing our expectations"""
        assert cursor_factory is self.expected_cursor_factory
        return FakePsycopg2Cursor(
            self.expected_sql,
            self.expected_params,
            self.expected_return,
            self.expected_just_try or self.just_try,
            self.expected_exception,
        )

    def commit(self):
        self._check_just_try()
        return self.expected_return

    def rollback(self):
        self._check_just_try()
        return self.expected_return

    def _check_just_try(self):
        """Fail if the connection is in just-try mode"""
        if self.just_try:
            # Explicit raise: must not be silenced when assertions are disabled
            raise AssertionError("May not be executed in just try mode")


def fake_psycopg2_connect(**kwargs):
    """Replacement for psycopg2.connect() returning a fake connection"""
    return FakePsycopg2(**kwargs)


def fake_psycopg2_connect_just_try(**kwargs):
    """Replacement for psycopg2.connect() returning a fake connection in just-try mode"""
    con = FakePsycopg2(**kwargs)
    con.just_try = True
    return con


@pytest.fixture
def test_pgdb():
    return PgDB("127.0.0.1", "user", "password", "dbname")


@pytest.fixture
def fake_pgdb(mocker):
    mocker.patch("psycopg2.connect", fake_psycopg2_connect)
    return PgDB("127.0.0.1", "user", "password", "dbname")


@pytest.fixture
def fake_just_try_pgdb(mocker):
    mocker.patch("psycopg2.connect", fake_psycopg2_connect_just_try)
    return PgDB("127.0.0.1", "user", "password", "dbname", just_try=True)


@pytest.fixture
def fake_connected_pgdb(fake_pgdb):
    fake_pgdb.connect()
    return fake_pgdb


@pytest.fixture
def fake_connected_just_try_pgdb(fake_just_try_pgdb):
    fake_just_try_pgdb.connect()
    return fake_just_try_pgdb


def generate_mock_args(
    expected_args=(), expected_kwargs={}, expected_return=True
):  # pylint: disable=dangerous-default-value
    """
    Build a mock callable asserting it is called with the expected args and kwargs

    The default dict is only compared against, never mutated, so sharing it between calls
    is harmless.

    :return: A function returning expected_return when called as expected
    """

    def mock_args(*args, **kwargs):
        assert args == expected_args, f"Invalid call args:\n {args}\nMay be:\n {expected_args}"
        assert (
            kwargs == expected_kwargs
        ), f"Invalid call kwargs:\n {kwargs}\nMay be:\n {expected_kwargs}"
        return expected_return

    return mock_args


def mock_doSQL_just_try(self, sql, params=None):  # pylint: disable=unused-argument
    """Replacement for PgDB.doSQL() that always fails: it may not be reached in just-try mode"""
    # Explicit raise: must not be silenced when assertions are disabled
    raise AssertionError("doSQL() may not be executed in just try mode")


def generate_mock_doSQL(
    expected_sql, expected_params={}, expected_return=True
):  # pylint: disable=dangerous-default-value
    """
    Build a replacement for PgDB.doSQL() asserting the generated SQL query and parameters

    The default dict is only compared against, never mutated. It can not be replaced by a
    None default since None is itself a meaningful expected value (see test_truncate).

    :return: A function returning expected_return when called as expected
    """

    def mock_doSQL(self, sql, params=None):  # pylint: disable=unused-argument
        assert (
            sql == expected_sql
        ), f"Invalid generated SQL query:\n '{sql}'\nMay be:\n '{expected_sql}'"
        assert (
            params == expected_params
        ), f"Invalid generated params:\n {params}\nMay be:\n {expected_params}"
        return expected_return

    return mock_doSQL


# PgDB.doSelect() has the same expected parameters as PgDB.doSQL()
generate_mock_doSelect = generate_mock_doSQL
mock_doSelect_just_try = mock_doSQL_just_try

#
# Tests on PgDB helper methods
#


def test_combine_params_with_to_add_parameter():
    assert PgDB._combine_params({"test1": 1}, {"test2": 2}) == {"test1": 1, "test2": 2}


def test_combine_params_with_kargs():
    assert PgDB._combine_params({"test1": 1}, test2=2) == {"test1": 1, "test2": 2}


def test_combine_params_with_kargs_and_to_add_parameter():
    assert PgDB._combine_params({"test1": 1}, {"test2": 2}, test3=3) == {
        "test1": 1,
        "test2": 2,
        "test3": 3,
    }


def test_format_where_clauses_params_are_preserved():
    args = ("test = test", {"test1": 1})
    assert PgDB._format_where_clauses(*args) == args


def test_format_where_clauses_raw():
    assert PgDB._format_where_clauses("test = test") == ("test = test", {})


def test_format_where_clauses_tuple_clause_with_params():
    where_clauses = ("test1 = %(test1)s AND test2 = %(test2)s", {"test1": 1, "test2": 2})
    assert PgDB._format_where_clauses(where_clauses) == where_clauses


def test_format_where_clauses_dict():
    where_clauses = {"test1": 1, "test2": 2}
    assert PgDB._format_where_clauses(where_clauses) == (
        '"test1" = %(test1)s AND "test2" = %(test2)s',
        where_clauses,
    )


def test_format_where_clauses_combined_types():
    where_clauses = ("test1 = 1", ("test2 LIKE %(test2)s", {"test2": 2}), {"test3": 3, "test4": 4})
    assert PgDB._format_where_clauses(where_clauses) == (
        'test1 = 1 AND test2 LIKE %(test2)s AND "test3" = %(test3)s AND "test4" = %(test4)s',
        {"test2": 2, "test3": 3, "test4": 4},
    )


def test_format_where_clauses_with_where_op():
    where_clauses = {"test1": 1, "test2": 2}
    assert PgDB._format_where_clauses(where_clauses, where_op="OR") == (
        '"test1" = %(test1)s OR "test2" = %(test2)s',
        where_clauses,
    )


def test_add_where_clauses():
    sql = "SELECT * FROM table"
    where_clauses = {"test1": 1, "test2": 2}
    assert PgDB._add_where_clauses(sql, None, where_clauses) == (
        sql + ' WHERE "test1" = %(test1)s AND "test2" = %(test2)s',
        where_clauses,
    )


def test_add_where_clauses_preserved_params():
    sql = "SELECT * FROM table"
    where_clauses = {"test1": 1, "test2": 2}
    params = {"fake1": 1}
    assert PgDB._add_where_clauses(sql, params.copy(), where_clauses) == (
        sql + ' WHERE "test1" = %(test1)s AND "test2" = %(test2)s',
        {**where_clauses, **params},
    )


def test_add_where_clauses_with_op():
    sql = "SELECT * FROM table"
    where_clauses = ("test1=1", "test2=2")
    assert PgDB._add_where_clauses(sql, None, where_clauses, where_op="OR") == (
        sql + " WHERE test1=1 OR test2=2",
        {},
    )


def test_add_where_clauses_with_duplicated_field():
    # The WHERE parameter is expected under a distinct name (test1_1) so that it does not
    # clobber the already present "test1" parameter
    sql = "UPDATE table SET test1=%(test1)s"
    params = {"test1": "new_value"}
    where_clauses = {"test1": "where_value"}
    assert PgDB._add_where_clauses(sql, params, where_clauses) == (
        sql + ' WHERE "test1" = %(test1_1)s',
        {"test1": "new_value", "test1_1": "where_value"},
    )


def test_quote_table_name():
    assert PgDB._quote_table_name("mytable") == '"mytable"'
    assert PgDB._quote_table_name("myschema.mytable") == '"myschema"."mytable"'


def test_insert(mocker, test_pgdb):
    values = {"test1": 1, "test2": 2}
    mocker.patch(
        "mylib.pgsql.PgDB.doSQL",
        generate_mock_doSQL(
            'INSERT INTO "mytable" ("test1", "test2") VALUES (%(test1)s, %(test2)s)', values
        ),
    )

    assert test_pgdb.insert("mytable", values)


def test_insert_just_try(mocker, test_pgdb):
    mocker.patch("mylib.pgsql.PgDB.doSQL", mock_doSQL_just_try)
    assert test_pgdb.insert("mytable", {"test1": 1, "test2": 2}, just_try=True)


def test_update(mocker, test_pgdb):
    values = {"test1": 1, "test2": 2}
    where_clauses = {"test3": 3, "test4": 4}
    mocker.patch(
        "mylib.pgsql.PgDB.doSQL",
        generate_mock_doSQL(
            'UPDATE "mytable" SET "test1" = %(test1)s, "test2" = %(test2)s WHERE "test3" ='
            ' %(test3)s AND "test4" = %(test4)s',
            {**values, **where_clauses},
        ),
    )

    assert test_pgdb.update("mytable", values, where_clauses)


def test_update_just_try(mocker, test_pgdb):
    mocker.patch("mylib.pgsql.PgDB.doSQL", mock_doSQL_just_try)
    assert test_pgdb.update("mytable", {"test1": 1, "test2": 2}, None, just_try=True)


def test_delete(mocker, test_pgdb):
    where_clauses = {"test1": 1, "test2": 2}
    mocker.patch(
        "mylib.pgsql.PgDB.doSQL",
        generate_mock_doSQL(
            'DELETE FROM "mytable" WHERE "test1" = %(test1)s AND "test2" = %(test2)s', where_clauses
        ),
    )

    assert test_pgdb.delete("mytable", where_clauses)


def test_delete_just_try(mocker, test_pgdb):
    mocker.patch("mylib.pgsql.PgDB.doSQL", mock_doSQL_just_try)
    assert test_pgdb.delete("mytable", None, just_try=True)


def test_truncate(mocker, test_pgdb):
    mocker.patch("mylib.pgsql.PgDB.doSQL", generate_mock_doSQL('TRUNCATE TABLE "mytable"', None))

    assert test_pgdb.truncate("mytable")


def test_truncate_just_try(mocker, test_pgdb):
    mocker.patch("mylib.pgsql.PgDB.doSQL", mock_doSelect_just_try)
    assert test_pgdb.truncate("mytable", just_try=True)


def test_select(mocker, test_pgdb):
    fields = ("field1", "field2")
    where_clauses = {"test3": 3, "test4": 4}
    expected_return = [
        {"field1": 1, "field2": 2},
        {"field1": 2, "field2": 3},
    ]
    order_by = "field1, DESC"
    limit = 10
    mocker.patch(
        "mylib.pgsql.PgDB.doSelect",
        generate_mock_doSQL(
            'SELECT "field1", "field2" FROM "mytable" WHERE "test3" = %(test3)s AND "test4" ='
            " %(test4)s ORDER BY " + order_by + " LIMIT " + str(limit),  # nosec: B608
            where_clauses,
            expected_return,
        ),
    )

    assert (
        test_pgdb.select("mytable", where_clauses, fields, order_by=order_by, limit=limit)
        == expected_return
    )


def test_select_without_field_and_order_by(mocker, test_pgdb):
    mocker.patch("mylib.pgsql.PgDB.doSelect", generate_mock_doSQL('SELECT * FROM "mytable"'))

    assert test_pgdb.select("mytable")


def test_select_just_try(mocker, test_pgdb):
    # NOTE(review): test_select() patches PgDB.doSelect whereas this test guards PgDB.doSQL;
    # confirm which method select() is expected not to call in just-try mode.
    mocker.patch("mylib.pgsql.PgDB.doSQL", mock_doSelect_just_try)
    assert test_pgdb.select("mytable", None, None, just_try=True)


#
# Tests on main methods
#


def test_connect(mocker, test_pgdb):
    expected_kwargs = {
        "dbname": test_pgdb._db,
        "user": test_pgdb._user,
        "host": test_pgdb._host,
        "password": test_pgdb._pwd,
    }

    mocker.patch("psycopg2.connect", generate_mock_args(expected_kwargs=expected_kwargs))

    assert test_pgdb.connect()


def test_close(fake_pgdb):
    assert fake_pgdb.close() is None


def test_close_connected(fake_connected_pgdb):
    assert fake_connected_pgdb.close() is None


def test_setEncoding(fake_connected_pgdb):
    assert fake_connected_pgdb.setEncoding("utf8")


def test_setEncoding_not_connected(fake_pgdb):
    assert fake_pgdb.setEncoding("utf8") is False


def test_setEncoding_on_exception(fake_connected_pgdb):
    fake_connected_pgdb._conn.expected_exception = True
    assert fake_connected_pgdb.setEncoding("utf8") is False


def test_doSQL(fake_connected_pgdb):
    fake_connected_pgdb._conn.expected_sql = "DELETE FROM table WHERE test1 = %(test1)s"
    fake_connected_pgdb._conn.expected_params = {"test1": 1}
    fake_connected_pgdb.doSQL(
        fake_connected_pgdb._conn.expected_sql, fake_connected_pgdb._conn.expected_params
    )


def test_doSQL_without_params(fake_connected_pgdb):
    fake_connected_pgdb._conn.expected_sql = "DELETE FROM table"
    fake_connected_pgdb.doSQL(fake_connected_pgdb._conn.expected_sql)


def test_doSQL_just_try(fake_connected_just_try_pgdb):
    assert fake_connected_just_try_pgdb.doSQL("DELETE FROM table")


def test_doSQL_on_exception(fake_connected_pgdb):
    fake_connected_pgdb._conn.expected_exception = True
    assert fake_connected_pgdb.doSQL("DELETE FROM table") is False


def test_doSelect(fake_connected_pgdb):
    fake_connected_pgdb._conn.expected_sql = "SELECT * FROM table WHERE test1 = %(test1)s"
    fake_connected_pgdb._conn.expected_params = {"test1": 1}
    fake_connected_pgdb._conn.expected_cursor_factory = RealDictCursor
    fake_connected_pgdb._conn.expected_return = [{"test1": 1}]
    assert (
        fake_connected_pgdb.doSelect(
            fake_connected_pgdb._conn.expected_sql, fake_connected_pgdb._conn.expected_params
        )
        == fake_connected_pgdb._conn.expected_return
    )


def test_doSelect_without_params(fake_connected_pgdb):
    fake_connected_pgdb._conn.expected_sql = "SELECT * FROM table"
    fake_connected_pgdb._conn.expected_cursor_factory = RealDictCursor
    fake_connected_pgdb._conn.expected_return = [{"test1": 1}]
    assert (
        fake_connected_pgdb.doSelect(fake_connected_pgdb._conn.expected_sql)
        == fake_connected_pgdb._conn.expected_return
    )


def test_doSelect_on_exception(fake_connected_pgdb):
    fake_connected_pgdb._conn.expected_cursor_factory = RealDictCursor
    fake_connected_pgdb._conn.expected_exception = True
    assert fake_connected_pgdb.doSelect("SELECT * FROM table") is False


def test_doSelect_just_try(fake_connected_just_try_pgdb):
    fake_connected_just_try_pgdb._conn.expected_sql = "SELECT * FROM table WHERE test1 = %(test1)s"
    fake_connected_just_try_pgdb._conn.expected_params = {"test1": 1}
    fake_connected_just_try_pgdb._conn.expected_cursor_factory = RealDictCursor
    fake_connected_just_try_pgdb._conn.expected_return = [{"test1": 1}]
    assert (
        fake_connected_just_try_pgdb.doSelect(
            fake_connected_just_try_pgdb._conn.expected_sql,
            fake_connected_just_try_pgdb._conn.expected_params,
        )
        == fake_connected_just_try_pgdb._conn.expected_return
    )