505 lines
16 KiB
Python
505 lines
16 KiB
Python
# pylint: disable=redefined-outer-name,missing-function-docstring,protected-access
|
|
""" Tests on opening hours helpers """
|
|
|
|
import psycopg2
|
|
import pytest
|
|
from psycopg2.extras import RealDictCursor
|
|
|
|
from mylib.pgsql import PgDB
|
|
|
|
|
|
class FakePsycopg2Cursor:
    """Fake Psycopg2 cursor

    execute() checks the SQL query and the parameters it receives against the expected
    ones and hands back the canned return value.
    """

    def __init__(
        self, expected_sql, expected_params, expected_return, expected_just_try, expected_exception
    ):
        """
        :param expected_sql: SQL query execute() has to receive
        :param expected_params: Parameters execute() has to receive
        :param expected_return: Value returned by execute() and fetchall()
        :param expected_just_try: If true, execute() rejects queries not starting with "select "
        :param expected_exception: If true, execute() raises psycopg2.Error
        """
        self.expected_sql = expected_sql
        self.expected_params = expected_params
        self.expected_return = expected_return
        self.expected_just_try = expected_just_try
        self.expected_exception = expected_exception

    def execute(self, sql, params=None):
        """Check the query and its parameters, then return the expected value"""
        if self.expected_exception:
            raise psycopg2.Error(f"{self}.execute({sql}, {params}): expected exception")
        if self.expected_just_try and not sql.lower().startswith("select "):
            # Explicit raise: contrary to "assert False", it is not stripped by python -O
            raise AssertionError(
                f"{self}.execute({sql}, {params}) may not be executed in just try mode"
            )
        # pylint: disable=consider-using-f-string
        assert (
            sql == self.expected_sql
        ), "%s.execute(): Invalid SQL query:\n '%s'\nMay be:\n '%s'" % (
            self,
            sql,
            self.expected_sql,
        )
        # pylint: disable=consider-using-f-string
        assert (
            params == self.expected_params
        ), "%s.execute(): Invalid params:\n %s\nMay be:\n %s" % (
            self,
            params,
            self.expected_params,
        )
        return self.expected_return

    def fetchall(self):
        """Return the expected value (the same one execute() returns)"""
        return self.expected_return

    def __repr__(self):
        # List every constructor argument so that failure messages show the full fake state
        return (
            f"FakePsycopg2Cursor({self.expected_sql}, {self.expected_params}, "
            f"{self.expected_return}, {self.expected_just_try}, {self.expected_exception})"
        )
|
|
|
|
|
|
class FakePsycopg2:
    """Fake Psycopg2 connection

    Tests tune its behavior by overriding the expected_* attributes on the instance.
    """

    # SQL query and parameters handed to the cursors created by cursor()
    expected_sql = None
    expected_params = None
    # Cursor factory cursor() has to be called with
    expected_cursor_factory = None
    # Value returned by close(), set_client_encoding(), commit(), rollback() and by cursors
    expected_return = True
    # If true, cursors created by cursor() reject queries not starting with "select "
    expected_just_try = False
    # If true, set_client_encoding() and cursors' execute() raise psycopg2.Error
    expected_exception = False
    # If true, set_client_encoding(), commit() and rollback() may not be called
    just_try = False

    def __init__(self, **kwargs):
        """Check connection keyword arguments and store each of them as an attribute"""
        # type(None) (and not None, which is not a type and makes isinstance() raise
        # TypeError on a non-str value) so that password=None is accepted
        allowed_kwargs = {"dbname": str, "user": str, "password": (str, type(None)), "host": str}
        for arg, value in kwargs.items():
            assert arg in allowed_kwargs, f'Invalid arg {arg}="{value}"'
            assert isinstance(
                value, allowed_kwargs[arg]
            ), f"Arg {arg} not a {allowed_kwargs[arg]} ({type(value)})"
            setattr(self, arg, value)

    def close(self):
        """Return the expected value"""
        return self.expected_return

    def set_client_encoding(self, *arg):
        """Check that exactly one str argument is given, then return the expected value"""
        self._check_just_try()
        assert len(arg) == 1 and isinstance(arg[0], str)
        if self.expected_exception:
            raise psycopg2.Error(f"set_client_encoding({arg[0]}): Expected exception")
        return self.expected_return

    def cursor(self, cursor_factory=None):
        """Check the cursor factory and return a fake cursor set up from the expected_* values"""
        assert cursor_factory is self.expected_cursor_factory
        return FakePsycopg2Cursor(
            self.expected_sql,
            self.expected_params,
            self.expected_return,
            self.expected_just_try or self.just_try,
            self.expected_exception,
        )

    def commit(self):
        """Return the expected value (refused in just try mode)"""
        self._check_just_try()
        return self.expected_return

    def rollback(self):
        """Return the expected value (refused in just try mode)"""
        self._check_just_try()
        return self.expected_return

    def _check_just_try(self):
        """Fail when the connection is in just try mode"""
        if self.just_try:
            # Explicit raise: contrary to "assert False", it is not stripped by python -O
            raise AssertionError("May not be executed in just try mode")
