#!/usr/bin/env python3 import copy import datetime import logging import dateutil.parser import dateutil.tz import ldap from ldap.controls import SimplePagedResultsControl from ldap.controls.simple import RelaxRulesControl import ldap.modlist as modlist import pytz class LdapServer(object): # pylint: disable=useless-object-inheritance uri = None dn = None pwd = None v2 = None con = 0 def __init__(self,uri,dn=None,pwd=None,v2=None,raiseOnError=False, logger=False): self.uri = uri self.dn = dn self.pwd = pwd self.raiseOnError = raiseOnError if v2: self.v2=True if logger: self.logger = logger else: self.logger = logging.getLogger() def _error(self,error,level=logging.WARNING): if self.raiseOnError: raise LdapServerException(error) self.logger.log(level, error) def connect(self): if self.con == 0: try: con = ldap.initialize(self.uri) if self.v2: con.protocol_version = ldap.VERSION2 # pylint: disable=no-member else: con.protocol_version = ldap.VERSION3 # pylint: disable=no-member if self.dn: con.simple_bind_s(self.dn,self.pwd) elif self.uri.startswith('ldapi://'): con.sasl_interactive_bind_s("", ldap.sasl.external()) self.con = con return True except ldap.LDAPError as e: # pylint: disable=no-member self._error('LdapServer - Error connecting and binding to LDAP server : %s' % e,logging.CRITICAL) return False return True @staticmethod def get_scope(scope): if scope == 'base': return ldap.SCOPE_BASE # pylint: disable=no-member if scope == 'one': return ldap.SCOPE_ONELEVEL # pylint: disable=no-member if scope == 'sub': return ldap.SCOPE_SUBTREE # pylint: disable=no-member raise Exception("Unknown LDAP scope '%s'" % scope) def search(self, basedn, filterstr=None, attrs=None, sizelimit=0, scope=None): res_id = self.con.search( basedn, self.get_scope(scope if scope else 'sub'), filterstr if filterstr else '(objectClass=*)', attrs if attrs else [] ) ret = {} c = 0 while True: res_type, res_data = self.con.result(res_id,0) if res_data == [] or (sizelimit and c > sizelimit): break if res_type == ldap.RES_SEARCH_ENTRY: # pylint: disable=no-member ret[res_data[0][0]] = res_data[0][1] c += 1 return ret def get_object(self, dn, filterstr=None, attrs=None): result = self.search(dn, filterstr=filterstr, scope='base', attrs=attrs) return result[dn] if dn in result else None def paged_search(self, basedn, filterstr, attrs, scope='sub', pagesize=500): assert not self.v2, "Paged search is not available on LDAP version 2" # Initialize SimplePagedResultsControl object page_control = SimplePagedResultsControl( True, size=pagesize, cookie='' # Start without cookie ) ret = {} pages_count = 0 self.logger.debug( "LdapServer - Paged search with base DN '%s', filter '%s', scope '%s', pagesize=%d and attrs=%s", basedn, filterstr, scope, pagesize, attrs ) while True: pages_count += 1 self.logger.debug( "LdapServer - Paged search: request page %d with a maximum of %d objects (current total count: %d)", pages_count, pagesize, len(ret) ) try: res_id = self.con.search_ext( basedn, self.get_scope(scope), filterstr, attrs, serverctrls=[page_control] ) except ldap.LDAPError as e: # pylint: disable=no-member self._error('LdapServer - Error running paged search on LDAP server: %s' % e, logging.CRITICAL) return False try: rtype, rdata, rmsgid, rctrls = self.con.result3(res_id) # pylint: disable=unused-variable except ldap.LDAPError as e: # pylint: disable=no-member self._error('LdapServer - Error pulling paged search result from LDAP server: %s' % e, logging.CRITICAL) return False # Detect and catch PagedResultsControl answer from rctrls result_page_control = None if rctrls: for rctrl in rctrls: if rctrl.controlType == SimplePagedResultsControl.controlType: result_page_control = rctrl break # If PagedResultsControl answer not detected, paged serach if not result_page_control: self._error('LdapServer - Server ignores RFC2696 control, paged search can not works', logging.CRITICAL) return False # Store results of this page for obj_dn, obj_attrs in rdata: ret[obj_dn] = obj_attrs # If no cookie returned, we are done if not result_page_control.cookie: break # Otherwise, set cookie for the next search page_control.cookie = result_page_control.cookie self.logger.debug("LdapServer - Paged search end: %d object(s) retreived in %d page(s) of %d object(s)", len(ret), pages_count, pagesize) return ret def add_object(self,dn,attrs): ldif = modlist.addModlist(attrs) try: self.logger.debug("LdapServer - Add %s", dn) self.con.add_s(dn,ldif) return True except ldap.LDAPError as e: # pylint: disable=no-member self._error("LdapServer - Error adding %s : %s" % (dn,e), logging.ERROR) return False def update_object(self, dn, old, new, ignore_attrs=None, relax=False): assert not relax or not self.v2, "Relax modification is not available on LDAP version 2" ldif = modlist.modifyModlist( old, new, ignore_attr_types=ignore_attrs if ignore_attrs else [] ) if ldif == []: return True try: if relax: self.con.modify_ext_s(dn, ldif, serverctrls=[RelaxRulesControl()]) else: self.con.modify_s(dn, ldif) return True except ldap.LDAPError as e: # pylint: disable=no-member self._error("LdapServer - Error updating %s : %s\nOld : %s\nNew : %s" % (dn, e, old, new), logging.ERROR) return False @staticmethod def update_need(old, new, ignore_attrs=None): ldif = modlist.modifyModlist( old, new, ignore_attr_types=ignore_attrs if ignore_attrs else [] ) if ldif == []: return False return True @staticmethod def get_changes(old, new, ignore_attrs=None): return modlist.modifyModlist( old, new, ignore_attr_types=ignore_attrs if ignore_attrs else [] ) @staticmethod def format_changes(old, new, ignore_attrs=None, prefix=None): msg = [] for (op, attr, val) in modlist.modifyModlist(old, new, ignore_attr_types=ignore_attrs if ignore_attrs else []): if op == ldap.MOD_ADD: # pylint: disable=no-member op = 'ADD' elif op == ldap.MOD_DELETE: # pylint: disable=no-member op = 'DELETE' elif op == ldap.MOD_REPLACE: # pylint: disable=no-member op = 'REPLACE' else: op = 'UNKNOWN (=%s)' % op if val is None and op == 'DELETE': msg.append('%s - %s %s' % (prefix if prefix else '', op, attr)) else: msg.append('%s - %s %s: %s' % (prefix, op, attr, val)) return '\n'.join(msg) def rename_object(self, dn, new_rdn, new_sup=None, delete_old=True): # If new_rdn is a complete DN, split new RDN and new superior DN if len(new_rdn.split(',')) > 1: self.logger.debug( "LdapServer - Rename with a full new DN detected (%s): split new RDN and new superior DN", new_rdn ) assert new_sup is None, "You can't provide a complete DN as new_rdn and also provide new_sup parameter" new_dn_parts = new_rdn.split(',') new_sup = ','.join(new_dn_parts[1:]) new_rdn = new_dn_parts[0] try: self.logger.debug( "LdapServer - Rename %s in %s (new superior: %s, delete old: %s)", dn, new_rdn, "same" if new_sup is None else new_sup, delete_old ) self.con.rename_s(dn, new_rdn, newsuperior=new_sup, delold=delete_old) return True except ldap.LDAPError as e: # pylint: disable=no-member self._error( "LdapServer - Error renaming %s in %s (new superior: %s, delete old: %s): %s" % ( dn, new_rdn, "same" if new_sup is None else new_sup, delete_old, e ), logging.ERROR ) return False def drop_object(self, dn): try: self.logger.debug("LdapServer - Delete %s", dn) self.con.delete_s(dn) return True except ldap.LDAPError as e: # pylint: disable=no-member self._error("LdapServer - Error deleting %s : %s" % (dn,e), logging.ERROR) return False @staticmethod def get_dn(obj): return obj[0][0] @staticmethod def get_attr(obj, attr, all=None, default=None): if attr not in obj: for k in obj: if k.lower() == attr.lower(): attr = k break if all is not None: if attr in obj: return obj[attr] return default or [] if attr in obj: return obj[attr][0] return default class LdapServerException(BaseException): def __init__(self,msg): BaseException.__init__(self, msg) # # Helpers # def parse_datetime(value, to_timezone=None, default_timezone=None, naive=None): """ Convert LDAP date string to datetime.datetime object :param value: The LDAP date string to convert :param to_timezone: If specified, the return datetime will be converted to this specific timezone (optional, default : timezone of the LDAP date string) :param default_timezone: The timezone used if LDAP date string does not specified the timezone (optional, default : server local timezone) :param naive: Use naive datetime : return naive datetime object (without timezone conversion from LDAP) """ assert to_timezone is None or isinstance(to_timezone, (datetime.tzinfo, str)), 'to_timezone must be None, a datetime.tzinfo object or a string (not %s)' % type(to_timezone) assert default_timezone is None or isinstance(default_timezone, (datetime.tzinfo, pytz.tzinfo.DstTzInfo, str)), 'default_timezone parameter must be None, a string, a pytz.tzinfo.DstTzInfo or a datetime.tzinfo object (not %s)' % type(default_timezone) date = dateutil.parser.parse(value, dayfirst=False) if not date.tzinfo: if naive: return date if not default_timezone: default_timezone = pytz.utc elif default_timezone == 'local': default_timezone = dateutil.tz.tzlocal() elif isinstance(default_timezone, str): default_timezone = pytz.timezone(default_timezone) if isinstance(default_timezone, pytz.tzinfo.DstTzInfo): date = default_timezone.localize(date) elif isinstance(default_timezone, datetime.tzinfo): date = date.replace(tzinfo=default_timezone) else: raise Exception("It's not supposed to happen!") elif naive: return date.replace(tzinfo=None) if to_timezone: if to_timezone == 'local': to_timezone = dateutil.tz.tzlocal() elif isinstance(to_timezone, str): to_timezone = pytz.timezone(to_timezone) return date.astimezone(to_timezone) return date def parse_date(value, to_timezone=None, default_timezone=None, naive=None): """ Convert LDAP date string to datetime.date object :param value: The LDAP date string to convert :param to_timezone: If specified, the return datetime will be converted to this specific timezone (optional, default : timezone of the LDAP date string) :param default_timezone: The timezone used if LDAP date string does not specified the timezone (optional, default : server local timezone) :param naive: Use naive datetime : do not handle timezone conversion from LDAP """ return parse_datetime(value, to_timezone, default_timezone, naive).date() def format_datetime(value, from_timezone=None, to_timezone=None, naive=None): """ Convert datetime.datetime object to LDAP date string :param value: The datetime.datetime object to convert :param from_timezone: The timezone used if datetime.datetime object is naive (no tzinfo) (optional, default : server local timezone) :param to_timezone: The timezone used in LDAP (optional, default : UTC) :param naive: Use naive datetime : datetime store as UTC in LDAP (without conversion) """ assert isinstance(value, datetime.datetime), 'First parameter must be an datetime.datetime object (not %s)' % type(value) assert from_timezone is None or isinstance(from_timezone, (datetime.tzinfo, pytz.tzinfo.DstTzInfo, str)), 'from_timezone parameter must be None, a string, a pytz.tzinfo.DstTzInfo or a datetime.tzinfo object (not %s)' % type(from_timezone) assert to_timezone is None or isinstance(to_timezone, (datetime.tzinfo, str)), 'to_timezone must be None, a datetime.tzinfo object or a string (not %s)' % type(to_timezone) if not value.tzinfo and not naive: if not from_timezone or from_timezone == 'local': from_timezone = dateutil.tz.tzlocal() elif isinstance(from_timezone, str): from_timezone = pytz.timezone(from_timezone) if isinstance(from_timezone, pytz.tzinfo.DstTzInfo): from_value = from_timezone.localize(value) elif isinstance(from_timezone, datetime.tzinfo): from_value = value.replace(tzinfo=from_timezone) else: raise Exception("It's not supposed to happen!") elif naive: from_value = value.replace(tzinfo=pytz.utc) else: from_value = copy.deepcopy(value) if not to_timezone: to_timezone = pytz.utc elif to_timezone == 'local': to_timezone = dateutil.tz.tzlocal() elif isinstance(to_timezone, str): to_timezone = pytz.timezone(to_timezone) to_value = from_value.astimezone(to_timezone) if not naive else from_value datestring = to_value.strftime('%Y%m%d%H%M%S%z') if datestring.endswith('+0000'): datestring = datestring.replace('+0000', 'Z') return datestring def format_date(value, from_timezone=None, to_timezone=None, naive=None): """ Convert datetime.date object to LDAP date string :param value: The datetime.date object to convert :param from_timezone: The timezone used if datetime.datetime object is naive (no tzinfo) (optional, default : server local timezone) :param to_timezone: The timezone used in LDAP (optional, default : UTC) :param naive: Use naive datetime : do not handle timezone conversion before formating and return datetime as UTC (because LDAP required a timezone) """ assert isinstance(value, datetime.date), 'First parameter must be an datetime.date object (not %s)' % type(value) return format_datetime(datetime.datetime.combine(value, datetime.datetime.min.time()), from_timezone, to_timezone, naive) # # Tests # if __name__ == '__main__': now = datetime.datetime.now().replace(tzinfo=dateutil.tz.tzlocal()) print("Now = %s" % now) datestring_now = format_datetime(now) print("format_datetime : %s" % datestring_now) print("format_datetime (from_timezone=utc) : %s" % format_datetime(now.replace(tzinfo=None), from_timezone=pytz.utc)) print("format_datetime (from_timezone=local) : %s" % format_datetime(now.replace(tzinfo=None), from_timezone=dateutil.tz.tzlocal())) print("format_datetime (from_timezone='local') : %s" % format_datetime(now.replace(tzinfo=None), from_timezone='local')) print("format_datetime (from_timezone=Paris) : %s" % format_datetime(now.replace(tzinfo=None), from_timezone='Europe/Paris')) print("format_datetime (to_timezone=utc) : %s" % format_datetime(now, to_timezone=pytz.utc)) print("format_datetime (to_timezone=local) : %s" % format_datetime(now, to_timezone=dateutil.tz.tzlocal())) print("format_datetime (to_timezone='local') : %s" % format_datetime(now, to_timezone='local')) print("format_datetime (to_timezone=Tokyo) : %s" % format_datetime(now, to_timezone='Asia/Tokyo')) print("format_datetime (naive=True) : %s" % format_datetime(now, naive=True)) print("format_date : %s" % format_date(now)) print("format_date (from_timezone=utc) : %s" % format_date(now.replace(tzinfo=None), from_timezone=pytz.utc)) print("format_date (from_timezone=local) : %s" % format_date(now.replace(tzinfo=None), from_timezone=dateutil.tz.tzlocal())) print("format_date (from_timezone='local') : %s" % format_date(now.replace(tzinfo=None), from_timezone='local')) print("format_date (from_timezone=Paris) : %s" % format_date(now.replace(tzinfo=None), from_timezone='Europe/Paris')) print("format_date (to_timezone=utc) : %s" % format_date(now, to_timezone=pytz.utc)) print("format_date (to_timezone=local) : %s" % format_date(now, to_timezone=dateutil.tz.tzlocal())) print("format_date (to_timezone='local') : %s" % format_date(now, to_timezone='local')) print("format_date (to_timezone=Tokyo) : %s" % format_date(now, to_timezone='Asia/Tokyo')) print("format_date (naive=True) : %s" % format_date(now, naive=True)) print("parse_datetime : %s" % parse_datetime(datestring_now)) print("parse_datetime (default_timezone=utc) : %s" % parse_datetime(datestring_now[0:-1], default_timezone=pytz.utc)) print("parse_datetime (default_timezone=local) : %s" % parse_datetime(datestring_now[0:-1], default_timezone=dateutil.tz.tzlocal())) print("parse_datetime (default_timezone='local') : %s" % parse_datetime(datestring_now[0:-1], default_timezone='local')) print("parse_datetime (default_timezone=Paris) : %s" % parse_datetime(datestring_now[0:-1], default_timezone='Europe/Paris')) print("parse_datetime (to_timezone=utc) : %s" % parse_datetime(datestring_now, to_timezone=pytz.utc)) print("parse_datetime (to_timezone=local) : %s" % parse_datetime(datestring_now, to_timezone=dateutil.tz.tzlocal())) print("parse_datetime (to_timezone='local') : %s" % parse_datetime(datestring_now, to_timezone='local')) print("parse_datetime (to_timezone=Tokyo) : %s" % parse_datetime(datestring_now, to_timezone='Asia/Tokyo')) print("parse_datetime (naive=True) : %s" % parse_datetime(datestring_now, naive=True)) print("parse_date : %s" % parse_date(datestring_now)) print("parse_date (default_timezone=utc) : %s" % parse_date(datestring_now[0:-1], default_timezone=pytz.utc)) print("parse_date (default_timezone=local) : %s" % parse_date(datestring_now[0:-1], default_timezone=dateutil.tz.tzlocal())) print("parse_date (default_timezone='local') : %s" % parse_date(datestring_now[0:-1], default_timezone='local')) print("parse_date (default_timezone=Paris) : %s" % parse_date(datestring_now[0:-1], default_timezone='Europe/Paris')) print("parse_date (to_timezone=utc) : %s" % parse_date(datestring_now, to_timezone=pytz.utc)) print("parse_date (to_timezone=local) : %s" % parse_date(datestring_now, to_timezone=dateutil.tz.tzlocal())) print("parse_date (to_timezone='local') : %s" % parse_date(datestring_now, to_timezone='local')) print("parse_date (to_timezone=Tokyo) : %s" % parse_date(datestring_now, to_timezone='Asia/Tokyo')) print("parse_date (naive=True) : %s" % parse_date(datestring_now, naive=True))