LdapServer: fix some pylint warnings

This commit is contained in:
Benjamin Renard 2021-03-24 13:49:08 +01:00
parent 8a62a8545d
commit 45a0b99687

View file

@ -2,15 +2,16 @@
import copy import copy
import datetime import datetime
import logging
import dateutil.parser import dateutil.parser
import dateutil.tz import dateutil.tz
import ldap import ldap
from ldap.controls import SimplePagedResultsControl from ldap.controls import SimplePagedResultsControl
import ldap.modlist as modlist import ldap.modlist as modlist
import logging
import pytz import pytz
class LdapServer(object): class LdapServer(object): # pylint: disable=useless-object-inheritance
uri = None uri = None
dn = None dn = None
@ -34,17 +35,16 @@ class LdapServer(object):
def _error(self,error,level=logging.WARNING): def _error(self,error,level=logging.WARNING):
if self.raiseOnError: if self.raiseOnError:
raise LdapServerException(error) raise LdapServerException(error)
else: self.logger.log(level, error)
self.logger.log(level, error)
def connect(self): def connect(self):
if self.con == 0: if self.con == 0:
try: try:
con = ldap.initialize(self.uri) con = ldap.initialize(self.uri)
if self.v2: if self.v2:
con.protocol_version = ldap.VERSION2 con.protocol_version = ldap.VERSION2 # pylint: disable=no-member
else: else:
con.protocol_version = ldap.VERSION3 con.protocol_version = ldap.VERSION3 # pylint: disable=no-member
if self.dn: if self.dn:
con.simple_bind_s(self.dn,self.pwd) con.simple_bind_s(self.dn,self.pwd)
@ -53,18 +53,19 @@ class LdapServer(object):
self.con = con self.con = con
return True return True
except ldap.LDAPError as e: except ldap.LDAPError as e: # pylint: disable=no-member
self._error('LdapServer - Error connecting and binding to LDAP server : %s' % e,logging.CRITICAL) self._error('LdapServer - Error connecting and binding to LDAP server : %s' % e,logging.CRITICAL)
return False return False
return True return True
def get_scope(self, scope): @staticmethod
def get_scope(scope):
if scope == 'base': if scope == 'base':
return ldap.SCOPE_BASE return ldap.SCOPE_BASE # pylint: disable=no-member
elif scope == 'one': if scope == 'one':
return ldap.SCOPE_ONELEVEL return ldap.SCOPE_ONELEVEL # pylint: disable=no-member
elif scope == 'sub': if scope == 'sub':
return ldap.SCOPE_SUBTREE return ldap.SCOPE_SUBTREE # pylint: disable=no-member
raise Exception("Unknown LDAP scope '%s'" % scope) raise Exception("Unknown LDAP scope '%s'" % scope)
def search(self, basedn, filterstr=None, attrs=None, sizelimit=0, scope=None): def search(self, basedn, filterstr=None, attrs=None, sizelimit=0, scope=None):
@ -80,9 +81,8 @@ class LdapServer(object):
res_type, res_data = self.con.result(res_id,0) res_type, res_data = self.con.result(res_id,0)
if res_data == [] or (sizelimit and c > sizelimit): if res_data == [] or (sizelimit and c > sizelimit):
break break
else: if res_type == ldap.RES_SEARCH_ENTRY: # pylint: disable=no-member
if res_type == ldap.RES_SEARCH_ENTRY: ret[res_data[0][0]] = res_data[0][1]
ret[res_data[0][0]] = res_data[0][1]
c += 1 c += 1
return ret return ret
@ -123,12 +123,12 @@ class LdapServer(object):
attrs, attrs,
serverctrls=[page_control] serverctrls=[page_control]
) )
except ldap.LDAPError as e: except ldap.LDAPError as e: # pylint: disable=no-member
self._error('LdapServer - Error running paged search on LDAP server: %s' % e, logging.CRITICAL) self._error('LdapServer - Error running paged search on LDAP server: %s' % e, logging.CRITICAL)
return False return False
try: try:
rtype, rdata, rmsgid, rctrls = self.con.result3(res_id) rtype, rdata, rmsgid, rctrls = self.con.result3(res_id) # pylint: disable=unused-variable
except ldap.LDAPError as e: except ldap.LDAPError as e: # pylint: disable=no-member
self._error('LdapServer - Error pulling paged search result from LDAP server: %s' % e, logging.CRITICAL) self._error('LdapServer - Error pulling paged search result from LDAP server: %s' % e, logging.CRITICAL)
return False return False
@ -162,47 +162,59 @@ class LdapServer(object):
def add_object(self,dn,attrs): def add_object(self,dn,attrs):
ldif = modlist.addModlist(attrs) ldif = modlist.addModlist(attrs)
try: try:
self.logger.debug("LdapServer - Add %s" % dn) self.logger.debug("LdapServer - Add %s", dn)
self.con.add_s(dn,ldif) self.con.add_s(dn,ldif)
return True return True
except ldap.LDAPError as e: except ldap.LDAPError as e: # pylint: disable=no-member
self._error("LdapServer - Error adding %s : %s" % (dn,e), logging.ERROR) self._error("LdapServer - Error adding %s : %s" % (dn,e), logging.ERROR)
return False return False
def update_object(self, dn, old, new, ignore_attrs=[]): def update_object(self, dn, old, new, ignore_attrs=None):
ldif = modlist.modifyModlist(old, new, ignore_attr_types=ignore_attrs) ldif = modlist.modifyModlist(
old, new,
ignore_attr_types=ignore_attrs if ignore_attrs else []
)
if ldif == []: if ldif == []:
return True return True
try: try:
self.con.modify_s(dn,ldif) self.con.modify_s(dn,ldif)
return True return True
except ldap.LDAPError as e: except ldap.LDAPError as e: # pylint: disable=no-member
self._error("LdapServer - Error updating %s : %s\nOld : %s\nNew : %s" % (dn, e, old, new), logging.ERROR) self._error("LdapServer - Error updating %s : %s\nOld : %s\nNew : %s" % (dn, e, old, new), logging.ERROR)
return False return False
def update_need(self, old, new, ignore_attrs=[]): @staticmethod
ldif = modlist.modifyModlist(old, new, ignore_attr_types=ignore_attrs) def update_need(old, new, ignore_attrs=None):
ldif = modlist.modifyModlist(
old, new,
ignore_attr_types=ignore_attrs if ignore_attrs else []
)
if ldif == []: if ldif == []:
return False return False
return True return True
def get_changes(self, old, new, ignore_attrs=[]): @staticmethod
return modlist.modifyModlist(old, new, ignore_attr_types=ignore_attrs) def get_changes(old, new, ignore_attrs=None):
return modlist.modifyModlist(
old, new,
ignore_attr_types=ignore_attrs if ignore_attrs else []
)
def format_changes(self, old, new, ignore_attrs=[], prefix=''): @staticmethod
def format_changes(old, new, ignore_attrs=None, prefix=None):
msg = [] msg = []
for (op, attr, val) in modlist.modifyModlist(old, new, ignore_attr_types=ignore_attrs): for (op, attr, val) in modlist.modifyModlist(old, new, ignore_attr_types=ignore_attrs if ignore_attrs else []):
if op == ldap.MOD_ADD: if op == ldap.MOD_ADD: # pylint: disable=no-member
op = 'ADD' op = 'ADD'
elif op == ldap.MOD_DELETE: elif op == ldap.MOD_DELETE: # pylint: disable=no-member
op = 'DELETE' op = 'DELETE'
elif op == ldap.MOD_REPLACE: elif op == ldap.MOD_REPLACE: # pylint: disable=no-member
op = 'REPLACE' op = 'REPLACE'
else: else:
op = 'UNKNOWN (=%s)' % op op = 'UNKNOWN (=%s)' % op
if val is None and op == 'DELETE': if val is None and op == 'DELETE':
msg.append('%s - %s %s' % (prefix, op, attr)) msg.append('%s - %s %s' % (prefix if prefix else '', op, attr))
else: else:
msg.append('%s - %s %s: %s' % (prefix, op, attr, val)) msg.append('%s - %s %s: %s' % (prefix, op, attr, val))
return '\n'.join(msg) return '\n'.join(msg)
@ -228,7 +240,7 @@ class LdapServer(object):
) )
self.con.rename_s(dn, new_rdn, newsuperior=new_sup, delold=delete_old) self.con.rename_s(dn, new_rdn, newsuperior=new_sup, delold=delete_old)
return True return True
except ldap.LDAPError as e: except ldap.LDAPError as e: # pylint: disable=no-member
self._error( self._error(
"LdapServer - Error renaming %s in %s (new superior: %s, delete old: %s): %s" % ( "LdapServer - Error renaming %s in %s (new superior: %s, delete old: %s): %s" % (
dn, dn,
@ -242,20 +254,22 @@ class LdapServer(object):
return False return False
def drop_object(self,dn): def drop_object(self, dn):
try: try:
self.logger.debug("LdapServer - Delete %s" % dn) self.logger.debug("LdapServer - Delete %s", dn)
self.con.delete_s(dn) self.con.delete_s(dn)
return True return True
except ldap.LDAPError as e: except ldap.LDAPError as e: # pylint: disable=no-member
self._error("LdapServer - Error deleting %s : %s" % (dn,e), logging.ERROR) self._error("LdapServer - Error deleting %s : %s" % (dn,e), logging.ERROR)
return False return False
def get_dn(self,obj): @staticmethod
def get_dn(obj):
return obj[0][0] return obj[0][0]
def get_attr(self,obj,attr,all=None,default=None): @staticmethod
def get_attr(obj, attr, all=None, default=None):
if attr not in obj: if attr not in obj:
for k in obj: for k in obj:
if k.lower() == attr.lower(): if k.lower() == attr.lower():
@ -264,13 +278,10 @@ class LdapServer(object):
if all is not None: if all is not None:
if attr in obj: if attr in obj:
return obj[attr] return obj[attr]
else: return default or []
return default or [] if attr in obj:
else: return obj[attr][0]
if attr in obj: return default
return obj[attr][0]
else:
return default
class LdapServerException(BaseException): class LdapServerException(BaseException):
def __init__(self,msg): def __init__(self,msg):
@ -290,8 +301,8 @@ def parse_datetime(value, to_timezone=None, default_timezone=None, naive=None):
the timezone (optional, default : server local timezone) the timezone (optional, default : server local timezone)
:param naive: Use naive datetime : return naive datetime object (without timezone conversion from LDAP) :param naive: Use naive datetime : return naive datetime object (without timezone conversion from LDAP)
""" """
assert to_timezone is None or isinstance(to_timezone, datetime.tzinfo) or isinstance(to_timezone, str), 'to_timezone must be None, a datetime.tzinfo object or a string (not %s)' % type(to_timezone) assert to_timezone is None or isinstance(to_timezone, (datetime.tzinfo, str)), 'to_timezone must be None, a datetime.tzinfo object or a string (not %s)' % type(to_timezone)
assert default_timezone is None or isinstance(default_timezone, datetime.tzinfo) or isinstance(default_timezone, pytz.tzinfo.DstTzInfo) or isinstance(default_timezone, str), 'default_timezone parameter must be None, a string, a pytz.tzinfo.DstTzInfo or a datetime.tzinfo object (not %s)' % type(default_timezone) assert default_timezone is None or isinstance(default_timezone, (datetime.tzinfo, pytz.tzinfo.DstTzInfo, str)), 'default_timezone parameter must be None, a string, a pytz.tzinfo.DstTzInfo or a datetime.tzinfo object (not %s)' % type(default_timezone)
date = dateutil.parser.parse(value, dayfirst=False) date = dateutil.parser.parse(value, dayfirst=False)
if not date.tzinfo: if not date.tzinfo:
if naive: if naive:
@ -342,8 +353,8 @@ def format_datetime(value, from_timezone=None, to_timezone=None, naive=None):
:param naive: Use naive datetime : datetime store as UTC in LDAP (without conversion) :param naive: Use naive datetime : datetime store as UTC in LDAP (without conversion)
""" """
assert isinstance(value, datetime.datetime), 'First parameter must be an datetime.datetime object (not %s)' % type(value) assert isinstance(value, datetime.datetime), 'First parameter must be an datetime.datetime object (not %s)' % type(value)
assert from_timezone is None or isinstance(from_timezone, datetime.tzinfo) or isinstance(from_timezone, pytz.tzinfo.DstTzInfo) or isinstance(from_timezone, str), 'from_timezone parameter must be None, a string, a pytz.tzinfo.DstTzInfo or a datetime.tzinfo object (not %s)' % type(from_timezone) assert from_timezone is None or isinstance(from_timezone, (datetime.tzinfo, pytz.tzinfo.DstTzInfo, str)), 'from_timezone parameter must be None, a string, a pytz.tzinfo.DstTzInfo or a datetime.tzinfo object (not %s)' % type(from_timezone)
assert to_timezone is None or isinstance(to_timezone, datetime.tzinfo) or isinstance(to_timezone, str), 'to_timezone must be None, a datetime.tzinfo object or a string (not %s)' % type(to_timezone) assert to_timezone is None or isinstance(to_timezone, (datetime.tzinfo, str)), 'to_timezone must be None, a datetime.tzinfo object or a string (not %s)' % type(to_timezone)
if not value.tzinfo and not naive: if not value.tzinfo and not naive:
if not from_timezone or from_timezone == 'local': if not from_timezone or from_timezone == 'local':
from_timezone = dateutil.tz.tzlocal() from_timezone = dateutil.tz.tzlocal()