|
|
|
|
|
|
def fake_psycopg2_connect(**kwargs):
    """Replacement for psycopg2.connect() that hands back a FakePsycopg2 connection"""
    fake_connection = FakePsycopg2(**kwargs)
    return fake_connection
|
|
|
|
|
|
def fake_psycopg2_connect_just_try(**kwargs):
    """Replacement for psycopg2.connect() returning a FakePsycopg2 flagged as just try"""
    fake_connection = FakePsycopg2(**kwargs)
    # The flag makes the fake connection refuse commit(), rollback() and encoding changes
    fake_connection.just_try = True
    return fake_connection
|
|
|
|
|
|
@pytest.fixture
def test_pgdb():
    """PgDB instance built with dummy connection settings (psycopg2 is not patched here)"""
    pgdb = PgDB("127.0.0.1", "user", "password", "dbname")
    return pgdb
|
|
|
|
|
|
@pytest.fixture
def fake_pgdb(mocker):
    """PgDB instance created while psycopg2.connect() is replaced by the fake one"""
    mocker.patch("psycopg2.connect", fake_psycopg2_connect)
    pgdb = PgDB("127.0.0.1", "user", "password", "dbname")
    return pgdb
|
|
|
|
|
|
@pytest.fixture
def fake_just_try_pgdb(mocker):
    """Just-try PgDB instance created while psycopg2.connect() is replaced by the just-try fake"""
    mocker.patch("psycopg2.connect", fake_psycopg2_connect_just_try)
    pgdb = PgDB("127.0.0.1", "user", "password", "dbname", just_try=True)
    return pgdb
|
|
|
|
|
|
@pytest.fixture
def fake_connected_pgdb(fake_pgdb):
    """The fake_pgdb instance, after connect() has been called on it"""
    pgdb = fake_pgdb
    pgdb.connect()
    return pgdb
|
|
|
|
|
|
@pytest.fixture
def fake_connected_just_try_pgdb(fake_just_try_pgdb):
    """The fake_just_try_pgdb instance, after connect() has been called on it"""
    pgdb = fake_just_try_pgdb
    pgdb.connect()
    return pgdb
|
|
|
|
|
|
def generate_mock_args(
    expected_args=(), expected_kwargs={}, expected_return=True
):  # pylint: disable=dangerous-default-value
    """
    Build a callable that checks the arguments it is called with

    :param expected_args: Positional arguments (tuple) the callable has to receive
    :param expected_kwargs: Keyword arguments (dict) the callable has to receive
    :param expected_return: Value returned by the callable when both checks pass

    :return: The checking callable
    """
    args_error = "Invalid call args:\n %s\nMay be:\n %s"
    kwargs_error = "Invalid call kwargs:\n %s\nMay be:\n %s"

    def mock_args(*args, **kwargs):
        # pylint: disable=consider-using-f-string
        assert args == expected_args, args_error % (args, expected_args)
        assert kwargs == expected_kwargs, kwargs_error % (kwargs, expected_kwargs)
        return expected_return

    return mock_args
|
|
|
|
|
|
def mock_doSQL_just_try(self, sql, params=None):  # pylint: disable=unused-argument
    """
    Replacement for PgDB.doSQL() that fails whenever it is called

    :raises AssertionError: Always (explicit raise, so the guard still works under python -O
                            which strips assert statements)
    """
    raise AssertionError("doSQL() may not be executed in just try mode")
