# -*- coding: utf-8 -*- """ PostgreSQL client """ import datetime import logging import sys import psycopg2 log = logging.getLogger(__name__) # # Exceptions # class PgDBException(Exception): """ That is the base exception class for all the other exceptions provided by this module. """ def __init__(self, error, *args, **kwargs): for arg, value in kwargs.items(): setattr(self, arg, value) super().__init__(error.format(*args, **kwargs)) class PgDBFailToConnect(PgDBException, RuntimeError): """ Raised on connecting error occurred """ def __init__(self, host, user, db): super().__init__( "An error occured during Postgresql database connection ({user}@{host}, database={db})", user=user, host=host, db=db ) class PgDBDuplicatedSQLParameter(PgDBException, KeyError): """ Raised when trying to set a SQL query parameter and an other parameter with the same name is already set """ def __init__(self, parameter_name): super().__init__( "Duplicated SQL parameter '{parameter_name}'", parameter_name=parameter_name ) class PgDBUnsupportedWHEREClauses(PgDBException, TypeError): """ Raised when trying to execute query with unsupported WHERE clauses provided """ def __init__(self, where_clauses): super().__init__( "Unsupported WHERE clauses: {where_clauses}", where_clauses=where_clauses ) class PgDBInvalidOrderByClause(PgDBException, TypeError): """ Raised when trying to select on table with invalid ORDER BY clause provided """ def __init__(self, order_by): super().__init__( "Invalid ORDER BY clause: {order_by}. Must be a string or a list of two values (ordering field name and direction)", order_by=order_by ) class PgDB: """ PostgreSQL client """ date_format = '%Y-%m-%d' datetime_format = '%Y-%m-%d %H:%M:%S' def __init__(self, host, user, pwd, db, just_try=False): self._host = host self._user = user self._pwd = pwd self._db = db self._conn = None self.just_try = just_try def connect(self, exit_on_error=True): """ Connect to PostgreSQL server """ if self._conn is None: try: log.info( 'Connect on PostgreSQL server %s as %s on database %s', self._host, self._user, self._db) self._conn = psycopg2.connect( dbname=self._db, user=self._user, host=self._host, password=self._pwd ) except Exception as err: log.fatal( 'An error occured during Postgresql database connection (%s@%s, database=%s).', self._user, self._host, self._db, exc_info=1 ) if exit_on_error: sys.exit(1) else: raise PgDBFailToConnect(self._host, self._user, self._db) from err return True def close(self): """ Close connection with PostgreSQL server (if opened) """ if self._conn: self._conn.close() self._conn = None def setEncoding(self, enc): """ Set connection encoding """ if self._conn: try: self._conn.set_client_encoding(enc) return True except Exception: log.error( 'An error occured setting Postgresql database connection encoding to "%s"', enc, exc_info=1 ) return False def doSQL(self, sql, params=None): """ Run SQL query and commit changes (rollback on error) :param sql: The SQL query :param params: The SQL query's parameters as dict (optional) :return: True on success, False otherwise :rtype: bool """ if self.just_try: log.debug("Just-try mode : do not really execute SQL query '%s'", sql) return True cursor = self._conn.cursor() try: log.debug( 'Run SQL query "%s" %s', sql, "with params = %s" % ', '.join([ "%s = %s" % (key, value) for key, value in params.items() ]) if params else "without params" ) if params is None: cursor.execute(sql) else: cursor.execute(sql, params) self._conn.commit() return True except Exception: log.error( 'Error during SQL query "%s" %s', sql, "with params = %s" % ', '.join([ "%s = %s" % (key, value) for key, value in params.items() ]) if params else "without params", exc_info=True ) self._conn.rollback() return False def doSelect(self, sql, params=None): """ Run SELECT SQL query and return list of selected rows as dict :param sql: The SQL query :param params: The SQL query's parameters as dict (optional) :return: List of selected rows as dict on success, False otherwise :rtype: list, bool """ cursor = self._conn.cursor() try: log.debug( 'Run SQL SELECT query "%s" %s', sql, "with params = %s" % ', '.join([ "%s = %s" % (key, value) for key, value in params.items() ]) if params else "without params" ) cursor.execute(sql, params) results = cursor.fetchall() return results except Exception: log.error( 'Error during SQL query "%s" %s', sql, "with params = %s" % ', '.join([ "%s = %s" % (key, value) for key, value in params.items() ]) if params else "without params", exc_info=True ) return False # # SQL helpers # @staticmethod def format_param(param): return '%({0})s'.format(param) @classmethod def _combine_params(cls, params, to_add=None, **kwargs): if to_add: assert isinstance(to_add, dict), "to_add must be a dict or None" params = cls._combine_params(params, **to_add) for param, value in kwargs.items(): if param in params: raise PgDBDuplicatedSQLParameter(param) params[param] = value return params @classmethod def _format_where_clauses(cls, where_clauses, params=None, where_op=None): """ Format WHERE clauses :param where_clauses: The WHERE clauses. Could be: - a raw SQL WHERE clause as string - a tuple of two elements: a raw WHERE clause and its parameters as dict - a dict of WHERE clauses with field name as key and WHERE clause value as value - a list of any of previous valid WHERE clauses :param params: Dict of other already set SQL query parameters (optional) :param where_op: SQL operator used to combine WHERE clauses together (optional, default: AND) :return: A tuple of two elements: raw SQL WHERE combined clauses and parameters on success :rtype: string, bool """ if params is None: params = dict() if where_op is None: where_op = 'AND' if isinstance(where_clauses, str): return (where_clauses, params) if isinstance(where_clauses, tuple) and len(where_clauses) == 2 and isinstance(where_clauses[1], dict): cls._combine_params(params, where_clauses[1]) return (where_clauses[0], params) if isinstance(where_clauses, (list, tuple)): sql_where_clauses = [] for where_clause in where_clauses: sql2, params = cls._format_where_clauses(where_clause, params=params, where_op=where_op) sql_where_clauses.append(sql2) return ( (" %s " % where_op).join(sql_where_clauses), params ) if isinstance(where_clauses, dict): sql_where_clauses = [] for field, value in where_clauses.items(): param = field if field in params: idx = 1 while param in params: param = '%s_%d' % (field, idx) idx += 1 cls._combine_params(params, {param: value}) sql_where_clauses.append( '"{field}" = {param}'.format(field=field, param=cls.format_param(param)) ) return ( (" %s " % where_op).join(sql_where_clauses), params ) raise PgDBUnsupportedWHEREClauses(where_clauses) @classmethod def _add_where_clauses(cls, sql, params, where_clauses, where_op=None): """ Add WHERE clauses to an SQL query :param sql: The SQL query to complete :param params: The dict of parameters of the SQL query to complete :param where_clauses: The WHERE clause (see _format_where_clauses()) :param where_op: SQL operator used to combine WHERE clauses together (optional, default: see _format_where_clauses()) :return: :rtype: A tuple of two elements: raw SQL WHERE combined clauses and parameters """ if where_clauses: sql_where, params = cls._format_where_clauses(where_clauses, params=params, where_op=where_op) sql += " WHERE " + sql_where return (sql, params) @staticmethod def _quote_table_name(table): """ Quote table name """ return '"{0}"'.format( '"."'.join( table.split('.') ) ) def insert(self, table, values, just_try=False): """ Run INSERT SQL query """ sql = 'INSERT INTO {0} ("{1}") VALUES ({2})'.format( self._quote_table_name(table), '", "'.join(values.keys()), ", ".join([ self.format_param(key) for key in values ]) ) if just_try: log.debug("Just-try mode: execute INSERT query: %s", sql) return True log.debug(sql) if not self.doSQL(sql, params=values): log.error("Fail to execute INSERT query (SQL: %s)", sql) return False return True def update(self, table, values, where_clauses, where_op=None, just_try=False): """ Run UPDATE SQL query """ sql = 'UPDATE {0} SET {1}'.format( self._quote_table_name(table), ", ".join([ '"{0}" = {1}'.format(key, self.format_param(key)) for key in values ]) ) params = values try: sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) except (PgDBDuplicatedSQLParameter, PgDBUnsupportedWHEREClauses): log.error('Fail to add WHERE clauses', exc_info=True) return False if just_try: log.debug("Just-try mode: execute UPDATE query: %s", sql) return True log.debug(sql) if not self.doSQL(sql, params=params): log.error("Fail to execute UPDATE query (SQL: %s)", sql) return False return True def delete(self, table, where_clauses, where_op='AND', just_try=False): """ Run DELETE SQL query """ sql = 'DELETE FROM {0}'.format(self._quote_table_name(table)) params = dict() try: sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) except (PgDBDuplicatedSQLParameter, PgDBUnsupportedWHEREClauses): log.error('Fail to add WHERE clauses', exc_info=True) return False if just_try: log.debug("Just-try mode: execute UPDATE query: %s", sql) return True log.debug(sql) if not self.doSQL(sql, params=params): log.error("Fail to execute UPDATE query (SQL: %s)", sql) return False return True def truncate(self, table, just_try=False): """ Run TRUNCATE SQL query """ sql = 'TRUNCATE {0}'.format(self._quote_table_name(table)) if just_try: log.debug("Just-try mode: execute TRUNCATE query: %s", sql) return True log.debug(sql) if not self.doSQL(sql): log.error("Fail to execute TRUNCATE query (SQL: %s)", sql) return False return True def select(self, table, where_clauses=None, fields=None, where_op='AND', order_by=None, just_try=False): """ Run SELECT SQL query """ sql = "SELECT " if fields is None: sql += "*" elif isinstance(fields, str): sql += '"{0}"'.format(fields) else: sql += '"{0}"'.format('", "'.join(fields)) sql += ' FROM {0}'.format(self._quote_table_name(table)) params = dict() try: sql, params = self._add_where_clauses(sql, params, where_clauses, where_op=where_op) except (PgDBDuplicatedSQLParameter, PgDBUnsupportedWHEREClauses): log.error('Fail to add WHERE clauses', exc_info=True) return False if order_by: if isinstance(order_by, str): sql += ' ORDER BY {0}'.format(order_by) elif ( isinstance(order_by, (list, tuple)) and len(order_by) == 2 and isinstance(order_by[0], str) and isinstance(order_by[1], str) and order_by[1].upper() in ('ASC', 'UPPER') ): sql += ' ORDER BY "{0}" {1}'.format(order_by[0], order_by[1].upper()) else: raise PgDBInvalidOrderByClause(order_by) if just_try: log.debug("Just-try mode: execute SELECT query : %s", sql) return just_try return self.doSelect(sql, params=params) # # Depreated helpers # @classmethod def _quote_value(cls, value): """ Quote a value for SQL query """ if value is None: return 'NULL' if isinstance(value, (int, float)): return str(value) if isinstance(value, datetime.datetime): value = cls._format_datetime(value) elif isinstance(value, datetime.date): value = cls._format_date(value) return "'%s'" % value.replace("'", "''") @classmethod def _format_datetime(cls, value): """ Format datetime object as string """ assert isinstance(value, datetime.datetime) return value.strftime(cls.datetime_format) @classmethod def _format_date(cls, value): """ Format date object as string """ assert isinstance(value, (datetime.date, datetime.datetime)) return value.strftime(cls.date_format) @classmethod def time2datetime(cls, time): """ Convert timestamp to datetime string """ return cls._format_datetime(datetime.datetime.fromtimestamp(int(time))) @classmethod def time2date(cls, time): """ Convert timestamp to date string """ return cls._format_date(datetime.date.fromtimestamp(int(time)))