# pylint: disable=redefined-outer-name,missing-function-docstring,protected-access
""" Tests on MyDB (MySQL client) helpers """

import pytest
from MySQLdb._exceptions import Error

from mylib.mysql import MyDB


class FakeMySQLdbCursor:
    """
    Fake MySQLdb cursor

    It checks that the SQL query and parameters passed to execute() are the
    expected ones and hands back the configured return value.
    """

    def __init__(
        self, expected_sql, expected_params, expected_return, expected_just_try, expected_exception
    ):
        self.expected_sql = expected_sql
        self.expected_params = expected_params
        self.expected_return = expected_return
        self.expected_just_try = expected_just_try
        self.expected_exception = expected_exception

    def execute(self, sql, params=None):
        """
        Check the executed query against the expected one

        :raises Error: if an exception is expected (simulates a MySQLdb failure)
        :raises AssertionError: if the query or its parameters are unexpected, or
                                if a non-SELECT query is executed in just try mode
        """
        if self.expected_exception:
            raise Error(f"{self}.execute({sql}, {params}): expected exception")
        if self.expected_just_try and not sql.lower().startswith("select "):
            # Explicit raise (rather than "assert False") so the guard is kept
            # even if Python runs with assertions disabled (-O).
            raise AssertionError(
                f"{self}.execute({sql}, {params}) may not be executed in just try mode"
            )
        assert (
            sql == self.expected_sql
        ), f"{self}.execute(): Invalid SQL query:\n '{sql}'\nMay be:\n '{self.expected_sql}'"
        assert (
            params == self.expected_params
        ), f"{self}.execute(): Invalid params:\n {params}\nMay be:\n {self.expected_params}"
        return self.expected_return

    @property
    def description(self):
        """
        Build the cursor description from the keys of the first expected row

        Only the first item of each tuple (the field name) is meaningful, the
        other ones are placeholders.
        """
        assert self.expected_return
        assert isinstance(self.expected_return, list)
        assert isinstance(self.expected_return[0], dict)
        return [(field, 1, 2, 3) for field in self.expected_return[0]]

    def fetchall(self):
        """
        Return the expected rows

        When the expected return is a list, dict rows are converted to the list
        of their values (other rows are yielded untouched). Otherwise, the
        expected return is handed back as is.
        """
        if isinstance(self.expected_return, list):
            return (
                list(row.values()) if isinstance(row, dict) else row
                for row in self.expected_return
            )
        return self.expected_return

    def __repr__(self):
        return (
            f"FakeMySQLdbCursor({self.expected_sql}, {self.expected_params}, "
            f"{self.expected_return}, {self.expected_just_try}, {self.expected_exception})"
        )


class FakeMySQLdb:
    """
    Fake MySQLdb connection

    Tests set the expected_* attributes on the connection: they are passed to
    each cursor created by cursor().
    """

    expected_sql = None
    expected_params = None
    expected_return = True
    expected_just_try = False
    expected_exception = False
    just_try = False

    def __init__(self, **kwargs):
        # Note: isinstance() only accepts types, so an optional value must be
        # declared with type(None) (a bare None would raise a TypeError as soon
        # as the value is not a str).
        allowed_kwargs = {
            "db": str,
            "user": str,
            "passwd": (str, type(None)),
            "host": str,
            "charset": str,
            "use_unicode": bool,
        }
        for arg, value in kwargs.items():
            assert arg in allowed_kwargs, f'Invalid arg {arg}="{value}"'
            assert isinstance(
                value, allowed_kwargs[arg]
            ), f"Arg {arg} not a {allowed_kwargs[arg]} ({type(value)})"
            setattr(self, arg, value)

    def close(self):
        return self.expected_return

    def cursor(self):
        """Create a fake cursor configured with the current expectations"""
        return FakeMySQLdbCursor(
            self.expected_sql,
            self.expected_params,
            self.expected_return,
            self.expected_just_try or self.just_try,
            self.expected_exception,
        )

    def commit(self):
        self._check_just_try()
        return self.expected_return

    def rollback(self):
        self._check_just_try()
        return self.expected_return

    def _check_just_try(self):
        """Fail if the connection is in just try mode"""
        if self.just_try:
            raise AssertionError("May not be executed in just try mode")


