python-mylib/mylib/ldap.py

1026 lines
40 KiB
Python
Raw Normal View History

2021-05-19 19:19:57 +02:00
# -*- coding: utf-8 -*-
""" LDAP server connection helper """
2013-06-07 12:13:03 +02:00
2019-06-13 19:15:13 +02:00
import copy
import datetime
2021-03-24 13:49:08 +01:00
import logging
2021-11-03 17:41:15 +01:00
import pytz
2021-03-24 13:49:08 +01:00
import dateutil.parser
import dateutil.tz
2013-06-07 12:13:03 +02:00
import ldap
from ldap import modlist
2020-12-15 18:24:50 +01:00
from ldap.controls import SimplePagedResultsControl
from ldap.controls.simple import RelaxRulesControl
from ldap.dn import escape_dn_chars, explode_dn
2013-06-07 12:13:03 +02:00
2021-06-02 18:59:09 +02:00
from mylib import pretty_format_dict
# Module-level logger, named after this module
log = logging.getLogger(__name__)
2021-11-03 17:41:15 +01:00
# Encoding used by LdapClient.decode()/encode() when no 'encoding' option is set
DEFAULT_ENCODING = 'utf-8'
2021-06-02 18:59:09 +02:00
2021-05-19 19:19:57 +02:00
def decode_ldap_value(value, encoding='utf-8'):
    """
    Decoding LDAP attribute values helper

    Recursively decode bytes found in the value: bytes are decoded, lists and
    dicts are walked (dict keys are kept as is) and any other type is
    returned unchanged.

    :param value: The value to decode (bytes, list, dict or anything else)
    :param encoding: The encoding used to decode bytes, also applied to nested
                     values (optional, default: utf-8)

    :return: The decoded value, with the same structure as the input
    """
    if isinstance(value, bytes):
        return value.decode(encoding)
    if isinstance(value, list):
        # Propagate the requested encoding to nested values
        return [decode_ldap_value(item, encoding) for item in value]
    if isinstance(value, dict):
        return {
            key: decode_ldap_value(values, encoding)
            for key, values in value.items()
        }
    return value
def encode_ldap_value(value, encoding='utf-8'):
    """
    Encoding LDAP attribute values helper

    Recursively encode strings found in the value: strings are encoded, lists
    and dicts are walked (dict keys are kept as is) and any other type is
    returned unchanged.

    :param value: The value to encode (str, list, dict or anything else)
    :param encoding: The encoding used to encode strings, also applied to nested
                     values (optional, default: utf-8)

    :return: The encoded value, with the same structure as the input
    """
    if isinstance(value, str):
        return value.encode(encoding)
    if isinstance(value, list):
        # Propagate the requested encoding to nested values
        return [encode_ldap_value(item, encoding) for item in value]
    if isinstance(value, dict):
        return {
            key: encode_ldap_value(values, encoding)
            for key, values in value.items()
        }
    return value
