From f541630a631a0366a9bf8301a88645fce999c57b Mon Sep 17 00:00:00 2001 From: Benjamin Renard Date: Thu, 23 Jun 2022 18:38:21 +0200 Subject: [PATCH] ldap: code cleaning / fix pylint/flake8 warnings --- mylib/ldap.py | 79 +++++++++++++++++++++++++++++---------------------- 1 file changed, 45 insertions(+), 34 deletions(-) diff --git a/mylib/ldap.py b/mylib/ldap.py index cb18156..b35ec6d 100644 --- a/mylib/ldap.py +++ b/mylib/ldap.py @@ -10,10 +10,10 @@ import pytz import dateutil.parser import dateutil.tz import ldap +from ldap import modlist from ldap.controls import SimplePagedResultsControl from ldap.controls.simple import RelaxRulesControl from ldap.dn import escape_dn_chars, explode_dn -import ldap.modlist as modlist from mylib import pretty_format_dict @@ -83,6 +83,7 @@ class LdapServer: if self.con == 0: try: if not self.checkCert: + # pylint: disable=no-member ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) con = ldap.initialize(self.uri) if self.v2: @@ -98,7 +99,9 @@ class LdapServer: self.con = con return True except ldap.LDAPError as e: # pylint: disable=no-member - self._error('LdapServer - Error connecting and binding to LDAP server : %s' % e, logging.CRITICAL) + self._error( + f'LdapServer - Error connecting and binding to LDAP server: {e}', + logging.CRITICAL) return False return True @@ -111,7 +114,7 @@ class LdapServer: return ldap.SCOPE_ONELEVEL # pylint: disable=no-member if scope == 'sub': return ldap.SCOPE_SUBTREE # pylint: disable=no-member - raise Exception("Unknown LDAP scope '%s'" % scope) + raise Exception(f'Unknown LDAP scope "{scope}"') def search(self, basedn, filterstr=None, attrs=None, sizelimit=0, scope=None): """ Run a search on LDAP server """ @@ -175,12 +178,16 @@ class LdapServer: serverctrls=[page_control] ) except ldap.LDAPError as e: # pylint: disable=no-member - self._error('LdapServer - Error running paged search on LDAP server: %s' % e, logging.CRITICAL) + self._error( + f'LdapServer - Error running paged search on LDAP server: {e}', + logging.CRITICAL) return False try: rtype, rdata, rmsgid, rctrls = self.con.result3(res_id) # pylint: disable=unused-variable except ldap.LDAPError as e: # pylint: disable=no-member - self._error('LdapServer - Error pulling paged search result from LDAP server: %s' % e, logging.CRITICAL) + self._error( + f'LdapServer - Error pulling paged search result from LDAP server: {e}', + logging.CRITICAL) return False # Detect and catch PagedResultsControl answer from rctrls @@ -193,7 +200,9 @@ class LdapServer: # If PagedResultsControl answer not detected, paged serach if not result_page_control: - self._error('LdapServer - Server ignores RFC2696 control, paged search can not works', logging.CRITICAL) + self._error( + 'LdapServer - Server ignores RFC2696 control, paged search can not works', + logging.CRITICAL) return False # Store results of this page @@ -219,7 +228,7 @@ class LdapServer: self.con.add_s(dn, ldif) return True except ldap.LDAPError as e: # pylint: disable=no-member - self._error("LdapServer - Error adding %s : %s" % (dn, e), logging.ERROR) + self._error(f'LdapServer - Error adding {dn}: {e}', logging.ERROR) return False @@ -231,7 +240,7 @@ class LdapServer: encode_ldap_value(new) if encode else new, ignore_attr_types=ignore_attrs if ignore_attrs else [] ) - if ldif == []: + if not ldif: return True assert self.con or self.connect() try: @@ -241,7 +250,9 @@ class LdapServer: self.con.modify_s(dn, ldif) return True except ldap.LDAPError as e: # pylint: disable=no-member - self._error("LdapServer - Error updating %s : %s\nOld : %s\nNew : %s" % (dn, e, old, new), logging.ERROR) + self._error( + f'LdapServer - Error updating {dn} : {e}\nOld: {old}\nNew: {new}', + logging.ERROR) return False @staticmethod @@ -252,7 +263,7 @@ class LdapServer: encode_ldap_value(new) if encode else new, ignore_attr_types=ignore_attrs if ignore_attrs else [] ) - if ldif == []: + if not ldif: return False return True @@ -282,11 +293,11 @@ class LdapServer: elif op == ldap.MOD_REPLACE: # pylint: disable=no-member op = 'REPLACE' else: - op = 'UNKNOWN (=%s)' % op + op = f'UNKNOWN (={op})' if val is None and op == 'DELETE': - msg.append('%s - %s %s' % (prefix, op, attr)) + msg.append(f'{prefix} - {op} {attr}') else: - msg.append('%s - %s %s: %s' % (prefix, op, attr, val)) + msg.append(f'{prefix} - {op} {attr}: {val}') return '\n'.join(msg) def rename_object(self, dn, new_rdn, new_sup=None, delete_old=True): @@ -314,13 +325,9 @@ class LdapServer: return True except ldap.LDAPError as e: # pylint: disable=no-member self._error( - "LdapServer - Error renaming %s in %s (new superior: %s, delete old: %s): %s" % ( - dn, - new_rdn, - "same" if new_sup is None else new_sup, - delete_old, - e - ), + f'LdapServer - Error renaming {dn} in {new_rdn} ' + f'(new superior: {"same" if new_sup is None else new_sup}, ' + f'delete old: {delete_old}): {e}', logging.ERROR ) @@ -334,7 +341,8 @@ class LdapServer: self.con.delete_s(dn) return True except ldap.LDAPError as e: # pylint: disable=no-member - self._error("LdapServer - Error deleting %s : %s" % (dn, e), logging.ERROR) + self._error( + f'LdapServer - Error deleting {dn}: {e}', logging.ERROR) return False @@ -405,7 +413,7 @@ class LdapClient: if self._config and self._config.defined(self._config_section, option): return self._config.get(self._config_section, option) - assert not required, "Options %s not defined" % option + assert not required, f'Options {option} not defined' return default @@ -589,7 +597,8 @@ class LdapClient: return None if len(ldap_data) > 1: - raise LdapClientException('More than one %s "%s": %s' % (type_name, object_name, ' / '.join(ldap_data.keys()))) + raise LdapClientException( + f'More than one {type_name} "{object_name}": {" / ".join(ldap_data.keys())}') dn = next(iter(ldap_data)) return self._get_obj(dn, ldap_data[dn]) @@ -678,7 +687,9 @@ class LdapClient: log.debug('No %s found with %s="%s"', type_name, attr, value) return None if len(matched) > 1: - raise LdapClientException('More than one %s with %s="%s" found: %s' % (type_name, attr, value, ' / '.join(matched.keys()))) + raise LdapClientException( + f'More than one {type_name} with {attr}="{value}" found: ' + f'{" / ".join(matched.keys())}') dn = next(iter(matched)) return matched[dn] @@ -693,7 +704,7 @@ class LdapClient: """ old = {} new = {} - protected_attrs = [a.lower() for a in protected_attrs or list()] + protected_attrs = [a.lower() for a in protected_attrs or []] protected_attrs.append('dn') # New/updated attributes for attr in attrs: @@ -775,7 +786,7 @@ class LdapClient: :param protected_attrs: An optional list of protected attributes :param rdn_attr: The LDAP object RDN attribute (to detect renaming, default: auto-detected) """ - assert isinstance(changes, (list, tuple)) and len(changes) == 2 and isinstance(changes[0], dict) and isinstance(changes[1], dict), "changes parameter must be a result of get_changes() method (%s given)" % type(changes) + assert isinstance(changes, (list, tuple)) and len(changes) == 2 and isinstance(changes[0], dict) and isinstance(changes[1], dict), f'changes parameter must be a result of get_changes() method ({type(changes)} given)' # In case of RDN change, we need to modify passed changes, copy it to make it unchanged in # this case _changes = copy.deepcopy(changes) @@ -799,8 +810,8 @@ class LdapClient: # Compute new object DN dn_parts = explode_dn(self.decode(ldap_obj['dn'])) basedn = ','.join(dn_parts[1:]) - new_rdn = '%s=%s' % (rdn_attr, escape_dn_chars(self.decode(new_rdn_values[0]))) - new_dn = '%s,%s' % (new_rdn, basedn) + new_rdn = f'{rdn_attr}={escape_dn_chars(self.decode(new_rdn_values[0]))}' + new_dn = f'{new_rdn},{basedn}' # Rename object log.debug('%s: Rename to %s', ldap_obj['dn'], new_dn) @@ -900,8 +911,8 @@ def parse_datetime(value, to_timezone=None, default_timezone=None, naive=None): the timezone (optional, default : server local timezone) :param naive: Use naive datetime : return naive datetime object (without timezone conversion from LDAP) """ - assert to_timezone is None or isinstance(to_timezone, (datetime.tzinfo, str)), 'to_timezone must be None, a datetime.tzinfo object or a string (not %s)' % type(to_timezone) - assert default_timezone is None or isinstance(default_timezone, (datetime.tzinfo, pytz.tzinfo.DstTzInfo, str)), 'default_timezone parameter must be None, a string, a pytz.tzinfo.DstTzInfo or a datetime.tzinfo object (not %s)' % type(default_timezone) + assert to_timezone is None or isinstance(to_timezone, (datetime.tzinfo, str)), f'to_timezone must be None, a datetime.tzinfo object or a string (not {type(to_timezone)})' + assert default_timezone is None or isinstance(default_timezone, (datetime.tzinfo, pytz.tzinfo.DstTzInfo, str)), f'default_timezone parameter must be None, a string, a pytz.tzinfo.DstTzInfo or a datetime.tzinfo object (not {type(default_timezone)})' date = dateutil.parser.parse(value, dayfirst=False) if not date.tzinfo: if naive: @@ -953,9 +964,9 @@ def format_datetime(value, from_timezone=None, to_timezone=None, naive=None): :param to_timezone: The timezone used in LDAP (optional, default : UTC) :param naive: Use naive datetime : datetime store as UTC in LDAP (without conversion) """ - assert isinstance(value, datetime.datetime), 'First parameter must be an datetime.datetime object (not %s)' % type(value) - assert from_timezone is None or isinstance(from_timezone, (datetime.tzinfo, pytz.tzinfo.DstTzInfo, str)), 'from_timezone parameter must be None, a string, a pytz.tzinfo.DstTzInfo or a datetime.tzinfo object (not %s)' % type(from_timezone) - assert to_timezone is None or isinstance(to_timezone, (datetime.tzinfo, str)), 'to_timezone must be None, a datetime.tzinfo object or a string (not %s)' % type(to_timezone) + assert isinstance(value, datetime.datetime), f'First parameter must be an datetime.datetime object (not {type(value)})' + assert from_timezone is None or isinstance(from_timezone, (datetime.tzinfo, pytz.tzinfo.DstTzInfo, str)), f'from_timezone parameter must be None, a string, a pytz.tzinfo.DstTzInfo or a datetime.tzinfo object (not {type(from_timezone)})' + assert to_timezone is None or isinstance(to_timezone, (datetime.tzinfo, str)), f'to_timezone must be None, a datetime.tzinfo object or a string (not {type(to_timezone)})' if not value.tzinfo and not naive: if not from_timezone or from_timezone == 'local': from_timezone = dateutil.tz.tzlocal() @@ -995,5 +1006,5 @@ def format_date(value, from_timezone=None, to_timezone=None, naive=True): :param naive: Use naive datetime : do not handle timezone conversion before formating and return datetime as UTC (because LDAP required a timezone) """ - assert isinstance(value, datetime.date), 'First parameter must be an datetime.date object (not %s)' % type(value) + assert isinstance(value, datetime.date), f'First parameter must be an datetime.date object (not {type(value)})' return format_datetime(datetime.datetime.combine(value, datetime.datetime.min.time()), from_timezone, to_timezone, naive)