python-mylib/tests/test_pgsql.py

453 lines
14 KiB
Python
Raw Normal View History

# pylint: disable=redefined-outer-name,missing-function-docstring,protected-access
2021-10-06 21:33:25 +02:00
""" Tests on opening hours helpers """
2023-01-06 19:36:14 +01:00
import psycopg2
2021-10-06 21:33:25 +02:00
import pytest
from mylib.pgsql import PgDB
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
class FakePsycopg2Cursor:
""" Fake Psycopg2 cursor """
2021-10-06 21:33:25 +02:00
def __init__(self, expected_sql, expected_params, expected_return, expected_just_try, expected_exception):
self.expected_sql = expected_sql
self.expected_params = expected_params
self.expected_return = expected_return
self.expected_just_try = expected_just_try
self.expected_exception = expected_exception
def execute(self, sql, params=None):
if self.expected_exception:
2023-01-06 19:36:14 +01:00
raise psycopg2.Error("%s.execute(%s, %s): expected exception" % (self, sql, params))
2021-10-06 21:33:25 +02:00
if self.expected_just_try and not sql.lower().startswith('select '):
assert False, "%s.execute(%s, %s) may not be executed in just try mode" % (self, sql, params)
assert sql == self.expected_sql, "%s.execute(): Invalid SQL query:\n '%s'\nMay be:\n '%s'" % (self, sql, self.expected_sql)
assert params == self.expected_params, "%s.execute(): Invalid params:\n %s\nMay be:\n %s" % (self, params, self.expected_params)
return self.expected_return
def fetchall(self):
return self.expected_return
def __repr__(self):
return "FakePsycopg2Cursor(%s, %s, %s, %s)" % (
self.expected_sql, self.expected_params,
self.expected_return, self.expected_just_try
)
class FakePsycopg2:
""" Fake Psycopg2 connection """
2021-10-06 21:33:25 +02:00
expected_sql = None
expected_params = None
expected_return = True
expected_just_try = False
expected_exception = False
just_try = False
def __init__(self, **kwargs):
allowed_kwargs = dict(dbname=str, user=str, password=(str, None), host=str)
for arg, value in kwargs.items():
assert arg in allowed_kwargs, "Invalid arg %s='%s'" % (arg, value)
assert isinstance(value, allowed_kwargs[arg]), "Arg %s not a %s (%s)" % (arg, allowed_kwargs[arg], type(value))
setattr(self, arg, value)
def close(self):
return self.expected_return
def set_client_encoding(self, *arg):
self._check_just_try()
assert len(arg) == 1 and isinstance(arg[0], str)
if self.expected_exception:
2023-01-06 19:36:14 +01:00
raise psycopg2.Error("set_client_encoding(%s): Expected exception" % arg[0])
2021-10-06 21:33:25 +02:00
return self.expected_return
def cursor(self):
return FakePsycopg2Cursor(
self.expected_sql, self.expected_params,
self.expected_return, self.expected_just_try or self.just_try,
self.expected_exception
)
def commit(self):
self._check_just_try()
return self.expected_return
def rollback(self):
self._check_just_try()
return self.expected_return
def _check_just_try(self):
if self.just_try:
assert False, "May not be executed in just try mode"
2021-10-06 21:33:25 +02:00
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def fake_psycopg2_connect(**kwargs):
return FakePsycopg2(**kwargs)
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def fake_psycopg2_connect_just_try(**kwargs):
con = FakePsycopg2(**kwargs)
con.just_try = True
return con
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
@pytest.fixture
def test_pgdb():
return PgDB('127.0.0.1', 'user', 'password', 'dbname')
2021-10-06 21:33:25 +02:00
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
@pytest.fixture
def fake_pgdb(mocker):
mocker.patch('psycopg2.connect', fake_psycopg2_connect)
return PgDB('127.0.0.1', 'user', 'password', 'dbname')
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
@pytest.fixture
def fake_just_try_pgdb(mocker):
mocker.patch('psycopg2.connect', fake_psycopg2_connect_just_try)
return PgDB('127.0.0.1', 'user', 'password', 'dbname', just_try=True)
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
@pytest.fixture
def fake_connected_pgdb(fake_pgdb):
fake_pgdb.connect()
return fake_pgdb
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
@pytest.fixture
def fake_connected_just_try_pgdb(fake_just_try_pgdb):
fake_just_try_pgdb.connect()
return fake_just_try_pgdb
2021-11-07 21:37:18 +01:00
2023-01-06 19:36:14 +01:00
def generate_mock_args(expected_args=(), expected_kwargs={}, expected_return=True): # pylint: disable=dangerous-default-value
2021-10-06 21:33:25 +02:00
def mock_args(*args, **kwargs):
assert args == expected_args, "Invalid call args:\n %s\nMay be:\n %s" % (args, expected_args)
assert kwargs == expected_kwargs, "Invalid call kwargs:\n %s\nMay be:\n %s" % (kwargs, expected_kwargs)
return expected_return
return mock_args
2021-11-07 21:37:18 +01:00
def mock_doSQL_just_try(self, sql, params=None): # pylint: disable=unused-argument
2021-10-06 21:33:25 +02:00
assert False, "doSQL() may not be executed in just try mode"
2021-11-07 21:37:18 +01:00
2023-01-06 19:36:14 +01:00
def generate_mock_doSQL(expected_sql, expected_params={}, expected_return=True): # pylint: disable=dangerous-default-value
def mock_doSQL(self, sql, params=None): # pylint: disable=unused-argument
2021-10-06 21:33:25 +02:00
assert sql == expected_sql, "Invalid generated SQL query:\n '%s'\nMay be:\n '%s'" % (sql, expected_sql)
assert params == expected_params, "Invalid generated params:\n %s\nMay be:\n %s" % (params, expected_params)
return expected_return
return mock_doSQL
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
# PgDB.doSelect() have same expected parameters as PgDB.doSQL()
generate_mock_doSelect = generate_mock_doSQL
mock_doSelect_just_try = mock_doSQL_just_try
#
# Test on PgDB helper methods
#
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_combine_params_with_to_add_parameter():
assert PgDB._combine_params(dict(test1=1), dict(test2=2)) == dict(
test1=1, test2=2
)
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_combine_params_with_kargs():
assert PgDB._combine_params(dict(test1=1), test2=2) == dict(
test1=1, test2=2
)
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_combine_params_with_kargs_and_to_add_parameter():
assert PgDB._combine_params(dict(test1=1), dict(test2=2), test3=3) == dict(
test1=1, test2=2, test3=3
)
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_format_where_clauses_params_are_preserved():
2021-11-07 21:37:18 +01:00
args = ('test = test', dict(test1=1))
2021-10-06 21:33:25 +02:00
assert PgDB._format_where_clauses(*args) == args
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_format_where_clauses_raw():
2023-01-06 19:36:14 +01:00
assert PgDB._format_where_clauses('test = test') == (('test = test'), {})
2021-10-06 21:33:25 +02:00
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_format_where_clauses_tuple_clause_with_params():
where_clauses = (
'test1 = %(test1)s AND test2 = %(test2)s',
dict(test1=1, test2=2)
)
assert PgDB._format_where_clauses(where_clauses) == where_clauses
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_format_where_clauses_dict():
where_clauses = dict(test1=1, test2=2)
assert PgDB._format_where_clauses(where_clauses) == (
'"test1" = %(test1)s AND "test2" = %(test2)s',
where_clauses
)
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_format_where_clauses_combined_types():
where_clauses = (
'test1 = 1',
('test2 LIKE %(test2)s', dict(test2=2)),
dict(test3=3, test4=4)
)
assert PgDB._format_where_clauses(where_clauses) == (
'test1 = 1 AND test2 LIKE %(test2)s AND "test3" = %(test3)s AND "test4" = %(test4)s',
dict(test2=2, test3=3, test4=4)
)
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_format_where_clauses_with_where_op():
where_clauses = dict(test1=1, test2=2)
assert PgDB._format_where_clauses(where_clauses, where_op='OR') == (
'"test1" = %(test1)s OR "test2" = %(test2)s',
where_clauses
)
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_add_where_clauses():
2021-11-07 21:37:18 +01:00
sql = "SELECT * FROM table"
2021-10-06 21:33:25 +02:00
where_clauses = dict(test1=1, test2=2)
assert PgDB._add_where_clauses(sql, None, where_clauses) == (
sql + ' WHERE "test1" = %(test1)s AND "test2" = %(test2)s',
where_clauses
)
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_add_where_clauses_preserved_params():
2021-11-07 21:37:18 +01:00
sql = "SELECT * FROM table"
2021-10-06 21:33:25 +02:00
where_clauses = dict(test1=1, test2=2)
params = dict(fake1=1)
assert PgDB._add_where_clauses(sql, params.copy(), where_clauses) == (
sql + ' WHERE "test1" = %(test1)s AND "test2" = %(test2)s',
dict(**where_clauses, **params)
)
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_add_where_clauses_with_op():
2021-11-07 21:37:18 +01:00
sql = "SELECT * FROM table"
2021-10-06 21:33:25 +02:00
where_clauses = ('test1=1', 'test2=2')
assert PgDB._add_where_clauses(sql, None, where_clauses, where_op='OR') == (
sql + ' WHERE test1=1 OR test2=2',
2023-01-06 19:36:14 +01:00
{}
2021-10-06 21:33:25 +02:00
)
2021-11-07 21:37:18 +01:00
def test_add_where_clauses_with_duplicated_field():
2021-11-07 21:37:18 +01:00
sql = "UPDATE table SET test1=%(test1)s"
params = dict(test1='new_value')
where_clauses = dict(test1='where_value')
assert PgDB._add_where_clauses(sql, params, where_clauses) == (
sql + ' WHERE "test1" = %(test1_1)s',
dict(test1='new_value', test1_1='where_value')
)
2021-11-07 21:37:18 +01:00
def test_quote_table_name():
assert PgDB._quote_table_name("mytable") == '"mytable"'
assert PgDB._quote_table_name("myschema.mytable") == '"myschema"."mytable"'
2021-10-06 21:33:25 +02:00
def test_insert(mocker, test_pgdb):
values = dict(test1=1, test2=2)
mocker.patch(
'mylib.pgsql.PgDB.doSQL',
generate_mock_doSQL(
'INSERT INTO "mytable" ("test1", "test2") VALUES (%(test1)s, %(test2)s)',
values
)
)
assert test_pgdb.insert('mytable', values)
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_insert_just_try(mocker, test_pgdb):
mocker.patch('mylib.pgsql.PgDB.doSQL', mock_doSQL_just_try)
assert test_pgdb.insert('mytable', dict(test1=1, test2=2), just_try=True)
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_update(mocker, test_pgdb):
values = dict(test1=1, test2=2)
where_clauses = dict(test3=3, test4=4)
mocker.patch(
'mylib.pgsql.PgDB.doSQL',
generate_mock_doSQL(
'UPDATE "mytable" SET "test1" = %(test1)s, "test2" = %(test2)s WHERE "test3" = %(test3)s AND "test4" = %(test4)s',
dict(**values, **where_clauses)
)
)
assert test_pgdb.update('mytable', values, where_clauses)
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_update_just_try(mocker, test_pgdb):
mocker.patch('mylib.pgsql.PgDB.doSQL', mock_doSQL_just_try)
assert test_pgdb.update('mytable', dict(test1=1, test2=2), None, just_try=True)
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_delete(mocker, test_pgdb):
where_clauses = dict(test1=1, test2=2)
mocker.patch(
'mylib.pgsql.PgDB.doSQL',
generate_mock_doSQL(
'DELETE FROM "mytable" WHERE "test1" = %(test1)s AND "test2" = %(test2)s',
where_clauses
)
)
assert test_pgdb.delete('mytable', where_clauses)
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_delete_just_try(mocker, test_pgdb):
mocker.patch('mylib.pgsql.PgDB.doSQL', mock_doSQL_just_try)
assert test_pgdb.delete('mytable', None, just_try=True)
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_truncate(mocker, test_pgdb):
mocker.patch(
'mylib.pgsql.PgDB.doSQL',
2023-01-06 19:36:14 +01:00
generate_mock_doSQL('TRUNCATE TABLE "mytable"', None)
2021-10-06 21:33:25 +02:00
)
assert test_pgdb.truncate('mytable')
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_truncate_just_try(mocker, test_pgdb):
mocker.patch('mylib.pgsql.PgDB.doSQL', mock_doSelect_just_try)
assert test_pgdb.truncate('mytable', just_try=True)
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_select(mocker, test_pgdb):
fields = ('field1', 'field2')
where_clauses = dict(test3=3, test4=4)
expected_return = [
dict(field1=1, field2=2),
dict(field1=2, field2=3),
]
order_by = "field1, DESC"
mocker.patch(
'mylib.pgsql.PgDB.doSelect',
generate_mock_doSQL(
'SELECT "field1", "field2" FROM "mytable" WHERE "test3" = %(test3)s AND "test4" = %(test4)s ORDER BY ' + order_by,
where_clauses, expected_return
)
)
assert test_pgdb.select('mytable', where_clauses, fields, order_by=order_by) == expected_return
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_select_without_field_and_order_by(mocker, test_pgdb):
mocker.patch(
'mylib.pgsql.PgDB.doSelect',
generate_mock_doSQL(
'SELECT * FROM "mytable"'
)
)
assert test_pgdb.select('mytable')
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_select_just_try(mocker, test_pgdb):
mocker.patch('mylib.pgsql.PgDB.doSQL', mock_doSelect_just_try)
assert test_pgdb.select('mytable', None, None, just_try=True)
#
# Tests on main methods
#
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_connect(mocker, test_pgdb):
expected_kwargs = dict(
2021-11-07 21:37:18 +01:00
dbname=test_pgdb._db,
user=test_pgdb._user,
host=test_pgdb._host,
password=test_pgdb._pwd
2021-10-06 21:33:25 +02:00
)
2021-10-06 21:33:25 +02:00
mocker.patch(
'psycopg2.connect',
generate_mock_args(
expected_kwargs=expected_kwargs
)
)
assert test_pgdb.connect()
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_close(fake_pgdb):
assert fake_pgdb.close() is None
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_close_connected(fake_connected_pgdb):
assert fake_connected_pgdb.close() is None
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_setEncoding(fake_connected_pgdb):
assert fake_connected_pgdb.setEncoding('utf8')
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_setEncoding_not_connected(fake_pgdb):
assert fake_pgdb.setEncoding('utf8') is False
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_setEncoding_on_exception(fake_connected_pgdb):
2021-11-07 21:37:18 +01:00
fake_connected_pgdb._conn.expected_exception = True
2021-10-06 21:33:25 +02:00
assert fake_connected_pgdb.setEncoding('utf8') is False
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_doSQL(fake_connected_pgdb):
2021-11-07 21:37:18 +01:00
fake_connected_pgdb._conn.expected_sql = 'DELETE FROM table WHERE test1 = %(test1)s'
fake_connected_pgdb._conn.expected_params = dict(test1=1)
fake_connected_pgdb.doSQL(fake_connected_pgdb._conn.expected_sql, fake_connected_pgdb._conn.expected_params)
2021-10-06 21:33:25 +02:00
def test_doSQL_without_params(fake_connected_pgdb):
2021-11-07 21:37:18 +01:00
fake_connected_pgdb._conn.expected_sql = 'DELETE FROM table'
fake_connected_pgdb.doSQL(fake_connected_pgdb._conn.expected_sql)
2021-10-06 21:33:25 +02:00
def test_doSQL_just_try(fake_connected_just_try_pgdb):
assert fake_connected_just_try_pgdb.doSQL('DELETE FROM table')
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_doSQL_on_exception(fake_connected_pgdb):
2021-11-07 21:37:18 +01:00
fake_connected_pgdb._conn.expected_exception = True
2021-10-06 21:33:25 +02:00
assert fake_connected_pgdb.doSQL('DELETE FROM table') is False
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_doSelect(fake_connected_pgdb):
2021-11-07 21:37:18 +01:00
fake_connected_pgdb._conn.expected_sql = 'SELECT * FROM table WHERE test1 = %(test1)s'
fake_connected_pgdb._conn.expected_params = dict(test1=1)
fake_connected_pgdb._conn.expected_return = [dict(test1=1)]
assert fake_connected_pgdb.doSelect(fake_connected_pgdb._conn.expected_sql, fake_connected_pgdb._conn.expected_params) == fake_connected_pgdb._conn.expected_return
2021-10-06 21:33:25 +02:00
def test_doSelect_without_params(fake_connected_pgdb):
2021-11-07 21:37:18 +01:00
fake_connected_pgdb._conn.expected_sql = 'SELECT * FROM table'
fake_connected_pgdb._conn.expected_return = [dict(test1=1)]
assert fake_connected_pgdb.doSelect(fake_connected_pgdb._conn.expected_sql) == fake_connected_pgdb._conn.expected_return
2021-10-06 21:33:25 +02:00
def test_doSelect_on_exception(fake_connected_pgdb):
2021-11-07 21:37:18 +01:00
fake_connected_pgdb._conn.expected_exception = True
2021-10-06 21:33:25 +02:00
assert fake_connected_pgdb.doSelect('SELECT * FROM table') is False
2021-11-07 21:37:18 +01:00
2021-10-06 21:33:25 +02:00
def test_doSelect_just_try(fake_connected_just_try_pgdb):
2021-11-07 21:37:18 +01:00
fake_connected_just_try_pgdb._conn.expected_sql = 'SELECT * FROM table WHERE test1 = %(test1)s'
fake_connected_just_try_pgdb._conn.expected_params = dict(test1=1)
fake_connected_just_try_pgdb._conn.expected_return = [dict(test1=1)]
2021-10-06 21:33:25 +02:00
assert fake_connected_just_try_pgdb.doSelect(
2021-11-07 21:37:18 +01:00
fake_connected_just_try_pgdb._conn.expected_sql,
fake_connected_just_try_pgdb._conn.expected_params
) == fake_connected_just_try_pgdb._conn.expected_return