|
|
|
|
|
|
def generate_mock_doSQL(
    expected_sql, expected_params={}, expected_return=True
):  # pylint: disable=dangerous-default-value
    """
    Build a replacement for PgDB.doSQL() that checks the generated query and parameters

    :param expected_sql: SQL query the replacement has to receive
    :param expected_params: Parameters the replacement has to receive (compared with ==, so an
                            explicit None only matches a call made without parameters)
    :param expected_return: Value returned by the replacement when both checks pass

    :return: The checking replacement method
    """
    sql_error = "Invalid generated SQL query:\n '%s'\nMay be:\n '%s'"
    params_error = "Invalid generated params:\n %s\nMay be:\n %s"

    def mock_doSQL(self, sql, params=None):  # pylint: disable=unused-argument
        # pylint: disable=consider-using-f-string
        assert sql == expected_sql, sql_error % (sql, expected_sql)
        assert params == expected_params, params_error % (params, expected_params)
        return expected_return

    return mock_doSQL
|
|
|
|
|
|
# PgDB.doSelect() has the same expected parameters as PgDB.doSQL(), so the doSQL mock
# helpers are reused under doSelect names
generate_mock_doSelect = generate_mock_doSQL
mock_doSelect_just_try = mock_doSQL_just_try
|
|
|
|
#
|
|
# Tests on PgDB helper methods
|
|
#
|
|
|
|
|
|
def test_combine_params_with_to_add_parameter():
    """_combine_params() merges a second positional dict into the first one"""
    combined = PgDB._combine_params({"test1": 1}, {"test2": 2})
    assert combined == {"test1": 1, "test2": 2}
|
|
|
|
|
|
def test_combine_params_with_kargs():
    """_combine_params() merges keyword arguments into the given dict"""
    combined = PgDB._combine_params({"test1": 1}, test2=2)
    assert combined == {"test1": 1, "test2": 2}
|
|
|
|
|
|
def test_combine_params_with_kargs_and_to_add_parameter():
    """_combine_params() merges both a second positional dict and keyword arguments"""
    combined = PgDB._combine_params({"test1": 1}, {"test2": 2}, test3=3)
    expected = {"test1": 1, "test2": 2, "test3": 3}
    assert combined == expected
|
|
|
|
|
|
def test_format_where_clauses_params_are_preserved():
    """A raw clause given with parameters comes back unchanged along with these parameters"""
    clause_and_params = ("test = test", {"test1": 1})
    formatted = PgDB._format_where_clauses(*clause_and_params)
    assert formatted == clause_and_params
|
|
|
|
|
|
def test_format_where_clauses_raw():
    """A raw string clause comes back unchanged with empty parameters"""
    formatted = PgDB._format_where_clauses("test = test")
    assert formatted == ("test = test", {})
|
|
|
|
|
|
def test_format_where_clauses_tuple_clause_with_params():
    """A (clause, parameters) tuple comes back unchanged"""
    clause_with_params = ("test1 = %(test1)s AND test2 = %(test2)s", {"test1": 1, "test2": 2})
    formatted = PgDB._format_where_clauses(clause_with_params)
    assert formatted == clause_with_params
|
|
|
|
|
|
def test_format_where_clauses_with_list_as_value():
    """A list value gives an IN clause with one indexed placeholder per item"""
    formatted = PgDB._format_where_clauses({"test": [1, 2]})
    expected_clause = '"test" IN (%(test_0)s, %(test_1)s)'
    expected_params = {"test_0": 1, "test_1": 2}
    assert formatted == (expected_clause, expected_params)
|
|
|
|
|
|
def test_format_where_clauses_dict():
    """A dict gives one quoted-field equality per key, joined with AND"""
    where_clauses = {"test1": 1, "test2": 2}
    formatted = PgDB._format_where_clauses(where_clauses)
    expected_clause = '"test1" = %(test1)s AND "test2" = %(test2)s'
    assert formatted == (expected_clause, where_clauses)
|
|
|
|
|
|
def test_format_where_clauses_combined_types():
    """Raw, (clause, parameters) and dict clauses can be mixed in a single tuple"""
    where_clauses = ("test1 = 1", ("test2 LIKE %(test2)s", {"test2": 2}), {"test3": 3, "test4": 4})
    formatted = PgDB._format_where_clauses(where_clauses)
    expected_clause = (
        'test1 = 1 AND test2 LIKE %(test2)s AND "test3" = %(test3)s AND "test4" = %(test4)s'
    )
    expected_params = {"test2": 2, "test3": 3, "test4": 4}
    assert formatted == (expected_clause, expected_params)
|
|
|
|
|
|
def test_format_where_clauses_with_where_op():
    """The where_op argument replaces AND as the operator joining the clauses"""
    where_clauses = {"test1": 1, "test2": 2}
    formatted = PgDB._format_where_clauses(where_clauses, where_op="OR")
    expected_clause = '"test1" = %(test1)s OR "test2" = %(test2)s'
    assert formatted == (expected_clause, where_clauses)
