python-mylib/tests/test_mysql.py

499 lines
15 KiB
Python
Raw Permalink Normal View History

# pylint: disable=redefined-outer-name,missing-function-docstring,protected-access
""" Tests on opening hours helpers """
import pytest
from MySQLdb._exceptions import Error
from mylib.mysql import MyDB
class FakeMySQLdbCursor:
"""Fake MySQLdb cursor"""
def __init__(
self, expected_sql, expected_params, expected_return, expected_just_try, expected_exception
):
self.expected_sql = expected_sql
self.expected_params = expected_params
self.expected_return = expected_return
self.expected_just_try = expected_just_try
self.expected_exception = expected_exception
def execute(self, sql, params=None):
if self.expected_exception:
raise Error(f"{self}.execute({sql}, {params}): expected exception")
if self.expected_just_try and not sql.lower().startswith("select "):
assert False, f"{self}.execute({sql}, {params}) may not be executed in just try mode"
# pylint: disable=consider-using-f-string
assert (
sql == self.expected_sql
), "%s.execute(): Invalid SQL query:\n '%s'\nMay be:\n '%s'" % (
self,
sql,
self.expected_sql,
)
# pylint: disable=consider-using-f-string
assert (
params == self.expected_params
), "%s.execute(): Invalid params:\n %s\nMay be:\n %s" % (
self,
params,
self.expected_params,
)
return self.expected_return
@property
def description(self):
assert self.expected_return
assert isinstance(self.expected_return, list)
assert isinstance(self.expected_return[0], dict)
return [(field, 1, 2, 3) for field in self.expected_return[0].keys()]
def fetchall(self):
if isinstance(self.expected_return, list):
return (
list(row.values()) if isinstance(row, dict) else row for row in self.expected_return
)
return self.expected_return
def __repr__(self):
return (
f"FakeMySQLdbCursor({self.expected_sql}, {self.expected_params}, "
f"{self.expected_return}, {self.expected_just_try})"
)
class FakeMySQLdb:
"""Fake MySQLdb connection"""
expected_sql = None
expected_params = None
expected_return = True
expected_just_try = False
expected_exception = False
just_try = False
def __init__(self, **kwargs):
allowed_kwargs = {
"db": str,
"user": str,
"passwd": (str, None),
"host": str,
"charset": str,
"use_unicode": bool,
}
for arg, value in kwargs.items():
assert arg in allowed_kwargs, f'Invalid arg {arg}="{value}"'
assert isinstance(
value, allowed_kwargs[arg]
), f"Arg {arg} not a {allowed_kwargs[arg]} ({type(value)})"
setattr(self, arg, value)
def close(self):
return self.expected_return
def cursor(self):
return FakeMySQLdbCursor(
self.expected_sql,
self.expected_params,
self.expected_return,
self.expected_just_try or self.just_try,
self.expected_exception,
)
def commit(self):
self._check_just_try()
return self.expected_return
def rollback(self):
self._check_just_try()
return self.expected_return
def _check_just_try(self):
if self.just_try:
assert False, "May not be executed in just try mode"
def fake_mysqldb_connect(**kwargs):
return FakeMySQLdb(**kwargs)
def fake_mysqldb_connect_just_try(**kwargs):
con = FakeMySQLdb(**kwargs)
con.just_try = True
return con
@pytest.fixture
def test_mydb():
return MyDB("127.0.0.1", "user", "password", "dbname")
@pytest.fixture
def fake_mydb(mocker):
mocker.patch("MySQLdb.connect", fake_mysqldb_connect)
return MyDB("127.0.0.1", "user", "password", "dbname")
@pytest.fixture
def fake_just_try_mydb(mocker):
mocker.patch("MySQLdb.connect", fake_mysqldb_connect_just_try)
return MyDB("127.0.0.1", "user", "password", "dbname", just_try=True)
@pytest.fixture
def fake_connected_mydb(fake_mydb):
fake_mydb.connect()
return fake_mydb
@pytest.fixture
def fake_connected_just_try_mydb(fake_just_try_mydb):
fake_just_try_mydb.connect()
return fake_just_try_mydb
def generate_mock_args(
expected_args=(), expected_kwargs={}, expected_return=True
): # pylint: disable=dangerous-default-value
def mock_args(*args, **kwargs):
# pylint: disable=consider-using-f-string
assert args == expected_args, "Invalid call args:\n %s\nMay be:\n %s" % (
args,
expected_args,
)
# pylint: disable=consider-using-f-string
assert kwargs == expected_kwargs, "Invalid call kwargs:\n %s\nMay be:\n %s" % (
kwargs,
expected_kwargs,
)
return expected_return
return mock_args
def mock_doSQL_just_try(self, sql, params=None): # pylint: disable=unused-argument
assert False, "doSQL() may not be executed in just try mode"
def generate_mock_doSQL(
expected_sql, expected_params={}, expected_return=True
): # pylint: disable=dangerous-default-value
def mock_doSQL(self, sql, params=None): # pylint: disable=unused-argument
# pylint: disable=consider-using-f-string
assert sql == expected_sql, "Invalid generated SQL query:\n '%s'\nMay be:\n '%s'" % (
sql,
expected_sql,
)
# pylint: disable=consider-using-f-string
assert params == expected_params, "Invalid generated params:\n %s\nMay be:\n %s" % (
params,
expected_params,
)
return expected_return
return mock_doSQL
# MyDB.doSelect() have same expected parameters as MyDB.doSQL()
generate_mock_doSelect = generate_mock_doSQL
mock_doSelect_just_try = mock_doSQL_just_try
#
# Test on MyDB helper methods
#
def test_combine_params_with_to_add_parameter():
assert MyDB._combine_params({"test1": 1}, {"test2": 2}) == {"test1": 1, "test2": 2}
def test_combine_params_with_kargs():
assert MyDB._combine_params({"test1": 1}, test2=2) == {"test1": 1, "test2": 2}
def test_combine_params_with_kargs_and_to_add_parameter():
assert MyDB._combine_params({"test1": 1}, {"test2": 2}, test3=3) == {
"test1": 1,
"test2": 2,
"test3": 3,
}
def test_format_where_clauses_params_are_preserved():
args = ("test = test", {"test1": 1})
assert MyDB._format_where_clauses(*args) == args
def test_format_where_clauses_raw():
assert MyDB._format_where_clauses("test = test") == ("test = test", {})
def test_format_where_clauses_tuple_clause_with_params():
where_clauses = ("test1 = %(test1)s AND test2 = %(test2)s", {"test1": 1, "test2": 2})
assert MyDB._format_where_clauses(where_clauses) == where_clauses
def test_format_where_clauses_with_list_as_value():
assert MyDB._format_where_clauses({"test": [1, 2]}) == (
"`test` IN (%(test_0)s, %(test_1)s)",
{"test_0": 1, "test_1": 2},
)
def test_format_where_clauses_dict():
where_clauses = {"test1": 1, "test2": 2}
assert MyDB._format_where_clauses(where_clauses) == (
"`test1` = %(test1)s AND `test2` = %(test2)s",
where_clauses,
)
def test_format_where_clauses_combined_types():
where_clauses = ("test1 = 1", ("test2 LIKE %(test2)s", {"test2": 2}), {"test3": 3, "test4": 4})
assert MyDB._format_where_clauses(where_clauses) == (
"test1 = 1 AND test2 LIKE %(test2)s AND `test3` = %(test3)s AND `test4` = %(test4)s",
{"test2": 2, "test3": 3, "test4": 4},
)
def test_format_where_clauses_with_where_op():
where_clauses = {"test1": 1, "test2": 2}
assert MyDB._format_where_clauses(where_clauses, where_op="OR") == (
"`test1` = %(test1)s OR `test2` = %(test2)s",
where_clauses,
)
def test_add_where_clauses():
sql = "SELECT * FROM table"
where_clauses = {"test1": 1, "test2": 2}
assert MyDB._add_where_clauses(sql, None, where_clauses) == (
sql + " WHERE `test1` = %(test1)s AND `test2` = %(test2)s",
where_clauses,
)
def test_add_where_clauses_preserved_params():
sql = "SELECT * FROM table"
where_clauses = {"test1": 1, "test2": 2}
params = {"fake1": 1}
assert MyDB._add_where_clauses(sql, params.copy(), where_clauses) == (
sql + " WHERE `test1` = %(test1)s AND `test2` = %(test2)s",
{**where_clauses, **params},
)
def test_add_where_clauses_with_op():
sql = "SELECT * FROM table"
where_clauses = ("test1=1", "test2=2")
assert MyDB._add_where_clauses(sql, None, where_clauses, where_op="OR") == (
sql + " WHERE test1=1 OR test2=2",
{},
)
def test_add_where_clauses_with_duplicated_field():
sql = "UPDATE table SET test1=%(test1)s"
params = {"test1": "new_value"}
where_clauses = {"test1": "where_value"}
assert MyDB._add_where_clauses(sql, params, where_clauses) == (
sql + " WHERE `test1` = %(test1_1)s",
{"test1": "new_value", "test1_1": "where_value"},
)
def test_quote_table_name():
assert MyDB._quote_table_name("mytable") == "`mytable`"
assert MyDB._quote_table_name("myschema.mytable") == "`myschema`.`mytable`"
def test_insert(mocker, test_mydb):
values = {"test1": 1, "test2": 2}
mocker.patch(
"mylib.mysql.MyDB.doSQL",
generate_mock_doSQL(
"INSERT INTO `mytable` (`test1`, `test2`) VALUES (%(test1)s, %(test2)s)", values
),
)
assert test_mydb.insert("mytable", values)
def test_insert_just_try(mocker, test_mydb):
mocker.patch("mylib.mysql.MyDB.doSQL", mock_doSQL_just_try)
assert test_mydb.insert("mytable", {"test1": 1, "test2": 2}, just_try=True)
def test_update(mocker, test_mydb):
values = {"test1": 1, "test2": 2}
where_clauses = {"test3": 3, "test4": 4}
mocker.patch(
"mylib.mysql.MyDB.doSQL",
generate_mock_doSQL(
"UPDATE `mytable` SET `test1` = %(test1)s, `test2` = %(test2)s WHERE `test3` ="
" %(test3)s AND `test4` = %(test4)s",
{**values, **where_clauses},
),
)
assert test_mydb.update("mytable", values, where_clauses)
def test_update_just_try(mocker, test_mydb):
mocker.patch("mylib.mysql.MyDB.doSQL", mock_doSQL_just_try)
assert test_mydb.update("mytable", {"test1": 1, "test2": 2}, None, just_try=True)
def test_delete(mocker, test_mydb):
where_clauses = {"test1": 1, "test2": 2}
mocker.patch(
"mylib.mysql.MyDB.doSQL",
generate_mock_doSQL(
"DELETE FROM `mytable` WHERE `test1` = %(test1)s AND `test2` = %(test2)s", where_clauses
),
)
assert test_mydb.delete("mytable", where_clauses)
def test_delete_just_try(mocker, test_mydb):
mocker.patch("mylib.mysql.MyDB.doSQL", mock_doSQL_just_try)
assert test_mydb.delete("mytable", None, just_try=True)
def test_truncate(mocker, test_mydb):
mocker.patch("mylib.mysql.MyDB.doSQL", generate_mock_doSQL("TRUNCATE TABLE `mytable`", None))
assert test_mydb.truncate("mytable")
def test_truncate_just_try(mocker, test_mydb):
mocker.patch("mylib.mysql.MyDB.doSQL", mock_doSelect_just_try)
assert test_mydb.truncate("mytable", just_try=True)
def test_select(mocker, test_mydb):
fields = ("field1", "field2")
where_clauses = {"test3": 3, "test4": 4}
expected_return = [
{"field1": 1, "field2": 2},
{"field1": 2, "field2": 3},
]
order_by = "field1, DESC"
limit = 10
mocker.patch(
"mylib.mysql.MyDB.doSelect",
generate_mock_doSQL(
"SELECT `field1`, `field2` FROM `mytable` WHERE `test3` = %(test3)s AND `test4` ="
" %(test4)s ORDER BY " + order_by + " LIMIT " + str(limit), # nosec: B608
where_clauses,
expected_return,
),
)
assert (
test_mydb.select("mytable", where_clauses, fields, order_by=order_by, limit=limit)
== expected_return
)
def test_select_without_field_and_order_by(mocker, test_mydb):
mocker.patch("mylib.mysql.MyDB.doSelect", generate_mock_doSQL("SELECT * FROM `mytable`"))
assert test_mydb.select("mytable")
def test_select_just_try(mocker, test_mydb):
mocker.patch("mylib.mysql.MyDB.doSQL", mock_doSelect_just_try)
assert test_mydb.select("mytable", None, None, just_try=True)
#
# Tests on main methods
#
def test_connect(mocker, test_mydb):
expected_kwargs = {
"db": test_mydb._db,
"user": test_mydb._user,
"host": test_mydb._host,
"passwd": test_mydb._pwd,
"charset": test_mydb._charset,
"use_unicode": True,
}
mocker.patch("MySQLdb.connect", generate_mock_args(expected_kwargs=expected_kwargs))
assert test_mydb.connect()
def test_close(fake_mydb):
assert fake_mydb.close() is None
def test_close_connected(fake_connected_mydb):
assert fake_connected_mydb.close() is None
def test_doSQL(fake_connected_mydb):
fake_connected_mydb._conn.expected_sql = "DELETE FROM table WHERE test1 = %(test1)s"
fake_connected_mydb._conn.expected_params = {"test1": 1}
fake_connected_mydb.doSQL(
fake_connected_mydb._conn.expected_sql, fake_connected_mydb._conn.expected_params
)
def test_doSQL_without_params(fake_connected_mydb):
fake_connected_mydb._conn.expected_sql = "DELETE FROM table"
fake_connected_mydb.doSQL(fake_connected_mydb._conn.expected_sql)
def test_doSQL_just_try(fake_connected_just_try_mydb):
assert fake_connected_just_try_mydb.doSQL("DELETE FROM table")
def test_doSQL_on_exception(fake_connected_mydb):
fake_connected_mydb._conn.expected_exception = True
assert fake_connected_mydb.doSQL("DELETE FROM table") is False
def test_doSelect(fake_connected_mydb):
fake_connected_mydb._conn.expected_sql = "SELECT * FROM table WHERE test1 = %(test1)s"
fake_connected_mydb._conn.expected_params = {"test1": 1}
fake_connected_mydb._conn.expected_return = [{"test1": 1}]
assert (
fake_connected_mydb.doSelect(
fake_connected_mydb._conn.expected_sql, fake_connected_mydb._conn.expected_params
)
== fake_connected_mydb._conn.expected_return
)
def test_doSelect_without_params(fake_connected_mydb):
fake_connected_mydb._conn.expected_sql = "SELECT * FROM table"
fake_connected_mydb._conn.expected_return = [{"test1": 1}]
assert (
fake_connected_mydb.doSelect(fake_connected_mydb._conn.expected_sql)
== fake_connected_mydb._conn.expected_return
)
def test_doSelect_on_exception(fake_connected_mydb):
fake_connected_mydb._conn.expected_exception = True
assert fake_connected_mydb.doSelect("SELECT * FROM table") is False
def test_doSelect_just_try(fake_connected_just_try_mydb):
fake_connected_just_try_mydb._conn.expected_sql = "SELECT * FROM table WHERE test1 = %(test1)s"
fake_connected_just_try_mydb._conn.expected_params = {"test1": 1}
fake_connected_just_try_mydb._conn.expected_return = [{"test1": 1}]
assert (
fake_connected_just_try_mydb.doSelect(
fake_connected_just_try_mydb._conn.expected_sql,
fake_connected_just_try_mydb._conn.expected_params,
)
== fake_connected_just_try_mydb._conn.expected_return
)