# pylint: disable=redefined-outer-name,missing-function-docstring,protected-access """ Tests on opening hours helpers """ import pytest from MySQLdb._exceptions import Error from mylib.mysql import MyDB class FakeMySQLdbCursor: """ Fake MySQLdb cursor """ def __init__(self, expected_sql, expected_params, expected_return, expected_just_try, expected_exception): self.expected_sql = expected_sql self.expected_params = expected_params self.expected_return = expected_return self.expected_just_try = expected_just_try self.expected_exception = expected_exception def execute(self, sql, params=None): if self.expected_exception: raise Error(f'{self}.execute({sql}, {params}): expected exception') if self.expected_just_try and not sql.lower().startswith('select '): assert False, f'{self}.execute({sql}, {params}) may not be executed in just try mode' # pylint: disable=consider-using-f-string assert sql == self.expected_sql, "%s.execute(): Invalid SQL query:\n '%s'\nMay be:\n '%s'" % (self, sql, self.expected_sql) # pylint: disable=consider-using-f-string assert params == self.expected_params, "%s.execute(): Invalid params:\n %s\nMay be:\n %s" % (self, params, self.expected_params) return self.expected_return @property def description(self): assert self.expected_return assert isinstance(self.expected_return, list) assert isinstance(self.expected_return[0], dict) return [(field, 1, 2, 3) for field in self.expected_return[0].keys()] def fetchall(self): if isinstance(self.expected_return, list): return ( list(row.values()) if isinstance(row, dict) else row for row in self.expected_return ) return self.expected_return def __repr__(self): return ( f'FakeMySQLdbCursor({self.expected_sql}, {self.expected_params}, ' f'{self.expected_return}, {self.expected_just_try})' ) class FakeMySQLdb: """ Fake MySQLdb connection """ expected_sql = None expected_params = None expected_return = True expected_just_try = False expected_exception = False just_try = False def __init__(self, **kwargs): allowed_kwargs = dict(db=str, user=str, passwd=(str, None), host=str, charset=str, use_unicode=bool) for arg, value in kwargs.items(): assert arg in allowed_kwargs, f'Invalid arg {arg}="{value}"' assert isinstance(value, allowed_kwargs[arg]), \ f'Arg {arg} not a {allowed_kwargs[arg]} ({type(value)})' setattr(self, arg, value) def close(self): return self.expected_return def cursor(self): return FakeMySQLdbCursor( self.expected_sql, self.expected_params, self.expected_return, self.expected_just_try or self.just_try, self.expected_exception ) def commit(self): self._check_just_try() return self.expected_return def rollback(self): self._check_just_try() return self.expected_return def _check_just_try(self): if self.just_try: assert False, "May not be executed in just try mode" def fake_mysqldb_connect(**kwargs): return FakeMySQLdb(**kwargs) def fake_mysqldb_connect_just_try(**kwargs): con = FakeMySQLdb(**kwargs) con.just_try = True return con @pytest.fixture def test_mydb(): return MyDB('127.0.0.1', 'user', 'password', 'dbname') @pytest.fixture def fake_mydb(mocker): mocker.patch('MySQLdb.connect', fake_mysqldb_connect) return MyDB('127.0.0.1', 'user', 'password', 'dbname') @pytest.fixture def fake_just_try_mydb(mocker): mocker.patch('MySQLdb.connect', fake_mysqldb_connect_just_try) return MyDB('127.0.0.1', 'user', 'password', 'dbname', just_try=True) @pytest.fixture def fake_connected_mydb(fake_mydb): fake_mydb.connect() return fake_mydb @pytest.fixture def fake_connected_just_try_mydb(fake_just_try_mydb): fake_just_try_mydb.connect() return fake_just_try_mydb def generate_mock_args(expected_args=(), expected_kwargs={}, expected_return=True): # pylint: disable=dangerous-default-value def mock_args(*args, **kwargs): # pylint: disable=consider-using-f-string assert args == expected_args, "Invalid call args:\n %s\nMay be:\n %s" % (args, expected_args) # pylint: disable=consider-using-f-string assert kwargs == expected_kwargs, "Invalid call kwargs:\n %s\nMay be:\n %s" % (kwargs, expected_kwargs) return expected_return return mock_args def mock_doSQL_just_try(self, sql, params=None): # pylint: disable=unused-argument assert False, "doSQL() may not be executed in just try mode" def generate_mock_doSQL(expected_sql, expected_params={}, expected_return=True): # pylint: disable=dangerous-default-value def mock_doSQL(self, sql, params=None): # pylint: disable=unused-argument # pylint: disable=consider-using-f-string assert sql == expected_sql, "Invalid generated SQL query:\n '%s'\nMay be:\n '%s'" % (sql, expected_sql) # pylint: disable=consider-using-f-string assert params == expected_params, "Invalid generated params:\n %s\nMay be:\n %s" % (params, expected_params) return expected_return return mock_doSQL # MyDB.doSelect() have same expected parameters as MyDB.doSQL() generate_mock_doSelect = generate_mock_doSQL mock_doSelect_just_try = mock_doSQL_just_try # # Test on MyDB helper methods # def test_combine_params_with_to_add_parameter(): assert MyDB._combine_params(dict(test1=1), dict(test2=2)) == dict( test1=1, test2=2 ) def test_combine_params_with_kargs(): assert MyDB._combine_params(dict(test1=1), test2=2) == dict( test1=1, test2=2 ) def test_combine_params_with_kargs_and_to_add_parameter(): assert MyDB._combine_params(dict(test1=1), dict(test2=2), test3=3) == dict( test1=1, test2=2, test3=3 ) def test_format_where_clauses_params_are_preserved(): args = ('test = test', dict(test1=1)) assert MyDB._format_where_clauses(*args) == args def test_format_where_clauses_raw(): assert MyDB._format_where_clauses('test = test') == (('test = test'), {}) def test_format_where_clauses_tuple_clause_with_params(): where_clauses = ( 'test1 = %(test1)s AND test2 = %(test2)s', dict(test1=1, test2=2) ) assert MyDB._format_where_clauses(where_clauses) == where_clauses def test_format_where_clauses_dict(): where_clauses = dict(test1=1, test2=2) assert MyDB._format_where_clauses(where_clauses) == ( '`test1` = %(test1)s AND `test2` = %(test2)s', where_clauses ) def test_format_where_clauses_combined_types(): where_clauses = ( 'test1 = 1', ('test2 LIKE %(test2)s', dict(test2=2)), dict(test3=3, test4=4) ) assert MyDB._format_where_clauses(where_clauses) == ( 'test1 = 1 AND test2 LIKE %(test2)s AND `test3` = %(test3)s AND `test4` = %(test4)s', dict(test2=2, test3=3, test4=4) ) def test_format_where_clauses_with_where_op(): where_clauses = dict(test1=1, test2=2) assert MyDB._format_where_clauses(where_clauses, where_op='OR') == ( '`test1` = %(test1)s OR `test2` = %(test2)s', where_clauses ) def test_add_where_clauses(): sql = "SELECT * FROM table" where_clauses = dict(test1=1, test2=2) assert MyDB._add_where_clauses(sql, None, where_clauses) == ( sql + ' WHERE `test1` = %(test1)s AND `test2` = %(test2)s', where_clauses ) def test_add_where_clauses_preserved_params(): sql = "SELECT * FROM table" where_clauses = dict(test1=1, test2=2) params = dict(fake1=1) assert MyDB._add_where_clauses(sql, params.copy(), where_clauses) == ( sql + ' WHERE `test1` = %(test1)s AND `test2` = %(test2)s', dict(**where_clauses, **params) ) def test_add_where_clauses_with_op(): sql = "SELECT * FROM table" where_clauses = ('test1=1', 'test2=2') assert MyDB._add_where_clauses(sql, None, where_clauses, where_op='OR') == ( sql + ' WHERE test1=1 OR test2=2', {} ) def test_add_where_clauses_with_duplicated_field(): sql = "UPDATE table SET test1=%(test1)s" params = dict(test1='new_value') where_clauses = dict(test1='where_value') assert MyDB._add_where_clauses(sql, params, where_clauses) == ( sql + ' WHERE `test1` = %(test1_1)s', dict(test1='new_value', test1_1='where_value') ) def test_quote_table_name(): assert MyDB._quote_table_name("mytable") == '`mytable`' assert MyDB._quote_table_name("myschema.mytable") == '`myschema`.`mytable`' def test_insert(mocker, test_mydb): values = dict(test1=1, test2=2) mocker.patch( 'mylib.mysql.MyDB.doSQL', generate_mock_doSQL( 'INSERT INTO `mytable` (`test1`, `test2`) VALUES (%(test1)s, %(test2)s)', values ) ) assert test_mydb.insert('mytable', values) def test_insert_just_try(mocker, test_mydb): mocker.patch('mylib.mysql.MyDB.doSQL', mock_doSQL_just_try) assert test_mydb.insert('mytable', dict(test1=1, test2=2), just_try=True) def test_update(mocker, test_mydb): values = dict(test1=1, test2=2) where_clauses = dict(test3=3, test4=4) mocker.patch( 'mylib.mysql.MyDB.doSQL', generate_mock_doSQL( 'UPDATE `mytable` SET `test1` = %(test1)s, `test2` = %(test2)s WHERE `test3` = %(test3)s AND `test4` = %(test4)s', dict(**values, **where_clauses) ) ) assert test_mydb.update('mytable', values, where_clauses) def test_update_just_try(mocker, test_mydb): mocker.patch('mylib.mysql.MyDB.doSQL', mock_doSQL_just_try) assert test_mydb.update('mytable', dict(test1=1, test2=2), None, just_try=True) def test_delete(mocker, test_mydb): where_clauses = dict(test1=1, test2=2) mocker.patch( 'mylib.mysql.MyDB.doSQL', generate_mock_doSQL( 'DELETE FROM `mytable` WHERE `test1` = %(test1)s AND `test2` = %(test2)s', where_clauses ) ) assert test_mydb.delete('mytable', where_clauses) def test_delete_just_try(mocker, test_mydb): mocker.patch('mylib.mysql.MyDB.doSQL', mock_doSQL_just_try) assert test_mydb.delete('mytable', None, just_try=True) def test_truncate(mocker, test_mydb): mocker.patch( 'mylib.mysql.MyDB.doSQL', generate_mock_doSQL('TRUNCATE TABLE `mytable`', None) ) assert test_mydb.truncate('mytable') def test_truncate_just_try(mocker, test_mydb): mocker.patch('mylib.mysql.MyDB.doSQL', mock_doSelect_just_try) assert test_mydb.truncate('mytable', just_try=True) def test_select(mocker, test_mydb): fields = ('field1', 'field2') where_clauses = dict(test3=3, test4=4) expected_return = [ dict(field1=1, field2=2), dict(field1=2, field2=3), ] order_by = "field1, DESC" mocker.patch( 'mylib.mysql.MyDB.doSelect', generate_mock_doSQL( 'SELECT `field1`, `field2` FROM `mytable` WHERE `test3` = %(test3)s AND `test4` = %(test4)s ORDER BY ' + order_by, where_clauses, expected_return ) ) assert test_mydb.select('mytable', where_clauses, fields, order_by=order_by) == expected_return def test_select_without_field_and_order_by(mocker, test_mydb): mocker.patch( 'mylib.mysql.MyDB.doSelect', generate_mock_doSQL( 'SELECT * FROM `mytable`' ) ) assert test_mydb.select('mytable') def test_select_just_try(mocker, test_mydb): mocker.patch('mylib.mysql.MyDB.doSQL', mock_doSelect_just_try) assert test_mydb.select('mytable', None, None, just_try=True) # # Tests on main methods # def test_connect(mocker, test_mydb): expected_kwargs = dict( db=test_mydb._db, user=test_mydb._user, host=test_mydb._host, passwd=test_mydb._pwd, charset=test_mydb._charset, use_unicode=True, ) mocker.patch( 'MySQLdb.connect', generate_mock_args( expected_kwargs=expected_kwargs ) ) assert test_mydb.connect() def test_close(fake_mydb): assert fake_mydb.close() is None def test_close_connected(fake_connected_mydb): assert fake_connected_mydb.close() is None def test_doSQL(fake_connected_mydb): fake_connected_mydb._conn.expected_sql = 'DELETE FROM table WHERE test1 = %(test1)s' fake_connected_mydb._conn.expected_params = dict(test1=1) fake_connected_mydb.doSQL(fake_connected_mydb._conn.expected_sql, fake_connected_mydb._conn.expected_params) def test_doSQL_without_params(fake_connected_mydb): fake_connected_mydb._conn.expected_sql = 'DELETE FROM table' fake_connected_mydb.doSQL(fake_connected_mydb._conn.expected_sql) def test_doSQL_just_try(fake_connected_just_try_mydb): assert fake_connected_just_try_mydb.doSQL('DELETE FROM table') def test_doSQL_on_exception(fake_connected_mydb): fake_connected_mydb._conn.expected_exception = True assert fake_connected_mydb.doSQL('DELETE FROM table') is False def test_doSelect(fake_connected_mydb): fake_connected_mydb._conn.expected_sql = 'SELECT * FROM table WHERE test1 = %(test1)s' fake_connected_mydb._conn.expected_params = dict(test1=1) fake_connected_mydb._conn.expected_return = [dict(test1=1)] assert fake_connected_mydb.doSelect(fake_connected_mydb._conn.expected_sql, fake_connected_mydb._conn.expected_params) == fake_connected_mydb._conn.expected_return def test_doSelect_without_params(fake_connected_mydb): fake_connected_mydb._conn.expected_sql = 'SELECT * FROM table' fake_connected_mydb._conn.expected_return = [dict(test1=1)] assert fake_connected_mydb.doSelect(fake_connected_mydb._conn.expected_sql) == fake_connected_mydb._conn.expected_return def test_doSelect_on_exception(fake_connected_mydb): fake_connected_mydb._conn.expected_exception = True assert fake_connected_mydb.doSelect('SELECT * FROM table') is False def test_doSelect_just_try(fake_connected_just_try_mydb): fake_connected_just_try_mydb._conn.expected_sql = 'SELECT * FROM table WHERE test1 = %(test1)s' fake_connected_just_try_mydb._conn.expected_params = dict(test1=1) fake_connected_just_try_mydb._conn.expected_return = [dict(test1=1)] assert fake_connected_just_try_mydb.doSelect( fake_connected_just_try_mydb._conn.expected_sql, fake_connected_just_try_mydb._conn.expected_params ) == fake_connected_just_try_mydb._conn.expected_return