|
|
|
|
|
|
def test_add_where_clauses():
    """_add_where_clauses() appends a WHERE part to the query and returns its parameters"""
    sql = "SELECT * FROM table"
    where_clauses = {"test1": 1, "test2": 2}
    result = PgDB._add_where_clauses(sql, None, where_clauses)
    expected_sql = sql + ' WHERE "test1" = %(test1)s AND "test2" = %(test2)s'
    assert result == (expected_sql, where_clauses)
|
|
|
|
|
|
def test_add_where_clauses_preserved_params():
    """Parameters given to _add_where_clauses() are kept next to the WHERE ones"""
    sql = "SELECT * FROM table"
    where_clauses = {"test1": 1, "test2": 2}
    params = {"fake1": 1}
    # A copy is passed, so the expected value is built from an untouched params dict
    result = PgDB._add_where_clauses(sql, params.copy(), where_clauses)
    expected_sql = sql + ' WHERE "test1" = %(test1)s AND "test2" = %(test2)s'
    expected_params = {**where_clauses, **params}
    assert result == (expected_sql, expected_params)
|
|
|
|
|
|
def test_add_where_clauses_with_op():
    """_add_where_clauses() joins raw clauses with the given where_op"""
    sql = "SELECT * FROM table"
    where_clauses = ("test1=1", "test2=2")
    result = PgDB._add_where_clauses(sql, None, where_clauses, where_op="OR")
    expected_sql = sql + " WHERE test1=1 OR test2=2"
    assert result == (expected_sql, {})
|
|
|
|
|
|
def test_add_where_clauses_with_duplicated_field():
    """A WHERE field already used as a parameter name gets a suffixed placeholder"""
    sql = "UPDATE table SET test1=%(test1)s"
    params = {"test1": "new_value"}
    where_clauses = {"test1": "where_value"}
    result = PgDB._add_where_clauses(sql, params, where_clauses)
    expected_sql = sql + ' WHERE "test1" = %(test1_1)s'
    expected_params = {"test1": "new_value", "test1_1": "where_value"}
    assert result == (expected_sql, expected_params)
|
|
|
|
|
|
def test_quote_table_name():
    """_quote_table_name() double-quotes each dot-separated part of the table name"""
    cases = (
        ("mytable", '"mytable"'),
        ("myschema.mytable", '"myschema"."mytable"'),
    )
    for table, quoted in cases:
        assert PgDB._quote_table_name(table) == quoted
|
|
|
|
|
|
def test_insert(mocker, test_pgdb):
    """insert() hands doSQL() an INSERT query with one named placeholder per value"""
    values = {"test1": 1, "test2": 2}
    expected_sql = 'INSERT INTO "mytable" ("test1", "test2") VALUES (%(test1)s, %(test2)s)'
    mocker.patch("mylib.pgsql.PgDB.doSQL", generate_mock_doSQL(expected_sql, values))

    assert test_pgdb.insert("mytable", values)
|
|
|
|
|
|
def test_insert_just_try(mocker, test_pgdb):
    """insert() in just-try mode returns a true value without calling doSQL()"""
    mocker.patch("mylib.pgsql.PgDB.doSQL", mock_doSQL_just_try)
    values = {"test1": 1, "test2": 2}
    assert test_pgdb.insert("mytable", values, just_try=True)
|
|
|
|
|
|
def test_update(mocker, test_pgdb):
    """update() hands doSQL() an UPDATE query carrying both the values and the WHERE params"""
    values = {"test1": 1, "test2": 2}
    where_clauses = {"test3": 3, "test4": 4}
    expected_sql = (
        'UPDATE "mytable" SET "test1" = %(test1)s, "test2" = %(test2)s WHERE "test3" ='
        ' %(test3)s AND "test4" = %(test4)s'
    )
    expected_params = {**values, **where_clauses}
    mocker.patch("mylib.pgsql.PgDB.doSQL", generate_mock_doSQL(expected_sql, expected_params))

    assert test_pgdb.update("mytable", values, where_clauses)
|
|
|
|
|
|
def test_update_just_try(mocker, test_pgdb):
    """update() in just-try mode returns a true value without calling doSQL()"""
    mocker.patch("mylib.pgsql.PgDB.doSQL", mock_doSQL_just_try)
    values = {"test1": 1, "test2": 2}
    assert test_pgdb.update("mytable", values, None, just_try=True)
