"""Basic SQL DB client"""
|
|
|
|
import logging
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
#
|
|
# Exceptions
|
|
#
|
|
|
|
|
|
class DBException(Exception):
    """Root of the exception hierarchy provided by this module."""

    def __init__(self, error, *args, **kwargs):
        """
        Render the error message and expose the keyword details as attributes

        :param error: The message template, rendered with str.format()
        :param args: Positional values injected in the message template
        :param kwargs: Named values injected in the message template; each of them is
                       also stored as an attribute of the exception
        """
        # Attributes are set before the message is rendered, as the rendering may fail
        for name in kwargs:
            setattr(self, name, kwargs[name])
        message = error.format(*args, **kwargs)
        super().__init__(message)
|
|
|
|
|
|
class DBNotImplemented(DBException, RuntimeError):
    """
    Raised when calling a method that the child class does not implement
    """

    def __init__(self, method, class_name):
        """
        :param method: Name of the called method (stored as self.method)
        :param class_name: Name of the class lacking the method (stored as self.class_name)
        """
        template = "The method {method} is not yet implemented in class {class_name}"
        super().__init__(template, method=method, class_name=class_name)
|
|
|
|
|
|
class DBFailToConnect(DBException, RuntimeError):
    """
    Raised when an error occurred while connecting to the database
    """

    def __init__(self, uri):
        """
        :param uri: The URI of the database (stored as self.uri)
        """
        template = "An error occurred during database connection ({uri})"
        super().__init__(template, uri=uri)
|
|
|
|
|
|
class DBDuplicatedSQLParameter(DBException, KeyError):
    """
    Raised when trying to set a SQL query parameter
    while another parameter with the same name is already set
    """

    def __init__(self, parameter_name):
        """
        :param parameter_name: Name of the duplicated parameter (stored as
                               self.parameter_name)
        """
        template = "Duplicated SQL parameter '{parameter_name}'"
        super().__init__(template, parameter_name=parameter_name)
|
|
|
|
|
|
class DBUnsupportedWHEREClauses(DBException, TypeError):
    """
    Raised when trying to execute a query with
    unsupported WHERE clauses provided
    """

    def __init__(self, where_clauses):
        """
        :param where_clauses: The rejected WHERE clauses (stored as self.where_clauses)
        """
        template = "Unsupported WHERE clauses: {where_clauses}"
        super().__init__(template, where_clauses=where_clauses)
|
|
|
|
|
|
class DBInvalidOrderByClause(DBException, TypeError):
    """
    Raised when trying to select on a table with
    an invalid ORDER BY clause provided
    """

    def __init__(self, order_by):
        """
        :param order_by: The rejected ORDER BY clause (stored as self.order_by)
        """
        template = (
            "Invalid ORDER BY clause: {order_by}. "
            "Must be a string or a list of two values "
            "(ordering field name and direction)"
        )
        super().__init__(template, order_by=order_by)
|
|
|
|
|
|
class DBInvalidLimitClause(DBException, TypeError):
    """
    Raised when trying to select on a table with
    an invalid LIMIT clause provided
    """

    def __init__(self, limit):
        """
        :param limit: The rejected LIMIT value (stored as self.limit)
        """
        template = "Invalid LIMIT clause: {limit}. Must be a non-zero positive integer."
        super().__init__(template, limit=limit)
