PgSQL: code cleaning

This commit is contained in:
Benjamin Renard 2021-11-07 21:37:18 +01:00
parent 86be381db9
commit 6b825813c1
2 changed files with 120 additions and 78 deletions

View file

@ -53,52 +53,46 @@ class PgDBUnsupportedWHEREClauses(PgDBException, TypeError):
class PgDB:
""" PostgreSQL client """
host = ""
user = ""
pwd = ""
db = ""
con = 0
date_format = '%Y-%m-%d'
datetime_format = '%Y-%m-%d %H:%M:%S'
def __init__(self, host, user, pwd, db, just_try=False):
self.host = host
self.user = user
self.pwd = pwd
self.db = db
self._host = host
self._user = user
self._pwd = pwd
self._db = db
self._conn = None
self.just_try = just_try
def connect(self):
""" Connect to PostgreSQL server """
if self.con == 0:
if self._conn is None:
try:
con = psycopg2.connect(
dbname=self.db,
user=self.user,
host=self.host,
password=self.pwd
self._conn = psycopg2.connect(
dbname=self._db,
user=self._user,
host=self._host,
password=self._pwd
)
self.con = con
except Exception:
logging.fatal(
log.fatal(
'An error occured during Postgresql database connection (%s@%s, database=%s).',
self.user, self.host, self.db, exc_info=1
self._user, self._host, self._db, exc_info=1
)
sys.exit(1)
return True
def close(self):
""" Close connection with PostgreSQL server (if opened) """
if self.con:
self.con.close()
if self._conn:
self._conn.close()
self._conn = None
def setEncoding(self, enc):
""" Set connection encoding """
if self.con:
if self._conn:
try:
self.con.set_client_encoding(enc)
self._conn.set_client_encoding(enc)
return True
except Exception:
log.error(
@ -121,13 +115,13 @@ class PgDB:
log.debug("Just-try mode : do not really execute SQL query '%s'", sql)
return True
cursor = self.con.cursor()
cursor = self._conn.cursor()
try:
if params is None:
cursor.execute(sql)
else:
cursor.execute(sql, params)
self.con.commit()
self._conn.commit()
return True
except Exception:
log.error(
@ -139,7 +133,7 @@ class PgDB:
]) if params else "without params",
exc_info=True
)
self.con.rollback()
self._conn.rollback()
return False
def doSelect(self, sql, params=None):
@ -152,7 +146,7 @@ class PgDB:
:return: List of selected rows as dict on success, False otherwise
:rtype: list, bool
"""
cursor = self.con.cursor()
cursor = self._conn.cursor()
try:
cursor.execute(sql, params)
results = cursor.fetchall()
@ -169,7 +163,6 @@ class PgDB:
)
return False
#
# SQL helpers
#
@ -261,10 +254,9 @@ class PgDB:
sql += " WHERE " + sql_where
return (sql, params)
def insert(self, table, values, just_try=False):
""" Run INSERT SQL query """
sql = 'INSERT INTO "%s" ("%s") VALUES (%s)' % (
sql = 'INSERT INTO "{0}" ("{1}") VALUES ({2})'.format(
table,
'", "'.join(values.keys()),
", ".join([
@ -274,18 +266,18 @@ class PgDB:
)
if just_try:
log.debug("Just-try mode : execute INSERT query : %s", sql)
log.debug("Just-try mode: execute INSERT query: %s", sql)
return True
log.debug(sql)
if not self.doSQL(sql, params=values):
log.error("Fail to execute INSERT query (SQL : %s)", sql)
log.error("Fail to execute INSERT query (SQL: %s)", sql)
return False
return True
def update(self, table, values, where_clauses, where_op=None, just_try=False):
""" Run UPDATE SQL query """
sql = 'UPDATE "%s" SET %s' % (
sql = 'UPDATE "{0}" SET {1}'.format(
table,
", ".join([
'"{0}" = %({0})s'.format(key)
@ -301,18 +293,18 @@ class PgDB:
return False
if just_try:
log.debug("Just-try mode : execute UPDATE query : %s", sql)
log.debug("Just-try mode: execute UPDATE query: %s", sql)
return True
log.debug(sql)
if not self.doSQL(sql, params=params):
log.error("Fail to execute UPDATE query (SQL : %s)", sql)
log.error("Fail to execute UPDATE query (SQL: %s)", sql)
return False
return True
def delete(self, table, where_clauses, where_op='AND', just_try=False):
""" Run DELETE SQL query """
sql = 'DELETE FROM "%s"' % table
sql = 'DELETE FROM "{0}"'.format(table)
params = dict()
try:
@ -322,27 +314,27 @@ class PgDB:
return False
if just_try:
log.debug("Just-try mode : execute UPDATE query : %s", sql)
log.debug("Just-try mode: execute UPDATE query: %s", sql)
return True
log.debug(sql)
if not self.doSQL(sql, params=params):
log.error("Fail to execute UPDATE query (SQL : %s)", sql)
log.error("Fail to execute UPDATE query (SQL: %s)", sql)
return False
return True
def truncate(self, table, just_try=False):
""" Run TRUNCATE SQL query """
sql = 'TRUNCATE "%s"' % table
sql = 'TRUNCATE "{0}"'.format(table)
if just_try:
log.debug("Just-try mode : execute TRUNCATE query : %s", sql)
log.debug("Just-try mode: execute TRUNCATE query: %s", sql)
return True
log.debug(sql)
if not self.doSQL(sql):
log.error("Fail to execute TRUNCATE query (SQL : %s)", sql)
log.error("Fail to execute TRUNCATE query (SQL: %s)", sql)
return False
return True
@ -352,11 +344,11 @@ class PgDB:
if fields is None:
sql += "*"
elif isinstance(fields, str):
sql += '"%s"' % fields
sql += '"{0}"'.format(fields)
else:
sql += '"' + '", "'.join(fields) + '"'
sql += '"{0}"'.format('", "'.join(fields))
sql += ' FROM "%s"' % table
sql += ' FROM "{0}"'.format(table)
params = dict()
try:
@ -366,15 +358,14 @@ class PgDB:
return False
if order_by:
sql += ' ORDER BY %s' % order_by
sql += ' ORDER BY {0}'.format(order_by)
if just_try:
log.debug("Just-try mode : execute SELECT query : %s", sql)
log.debug("Just-try mode: execute SELECT query : %s", sql)
return just_try
return self.doSelect(sql, params=params)
#
# Depreated helpers
#

View file

@ -5,6 +5,7 @@ import pytest
from mylib.pgsql import PgDB
class FakePsycopg2Cursor:
""" Fake Psycopg2 cursor """
@ -80,38 +81,46 @@ class FakePsycopg2:
if self.just_try:
assert False, "May not be executed in just try mode"
def fake_psycopg2_connect(**kwargs):
return FakePsycopg2(**kwargs)
def fake_psycopg2_connect_just_try(**kwargs):
con = FakePsycopg2(**kwargs)
con.just_try = True
return con
@pytest.fixture
def test_pgdb():
return PgDB('127.0.0.1', 'user', 'password', 'dbname')
@pytest.fixture
def fake_pgdb(mocker):
mocker.patch('psycopg2.connect', fake_psycopg2_connect)
return PgDB('127.0.0.1', 'user', 'password', 'dbname')
@pytest.fixture
def fake_just_try_pgdb(mocker):
mocker.patch('psycopg2.connect', fake_psycopg2_connect_just_try)
return PgDB('127.0.0.1', 'user', 'password', 'dbname', just_try=True)
@pytest.fixture
def fake_connected_pgdb(fake_pgdb):
fake_pgdb.connect()
return fake_pgdb
@pytest.fixture
def fake_connected_just_try_pgdb(fake_just_try_pgdb):
fake_just_try_pgdb.connect()
return fake_just_try_pgdb
def generate_mock_args(expected_args=(), expected_kwargs=dict(), expected_return=True): # pylint: disable=dangerous-default-value
def mock_args(*args, **kwargs):
assert args == expected_args, "Invalid call args:\n %s\nMay be:\n %s" % (args, expected_args)
@ -119,9 +128,11 @@ def generate_mock_args(expected_args=(), expected_kwargs=dict(), expected_return
return expected_return
return mock_args
def mock_doSQL_just_try(self, sql, params=None): # pylint: disable=unused-argument
assert False, "doSQL() may not be executed in just try mode"
def generate_mock_doSQL(expected_sql, expected_params=dict(), expected_return=True): # pylint: disable=dangerous-default-value
def mock_doSQL(self, sql, params=None): # pylint: disable=unused-argument
assert sql == expected_sql, "Invalid generated SQL query:\n '%s'\nMay be:\n '%s'" % (sql, expected_sql)
@ -129,6 +140,7 @@ def generate_mock_doSQL(expected_sql, expected_params=dict(), expected_return=Tr
return expected_return
return mock_doSQL
# PgDB.doSelect() have same expected parameters as PgDB.doSQL()
generate_mock_doSelect = generate_mock_doSQL
mock_doSelect_just_try = mock_doSQL_just_try
@ -136,28 +148,35 @@ mock_doSelect_just_try = mock_doSQL_just_try
#
# Test on PgDB helper methods
#
def test_combine_params_with_to_add_parameter():
assert PgDB._combine_params(dict(test1=1), dict(test2=2)) == dict(
test1=1, test2=2
)
def test_combine_params_with_kargs():
assert PgDB._combine_params(dict(test1=1), test2=2) == dict(
test1=1, test2=2
)
def test_combine_params_with_kargs_and_to_add_parameter():
assert PgDB._combine_params(dict(test1=1), dict(test2=2), test3=3) == dict(
test1=1, test2=2, test3=3
)
def test_format_where_clauses_params_are_preserved():
args=('test = test', dict(test1=1))
args = ('test = test', dict(test1=1))
assert PgDB._format_where_clauses(*args) == args
def test_format_where_clauses_raw():
assert PgDB._format_where_clauses('test = test') == (('test = test'), dict())
def test_format_where_clauses_tuple_clause_with_params():
where_clauses = (
'test1 = %(test1)s AND test2 = %(test2)s',
@ -165,6 +184,7 @@ def test_format_where_clauses_tuple_clause_with_params():
)
assert PgDB._format_where_clauses(where_clauses) == where_clauses
def test_format_where_clauses_dict():
where_clauses = dict(test1=1, test2=2)
assert PgDB._format_where_clauses(where_clauses) == (
@ -172,6 +192,7 @@ def test_format_where_clauses_dict():
where_clauses
)
def test_format_where_clauses_combined_types():
where_clauses = (
'test1 = 1',
@ -183,6 +204,7 @@ def test_format_where_clauses_combined_types():
dict(test2=2, test3=3, test4=4)
)
def test_format_where_clauses_with_where_op():
where_clauses = dict(test1=1, test2=2)
assert PgDB._format_where_clauses(where_clauses, where_op='OR') == (
@ -190,16 +212,18 @@ def test_format_where_clauses_with_where_op():
where_clauses
)
def test_add_where_clauses():
sql="SELECT * FROM table"
sql = "SELECT * FROM table"
where_clauses = dict(test1=1, test2=2)
assert PgDB._add_where_clauses(sql, None, where_clauses) == (
sql + ' WHERE "test1" = %(test1)s AND "test2" = %(test2)s',
where_clauses
)
def test_add_where_clauses_preserved_params():
sql="SELECT * FROM table"
sql = "SELECT * FROM table"
where_clauses = dict(test1=1, test2=2)
params = dict(fake1=1)
assert PgDB._add_where_clauses(sql, params.copy(), where_clauses) == (
@ -207,16 +231,18 @@ def test_add_where_clauses_preserved_params():
dict(**where_clauses, **params)
)
def test_add_where_clauses_with_op():
sql="SELECT * FROM table"
sql = "SELECT * FROM table"
where_clauses = ('test1=1', 'test2=2')
assert PgDB._add_where_clauses(sql, None, where_clauses, where_op='OR') == (
sql + ' WHERE test1=1 OR test2=2',
dict()
)
def test_add_where_clauses_with_duplicated_field():
sql="UPDATE table SET test1=%(test1)s"
sql = "UPDATE table SET test1=%(test1)s"
params = dict(test1='new_value')
where_clauses = dict(test1='where_value')
assert PgDB._add_where_clauses(sql, params, where_clauses) == (
@ -224,6 +250,7 @@ def test_add_where_clauses_with_duplicated_field():
dict(test1='new_value', test1_1='where_value')
)
def test_insert(mocker, test_pgdb):
values = dict(test1=1, test2=2)
mocker.patch(
@ -236,10 +263,12 @@ def test_insert(mocker, test_pgdb):
assert test_pgdb.insert('mytable', values)
def test_insert_just_try(mocker, test_pgdb):
mocker.patch('mylib.pgsql.PgDB.doSQL', mock_doSQL_just_try)
assert test_pgdb.insert('mytable', dict(test1=1, test2=2), just_try=True)
def test_update(mocker, test_pgdb):
values = dict(test1=1, test2=2)
where_clauses = dict(test3=3, test4=4)
@ -253,10 +282,12 @@ def test_update(mocker, test_pgdb):
assert test_pgdb.update('mytable', values, where_clauses)
def test_update_just_try(mocker, test_pgdb):
mocker.patch('mylib.pgsql.PgDB.doSQL', mock_doSQL_just_try)
assert test_pgdb.update('mytable', dict(test1=1, test2=2), None, just_try=True)
def test_delete(mocker, test_pgdb):
where_clauses = dict(test1=1, test2=2)
mocker.patch(
@ -269,10 +300,12 @@ def test_delete(mocker, test_pgdb):
assert test_pgdb.delete('mytable', where_clauses)
def test_delete_just_try(mocker, test_pgdb):
mocker.patch('mylib.pgsql.PgDB.doSQL', mock_doSQL_just_try)
assert test_pgdb.delete('mytable', None, just_try=True)
def test_truncate(mocker, test_pgdb):
mocker.patch(
'mylib.pgsql.PgDB.doSQL',
@ -281,10 +314,12 @@ def test_truncate(mocker, test_pgdb):
assert test_pgdb.truncate('mytable')
def test_truncate_just_try(mocker, test_pgdb):
mocker.patch('mylib.pgsql.PgDB.doSQL', mock_doSelect_just_try)
assert test_pgdb.truncate('mytable', just_try=True)
def test_select(mocker, test_pgdb):
fields = ('field1', 'field2')
where_clauses = dict(test3=3, test4=4)
@ -303,6 +338,7 @@ def test_select(mocker, test_pgdb):
assert test_pgdb.select('mytable', where_clauses, fields, order_by=order_by) == expected_return
def test_select_without_field_and_order_by(mocker, test_pgdb):
mocker.patch(
'mylib.pgsql.PgDB.doSelect',
@ -313,6 +349,7 @@ def test_select_without_field_and_order_by(mocker, test_pgdb):
assert test_pgdb.select('mytable')
def test_select_just_try(mocker, test_pgdb):
mocker.patch('mylib.pgsql.PgDB.doSQL', mock_doSelect_just_try)
assert test_pgdb.select('mytable', None, None, just_try=True)
@ -321,12 +358,13 @@ def test_select_just_try(mocker, test_pgdb):
# Tests on main methods
#
def test_connect(mocker, test_pgdb):
expected_kwargs = dict(
dbname=test_pgdb.db,
user=test_pgdb.user,
host=test_pgdb.host,
password=test_pgdb.pwd
dbname=test_pgdb._db,
user=test_pgdb._user,
host=test_pgdb._host,
password=test_pgdb._pwd
)
mocker.patch(
@ -338,58 +376,71 @@ def test_connect(mocker, test_pgdb):
assert test_pgdb.connect()
def test_close(fake_pgdb):
assert fake_pgdb.close() is None
def test_close_connected(fake_connected_pgdb):
assert fake_connected_pgdb.close() is None
def test_setEncoding(fake_connected_pgdb):
assert fake_connected_pgdb.setEncoding('utf8')
def test_setEncoding_not_connected(fake_pgdb):
assert fake_pgdb.setEncoding('utf8') is False
def test_setEncoding_on_exception(fake_connected_pgdb):
fake_connected_pgdb.con.expected_exception = True
fake_connected_pgdb._conn.expected_exception = True
assert fake_connected_pgdb.setEncoding('utf8') is False
def test_doSQL(fake_connected_pgdb):
fake_connected_pgdb.con.expected_sql = 'DELETE FROM table WHERE test1 = %(test1)s'
fake_connected_pgdb.con.expected_params = dict(test1=1)
fake_connected_pgdb.doSQL(fake_connected_pgdb.con.expected_sql, fake_connected_pgdb.con.expected_params)
fake_connected_pgdb._conn.expected_sql = 'DELETE FROM table WHERE test1 = %(test1)s'
fake_connected_pgdb._conn.expected_params = dict(test1=1)
fake_connected_pgdb.doSQL(fake_connected_pgdb._conn.expected_sql, fake_connected_pgdb._conn.expected_params)
def test_doSQL_without_params(fake_connected_pgdb):
fake_connected_pgdb.con.expected_sql = 'DELETE FROM table'
fake_connected_pgdb.doSQL(fake_connected_pgdb.con.expected_sql)
fake_connected_pgdb._conn.expected_sql = 'DELETE FROM table'
fake_connected_pgdb.doSQL(fake_connected_pgdb._conn.expected_sql)
def test_doSQL_just_try(fake_connected_just_try_pgdb):
assert fake_connected_just_try_pgdb.doSQL('DELETE FROM table')
def test_doSQL_on_exception(fake_connected_pgdb):
fake_connected_pgdb.con.expected_exception = True
fake_connected_pgdb._conn.expected_exception = True
assert fake_connected_pgdb.doSQL('DELETE FROM table') is False
def test_doSelect(fake_connected_pgdb):
fake_connected_pgdb.con.expected_sql = 'SELECT * FROM table WHERE test1 = %(test1)s'
fake_connected_pgdb.con.expected_params = dict(test1=1)
fake_connected_pgdb.con.expected_return = [dict(test1=1)]
assert fake_connected_pgdb.doSelect(fake_connected_pgdb.con.expected_sql, fake_connected_pgdb.con.expected_params) == fake_connected_pgdb.con.expected_return
fake_connected_pgdb._conn.expected_sql = 'SELECT * FROM table WHERE test1 = %(test1)s'
fake_connected_pgdb._conn.expected_params = dict(test1=1)
fake_connected_pgdb._conn.expected_return = [dict(test1=1)]
assert fake_connected_pgdb.doSelect(fake_connected_pgdb._conn.expected_sql, fake_connected_pgdb._conn.expected_params) == fake_connected_pgdb._conn.expected_return
def test_doSelect_without_params(fake_connected_pgdb):
fake_connected_pgdb.con.expected_sql = 'SELECT * FROM table'
fake_connected_pgdb.con.expected_return = [dict(test1=1)]
assert fake_connected_pgdb.doSelect(fake_connected_pgdb.con.expected_sql) == fake_connected_pgdb.con.expected_return
fake_connected_pgdb._conn.expected_sql = 'SELECT * FROM table'
fake_connected_pgdb._conn.expected_return = [dict(test1=1)]
assert fake_connected_pgdb.doSelect(fake_connected_pgdb._conn.expected_sql) == fake_connected_pgdb._conn.expected_return
def test_doSelect_on_exception(fake_connected_pgdb):
fake_connected_pgdb.con.expected_exception = True
fake_connected_pgdb._conn.expected_exception = True
assert fake_connected_pgdb.doSelect('SELECT * FROM table') is False
def test_doSelect_just_try(fake_connected_just_try_pgdb):
fake_connected_just_try_pgdb.con.expected_sql = 'SELECT * FROM table WHERE test1 = %(test1)s'
fake_connected_just_try_pgdb.con.expected_params = dict(test1=1)
fake_connected_just_try_pgdb.con.expected_return = [dict(test1=1)]
fake_connected_just_try_pgdb._conn.expected_sql = 'SELECT * FROM table WHERE test1 = %(test1)s'
fake_connected_just_try_pgdb._conn.expected_params = dict(test1=1)
fake_connected_just_try_pgdb._conn.expected_return = [dict(test1=1)]
assert fake_connected_just_try_pgdb.doSelect(
fake_connected_just_try_pgdb.con.expected_sql,
fake_connected_just_try_pgdb.con.expected_params
) == fake_connected_just_try_pgdb.con.expected_return
fake_connected_just_try_pgdb._conn.expected_sql,
fake_connected_just_try_pgdb._conn.expected_params
) == fake_connected_just_try_pgdb._conn.expected_return