|
|
|
|
|
|
def test_delete(mocker, test_pgdb):
    """delete() hands doSQL() a DELETE query restricted by the WHERE clauses"""
    where_clauses = {"test1": 1, "test2": 2}
    expected_sql = 'DELETE FROM "mytable" WHERE "test1" = %(test1)s AND "test2" = %(test2)s'
    mocker.patch("mylib.pgsql.PgDB.doSQL", generate_mock_doSQL(expected_sql, where_clauses))

    assert test_pgdb.delete("mytable", where_clauses)
|
|
|
|
|
|
def test_delete_just_try(mocker, test_pgdb):
    """delete() in just-try mode returns a true value without calling doSQL()"""
    mocker.patch("mylib.pgsql.PgDB.doSQL", mock_doSQL_just_try)
    deleted = test_pgdb.delete("mytable", None, just_try=True)
    assert deleted
|
|
|
|
|
|
def test_truncate(mocker, test_pgdb):
    """truncate() hands doSQL() a TRUNCATE TABLE query and no parameters"""
    # An explicit None is expected here: the mock compares parameters with ==
    check_truncate = generate_mock_doSQL('TRUNCATE TABLE "mytable"', None)
    mocker.patch("mylib.pgsql.PgDB.doSQL", check_truncate)

    assert test_pgdb.truncate("mytable")
|
|
|
|
|
|
def test_truncate_just_try(mocker, test_pgdb):
    """truncate() in just-try mode returns a true value without calling doSQL()"""
    # doSQL() is guarded with the doSQL-named mock, as in the other *_just_try tests patching
    # doSQL() (the doSelect-named alias previously used here is the very same function)
    mocker.patch("mylib.pgsql.PgDB.doSQL", mock_doSQL_just_try)
    assert test_pgdb.truncate("mytable", just_try=True)
|
|
|
|
|
|
def test_select(mocker, test_pgdb):
    """select() hands doSelect() a query with fields, WHERE, ORDER BY and LIMIT parts"""
    fields = ("field1", "field2")
    where_clauses = {"test3": 3, "test4": 4}
    order_by = "field1, DESC"
    limit = 10
    expected_return = [
        {"field1": 1, "field2": 2},
        {"field1": 2, "field2": 3},
    ]
    expected_sql = (
        'SELECT "field1", "field2" FROM "mytable" WHERE "test3" = %(test3)s AND "test4" ='
        " %(test4)s ORDER BY " + order_by + " LIMIT " + str(limit)  # nosec: B608
    )
    mocker.patch(
        "mylib.pgsql.PgDB.doSelect",
        generate_mock_doSQL(expected_sql, where_clauses, expected_return),
    )

    rows = test_pgdb.select("mytable", where_clauses, fields, order_by=order_by, limit=limit)
    assert rows == expected_return
|
|
|
|
|
|
def test_select_without_field_and_order_by(mocker, test_pgdb):
    """select() with only a table name hands doSelect() a bare SELECT * query"""
    # Default expected parameters of the mock apply here (an empty dict)
    check_select = generate_mock_doSQL('SELECT * FROM "mytable"')
    mocker.patch("mylib.pgsql.PgDB.doSelect", check_select)

    assert test_pgdb.select("mytable")
|
|
|
|
|
|
def test_select_just_try(mocker, test_pgdb):
    """select() in just-try mode returns a true value without calling doSQL()"""
    # NOTE(review): the other select() tests patch PgDB.doSelect, whereas the guard is set on
    # PgDB.doSQL here - confirm which method select() is expected to skip in just-try mode
    mocker.patch("mylib.pgsql.PgDB.doSQL", mock_doSelect_just_try)
    selected = test_pgdb.select("mytable", None, None, just_try=True)
    assert selected
|
|
|
|
|
|
#
|
|
# Tests on main methods
|
|
#
|
|
|
|
|
|
def test_connect(mocker, test_pgdb):
    """connect() calls psycopg2.connect() with the instance's settings as keyword arguments"""
    expected_kwargs = {
        "host": test_pgdb._host,
        "dbname": test_pgdb._db,
        "user": test_pgdb._user,
        "password": test_pgdb._pwd,
    }
    check_connect = generate_mock_args(expected_kwargs=expected_kwargs)
    mocker.patch("psycopg2.connect", check_connect)

    assert test_pgdb.connect()
|
|
|
|
|
|
def test_close(fake_pgdb):
    """close() returns None when connect() has not been called"""
    closed = fake_pgdb.close()
    assert closed is None
|
|
|
|
|
|
def test_close_connected(fake_connected_pgdb):
    """close() returns None after connect() has been called"""
    closed = fake_connected_pgdb.close()
    assert closed is None
