173 lines
5 KiB
Python
173 lines
5 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
""" PostgreSQL client """
|
|
|
|
import datetime
|
|
import logging
|
|
import sys
|
|
|
|
import psycopg2
|
|
|
|
from mylib.db import DB, DBFailToConnect
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class PgDB(DB):
|
|
""" PostgreSQL client """
|
|
|
|
_host = None
|
|
_user = None
|
|
_pwd = None
|
|
_db = None
|
|
|
|
date_format = '%Y-%m-%d'
|
|
datetime_format = '%Y-%m-%d %H:%M:%S'
|
|
|
|
def __init__(self, host, user, pwd, db, **kwargs):
|
|
self._host = host
|
|
self._user = user
|
|
self._pwd = pwd
|
|
self._db = db
|
|
super().__init__(**kwargs)
|
|
|
|
def connect(self, exit_on_error=True):
|
|
""" Connect to PostgreSQL server """
|
|
if self._conn is None:
|
|
try:
|
|
log.info(
|
|
'Connect on PostgreSQL server %s as %s on database %s',
|
|
self._host, self._user, self._db)
|
|
self._conn = psycopg2.connect(
|
|
dbname=self._db,
|
|
user=self._user,
|
|
host=self._host,
|
|
password=self._pwd
|
|
)
|
|
except psycopg2.Error as err:
|
|
log.fatal(
|
|
'An error occured during Postgresql database connection (%s@%s, database=%s).',
|
|
self._user, self._host, self._db, exc_info=1
|
|
)
|
|
if exit_on_error:
|
|
sys.exit(1)
|
|
else:
|
|
raise DBFailToConnect(f'{self._user}@{self._host}:{self._db}') from err
|
|
return True
|
|
|
|
def close(self):
|
|
""" Close connection with PostgreSQL server (if opened) """
|
|
if self._conn:
|
|
self._conn.close()
|
|
self._conn = None
|
|
|
|
def setEncoding(self, enc):
|
|
""" Set connection encoding """
|
|
if self._conn:
|
|
try:
|
|
self._conn.set_client_encoding(enc)
|
|
return True
|
|
except psycopg2.Error:
|
|
log.error(
|
|
'An error occured setting Postgresql database connection encoding to "%s"',
|
|
enc, exc_info=1
|
|
)
|
|
return False
|
|
|
|
def doSQL(self, sql, params=None):
|
|
"""
|
|
Run SQL query and commit changes (rollback on error)
|
|
|
|
:param sql: The SQL query
|
|
:param params: The SQL query's parameters as dict (optional)
|
|
|
|
:return: True on success, False otherwise
|
|
:rtype: bool
|
|
"""
|
|
if self.just_try:
|
|
log.debug("Just-try mode : do not really execute SQL query '%s'", sql)
|
|
return True
|
|
|
|
cursor = self._conn.cursor()
|
|
try:
|
|
self._log_query(sql, params)
|
|
if params is None:
|
|
cursor.execute(sql)
|
|
else:
|
|
cursor.execute(sql, params)
|
|
self._conn.commit()
|
|
return True
|
|
except psycopg2.Error:
|
|
self._log_query_exception(sql, params)
|
|
self._conn.rollback()
|
|
return False
|
|
|
|
def doSelect(self, sql, params=None):
|
|
"""
|
|
Run SELECT SQL query and return list of selected rows as dict
|
|
|
|
:param sql: The SQL query
|
|
:param params: The SQL query's parameters as dict (optional)
|
|
|
|
:return: List of selected rows as dict on success, False otherwise
|
|
:rtype: list, bool
|
|
"""
|
|
cursor = self._conn.cursor()
|
|
try:
|
|
self._log_query(sql, params)
|
|
cursor.execute(sql, params)
|
|
results = cursor.fetchall()
|
|
return results
|
|
except psycopg2.Error:
|
|
self._log_query_exception(sql, params)
|
|
return False
|
|
|
|
@staticmethod
|
|
def _map_row_fields_by_index(fields, row):
|
|
return dict(
|
|
(field, row[idx])
|
|
for idx, field in enumerate(fields)
|
|
)
|
|
|
|
#
|
|
# Depreated helpers
|
|
#
|
|
|
|
@classmethod
|
|
def _quote_value(cls, value):
|
|
""" Quote a value for SQL query """
|
|
if value is None:
|
|
return 'NULL'
|
|
|
|
if isinstance(value, (int, float)):
|
|
return str(value)
|
|
|
|
if isinstance(value, datetime.datetime):
|
|
value = cls._format_datetime(value)
|
|
elif isinstance(value, datetime.date):
|
|
value = cls._format_date(value)
|
|
|
|
# pylint: disable=consider-using-f-string
|
|
return "'{0}'".format(value.replace("'", "''"))
|
|
|
|
@classmethod
|
|
def _format_datetime(cls, value):
|
|
""" Format datetime object as string """
|
|
assert isinstance(value, datetime.datetime)
|
|
return value.strftime(cls.datetime_format)
|
|
|
|
@classmethod
|
|
def _format_date(cls, value):
|
|
""" Format date object as string """
|
|
assert isinstance(value, (datetime.date, datetime.datetime))
|
|
return value.strftime(cls.date_format)
|
|
|
|
@classmethod
|
|
def time2datetime(cls, time):
|
|
""" Convert timestamp to datetime string """
|
|
return cls._format_datetime(datetime.datetime.fromtimestamp(int(time)))
|
|
|
|
@classmethod
|
|
def time2date(cls, time):
|
|
""" Convert timestamp to date string """
|
|
return cls._format_date(datetime.date.fromtimestamp(int(time)))
|