diff --git a/mylib/pgsql.py b/mylib/pgsql.py index 606fc9e..b8e559a 100644 --- a/mylib/pgsql.py +++ b/mylib/pgsql.py @@ -53,52 +53,46 @@ class PgDBUnsupportedWHEREClauses(PgDBException, TypeError): class PgDB: """ PostgreSQL client """ - host = "" - user = "" - pwd = "" - db = "" - - con = 0 - date_format = '%Y-%m-%d' datetime_format = '%Y-%m-%d %H:%M:%S' def __init__(self, host, user, pwd, db, just_try=False): - self.host = host - self.user = user - self.pwd = pwd - self.db = db + self._host = host + self._user = user + self._pwd = pwd + self._db = db + self._conn = None self.just_try = just_try def connect(self): """ Connect to PostgreSQL server """ - if self.con == 0: + if self._conn is None: try: - con = psycopg2.connect( - dbname=self.db, - user=self.user, - host=self.host, - password=self.pwd + self._conn = psycopg2.connect( + dbname=self._db, + user=self._user, + host=self._host, + password=self._pwd ) - self.con = con except Exception: - logging.fatal( + log.fatal( 'An error occured during Postgresql database connection (%s@%s, database=%s).', - self.user, self.host, self.db, exc_info=1 + self._user, self._host, self._db, exc_info=1 ) sys.exit(1) return True def close(self): """ Close connection with PostgreSQL server (if opened) """ - if self.con: - self.con.close() + if self._conn: + self._conn.close() + self._conn = None def setEncoding(self, enc): """ Set connection encoding """ - if self.con: + if self._conn: try: - self.con.set_client_encoding(enc) + self._conn.set_client_encoding(enc) return True except Exception: log.error( @@ -121,13 +115,13 @@ class PgDB: log.debug("Just-try mode : do not really execute SQL query '%s'", sql) return True - cursor = self.con.cursor() + cursor = self._conn.cursor() try: if params is None: cursor.execute(sql) else: cursor.execute(sql, params) - self.con.commit() + self._conn.commit() return True except Exception: log.error( @@ -139,7 +133,7 @@ class PgDB: ]) if params else "without params", exc_info=True ) - self.con.rollback() + self._conn.rollback() return False def doSelect(self, sql, params=None): @@ -152,7 +146,7 @@ class PgDB: :return: List of selected rows as dict on success, False otherwise :rtype: list, bool """ - cursor = self.con.cursor() + cursor = self._conn.cursor() try: cursor.execute(sql, params) results = cursor.fetchall() @@ -169,7 +163,6 @@ class PgDB: ) return False - # # SQL helpers # @@ -261,10 +254,9 @@ class PgDB: sql += " WHERE " + sql_where return (sql, params) - def insert(self, table, values, just_try=False): """ Run INSERT SQL query """ - sql = 'INSERT INTO "%s" ("%s") VALUES (%s)' % ( + sql = 'INSERT INTO "{0}" ("{1}") VALUES ({2})'.format( table, '", "'.join(values.keys()), ", ".join([ @@ -274,18 +266,18 @@ class PgDB: ) if just_try: - log.debug("Just-try mode : execute INSERT query : %s", sql) + log.debug("Just-try mode: execute INSERT query: %s", sql) return True log.debug(sql) if not self.doSQL(sql, params=values): - log.error("Fail to execute INSERT query (SQL : %s)", sql) + log.error("Fail to execute INSERT query (SQL: %s)", sql) return False return True def update(self, table, values, where_clauses, where_op=None, just_try=False): """ Run UPDATE SQL query """ - sql = 'UPDATE "%s" SET %s' % ( + sql = 'UPDATE "{0}" SET {1}'.format( table, ", ".join([ '"{0}" = %({0})s'.format(key) @@ -301,18 +293,18 @@ class PgDB: return False if just_try: - log.debug("Just-try mode : execute UPDATE query : %s", sql) + log.debug("Just-try mode: execute UPDATE query: %s", sql) return True log.debug(sql) if not self.doSQL(sql, params=params): - log.error("Fail to execute UPDATE query (SQL : %s)", sql) + log.error("Fail to execute UPDATE query (SQL: %s)", sql) return False return True def delete(self, table, where_clauses, where_op='AND', just_try=False): """ Run DELETE SQL query """ - sql = 'DELETE FROM "%s"' % table + sql = 'DELETE FROM "{0}"'.format(table) params = dict() try: @@ -322,27 +314,27 @@ class PgDB: return False if just_try: - log.debug("Just-try mode : execute UPDATE query : %s", sql) + log.debug("Just-try mode: execute UPDATE query: %s", sql) return True log.debug(sql) if not self.doSQL(sql, params=params): - log.error("Fail to execute UPDATE query (SQL : %s)", sql) + log.error("Fail to execute UPDATE query (SQL: %s)", sql) return False return True def truncate(self, table, just_try=False): """ Run TRUNCATE SQL query """ - sql = 'TRUNCATE "%s"' % table + sql = 'TRUNCATE "{0}"'.format(table) if just_try: - log.debug("Just-try mode : execute TRUNCATE query : %s", sql) + log.debug("Just-try mode: execute TRUNCATE query: %s", sql) return True log.debug(sql) if not self.doSQL(sql): - log.error("Fail to execute TRUNCATE query (SQL : %s)", sql) + log.error("Fail to execute TRUNCATE query (SQL: %s)", sql) return False return True @@ -352,11 +344,11 @@ class PgDB: if fields is None: sql += "*" elif isinstance(fields, str): - sql += '"%s"' % fields + sql += '"{0}"'.format(fields) else: - sql += '"' + '", "'.join(fields) + '"' + sql += '"{0}"'.format('", "'.join(fields)) - sql += ' FROM "%s"' % table + sql += ' FROM "{0}"'.format(table) params = dict() try: @@ -366,15 +358,14 @@ class PgDB: return False if order_by: - sql += ' ORDER BY %s' % order_by + sql += ' ORDER BY {0}'.format(order_by) if just_try: - log.debug("Just-try mode : execute SELECT query : %s", sql) + log.debug("Just-try mode: execute SELECT query : %s", sql) return just_try return self.doSelect(sql, params=params) - # # Depreated helpers # diff --git a/tests/test_pgsql.py b/tests/test_pgsql.py index 8776c8f..254d94d 100644 --- a/tests/test_pgsql.py +++ b/tests/test_pgsql.py @@ -5,6 +5,7 @@ import pytest from mylib.pgsql import PgDB + class FakePsycopg2Cursor: """ Fake Psycopg2 cursor """ @@ -80,38 +81,46 @@ class FakePsycopg2: if self.just_try: assert False, "May not be executed in just try mode" + def fake_psycopg2_connect(**kwargs): return FakePsycopg2(**kwargs) + def fake_psycopg2_connect_just_try(**kwargs): con = FakePsycopg2(**kwargs) con.just_try = True return con + @pytest.fixture def test_pgdb(): return PgDB('127.0.0.1', 'user', 'password', 'dbname') + @pytest.fixture def fake_pgdb(mocker): mocker.patch('psycopg2.connect', fake_psycopg2_connect) return PgDB('127.0.0.1', 'user', 'password', 'dbname') + @pytest.fixture def fake_just_try_pgdb(mocker): mocker.patch('psycopg2.connect', fake_psycopg2_connect_just_try) return PgDB('127.0.0.1', 'user', 'password', 'dbname', just_try=True) + @pytest.fixture def fake_connected_pgdb(fake_pgdb): fake_pgdb.connect() return fake_pgdb + @pytest.fixture def fake_connected_just_try_pgdb(fake_just_try_pgdb): fake_just_try_pgdb.connect() return fake_just_try_pgdb + def generate_mock_args(expected_args=(), expected_kwargs=dict(), expected_return=True): # pylint: disable=dangerous-default-value def mock_args(*args, **kwargs): assert args == expected_args, "Invalid call args:\n %s\nMay be:\n %s" % (args, expected_args) @@ -119,9 +128,11 @@ def generate_mock_args(expected_args=(), expected_kwargs=dict(), expected_return return expected_return return mock_args + def mock_doSQL_just_try(self, sql, params=None): # pylint: disable=unused-argument assert False, "doSQL() may not be executed in just try mode" + def generate_mock_doSQL(expected_sql, expected_params=dict(), expected_return=True): # pylint: disable=dangerous-default-value def mock_doSQL(self, sql, params=None): # pylint: disable=unused-argument assert sql == expected_sql, "Invalid generated SQL query:\n '%s'\nMay be:\n '%s'" % (sql, expected_sql) @@ -129,6 +140,7 @@ def generate_mock_doSQL(expected_sql, expected_params=dict(), expected_return=Tr return expected_return return mock_doSQL + # PgDB.doSelect() have same expected parameters as PgDB.doSQL() generate_mock_doSelect = generate_mock_doSQL mock_doSelect_just_try = mock_doSQL_just_try @@ -136,28 +148,35 @@ mock_doSelect_just_try = mock_doSQL_just_try # # Test on PgDB helper methods # + + def test_combine_params_with_to_add_parameter(): assert PgDB._combine_params(dict(test1=1), dict(test2=2)) == dict( test1=1, test2=2 ) + def test_combine_params_with_kargs(): assert PgDB._combine_params(dict(test1=1), test2=2) == dict( test1=1, test2=2 ) + def test_combine_params_with_kargs_and_to_add_parameter(): assert PgDB._combine_params(dict(test1=1), dict(test2=2), test3=3) == dict( test1=1, test2=2, test3=3 ) + def test_format_where_clauses_params_are_preserved(): - args=('test = test', dict(test1=1)) + args = ('test = test', dict(test1=1)) assert PgDB._format_where_clauses(*args) == args + def test_format_where_clauses_raw(): assert PgDB._format_where_clauses('test = test') == (('test = test'), dict()) + def test_format_where_clauses_tuple_clause_with_params(): where_clauses = ( 'test1 = %(test1)s AND test2 = %(test2)s', @@ -165,6 +184,7 @@ def test_format_where_clauses_tuple_clause_with_params(): ) assert PgDB._format_where_clauses(where_clauses) == where_clauses + def test_format_where_clauses_dict(): where_clauses = dict(test1=1, test2=2) assert PgDB._format_where_clauses(where_clauses) == ( @@ -172,6 +192,7 @@ def test_format_where_clauses_dict(): where_clauses ) + def test_format_where_clauses_combined_types(): where_clauses = ( 'test1 = 1', @@ -183,6 +204,7 @@ def test_format_where_clauses_combined_types(): dict(test2=2, test3=3, test4=4) ) + def test_format_where_clauses_with_where_op(): where_clauses = dict(test1=1, test2=2) assert PgDB._format_where_clauses(where_clauses, where_op='OR') == ( @@ -190,16 +212,18 @@ def test_format_where_clauses_with_where_op(): where_clauses ) + def test_add_where_clauses(): - sql="SELECT * FROM table" + sql = "SELECT * FROM table" where_clauses = dict(test1=1, test2=2) assert PgDB._add_where_clauses(sql, None, where_clauses) == ( sql + ' WHERE "test1" = %(test1)s AND "test2" = %(test2)s', where_clauses ) + def test_add_where_clauses_preserved_params(): - sql="SELECT * FROM table" + sql = "SELECT * FROM table" where_clauses = dict(test1=1, test2=2) params = dict(fake1=1) assert PgDB._add_where_clauses(sql, params.copy(), where_clauses) == ( @@ -207,16 +231,18 @@ def test_add_where_clauses_preserved_params(): dict(**where_clauses, **params) ) + def test_add_where_clauses_with_op(): - sql="SELECT * FROM table" + sql = "SELECT * FROM table" where_clauses = ('test1=1', 'test2=2') assert PgDB._add_where_clauses(sql, None, where_clauses, where_op='OR') == ( sql + ' WHERE test1=1 OR test2=2', dict() ) + def test_add_where_clauses_with_duplicated_field(): - sql="UPDATE table SET test1=%(test1)s" + sql = "UPDATE table SET test1=%(test1)s" params = dict(test1='new_value') where_clauses = dict(test1='where_value') assert PgDB._add_where_clauses(sql, params, where_clauses) == ( @@ -224,6 +250,7 @@ def test_add_where_clauses_with_duplicated_field(): dict(test1='new_value', test1_1='where_value') ) + def test_insert(mocker, test_pgdb): values = dict(test1=1, test2=2) mocker.patch( @@ -236,10 +263,12 @@ def test_insert(mocker, test_pgdb): assert test_pgdb.insert('mytable', values) + def test_insert_just_try(mocker, test_pgdb): mocker.patch('mylib.pgsql.PgDB.doSQL', mock_doSQL_just_try) assert test_pgdb.insert('mytable', dict(test1=1, test2=2), just_try=True) + def test_update(mocker, test_pgdb): values = dict(test1=1, test2=2) where_clauses = dict(test3=3, test4=4) @@ -253,10 +282,12 @@ def test_update(mocker, test_pgdb): assert test_pgdb.update('mytable', values, where_clauses) + def test_update_just_try(mocker, test_pgdb): mocker.patch('mylib.pgsql.PgDB.doSQL', mock_doSQL_just_try) assert test_pgdb.update('mytable', dict(test1=1, test2=2), None, just_try=True) + def test_delete(mocker, test_pgdb): where_clauses = dict(test1=1, test2=2) mocker.patch( @@ -269,10 +300,12 @@ def test_delete(mocker, test_pgdb): assert test_pgdb.delete('mytable', where_clauses) + def test_delete_just_try(mocker, test_pgdb): mocker.patch('mylib.pgsql.PgDB.doSQL', mock_doSQL_just_try) assert test_pgdb.delete('mytable', None, just_try=True) + def test_truncate(mocker, test_pgdb): mocker.patch( 'mylib.pgsql.PgDB.doSQL', @@ -281,10 +314,12 @@ def test_truncate(mocker, test_pgdb): assert test_pgdb.truncate('mytable') + def test_truncate_just_try(mocker, test_pgdb): mocker.patch('mylib.pgsql.PgDB.doSQL', mock_doSelect_just_try) assert test_pgdb.truncate('mytable', just_try=True) + def test_select(mocker, test_pgdb): fields = ('field1', 'field2') where_clauses = dict(test3=3, test4=4) @@ -303,6 +338,7 @@ def test_select(mocker, test_pgdb): assert test_pgdb.select('mytable', where_clauses, fields, order_by=order_by) == expected_return + def test_select_without_field_and_order_by(mocker, test_pgdb): mocker.patch( 'mylib.pgsql.PgDB.doSelect', @@ -313,6 +349,7 @@ def test_select_without_field_and_order_by(mocker, test_pgdb): assert test_pgdb.select('mytable') + def test_select_just_try(mocker, test_pgdb): mocker.patch('mylib.pgsql.PgDB.doSQL', mock_doSelect_just_try) assert test_pgdb.select('mytable', None, None, just_try=True) @@ -321,12 +358,13 @@ def test_select_just_try(mocker, test_pgdb): # Tests on main methods # + def test_connect(mocker, test_pgdb): expected_kwargs = dict( - dbname=test_pgdb.db, - user=test_pgdb.user, - host=test_pgdb.host, - password=test_pgdb.pwd + dbname=test_pgdb._db, + user=test_pgdb._user, + host=test_pgdb._host, + password=test_pgdb._pwd ) mocker.patch( @@ -338,58 +376,71 @@ def test_connect(mocker, test_pgdb): assert test_pgdb.connect() + def test_close(fake_pgdb): assert fake_pgdb.close() is None + def test_close_connected(fake_connected_pgdb): assert fake_connected_pgdb.close() is None + def test_setEncoding(fake_connected_pgdb): assert fake_connected_pgdb.setEncoding('utf8') + def test_setEncoding_not_connected(fake_pgdb): assert fake_pgdb.setEncoding('utf8') is False + def test_setEncoding_on_exception(fake_connected_pgdb): - fake_connected_pgdb.con.expected_exception = True + fake_connected_pgdb._conn.expected_exception = True assert fake_connected_pgdb.setEncoding('utf8') is False + def test_doSQL(fake_connected_pgdb): - fake_connected_pgdb.con.expected_sql = 'DELETE FROM table WHERE test1 = %(test1)s' - fake_connected_pgdb.con.expected_params = dict(test1=1) - fake_connected_pgdb.doSQL(fake_connected_pgdb.con.expected_sql, fake_connected_pgdb.con.expected_params) + fake_connected_pgdb._conn.expected_sql = 'DELETE FROM table WHERE test1 = %(test1)s' + fake_connected_pgdb._conn.expected_params = dict(test1=1) + fake_connected_pgdb.doSQL(fake_connected_pgdb._conn.expected_sql, fake_connected_pgdb._conn.expected_params) + def test_doSQL_without_params(fake_connected_pgdb): - fake_connected_pgdb.con.expected_sql = 'DELETE FROM table' - fake_connected_pgdb.doSQL(fake_connected_pgdb.con.expected_sql) + fake_connected_pgdb._conn.expected_sql = 'DELETE FROM table' + fake_connected_pgdb.doSQL(fake_connected_pgdb._conn.expected_sql) + def test_doSQL_just_try(fake_connected_just_try_pgdb): assert fake_connected_just_try_pgdb.doSQL('DELETE FROM table') + def test_doSQL_on_exception(fake_connected_pgdb): - fake_connected_pgdb.con.expected_exception = True + fake_connected_pgdb._conn.expected_exception = True assert fake_connected_pgdb.doSQL('DELETE FROM table') is False + def test_doSelect(fake_connected_pgdb): - fake_connected_pgdb.con.expected_sql = 'SELECT * FROM table WHERE test1 = %(test1)s' - fake_connected_pgdb.con.expected_params = dict(test1=1) - fake_connected_pgdb.con.expected_return = [dict(test1=1)] - assert fake_connected_pgdb.doSelect(fake_connected_pgdb.con.expected_sql, fake_connected_pgdb.con.expected_params) == fake_connected_pgdb.con.expected_return + fake_connected_pgdb._conn.expected_sql = 'SELECT * FROM table WHERE test1 = %(test1)s' + fake_connected_pgdb._conn.expected_params = dict(test1=1) + fake_connected_pgdb._conn.expected_return = [dict(test1=1)] + assert fake_connected_pgdb.doSelect(fake_connected_pgdb._conn.expected_sql, fake_connected_pgdb._conn.expected_params) == fake_connected_pgdb._conn.expected_return + def test_doSelect_without_params(fake_connected_pgdb): - fake_connected_pgdb.con.expected_sql = 'SELECT * FROM table' - fake_connected_pgdb.con.expected_return = [dict(test1=1)] - assert fake_connected_pgdb.doSelect(fake_connected_pgdb.con.expected_sql) == fake_connected_pgdb.con.expected_return + fake_connected_pgdb._conn.expected_sql = 'SELECT * FROM table' + fake_connected_pgdb._conn.expected_return = [dict(test1=1)] + assert fake_connected_pgdb.doSelect(fake_connected_pgdb._conn.expected_sql) == fake_connected_pgdb._conn.expected_return + def test_doSelect_on_exception(fake_connected_pgdb): - fake_connected_pgdb.con.expected_exception = True + fake_connected_pgdb._conn.expected_exception = True assert fake_connected_pgdb.doSelect('SELECT * FROM table') is False + def test_doSelect_just_try(fake_connected_just_try_pgdb): - fake_connected_just_try_pgdb.con.expected_sql = 'SELECT * FROM table WHERE test1 = %(test1)s' - fake_connected_just_try_pgdb.con.expected_params = dict(test1=1) - fake_connected_just_try_pgdb.con.expected_return = [dict(test1=1)] + fake_connected_just_try_pgdb._conn.expected_sql = 'SELECT * FROM table WHERE test1 = %(test1)s' + fake_connected_just_try_pgdb._conn.expected_params = dict(test1=1) + fake_connected_just_try_pgdb._conn.expected_return = [dict(test1=1)] assert fake_connected_just_try_pgdb.doSelect( - fake_connected_just_try_pgdb.con.expected_sql, - fake_connected_just_try_pgdb.con.expected_params - ) == fake_connected_just_try_pgdb.con.expected_return + fake_connected_just_try_pgdb._conn.expected_sql, + fake_connected_just_try_pgdb._conn.expected_params + ) == fake_connected_just_try_pgdb._conn.expected_return