2021-05-19 19:19:57 +02:00
class LdapServer:
    """
    LDAP server connection helper

    Wrap a python-ldap connection: the connection is opened lazily on the first
    operation that needs it. Errors are either logged (default) or raised as
    LdapServerException, depending on the raiseOnError constructor parameter.
    """

    uri = None
    dn = None
    pwd = None
    v2 = None
    # Stay at 0 until connect() succeeds, then hold the python-ldap connection object
    con = 0

    def __init__(self, uri, dn=None, pwd=None, v2=None,
                 raiseOnError=False, logger=False, checkCert=True):
        """
        :param uri: The LDAP server URI
        :param dn: The bind DN (optional, no simple bind is done if not provided)
        :param pwd: The bind password (optional)
        :param v2: If true, LDAP protocol version 2 is used instead of version 3
        :param raiseOnError: If True, errors are raised as LdapServerException
                             instead of being logged (optional, default: False)
        :param logger: The logger to use (optional, default: this module logger)
        :param checkCert: If False, TLS certificate check is disabled
                          (optional, default: True)
        """
        self.uri = uri
        self.dn = dn
        self.pwd = pwd
        self.raiseOnError = raiseOnError
        self.checkCert = checkCert
        if v2:
            self.v2 = True
        if logger:
            self.logger = logger
        else:
            self.logger = logging.getLogger(__name__)

    def _error(self, error, level=logging.WARNING):
        """ Raise the error as LdapServerException if raiseOnError is set, log it otherwise """
        if self.raiseOnError:
            raise LdapServerException(error)
        self.logger.log(level, error)

    def _ensure_connected(self):
        """
        Open the connection if not already done

        Explicit replacement of "assert self.con or self.connect()": an assert
        statement is removed when Python runs with -O, which also removed the
        call to connect(). AssertionError is kept as exception type for
        backward compatibility.
        """
        if not (self.con or self.connect()):
            raise AssertionError('LdapServer - Not connected to LDAP server')

    def connect(self):
        """
        Start connection to LDAP server

        :return: True if connected (or already connected), False on error
                 (unless raiseOnError is set)
        """
        if self.con == 0:
            try:
                if not self.checkCert:
                    # NOTE: this option is set on the ldap module, not on this connection only
                    # pylint: disable=no-member
                    ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
                con = ldap.initialize(self.uri)
                if self.v2:
                    con.protocol_version = ldap.VERSION2  # pylint: disable=no-member
                else:
                    con.protocol_version = ldap.VERSION3  # pylint: disable=no-member
                if self.dn:
                    con.simple_bind_s(self.dn, self.pwd)
                elif self.uri.startswith('ldapi://'):
                    # No bind DN on a ldapi:// URI: use SASL EXTERNAL bind
                    con.sasl_interactive_bind_s("", ldap.sasl.external())
                self.con = con
                return True
            except ldap.LDAPError as e:  # pylint: disable=no-member
                self._error(
                    f'LdapServer - Error connecting and binding to LDAP server: {e}',
                    logging.CRITICAL)
                return False
        return True

    @staticmethod
    def get_scope(scope):
        """
        Map scope parameter to python-ldap value

        :param scope: One of 'base', 'one' or 'sub'
        :raises Exception: on unknown scope
        """
        if scope == 'base':
            return ldap.SCOPE_BASE  # pylint: disable=no-member
        if scope == 'one':
            return ldap.SCOPE_ONELEVEL  # pylint: disable=no-member
        if scope == 'sub':
            return ldap.SCOPE_SUBTREE  # pylint: disable=no-member
        raise Exception(f'Unknown LDAP scope "{scope}"')

    def search(self, basedn, filterstr=None, attrs=None, sizelimit=None, scope=None):
        """
        Run a search on LDAP server

        :param basedn: The base DN of the search
        :param filterstr: The LDAP filter (optional, default: (objectClass=*))
        :param attrs: The list of attribute names to retrieve (optional, default: empty list)
        :param sizelimit: The maximum number of objects to return (optional, default: no limit)
        :param scope: The search scope, see get_scope() (optional, default: sub)

        :return: A dict of objects attributes with their DN as key
        """
        self._ensure_connected()
        res_id = self.con.search(
            basedn,
            self.get_scope(scope if scope else 'sub'),
            filterstr if filterstr else '(objectClass=*)',
            attrs if attrs else []
        )
        ret = {}
        count = 0
        while True:
            res_type, res_data = self.con.result(res_id, 0)
            # Stop on end of results or as soon as sizelimit objects have been
            # collected (">=": with ">", one object too many was returned)
            if res_data == [] or (sizelimit and count >= sizelimit):
                break
            if res_type == ldap.RES_SEARCH_ENTRY:  # pylint: disable=no-member
                ret[res_data[0][0]] = res_data[0][1]
                count += 1
        return ret

    def get_object(self, dn, filterstr=None, attrs=None):
        """
        Retrieve a LDAP object specified by its DN

        :return: The object attributes, or None if the DN is not in search result
        """
        result = self.search(dn, filterstr=filterstr, scope='base', attrs=attrs)
        return result[dn] if dn in result else None

    def paged_search(self, basedn, filterstr=None, attrs=None, scope=None, pagesize=None,
                     sizelimit=None):
        """
        Run a paged search on LDAP server

        :param basedn: The base DN of the search
        :param filterstr: The LDAP filter (optional, default: (objectClass=*))
        :param attrs: The list of attribute names to retrieve (optional, default: empty list)
        :param scope: The search scope, see get_scope() (optional, default: sub)
        :param pagesize: The number of objects requested per page (optional, default: 500)
        :param sizelimit: The maximum number of objects to return (optional, default: no limit)

        :return: A dict of objects attributes with their DN as key, or False on
                 error (unless raiseOnError is set)
        """
        assert not self.v2, "Paged search is not available on LDAP version 2"
        self._ensure_connected()

        # Set parameters default values (if not defined)
        filterstr = filterstr if filterstr else '(objectClass=*)'
        attrs = attrs if attrs else []
        scope = scope if scope else 'sub'
        pagesize = pagesize if pagesize else 500

        # Initialize SimplePagedResultsControl object
        page_control = SimplePagedResultsControl(
            True,
            size=pagesize,
            cookie=''  # Start without cookie
        )
        ret = {}
        pages_count = 0
        self.logger.debug(
            "LdapServer - Paged search with base DN '%s', filter '%s', scope '%s', pagesize=%d and attrs=%s",
            basedn,
            filterstr,
            scope,
            pagesize,
            attrs
        )
        while True:
            pages_count += 1
            self.logger.debug(
                "LdapServer - Paged search: request page %d with a maximum of %d objects (current total count: %d)",
                pages_count,
                pagesize,
                len(ret)
            )
            try:
                res_id = self.con.search_ext(
                    basedn,
                    self.get_scope(scope),
                    filterstr,
                    attrs,
                    serverctrls=[page_control]
                )
            except ldap.LDAPError as e:  # pylint: disable=no-member
                self._error(
                    f'LdapServer - Error running paged search on LDAP server: {e}',
                    logging.CRITICAL)
                return False

            try:
                rtype, rdata, rmsgid, rctrls = self.con.result3(res_id)  # pylint: disable=unused-variable
            except ldap.LDAPError as e:  # pylint: disable=no-member
                self._error(
                    f'LdapServer - Error pulling paged search result from LDAP server: {e}',
                    logging.CRITICAL)
                return False

            # Detect and catch PagedResultsControl answer from rctrls
            result_page_control = None
            if rctrls:
                for rctrl in rctrls:
                    if rctrl.controlType == SimplePagedResultsControl.controlType:
                        result_page_control = rctrl
                        break

            # If PagedResultsControl answer not detected, paged search can not work
            if not result_page_control:
                self._error(
                    'LdapServer - Server ignores RFC2696 control, paged search can not works',
                    logging.CRITICAL)
                return False

            # Store results of this page
            for obj_dn, obj_attrs in rdata:
                ret[obj_dn] = obj_attrs
                # If sizelimit reached, stop storing objects of this page
                if sizelimit and len(ret) >= sizelimit:
                    break

            # If sizelimit reached, stop requesting pages
            if sizelimit and len(ret) >= sizelimit:
                break

            # If no cookie returned, we are done
            if not result_page_control.cookie:
                break

            # Otherwise, set cookie for the next search
            page_control.cookie = result_page_control.cookie

        self.logger.debug("LdapServer - Paged search end: %d object(s) retreived in %d page(s) of %d object(s)", len(ret), pages_count, pagesize)
        return ret

    def add_object(self, dn, attrs, encode=False):
        """
        Add an object in LDAP directory

        :param dn: The object DN
        :param attrs: The object attributes (as dict)
        :param encode: If True, attributes values are encoded using
                       encode_ldap_value() first (optional, default: False)

        :return: True on success, False on error (unless raiseOnError is set)
        """
        ldif = modlist.addModlist(encode_ldap_value(attrs) if encode else attrs)
        self._ensure_connected()
        try:
            self.logger.debug("LdapServer - Add %s", dn)
            self.con.add_s(dn, ldif)
            return True
        except ldap.LDAPError as e:  # pylint: disable=no-member
            self._error(f'LdapServer - Error adding {dn}: {e}', logging.ERROR)
        return False

    def update_object(self, dn, old, new, ignore_attrs=None, relax=False, encode=False):
        """
        Update an object in LDAP directory

        :param dn: The object DN
        :param old: The old attributes values (as dict)
        :param new: The new attributes values (as dict)
        :param ignore_attrs: Optional list of attribute names to ignore
        :param relax: If True, the modification is sent with the Relax Rules control
                      (optional, default: False)
        :param encode: If True, old and new values are encoded using
                       encode_ldap_value() first (optional, default: False)

        :return: True on success or if there is nothing to change, False on
                 error (unless raiseOnError is set)
        """
        assert not relax or not self.v2, "Relax modification is not available on LDAP version 2"
        ldif = modlist.modifyModlist(
            encode_ldap_value(old) if encode else old,
            encode_ldap_value(new) if encode else new,
            ignore_attr_types=ignore_attrs if ignore_attrs else []
        )
        if not ldif:
            # Nothing to change: no need to contact the server
            return True
        self._ensure_connected()
        try:
            if relax:
                self.con.modify_ext_s(dn, ldif, serverctrls=[RelaxRulesControl()])
            else:
                self.con.modify_s(dn, ldif)
            return True
        except ldap.LDAPError as e:  # pylint: disable=no-member
            self._error(
                f'LdapServer - Error updating {dn} : {e}\nOld: {old}\nNew: {new}',
                logging.ERROR)
        return False

    @staticmethod
    def update_need(old, new, ignore_attrs=None, encode=False):
        """
        Check if an update is need on a LDAP object based on its old and new attributes values

        :return: True if the computed modlist is not empty, False otherwise
        """
        ldif = modlist.modifyModlist(
            encode_ldap_value(old) if encode else old,
            encode_ldap_value(new) if encode else new,
            ignore_attr_types=ignore_attrs if ignore_attrs else []
        )
        if not ldif:
            return False
        return True

    @staticmethod
    def get_changes(old, new, ignore_attrs=None, encode=False):
        """ Retrieve changes (as modlist) on an object based on its old and new attributes values """
        return modlist.modifyModlist(
            encode_ldap_value(old) if encode else old,
            encode_ldap_value(new) if encode else new,
            ignore_attr_types=ignore_attrs if ignore_attrs else []
        )

    @staticmethod
    def format_changes(old, new, ignore_attrs=None, prefix=None, encode=False):
        """
        Format changes (modlist) on an object based on its old and new attributes values to display/log it

        :param prefix: Optional prefix string for each line of the returned string

        :return: One line per modification, joined with newlines
        """
        msg = []
        prefix = prefix if prefix else ''
        for (op, attr, val) in modlist.modifyModlist(
            encode_ldap_value(old) if encode else old,
            encode_ldap_value(new) if encode else new,
            ignore_attr_types=ignore_attrs if ignore_attrs else []
        ):
            if op == ldap.MOD_ADD:  # pylint: disable=no-member
                op = 'ADD'
            elif op == ldap.MOD_DELETE:  # pylint: disable=no-member
                op = 'DELETE'
            elif op == ldap.MOD_REPLACE:  # pylint: disable=no-member
                op = 'REPLACE'
            else:
                op = f'UNKNOWN (={op})'
            if val is None and op == 'DELETE':
                msg.append(f'{prefix} - {op} {attr}')
            else:
                msg.append(f'{prefix} - {op} {attr}: {val}')
        return '\n'.join(msg)

    def rename_object(self, dn, new_rdn, new_sup=None, delete_old=True):
        """
        Rename an object in LDAP directory

        :param dn: The current object DN
        :param new_rdn: The new RDN, or a complete new DN (split in new RDN and new superior DN)
        :param new_sup: The new superior DN (optional, must not be set if new_rdn is a complete DN)
        :param delete_old: Passed as delold to python-ldap rename_s() (optional, default: True)

        :return: True on success, False on error (unless raiseOnError is set)
        """
        # If new_rdn is a complete DN, split new RDN and new superior DN
        if len(explode_dn(new_rdn)) > 1:
            self.logger.debug(
                "LdapServer - Rename with a full new DN detected (%s): split new RDN and new superior DN",
                new_rdn
            )
            assert new_sup is None, "You can't provide a complete DN as new_rdn and also provide new_sup parameter"
            new_dn_parts = explode_dn(new_rdn)
            new_sup = ','.join(new_dn_parts[1:])
            new_rdn = new_dn_parts[0]
        self._ensure_connected()
        try:
            self.logger.debug(
                "LdapServer - Rename %s in %s (new superior: %s, delete old: %s)",
                dn,
                new_rdn,
                "same" if new_sup is None else new_sup,
                delete_old
            )
            self.con.rename_s(dn, new_rdn, newsuperior=new_sup, delold=delete_old)
            return True
        except ldap.LDAPError as e:  # pylint: disable=no-member
            self._error(
                f'LdapServer - Error renaming {dn} in {new_rdn} '
                f'(new superior: {"same" if new_sup is None else new_sup}, '
                f'delete old: {delete_old}): {e}',
                logging.ERROR
            )
        return False

    def drop_object(self, dn):
        """
        Drop an object in LDAP directory

        :return: True on success, False on error (unless raiseOnError is set)
        """
        self._ensure_connected()
        try:
            self.logger.debug("LdapServer - Delete %s", dn)
            self.con.delete_s(dn)
            return True
        except ldap.LDAPError as e:  # pylint: disable=no-member
            self._error(
                f'LdapServer - Error deleting {dn}: {e}', logging.ERROR)
        return False

    @staticmethod
    def get_dn(obj):
        """ Retrieve an object DN from its entry in LDAP search result """
        return obj[0][0]

    @staticmethod
    def get_attr(obj, attr, all_values=None, default=None, decode=False):
        """
        Retrieve an object attribute value(s) from the object entry in LDAP search result

        :param obj: The object attributes (as dict)
        :param attr: The attribute name (case-insensitive match is tried if not found as is)
        :param all_values: If true, all values are returned instead of the first one only
        :param default: The value returned if the attribute is not found
                        (with all_values, an empty list is returned if default is not set)
        :param decode: If True, value(s) are decoded using decode_ldap_value()
        """
        if attr not in obj:
            # Fallback on a case-insensitive lookup of the attribute name
            for k in obj:
                if k.lower() == attr.lower():
                    attr = k
                    break
        if all_values:
            if attr in obj:
                return decode_ldap_value(obj[attr]) if decode else obj[attr]
            return default or []
        if attr in obj:
            return decode_ldap_value(obj[attr][0]) if decode else obj[attr][0]
        return default