|
|
|
|
|
|
def test_setEncoding(fake_connected_pgdb):
    """setEncoding() returns a true value on a connected instance"""
    encoding_set = fake_connected_pgdb.setEncoding("utf8")
    assert encoding_set
|
|
|
|
|
|
def test_setEncoding_not_connected(fake_pgdb):
    """setEncoding() returns False when connect() has not been called"""
    encoding_set = fake_pgdb.setEncoding("utf8")
    assert encoding_set is False
|
|
|
|
|
|
def test_setEncoding_on_exception(fake_connected_pgdb):
    """setEncoding() returns False when the connection raises psycopg2.Error"""
    # Make the fake connection raise on set_client_encoding()
    conn = fake_connected_pgdb._conn
    conn.expected_exception = True
    assert fake_connected_pgdb.setEncoding("utf8") is False
|
|
|
|
|
|
def test_doSQL(fake_connected_pgdb):
    """doSQL() passes the query and its parameters untouched to the cursor"""
    conn = fake_connected_pgdb._conn
    conn.expected_sql = "DELETE FROM table WHERE test1 = %(test1)s"
    conn.expected_params = {"test1": 1}
    # Checks are made by the fake cursor; the return value is not asserted here
    fake_connected_pgdb.doSQL(conn.expected_sql, conn.expected_params)
|
|
|
|
|
|
def test_doSQL_without_params(fake_connected_pgdb):
    """doSQL() called without parameters passes no parameters to the cursor"""
    conn = fake_connected_pgdb._conn
    conn.expected_sql = "DELETE FROM table"
    # Checks are made by the fake cursor; the return value is not asserted here
    fake_connected_pgdb.doSQL(conn.expected_sql)
|
|
|
|
|
|
def test_doSQL_just_try(fake_connected_just_try_pgdb):
    """doSQL() in just-try mode returns a true value (the fake refuses any real execution)"""
    done = fake_connected_just_try_pgdb.doSQL("DELETE FROM table")
    assert done
|
|
|
|
|
|
def test_doSQL_on_exception(fake_connected_pgdb):
    """doSQL() returns False when the cursor raises psycopg2.Error"""
    # Make the fake cursor raise on execute()
    conn = fake_connected_pgdb._conn
    conn.expected_exception = True
    assert fake_connected_pgdb.doSQL("DELETE FROM table") is False
|
|
|
|
|
|
def test_doSelect(fake_connected_pgdb):
    """doSelect() runs the query on a RealDictCursor and returns the rows it gets"""
    conn = fake_connected_pgdb._conn
    conn.expected_sql = "SELECT * FROM table WHERE test1 = %(test1)s"
    conn.expected_params = {"test1": 1}
    conn.expected_cursor_factory = RealDictCursor
    conn.expected_return = [{"test1": 1}]

    rows = fake_connected_pgdb.doSelect(conn.expected_sql, conn.expected_params)
    assert rows == conn.expected_return
|
|
|
|
|
|
def test_doSelect_without_params(fake_connected_pgdb):
    """doSelect() called without parameters passes no parameters to the cursor"""
    conn = fake_connected_pgdb._conn
    conn.expected_sql = "SELECT * FROM table"
    conn.expected_cursor_factory = RealDictCursor
    conn.expected_return = [{"test1": 1}]

    rows = fake_connected_pgdb.doSelect(conn.expected_sql)
    assert rows == conn.expected_return
|
|
|
|
|
|
def test_doSelect_on_exception(fake_connected_pgdb):
    """doSelect() returns False when the cursor raises psycopg2.Error"""
    conn = fake_connected_pgdb._conn
    conn.expected_cursor_factory = RealDictCursor
    # Make the fake cursor raise on execute()
    conn.expected_exception = True
    assert fake_connected_pgdb.doSelect("SELECT * FROM table") is False
|
|
|
|
|
|
def test_doSelect_just_try(fake_connected_just_try_pgdb):
    """doSelect() still runs the query and returns its rows in just-try mode"""
    conn = fake_connected_just_try_pgdb._conn
    conn.expected_sql = "SELECT * FROM table WHERE test1 = %(test1)s"
    conn.expected_params = {"test1": 1}
    conn.expected_cursor_factory = RealDictCursor
    conn.expected_return = [{"test1": 1}]

    rows = fake_connected_just_try_pgdb.doSelect(conn.expected_sql, conn.expected_params)
    assert rows == conn.expected_return
|