""" Basic SQL DB client """ import logging log = logging.getLogger(__name__) # # Exceptions # class DBException(Exception): """That is the base exception class for all the other exceptions provided by this module.""" def __init__(self, error, *args, **kwargs): for arg, value in kwargs.items(): setattr(self, arg, value) super().__init__(error.format(*args, **kwargs)) class DBNotImplemented(DBException, RuntimeError): """ Raised when calling a method not implemented in child class """ def __init__(self, method, class_name): super().__init__( "The method {method} is not yet implemented in class {class_name}", method=method, class_name=class_name, ) class DBFailToConnect(DBException, RuntimeError): """ Raised on connecting error occurred """ def __init__(self, uri): super().__init__("An error occured during database connection ({uri})", uri=uri) class DBDuplicatedSQLParameter(DBException, KeyError): """ Raised when trying to set a SQL query parameter and an other parameter with the same name is already set """ def __init__(self, parameter_name): super().__init__( "Duplicated SQL parameter '{parameter_name}'", parameter_name=parameter_name ) class DBUnsupportedWHEREClauses(DBException, TypeError): """ Raised when trying to execute query with unsupported WHERE clauses provided """ def __init__(self, where_clauses): super().__init__("Unsupported WHERE clauses: {where_clauses}", where_clauses=where_clauses) class DBInvalidOrderByClause(DBException, TypeError): """ Raised when trying to select on table with invalid ORDER BY clause provided """ def __init__(self, order_by): super().__init__( "Invalid ORDER BY clause: {order_by}. Must be a string or a list of two values" " (ordering field name and direction)", order_by=order_by, ) class DB: """Database client""" just_try = False def __init__(self, just_try=False, **kwargs): self.just_try = just_try self._conn = None for arg, value in kwargs.items(): setattr(self, f"_{arg}", value) def connect(self, exit_on_error=True): """Connect to DB server""" raise DBNotImplemented("connect", self.__class__.__name__) def close(self): """Close connection with DB server (if opened)""" if self._conn: self._conn.close() self._conn = None @staticmethod def _log_query(sql, params): log.debug( 'Run SQL query "%s" %s', sql, "with params = {}".format( # pylint: disable=consider-using-f-string ", ".join([f"{key} = {value}" for key, value in params.items()]) if params else "without params" ), ) @staticmethod def _log_query_exception(sql, params): log.exception( 'Error during SQL query "%s" %s', sql, "with params = {}".format( # pylint: disable=consider-using-f-string ", ".join([f"{key} = {value}" for key, value in params.items()]) if params else "without params" ), ) def doSQL(self, sql, params=None): """ Run SQL query and commit changes (rollback on error) :param sql: The SQL query :param params: The SQL query's parameters as dict (optional) :return: True on success, False otherwise :rtype: bool """ raise DBNotImplemented("doSQL", self.__class__.__name__) def doSelect(self, sql, params=None): """ Run SELECT SQL query and return list of selected rows as dict :param sql: The SQL query :param params: The SQL query's parameters as dict (optional) :return: List of selected rows as dict on success, False otherwise :rtype: list, bool """ raise DBNotImplemented("doSelect", self.__class__.__name__) # # SQL helpers # @staticmethod def _quote_table_name(table): """Quote table name""" return '"{}"'.format( # pylint: disable=consider-using-f-string '"."'.join(table.split(".")) ) @staticmethod def _quote_field_name(field): """Quote table name""" return f'"{field}"' @staticmethod def format_param(param): """Format SQL query parameter for prepared query""" return f"%({param})s" @classmethod def _combine_params(cls, params, to_add=None, **kwargs): if to_add: assert isinstance(to_add, dict), "to_add must be a dict or None" params = cls._combine_params(params, **to_add) for param, value in kwargs.items(): if param in params: raise DBDuplicatedSQLParameter(param) params[param] = value return params @classmethod def _format_where_clauses(cls, where_clauses, params=None, where_op=None): """ Format WHERE clauses :param where_clauses: The WHERE clauses. Could be: - a raw SQL WHERE clause as string - a tuple of two elements: a raw WHERE clause and its parameters as dict - a dict of WHERE clauses with field name as key and WHERE clause value as value - a list of any of previous valid WHERE clauses :param params: Dict of other already set SQL query parameters (optional) :param where_op: SQL operator used to combine WHERE clauses together (optional, default: AND) :return: A tuple of two elements: raw SQL WHERE combined clauses and parameters on success :rtype: string, bool """ if params is None: params = {} if where_op is None: where_op = "AND" if isinstance(where_clauses, str): return (where_clauses, params) if ( isinstance(where_clauses, tuple) and len(where_clauses) == 2 and isinstance(where_clauses[1], dict) ): cls._combine_params(params, where_clauses[1]) return (where_clauses[0], params) if isinstance(where_clauses, (list, tuple)): sql_where_clauses = [] for where_clause in where_clauses: sql2, params = cls._format_where_clauses( where_clause, params=params, where_op=where_op ) sql_where_clauses.append(sql2) return (f" {where_op} ".join(sql_where_clauses), params) if isinstance(where_clauses, dict): sql_where_clauses = [] for field, value in where_clauses.items(): param = field if field in params: idx = 1 while param in params: param = f"{field}_{idx}" idx += 1 cls._combine_params(params, {param: value}) sql_where_clauses.append( f"{cls._quote_field_name(field)} = {cls.format_param(param)}" ) return (f" {where_op} ".join(sql_where_clauses), params) raise DBUnsupportedWHEREClauses(where_clauses) @classmethod def _add_where_clauses(cls, sql, params, where_clauses, where_op=None): """ Add WHERE clauses to an SQL query :param sql: The SQL query to complete :param params: The dict of parameters of the SQL query to complete :param where_clauses: The WHERE clause (see _format_where_clauses()) :param where_op: SQL operator used to combine WHERE clauses together (optional, default: see _format_where_clauses()) :return: :rtype: A tuple of two elements: raw SQL WHERE combined clauses and parameters """ if where_clauses: sql_where, params = cls._format_where_clauses( where_clauses, params=params, where_op=where_op ) sql += " WHERE " + sql_where return (sql, params) def insert(self, table, values, just_try=False): """Run INSERT SQL query""" # pylint: disable=consider-using-f-string sql = "INSERT INTO {} ({}) VALUES ({})".format( # nosec self._quote_table_name(table), ", ".join([self._quote_field_name(field) for field in values.keys()]), ", ".join([self.format_param(key) for key in values]), ) if just_try: log.debug("Just-try mode: execute INSERT query: %s", sql) return True log.debug(sql) if not self.doSQL(sql, params=values): log.error("Fail to execute INSERT query (SQL: %s)", sql) return False return True def update(self, table, values, where_clauses, where_op=None, just_try=False): """Run UPDATE SQL query""" # pylint: disable=consider-using-f-string sql = "UPDATE {} SET {}".format( # nosec self._quote_table_name(table), ", ".join( [f"{self._quote_field_name(key)} = {self.format_param(key)}" for key in values] ), ) params = values try: sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) except (DBDuplicatedSQLParameter, DBUnsupportedWHEREClauses): log.error("Fail to add WHERE clauses", exc_info=True) return False if just_try: log.debug("Just-try mode: execute UPDATE query: %s", sql) return True log.debug(sql) if not self.doSQL(sql, params=params): log.error("Fail to execute UPDATE query (SQL: %s)", sql) return False return True def delete(self, table, where_clauses, where_op="AND", just_try=False): """Run DELETE SQL query""" sql = f"DELETE FROM {self._quote_table_name(table)}" # nosec params = {} try: sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) except (DBDuplicatedSQLParameter, DBUnsupportedWHEREClauses): log.error("Fail to add WHERE clauses", exc_info=True) return False if just_try: log.debug("Just-try mode: execute UPDATE query: %s", sql) return True log.debug(sql) if not self.doSQL(sql, params=params): log.error("Fail to execute UPDATE query (SQL: %s)", sql) return False return True def truncate(self, table, just_try=False): """Run TRUNCATE SQL query""" sql = f"TRUNCATE TABLE {self._quote_table_name(table)}" # nosec if just_try: log.debug("Just-try mode: execute TRUNCATE query: %s", sql) return True log.debug(sql) if not self.doSQL(sql): log.error("Fail to execute TRUNCATE query (SQL: %s)", sql) return False return True def select( self, table, where_clauses=None, fields=None, where_op="AND", order_by=None, just_try=False ): """Run SELECT SQL query""" sql = "SELECT " if fields is None: sql += "*" elif isinstance(fields, str): sql += f"{self._quote_field_name(fields)}" else: sql += ", ".join([self._quote_field_name(field) for field in fields]) sql += f" FROM {self._quote_table_name(table)}" params = {} try: sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) except (DBDuplicatedSQLParameter, DBUnsupportedWHEREClauses): log.error("Fail to add WHERE clauses", exc_info=True) return False if order_by: if isinstance(order_by, str): sql += f" ORDER BY {order_by}" elif ( isinstance(order_by, (list, tuple)) and len(order_by) == 2 and isinstance(order_by[0], str) and isinstance(order_by[1], str) and order_by[1].upper() in ("ASC", "UPPER") ): sql += f' ORDER BY "{order_by[0]}" {order_by[1].upper()}' else: raise DBInvalidOrderByClause(order_by) if just_try: log.debug("Just-try mode: execute SELECT query : %s", sql) return just_try return self.doSelect(sql, params=params)