2021-05-19 19:19:57 +02:00
class LdapServerException(BaseException):
    """ Generic exception raised by LdapServer """

    # NOTE(review): this class derives from BaseException (not Exception), so
    # an "except Exception" clause does not catch it.

    def __init__(self, msg):
        super().__init__(msg)
2019-06-13 19:15:13 +02:00
2021-06-02 18:59:09 +02:00
class LdapClientException(LdapServerException):
    """ Generic exception raised by LdapClient """

    def __init__(self, msg):
        super().__init__(msg)
class LdapClient:
    """
    LDAP Client (based on python-mylib.LdapServer)

    Options are read from an options object (attributes prefixed by
    options_prefix) and/or from a registered mylib.Config object. Objects
    retrieved with get_objects() are kept in a cache indexed by type name.
    """

    _options = {}
    _config = None
    _config_section = None

    # Connection
    _conn = None

    # Cache objects
    _cached_objects = None

    def __init__(self, options=None, options_prefix=None, config=None, config_section=None, initialize=False):
        """
        :param options: An object holding options as attributes (optional)
        :param options_prefix: The prefix of options attribute names (optional, default: ldap_)
        :param config: The mylib.Config object (optional)
        :param config_section: The configuration section name (optional, default: ldap)
        :param initialize: If True, the LDAP connection is initialized right now
                           (optional, default: False)
        """
        self._options = options if options else {}
        self._options_prefix = options_prefix if options_prefix else 'ldap_'
        self._config = config if config else None
        self._config_section = config_section if config_section else 'ldap'
        self._cached_objects = {}
        if initialize:
            self.initialize()

    def _get_option(self, option, default=None, required=False):
        """
        Retrieve option value

        The options object is checked first, then the configuration.

        :param option: The option name (without prefix)
        :param default: The value returned if the option is not defined
        :param required: If True, AssertionError is raised if the option is not defined
        """
        if self._options and hasattr(self._options, self._options_prefix + option):
            return getattr(self._options, self._options_prefix + option)
        if self._config and self._config.defined(self._config_section, option):
            return self._config.get(self._config_section, option)
        # Explicit check instead of an assert statement (removed with python -O)
        if required:
            raise AssertionError(f'Options {option} not defined')
        return default

    def _ensure_initialized(self):
        """
        Initialize the LDAP connection if not already done

        Explicit replacement of "assert self._conn or self.initialize()": an
        assert statement is removed when Python runs with -O, which also removed
        the call to initialize(). AssertionError is kept as exception type for
        backward compatibility.
        """
        if not (self._conn or self.initialize()):
            raise AssertionError('LDAP connection is not initialized')

    @property
    def _just_try(self):
        """ Check if just-try mode is enabled """
        return self._get_option(
            'just_try', default=(
                self._config.get_option('just_try') if self._config
                else False
            )
        )

    def configure(self, comment=None, **kwargs):
        """
        Configure options on registered mylib.Config object

        :param comment: The configuration section comment (optional, default: LDAP connection)
        :param kwargs: Extra keyword arguments passed to the config add_section() method

        :return: The configuration section
        """
        if not self._config:
            raise AssertionError(
                "mylib.Config object not registered. Must be passed to __init__ as config keyword argument.")

        # Load configuration option types only here to avoid global
        # dependency of ldap module with config one.
        # pylint: disable=import-outside-toplevel
        from mylib.config import BooleanOption, StringOption, PasswordOption

        section = self._config.add_section(
            self._config_section,
            comment=comment if comment else 'LDAP connection',
            loaded_callback=self.initialize, **kwargs)

        section.add_option(
            StringOption, 'uri', default='ldap://localhost',
            comment='LDAP server URI')
        section.add_option(
            StringOption, 'binddn', comment='LDAP Bind DN')
        section.add_option(
            PasswordOption, 'bindpwd',
            comment='LDAP Bind password (set to "keyring" to use XDG keyring)',
            username_option='binddn', keyring_value='keyring')
        section.add_option(
            BooleanOption, 'checkcert', default=True,
            comment='Check LDAP certificate')

        return section

    def initialize(self, loaded_config=None):
        """
        Initialize LDAP connection

        :param loaded_config: The loaded configuration object (optional, it
                              replaces the registered one if provided)

        :return: The result of the connect() method of the LdapServer object
        """
        if loaded_config:
            # All option lookups read self._config: store the loaded
            # configuration there.
            # NOTE(review): assumes loaded_config is the mylib.Config object
            # passed to the loaded_callback - confirm against mylib.config.
            self._config = loaded_config
            # Public attribute historically set here, kept for compatibility
            self.config = loaded_config
        uri = self._get_option('uri', required=True)
        binddn = self._get_option('binddn')
        log.info("Connect to LDAP server %s as %s", uri, binddn if binddn else 'annonymous')
        self._conn = LdapServer(
            uri, dn=binddn, pwd=self._get_option('bindpwd'),
            checkCert=self._get_option('checkcert'), raiseOnError=True
        )
        # Reset cache
        self._cached_objects = {}
        return self._conn.connect()

    def decode(self, value):
        """
        Decode LDAP attribute value

        Lists are decoded item by item and strings are returned unchanged. The
        'encoding' and 'encoding_error_policy' options are used for other values.
        """
        if isinstance(value, list):
            return [self.decode(v) for v in value]
        if isinstance(value, str):
            return value
        return value.decode(
            self._get_option('encoding', default=DEFAULT_ENCODING),
            self._get_option('encoding_error_policy', default='ignore')
        )

    def encode(self, value):
        """
        Encode LDAP attribute value

        Lists are encoded item by item and bytes are returned unchanged. The
        'encoding' option is used for other values.
        """
        if isinstance(value, list):
            return [self.encode(v) for v in value]
        if isinstance(value, bytes):
            return value
        return value.encode(self._get_option('encoding', default=DEFAULT_ENCODING))

    def _get_obj(self, dn, attrs):
        """
        Build and return LDAP object as dict

        :param dn: The object DN
        :param attrs: The object attributes as return by python-ldap search
        """
        obj = dict(dn=dn)
        for attr in attrs:
            obj[attr] = [self.decode(v) for v in self._conn.get_attr(attrs, attr, all_values=True)]
        return obj

    @staticmethod
    def get_attr(obj, attr, default="", all_values=False):
        """
        Get LDAP object attribute value(s)

        :param obj: The LDAP object as returned by get_object()/get_objects
        :param attr: The attribute name (case-insensitive match is tried if not found as is)
        :param default: The value returned if the attribute has no value
                        (with all_values, an empty list is returned if default is not set)
        :param all_values: If True, all values of the attribute will be
                           returned instead of the first value only
                           (optional, default: False)
        """
        if attr not in obj:
            for k in obj:
                if k.lower() == attr.lower():
                    attr = k
                    break
        vals = obj.get(attr, [])
        if vals:
            return vals if all_values else vals[0]
        return default if default or not all_values else []

    def get_objects(self, name, filterstr, basedn, attrs, key_attr=None, warn=True):
        """
        Retrieve objects from LDAP

        Objects are cached by type name: if this type name is already in cache,
        the cached objects are used and no search is run.

        :param name: The object type name
        :param filterstr: The LDAP filter to use to search objects on LDAP directory
        :param basedn: The base DN of the search
        :param attrs: The list of attribute names to retrieve
        :param key_attr: The attribute name or 'dn' to use as key in result
                         (optional, if left to None, the DN is used as key)
        :param warn: If True, a warning message will be logged if no object is found
                     in LDAP directory (otherwise, it will be just a debug message)
                     (optional, default: True)

        :return: A dict of LDAP objects (an empty dict if no object is found)
        """
        if name in self._cached_objects:
            log.debug('Retreived %s objects from cache', name)
        else:
            self._ensure_initialized()
            log.debug('Looking for LDAP %s with (filter="%s" / basedn="%s")', name, filterstr, basedn)
            ldap_data = self._conn.search(
                basedn=basedn,
                filterstr=filterstr,
                attrs=attrs
            )

            if not ldap_data:
                if warn:
                    log.warning('No %s found in LDAP', name)
                else:
                    log.debug('No %s found in LDAP', name)
                return {}

            objects = {}
            for obj_dn, obj_attrs in ldap_data.items():
                objects[obj_dn] = self._get_obj(obj_dn, obj_attrs)
            self._cached_objects[name] = objects
        if not key_attr or key_attr == 'dn':
            return self._cached_objects[name]
        return dict(
            (self.get_attr(self._cached_objects[name][dn], key_attr), self._cached_objects[name][dn])
            for dn in self._cached_objects[name]
        )

    def get_object(self, type_name, object_name, filterstr, basedn, attrs, warn=True):
        """
        Retrieve an object from LDAP specified using LDAP search parameters

        Only one object is expected to be returned by the LDAP search, otherwise, one
        LdapClientException will be raised.

        :param type_name: The object type name
        :param object_name: The object name (only use in log messages)
        :param filterstr: The LDAP filter to use to search the object on LDAP directory
        :param basedn: The base DN of the search
        :param attrs: The list of attribute names to retrieve
        :param warn: If True, a warning message will be logged if no object is found
                     in LDAP directory (otherwise, it will be just a debug message)
                     (optional, default: True)

        :return: The LDAP object, or None if not found
        """
        self._ensure_initialized()
        log.debug('Looking for LDAP %s "%s" with (filter="%s" / basedn="%s")', type_name, object_name, filterstr, basedn)
        ldap_data = self._conn.search(
            basedn=basedn, filterstr=filterstr,
            attrs=attrs
        )

        if not ldap_data:
            if warn:
                log.warning('No %s "%s" found in LDAP', type_name, object_name)
            else:
                log.debug('No %s "%s" found in LDAP', type_name, object_name)
            return None

        if len(ldap_data) > 1:
            raise LdapClientException(
                f'More than one {type_name} "{object_name}": {" / ".join(ldap_data.keys())}')

        dn = next(iter(ldap_data))
        return self._get_obj(dn, ldap_data[dn])

    def get_object_by_dn(self, type_name, dn, populate_cache_method=None, warn=True):
        """
        Retrieve an LDAP object specified by its DN from cache

        :param type_name: The object type name
        :param dn: The object DN
        :param populate_cache_method: The method to use if cache of LDAP object type
                                      is not already populated (optional, default,
                                      False is returned)
        :param warn: If True, a warning message will be logged if object is not found
                     in cache (otherwise, it will be just a debug message)
                     (optional, default: True)

        :return: The LDAP object, None if not found, or False if the cache is not
                 populated and no populate_cache_method is provided
        """
        if type_name not in self._cached_objects:
            if not populate_cache_method:
                return False
            populate_cache_method()
        if type_name not in self._cached_objects:
            if warn:
                log.warning('No %s found in LDAP', type_name)
            else:
                log.debug('No %s found in LDAP', type_name)
            return None
        if dn not in self._cached_objects[type_name]:
            if warn:
                log.warning('No %s found with DN "%s"', type_name, dn)
            else:
                log.debug('No %s found with DN "%s"', type_name, dn)
            return None
        return self._cached_objects[type_name][dn]

    @classmethod
    def object_attr_mached(cls, obj, attr, value, case_sensitive=False):
        """
        Determine if object's attribute matched with specified value

        :param obj: The LDAP object (as returned by get_object/get_objects)
        :param attr: The attribute name
        :param value: The value for the match test
        :param case_sensitive: If True, the match test will be case-sensitive
                               (optional, default: False)
        """
        if case_sensitive:
            return value in cls.get_attr(obj, attr, all_values=True)
        return value.lower() in [v.lower() for v in cls.get_attr(obj, attr, all_values=True)]

    def get_object_by_attr(self, type_name, attr, value, populate_cache_method=None, case_sensitive=False, warn=True):
        """
        Retrieve an LDAP object specified by one of its attribute

        :param type_name: The object type name
        :param attr: The attribute name
        :param value: The value for the match test
        :param populate_cache_method: The method to use if cache of LDAP object type
                                      is not already populated (optional, default,
                                      False is returned)
        :param case_sensitive: If True, the match test will be case-sensitive
                               (optional, default: False)
        :param warn: If True, a warning message will be logged if object is not found
                     in cache (otherwise, it will be just a debug message)
                     (optional, default: True)

        :return: The LDAP object, None if not found, or False if the cache is not
                 populated and no populate_cache_method is provided
        :raises LdapClientException: if more than one object matched
        """
        if type_name not in self._cached_objects:
            if not populate_cache_method:
                return False
            populate_cache_method()
        if type_name not in self._cached_objects:
            if warn:
                log.warning('No %s found in LDAP', type_name)
            else:
                log.debug('No %s found in LDAP', type_name)
            return None
        matched = dict(
            (dn, obj)
            for dn, obj in self._cached_objects[type_name].items()
            if self.object_attr_mached(obj, attr, value, case_sensitive=case_sensitive)
        )
        if not matched:
            if warn:
                log.warning('No %s found with %s="%s"', type_name, attr, value)
            else:
                log.debug('No %s found with %s="%s"', type_name, attr, value)
            return None
        if len(matched) > 1:
            raise LdapClientException(
                f'More than one {type_name} with {attr}="{value}" found: '
                f'{" / ".join(matched.keys())}')
        dn = next(iter(matched))
        return matched[dn]

    def get_changes(self, ldap_obj, attrs, protected_attrs=None):
        """
        Retrieve changes on a LDAP object

        :param ldap_obj: The original LDAP object
        :param attrs: The new LDAP object attributes values
        :param protected_attrs: Optional list of protected attributes

        :return: A tuple of two dicts (old and new encoded values of changed
                 attributes), or None if there is no change
        """
        old = {}
        new = {}
        protected_attrs = [a.lower() for a in protected_attrs or []]
        # The dn key of LDAP object is not an attribute: never handle it as a change
        protected_attrs.append('dn')
        # New/updated attributes
        for attr in attrs:
            if protected_attrs and attr.lower() in protected_attrs:
                continue
            if attr in ldap_obj and ldap_obj[attr]:
                # Values order does not matter
                if sorted(ldap_obj[attr]) == sorted(attrs[attr]):
                    continue
                old[attr] = self.encode(ldap_obj[attr])
            new[attr] = self.encode(attrs[attr])

        # Deleted attributes
        for attr in ldap_obj:
            if (not protected_attrs or attr.lower() not in protected_attrs) and ldap_obj[attr] and attr not in attrs:
                old[attr] = self.encode(ldap_obj[attr])
        if old == new:
            return None
        return (old, new)

    def format_changes(self, changes, protected_attrs=None, prefix=None):
        """
        Format changes as string

        :param changes: The changes as returned by get_changes
        :param protected_attrs: Optional list of protected attributes
        :param prefix: Optional prefix string for each line of the returned string
        """
        self._ensure_initialized()
        return self._conn.format_changes(
            changes[0], changes[1],
            ignore_attrs=protected_attrs, prefix=prefix
        )

    def update_need(self, changes, protected_attrs=None):
        """
        Check if update is need

        :param changes: The changes as returned by get_changes
        :param protected_attrs: Optional list of protected attributes
        """
        if changes is None:
            return False
        self._ensure_initialized()
        return self._conn.update_need(
            changes[0], changes[1],
            ignore_attrs=protected_attrs
        )

    def add_object(self, dn, attrs):
        """
        Add an object

        :param dn: The LDAP object DN
        :param attrs: The LDAP object attributes (as dict)

        :return: True on success (or in just-try mode), False on error
        """
        attrs = dict(
            (attr, self.encode(values))
            for attr, values in attrs.items()
            if attr != 'dn'
        )
        try:
            if self._just_try:
                log.debug('Just-try mode : do not really add object in LDAP')
                return True
            self._ensure_initialized()
            return self._conn.add_object(dn, attrs)
        except LdapServerException:
            log.error(
                "An error occurred adding object %s in LDAP:\n%s\n",
                dn, pretty_format_dict(attrs), exc_info=True
            )
        return False

    def update_object(self, ldap_obj, changes, protected_attrs=None, rdn_attr=None):
        """
        Update an object

        If the RDN attribute is changed, the object is renamed first and its
        'dn' key is updated when other changes remain to apply.

        :param ldap_obj: The original LDAP object
        :param changes: The changes to make on LDAP object (as formated by get_changes() method)
        :param protected_attrs: An optional list of protected attributes
        :param rdn_attr: The LDAP object RDN attribute (to detect renaming, default: auto-detected)

        :return: True on success (or in just-try mode), False on error
        """
        # Explicit check instead of an assert statement (removed with python -O)
        if not (
            isinstance(changes, (list, tuple)) and len(changes) == 2
            and isinstance(changes[0], dict) and isinstance(changes[1], dict)
        ):
            raise AssertionError(
                f'changes parameter must be a result of get_changes() method ({type(changes)} given)')

        # In case of RDN change, we need to modify passed changes, copy it to make it unchanged in
        # this case
        _changes = copy.deepcopy(changes)
        if not rdn_attr:
            rdn_attr = ldap_obj['dn'].split('=')[0]
            log.debug('Auto-detected RDN attribute from DN: %s => %s', ldap_obj['dn'], rdn_attr)
        old_rdn_values = self.get_attr(_changes[0], rdn_attr, all_values=True)
        new_rdn_values = self.get_attr(_changes[1], rdn_attr, all_values=True)
        if old_rdn_values or new_rdn_values:
            if not new_rdn_values:
                log.error(
                    "%s : Attribute %s can't be deleted because it's used as RDN.",
                    ldap_obj['dn'], rdn_attr
                )
                return False
            log.debug(
                '%s: Changes detected on %s RDN attribute: must rename object before updating it',
                ldap_obj['dn'], rdn_attr
            )
            # Compute new object DN
            dn_parts = explode_dn(self.decode(ldap_obj['dn']))
            basedn = ','.join(dn_parts[1:])
            new_rdn = f'{rdn_attr}={escape_dn_chars(self.decode(new_rdn_values[0]))}'
            new_dn = f'{new_rdn},{basedn}'

            # Rename object
            log.debug('%s: Rename to %s', ldap_obj['dn'], new_dn)
            if not self.move_object(ldap_obj, new_dn):
                return False

            # Remove RDN in changes list
            for attr in list(_changes[0].keys()):
                if attr.lower() == rdn_attr.lower():
                    del _changes[0][attr]
            for attr in list(_changes[1].keys()):
                if attr.lower() == rdn_attr.lower():
                    del _changes[1][attr]

            # Check that there are other changes
            if not _changes[0] and not _changes[1]:
                log.debug('%s: No other change after renaming', new_dn)
                return True

            # Otherwise, update object DN
            ldap_obj['dn'] = new_dn
        else:
            log.debug('%s: No change detected on RDN attibute %s', ldap_obj['dn'], rdn_attr)

        try:
            if self._just_try:
                log.debug('Just-try mode : do not really update object in LDAP')
                return True
            self._ensure_initialized()
            return self._conn.update_object(
                ldap_obj['dn'],
                _changes[0],
                _changes[1],
                ignore_attrs=protected_attrs
            )
        except LdapServerException:
            log.error(
                "An error occurred updating object %s in LDAP:\n%s\n -> \n%s\n\n",
                ldap_obj['dn'], pretty_format_dict(_changes[0]), pretty_format_dict(_changes[1]),
                exc_info=True
            )
        return False

    def move_object(self, ldap_obj, new_dn_or_rdn):
        """
        Move/rename an object

        :param ldap_obj: The original LDAP object
        :param new_dn_or_rdn: The new LDAP object's DN (or RDN)

        :return: True on success (or in just-try mode), False on error
        """
        try:
            if self._just_try:
                log.debug('Just-try mode : do not really move object in LDAP')
                return True
            self._ensure_initialized()
            return self._conn.rename_object(ldap_obj['dn'], new_dn_or_rdn)
        except LdapServerException:
            log.error(
                "An error occurred moving object %s in LDAP (destination: %s)",
                ldap_obj['dn'], new_dn_or_rdn, exc_info=True
            )
        return False

    def drop_object(self, ldap_obj):
        """
        Drop/delete an object

        :param ldap_obj: The original LDAP object to delete/drop

        :return: True on success (or in just-try mode), False on error
        """
        try:
            if self._just_try:
                log.debug('Just-try mode : do not really drop object in LDAP')
                return True
            self._ensure_initialized()
            return self._conn.drop_object(ldap_obj['dn'])
        except LdapServerException:
            log.error(
                "An error occurred removing object %s in LDAP",
                ldap_obj['dn'], exc_info=True
            )
        return False
