Add oracle lib
Based on mylib.pgsql.PgDB exiting lib and cx_Oracle, mylib.oracle.OracleDB provide an easy way to execute simple SQL query on an Oracle Database server.
This commit is contained in:
parent
dec88f814e
commit
7b1019cb1b
3 changed files with 796 additions and 0 deletions
361
mylib/oracle.py
Normal file
361
mylib/oracle.py
Normal file
|
@ -0,0 +1,361 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
""" Oracle client """
|
||||
|
||||
import logging
|
||||
import sys
|
||||
|
||||
import cx_Oracle
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
#
|
||||
# Exceptions
|
||||
#
|
||||
|
||||
class OracleDBException(Exception):
|
||||
""" That is the base exception class for all the other exceptions provided by this module. """
|
||||
|
||||
def __init__(self, error, *args, **kwargs):
|
||||
for arg, value in kwargs.items():
|
||||
setattr(self, arg, value)
|
||||
super().__init__(error.format(*args, **kwargs))
|
||||
|
||||
|
||||
class OracleDBDuplicatedSQLParameter(OracleDBException, KeyError):
|
||||
"""
|
||||
Raised when trying to set a SQL query parameter
|
||||
and an other parameter with the same name is already set
|
||||
"""
|
||||
|
||||
def __init__(self, parameter_name):
|
||||
super().__init__(
|
||||
"Duplicated SQL parameter '{parameter_name}'",
|
||||
parameter_name=parameter_name
|
||||
)
|
||||
|
||||
|
||||
class OracleDBUnsupportedWHEREClauses(OracleDBException, TypeError):
|
||||
"""
|
||||
Raised when trying to execute query with unsupported
|
||||
WHERE clauses provided
|
||||
"""
|
||||
|
||||
def __init__(self, where_clauses):
|
||||
super().__init__(
|
||||
"Unsupported WHERE clauses: {where_clauses}",
|
||||
where_clauses=where_clauses
|
||||
)
|
||||
|
||||
|
||||
class OracleDB:
|
||||
""" Oracle client """
|
||||
|
||||
def __init__(self, dsn, user, pwd, just_try=False):
|
||||
self._dsn = dsn
|
||||
self._user = user
|
||||
self._pwd = pwd
|
||||
self._conn = None
|
||||
self.just_try = just_try
|
||||
|
||||
def connect(self):
|
||||
""" Connect to Oracle server """
|
||||
if self._conn is None:
|
||||
try:
|
||||
self._conn = cx_Oracle.connect(
|
||||
user=self._user,
|
||||
password=self._pwd,
|
||||
dsn=self._dsn
|
||||
)
|
||||
except Exception:
|
||||
log.fatal(
|
||||
'An error occured during Oracle database connection (%s@%s).',
|
||||
self._user, self._dsn, exc_info=1
|
||||
)
|
||||
sys.exit(1)
|
||||
return True
|
||||
|
||||
def close(self):
|
||||
""" Close connection with Oracle server (if opened) """
|
||||
if self._conn:
|
||||
self._conn.close()
|
||||
self._conn = None
|
||||
|
||||
def doSQL(self, sql, params=None):
|
||||
"""
|
||||
Run SQL query and commit changes (rollback on error)
|
||||
|
||||
:param sql: The SQL query
|
||||
:param params: The SQL query's parameters as dict (optional)
|
||||
|
||||
:return: True on success, False otherwise
|
||||
:rtype: bool
|
||||
"""
|
||||
if self.just_try:
|
||||
log.debug("Just-try mode : do not really execute SQL query '%s'", sql)
|
||||
return True
|
||||
|
||||
cursor = self._conn.cursor()
|
||||
try:
|
||||
if isinstance(params, dict):
|
||||
cursor.execute(sql, **params)
|
||||
else:
|
||||
cursor.execute(sql)
|
||||
self._conn.commit()
|
||||
return True
|
||||
except Exception:
|
||||
log.error(
|
||||
'Error during SQL request "%s" %s',
|
||||
sql,
|
||||
"with params = %s" % ', '.join([
|
||||
"%s = %s" % (key, value)
|
||||
for key, value in params.items()
|
||||
]) if params else "without params",
|
||||
exc_info=True
|
||||
)
|
||||
self._conn.rollback()
|
||||
return False
|
||||
|
||||
def doSelect(self, sql, params=None):
|
||||
"""
|
||||
Run SELECT SQL query and return list of selected rows as dict
|
||||
|
||||
:param sql: The SQL query
|
||||
:param params: The SQL query's parameters as dict (optional)
|
||||
|
||||
:return: List of selected rows as dict on success, False otherwise
|
||||
:rtype: list, bool
|
||||
"""
|
||||
cursor = self._conn.cursor()
|
||||
try:
|
||||
if isinstance(params, dict):
|
||||
cursor.execute(sql, **params)
|
||||
else:
|
||||
cursor.execute(sql)
|
||||
cursor.rowfactory = lambda *args: dict(zip([d[0] for d in cursor.description], args))
|
||||
results = cursor.fetchall()
|
||||
return results
|
||||
except Exception:
|
||||
log.error(
|
||||
'Error during SQL request "%s" %s',
|
||||
sql,
|
||||
"with params = %s" % ', '.join([
|
||||
"%s = %s" % (key, value)
|
||||
for key, value in params.items()
|
||||
]) if params else "without params",
|
||||
exc_info=True
|
||||
)
|
||||
return False
|
||||
|
||||
#
|
||||
# SQL helpers
|
||||
#
|
||||
|
||||
@classmethod
|
||||
def _combine_params(cls, params, to_add=None, **kwargs):
|
||||
if to_add:
|
||||
assert isinstance(to_add, dict), "to_add must be a dict or None"
|
||||
params = cls._combine_params(params, **to_add)
|
||||
|
||||
for param, value in kwargs.items():
|
||||
if param in params:
|
||||
raise OracleDBDuplicatedSQLParameter(param)
|
||||
params[param] = value
|
||||
return params
|
||||
|
||||
@classmethod
|
||||
def _format_where_clauses(cls, where_clauses, params=None, where_op=None):
|
||||
"""
|
||||
Format WHERE clauses
|
||||
|
||||
:param where_clauses: The WHERE clauses. Could be:
|
||||
- a raw SQL WHERE clause as string
|
||||
- a tuple of two elements: a raw WHERE clause and its parameters as dict
|
||||
- a dict of WHERE clauses with field name as key and WHERE clause value as value
|
||||
- a list of any of previous valid WHERE clauses
|
||||
:param params: Dict of other already set SQL query parameters (optional)
|
||||
:param where_op: SQL operator used to combine WHERE clauses together (optional, default: AND)
|
||||
|
||||
:return: A tuple of two elements: raw SQL WHERE combined clauses and parameters on success
|
||||
:rtype: string, bool
|
||||
"""
|
||||
if params is None:
|
||||
params = dict()
|
||||
if where_op is None:
|
||||
where_op = 'AND'
|
||||
|
||||
if isinstance(where_clauses, str):
|
||||
return (where_clauses, params)
|
||||
|
||||
if isinstance(where_clauses, tuple) and len(where_clauses) == 2 and isinstance(where_clauses[1], dict):
|
||||
cls._combine_params(params, where_clauses[1])
|
||||
return (where_clauses[0], params)
|
||||
|
||||
if isinstance(where_clauses, (list, tuple)):
|
||||
sql_where_clauses = []
|
||||
for where_clause in where_clauses:
|
||||
sql2, params = cls._format_where_clauses(where_clause, params=params, where_op=where_op)
|
||||
sql_where_clauses.append(sql2)
|
||||
return (
|
||||
(" %s " % where_op).join(sql_where_clauses),
|
||||
params
|
||||
)
|
||||
|
||||
if isinstance(where_clauses, dict):
|
||||
sql_where_clauses = []
|
||||
for field, value in where_clauses.items():
|
||||
param = field
|
||||
if field in params:
|
||||
idx = 1
|
||||
while param in params:
|
||||
param = '%s_%d' % (field, idx)
|
||||
idx += 1
|
||||
cls._combine_params(params, {param: value})
|
||||
sql_where_clauses.append(
|
||||
'"{field}" = :{param}'.format(field=field, param=param)
|
||||
)
|
||||
return (
|
||||
(" %s " % where_op).join(sql_where_clauses),
|
||||
params
|
||||
)
|
||||
raise OracleDBUnsupportedWHEREClauses(where_clauses)
|
||||
|
||||
@classmethod
|
||||
def _add_where_clauses(cls, sql, params, where_clauses, where_op=None):
|
||||
"""
|
||||
Add WHERE clauses to an SQL query
|
||||
|
||||
:param sql: The SQL query to complete
|
||||
:param params: The dict of parameters of the SQL query to complete
|
||||
:param where_clauses: The WHERE clause (see _format_where_clauses())
|
||||
:param where_op: SQL operator used to combine WHERE clauses together (optional, default: see _format_where_clauses())
|
||||
|
||||
:return:
|
||||
:rtype: A tuple of two elements: raw SQL WHERE combined clauses and parameters
|
||||
"""
|
||||
if where_clauses:
|
||||
sql_where, params = cls._format_where_clauses(where_clauses, params=params, where_op=where_op)
|
||||
sql += " WHERE " + sql_where
|
||||
return (sql, params)
|
||||
|
||||
@staticmethod
|
||||
def _quote_table_name(table):
|
||||
""" Quote table name """
|
||||
return '"{0}"'.format(
|
||||
'"."'.join(
|
||||
table.split('.')
|
||||
)
|
||||
)
|
||||
|
||||
def insert(self, table, values, just_try=False):
|
||||
""" Run INSERT SQL query """
|
||||
sql = 'INSERT INTO {0} ("{1}") VALUES ({2})'.format(
|
||||
self._quote_table_name(table),
|
||||
'", "'.join(values.keys()),
|
||||
", ".join([
|
||||
':{0}'.format(key)
|
||||
for key in values
|
||||
])
|
||||
)
|
||||
|
||||
if just_try:
|
||||
log.debug("Just-try mode: execute INSERT query: %s", sql)
|
||||
return True
|
||||
|
||||
log.debug(sql)
|
||||
if not self.doSQL(sql, params=values):
|
||||
log.error("Fail to execute INSERT query (SQL: %s)", sql)
|
||||
return False
|
||||
return True
|
||||
|
||||
def update(self, table, values, where_clauses, where_op=None, just_try=False):
|
||||
""" Run UPDATE SQL query """
|
||||
sql = 'UPDATE {0} SET {1}'.format(
|
||||
self._quote_table_name(table),
|
||||
", ".join([
|
||||
'"{0}" = :{0}'.format(key)
|
||||
for key in values
|
||||
])
|
||||
)
|
||||
params = values
|
||||
|
||||
try:
|
||||
sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op)
|
||||
except (OracleDBDuplicatedSQLParameter, OracleDBUnsupportedWHEREClauses):
|
||||
log.error('Fail to add WHERE clauses', exc_info=True)
|
||||
return False
|
||||
|
||||
if just_try:
|
||||
log.debug("Just-try mode: execute UPDATE query: %s", sql)
|
||||
return True
|
||||
|
||||
log.debug(sql)
|
||||
if not self.doSQL(sql, params=params):
|
||||
log.error("Fail to execute UPDATE query (SQL: %s)", sql)
|
||||
return False
|
||||
return True
|
||||
|
||||
def delete(self, table, where_clauses, where_op='AND', just_try=False):
|
||||
""" Run DELETE SQL query """
|
||||
sql = 'DELETE FROM {0}'.format(self._quote_table_name(table))
|
||||
params = dict()
|
||||
|
||||
try:
|
||||
sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op)
|
||||
except (OracleDBDuplicatedSQLParameter, OracleDBUnsupportedWHEREClauses):
|
||||
log.error('Fail to add WHERE clauses', exc_info=True)
|
||||
return False
|
||||
|
||||
if just_try:
|
||||
log.debug("Just-try mode: execute UPDATE query: %s", sql)
|
||||
return True
|
||||
|
||||
log.debug(sql)
|
||||
if not self.doSQL(sql, params=params):
|
||||
log.error("Fail to execute UPDATE query (SQL: %s)", sql)
|
||||
return False
|
||||
return True
|
||||
|
||||
def truncate(self, table, just_try=False):
|
||||
""" Run TRUNCATE SQL query """
|
||||
|
||||
sql = 'TRUNCATE {0}'.format(self._quote_table_name(table))
|
||||
|
||||
if just_try:
|
||||
log.debug("Just-try mode: execute TRUNCATE query: %s", sql)
|
||||
return True
|
||||
|
||||
log.debug(sql)
|
||||
if not self.doSQL(sql):
|
||||
log.error("Fail to execute TRUNCATE query (SQL: %s)", sql)
|
||||
return False
|
||||
return True
|
||||
|
||||
def select(self, table, where_clauses=None, fields=None, where_op='AND', order_by=None, just_try=False):
|
||||
""" Run SELECT SQL query """
|
||||
sql = "SELECT "
|
||||
if fields is None:
|
||||
sql += "*"
|
||||
elif isinstance(fields, str):
|
||||
sql += '"{0}"'.format(fields)
|
||||
else:
|
||||
sql += '"{0}"'.format('", "'.join(fields))
|
||||
|
||||
sql += ' FROM {0}'.format(self._quote_table_name(table))
|
||||
params = dict()
|
||||
|
||||
try:
|
||||
sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op)
|
||||
except (OracleDBDuplicatedSQLParameter, OracleDBUnsupportedWHEREClauses):
|
||||
log.error('Fail to add WHERE clauses', exc_info=True)
|
||||
return False
|
||||
|
||||
if order_by:
|
||||
sql += ' ORDER BY {0}'.format(order_by)
|
||||
|
||||
if just_try:
|
||||
log.debug("Just-try mode: execute SELECT query : %s", sql)
|
||||
return just_try
|
||||
|
||||
return self.doSelect(sql, params=params)
|
3
setup.py
3
setup.py
|
@ -45,6 +45,9 @@ setup(
|
|||
'pgsql': [
|
||||
'psycopg2',
|
||||
],
|
||||
'oracle': [
|
||||
'cx_Oracle',
|
||||
],
|
||||
'mysql': [
|
||||
'mysqlclient',
|
||||
],
|
||||
|
|
432
tests/test_oracle.py
Normal file
432
tests/test_oracle.py
Normal file
|
@ -0,0 +1,432 @@
|
|||
# pylint: disable=redefined-outer-name,missing-function-docstring,protected-access
|
||||
""" Tests on opening hours helpers """
|
||||
|
||||
import pytest
|
||||
|
||||
from mylib.oracle import OracleDB
|
||||
|
||||
|
||||
class FakeCXOracleCursor:
|
||||
""" Fake cx_Oracle cursor """
|
||||
|
||||
def __init__(self, expected_sql, expected_params, expected_return, expected_just_try, expected_exception):
|
||||
self.expected_sql = expected_sql
|
||||
self.expected_params = expected_params
|
||||
self.expected_return = expected_return
|
||||
self.expected_just_try = expected_just_try
|
||||
self.expected_exception = expected_exception
|
||||
|
||||
def execute(self, sql, **params):
|
||||
if self.expected_exception:
|
||||
raise Exception("%s.execute(%s, %s): expected exception" % (self, sql, params))
|
||||
if self.expected_just_try and not sql.lower().startswith('select '):
|
||||
assert False, "%s.execute(%s, %s) may not be executed in just try mode" % (self, sql, params)
|
||||
assert sql == self.expected_sql, "%s.execute(): Invalid SQL query:\n '%s'\nMay be:\n '%s'" % (self, sql, self.expected_sql)
|
||||
assert params == self.expected_params, "%s.execute(): Invalid params:\n %s\nMay be:\n %s" % (self, params, self.expected_params)
|
||||
return self.expected_return
|
||||
|
||||
def fetchall(self):
|
||||
return self.expected_return
|
||||
|
||||
def __repr__(self):
|
||||
return "FakeCXOracleCursor(%s, %s, %s, %s)" % (
|
||||
self.expected_sql, self.expected_params,
|
||||
self.expected_return, self.expected_just_try
|
||||
)
|
||||
|
||||
|
||||
class FakeCXOracle:
|
||||
""" Fake cx_Oracle connection """
|
||||
|
||||
expected_sql = None
|
||||
expected_params = dict()
|
||||
expected_return = True
|
||||
expected_just_try = False
|
||||
expected_exception = False
|
||||
just_try = False
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
allowed_kwargs = dict(dsn=str, user=str, password=(str, None))
|
||||
for arg, value in kwargs.items():
|
||||
assert arg in allowed_kwargs, "Invalid arg %s='%s'" % (arg, value)
|
||||
assert isinstance(value, allowed_kwargs[arg]), "Arg %s not a %s (%s)" % (arg, allowed_kwargs[arg], type(value))
|
||||
setattr(self, arg, value)
|
||||
|
||||
def close(self):
|
||||
return self.expected_return
|
||||
|
||||
def cursor(self):
|
||||
return FakeCXOracleCursor(
|
||||
self.expected_sql, self.expected_params,
|
||||
self.expected_return, self.expected_just_try or self.just_try,
|
||||
self.expected_exception
|
||||
)
|
||||
|
||||
def commit(self):
|
||||
self._check_just_try()
|
||||
return self.expected_return
|
||||
|
||||
def rollback(self):
|
||||
self._check_just_try()
|
||||
return self.expected_return
|
||||
|
||||
def _check_just_try(self):
|
||||
if self.just_try:
|
||||
assert False, "May not be executed in just try mode"
|
||||
|
||||
|
||||
def fake_cxoracle_connect(**kwargs):
|
||||
return FakeCXOracle(**kwargs)
|
||||
|
||||
|
||||
def fake_cxoracle_connect_just_try(**kwargs):
|
||||
con = FakeCXOracle(**kwargs)
|
||||
con.just_try = True
|
||||
return con
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_oracledb():
|
||||
return OracleDB('127.0.0.1/dbname', 'user', 'password')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fake_oracledb(mocker):
|
||||
mocker.patch('cx_Oracle.connect', fake_cxoracle_connect)
|
||||
return OracleDB('127.0.0.1/dbname', 'user', 'password')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fake_just_try_oracledb(mocker):
|
||||
mocker.patch('cx_Oracle.connect', fake_cxoracle_connect_just_try)
|
||||
return OracleDB('127.0.0.1/dbname', 'user', 'password', just_try=True)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fake_connected_oracledb(fake_oracledb):
|
||||
fake_oracledb.connect()
|
||||
return fake_oracledb
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fake_connected_just_try_oracledb(fake_just_try_oracledb):
|
||||
fake_just_try_oracledb.connect()
|
||||
return fake_just_try_oracledb
|
||||
|
||||
|
||||
def generate_mock_args(expected_args=(), expected_kwargs=dict(), expected_return=True): # pylint: disable=dangerous-default-value
|
||||
def mock_args(*args, **kwargs):
|
||||
assert args == expected_args, "Invalid call args:\n %s\nMay be:\n %s" % (args, expected_args)
|
||||
assert kwargs == expected_kwargs, "Invalid call kwargs:\n %s\nMay be:\n %s" % (kwargs, expected_kwargs)
|
||||
return expected_return
|
||||
return mock_args
|
||||
|
||||
|
||||
def mock_doSQL_just_try(self, sql, params=None): # pylint: disable=unused-argument
|
||||
assert False, "doSQL() may not be executed in just try mode"
|
||||
|
||||
|
||||
def generate_mock_doSQL(expected_sql, expected_params=dict(), expected_return=True): # pylint: disable=dangerous-default-value
|
||||
def mock_doSQL(self, sql, params=None): # pylint: disable=unused-argument
|
||||
assert sql == expected_sql, "Invalid generated SQL query:\n '%s'\nMay be:\n '%s'" % (sql, expected_sql)
|
||||
assert params == expected_params, "Invalid generated params:\n %s\nMay be:\n %s" % (params, expected_params)
|
||||
return expected_return
|
||||
return mock_doSQL
|
||||
|
||||
|
||||
# OracleDB.doSelect() have same expected parameters as OracleDB.doSQL()
|
||||
generate_mock_doSelect = generate_mock_doSQL
|
||||
mock_doSelect_just_try = mock_doSQL_just_try
|
||||
|
||||
#
|
||||
# Test on OracleDB helper methods
|
||||
#
|
||||
|
||||
|
||||
def test_combine_params_with_to_add_parameter():
|
||||
assert OracleDB._combine_params(dict(test1=1), dict(test2=2)) == dict(
|
||||
test1=1, test2=2
|
||||
)
|
||||
|
||||
|
||||
def test_combine_params_with_kargs():
|
||||
assert OracleDB._combine_params(dict(test1=1), test2=2) == dict(
|
||||
test1=1, test2=2
|
||||
)
|
||||
|
||||
|
||||
def test_combine_params_with_kargs_and_to_add_parameter():
|
||||
assert OracleDB._combine_params(dict(test1=1), dict(test2=2), test3=3) == dict(
|
||||
test1=1, test2=2, test3=3
|
||||
)
|
||||
|
||||
|
||||
def test_format_where_clauses_params_are_preserved():
|
||||
args = ('test = test', dict(test1=1))
|
||||
assert OracleDB._format_where_clauses(*args) == args
|
||||
|
||||
|
||||
def test_format_where_clauses_raw():
|
||||
assert OracleDB._format_where_clauses('test = test') == (('test = test'), dict())
|
||||
|
||||
|
||||
def test_format_where_clauses_tuple_clause_with_params():
|
||||
where_clauses = (
|
||||
'test1 = :test1 AND test2 = :test2',
|
||||
dict(test1=1, test2=2)
|
||||
)
|
||||
assert OracleDB._format_where_clauses(where_clauses) == where_clauses
|
||||
|
||||
|
||||
def test_format_where_clauses_dict():
|
||||
where_clauses = dict(test1=1, test2=2)
|
||||
assert OracleDB._format_where_clauses(where_clauses) == (
|
||||
'"test1" = :test1 AND "test2" = :test2',
|
||||
where_clauses
|
||||
)
|
||||
|
||||
|
||||
def test_format_where_clauses_combined_types():
|
||||
where_clauses = (
|
||||
'test1 = 1',
|
||||
('test2 LIKE :test2', dict(test2=2)),
|
||||
dict(test3=3, test4=4)
|
||||
)
|
||||
assert OracleDB._format_where_clauses(where_clauses) == (
|
||||
'test1 = 1 AND test2 LIKE :test2 AND "test3" = :test3 AND "test4" = :test4',
|
||||
dict(test2=2, test3=3, test4=4)
|
||||
)
|
||||
|
||||
|
||||
def test_format_where_clauses_with_where_op():
|
||||
where_clauses = dict(test1=1, test2=2)
|
||||
assert OracleDB._format_where_clauses(where_clauses, where_op='OR') == (
|
||||
'"test1" = :test1 OR "test2" = :test2',
|
||||
where_clauses
|
||||
)
|
||||
|
||||
|
||||
def test_add_where_clauses():
|
||||
sql = "SELECT * FROM table"
|
||||
where_clauses = dict(test1=1, test2=2)
|
||||
assert OracleDB._add_where_clauses(sql, None, where_clauses) == (
|
||||
sql + ' WHERE "test1" = :test1 AND "test2" = :test2',
|
||||
where_clauses
|
||||
)
|
||||
|
||||
|
||||
def test_add_where_clauses_preserved_params():
|
||||
sql = "SELECT * FROM table"
|
||||
where_clauses = dict(test1=1, test2=2)
|
||||
params = dict(fake1=1)
|
||||
assert OracleDB._add_where_clauses(sql, params.copy(), where_clauses) == (
|
||||
sql + ' WHERE "test1" = :test1 AND "test2" = :test2',
|
||||
dict(**where_clauses, **params)
|
||||
)
|
||||
|
||||
|
||||
def test_add_where_clauses_with_op():
|
||||
sql = "SELECT * FROM table"
|
||||
where_clauses = ('test1=1', 'test2=2')
|
||||
assert OracleDB._add_where_clauses(sql, None, where_clauses, where_op='OR') == (
|
||||
sql + ' WHERE test1=1 OR test2=2',
|
||||
dict()
|
||||
)
|
||||
|
||||
|
||||
def test_add_where_clauses_with_duplicated_field():
|
||||
sql = "UPDATE table SET test1=:test1"
|
||||
params = dict(test1='new_value')
|
||||
where_clauses = dict(test1='where_value')
|
||||
assert OracleDB._add_where_clauses(sql, params, where_clauses) == (
|
||||
sql + ' WHERE "test1" = :test1_1',
|
||||
dict(test1='new_value', test1_1='where_value')
|
||||
)
|
||||
|
||||
|
||||
def test_quote_table_name():
|
||||
assert OracleDB._quote_table_name("mytable") == '"mytable"'
|
||||
assert OracleDB._quote_table_name("myschema.mytable") == '"myschema"."mytable"'
|
||||
|
||||
|
||||
def test_insert(mocker, test_oracledb):
|
||||
values = dict(test1=1, test2=2)
|
||||
mocker.patch(
|
||||
'mylib.oracle.OracleDB.doSQL',
|
||||
generate_mock_doSQL(
|
||||
'INSERT INTO "mytable" ("test1", "test2") VALUES (:test1, :test2)',
|
||||
values
|
||||
)
|
||||
)
|
||||
|
||||
assert test_oracledb.insert('mytable', values)
|
||||
|
||||
|
||||
def test_insert_just_try(mocker, test_oracledb):
|
||||
mocker.patch('mylib.oracle.OracleDB.doSQL', mock_doSQL_just_try)
|
||||
assert test_oracledb.insert('mytable', dict(test1=1, test2=2), just_try=True)
|
||||
|
||||
|
||||
def test_update(mocker, test_oracledb):
|
||||
values = dict(test1=1, test2=2)
|
||||
where_clauses = dict(test3=3, test4=4)
|
||||
mocker.patch(
|
||||
'mylib.oracle.OracleDB.doSQL',
|
||||
generate_mock_doSQL(
|
||||
'UPDATE "mytable" SET "test1" = :test1, "test2" = :test2 WHERE "test3" = :test3 AND "test4" = :test4',
|
||||
dict(**values, **where_clauses)
|
||||
)
|
||||
)
|
||||
|
||||
assert test_oracledb.update('mytable', values, where_clauses)
|
||||
|
||||
|
||||
def test_update_just_try(mocker, test_oracledb):
|
||||
mocker.patch('mylib.oracle.OracleDB.doSQL', mock_doSQL_just_try)
|
||||
assert test_oracledb.update('mytable', dict(test1=1, test2=2), None, just_try=True)
|
||||
|
||||
|
||||
def test_delete(mocker, test_oracledb):
|
||||
where_clauses = dict(test1=1, test2=2)
|
||||
mocker.patch(
|
||||
'mylib.oracle.OracleDB.doSQL',
|
||||
generate_mock_doSQL(
|
||||
'DELETE FROM "mytable" WHERE "test1" = :test1 AND "test2" = :test2',
|
||||
where_clauses
|
||||
)
|
||||
)
|
||||
|
||||
assert test_oracledb.delete('mytable', where_clauses)
|
||||
|
||||
|
||||
def test_delete_just_try(mocker, test_oracledb):
|
||||
mocker.patch('mylib.oracle.OracleDB.doSQL', mock_doSQL_just_try)
|
||||
assert test_oracledb.delete('mytable', None, just_try=True)
|
||||
|
||||
|
||||
def test_truncate(mocker, test_oracledb):
|
||||
mocker.patch(
|
||||
'mylib.oracle.OracleDB.doSQL',
|
||||
generate_mock_doSQL('TRUNCATE "mytable"', None)
|
||||
)
|
||||
|
||||
assert test_oracledb.truncate('mytable')
|
||||
|
||||
|
||||
def test_truncate_just_try(mocker, test_oracledb):
|
||||
mocker.patch('mylib.oracle.OracleDB.doSQL', mock_doSelect_just_try)
|
||||
assert test_oracledb.truncate('mytable', just_try=True)
|
||||
|
||||
|
||||
def test_select(mocker, test_oracledb):
|
||||
fields = ('field1', 'field2')
|
||||
where_clauses = dict(test3=3, test4=4)
|
||||
expected_return = [
|
||||
dict(field1=1, field2=2),
|
||||
dict(field1=2, field2=3),
|
||||
]
|
||||
order_by = "field1, DESC"
|
||||
mocker.patch(
|
||||
'mylib.oracle.OracleDB.doSelect',
|
||||
generate_mock_doSQL(
|
||||
'SELECT "field1", "field2" FROM "mytable" WHERE "test3" = :test3 AND "test4" = :test4 ORDER BY ' + order_by,
|
||||
where_clauses, expected_return
|
||||
)
|
||||
)
|
||||
|
||||
assert test_oracledb.select('mytable', where_clauses, fields, order_by=order_by) == expected_return
|
||||
|
||||
|
||||
def test_select_without_field_and_order_by(mocker, test_oracledb):
|
||||
mocker.patch(
|
||||
'mylib.oracle.OracleDB.doSelect',
|
||||
generate_mock_doSQL(
|
||||
'SELECT * FROM "mytable"'
|
||||
)
|
||||
)
|
||||
|
||||
assert test_oracledb.select('mytable')
|
||||
|
||||
|
||||
def test_select_just_try(mocker, test_oracledb):
|
||||
mocker.patch('mylib.oracle.OracleDB.doSQL', mock_doSelect_just_try)
|
||||
assert test_oracledb.select('mytable', None, None, just_try=True)
|
||||
|
||||
#
|
||||
# Tests on main methods
|
||||
#
|
||||
|
||||
|
||||
def test_connect(mocker, test_oracledb):
|
||||
expected_kwargs = dict(
|
||||
dsn=test_oracledb._dsn,
|
||||
user=test_oracledb._user,
|
||||
password=test_oracledb._pwd
|
||||
)
|
||||
|
||||
mocker.patch(
|
||||
'cx_Oracle.connect',
|
||||
generate_mock_args(
|
||||
expected_kwargs=expected_kwargs
|
||||
)
|
||||
)
|
||||
|
||||
assert test_oracledb.connect()
|
||||
|
||||
|
||||
def test_close(fake_oracledb):
|
||||
assert fake_oracledb.close() is None
|
||||
|
||||
|
||||
def test_close_connected(fake_connected_oracledb):
|
||||
assert fake_connected_oracledb.close() is None
|
||||
|
||||
|
||||
def test_doSQL(fake_connected_oracledb):
|
||||
fake_connected_oracledb._conn.expected_sql = 'DELETE FROM table WHERE test1 = :test1'
|
||||
fake_connected_oracledb._conn.expected_params = dict(test1=1)
|
||||
fake_connected_oracledb.doSQL(fake_connected_oracledb._conn.expected_sql, fake_connected_oracledb._conn.expected_params)
|
||||
|
||||
|
||||
def test_doSQL_without_params(fake_connected_oracledb):
|
||||
fake_connected_oracledb._conn.expected_sql = 'DELETE FROM table'
|
||||
fake_connected_oracledb.doSQL(fake_connected_oracledb._conn.expected_sql)
|
||||
|
||||
|
||||
def test_doSQL_just_try(fake_connected_just_try_oracledb):
|
||||
assert fake_connected_just_try_oracledb.doSQL('DELETE FROM table')
|
||||
|
||||
|
||||
def test_doSQL_on_exception(fake_connected_oracledb):
|
||||
fake_connected_oracledb._conn.expected_exception = True
|
||||
assert fake_connected_oracledb.doSQL('DELETE FROM table') is False
|
||||
|
||||
|
||||
def test_doSelect(fake_connected_oracledb):
|
||||
fake_connected_oracledb._conn.expected_sql = 'SELECT * FROM table WHERE test1 = :test1'
|
||||
fake_connected_oracledb._conn.expected_params = dict(test1=1)
|
||||
fake_connected_oracledb._conn.expected_return = [dict(test1=1)]
|
||||
assert fake_connected_oracledb.doSelect(
|
||||
fake_connected_oracledb._conn.expected_sql,
|
||||
fake_connected_oracledb._conn.expected_params) == fake_connected_oracledb._conn.expected_return
|
||||
|
||||
|
||||
def test_doSelect_without_params(fake_connected_oracledb):
|
||||
fake_connected_oracledb._conn.expected_sql = 'SELECT * FROM table'
|
||||
fake_connected_oracledb._conn.expected_return = [dict(test1=1)]
|
||||
assert fake_connected_oracledb.doSelect(fake_connected_oracledb._conn.expected_sql) == fake_connected_oracledb._conn.expected_return
|
||||
|
||||
|
||||
def test_doSelect_on_exception(fake_connected_oracledb):
|
||||
fake_connected_oracledb._conn.expected_exception = True
|
||||
assert fake_connected_oracledb.doSelect('SELECT * FROM table') is False
|
||||
|
||||
|
||||
def test_doSelect_just_try(fake_connected_just_try_oracledb):
|
||||
fake_connected_just_try_oracledb._conn.expected_sql = 'SELECT * FROM table WHERE test1 = :test1'
|
||||
fake_connected_just_try_oracledb._conn.expected_params = dict(test1=1)
|
||||
fake_connected_just_try_oracledb._conn.expected_return = [dict(test1=1)]
|
||||
assert fake_connected_just_try_oracledb.doSelect(
|
||||
fake_connected_just_try_oracledb._conn.expected_sql,
|
||||
fake_connected_just_try_oracledb._conn.expected_params
|
||||
) == fake_connected_just_try_oracledb._conn.expected_return
|
Loading…
Reference in a new issue