456 lines
14 KiB
Python
456 lines
14 KiB
Python
|
# pylint: disable=redefined-outer-name,missing-function-docstring,protected-access
|
||
|
""" Tests on opening hours helpers """
|
||
|
|
||
|
import pytest
|
||
|
|
||
|
from MySQLdb._exceptions import Error
|
||
|
|
||
|
from mylib.mysql import MyDB
|
||
|
|
||
|
|
||
|
class FakeMySQLdbCursor:
|
||
|
""" Fake MySQLdb cursor """
|
||
|
|
||
|
def __init__(self, expected_sql, expected_params, expected_return, expected_just_try, expected_exception):
|
||
|
self.expected_sql = expected_sql
|
||
|
self.expected_params = expected_params
|
||
|
self.expected_return = expected_return
|
||
|
self.expected_just_try = expected_just_try
|
||
|
self.expected_exception = expected_exception
|
||
|
|
||
|
def execute(self, sql, params=None):
|
||
|
if self.expected_exception:
|
||
|
raise Error(f'{self}.execute({sql}, {params}): expected exception')
|
||
|
if self.expected_just_try and not sql.lower().startswith('select '):
|
||
|
assert False, f'{self}.execute({sql}, {params}) may not be executed in just try mode'
|
||
|
# pylint: disable=consider-using-f-string
|
||
|
assert sql == self.expected_sql, "%s.execute(): Invalid SQL query:\n '%s'\nMay be:\n '%s'" % (self, sql, self.expected_sql)
|
||
|
# pylint: disable=consider-using-f-string
|
||
|
assert params == self.expected_params, "%s.execute(): Invalid params:\n %s\nMay be:\n %s" % (self, params, self.expected_params)
|
||
|
return self.expected_return
|
||
|
|
||
|
@property
|
||
|
def description(self):
|
||
|
assert self.expected_return
|
||
|
assert isinstance(self.expected_return, list)
|
||
|
assert isinstance(self.expected_return[0], dict)
|
||
|
return [(field, 1, 2, 3) for field in self.expected_return[0].keys()]
|
||
|
|
||
|
def fetchall(self):
|
||
|
if isinstance(self.expected_return, list):
|
||
|
return (
|
||
|
list(row.values())
|
||
|
if isinstance(row, dict) else row
|
||
|
for row in self.expected_return
|
||
|
)
|
||
|
return self.expected_return
|
||
|
|
||
|
def __repr__(self):
|
||
|
return (
|
||
|
f'FakeMySQLdbCursor({self.expected_sql}, {self.expected_params}, '
|
||
|
f'{self.expected_return}, {self.expected_just_try})'
|
||
|
)
|
||
|
|
||
|
|
||
|
class FakeMySQLdb:
|
||
|
""" Fake MySQLdb connection """
|
||
|
|
||
|
expected_sql = None
|
||
|
expected_params = None
|
||
|
expected_return = True
|
||
|
expected_just_try = False
|
||
|
expected_exception = False
|
||
|
just_try = False
|
||
|
|
||
|
def __init__(self, **kwargs):
|
||
|
allowed_kwargs = dict(db=str, user=str, passwd=(str, None), host=str, charset=str, use_unicode=bool)
|
||
|
for arg, value in kwargs.items():
|
||
|
assert arg in allowed_kwargs, f'Invalid arg {arg}="{value}"'
|
||
|
assert isinstance(value, allowed_kwargs[arg]), \
|
||
|
f'Arg {arg} not a {allowed_kwargs[arg]} ({type(value)})'
|
||
|
setattr(self, arg, value)
|
||
|
|
||
|
def close(self):
|
||
|
return self.expected_return
|
||
|
|
||
|
def cursor(self):
|
||
|
return FakeMySQLdbCursor(
|
||
|
self.expected_sql, self.expected_params,
|
||
|
self.expected_return, self.expected_just_try or self.just_try,
|
||
|
self.expected_exception
|
||
|
)
|
||
|
|
||
|
def commit(self):
|
||
|
self._check_just_try()
|
||
|
return self.expected_return
|
||
|
|
||
|
def rollback(self):
|
||
|
self._check_just_try()
|
||
|
return self.expected_return
|
||
|
|
||
|
def _check_just_try(self):
|
||
|
if self.just_try:
|
||
|
assert False, "May not be executed in just try mode"
|
||
|
|
||
|
|
||
|
def fake_mysqldb_connect(**kwargs):
|
||
|
return FakeMySQLdb(**kwargs)
|
||
|
|
||
|
|
||
|
def fake_mysqldb_connect_just_try(**kwargs):
|
||
|
con = FakeMySQLdb(**kwargs)
|
||
|
con.just_try = True
|
||
|
return con
|
||
|
|
||
|
|
||
|
@pytest.fixture
|
||
|
def test_mydb():
|
||
|
return MyDB('127.0.0.1', 'user', 'password', 'dbname')
|
||
|
|
||
|
|
||
|
@pytest.fixture
|
||
|
def fake_mydb(mocker):
|
||
|
mocker.patch('MySQLdb.connect', fake_mysqldb_connect)
|
||
|
return MyDB('127.0.0.1', 'user', 'password', 'dbname')
|
||
|
|
||
|
|
||
|
@pytest.fixture
|
||
|
def fake_just_try_mydb(mocker):
|
||
|
mocker.patch('MySQLdb.connect', fake_mysqldb_connect_just_try)
|
||
|
return MyDB('127.0.0.1', 'user', 'password', 'dbname', just_try=True)
|
||
|
|
||
|
|
||
|
@pytest.fixture
|
||
|
def fake_connected_mydb(fake_mydb):
|
||
|
fake_mydb.connect()
|
||
|
return fake_mydb
|
||
|
|
||
|
|
||
|
@pytest.fixture
|
||
|
def fake_connected_just_try_mydb(fake_just_try_mydb):
|
||
|
fake_just_try_mydb.connect()
|
||
|
return fake_just_try_mydb
|
||
|
|
||
|
|
||
|
def generate_mock_args(expected_args=(), expected_kwargs={}, expected_return=True): # pylint: disable=dangerous-default-value
|
||
|
def mock_args(*args, **kwargs):
|
||
|
# pylint: disable=consider-using-f-string
|
||
|
assert args == expected_args, "Invalid call args:\n %s\nMay be:\n %s" % (args, expected_args)
|
||
|
# pylint: disable=consider-using-f-string
|
||
|
assert kwargs == expected_kwargs, "Invalid call kwargs:\n %s\nMay be:\n %s" % (kwargs, expected_kwargs)
|
||
|
return expected_return
|
||
|
return mock_args
|
||
|
|
||
|
|
||
|
def mock_doSQL_just_try(self, sql, params=None): # pylint: disable=unused-argument
|
||
|
assert False, "doSQL() may not be executed in just try mode"
|
||
|
|
||
|
|
||
|
def generate_mock_doSQL(expected_sql, expected_params={}, expected_return=True): # pylint: disable=dangerous-default-value
|
||
|
def mock_doSQL(self, sql, params=None): # pylint: disable=unused-argument
|
||
|
# pylint: disable=consider-using-f-string
|
||
|
assert sql == expected_sql, "Invalid generated SQL query:\n '%s'\nMay be:\n '%s'" % (sql, expected_sql)
|
||
|
# pylint: disable=consider-using-f-string
|
||
|
assert params == expected_params, "Invalid generated params:\n %s\nMay be:\n %s" % (params, expected_params)
|
||
|
return expected_return
|
||
|
return mock_doSQL
|
||
|
|
||
|
|
||
|
# MyDB.doSelect() have same expected parameters as MyDB.doSQL()
|
||
|
generate_mock_doSelect = generate_mock_doSQL
|
||
|
mock_doSelect_just_try = mock_doSQL_just_try
|
||
|
|
||
|
#
|
||
|
# Test on MyDB helper methods
|
||
|
#
|
||
|
|
||
|
|
||
|
def test_combine_params_with_to_add_parameter():
|
||
|
assert MyDB._combine_params(dict(test1=1), dict(test2=2)) == dict(
|
||
|
test1=1, test2=2
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_combine_params_with_kargs():
|
||
|
assert MyDB._combine_params(dict(test1=1), test2=2) == dict(
|
||
|
test1=1, test2=2
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_combine_params_with_kargs_and_to_add_parameter():
|
||
|
assert MyDB._combine_params(dict(test1=1), dict(test2=2), test3=3) == dict(
|
||
|
test1=1, test2=2, test3=3
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_format_where_clauses_params_are_preserved():
|
||
|
args = ('test = test', dict(test1=1))
|
||
|
assert MyDB._format_where_clauses(*args) == args
|
||
|
|
||
|
|
||
|
def test_format_where_clauses_raw():
|
||
|
assert MyDB._format_where_clauses('test = test') == (('test = test'), {})
|
||
|
|
||
|
|
||
|
def test_format_where_clauses_tuple_clause_with_params():
|
||
|
where_clauses = (
|
||
|
'test1 = %(test1)s AND test2 = %(test2)s',
|
||
|
dict(test1=1, test2=2)
|
||
|
)
|
||
|
assert MyDB._format_where_clauses(where_clauses) == where_clauses
|
||
|
|
||
|
|
||
|
def test_format_where_clauses_dict():
|
||
|
where_clauses = dict(test1=1, test2=2)
|
||
|
assert MyDB._format_where_clauses(where_clauses) == (
|
||
|
'`test1` = %(test1)s AND `test2` = %(test2)s',
|
||
|
where_clauses
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_format_where_clauses_combined_types():
|
||
|
where_clauses = (
|
||
|
'test1 = 1',
|
||
|
('test2 LIKE %(test2)s', dict(test2=2)),
|
||
|
dict(test3=3, test4=4)
|
||
|
)
|
||
|
assert MyDB._format_where_clauses(where_clauses) == (
|
||
|
'test1 = 1 AND test2 LIKE %(test2)s AND `test3` = %(test3)s AND `test4` = %(test4)s',
|
||
|
dict(test2=2, test3=3, test4=4)
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_format_where_clauses_with_where_op():
|
||
|
where_clauses = dict(test1=1, test2=2)
|
||
|
assert MyDB._format_where_clauses(where_clauses, where_op='OR') == (
|
||
|
'`test1` = %(test1)s OR `test2` = %(test2)s',
|
||
|
where_clauses
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_add_where_clauses():
|
||
|
sql = "SELECT * FROM table"
|
||
|
where_clauses = dict(test1=1, test2=2)
|
||
|
assert MyDB._add_where_clauses(sql, None, where_clauses) == (
|
||
|
sql + ' WHERE `test1` = %(test1)s AND `test2` = %(test2)s',
|
||
|
where_clauses
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_add_where_clauses_preserved_params():
|
||
|
sql = "SELECT * FROM table"
|
||
|
where_clauses = dict(test1=1, test2=2)
|
||
|
params = dict(fake1=1)
|
||
|
assert MyDB._add_where_clauses(sql, params.copy(), where_clauses) == (
|
||
|
sql + ' WHERE `test1` = %(test1)s AND `test2` = %(test2)s',
|
||
|
dict(**where_clauses, **params)
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_add_where_clauses_with_op():
|
||
|
sql = "SELECT * FROM table"
|
||
|
where_clauses = ('test1=1', 'test2=2')
|
||
|
assert MyDB._add_where_clauses(sql, None, where_clauses, where_op='OR') == (
|
||
|
sql + ' WHERE test1=1 OR test2=2',
|
||
|
{}
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_add_where_clauses_with_duplicated_field():
|
||
|
sql = "UPDATE table SET test1=%(test1)s"
|
||
|
params = dict(test1='new_value')
|
||
|
where_clauses = dict(test1='where_value')
|
||
|
assert MyDB._add_where_clauses(sql, params, where_clauses) == (
|
||
|
sql + ' WHERE `test1` = %(test1_1)s',
|
||
|
dict(test1='new_value', test1_1='where_value')
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_quote_table_name():
|
||
|
assert MyDB._quote_table_name("mytable") == '`mytable`'
|
||
|
assert MyDB._quote_table_name("myschema.mytable") == '`myschema`.`mytable`'
|
||
|
|
||
|
|
||
|
def test_insert(mocker, test_mydb):
|
||
|
values = dict(test1=1, test2=2)
|
||
|
mocker.patch(
|
||
|
'mylib.mysql.MyDB.doSQL',
|
||
|
generate_mock_doSQL(
|
||
|
'INSERT INTO `mytable` (`test1`, `test2`) VALUES (%(test1)s, %(test2)s)',
|
||
|
values
|
||
|
)
|
||
|
)
|
||
|
|
||
|
assert test_mydb.insert('mytable', values)
|
||
|
|
||
|
|
||
|
def test_insert_just_try(mocker, test_mydb):
|
||
|
mocker.patch('mylib.mysql.MyDB.doSQL', mock_doSQL_just_try)
|
||
|
assert test_mydb.insert('mytable', dict(test1=1, test2=2), just_try=True)
|
||
|
|
||
|
|
||
|
def test_update(mocker, test_mydb):
|
||
|
values = dict(test1=1, test2=2)
|
||
|
where_clauses = dict(test3=3, test4=4)
|
||
|
mocker.patch(
|
||
|
'mylib.mysql.MyDB.doSQL',
|
||
|
generate_mock_doSQL(
|
||
|
'UPDATE `mytable` SET `test1` = %(test1)s, `test2` = %(test2)s WHERE `test3` = %(test3)s AND `test4` = %(test4)s',
|
||
|
dict(**values, **where_clauses)
|
||
|
)
|
||
|
)
|
||
|
|
||
|
assert test_mydb.update('mytable', values, where_clauses)
|
||
|
|
||
|
|
||
|
def test_update_just_try(mocker, test_mydb):
|
||
|
mocker.patch('mylib.mysql.MyDB.doSQL', mock_doSQL_just_try)
|
||
|
assert test_mydb.update('mytable', dict(test1=1, test2=2), None, just_try=True)
|
||
|
|
||
|
|
||
|
def test_delete(mocker, test_mydb):
|
||
|
where_clauses = dict(test1=1, test2=2)
|
||
|
mocker.patch(
|
||
|
'mylib.mysql.MyDB.doSQL',
|
||
|
generate_mock_doSQL(
|
||
|
'DELETE FROM `mytable` WHERE `test1` = %(test1)s AND `test2` = %(test2)s',
|
||
|
where_clauses
|
||
|
)
|
||
|
)
|
||
|
|
||
|
assert test_mydb.delete('mytable', where_clauses)
|
||
|
|
||
|
|
||
|
def test_delete_just_try(mocker, test_mydb):
|
||
|
mocker.patch('mylib.mysql.MyDB.doSQL', mock_doSQL_just_try)
|
||
|
assert test_mydb.delete('mytable', None, just_try=True)
|
||
|
|
||
|
|
||
|
def test_truncate(mocker, test_mydb):
|
||
|
mocker.patch(
|
||
|
'mylib.mysql.MyDB.doSQL',
|
||
|
generate_mock_doSQL('TRUNCATE TABLE `mytable`', None)
|
||
|
)
|
||
|
|
||
|
assert test_mydb.truncate('mytable')
|
||
|
|
||
|
|
||
|
def test_truncate_just_try(mocker, test_mydb):
|
||
|
mocker.patch('mylib.mysql.MyDB.doSQL', mock_doSelect_just_try)
|
||
|
assert test_mydb.truncate('mytable', just_try=True)
|
||
|
|
||
|
|
||
|
def test_select(mocker, test_mydb):
|
||
|
fields = ('field1', 'field2')
|
||
|
where_clauses = dict(test3=3, test4=4)
|
||
|
expected_return = [
|
||
|
dict(field1=1, field2=2),
|
||
|
dict(field1=2, field2=3),
|
||
|
]
|
||
|
order_by = "field1, DESC"
|
||
|
mocker.patch(
|
||
|
'mylib.mysql.MyDB.doSelect',
|
||
|
generate_mock_doSQL(
|
||
|
'SELECT `field1`, `field2` FROM `mytable` WHERE `test3` = %(test3)s AND `test4` = %(test4)s ORDER BY ' + order_by,
|
||
|
where_clauses, expected_return
|
||
|
)
|
||
|
)
|
||
|
|
||
|
assert test_mydb.select('mytable', where_clauses, fields, order_by=order_by) == expected_return
|
||
|
|
||
|
|
||
|
def test_select_without_field_and_order_by(mocker, test_mydb):
|
||
|
mocker.patch(
|
||
|
'mylib.mysql.MyDB.doSelect',
|
||
|
generate_mock_doSQL(
|
||
|
'SELECT * FROM `mytable`'
|
||
|
)
|
||
|
)
|
||
|
|
||
|
assert test_mydb.select('mytable')
|
||
|
|
||
|
|
||
|
def test_select_just_try(mocker, test_mydb):
|
||
|
mocker.patch('mylib.mysql.MyDB.doSQL', mock_doSelect_just_try)
|
||
|
assert test_mydb.select('mytable', None, None, just_try=True)
|
||
|
|
||
|
#
|
||
|
# Tests on main methods
|
||
|
#
|
||
|
|
||
|
|
||
|
def test_connect(mocker, test_mydb):
|
||
|
expected_kwargs = dict(
|
||
|
db=test_mydb._db,
|
||
|
user=test_mydb._user,
|
||
|
host=test_mydb._host,
|
||
|
passwd=test_mydb._pwd,
|
||
|
charset=test_mydb._charset,
|
||
|
use_unicode=True,
|
||
|
)
|
||
|
|
||
|
mocker.patch(
|
||
|
'MySQLdb.connect',
|
||
|
generate_mock_args(
|
||
|
expected_kwargs=expected_kwargs
|
||
|
)
|
||
|
)
|
||
|
|
||
|
assert test_mydb.connect()
|
||
|
|
||
|
|
||
|
def test_close(fake_mydb):
|
||
|
assert fake_mydb.close() is None
|
||
|
|
||
|
|
||
|
def test_close_connected(fake_connected_mydb):
|
||
|
assert fake_connected_mydb.close() is None
|
||
|
|
||
|
|
||
|
def test_doSQL(fake_connected_mydb):
|
||
|
fake_connected_mydb._conn.expected_sql = 'DELETE FROM table WHERE test1 = %(test1)s'
|
||
|
fake_connected_mydb._conn.expected_params = dict(test1=1)
|
||
|
fake_connected_mydb.doSQL(fake_connected_mydb._conn.expected_sql, fake_connected_mydb._conn.expected_params)
|
||
|
|
||
|
|
||
|
def test_doSQL_without_params(fake_connected_mydb):
|
||
|
fake_connected_mydb._conn.expected_sql = 'DELETE FROM table'
|
||
|
fake_connected_mydb.doSQL(fake_connected_mydb._conn.expected_sql)
|
||
|
|
||
|
|
||
|
def test_doSQL_just_try(fake_connected_just_try_mydb):
|
||
|
assert fake_connected_just_try_mydb.doSQL('DELETE FROM table')
|
||
|
|
||
|
|
||
|
def test_doSQL_on_exception(fake_connected_mydb):
|
||
|
fake_connected_mydb._conn.expected_exception = True
|
||
|
assert fake_connected_mydb.doSQL('DELETE FROM table') is False
|
||
|
|
||
|
|
||
|
def test_doSelect(fake_connected_mydb):
|
||
|
fake_connected_mydb._conn.expected_sql = 'SELECT * FROM table WHERE test1 = %(test1)s'
|
||
|
fake_connected_mydb._conn.expected_params = dict(test1=1)
|
||
|
fake_connected_mydb._conn.expected_return = [dict(test1=1)]
|
||
|
assert fake_connected_mydb.doSelect(fake_connected_mydb._conn.expected_sql, fake_connected_mydb._conn.expected_params) == fake_connected_mydb._conn.expected_return
|
||
|
|
||
|
|
||
|
def test_doSelect_without_params(fake_connected_mydb):
|
||
|
fake_connected_mydb._conn.expected_sql = 'SELECT * FROM table'
|
||
|
fake_connected_mydb._conn.expected_return = [dict(test1=1)]
|
||
|
assert fake_connected_mydb.doSelect(fake_connected_mydb._conn.expected_sql) == fake_connected_mydb._conn.expected_return
|
||
|
|
||
|
|
||
|
def test_doSelect_on_exception(fake_connected_mydb):
|
||
|
fake_connected_mydb._conn.expected_exception = True
|
||
|
assert fake_connected_mydb.doSelect('SELECT * FROM table') is False
|
||
|
|
||
|
|
||
|
def test_doSelect_just_try(fake_connected_just_try_mydb):
|
||
|
fake_connected_just_try_mydb._conn.expected_sql = 'SELECT * FROM table WHERE test1 = %(test1)s'
|
||
|
fake_connected_just_try_mydb._conn.expected_params = dict(test1=1)
|
||
|
fake_connected_just_try_mydb._conn.expected_return = [dict(test1=1)]
|
||
|
assert fake_connected_just_try_mydb.doSelect(
|
||
|
fake_connected_just_try_mydb._conn.expected_sql,
|
||
|
fake_connected_just_try_mydb._conn.expected_params
|
||
|
) == fake_connected_just_try_mydb._conn.expected_return
|