2019-06-13 19:15:13 +02:00
#
2021-05-19 19:19:57 +02:00
# LDAP date string helpers
2019-06-13 19:15:13 +02:00
#
2021-05-19 19:19:57 +02:00
def parse_datetime(value, to_timezone=None, default_timezone=None, naive=None):
    """
    Build a datetime.datetime object from an LDAP date string

    :param value: The LDAP date string to parse
    :param to_timezone: Timezone the result is converted to: a datetime.tzinfo
                        object, a timezone name resolved with pytz.timezone(), or
                        'local' for dateutil.tz.tzlocal() (optional, default :
                        keep the timezone of the parsed date)
    :param default_timezone: Timezone applied when the LDAP date string carries
                             none; accepts the same kinds of values as to_timezone
                             (optional, default : UTC)
    :param naive: Use naive datetime : return the parsed date without tzinfo and
                  without any timezone conversion

    :return: The parsed datetime.datetime object
    """
    assert to_timezone is None or isinstance(to_timezone, (datetime.tzinfo, str)), (
        f'to_timezone must be None, a datetime.tzinfo object or a string (not {type(to_timezone)})'
    )
    assert default_timezone is None or isinstance(
        default_timezone, (datetime.tzinfo, pytz.tzinfo.DstTzInfo, str)
    ), (
        f'default_timezone parameter must be None, a string, a pytz.tzinfo.DstTzInfo or a datetime.tzinfo object (not {type(default_timezone)})'
    )

    parsed = dateutil.parser.parse(value, dayfirst=False)

    # Naive mode: hand back the wall-clock value, dropping tzinfo if there is one
    if naive:
        return parsed.replace(tzinfo=None) if parsed.tzinfo else parsed

    if not parsed.tzinfo:
        # The LDAP string had no timezone: pick the fallback one
        if not default_timezone:
            fallback_tz = pytz.utc
        elif default_timezone == 'local':
            fallback_tz = dateutil.tz.tzlocal()
        elif isinstance(default_timezone, str):
            fallback_tz = pytz.timezone(default_timezone)
        else:
            fallback_tz = default_timezone

        # pytz DST-aware zones are attached with localize(), other tzinfo
        # objects with replace()
        if isinstance(fallback_tz, pytz.tzinfo.DstTzInfo):
            parsed = fallback_tz.localize(parsed)
        elif isinstance(fallback_tz, datetime.tzinfo):
            parsed = parsed.replace(tzinfo=fallback_tz)
        else:
            raise Exception("It's not supposed to happen!")

    # No target timezone requested: keep the date in its own timezone
    if not to_timezone:
        return parsed

    if to_timezone == 'local':
        target_tz = dateutil.tz.tzlocal()
    elif isinstance(to_timezone, str):
        target_tz = pytz.timezone(to_timezone)
    else:
        target_tz = to_timezone
    return parsed.astimezone(target_tz)
