# -*- coding: utf-8 -*-

""" PostgreSQL client """

import datetime
import logging
import sys

import psycopg2

log = logging.getLogger(__name__)


class PgDB:
    """
    Small PostgreSQL client built on top of psycopg2.

    It wraps a single connection and offers helpers that build and run
    simple INSERT / UPDATE / DELETE / SELECT statements.

    NOTE: the helpers build SQL by string interpolation. Values are quoted
    by :meth:`_quote_value` (single quotes are doubled), but table names,
    column names, ``fields``, ``order_by`` and string/list WHERE clauses are
    inserted verbatim: never pass untrusted input for those. Prefer
    :meth:`doSQL` / :meth:`doSelect` with ``params`` for untrusted values.
    """

    host = ""
    user = ""
    pwd = ""
    db = ""

    # 0 means "not connected"; connect() replaces it with the connection
    con = 0

    # strftime() patterns used when quoting date / datetime values
    date_format = '%Y-%m-%d'
    datetime_format = '%Y-%m-%d %H:%M:%S'

    def __init__(self, host, user, pwd, db, just_try=False):
        """
        Store the connection settings (no connection is opened here).

        :param host: PostgreSQL server host
        :param user: user name
        :param pwd: password
        :param db: database name
        :param just_try: if true, doSQL() only logs the query and returns True
        """
        self.host = host
        self.user = user
        self.pwd = pwd
        self.db = db
        self.just_try = just_try

    def connect(self):
        """
        Connect to PostgreSQL server (does nothing if already connected)

        On connection failure, the error is logged and the process exits
        with status 1 (sys.exit).

        :return: True
        """
        if self.con == 0:
            try:
                self.con = psycopg2.connect(
                    dbname=self.db,
                    user=self.user,
                    host=self.host,
                    password=self.pwd
                )
            except Exception:
                # Use the module logger, like every other method of the class
                log.critical(
                    'An error occured during Postgresql database connection (%s@%s, database=%s).',
                    self.user, self.host, self.db,
                    exc_info=1
                )
                sys.exit(1)
        return True

    def close(self):
        """ Close connection with PostgreSQL server (if opened) """
        if self.con:
            self.con.close()
            # Back to the "not connected" marker so that connect() can
            # open a fresh connection afterwards
            self.con = 0

    def setEncoding(self, enc):
        """
        Set connection encoding

        :param enc: client encoding name
        :return: True on success, False on error or if not connected
        """
        if self.con:
            try:
                self.con.set_client_encoding(enc)
                return True
            except Exception:
                log.error(
                    'An error occured setting Postgresql database connection encoding to "%s"',
                    enc, exc_info=1
                )
        return False

    @staticmethod
    def _printable_sql(sql):
        """ Return the SQL query as text for log messages (decode byte strings only) """
        if isinstance(sql, bytes):
            return sql.decode('utf-8', 'ignore')
        return sql

    def doSQL(self, sql, params=None):
        """
        Run a SQL query that returns no rows and commit it

        In just-try mode (self.just_try), the query is only logged.

        :param sql: the SQL query
        :param params: optional query parameters passed to cursor.execute()
        :return: True on success (or in just-try mode), False on error
                 (the transaction is rolled back in this case)
        """
        if self.just_try:
            log.debug(u"Just-try mode : do not really execute SQL query '%s'", sql)
            return True

        cursor = self.con.cursor()
        try:
            cursor.execute(sql, params)
            self.con.commit()
            return True
        except Exception:
            log.error(u'Error during SQL request "%s"', self._printable_sql(sql), exc_info=1)
            self.con.rollback()
            return False
        finally:
            cursor.close()

    def doSelect(self, sql, params=None):
        """
        Run SELECT SQL query and return the fetched rows

        :param sql: the SQL query
        :param params: optional query parameters passed to cursor.execute()
        :return: the result of cursor.fetchall() on success, False on error
        """
        cursor = self.con.cursor()
        try:
            cursor.execute(sql, params)
            return cursor.fetchall()
        except Exception:
            log.error(u'Error during SQL request "%s"', self._printable_sql(sql), exc_info=1)
            # As in doSQL(): leave the failed transaction so that the
            # connection stays usable for the next queries
            self.con.rollback()
            return False
        finally:
            cursor.close()

    #
    # SQL helpers
    #

    def _quote_value(self, value):
        """
        Quote a value for SQL query

        None gives NULL, int / float values are returned unquoted, date and
        datetime objects are formatted using date_format / datetime_format,
        and strings are single-quoted with embedded single quotes doubled.
        """
        if value is None:
            return u"NULL"

        if isinstance(value, (int, float)):
            return str(value)

        # datetime must be tested first: datetime.datetime is a subclass
        # of datetime.date
        if isinstance(value, datetime.datetime):
            value = self._format_datetime(value)
        elif isinstance(value, datetime.date):
            value = self._format_date(value)

        return u"'%s'" % value.replace(u"'", u"''")

    def _format_where_clauses(self, where_clauses, where_op=u'AND'):
        """
        Format WHERE clauses

        :param where_clauses: a string (returned as is), a list/tuple of
                              raw conditions, or a dict mapping column names
                              to values (None values give "column IS NULL")
        :param where_op: the operator used to join the conditions
        :return: the WHERE clause content, or False on unsupported type
        """
        if isinstance(where_clauses, str):
            return where_clauses

        glue = u" %s " % where_op

        if isinstance(where_clauses, (list, tuple)):
            return glue.join(where_clauses)

        if isinstance(where_clauses, dict):
            conditions = []
            for column, value in where_clauses.items():
                if value is None:
                    # "column=NULL" never matches any row in SQL
                    conditions.append(u"%s IS NULL" % column)
                else:
                    conditions.append(u"%s=%s" % (column, self._quote_value(value)))
            return glue.join(conditions)

        log.error('Unsupported where clauses type %s', type(where_clauses))
        return False

    def _format_datetime(self, value):
        """ Format datetime object as string """
        assert isinstance(value, datetime.datetime)
        return value.strftime(self.datetime_format)

    def _format_date(self, value):
        """ Format date object as string """
        assert isinstance(value, (datetime.date, datetime.datetime))
        return value.strftime(self.date_format)

    def time2datetime(self, time):
        """ Convert timestamp to datetime string """
        return self._format_datetime(datetime.datetime.fromtimestamp(int(time)))

    def time2date(self, time):
        """ Convert timestamp to date string """
        return self._format_date(datetime.date.fromtimestamp(int(time)))

    def _run_write_query(self, verb, sql, just_try):
        """
        Log and run a write query built by insert() / update() / delete()

        :param verb: the query type used in log messages (INSERT, UPDATE, DELETE)
        :param sql: the SQL query
        :param just_try: if true, only log the query
        :return: True on success (or in just-try mode), False on error
        """
        if just_try:
            log.debug(u"Just-try mode : execute %s query : %s", verb, sql)
            return True

        log.debug(u"%s", sql)
        if not self.doSQL(sql):
            log.error(u"Fail to execute %s query (SQL : %s)", verb, sql)
            return False
        return True

    def insert(self, table, values, just_try=False):
        """
        Run INSERT SQL query

        :param table: the table name
        :param values: dict mapping column names to values
        :param just_try: if true, only log the query
        :return: True on success (or in just-try mode), False on error
        """
        sql = u"INSERT INTO %s (%s) VALUES (%s)" % (
            table,
            u', '.join(values),
            u", ".join(self._quote_value(value) for value in values.values())
        )
        return self._run_write_query(u"INSERT", sql, just_try)

    def update(self, table, values, where_clauses, where_op=u'AND', just_try=False):
        """
        Run UPDATE SQL query

        :param table: the table name
        :param values: dict mapping column names to their new values
        :param where_clauses: see _format_where_clauses()
        :param where_op: the operator used to join the WHERE conditions
        :param just_try: if true, only log the query
        :return: True on success (or in just-try mode), False on error or
                 if the WHERE clause is empty or unsupported
        """
        where = self._format_where_clauses(where_clauses, where_op=where_op)
        if not where:
            return False

        assignments = u", ".join(
            u"%s=%s" % (column, self._quote_value(value))
            for column, value in values.items()
        )
        sql = u"UPDATE %s SET %s WHERE %s" % (table, assignments, where)
        return self._run_write_query(u"UPDATE", sql, just_try)

    def delete(self, table, where_clauses, where_op=u'AND', just_try=False):
        """
        Run DELETE SQL query

        :param table: the table name
        :param where_clauses: see _format_where_clauses()
        :param where_op: the operator used to join the WHERE conditions
        :param just_try: if true, only log the query
        :return: True on success (or in just-try mode), False on error or
                 if the WHERE clause is empty or unsupported
        """
        where = self._format_where_clauses(where_clauses, where_op=where_op)
        if not where:
            return False

        sql = u"DELETE FROM %s WHERE %s" % (table, where)
        return self._run_write_query(u"DELETE", sql, just_try)

    def select(self, table, where_clauses=None, fields=None, where_op=u'AND', order_by=None):
        """
        Run SELECT SQL query

        :param table: the table name
        :param where_clauses: optional, see _format_where_clauses()
        :param fields: the selected fields, as a string or as a list of
                       field names (default: all fields)
        :param where_op: the operator used to join the WHERE conditions
        :param order_by: optional content of the ORDER BY clause
        :return: the result of doSelect(), or False if the WHERE clause is
                 unsupported
        """
        if fields is None:
            selected = u"*"
        elif isinstance(fields, str):
            selected = fields
        else:
            selected = u", ".join(fields)

        sql = u"SELECT %s FROM %s" % (selected, table)

        if where_clauses:
            where = self._format_where_clauses(where_clauses, where_op=where_op)
            if not where:
                return False
            sql += u" WHERE " + where

        if order_by:
            sql += u" ORDER BY %s" % order_by

        return self.doSelect(sql)