362 lines
12 KiB
Python
362 lines
12 KiB
Python
|
# -*- coding: utf-8 -*-
|
||
|
|
||
|
""" Oracle client """
|
||
|
|
||
|
import logging
|
||
|
import sys
|
||
|
|
||
|
import cx_Oracle
|
||
|
|
||
|
log = logging.getLogger(__name__)
|
||
|
|
||
|
|
||
|
#
|
||
|
# Exceptions
|
||
|
#
|
||
|
|
||
|
class OracleDBException(Exception):
|
||
|
""" That is the base exception class for all the other exceptions provided by this module. """
|
||
|
|
||
|
def __init__(self, error, *args, **kwargs):
|
||
|
for arg, value in kwargs.items():
|
||
|
setattr(self, arg, value)
|
||
|
super().__init__(error.format(*args, **kwargs))
|
||
|
|
||
|
|
||
|
class OracleDBDuplicatedSQLParameter(OracleDBException, KeyError):
|
||
|
"""
|
||
|
Raised when trying to set a SQL query parameter
|
||
|
and an other parameter with the same name is already set
|
||
|
"""
|
||
|
|
||
|
def __init__(self, parameter_name):
|
||
|
super().__init__(
|
||
|
"Duplicated SQL parameter '{parameter_name}'",
|
||
|
parameter_name=parameter_name
|
||
|
)
|
||
|
|
||
|
|
||
|
class OracleDBUnsupportedWHEREClauses(OracleDBException, TypeError):
|
||
|
"""
|
||
|
Raised when trying to execute query with unsupported
|
||
|
WHERE clauses provided
|
||
|
"""
|
||
|
|
||
|
def __init__(self, where_clauses):
|
||
|
super().__init__(
|
||
|
"Unsupported WHERE clauses: {where_clauses}",
|
||
|
where_clauses=where_clauses
|
||
|
)
|
||
|
|
||
|
|
||
|
class OracleDB:
|
||
|
""" Oracle client """
|
||
|
|
||
|
def __init__(self, dsn, user, pwd, just_try=False):
|
||
|
self._dsn = dsn
|
||
|
self._user = user
|
||
|
self._pwd = pwd
|
||
|
self._conn = None
|
||
|
self.just_try = just_try
|
||
|
|
||
|
def connect(self):
|
||
|
""" Connect to Oracle server """
|
||
|
if self._conn is None:
|
||
|
try:
|
||
|
self._conn = cx_Oracle.connect(
|
||
|
user=self._user,
|
||
|
password=self._pwd,
|
||
|
dsn=self._dsn
|
||
|
)
|
||
|
except Exception:
|
||
|
log.fatal(
|
||
|
'An error occured during Oracle database connection (%s@%s).',
|
||
|
self._user, self._dsn, exc_info=1
|
||
|
)
|
||
|
sys.exit(1)
|
||
|
return True
|
||
|
|
||
|
def close(self):
|
||
|
""" Close connection with Oracle server (if opened) """
|
||
|
if self._conn:
|
||
|
self._conn.close()
|
||
|
self._conn = None
|
||
|
|
||
|
def doSQL(self, sql, params=None):
|
||
|
"""
|
||
|
Run SQL query and commit changes (rollback on error)
|
||
|
|
||
|
:param sql: The SQL query
|
||
|
:param params: The SQL query's parameters as dict (optional)
|
||
|
|
||
|
:return: True on success, False otherwise
|
||
|
:rtype: bool
|
||
|
"""
|
||
|
if self.just_try:
|
||
|
log.debug("Just-try mode : do not really execute SQL query '%s'", sql)
|
||
|
return True
|
||
|
|
||
|
cursor = self._conn.cursor()
|
||
|
try:
|
||
|
if isinstance(params, dict):
|
||
|
cursor.execute(sql, **params)
|
||
|
else:
|
||
|
cursor.execute(sql)
|
||
|
self._conn.commit()
|
||
|
return True
|
||
|
except Exception:
|
||
|
log.error(
|
||
|
'Error during SQL request "%s" %s',
|
||
|
sql,
|
||
|
"with params = %s" % ', '.join([
|
||
|
"%s = %s" % (key, value)
|
||
|
for key, value in params.items()
|
||
|
]) if params else "without params",
|
||
|
exc_info=True
|
||
|
)
|
||
|
self._conn.rollback()
|
||
|
return False
|
||
|
|
||
|
def doSelect(self, sql, params=None):
|
||
|
"""
|
||
|
Run SELECT SQL query and return list of selected rows as dict
|
||
|
|
||
|
:param sql: The SQL query
|
||
|
:param params: The SQL query's parameters as dict (optional)
|
||
|
|
||
|
:return: List of selected rows as dict on success, False otherwise
|
||
|
:rtype: list, bool
|
||
|
"""
|
||
|
cursor = self._conn.cursor()
|
||
|
try:
|
||
|
if isinstance(params, dict):
|
||
|
cursor.execute(sql, **params)
|
||
|
else:
|
||
|
cursor.execute(sql)
|
||
|
cursor.rowfactory = lambda *args: dict(zip([d[0] for d in cursor.description], args))
|
||
|
results = cursor.fetchall()
|
||
|
return results
|
||
|
except Exception:
|
||
|
log.error(
|
||
|
'Error during SQL request "%s" %s',
|
||
|
sql,
|
||
|
"with params = %s" % ', '.join([
|
||
|
"%s = %s" % (key, value)
|
||
|
for key, value in params.items()
|
||
|
]) if params else "without params",
|
||
|
exc_info=True
|
||
|
)
|
||
|
return False
|
||
|
|
||
|
#
|
||
|
# SQL helpers
|
||
|
#
|
||
|
|
||
|
@classmethod
|
||
|
def _combine_params(cls, params, to_add=None, **kwargs):
|
||
|
if to_add:
|
||
|
assert isinstance(to_add, dict), "to_add must be a dict or None"
|
||
|
params = cls._combine_params(params, **to_add)
|
||
|
|
||
|
for param, value in kwargs.items():
|
||
|
if param in params:
|
||
|
raise OracleDBDuplicatedSQLParameter(param)
|
||
|
params[param] = value
|
||
|
return params
|
||
|
|
||
|
@classmethod
|
||
|
def _format_where_clauses(cls, where_clauses, params=None, where_op=None):
|
||
|
"""
|
||
|
Format WHERE clauses
|
||
|
|
||
|
:param where_clauses: The WHERE clauses. Could be:
|
||
|
- a raw SQL WHERE clause as string
|
||
|
- a tuple of two elements: a raw WHERE clause and its parameters as dict
|
||
|
- a dict of WHERE clauses with field name as key and WHERE clause value as value
|
||
|
- a list of any of previous valid WHERE clauses
|
||
|
:param params: Dict of other already set SQL query parameters (optional)
|
||
|
:param where_op: SQL operator used to combine WHERE clauses together (optional, default: AND)
|
||
|
|
||
|
:return: A tuple of two elements: raw SQL WHERE combined clauses and parameters on success
|
||
|
:rtype: string, bool
|
||
|
"""
|
||
|
if params is None:
|
||
|
params = dict()
|
||
|
if where_op is None:
|
||
|
where_op = 'AND'
|
||
|
|
||
|
if isinstance(where_clauses, str):
|
||
|
return (where_clauses, params)
|
||
|
|
||
|
if isinstance(where_clauses, tuple) and len(where_clauses) == 2 and isinstance(where_clauses[1], dict):
|
||
|
cls._combine_params(params, where_clauses[1])
|
||
|
return (where_clauses[0], params)
|
||
|
|
||
|
if isinstance(where_clauses, (list, tuple)):
|
||
|
sql_where_clauses = []
|
||
|
for where_clause in where_clauses:
|
||
|
sql2, params = cls._format_where_clauses(where_clause, params=params, where_op=where_op)
|
||
|
sql_where_clauses.append(sql2)
|
||
|
return (
|
||
|
(" %s " % where_op).join(sql_where_clauses),
|
||
|
params
|
||
|
)
|
||
|
|
||
|
if isinstance(where_clauses, dict):
|
||
|
sql_where_clauses = []
|
||
|
for field, value in where_clauses.items():
|
||
|
param = field
|
||
|
if field in params:
|
||
|
idx = 1
|
||
|
while param in params:
|
||
|
param = '%s_%d' % (field, idx)
|
||
|
idx += 1
|
||
|
cls._combine_params(params, {param: value})
|
||
|
sql_where_clauses.append(
|
||
|
'"{field}" = :{param}'.format(field=field, param=param)
|
||
|
)
|
||
|
return (
|
||
|
(" %s " % where_op).join(sql_where_clauses),
|
||
|
params
|
||
|
)
|
||
|
raise OracleDBUnsupportedWHEREClauses(where_clauses)
|
||
|
|
||
|
@classmethod
|
||
|
def _add_where_clauses(cls, sql, params, where_clauses, where_op=None):
|
||
|
"""
|
||
|
Add WHERE clauses to an SQL query
|
||
|
|
||
|
:param sql: The SQL query to complete
|
||
|
:param params: The dict of parameters of the SQL query to complete
|
||
|
:param where_clauses: The WHERE clause (see _format_where_clauses())
|
||
|
:param where_op: SQL operator used to combine WHERE clauses together (optional, default: see _format_where_clauses())
|
||
|
|
||
|
:return:
|
||
|
:rtype: A tuple of two elements: raw SQL WHERE combined clauses and parameters
|
||
|
"""
|
||
|
if where_clauses:
|
||
|
sql_where, params = cls._format_where_clauses(where_clauses, params=params, where_op=where_op)
|
||
|
sql += " WHERE " + sql_where
|
||
|
return (sql, params)
|
||
|
|
||
|
@staticmethod
|
||
|
def _quote_table_name(table):
|
||
|
""" Quote table name """
|
||
|
return '"{0}"'.format(
|
||
|
'"."'.join(
|
||
|
table.split('.')
|
||
|
)
|
||
|
)
|
||
|
|
||
|
def insert(self, table, values, just_try=False):
|
||
|
""" Run INSERT SQL query """
|
||
|
sql = 'INSERT INTO {0} ("{1}") VALUES ({2})'.format(
|
||
|
self._quote_table_name(table),
|
||
|
'", "'.join(values.keys()),
|
||
|
", ".join([
|
||
|
':{0}'.format(key)
|
||
|
for key in values
|
||
|
])
|
||
|
)
|
||
|
|
||
|
if just_try:
|
||
|
log.debug("Just-try mode: execute INSERT query: %s", sql)
|
||
|
return True
|
||
|
|
||
|
log.debug(sql)
|
||
|
if not self.doSQL(sql, params=values):
|
||
|
log.error("Fail to execute INSERT query (SQL: %s)", sql)
|
||
|
return False
|
||
|
return True
|
||
|
|
||
|
def update(self, table, values, where_clauses, where_op=None, just_try=False):
|
||
|
""" Run UPDATE SQL query """
|
||
|
sql = 'UPDATE {0} SET {1}'.format(
|
||
|
self._quote_table_name(table),
|
||
|
", ".join([
|
||
|
'"{0}" = :{0}'.format(key)
|
||
|
for key in values
|
||
|
])
|
||
|
)
|
||
|
params = values
|
||
|
|
||
|
try:
|
||
|
sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op)
|
||
|
except (OracleDBDuplicatedSQLParameter, OracleDBUnsupportedWHEREClauses):
|
||
|
log.error('Fail to add WHERE clauses', exc_info=True)
|
||
|
return False
|
||
|
|
||
|
if just_try:
|
||
|
log.debug("Just-try mode: execute UPDATE query: %s", sql)
|
||
|
return True
|
||
|
|
||
|
log.debug(sql)
|
||
|
if not self.doSQL(sql, params=params):
|
||
|
log.error("Fail to execute UPDATE query (SQL: %s)", sql)
|
||
|
return False
|
||
|
return True
|
||
|
|
||
|
def delete(self, table, where_clauses, where_op='AND', just_try=False):
|
||
|
""" Run DELETE SQL query """
|
||
|
sql = 'DELETE FROM {0}'.format(self._quote_table_name(table))
|
||
|
params = dict()
|
||
|
|
||
|
try:
|
||
|
sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op)
|
||
|
except (OracleDBDuplicatedSQLParameter, OracleDBUnsupportedWHEREClauses):
|
||
|
log.error('Fail to add WHERE clauses', exc_info=True)
|
||
|
return False
|
||
|
|
||
|
if just_try:
|
||
|
log.debug("Just-try mode: execute UPDATE query: %s", sql)
|
||
|
return True
|
||
|
|
||
|
log.debug(sql)
|
||
|
if not self.doSQL(sql, params=params):
|
||
|
log.error("Fail to execute UPDATE query (SQL: %s)", sql)
|
||
|
return False
|
||
|
return True
|
||
|
|
||
|
def truncate(self, table, just_try=False):
|
||
|
""" Run TRUNCATE SQL query """
|
||
|
|
||
|
sql = 'TRUNCATE {0}'.format(self._quote_table_name(table))
|
||
|
|
||
|
if just_try:
|
||
|
log.debug("Just-try mode: execute TRUNCATE query: %s", sql)
|
||
|
return True
|
||
|
|
||
|
log.debug(sql)
|
||
|
if not self.doSQL(sql):
|
||
|
log.error("Fail to execute TRUNCATE query (SQL: %s)", sql)
|
||
|
return False
|
||
|
return True
|
||
|
|
||
|
def select(self, table, where_clauses=None, fields=None, where_op='AND', order_by=None, just_try=False):
|
||
|
""" Run SELECT SQL query """
|
||
|
sql = "SELECT "
|
||
|
if fields is None:
|
||
|
sql += "*"
|
||
|
elif isinstance(fields, str):
|
||
|
sql += '"{0}"'.format(fields)
|
||
|
else:
|
||
|
sql += '"{0}"'.format('", "'.join(fields))
|
||
|
|
||
|
sql += ' FROM {0}'.format(self._quote_table_name(table))
|
||
|
params = dict()
|
||
|
|
||
|
try:
|
||
|
sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op)
|
||
|
except (OracleDBDuplicatedSQLParameter, OracleDBUnsupportedWHEREClauses):
|
||
|
log.error('Fail to add WHERE clauses', exc_info=True)
|
||
|
return False
|
||
|
|
||
|
if order_by:
|
||
|
sql += ' ORDER BY {0}'.format(order_by)
|
||
|
|
||
|
if just_try:
|
||
|
log.debug("Just-try mode: execute SELECT query : %s", sql)
|
||
|
return just_try
|
||
|
|
||
|
return self.doSelect(sql, params=params)
|