def fake_mysqldb_connect(**kwargs):
    """Replacement for MySQLdb.connect() returning a fake connection"""
    return FakeMySQLdb(**kwargs)


def fake_mysqldb_connect_just_try(**kwargs):
    """Replacement for MySQLdb.connect() returning a fake connection in just try mode"""
    con = FakeMySQLdb(**kwargs)
    con.just_try = True
    return con


@pytest.fixture
def test_mydb():
    """MyDB client without any patch on MySQLdb (not connected)"""
    return MyDB("127.0.0.1", "user", "password", "dbname")


@pytest.fixture
def fake_mydb(mocker):
    """MyDB client with MySQLdb.connect() patched to return a fake connection (not connected)"""
    mocker.patch("MySQLdb.connect", fake_mysqldb_connect)
    return MyDB("127.0.0.1", "user", "password", "dbname")


@pytest.fixture
def fake_just_try_mydb(mocker):
    """Same as fake_mydb, but with both the client and the fake connection in just try mode"""
    mocker.patch("MySQLdb.connect", fake_mysqldb_connect_just_try)
    return MyDB("127.0.0.1", "user", "password", "dbname", just_try=True)


@pytest.fixture
def fake_connected_mydb(fake_mydb):
    """fake_mydb on which connect() has been called"""
    fake_mydb.connect()
    return fake_mydb


@pytest.fixture
def fake_connected_just_try_mydb(fake_just_try_mydb):
    """fake_just_try_mydb on which connect() has been called"""
    fake_just_try_mydb.connect()
    return fake_just_try_mydb


def generate_mock_args(expected_args=(), expected_kwargs=None, expected_return=True):
    """
    Generate a mock function checking the arguments it is called with

    :param expected_args: Expected positional arguments (default: none)
    :param expected_kwargs: Expected keyword arguments (default: none)
    :param expected_return: Value returned by the mock (default: True)
    """
    if expected_kwargs is None:
        expected_kwargs = {}

    def mock_args(*args, **kwargs):
        assert args == expected_args, f"Invalid call args:\n {args}\nMay be:\n {expected_args}"
        assert (
            kwargs == expected_kwargs
        ), f"Invalid call kwargs:\n {kwargs}\nMay be:\n {expected_kwargs}"
        return expected_return

    return mock_args


def mock_doSQL_just_try(self, sql, params=None):  # pylint: disable=unused-argument
    """Replacement for MyDB.doSQL() failing as soon as it is called"""
    raise AssertionError("doSQL() may not be executed in just try mode")


def generate_mock_doSQL(
    expected_sql, expected_params={}, expected_return=True
):  # pylint: disable=dangerous-default-value
    """
    Generate a replacement for MyDB.doSQL() checking the generated query

    Note: None is a meaningful value for expected_params (see test_truncate), so
    it can't be used as the "not provided" marker: the dict default is kept on
    purpose and is never mutated.

    :param expected_sql: Expected SQL query
    :param expected_params: Expected query parameters (default: empty dict)
    :param expected_return: Value returned by the mock (default: True)
    """

    def mock_doSQL(self, sql, params=None):  # pylint: disable=unused-argument
        assert (
            sql == expected_sql
        ), f"Invalid generated SQL query:\n '{sql}'\nMay be:\n '{expected_sql}'"
        assert (
            params == expected_params
        ), f"Invalid generated params:\n {params}\nMay be:\n {expected_params}"
        return expected_return

    return mock_doSQL


# MyDB.doSelect() has the same expected parameters as MyDB.doSQL()
generate_mock_doSelect = generate_mock_doSQL
mock_doSelect_just_try = mock_doSQL_just_try

#
# Tests on MyDB helper methods
#


def test_combine_params_with_to_add_parameter():
    """A dict of parameters to add is merged with the original parameters"""
    assert MyDB._combine_params({"test1": 1}, {"test2": 2}) == {"test1": 1, "test2": 2}


def test_combine_params_with_kargs():
    """Keyword arguments are merged with the original parameters"""
    assert MyDB._combine_params({"test1": 1}, test2=2) == {"test1": 1, "test2": 2}


def test_combine_params_with_kargs_and_to_add_parameter():
    """Both the dict to add and keyword arguments are merged with the original parameters"""
    assert MyDB._combine_params({"test1": 1}, {"test2": 2}, test3=3) == {
        "test1": 1,
        "test2": 2,
        "test3": 3,
    }


