python-mylib/mylib/pgsql.py

486 lines
16 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
""" PostgreSQL client """
import datetime
import logging
import sys
import psycopg2
log = logging.getLogger(__name__)
2021-05-19 19:19:57 +02:00
2021-10-06 21:33:25 +02:00
#
# Exceptions
#
class PgDBException(Exception):
""" That is the base exception class for all the other exceptions provided by this module. """
def __init__(self, error, *args, **kwargs):
for arg, value in kwargs.items():
setattr(self, arg, value)
super().__init__(error.format(*args, **kwargs))
class PgDBFailToConnect(PgDBException, RuntimeError):
"""
Raised on connecting error occurred
"""
def __init__(self, host, user, db):
super().__init__(
"An error occured during Postgresql database connection ({user}@{host}, database={db})",
user=user, host=host, db=db
)
2021-10-06 21:33:25 +02:00
class PgDBDuplicatedSQLParameter(PgDBException, KeyError):
"""
Raised when trying to set a SQL query parameter
and an other parameter with the same name is already set
"""
def __init__(self, parameter_name):
super().__init__(
"Duplicated SQL parameter '{parameter_name}'",
parameter_name=parameter_name
)
class PgDBUnsupportedWHEREClauses(PgDBException, TypeError):
"""
Raised when trying to execute query with unsupported
WHERE clauses provided
"""
def __init__(self, where_clauses):
super().__init__(
"Unsupported WHERE clauses: {where_clauses}",
where_clauses=where_clauses
)
class PgDBInvalidOrderByClause(PgDBException, TypeError):
"""
Raised when trying to select on table with invalid
ORDER BY clause provided
"""
def __init__(self, order_by):
super().__init__(
"Invalid ORDER BY clause: {order_by}. Must be a string or a list of two values (ordering field name and direction)",
order_by=order_by
)
class PgDB:
""" PostgreSQL client """
date_format = '%Y-%m-%d'
datetime_format = '%Y-%m-%d %H:%M:%S'
def __init__(self, host, user, pwd, db, just_try=False):
2021-11-07 21:37:18 +01:00
self._host = host
self._user = user
self._pwd = pwd
self._db = db
self._conn = None
self.just_try = just_try
def connect(self, exit_on_error=True):
""" Connect to PostgreSQL server """
2021-11-07 21:37:18 +01:00
if self._conn is None:
try:
log.info(
'Connect on PostgreSQL server %s as %s on database %s',
self._host, self._user, self._db)
2021-11-07 21:37:18 +01:00
self._conn = psycopg2.connect(
dbname=self._db,
user=self._user,
host=self._host,
password=self._pwd
2021-05-19 19:19:57 +02:00
)
2023-01-06 19:36:14 +01:00
except psycopg2.Error as err:
2021-11-07 21:37:18 +01:00
log.fatal(
'An error occured during Postgresql database connection (%s@%s, database=%s).',
2021-11-07 21:37:18 +01:00
self._user, self._host, self._db, exc_info=1
)
if exit_on_error:
sys.exit(1)
else:
raise PgDBFailToConnect(self._host, self._user, self._db) from err
return True
def close(self):
""" Close connection with PostgreSQL server (if opened) """
2021-11-07 21:37:18 +01:00
if self._conn:
self._conn.close()
self._conn = None
def setEncoding(self, enc):
""" Set connection encoding """
2021-11-07 21:37:18 +01:00
if self._conn:
try:
2021-11-07 21:37:18 +01:00
self._conn.set_client_encoding(enc)
return True
2023-01-06 19:36:14 +01:00
except psycopg2.Error:
2021-10-06 21:33:25 +02:00
log.error(
'An error occured setting Postgresql database connection encoding to "%s"',
enc, exc_info=1
)
return False
2023-01-06 19:36:14 +01:00
@staticmethod
def _log_query(sql, params):
log.debug(
'Run SQL query "%s" %s',
sql,
"with params = {0}".format( # pylint: disable=consider-using-f-string
', '.join([
f'{key} = {value}'
for key, value in params.items()
]) if params else "without params"
)
)
@staticmethod
def _log_query_exception(sql, params):
log.exception(
'Error during SQL query "%s" %s',
sql,
"with params = {0}".format( # pylint: disable=consider-using-f-string
', '.join([
f'{key} = {value}'
for key, value in params.items()
]) if params else "without params"
)
)
def doSQL(self, sql, params=None):
2021-10-06 21:33:25 +02:00
"""
Run SQL query and commit changes (rollback on error)
:param sql: The SQL query
:param params: The SQL query's parameters as dict (optional)
:return: True on success, False otherwise
:rtype: bool
"""
if self.just_try:
2021-10-06 21:33:25 +02:00
log.debug("Just-try mode : do not really execute SQL query '%s'", sql)
return True
2021-11-07 21:37:18 +01:00
cursor = self._conn.cursor()
try:
2023-01-06 19:36:14 +01:00
self._log_query(sql, params)
if params is None:
cursor.execute(sql)
else:
cursor.execute(sql, params)
2021-11-07 21:37:18 +01:00
self._conn.commit()
return True
2023-01-06 19:36:14 +01:00
except psycopg2.Error:
self._log_query_exception(sql, params)
2021-11-07 21:37:18 +01:00
self._conn.rollback()
return False
2021-10-06 21:33:25 +02:00
def doSelect(self, sql, params=None):
"""
Run SELECT SQL query and return list of selected rows as dict
:param sql: The SQL query
:param params: The SQL query's parameters as dict (optional)
:return: List of selected rows as dict on success, False otherwise
:rtype: list, bool
"""
2021-11-07 21:37:18 +01:00
cursor = self._conn.cursor()
try:
2023-01-06 19:36:14 +01:00
self._log_query(sql, params)
cursor.execute(sql, params)
results = cursor.fetchall()
return results
2023-01-06 19:36:14 +01:00
except psycopg2.Error:
self._log_query_exception(sql, params)
2021-10-06 21:33:25 +02:00
return False
2023-01-06 19:36:14 +01:00
@staticmethod
def _map_row_fields_by_index(fields, row):
return dict(
(field, row[idx])
for idx, field in enumerate(fields)
)
#
# SQL helpers
#
2021-11-18 19:21:27 +01:00
@staticmethod
def format_param(param):
2023-01-06 19:36:14 +01:00
""" Format SQL query parameter for prepared query """
return f'%({param})s'
2021-11-18 19:21:27 +01:00
2021-10-06 21:33:25 +02:00
@classmethod
def _combine_params(cls, params, to_add=None, **kwargs):
if to_add:
assert isinstance(to_add, dict), "to_add must be a dict or None"
params = cls._combine_params(params, **to_add)
for param, value in kwargs.items():
if param in params:
raise PgDBDuplicatedSQLParameter(param)
params[param] = value
return params
@classmethod
def _format_where_clauses(cls, where_clauses, params=None, where_op=None):
"""
Format WHERE clauses
:param where_clauses: The WHERE clauses. Could be:
- a raw SQL WHERE clause as string
- a tuple of two elements: a raw WHERE clause and its parameters as dict
- a dict of WHERE clauses with field name as key and WHERE clause value as value
- a list of any of previous valid WHERE clauses
:param params: Dict of other already set SQL query parameters (optional)
:param where_op: SQL operator used to combine WHERE clauses together (optional, default: AND)
:return: A tuple of two elements: raw SQL WHERE combined clauses and parameters on success
:rtype: string, bool
"""
if params is None:
2023-01-06 19:36:14 +01:00
params = {}
2021-10-06 21:33:25 +02:00
if where_op is None:
where_op = 'AND'
if isinstance(where_clauses, str):
2021-10-06 21:33:25 +02:00
return (where_clauses, params)
if isinstance(where_clauses, tuple) and len(where_clauses) == 2 and isinstance(where_clauses[1], dict):
cls._combine_params(params, where_clauses[1])
return (where_clauses[0], params)
if isinstance(where_clauses, (list, tuple)):
sql_where_clauses = []
for where_clause in where_clauses:
sql2, params = cls._format_where_clauses(where_clause, params=params, where_op=where_op)
sql_where_clauses.append(sql2)
return (
2023-01-06 19:36:14 +01:00
f' {where_op} '.join(sql_where_clauses),
2021-10-06 21:33:25 +02:00
params
)
2021-10-06 21:33:25 +02:00
if isinstance(where_clauses, dict):
sql_where_clauses = []
for field, value in where_clauses.items():
param = field
if field in params:
idx = 1
while param in params:
2023-01-06 19:36:14 +01:00
param = f'{field}_{idx}'
idx += 1
cls._combine_params(params, {param: value})
sql_where_clauses.append(
2023-01-06 19:36:14 +01:00
f'"{field}" = {cls.format_param(param)}'
)
2021-10-06 21:33:25 +02:00
return (
2023-01-06 19:36:14 +01:00
f' {where_op} '.join(sql_where_clauses),
2021-10-06 21:33:25 +02:00
params
)
raise PgDBUnsupportedWHEREClauses(where_clauses)
@classmethod
def _add_where_clauses(cls, sql, params, where_clauses, where_op=None):
"""
Add WHERE clauses to an SQL query
:param sql: The SQL query to complete
:param params: The dict of parameters of the SQL query to complete
:param where_clauses: The WHERE clause (see _format_where_clauses())
:param where_op: SQL operator used to combine WHERE clauses together (optional, default: see _format_where_clauses())
:return:
:rtype: A tuple of two elements: raw SQL WHERE combined clauses and parameters
"""
if where_clauses:
sql_where, params = cls._format_where_clauses(where_clauses, params=params, where_op=where_op)
sql += " WHERE " + sql_where
return (sql, params)
@staticmethod
def _quote_table_name(table):
""" Quote table name """
2023-01-06 19:36:14 +01:00
return '"{0}"'.format( # pylint: disable=consider-using-f-string
'"."'.join(
table.split('.')
)
)
def insert(self, table, values, just_try=False):
""" Run INSERT SQL query """
2023-01-06 19:36:14 +01:00
# pylint: disable=consider-using-f-string
sql = 'INSERT INTO {0} ("{1}") VALUES ({2})'.format(
self._quote_table_name(table),
2021-10-06 21:33:25 +02:00
'", "'.join(values.keys()),
", ".join([
2021-11-18 19:21:27 +01:00
self.format_param(key)
2021-10-06 21:33:25 +02:00
for key in values
])
)
if just_try:
2021-11-07 21:37:18 +01:00
log.debug("Just-try mode: execute INSERT query: %s", sql)
return True
log.debug(sql)
2021-10-06 21:33:25 +02:00
if not self.doSQL(sql, params=values):
2021-11-07 21:37:18 +01:00
log.error("Fail to execute INSERT query (SQL: %s)", sql)
return False
return True
2021-10-06 21:33:25 +02:00
def update(self, table, values, where_clauses, where_op=None, just_try=False):
""" Run UPDATE SQL query """
2023-01-06 19:36:14 +01:00
# pylint: disable=consider-using-f-string
sql = 'UPDATE {0} SET {1}'.format(
self._quote_table_name(table),
2021-10-06 21:33:25 +02:00
", ".join([
2023-01-06 19:36:14 +01:00
f'"{key}" = {self.format_param(key)}'
2021-10-06 21:33:25 +02:00
for key in values
])
)
params = values
2021-10-06 21:33:25 +02:00
try:
sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op)
except (PgDBDuplicatedSQLParameter, PgDBUnsupportedWHEREClauses):
log.error('Fail to add WHERE clauses', exc_info=True)
return False
if just_try:
2021-11-07 21:37:18 +01:00
log.debug("Just-try mode: execute UPDATE query: %s", sql)
return True
log.debug(sql)
2021-10-06 21:33:25 +02:00
if not self.doSQL(sql, params=params):
2021-11-07 21:37:18 +01:00
log.error("Fail to execute UPDATE query (SQL: %s)", sql)
return False
return True
2021-10-06 21:33:25 +02:00
def delete(self, table, where_clauses, where_op='AND', just_try=False):
""" Run DELETE SQL query """
2023-01-06 19:36:14 +01:00
sql = f'DELETE FROM {self._quote_table_name(table)}'
params = {}
2021-10-06 21:33:25 +02:00
try:
sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op)
except (PgDBDuplicatedSQLParameter, PgDBUnsupportedWHEREClauses):
log.error('Fail to add WHERE clauses', exc_info=True)
return False
if just_try:
2021-11-07 21:37:18 +01:00
log.debug("Just-try mode: execute UPDATE query: %s", sql)
2021-10-06 21:33:25 +02:00
return True
log.debug(sql)
if not self.doSQL(sql, params=params):
2021-11-07 21:37:18 +01:00
log.error("Fail to execute UPDATE query (SQL: %s)", sql)
return False
2021-10-06 21:33:25 +02:00
return True
2021-10-06 21:33:25 +02:00
def truncate(self, table, just_try=False):
""" Run TRUNCATE SQL query """
2023-01-06 19:36:14 +01:00
sql = f'TRUNCATE TABLE {self._quote_table_name(table)}'
if just_try:
2021-11-07 21:37:18 +01:00
log.debug("Just-try mode: execute TRUNCATE query: %s", sql)
return True
log.debug(sql)
if not self.doSQL(sql):
2021-11-07 21:37:18 +01:00
log.error("Fail to execute TRUNCATE query (SQL: %s)", sql)
return False
return True
2021-10-06 21:33:25 +02:00
def select(self, table, where_clauses=None, fields=None, where_op='AND', order_by=None, just_try=False):
""" Run SELECT SQL query """
2021-10-06 21:33:25 +02:00
sql = "SELECT "
if fields is None:
sql += "*"
elif isinstance(fields, str):
2023-01-06 19:36:14 +01:00
sql += f'"{fields}"'
else:
2023-01-06 19:36:14 +01:00
sql += '"{0}"'.format('", "'.join(fields)) # pylint: disable=consider-using-f-string
2023-01-06 19:36:14 +01:00
sql += f' FROM {self._quote_table_name(table)}'
params = {}
2021-10-06 21:33:25 +02:00
try:
sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op)
except (PgDBDuplicatedSQLParameter, PgDBUnsupportedWHEREClauses):
log.error('Fail to add WHERE clauses', exc_info=True)
return False
if order_by:
if isinstance(order_by, str):
2023-01-06 19:36:14 +01:00
sql += f' ORDER BY {order_by}'
elif (
2022-01-20 19:14:16 +01:00
isinstance(order_by, (list, tuple)) and len(order_by) == 2
and isinstance(order_by[0], str)
and isinstance(order_by[1], str)
and order_by[1].upper() in ('ASC', 'UPPER')
):
2023-01-06 19:36:14 +01:00
sql += f' ORDER BY "{order_by[0]}" {order_by[1].upper()}'
else:
raise PgDBInvalidOrderByClause(order_by)
2021-10-06 21:33:25 +02:00
if just_try:
2021-11-07 21:37:18 +01:00
log.debug("Just-try mode: execute SELECT query : %s", sql)
2021-10-06 21:33:25 +02:00
return just_try
return self.doSelect(sql, params=params)
#
# Depreated helpers
#
@classmethod
def _quote_value(cls, value):
""" Quote a value for SQL query """
if value is None:
return 'NULL'
if isinstance(value, (int, float)):
return str(value)
if isinstance(value, datetime.datetime):
value = cls._format_datetime(value)
elif isinstance(value, datetime.date):
value = cls._format_date(value)
2023-01-06 19:36:14 +01:00
# pylint: disable=consider-using-f-string
return "'{0}'".format(value.replace("'", "''"))
2021-10-06 21:33:25 +02:00
@classmethod
def _format_datetime(cls, value):
""" Format datetime object as string """
assert isinstance(value, datetime.datetime)
return value.strftime(cls.datetime_format)
@classmethod
def _format_date(cls, value):
""" Format date object as string """
assert isinstance(value, (datetime.date, datetime.datetime))
return value.strftime(cls.date_format)
@classmethod
def time2datetime(cls, time):
""" Convert timestamp to datetime string """
return cls._format_datetime(datetime.datetime.fromtimestamp(int(time)))
@classmethod
def time2date(cls, time):
""" Convert timestamp to date string """
return cls._format_date(datetime.date.fromtimestamp(int(time)))