diff --git a/LdapServer.py b/LdapServer.py index 87632ce..692eef3 100644 --- a/LdapServer.py +++ b/LdapServer.py @@ -2,15 +2,16 @@ import copy import datetime +import logging + import dateutil.parser import dateutil.tz import ldap from ldap.controls import SimplePagedResultsControl import ldap.modlist as modlist -import logging import pytz -class LdapServer(object): +class LdapServer(object): # pylint: disable=useless-object-inheritance uri = None dn = None @@ -34,17 +35,16 @@ class LdapServer(object): def _error(self,error,level=logging.WARNING): if self.raiseOnError: raise LdapServerException(error) - else: - self.logger.log(level, error) + self.logger.log(level, error) def connect(self): if self.con == 0: try: con = ldap.initialize(self.uri) if self.v2: - con.protocol_version = ldap.VERSION2 + con.protocol_version = ldap.VERSION2 # pylint: disable=no-member else: - con.protocol_version = ldap.VERSION3 + con.protocol_version = ldap.VERSION3 # pylint: disable=no-member if self.dn: con.simple_bind_s(self.dn,self.pwd) @@ -53,18 +53,19 @@ class LdapServer(object): self.con = con return True - except ldap.LDAPError as e: + except ldap.LDAPError as e: # pylint: disable=no-member self._error('LdapServer - Error connecting and binding to LDAP server : %s' % e,logging.CRITICAL) return False return True - def get_scope(self, scope): + @staticmethod + def get_scope(scope): if scope == 'base': - return ldap.SCOPE_BASE - elif scope == 'one': - return ldap.SCOPE_ONELEVEL - elif scope == 'sub': - return ldap.SCOPE_SUBTREE + return ldap.SCOPE_BASE # pylint: disable=no-member + if scope == 'one': + return ldap.SCOPE_ONELEVEL # pylint: disable=no-member + if scope == 'sub': + return ldap.SCOPE_SUBTREE # pylint: disable=no-member raise Exception("Unknown LDAP scope '%s'" % scope) def search(self, basedn, filterstr=None, attrs=None, sizelimit=0, scope=None): @@ -80,9 +81,8 @@ class LdapServer(object): res_type, res_data = self.con.result(res_id,0) if res_data == [] or (sizelimit and c > sizelimit): break - else: - if res_type == ldap.RES_SEARCH_ENTRY: - ret[res_data[0][0]] = res_data[0][1] + if res_type == ldap.RES_SEARCH_ENTRY: # pylint: disable=no-member + ret[res_data[0][0]] = res_data[0][1] c += 1 return ret @@ -123,12 +123,12 @@ class LdapServer(object): attrs, serverctrls=[page_control] ) - except ldap.LDAPError as e: + except ldap.LDAPError as e: # pylint: disable=no-member self._error('LdapServer - Error running paged search on LDAP server: %s' % e, logging.CRITICAL) return False try: - rtype, rdata, rmsgid, rctrls = self.con.result3(res_id) - except ldap.LDAPError as e: + rtype, rdata, rmsgid, rctrls = self.con.result3(res_id) # pylint: disable=unused-variable + except ldap.LDAPError as e: # pylint: disable=no-member self._error('LdapServer - Error pulling paged search result from LDAP server: %s' % e, logging.CRITICAL) return False @@ -162,47 +162,59 @@ class LdapServer(object): def add_object(self,dn,attrs): ldif = modlist.addModlist(attrs) try: - self.logger.debug("LdapServer - Add %s" % dn) + self.logger.debug("LdapServer - Add %s", dn) self.con.add_s(dn,ldif) return True - except ldap.LDAPError as e: + except ldap.LDAPError as e: # pylint: disable=no-member self._error("LdapServer - Error adding %s : %s" % (dn,e), logging.ERROR) return False - def update_object(self, dn, old, new, ignore_attrs=[]): - ldif = modlist.modifyModlist(old, new, ignore_attr_types=ignore_attrs) + def update_object(self, dn, old, new, ignore_attrs=None): + ldif = modlist.modifyModlist( + old, new, + ignore_attr_types=ignore_attrs if ignore_attrs else [] + ) if ldif == []: return True try: self.con.modify_s(dn,ldif) return True - except ldap.LDAPError as e: + except ldap.LDAPError as e: # pylint: disable=no-member self._error("LdapServer - Error updating %s : %s\nOld : %s\nNew : %s" % (dn, e, old, new), logging.ERROR) return False - def update_need(self, old, new, ignore_attrs=[]): - ldif = modlist.modifyModlist(old, new, ignore_attr_types=ignore_attrs) + @staticmethod + def update_need(old, new, ignore_attrs=None): + ldif = modlist.modifyModlist( + old, new, + ignore_attr_types=ignore_attrs if ignore_attrs else [] + ) if ldif == []: return False return True - def get_changes(self, old, new, ignore_attrs=[]): - return modlist.modifyModlist(old, new, ignore_attr_types=ignore_attrs) + @staticmethod + def get_changes(old, new, ignore_attrs=None): + return modlist.modifyModlist( + old, new, + ignore_attr_types=ignore_attrs if ignore_attrs else [] + ) - def format_changes(self, old, new, ignore_attrs=[], prefix=''): + @staticmethod + def format_changes(old, new, ignore_attrs=None, prefix=None): msg = [] - for (op, attr, val) in modlist.modifyModlist(old, new, ignore_attr_types=ignore_attrs): - if op == ldap.MOD_ADD: + for (op, attr, val) in modlist.modifyModlist(old, new, ignore_attr_types=ignore_attrs if ignore_attrs else []): + if op == ldap.MOD_ADD: # pylint: disable=no-member op = 'ADD' - elif op == ldap.MOD_DELETE: + elif op == ldap.MOD_DELETE: # pylint: disable=no-member op = 'DELETE' - elif op == ldap.MOD_REPLACE: + elif op == ldap.MOD_REPLACE: # pylint: disable=no-member op = 'REPLACE' else: op = 'UNKNOWN (=%s)' % op if val is None and op == 'DELETE': - msg.append('%s - %s %s' % (prefix, op, attr)) + msg.append('%s - %s %s' % (prefix if prefix else '', op, attr)) else: msg.append('%s - %s %s: %s' % (prefix, op, attr, val)) return '\n'.join(msg) @@ -228,7 +240,7 @@ class LdapServer(object): ) self.con.rename_s(dn, new_rdn, newsuperior=new_sup, delold=delete_old) return True - except ldap.LDAPError as e: + except ldap.LDAPError as e: # pylint: disable=no-member self._error( "LdapServer - Error renaming %s in %s (new superior: %s, delete old: %s): %s" % ( dn, @@ -242,20 +254,22 @@ class LdapServer(object): return False - def drop_object(self,dn): + def drop_object(self, dn): try: - self.logger.debug("LdapServer - Delete %s" % dn) + self.logger.debug("LdapServer - Delete %s", dn) self.con.delete_s(dn) return True - except ldap.LDAPError as e: + except ldap.LDAPError as e: # pylint: disable=no-member self._error("LdapServer - Error deleting %s : %s" % (dn,e), logging.ERROR) return False - def get_dn(self,obj): + @staticmethod + def get_dn(obj): return obj[0][0] - def get_attr(self,obj,attr,all=None,default=None): + @staticmethod + def get_attr(obj, attr, all=None, default=None): if attr not in obj: for k in obj: if k.lower() == attr.lower(): @@ -264,13 +278,10 @@ class LdapServer(object): if all is not None: if attr in obj: return obj[attr] - else: - return default or [] - else: - if attr in obj: - return obj[attr][0] - else: - return default + return default or [] + if attr in obj: + return obj[attr][0] + return default class LdapServerException(BaseException): def __init__(self,msg): @@ -290,8 +301,8 @@ def parse_datetime(value, to_timezone=None, default_timezone=None, naive=None): the timezone (optional, default : server local timezone) :param naive: Use naive datetime : return naive datetime object (without timezone conversion from LDAP) """ - assert to_timezone is None or isinstance(to_timezone, datetime.tzinfo) or isinstance(to_timezone, str), 'to_timezone must be None, a datetime.tzinfo object or a string (not %s)' % type(to_timezone) - assert default_timezone is None or isinstance(default_timezone, datetime.tzinfo) or isinstance(default_timezone, pytz.tzinfo.DstTzInfo) or isinstance(default_timezone, str), 'default_timezone parameter must be None, a string, a pytz.tzinfo.DstTzInfo or a datetime.tzinfo object (not %s)' % type(default_timezone) + assert to_timezone is None or isinstance(to_timezone, (datetime.tzinfo, str)), 'to_timezone must be None, a datetime.tzinfo object or a string (not %s)' % type(to_timezone) + assert default_timezone is None or isinstance(default_timezone, (datetime.tzinfo, pytz.tzinfo.DstTzInfo, str)), 'default_timezone parameter must be None, a string, a pytz.tzinfo.DstTzInfo or a datetime.tzinfo object (not %s)' % type(default_timezone) date = dateutil.parser.parse(value, dayfirst=False) if not date.tzinfo: if naive: @@ -342,8 +353,8 @@ def format_datetime(value, from_timezone=None, to_timezone=None, naive=None): :param naive: Use naive datetime : datetime store as UTC in LDAP (without conversion) """ assert isinstance(value, datetime.datetime), 'First parameter must be an datetime.datetime object (not %s)' % type(value) - assert from_timezone is None or isinstance(from_timezone, datetime.tzinfo) or isinstance(from_timezone, pytz.tzinfo.DstTzInfo) or isinstance(from_timezone, str), 'from_timezone parameter must be None, a string, a pytz.tzinfo.DstTzInfo or a datetime.tzinfo object (not %s)' % type(from_timezone) - assert to_timezone is None or isinstance(to_timezone, datetime.tzinfo) or isinstance(to_timezone, str), 'to_timezone must be None, a datetime.tzinfo object or a string (not %s)' % type(to_timezone) + assert from_timezone is None or isinstance(from_timezone, (datetime.tzinfo, pytz.tzinfo.DstTzInfo, str)), 'from_timezone parameter must be None, a string, a pytz.tzinfo.DstTzInfo or a datetime.tzinfo object (not %s)' % type(from_timezone) + assert to_timezone is None or isinstance(to_timezone, (datetime.tzinfo, str)), 'to_timezone must be None, a datetime.tzinfo object or a string (not %s)' % type(to_timezone) if not value.tzinfo and not naive: if not from_timezone or from_timezone == 'local': from_timezone = dateutil.tz.tzlocal()