def test_format_where_clauses_params_are_preserved():
    """A raw clause passed with its parameters is returned unchanged"""
    args = ("test = test", {"test1": 1})
    assert MyDB._format_where_clauses(*args) == args


def test_format_where_clauses_raw():
    """A raw clause without parameters is returned with an empty dict of parameters"""
    assert MyDB._format_where_clauses("test = test") == ("test = test", {})


def test_format_where_clauses_tuple_clause_with_params():
    """A (clause, parameters) tuple is returned unchanged"""
    where_clauses = ("test1 = %(test1)s AND test2 = %(test2)s", {"test1": 1, "test2": 2})
    assert MyDB._format_where_clauses(where_clauses) == where_clauses


def test_format_where_clauses_with_list_as_value():
    """A list value is turned into an IN clause with one indexed parameter per item"""
    assert MyDB._format_where_clauses({"test": [1, 2]}) == (
        "`test` IN (%(test_0)s, %(test_1)s)",
        {"test_0": 1, "test_1": 2},
    )


def test_format_where_clauses_dict():
    """A dict is turned into quoted equality clauses joined with AND"""
    where_clauses = {"test1": 1, "test2": 2}
    assert MyDB._format_where_clauses(where_clauses) == (
        "`test1` = %(test1)s AND `test2` = %(test2)s",
        where_clauses,
    )


def test_format_where_clauses_combined_types():
    """Raw, tuple and dict clauses could be combined in the same call"""
    where_clauses = ("test1 = 1", ("test2 LIKE %(test2)s", {"test2": 2}), {"test3": 3, "test4": 4})
    assert MyDB._format_where_clauses(where_clauses) == (
        "test1 = 1 AND test2 LIKE %(test2)s AND `test3` = %(test3)s AND `test4` = %(test4)s",
        {"test2": 2, "test3": 3, "test4": 4},
    )


def test_format_where_clauses_with_where_op():
    """The where_op parameter replaces the AND operator between clauses"""
    where_clauses = {"test1": 1, "test2": 2}
    assert MyDB._format_where_clauses(where_clauses, where_op="OR") == (
        "`test1` = %(test1)s OR `test2` = %(test2)s",
        where_clauses,
    )


def test_add_where_clauses():
    """WHERE clauses are appended to the query and their parameters returned"""
    sql = "SELECT * FROM table"
    where_clauses = {"test1": 1, "test2": 2}
    assert MyDB._add_where_clauses(sql, None, where_clauses) == (
        sql + " WHERE `test1` = %(test1)s AND `test2` = %(test2)s",
        where_clauses,
    )


def test_add_where_clauses_preserved_params():
    """Already existing parameters are kept beside the WHERE clauses ones"""
    sql = "SELECT * FROM table"
    where_clauses = {"test1": 1, "test2": 2}
    params = {"fake1": 1}
    assert MyDB._add_where_clauses(sql, params.copy(), where_clauses) == (
        sql + " WHERE `test1` = %(test1)s AND `test2` = %(test2)s",
        {**where_clauses, **params},
    )


def test_add_where_clauses_with_op():
    """The where_op parameter is used to join raw WHERE clauses"""
    sql = "SELECT * FROM table"
    where_clauses = ("test1=1", "test2=2")
    assert MyDB._add_where_clauses(sql, None, where_clauses, where_op="OR") == (
        sql + " WHERE test1=1 OR test2=2",
        {},
    )


def test_add_where_clauses_with_duplicated_field():
    """A WHERE field already used as parameter gets a suffixed parameter name"""
    sql = "UPDATE table SET test1=%(test1)s"
    params = {"test1": "new_value"}
    where_clauses = {"test1": "where_value"}
    assert MyDB._add_where_clauses(sql, params, where_clauses) == (
        sql + " WHERE `test1` = %(test1_1)s",
        {"test1": "new_value", "test1_1": "where_value"},
    )


def test_quote_table_name():
    """Table names are backtick-quoted, schema and table separately"""
    assert MyDB._quote_table_name("mytable") == "`mytable`"
    assert MyDB._quote_table_name("myschema.mytable") == "`myschema`.`mytable`"


def test_insert(mocker, test_mydb):
    """insert() passes the expected INSERT query and values to doSQL()"""
    values = {"test1": 1, "test2": 2}
    mocker.patch(
        "mylib.mysql.MyDB.doSQL",
        generate_mock_doSQL(
            "INSERT INTO `mytable` (`test1`, `test2`) VALUES (%(test1)s, %(test2)s)", values
        ),
    )
    assert test_mydb.insert("mytable", values)


def test_insert_just_try(mocker, test_mydb):
    """insert() in just try mode succeeds without calling doSQL()"""
    mocker.patch("mylib.mysql.MyDB.doSQL", mock_doSQL_just_try)
    assert test_mydb.insert("mytable", {"test1": 1, "test2": 2}, just_try=True)


def test_update(mocker, test_mydb):
    """update() passes the expected UPDATE query and merged parameters to doSQL()"""
    values = {"test1": 1, "test2": 2}
    where_clauses = {"test3": 3, "test4": 4}
    mocker.patch(
        "mylib.mysql.MyDB.doSQL",
        generate_mock_doSQL(
            "UPDATE `mytable` SET `test1` = %(test1)s, `test2` = %(test2)s WHERE `test3` ="
            " %(test3)s AND `test4` = %(test4)s",
            {**values, **where_clauses},
        ),
    )
    assert test_mydb.update("mytable", values, where_clauses)


def test_update_just_try(mocker, test_mydb):
    """update() in just try mode succeeds without calling doSQL()"""
    mocker.patch("mylib.mysql.MyDB.doSQL", mock_doSQL_just_try)
    assert test_mydb.update("mytable", {"test1": 1, "test2": 2}, None, just_try=True)


def test_delete(mocker, test_mydb):
    """delete() passes the expected DELETE query and parameters to doSQL()"""
    where_clauses = {"test1": 1, "test2": 2}
    mocker.patch(
        "mylib.mysql.MyDB.doSQL",
        generate_mock_doSQL(
            "DELETE FROM `mytable` WHERE `test1` = %(test1)s AND `test2` = %(test2)s", where_clauses
        ),
    )
    assert test_mydb.delete("mytable", where_clauses)


def test_delete_just_try(mocker, test_mydb):
    """delete() in just try mode succeeds without calling doSQL()"""
    mocker.patch("mylib.mysql.MyDB.doSQL", mock_doSQL_just_try)
    assert test_mydb.delete("mytable", None, just_try=True)


def test_truncate(mocker, test_mydb):
    """truncate() passes the expected TRUNCATE query to doSQL(), without parameters"""
    mocker.patch("mylib.mysql.MyDB.doSQL", generate_mock_doSQL("TRUNCATE TABLE `mytable`", None))
    assert test_mydb.truncate("mytable")


def test_truncate_just_try(mocker, test_mydb):
    """truncate() in just try mode succeeds without calling doSQL()"""
    mocker.patch("mylib.mysql.MyDB.doSQL", mock_doSQL_just_try)
    assert test_mydb.truncate("mytable", just_try=True)


def test_select(mocker, test_mydb):
    """select() passes the expected SELECT query to doSelect() and returns its result"""
    fields = ("field1", "field2")
    where_clauses = {"test3": 3, "test4": 4}
    expected_return = [
        {"field1": 1, "field2": 2},
        {"field1": 2, "field2": 3},
    ]
    order_by = "field1, DESC"
    limit = 10
    mocker.patch(
        "mylib.mysql.MyDB.doSelect",
        generate_mock_doSQL(
            "SELECT `field1`, `field2` FROM `mytable` WHERE `test3` = %(test3)s AND `test4` ="
            " %(test4)s ORDER BY " + order_by + " LIMIT " + str(limit),  # nosec: B608
            where_clauses,
            expected_return,
        ),
    )
    assert (
        test_mydb.select("mytable", where_clauses, fields, order_by=order_by, limit=limit)
        == expected_return
    )


def test_select_without_field_and_order_by(mocker, test_mydb):
    """select() with only a table name generates a SELECT * query with empty parameters"""
    mocker.patch("mylib.mysql.MyDB.doSelect", generate_mock_doSQL("SELECT * FROM `mytable`"))
    assert test_mydb.select("mytable")