2019-06-13 19:15:13 +02:00
2021-05-19 19:19:57 +02:00
def parse_date(value, to_timezone=None, default_timezone=None, naive=True):
    """
    Build a datetime.date object from an LDAP date string

    The string is parsed with parse_datetime() and only the date part of the
    result is kept.

    :param value: The LDAP date string to parse
    :param to_timezone: If specified, the parsed datetime is converted to this
                        timezone before its date is taken (optional, default :
                        timezone of the LDAP date string)
    :param default_timezone: The timezone used if the LDAP date string does not
                             specify one (optional, default : UTC)
    :param naive: Use naive datetime : do not handle timezone conversion from LDAP
                  (enabled by default)

    :return: The datetime.date object
    """
    parsed = parse_datetime(
        value,
        to_timezone=to_timezone,
        default_timezone=default_timezone,
        naive=naive,
    )
    return parsed.date()
2019-06-13 19:15:13 +02:00
2021-05-19 19:19:57 +02:00
def format_datetime(value, from_timezone=None, to_timezone=None, naive=None):
    """
    Build an LDAP date string from a datetime.datetime object

    :param value: The datetime.datetime object to format
    :param from_timezone: Timezone assumed for value when it has no tzinfo: a
                          datetime.tzinfo object, a timezone name resolved with
                          pytz.timezone(), or 'local' (optional, default :
                          server local timezone)
    :param to_timezone: Timezone used in the LDAP date string; accepts the same
                        kinds of values as from_timezone (optional, default : UTC)
    :param naive: Use naive datetime : the value is stamped as UTC without any
                  conversion

    :return: The date formatted as '%Y%m%d%H%M%S' followed by its UTC offset, a
             zero offset being written as 'Z'
    """
    assert isinstance(value, datetime.datetime), (
        f'First parameter must be an datetime.datetime object (not {type(value)})'
    )
    assert from_timezone is None or isinstance(
        from_timezone, (datetime.tzinfo, pytz.tzinfo.DstTzInfo, str)
    ), (
        f'from_timezone parameter must be None, a string, a pytz.tzinfo.DstTzInfo or a datetime.tzinfo object (not {type(from_timezone)})'
    )
    assert to_timezone is None or isinstance(to_timezone, (datetime.tzinfo, str)), (
        f'to_timezone must be None, a datetime.tzinfo object or a string (not {type(to_timezone)})'
    )

    # Step 1: get a timezone-aware starting point
    if naive:
        # Naive mode: the wall-clock value is simply labelled as UTC
        source_dt = value.replace(tzinfo=pytz.utc)
    elif value.tzinfo:
        # Already aware: work on a copy
        source_dt = copy.deepcopy(value)
    else:
        if not from_timezone or from_timezone == 'local':
            source_tz = dateutil.tz.tzlocal()
        elif isinstance(from_timezone, str):
            source_tz = pytz.timezone(from_timezone)
        else:
            source_tz = from_timezone

        # pytz DST-aware zones are attached with localize(), other tzinfo
        # objects with replace()
        if isinstance(source_tz, pytz.tzinfo.DstTzInfo):
            source_dt = source_tz.localize(value)
        elif isinstance(source_tz, datetime.tzinfo):
            source_dt = value.replace(tzinfo=source_tz)
        else:
            raise Exception("It's not supposed to happen!")

    # Step 2: resolve the timezone of the LDAP string (done even in naive mode,
    # so timezone names are still looked up)
    if not to_timezone:
        target_tz = pytz.utc
    elif to_timezone == 'local':
        target_tz = dateutil.tz.tzlocal()
    elif isinstance(to_timezone, str):
        target_tz = pytz.timezone(to_timezone)
    else:
        target_tz = to_timezone

    # Step 3: convert (unless naive) and format
    target_dt = source_dt if naive else source_dt.astimezone(target_tz)
    ldap_string = target_dt.strftime('%Y%m%d%H%M%S%z')
    zero_offset = '+0000'
    if ldap_string.endswith(zero_offset):
        # A zero offset is written with the 'Z' suffix
        ldap_string = ldap_string[:-len(zero_offset)] + 'Z'
    return ldap_string
2019-06-13 19:15:13 +02:00
2021-05-19 19:19:57 +02:00
def format_date(value, from_timezone=None, to_timezone=None, naive=True):
    """
    Build an LDAP date string from a datetime.date object

    The date is combined with midnight (00:00:00) and formatted with
    format_datetime().

    :param value: The datetime.date object to format
    :param from_timezone: The timezone used if the resulting datetime is naive (no tzinfo)
                          (optional, default : server local timezone)
    :param to_timezone: The timezone used in LDAP (optional, default : UTC)
    :param naive: Use naive datetime : do not handle timezone conversion before
                  formatting and return the datetime as UTC (because LDAP requires
                  a timezone) (enabled by default)

    :return: The LDAP date string
    """
    assert isinstance(value, datetime.date), (
        f'First parameter must be an datetime.date object (not {type(value)})'
    )
    midnight = datetime.datetime.combine(value, datetime.time.min)
    return format_datetime(
        midnight,
        from_timezone=from_timezone,
        to_timezone=to_timezone,
        naive=naive,
    )