|
|
|
|
|
|
class DB:
    """
    Database client

    Base class of SQL database clients: it builds INSERT, UPDATE, DELETE, TRUNCATE and SELECT
    queries and hands them to doSQL() / doSelect(). Here, connect(), doSQL() and doSelect()
    only raise DBNotImplemented: child classes have to provide them.
    """

    # Class-level default of the just-try flag (overridden per instance by __init__())
    just_try = False

    def __init__(self, just_try=False, **kwargs):
        """
        :param just_try: Just-try flag, stored as self.just_try. This base class never reads
                         it (its helpers rely on their own just_try parameter); presumably
                         child classes honour it in doSQL() -- to be confirmed.
        :param kwargs: Any other keyword argument is stored on the instance as an attribute
                       named after the argument, prefixed with an underscore
        """
        self.just_try = just_try
        self._conn = None
        for arg, value in kwargs.items():
            setattr(self, f"_{arg}", value)

    def connect(self, exit_on_error=True):
        """
        Connect to DB server

        :param exit_on_error: Unused by this base implementation, left to child classes

        :raises DBNotImplemented: always, child classes have to implement this method
        """
        raise DBNotImplemented("connect", self.__class__.__name__)

    def close(self):
        """Close connection with DB server (if opened)"""
        if self._conn:
            self._conn.close()
            self._conn = None

    @staticmethod
    def _format_params_for_log(params):
        """
        Describe SQL query parameters for log messages

        :param params: The SQL query's parameters as dict (or None)

        :return: "without params" if no parameter is set, otherwise "with params = " followed
                 by the comma separated list of "name = value" pairs
        :rtype: str
        """
        if not params:
            return "without params"
        return "with params = " + ", ".join(
            f"{key} = {value}" for key, value in params.items()
        )

    @staticmethod
    def _log_query(sql, params):
        """Log (debug level) a SQL query and its parameters"""
        log.debug('Run SQL query "%s" %s', sql, DB._format_params_for_log(params))

    @staticmethod
    def _log_query_exception(sql, params):
        """Log (with traceback) a SQL query that failed; meant to be called while handling
        the exception"""
        log.exception('Error during SQL query "%s" %s', sql, DB._format_params_for_log(params))

    def doSQL(self, sql, params=None):
        """
        Run SQL query and commit changes (rollback on error)

        :param sql: The SQL query
        :param params: The SQL query's parameters as dict (optional)

        :return: True on success, False otherwise
        :rtype: bool

        :raises DBNotImplemented: always, child classes have to implement this method
        """
        raise DBNotImplemented("doSQL", self.__class__.__name__)

    def doSelect(self, sql, params=None):
        """
        Run SELECT SQL query and return list of selected rows as dict

        :param sql: The SQL query
        :param params: The SQL query's parameters as dict (optional)

        :return: List of selected rows as dict on success, False otherwise
        :rtype: list, bool

        :raises DBNotImplemented: always, child classes have to implement this method
        """
        raise DBNotImplemented("doSelect", self.__class__.__name__)

    #
    # SQL helpers
    #

    @staticmethod
    def _quote_table_name(table):
        """Quote table name (each dot separated part is quoted on its own)"""
        return '"{}"'.format(  # pylint: disable=consider-using-f-string
            '"."'.join(table.split("."))
        )

    @staticmethod
    def _quote_field_name(field):
        """Quote field name"""
        return f'"{field}"'

    @staticmethod
    def format_param(param):
        """Format SQL query parameter for prepared query"""
        return f"%({param})s"

    @classmethod
    def _combine_params(cls, params, to_add=None, **kwargs):
        """
        Add parameters to a dict of SQL query parameters, refusing duplicated names

        :param params: Dict of the already set parameters (updated in place)
        :param to_add: Dict of parameters to add (optional)
        :param kwargs: Parameters to add, given as keyword arguments (optional)

        :return: The updated params dict (the very same object)
        :rtype: dict

        :raises DBDuplicatedSQLParameter: if a parameter to add is already set
        """
        if to_add:
            assert isinstance(to_add, dict), "to_add must be a dict or None"
        else:
            to_add = {}

        # to_add is walked directly instead of being re-injected as keyword arguments:
        # a parameter named "params" or "to_add" would clash with this method's own arguments
        for source in (to_add, kwargs):
            for param, value in source.items():
                if param in params:
                    raise DBDuplicatedSQLParameter(param)
                params[param] = value
        return params

    @staticmethod
    def _get_unique_param_name(field, params):
        """Return a unique parameter name based on specified field name"""
        param = field
        idx = 1
        # Suffix the field name with an increasing index until the name is free
        while param in params:
            param = f"{field}_{idx}"
            idx += 1
        return param

    @classmethod
    def _format_where_clauses(cls, where_clauses, params=None, where_op=None):
        """
        Format WHERE clauses

        :param where_clauses: The WHERE clauses. Could be:
            - a raw SQL WHERE clause as string
            - a tuple of two elements: a raw WHERE clause and its parameters as dict
            - a dict of WHERE clauses with field name as key and WHERE clause value as value
              (a list as value is turned into an IN clause)
            - a list of any of previous valid WHERE clauses
        :param params: Dict of other already set SQL query parameters (optional, updated in
                       place)
        :param where_op: SQL operator used to combine WHERE clauses together (optional, default:
                         AND)

        :return: A tuple of two elements: raw SQL WHERE combined clauses and parameters
        :rtype: tuple

        :raises DBUnsupportedWHEREClauses: if the type of where_clauses is not supported
        :raises DBDuplicatedSQLParameter: if a parameter of a raw WHERE clause is already set
        """
        if params is None:
            params = {}
        if where_op is None:
            where_op = "AND"

        if isinstance(where_clauses, str):
            return (where_clauses, params)

        # A raw clause with its parameters. The first element has to be a string, otherwise
        # a pair of clauses ending with a dict one would be mistaken for this form.
        if (
            isinstance(where_clauses, tuple)
            and len(where_clauses) == 2
            and isinstance(where_clauses[0], str)
            and isinstance(where_clauses[1], dict)
        ):
            cls._combine_params(params, where_clauses[1])
            return (where_clauses[0], params)

        if isinstance(where_clauses, (list, tuple)):
            sql_where_clauses = []
            for where_clause in where_clauses:
                sql2, params = cls._format_where_clauses(
                    where_clause, params=params, where_op=where_op
                )
                sql_where_clauses.append(sql2)
            return (f" {where_op} ".join(sql_where_clauses), params)

        if isinstance(where_clauses, dict):
            sql_where_clauses = []
            for field, value in where_clauses.items():
                if isinstance(value, list):
                    # One parameter per listed value, all gathered in an IN clause
                    param_names = []
                    for idx, item in enumerate(value):
                        param = cls._get_unique_param_name(f"{field}_{idx}", params)
                        cls._combine_params(params, {param: item})
                        param_names.append(param)
                    placeholders = ", ".join(cls.format_param(param) for param in param_names)
                    sql_where_clauses.append(
                        f"{cls._quote_field_name(field)} IN ({placeholders})"
                    )
                else:
                    param = cls._get_unique_param_name(field, params)
                    cls._combine_params(params, {param: value})
                    sql_where_clauses.append(
                        f"{cls._quote_field_name(field)} = {cls.format_param(param)}"
                    )
            return (f" {where_op} ".join(sql_where_clauses), params)

        raise DBUnsupportedWHEREClauses(where_clauses)

    @classmethod
    def _add_where_clauses(cls, sql, params, where_clauses, where_op=None):
        """
        Add WHERE clauses to an SQL query

        :param sql: The SQL query to complete
        :param params: The dict of parameters of the SQL query to complete
        :param where_clauses: The WHERE clause (see _format_where_clauses())
        :param where_op: SQL operator used to combine WHERE clauses together (optional, default:
                         see _format_where_clauses())

        :return: A tuple of two elements: the completed SQL query and its parameters (both
                 untouched if where_clauses is empty)
        :rtype: tuple
        """
        if where_clauses:
            sql_where, params = cls._format_where_clauses(
                where_clauses, params=params, where_op=where_op
            )
            sql += " WHERE " + sql_where
        return (sql, params)

    def insert(self, table, values, just_try=False):
        """
        Run INSERT SQL query

        :param table: The table name
        :param values: Dict of the values to insert, with field name as key
        :param just_try: If True, the query is only logged, not executed (optional)

        :return: True on success (or in just-try mode), False otherwise
        :rtype: bool
        """
        # pylint: disable=consider-using-f-string
        sql = "INSERT INTO {} ({}) VALUES ({})".format(  # nosec
            self._quote_table_name(table),
            ", ".join(self._quote_field_name(field) for field in values),
            ", ".join(self.format_param(field) for field in values),
        )

        if just_try:
            log.debug("Just-try mode: execute INSERT query: %s", sql)
            return True

        log.debug(sql)
        if not self.doSQL(sql, params=values):
            log.error("Fail to execute INSERT query (SQL: %s)", sql)
            return False
        return True

    def update(self, table, values, where_clauses, where_op=None, just_try=False):
        """
        Run UPDATE SQL query

        :param table: The table name
        :param values: Dict of the values to set, with field name as key (left untouched)
        :param where_clauses: The WHERE clauses (see _format_where_clauses())
        :param where_op: SQL operator used to combine WHERE clauses together (optional)
        :param just_try: If True, the query is only logged, not executed (optional)

        :return: True on success (or in just-try mode), False otherwise
        :rtype: bool
        """
        # pylint: disable=consider-using-f-string
        sql = "UPDATE {} SET {}".format(  # nosec
            self._quote_table_name(table),
            ", ".join(
                f"{self._quote_field_name(key)} = {self.format_param(key)}" for key in values
            ),
        )
        # Work on a copy: the WHERE clauses parameters are added to this dict and must not
        # leak in the caller's values
        params = dict(values)

        try:
            sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op)
        except (DBDuplicatedSQLParameter, DBUnsupportedWHEREClauses):
            log.error("Fail to add WHERE clauses", exc_info=True)
            return False

        if just_try:
            log.debug("Just-try mode: execute UPDATE query: %s", sql)
            return True

        log.debug(sql)
        if not self.doSQL(sql, params=params):
            log.error("Fail to execute UPDATE query (SQL: %s)", sql)
            return False
        return True

    def delete(self, table, where_clauses, where_op="AND", just_try=False):
        """
        Run DELETE SQL query

        :param table: The table name
        :param where_clauses: The WHERE clauses (see _format_where_clauses())
        :param where_op: SQL operator used to combine WHERE clauses together (optional)
        :param just_try: If True, the query is only logged, not executed (optional)

        :return: True on success (or in just-try mode), False otherwise
        :rtype: bool
        """
        sql = f"DELETE FROM {self._quote_table_name(table)}"  # nosec
        params = {}

        try:
            sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op)
        except (DBDuplicatedSQLParameter, DBUnsupportedWHEREClauses):
            log.error("Fail to add WHERE clauses", exc_info=True)
            return False

        if just_try:
            log.debug("Just-try mode: execute DELETE query: %s", sql)
            return True

        log.debug(sql)
        if not self.doSQL(sql, params=params):
            log.error("Fail to execute DELETE query (SQL: %s)", sql)
            return False
        return True

    def truncate(self, table, just_try=False):
        """
        Run TRUNCATE SQL query

        :param table: The table name
        :param just_try: If True, the query is only logged, not executed (optional)

        :return: True on success (or in just-try mode), False otherwise
        :rtype: bool
        """
        sql = f"TRUNCATE TABLE {self._quote_table_name(table)}"  # nosec

        if just_try:
            log.debug("Just-try mode: execute TRUNCATE query: %s", sql)
            return True

        log.debug(sql)
        if not self.doSQL(sql):
            log.error("Fail to execute TRUNCATE query (SQL: %s)", sql)
            return False
        return True

    def select(
        self,
        table,
        where_clauses=None,
        fields=None,
        where_op="AND",
        order_by=None,
        limit=None,
        just_try=False,
    ):
        """
        Run SELECT SQL query

        :param table: The table name
        :param where_clauses: The WHERE clauses (optional, see _format_where_clauses())
        :param fields: The field(s) to select: a field name as string or an iterable of field
                       names (optional, default: all fields)
        :param where_op: SQL operator used to combine WHERE clauses together (optional)
        :param order_by: The ORDER BY clause: a raw SQL clause as string or a list/tuple of
                         two strings, the ordering field name and the direction (ASC or DESC,
                         case insensitive) (optional)
        :param limit: The maximum number of rows to select: a positive integer or a value
                      int() can convert to one (optional)
        :param just_try: If True, the query is only logged, not executed (optional)

        :return: The doSelect() result, False if the WHERE clauses can't be added, or the
                 just_try value in just-try mode

        :raises DBInvalidOrderByClause: if the ORDER BY clause is invalid
        :raises DBInvalidLimitClause: if the LIMIT clause is invalid
        """
        sql = "SELECT "
        if fields is None:
            sql += "*"
        elif isinstance(fields, str):
            sql += self._quote_field_name(fields)
        else:
            sql += ", ".join(self._quote_field_name(field) for field in fields)

        sql += f" FROM {self._quote_table_name(table)}"
        params = {}

        try:
            sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op)
        except (DBDuplicatedSQLParameter, DBUnsupportedWHEREClauses):
            log.error("Fail to add WHERE clauses", exc_info=True)
            return False

        if order_by:
            if isinstance(order_by, str):
                # Raw clause: inserted as is
                sql += f" ORDER BY {order_by}"
            elif (
                isinstance(order_by, (list, tuple))
                and len(order_by) == 2
                and isinstance(order_by[0], str)
                and isinstance(order_by[1], str)
                and order_by[1].upper() in ("ASC", "DESC")
            ):
                sql += f" ORDER BY {self._quote_field_name(order_by[0])} {order_by[1].upper()}"
            else:
                raise DBInvalidOrderByClause(order_by)

        if limit:
            if not isinstance(limit, int):
                try:
                    limit = int(limit)
                except (TypeError, ValueError) as err:
                    raise DBInvalidLimitClause(limit) from err
            if limit <= 0:
                raise DBInvalidLimitClause(limit)
            sql += f" LIMIT {limit}"

        if just_try:
            log.debug("Just-try mode: execute SELECT query : %s", sql)
            return just_try

        return self.doSelect(sql, params=params)
|