def test_select_just_try(mocker, test_mydb):
    """select() in just try mode returns a true value"""
    # NOTE(review): doSQL() is patched here while test_select shows select()
    # relying on doSelect(): confirm which method should be guarded.
    mocker.patch("mylib.mysql.MyDB.doSQL", mock_doSelect_just_try)
    assert test_mydb.select("mytable", None, None, just_try=True)


#
# Tests on main methods
#


def test_connect(mocker, test_mydb):
    """connect() calls MySQLdb.connect() with the expected keyword arguments"""
    expected_kwargs = {
        "db": test_mydb._db,
        "user": test_mydb._user,
        "host": test_mydb._host,
        "passwd": test_mydb._pwd,
        "charset": test_mydb._charset,
        "use_unicode": True,
    }
    mocker.patch("MySQLdb.connect", generate_mock_args(expected_kwargs=expected_kwargs))
    assert test_mydb.connect()


def test_close(fake_mydb):
    """close() on a client that never connected returns None"""
    assert fake_mydb.close() is None


def test_close_connected(fake_connected_mydb):
    """close() on a connected client returns None"""
    assert fake_connected_mydb.close() is None


def test_doSQL(fake_connected_mydb):
    """doSQL() hands the query and its parameters to the cursor unchanged"""
    fake_connected_mydb._conn.expected_sql = "DELETE FROM table WHERE test1 = %(test1)s"
    fake_connected_mydb._conn.expected_params = {"test1": 1}
    fake_connected_mydb.doSQL(
        fake_connected_mydb._conn.expected_sql, fake_connected_mydb._conn.expected_params
    )


def test_doSQL_without_params(fake_connected_mydb):
    """doSQL() without parameters hands None as parameters to the cursor"""
    fake_connected_mydb._conn.expected_sql = "DELETE FROM table"
    fake_connected_mydb.doSQL(fake_connected_mydb._conn.expected_sql)


def test_doSQL_just_try(fake_connected_just_try_mydb):
    """doSQL() in just try mode returns a true value (the fake rejects any executed write)"""
    assert fake_connected_just_try_mydb.doSQL("DELETE FROM table")


def test_doSQL_on_exception(fake_connected_mydb):
    """doSQL() returns False when the cursor raises a MySQLdb error"""
    fake_connected_mydb._conn.expected_exception = True
    assert fake_connected_mydb.doSQL("DELETE FROM table") is False


def test_doSelect(fake_connected_mydb):
    """doSelect() returns the fetched rows as a list of dicts"""
    fake_connected_mydb._conn.expected_sql = "SELECT * FROM table WHERE test1 = %(test1)s"
    fake_connected_mydb._conn.expected_params = {"test1": 1}
    fake_connected_mydb._conn.expected_return = [{"test1": 1}]
    assert (
        fake_connected_mydb.doSelect(
            fake_connected_mydb._conn.expected_sql, fake_connected_mydb._conn.expected_params
        )
        == fake_connected_mydb._conn.expected_return
    )


def test_doSelect_without_params(fake_connected_mydb):
    """doSelect() without parameters returns the fetched rows as a list of dicts"""
    fake_connected_mydb._conn.expected_sql = "SELECT * FROM table"
    fake_connected_mydb._conn.expected_return = [{"test1": 1}]
    assert (
        fake_connected_mydb.doSelect(fake_connected_mydb._conn.expected_sql)
        == fake_connected_mydb._conn.expected_return
    )


def test_doSelect_on_exception(fake_connected_mydb):
    """doSelect() returns False when the cursor raises a MySQLdb error"""
    fake_connected_mydb._conn.expected_exception = True
    assert fake_connected_mydb.doSelect("SELECT * FROM table") is False


def test_doSelect_just_try(fake_connected_just_try_mydb):
    """doSelect() still executes the SELECT query in just try mode"""
    fake_connected_just_try_mydb._conn.expected_sql = "SELECT * FROM table WHERE test1 = %(test1)s"
    fake_connected_just_try_mydb._conn.expected_params = {"test1": 1}
    fake_connected_just_try_mydb._conn.expected_return = [{"test1": 1}]
    assert (
        fake_connected_just_try_mydb.doSelect(
            fake_connected_just_try_mydb._conn.expected_sql,
            fake_connected_just_try_mydb._conn.expected_params,
        )
        == fake_connected_just_try_mydb._conn.expected_return
    )