Compare commits

...

2 commits

Author SHA1 Message Date
Benjamin Renard
31eeff367c ldap.LdapClient.get_objects: add paged_search & pagesize parameters
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2022-07-20 10:52:10 +02:00
Benjamin Renard
86eae5dae5 ldap.LdapServer: make parameters accepted by search and paged_search methods identical 2022-07-20 10:50:58 +02:00

View file

@ -116,7 +116,7 @@ class LdapServer:
return ldap.SCOPE_SUBTREE # pylint: disable=no-member return ldap.SCOPE_SUBTREE # pylint: disable=no-member
raise Exception(f'Unknown LDAP scope "{scope}"') raise Exception(f'Unknown LDAP scope "{scope}"')
def search(self, basedn, filterstr=None, attrs=None, sizelimit=0, scope=None): def search(self, basedn, filterstr=None, attrs=None, sizelimit=None, scope=None):
""" Run a search on LDAP server """ """ Run a search on LDAP server """
assert self.con or self.connect() assert self.con or self.connect()
res_id = self.con.search( res_id = self.con.search(
@ -141,10 +141,18 @@ class LdapServer:
result = self.search(dn, filterstr=filterstr, scope='base', attrs=attrs) result = self.search(dn, filterstr=filterstr, scope='base', attrs=attrs)
return result[dn] if dn in result else None return result[dn] if dn in result else None
def paged_search(self, basedn, filterstr, attrs, scope='sub', pagesize=500): def paged_search(self, basedn, filterstr=None, attrs=None, scope=None, pagesize=None,
sizelimit=None):
""" Run a paged search on LDAP server """ """ Run a paged search on LDAP server """
assert not self.v2, "Paged search is not available on LDAP version 2" assert not self.v2, "Paged search is not available on LDAP version 2"
assert self.con or self.connect() assert self.con or self.connect()
# Set parameters default values (if not defined)
filterstr = filterstr if filterstr else '(objectClass=*)'
attrs = attrs if attrs else []
scope = scope if scope else 'sub'
pagesize = pagesize if pagesize else 500
# Initialize SimplePagedResultsControl object # Initialize SimplePagedResultsControl object
page_control = SimplePagedResultsControl( page_control = SimplePagedResultsControl(
True, True,
@ -208,6 +216,13 @@ class LdapServer:
# Store results of this page # Store results of this page
for obj_dn, obj_attrs in rdata: for obj_dn, obj_attrs in rdata:
ret[obj_dn] = obj_attrs ret[obj_dn] = obj_attrs
# If sizelimit reached, stop
if sizelimit and len(ret) >= sizelimit:
break
# If sizelimit reached, stop
if sizelimit and len(ret) >= sizelimit:
break
# If no cookie returned, we are done # If no cookie returned, we are done
if not result_page_control.cookie: if not result_page_control.cookie:
@ -523,7 +538,8 @@ class LdapClient:
return vals if all_values else vals[0] return vals if all_values else vals[0]
return default if default or not all_values else [] return default if default or not all_values else []
def get_objects(self, name, filterstr, basedn, attrs, key_attr=None, warn=True): def get_objects(self, name, filterstr, basedn, attrs, key_attr=None, warn=True,
paged_search=False, pagesize=None):
""" """
Retrieve objects from LDAP Retrieve objects from LDAP
@ -536,16 +552,24 @@ class LdapClient:
:param warn: If True, a warning message will be logged if no object is found :param warn: If True, a warning message will be logged if no object is found
in LDAP directory (otherwise, it will be just a debug message) in LDAP directory (otherwise, it will be just a debug message)
(optional, default: True) (optional, default: True)
:param paged_search: If True, use paged search to list objects from LDAP directory
(optional, default: False)
:param pagesize: When using paged search, the page size
(optional, default: see LdapServer.paged_search)
""" """
if name in self._cached_objects: if name in self._cached_objects:
log.debug('Retreived %s objects from cache', name) log.debug('Retreived %s objects from cache', name)
else: else:
assert self._conn or self.initialize() assert self._conn or self.initialize()
log.debug('Looking for LDAP %s with (filter="%s" / basedn="%s")', name, filterstr, basedn) log.debug('Looking for LDAP %s with (filter="%s" / basedn="%s")', name, filterstr, basedn)
if paged_search:
ldap_data = self._conn.paged_search(
basedn=basedn, filterstr=filterstr, attrs=attrs,
pagesize=pagesize
)
else:
ldap_data = self._conn.search( ldap_data = self._conn.search(
basedn=basedn, basedn=basedn, filterstr=filterstr, attrs=attrs,
filterstr=filterstr,
attrs=attrs
) )
if not ldap_data: if not ldap_data: