169 lines
5 KiB
Python
169 lines
5 KiB
Python
""" PostgreSQL client """
|
|
|
|
import datetime
|
|
import logging
|
|
import sys
|
|
|
|
import psycopg2
|
|
from psycopg2.extras import RealDictCursor
|
|
|
|
from mylib.db import DB, DBFailToConnect
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class PgDB(DB):
|
|
"""PostgreSQL client"""
|
|
|
|
_host = None
|
|
_user = None
|
|
_pwd = None
|
|
_db = None
|
|
|
|
date_format = "%Y-%m-%d"
|
|
datetime_format = "%Y-%m-%d %H:%M:%S"
|
|
|
|
def __init__(self, host, user, pwd, db, **kwargs):
|
|
self._host = host
|
|
self._user = user
|
|
self._pwd = pwd
|
|
self._db = db
|
|
super().__init__(**kwargs)
|
|
|
|
def connect(self, exit_on_error=True):
|
|
"""Connect to PostgreSQL server"""
|
|
if self._conn is None:
|
|
try:
|
|
log.info(
|
|
"Connect on PostgreSQL server %s as %s on database %s",
|
|
self._host,
|
|
self._user,
|
|
self._db,
|
|
)
|
|
self._conn = psycopg2.connect(
|
|
dbname=self._db, user=self._user, host=self._host, password=self._pwd
|
|
)
|
|
except psycopg2.Error as err:
|
|
log.fatal(
|
|
"An error occurred during Postgresql database connection (%s@%s, database=%s).",
|
|
self._user,
|
|
self._host,
|
|
self._db,
|
|
exc_info=1,
|
|
)
|
|
if exit_on_error:
|
|
sys.exit(1)
|
|
else:
|
|
raise DBFailToConnect(f"{self._user}@{self._host}:{self._db}") from err
|
|
return True
|
|
|
|
def close(self):
|
|
"""Close connection with PostgreSQL server (if opened)"""
|
|
if self._conn:
|
|
self._conn.close()
|
|
self._conn = None
|
|
|
|
def setEncoding(self, enc):
|
|
"""Set connection encoding"""
|
|
if self._conn:
|
|
try:
|
|
self._conn.set_client_encoding(enc)
|
|
return True
|
|
except psycopg2.Error:
|
|
log.error(
|
|
'An error occurred setting Postgresql database connection encoding to "%s"',
|
|
enc,
|
|
exc_info=1,
|
|
)
|
|
return False
|
|
|
|
def doSQL(self, sql, params=None):
|
|
"""
|
|
Run SQL query and commit changes (rollback on error)
|
|
|
|
:param sql: The SQL query
|
|
:param params: The SQL query's parameters as dict (optional)
|
|
|
|
:return: True on success, False otherwise
|
|
:rtype: bool
|
|
"""
|
|
if self.just_try:
|
|
log.debug("Just-try mode : do not really execute SQL query '%s'", sql)
|
|
return True
|
|
|
|
cursor = self._conn.cursor()
|
|
try:
|
|
self._log_query(sql, params)
|
|
if params is None:
|
|
cursor.execute(sql)
|
|
else:
|
|
cursor.execute(sql, params)
|
|
self._conn.commit()
|
|
return True
|
|
except psycopg2.Error:
|
|
self._log_query_exception(sql, params)
|
|
self._conn.rollback()
|
|
return False
|
|
|
|
def doSelect(self, sql, params=None):
|
|
"""
|
|
Run SELECT SQL query and return list of selected rows as dict
|
|
|
|
:param sql: The SQL query
|
|
:param params: The SQL query's parameters as dict (optional)
|
|
|
|
:return: List of selected rows as dict on success, False otherwise
|
|
:rtype: list, bool
|
|
"""
|
|
cursor = self._conn.cursor(cursor_factory=RealDictCursor)
|
|
try:
|
|
self._log_query(sql, params)
|
|
cursor.execute(sql, params)
|
|
results = cursor.fetchall()
|
|
return list(map(dict, results))
|
|
except psycopg2.Error:
|
|
self._log_query_exception(sql, params)
|
|
return False
|
|
|
|
#
|
|
# Deprecated helpers
|
|
#
|
|
|
|
@classmethod
|
|
def _quote_value(cls, value):
|
|
"""Quote a value for SQL query"""
|
|
if value is None:
|
|
return "NULL"
|
|
|
|
if isinstance(value, (int, float)):
|
|
return str(value)
|
|
|
|
if isinstance(value, datetime.datetime):
|
|
value = cls._format_datetime(value)
|
|
elif isinstance(value, datetime.date):
|
|
value = cls._format_date(value)
|
|
|
|
# pylint: disable=consider-using-f-string
|
|
return "'{}'".format(value.replace("'", "''"))
|
|
|
|
@classmethod
|
|
def _format_datetime(cls, value):
|
|
"""Format datetime object as string"""
|
|
assert isinstance(value, datetime.datetime)
|
|
return value.strftime(cls.datetime_format)
|
|
|
|
@classmethod
|
|
def _format_date(cls, value):
|
|
"""Format date object as string"""
|
|
assert isinstance(value, (datetime.date, datetime.datetime))
|
|
return value.strftime(cls.date_format)
|
|
|
|
@classmethod
|
|
def time2datetime(cls, time):
|
|
"""Convert timestamp to datetime string"""
|
|
return cls._format_datetime(datetime.datetime.fromtimestamp(int(time)))
|
|
|
|
@classmethod
|
|
def time2date(cls, time):
|
|
"""Convert timestamp to date string"""
|
|
return cls._format_date(datetime.date.fromtimestamp(int(time)))
|