python-mylib/PgDB.py

180 lines
4.7 KiB
Python
Raw Normal View History

2013-06-07 12:13:03 +02:00
#!/usr/bin/python
2018-12-12 17:17:56 +01:00
# -*- coding: utf-8 -*-
2013-06-07 12:13:03 +02:00
import psycopg2
import logging
import sys
2018-12-12 17:17:56 +01:00
import traceback
import datetime
2013-06-07 12:13:03 +02:00
class PgDB(object):
    """Small convenience wrapper around a psycopg2 PostgreSQL connection.

    The class opens a single connection (``connect``), runs raw statements
    (``doSQL`` / ``doSelect``) and offers helpers that build simple
    INSERT / UPDATE / DELETE / SELECT statements from Python values.

    Table names, column names, raw where-clause strings and ``order_by``
    are inserted verbatim into the generated SQL: they must come from
    trusted code, never from user input. Only *values* are quoted.
    """

    host = ""
    user = ""
    pwd = ""
    db = ""
    # 0 means "not connected"; otherwise the psycopg2 connection object.
    con = 0

    date_format = '%Y-%m-%d'
    datetime_format = '%Y-%m-%d %H:%M:%S'

    # String/number types resolved without naming ``unicode`` or ``long`` so
    # the class behaves the same on Python 2 and Python 3.
    _text_type = type(u'')
    _bytes_type = type(b'')
    _string_types = (_bytes_type, _text_type)
    # ``type(sys.maxsize + 1)`` is ``long`` on Python 2 and ``int`` on Python 3.
    _number_types = (int, float, type(sys.maxsize + 1))

    def __init__(self, host, user, pwd, db):
        """Store the connection settings; no connection is opened here."""
        self.host = host
        self.user = user
        self.pwd = pwd
        self.db = db

    def connect(self):
        """Open the database connection if it is not already open.

        On failure the error is logged and the process exits with status 1.
        """
        if self.con == 0:
            try:
                # Keyword arguments let psycopg2 do the escaping, so quotes or
                # backslashes in the password cannot break the DSN.
                self.con = psycopg2.connect(
                    dbname=self.db,
                    user=self.user,
                    host=self.host,
                    password=self.pwd,
                )
            except Exception:
                logging.fatal('An error occured during Postgresql database connection.', exc_info=1)
                sys.exit(1)

    def close(self):
        """Close the connection (if any) and allow a later ``connect()``."""
        if self.con:
            self.con.close()
            # Reset the marker, otherwise connect() would keep the closed
            # connection and never reconnect.
            self.con = 0

    def setEncoding(self, enc):
        """Set the client encoding of the connection.

        Returns True on success, False when not connected or on error.
        """
        if self.con:
            try:
                self.con.set_client_encoding(enc)
                return True
            except Exception:
                logging.error('An error occured setting Postgresql database connection encoding to "%s"', enc, exc_info=1)
        return False

    def doSQL(self, sql, params=None):
        """Execute a statement and commit it.

        ``params`` is passed to ``cursor.execute`` when it is not None.
        Returns True on success; on error the failure is logged, the
        transaction is rolled back and False is returned.
        """
        cursor = self.con.cursor()
        try:
            if params is None:
                cursor.execute(sql)
            else:
                cursor.execute(sql, params)
            self.con.commit()
            return True
        except Exception:
            logging.error(u'Error during SQL request "%s"', self._sql_for_log(sql), exc_info=1)
            self._rollback()
            return False
        finally:
            cursor.close()

    def doSelect(self, sql, params=None):
        """Execute a query and return all rows (``cursor.fetchall()``).

        ``params`` is passed to ``cursor.execute`` when it is not None.
        On error the failure is logged, the transaction is rolled back (so
        the connection stays usable) and False is returned.
        """
        cursor = self.con.cursor()
        try:
            if params is None:
                cursor.execute(sql)
            else:
                cursor.execute(sql, params)
            return cursor.fetchall()
        except Exception:
            logging.error(u'Error during SQL request "%s"', self._sql_for_log(sql), exc_info=1)
            self._rollback()
            return False
        finally:
            cursor.close()

    def _rollback(self):
        """Roll back the current transaction, logging (not raising) on failure."""
        try:
            self.con.rollback()
        except Exception:
            logging.error('Fail to rollback Postgresql transaction', exc_info=1)

    def _sql_for_log(self, sql):
        """Return ``sql`` as text for log messages.

        Only byte strings are decoded: calling ``.decode`` on a text string
        fails (or triggers an implicit ASCII encode on Python 2), which would
        hide the original error from the log.
        """
        if isinstance(sql, self._bytes_type):
            return sql.decode('utf-8', 'ignore')
        return sql

    def _to_text(self, value):
        """Return a string as text, decoding byte strings as UTF-8."""
        if isinstance(value, self._bytes_type):
            return value.decode('utf-8')
        return value

    #
    # SQL helpers
    #

    def _quote_value(self, value):
        """Return ``value`` as an SQL literal.

        None becomes ``NULL``; numbers are emitted unquoted; dates and
        datetimes are formatted with ``date_format`` / ``datetime_format``;
        strings are single-quoted with embedded quotes doubled. Any other
        type raises AttributeError, as before.
        NOTE(review): backslashes are not escaped - this assumes the server
        runs with standard_conforming_strings=on; confirm for old servers.
        """
        if value is None:
            return u"NULL"
        if isinstance(value, self._number_types):
            return self._text_type(value)
        # datetime must be tested first: datetime.datetime subclasses date.
        if isinstance(value, datetime.datetime):
            value = self._format_datetime(value)
        elif isinstance(value, datetime.date):
            value = self._format_date(value)
        value = self._to_text(value)
        return u"'%s'" % value.replace(u"'", u"''")

    def _format_where_condition(self, column, value):
        """Return one ``column=value`` test (``column IS NULL`` for None)."""
        if value is None:
            # "column=NULL" never matches in SQL.
            return u"%s IS NULL" % column
        return u"%s=%s" % (column, self._quote_value(value))

    def _format_where_clauses(self, where_clauses, where_op=u'AND'):
        """Build the text following ``WHERE``.

        ``where_clauses`` may be a ready-made string (returned as text), a
        list/tuple of condition strings, or a dict mapping column names to
        values. Conditions are joined with ``where_op``. Returns False (after
        logging) for any other type.
        """
        if isinstance(where_clauses, self._string_types):
            return self._to_text(where_clauses)
        glue = u" %s " % where_op
        if isinstance(where_clauses, (list, tuple)):
            return glue.join(where_clauses)
        if isinstance(where_clauses, dict):
            return glue.join(
                self._format_where_condition(column, where_clauses[column])
                for column in where_clauses
            )
        logging.error('Unsupported where clauses type %s', type(where_clauses))
        return False

    def _format_datetime(self, datetime):
        """Format a datetime object with ``datetime_format``."""
        # The parameter name shadows the module inside this method only; it
        # is kept so existing keyword callers keep working.
        return datetime.strftime(self.datetime_format)

    def _format_date(self, date):
        """Format a date (or datetime) object with ``date_format``."""
        return date.strftime(self.date_format)

    def time2datetime(self, time):
        """Convert a Unix timestamp to a ``datetime_format`` string (local time)."""
        return self._format_datetime(datetime.datetime.fromtimestamp(int(time)))

    def time2date(self, time):
        """Convert a Unix timestamp to a ``date_format`` string (local time)."""
        return self._format_date(datetime.datetime.fromtimestamp(int(time)))

    def insert(self, table, values, just_try=False):
        """Insert one row built from the ``values`` dict (column -> value).

        With ``just_try`` the statement is only logged. Returns True on
        success (or in just-try mode), False on failure.
        """
        columns = list(values)
        sql = u"INSERT INTO %s (%s) VALUES (%s)" % (
            table,
            u", ".join(columns),
            u", ".join(self._quote_value(values[column]) for column in columns),
        )

        if just_try:
            logging.debug(u"Just-try mode : execute INSERT query : %s", sql)
            return True

        logging.debug(sql)
        if not self.doSQL(sql):
            logging.error(u"Fail to execute INSERT query (SQL : %s)", sql)
            return False
        return True

    def update(self, table, values, where_clauses, where_op=u'AND', just_try=False):
        """Update the rows matching ``where_clauses`` with the ``values`` dict.

        Returns False without touching the database when no where clause
        can be built. With ``just_try`` the statement is only logged.
        """
        where = self._format_where_clauses(where_clauses, where_op=where_op)
        if not where:
            return False

        assignments = u", ".join(
            u"%s=%s" % (column, self._quote_value(values[column]))
            for column in values
        )
        sql = u"UPDATE %s SET %s WHERE %s" % (table, assignments, where)

        if just_try:
            logging.debug(u"Just-try mode : execute UPDATE query : %s", sql)
            return True

        logging.debug(sql)
        if not self.doSQL(sql):
            logging.error(u"Fail to execute UPDATE query (SQL : %s)", sql)
            return False
        return True

    def delete(self, table, where_clauses, where_op=u'AND', just_try=False):
        """Delete the rows matching ``where_clauses``.

        Returns False without touching the database when no where clause
        can be built. With ``just_try`` the statement is only logged.
        """
        where = self._format_where_clauses(where_clauses, where_op=where_op)
        if not where:
            return False

        sql = u"DELETE FROM %s WHERE %s" % (table, where)

        if just_try:
            logging.debug(u"Just-try mode : execute DELETE query : %s", sql)
            return True

        logging.debug(sql)
        if not self.doSQL(sql):
            logging.error(u"Fail to execute DELETE query (SQL : %s)", sql)
            return False
        return True

    def select(self, table, where_clauses=None, fields=None, where_op=u'AND', order_by=None, just_try=False):
        """Run a SELECT on ``table`` and return the rows (see ``doSelect``).

        ``fields`` is None (``*``), a string used as-is, or an iterable of
        column names. ``order_by`` is the text following ``ORDER BY``.
        ``just_try`` is currently unused: the query is always executed.
        Returns False when the where clause cannot be built or the query fails.
        """
        if fields is None:
            columns = u"*"
        elif isinstance(fields, self._string_types):
            columns = self._to_text(fields)
        else:
            columns = u", ".join(fields)
        sql = u"SELECT %s FROM %s" % (columns, table)

        if where_clauses:
            where = self._format_where_clauses(where_clauses, where_op=where_op)
            if not where:
                return False
            sql += u" WHERE " + where

        if order_by:
            ordering = self._to_text(order_by).strip()
            # Earlier versions emitted "ORDER <order_by>"; tolerate callers
            # that worked around it by passing "BY column".
            if ordering.upper().startswith(u"BY "):
                ordering = ordering[3:].lstrip()
            sql += u" ORDER BY " + ordering

        return self.doSelect(sql)