# -*- coding: utf-8 -*-

""" PostgreSQL client """

import datetime
|
||
|
import logging
|
||
|
import sys
|
||
|
|
||
|
import psycopg2
|
||
|
|
||
|
# Module-level logger, named after this module; used by every PgDB method below.
log = logging.getLogger(__name__)
|
||
|
|
||
|
class PgDB:
    """ PostgreSQL client

    Small convenience wrapper around a psycopg2 connection. It can run raw
    SQL (doSQL / doSelect) and build simple INSERT / UPDATE / DELETE / SELECT
    statements from Python values.

    Values are quoted by _quote_value(). Table names, column names and
    WHERE clauses given as strings are inserted verbatim in the generated
    SQL, so they must never come from untrusted input.
    """

    host = ""
    user = ""
    pwd = ""
    db = ""

    # 0 means "not connected"; replaced by the psycopg2 connection object
    # once connect() succeeded, and reset to 0 by close().
    con = 0

    # When true, doSQL() only logs the query instead of executing it.
    just_try = False

    date_format = '%Y-%m-%d'
    datetime_format = '%Y-%m-%d %H:%M:%S'

    # Text types accepted where a string is expected: (str, str) on
    # Python 3, (str, unicode) on Python 2.
    _text_types = (str, type(u''))

    def __init__(self, host, user, pwd, db, just_try=False):
        """ Store connection settings (no connection is opened here)

        :param host: PostgreSQL server host
        :param user: User name
        :param pwd: User password
        :param db: Database name
        :param just_try: If true, doSQL() will not really execute queries
        """
        self.host = host
        self.user = user
        self.pwd = pwd
        self.db = db
        self.just_try = just_try

    def connect(self):
        """ Connect to PostgreSQL server

        Do nothing if a connection is already opened. On failure, the error
        is logged and the process exits with status 1.
        """
        if self.con == 0:
            try:
                # Keyword arguments instead of a hand-built DSN string: a
                # quote or backslash in the password (or any other setting)
                # can no longer break or alter the connection string.
                self.con = psycopg2.connect(
                    dbname=self.db,
                    user=self.user,
                    host=self.host,
                    password=self.pwd,
                )
            except Exception:
                log.fatal('An error occured during Postgresql database connection.', exc_info=1)
                sys.exit(1)

    def close(self):
        """ Close connection with PostgreSQL server (if opened) """
        if self.con:
            self.con.close()
            # Back to the "not connected" state, so that a later connect()
            # really opens a new connection.
            self.con = 0

    def setEncoding(self, enc):
        """ Set connection encoding

        :param enc: Client encoding name
        :return: True on success, False on error or if not connected
        """
        if not self.con:
            return False
        try:
            self.con.set_client_encoding(enc)
            return True
        except Exception:
            log.error('An error occured setting Postgresql database connection encoding to "%s"', enc, exc_info=1)
            return False

    @staticmethod
    def _sql_for_log(sql):
        """ Return the SQL query as text, suitable for a log message """
        # Only byte strings need decoding: text strings have no decode()
        # method on Python 3.
        if isinstance(sql, bytes):
            return sql.decode('utf-8', 'ignore')
        return sql

    def _rollback(self):
        """ Roll back current transaction, logging (not raising) on failure """
        try:
            self.con.rollback()
        except Exception:
            log.error('An error occured rolling back Postgresql transaction', exc_info=1)

    def doSQL(self, sql, params=None):
        """ Run a data-modifying SQL query and commit it

        In just-try mode, the query is only logged and True is returned.

        :param sql: The SQL query
        :param params: Optional query parameters, passed to cursor.execute()
        :return: True on success, False on error (the transaction is then
                 rolled back)
        """
        if self.just_try:
            log.debug(u"Just-try mode : do not really execute SQL query '%s'", sql)
            return True

        cursor = self.con.cursor()
        try:
            # execute() is called without a parameters argument when there
            # is none, so the query is sent exactly as given.
            if params is None:
                cursor.execute(sql)
            else:
                cursor.execute(sql, params)
            self.con.commit()
            return True
        except Exception:
            log.error(u'Error during SQL request "%s"', self._sql_for_log(sql), exc_info=1)
            self._rollback()
            return False
        finally:
            cursor.close()

    def doSelect(self, sql, params=None):
        """ Run SELECT SQL query and return its rows

        :param sql: The SQL query
        :param params: Optional query parameters, passed to cursor.execute()
        :return: The result of cursor.fetchall() on success, False on error
        """
        cursor = self.con.cursor()
        try:
            if params is None:
                cursor.execute(sql)
            else:
                cursor.execute(sql, params)
            return cursor.fetchall()
        except Exception:
            log.error(u'Error during SQL request "%s"', self._sql_for_log(sql), exc_info=1)
            # Leave the connection usable for the next query instead of
            # keeping it in a failed transaction.
            self._rollback()
            return False
        finally:
            cursor.close()

    #
    # SQL helpers
    #

    def _quote_value(self, value):
        """ Quote a value for SQL query

        None gives NULL, numbers are returned unquoted, date and datetime
        objects are formatted with date_format / datetime_format, and
        everything else is converted to text and single-quoted (embedded
        single quotes are doubled).
        """
        if value is None:
            return u"NULL"

        if isinstance(value, (int, float)):
            return str(value)

        # datetime must be tested first: it is a subclass of date
        if isinstance(value, datetime.datetime):
            value = self._format_datetime(value)
        elif isinstance(value, datetime.date):
            value = self._format_date(value)
        elif isinstance(value, bytes):
            value = value.decode('utf-8')

        if not isinstance(value, self._text_types):
            # Decimal, UUID, ...: quote their text representation
            value = u"%s" % (value,)

        return u"'%s'" % value.replace(u"'", u"''")

    def _format_condition(self, field, value):
        """ Format one equality condition of a WHERE clause """
        if value is None:
            # "field=NULL" never matches: NULL must be tested with IS NULL
            return u"%s IS NULL" % field
        return u"%s=%s" % (field, self._quote_value(value))

    def _format_where_clauses(self, where_clauses, where_op=u'AND'):
        """ Format WHERE clauses

        :param where_clauses: A ready-made string (returned as is), a list
                              or tuple of ready-made conditions, or a dict
                              mapping field names to expected values
        :param where_op: Operator used to join the conditions
        :return: The WHERE expression (without the WHERE keyword), or False
                 if the where_clauses type is not supported
        """
        if isinstance(where_clauses, self._text_types):
            return where_clauses

        glue = u" %s " % where_op
        if isinstance(where_clauses, (list, tuple)):
            return glue.join(where_clauses)
        if isinstance(where_clauses, dict):
            return glue.join(
                self._format_condition(field, value)
                for field, value in where_clauses.items()
            )

        log.error('Unsupported where clauses type %s', type(where_clauses))
        return False

    def _format_datetime(self, value):
        """ Format datetime object as string """
        assert isinstance(value, datetime.datetime)
        return value.strftime(self.datetime_format)

    def _format_date(self, value):
        """ Format date object as string """
        assert isinstance(value, (datetime.date, datetime.datetime))
        return value.strftime(self.date_format)

    def time2datetime(self, time):
        """ Convert timestamp to datetime string (in local time) """
        return self._format_datetime(datetime.datetime.fromtimestamp(int(time)))

    def time2date(self, time):
        """ Convert timestamp to date string (in local time) """
        return self._format_date(datetime.date.fromtimestamp(int(time)))

    def _run_write(self, verb, sql, just_try):
        """ Execute (or, in just-try mode, only log) a generated write query

        :param verb: Query kind used in log messages (INSERT, UPDATE, DELETE)
        :param sql: The generated SQL query
        :param just_try: If true, only log the query
        :return: True on success (or in just-try mode), False on error
        """
        if just_try:
            log.debug(u"Just-try mode : execute %s query : %s", verb, sql)
            return True

        log.debug(sql)
        if not self.doSQL(sql):
            log.error(u"Fail to execute %s query (SQL : %s)", verb, sql)
            return False
        return True

    def insert(self, table, values, just_try=False):
        """ Run INSERT SQL query

        :param table: Table name (inserted verbatim in the query)
        :param values: Dict mapping column names to values
        :param just_try: If true, only log the generated query
        :return: True on success, False on error
        """
        sql = u"INSERT INTO %s (%s) VALUES (%s)" % (
            table,
            u', '.join(values.keys()),
            u", ".join(self._quote_value(value) for value in values.values()),
        )
        return self._run_write(u"INSERT", sql, just_try)

    def update(self, table, values, where_clauses, where_op=u'AND', just_try=False):
        """ Run UPDATE SQL query

        :param table: Table name (inserted verbatim in the query)
        :param values: Dict mapping column names to their new values
        :param where_clauses: See _format_where_clauses(); an empty or
                              unsupported value makes the method return
                              False without running any query
        :param where_op: Operator used to join the WHERE conditions
        :param just_try: If true, only log the generated query
        :return: True on success, False on error
        """
        where = self._format_where_clauses(where_clauses, where_op=where_op)
        if not where:
            return False

        # In a SET list, "field=NULL" is the right syntax, so the values are
        # quoted directly instead of going through _format_condition().
        assignments = u", ".join(
            u"%s=%s" % (field, self._quote_value(value))
            for field, value in values.items()
        )
        sql = u"UPDATE %s SET %s WHERE %s" % (table, assignments, where)
        return self._run_write(u"UPDATE", sql, just_try)

    def delete(self, table, where_clauses, where_op=u'AND', just_try=False):
        """ Run DELETE SQL query

        :param table: Table name (inserted verbatim in the query)
        :param where_clauses: See _format_where_clauses(); an empty or
                              unsupported value makes the method return
                              False without running any query
        :param where_op: Operator used to join the WHERE conditions
        :param just_try: If true, only log the generated query
        :return: True on success, False on error
        """
        where = self._format_where_clauses(where_clauses, where_op=where_op)
        if not where:
            return False

        sql = u"DELETE FROM %s WHERE %s" % (table, where)
        return self._run_write(u"DELETE", sql, just_try)

    def select(self, table, where_clauses=None, fields=None, where_op=u'AND', order_by=None):
        """ Run SELECT SQL query

        :param table: Table name (inserted verbatim in the query)
        :param where_clauses: Optional, see _format_where_clauses()
        :param fields: Columns to select: None for all (*), a string used
                       as is, or an iterable of column names
        :param where_op: Operator used to join the WHERE conditions
        :param order_by: Optional ORDER BY expression (without the keywords)
        :return: The result of doSelect(), or False if the WHERE clauses
                 could not be formatted
        """
        if fields is None:
            columns = u"*"
        elif isinstance(fields, self._text_types):
            columns = fields
        else:
            columns = u", ".join(fields)

        sql = u"SELECT " + columns + u" FROM " + table

        if where_clauses:
            where = self._format_where_clauses(where_clauses, where_op=where_op)
            if not where:
                return False
            sql += u" WHERE " + where

        if order_by:
            sql += u" ORDER BY %s" % order_by

        return self.doSelect(sql)
|