LdapServer: fix ident with 4 spaces instead of tab
This commit is contained in:
parent
35187aebdf
commit
8a62a8545d
1 changed files with 370 additions and 370 deletions
740
LdapServer.py
740
LdapServer.py
|
@ -12,428 +12,428 @@ import pytz
|
|||
|
||||
class LdapServer(object):
|
||||
|
||||
uri = None
|
||||
dn = None
|
||||
pwd = None
|
||||
v2 = None
|
||||
uri = None
|
||||
dn = None
|
||||
pwd = None
|
||||
v2 = None
|
||||
|
||||
con = 0
|
||||
con = 0
|
||||
|
||||
def __init__(self,uri,dn=None,pwd=None,v2=None,raiseOnError=False, logger=False):
|
||||
self.uri = uri
|
||||
self.dn = dn
|
||||
self.pwd = pwd
|
||||
self.raiseOnError = raiseOnError
|
||||
if v2:
|
||||
self.v2=True
|
||||
if logger:
|
||||
self.logger = logger
|
||||
else:
|
||||
self.logger = logging.getLogger()
|
||||
def __init__(self,uri,dn=None,pwd=None,v2=None,raiseOnError=False, logger=False):
|
||||
self.uri = uri
|
||||
self.dn = dn
|
||||
self.pwd = pwd
|
||||
self.raiseOnError = raiseOnError
|
||||
if v2:
|
||||
self.v2=True
|
||||
if logger:
|
||||
self.logger = logger
|
||||
else:
|
||||
self.logger = logging.getLogger()
|
||||
|
||||
def _error(self,error,level=logging.WARNING):
|
||||
if self.raiseOnError:
|
||||
raise LdapServerException(error)
|
||||
else:
|
||||
self.logger.log(level, error)
|
||||
def _error(self,error,level=logging.WARNING):
|
||||
if self.raiseOnError:
|
||||
raise LdapServerException(error)
|
||||
else:
|
||||
self.logger.log(level, error)
|
||||
|
||||
def connect(self):
|
||||
if self.con == 0:
|
||||
try:
|
||||
con = ldap.initialize(self.uri)
|
||||
if self.v2:
|
||||
con.protocol_version = ldap.VERSION2
|
||||
else:
|
||||
con.protocol_version = ldap.VERSION3
|
||||
def connect(self):
|
||||
if self.con == 0:
|
||||
try:
|
||||
con = ldap.initialize(self.uri)
|
||||
if self.v2:
|
||||
con.protocol_version = ldap.VERSION2
|
||||
else:
|
||||
con.protocol_version = ldap.VERSION3
|
||||
|
||||
if self.dn:
|
||||
con.simple_bind_s(self.dn,self.pwd)
|
||||
elif self.uri.startswith('ldapi://'):
|
||||
con.sasl_interactive_bind_s("", ldap.sasl.external())
|
||||
if self.dn:
|
||||
con.simple_bind_s(self.dn,self.pwd)
|
||||
elif self.uri.startswith('ldapi://'):
|
||||
con.sasl_interactive_bind_s("", ldap.sasl.external())
|
||||
|
||||
self.con = con
|
||||
return True
|
||||
except ldap.LDAPError as e:
|
||||
self._error('LdapServer - Error connecting and binding to LDAP server : %s' % e,logging.CRITICAL)
|
||||
return False
|
||||
return True
|
||||
self.con = con
|
||||
return True
|
||||
except ldap.LDAPError as e:
|
||||
self._error('LdapServer - Error connecting and binding to LDAP server : %s' % e,logging.CRITICAL)
|
||||
return False
|
||||
return True
|
||||
|
||||
def get_scope(self, scope):
|
||||
if scope == 'base':
|
||||
return ldap.SCOPE_BASE
|
||||
elif scope == 'one':
|
||||
return ldap.SCOPE_ONELEVEL
|
||||
elif scope == 'sub':
|
||||
return ldap.SCOPE_SUBTREE
|
||||
raise Exception("Unknown LDAP scope '%s'" % scope)
|
||||
def get_scope(self, scope):
|
||||
if scope == 'base':
|
||||
return ldap.SCOPE_BASE
|
||||
elif scope == 'one':
|
||||
return ldap.SCOPE_ONELEVEL
|
||||
elif scope == 'sub':
|
||||
return ldap.SCOPE_SUBTREE
|
||||
raise Exception("Unknown LDAP scope '%s'" % scope)
|
||||
|
||||
def search(self, basedn, filterstr=None, attrs=None, sizelimit=0, scope=None):
|
||||
res_id = self.con.search(
|
||||
basedn,
|
||||
self.get_scope(scope if scope else 'sub'),
|
||||
filterstr if filterstr else '(objectClass=*)',
|
||||
attrs if attrs else []
|
||||
)
|
||||
ret = {}
|
||||
c = 0
|
||||
while True:
|
||||
res_type, res_data = self.con.result(res_id,0)
|
||||
if res_data == [] or (sizelimit and c > sizelimit):
|
||||
break
|
||||
else:
|
||||
if res_type == ldap.RES_SEARCH_ENTRY:
|
||||
ret[res_data[0][0]] = res_data[0][1]
|
||||
c += 1
|
||||
return ret
|
||||
def search(self, basedn, filterstr=None, attrs=None, sizelimit=0, scope=None):
|
||||
res_id = self.con.search(
|
||||
basedn,
|
||||
self.get_scope(scope if scope else 'sub'),
|
||||
filterstr if filterstr else '(objectClass=*)',
|
||||
attrs if attrs else []
|
||||
)
|
||||
ret = {}
|
||||
c = 0
|
||||
while True:
|
||||
res_type, res_data = self.con.result(res_id,0)
|
||||
if res_data == [] or (sizelimit and c > sizelimit):
|
||||
break
|
||||
else:
|
||||
if res_type == ldap.RES_SEARCH_ENTRY:
|
||||
ret[res_data[0][0]] = res_data[0][1]
|
||||
c += 1
|
||||
return ret
|
||||
|
||||
def get_object(self, dn, filterstr=None, attrs=None):
|
||||
result = self.search(dn, filterstr=filterstr, scope='base', attrs=attrs)
|
||||
return result[dn] if dn in result else None
|
||||
def get_object(self, dn, filterstr=None, attrs=None):
|
||||
result = self.search(dn, filterstr=filterstr, scope='base', attrs=attrs)
|
||||
return result[dn] if dn in result else None
|
||||
|
||||
def paged_search(self, basedn, filterstr, attrs, scope='sub', pagesize=500):
|
||||
# Initialize SimplePagedResultsControl object
|
||||
page_control = SimplePagedResultsControl(
|
||||
True,
|
||||
size=pagesize,
|
||||
cookie='' # Start without cookie
|
||||
)
|
||||
ret = {}
|
||||
pages_count = 0
|
||||
self.logger.debug(
|
||||
"LdapServer - Paged search with base DN '%s', filter '%s', scope '%s', pagesize=%d and attrs=%s",
|
||||
basedn,
|
||||
filterstr,
|
||||
scope,
|
||||
pagesize,
|
||||
attrs
|
||||
)
|
||||
while True:
|
||||
pages_count += 1
|
||||
self.logger.debug(
|
||||
"LdapServer - Paged search: request page %d with a maximum of %d objects (current total count: %d)",
|
||||
pages_count,
|
||||
pagesize,
|
||||
len(ret)
|
||||
)
|
||||
try:
|
||||
res_id = self.con.search_ext(
|
||||
basedn,
|
||||
self.get_scope(scope),
|
||||
filterstr,
|
||||
attrs,
|
||||
serverctrls=[page_control]
|
||||
)
|
||||
except ldap.LDAPError as e:
|
||||
self._error('LdapServer - Error running paged search on LDAP server: %s' % e, logging.CRITICAL)
|
||||
return False
|
||||
try:
|
||||
rtype, rdata, rmsgid, rctrls = self.con.result3(res_id)
|
||||
except ldap.LDAPError as e:
|
||||
self._error('LdapServer - Error pulling paged search result from LDAP server: %s' % e, logging.CRITICAL)
|
||||
return False
|
||||
def paged_search(self, basedn, filterstr, attrs, scope='sub', pagesize=500):
|
||||
# Initialize SimplePagedResultsControl object
|
||||
page_control = SimplePagedResultsControl(
|
||||
True,
|
||||
size=pagesize,
|
||||
cookie='' # Start without cookie
|
||||
)
|
||||
ret = {}
|
||||
pages_count = 0
|
||||
self.logger.debug(
|
||||
"LdapServer - Paged search with base DN '%s', filter '%s', scope '%s', pagesize=%d and attrs=%s",
|
||||
basedn,
|
||||
filterstr,
|
||||
scope,
|
||||
pagesize,
|
||||
attrs
|
||||
)
|
||||
while True:
|
||||
pages_count += 1
|
||||
self.logger.debug(
|
||||
"LdapServer - Paged search: request page %d with a maximum of %d objects (current total count: %d)",
|
||||
pages_count,
|
||||
pagesize,
|
||||
len(ret)
|
||||
)
|
||||
try:
|
||||
res_id = self.con.search_ext(
|
||||
basedn,
|
||||
self.get_scope(scope),
|
||||
filterstr,
|
||||
attrs,
|
||||
serverctrls=[page_control]
|
||||
)
|
||||
except ldap.LDAPError as e:
|
||||
self._error('LdapServer - Error running paged search on LDAP server: %s' % e, logging.CRITICAL)
|
||||
return False
|
||||
try:
|
||||
rtype, rdata, rmsgid, rctrls = self.con.result3(res_id)
|
||||
except ldap.LDAPError as e:
|
||||
self._error('LdapServer - Error pulling paged search result from LDAP server: %s' % e, logging.CRITICAL)
|
||||
return False
|
||||
|
||||
# Detect and catch PagedResultsControl answer from rctrls
|
||||
result_page_control = None
|
||||
if rctrls:
|
||||
for rctrl in rctrls:
|
||||
if rctrl.controlType == SimplePagedResultsControl.controlType:
|
||||
result_page_control = rctrl
|
||||
break
|
||||
# Detect and catch PagedResultsControl answer from rctrls
|
||||
result_page_control = None
|
||||
if rctrls:
|
||||
for rctrl in rctrls:
|
||||
if rctrl.controlType == SimplePagedResultsControl.controlType:
|
||||
result_page_control = rctrl
|
||||
break
|
||||
|
||||
# If PagedResultsControl answer not detected, paged serach
|
||||
if not result_page_control:
|
||||
self._error('LdapServer - Server ignores RFC2696 control, paged search can not works', logging.CRITICAL)
|
||||
return False
|
||||
# If PagedResultsControl answer not detected, paged serach
|
||||
if not result_page_control:
|
||||
self._error('LdapServer - Server ignores RFC2696 control, paged search can not works', logging.CRITICAL)
|
||||
return False
|
||||
|
||||
# Store results of this page
|
||||
for obj_dn, obj_attrs in rdata:
|
||||
ret[obj_dn] = obj_attrs
|
||||
# Store results of this page
|
||||
for obj_dn, obj_attrs in rdata:
|
||||
ret[obj_dn] = obj_attrs
|
||||
|
||||
# If no cookie returned, we are done
|
||||
if not result_page_control.cookie:
|
||||
break
|
||||
# If no cookie returned, we are done
|
||||
if not result_page_control.cookie:
|
||||
break
|
||||
|
||||
# Otherwise, set cookie for the next search
|
||||
page_control.cookie = result_page_control.cookie
|
||||
# Otherwise, set cookie for the next search
|
||||
page_control.cookie = result_page_control.cookie
|
||||
|
||||
self.logger.debug("LdapServer - Paged search end: %d object(s) retreived in %d page(s) of %d object(s)", len(ret), pages_count, pagesize)
|
||||
return ret
|
||||
self.logger.debug("LdapServer - Paged search end: %d object(s) retreived in %d page(s) of %d object(s)", len(ret), pages_count, pagesize)
|
||||
return ret
|
||||
|
||||
def add_object(self,dn,attrs):
|
||||
ldif = modlist.addModlist(attrs)
|
||||
try:
|
||||
self.logger.debug("LdapServer - Add %s" % dn)
|
||||
self.con.add_s(dn,ldif)
|
||||
return True
|
||||
except ldap.LDAPError as e:
|
||||
self._error("LdapServer - Error adding %s : %s" % (dn,e), logging.ERROR)
|
||||
def add_object(self,dn,attrs):
|
||||
ldif = modlist.addModlist(attrs)
|
||||
try:
|
||||
self.logger.debug("LdapServer - Add %s" % dn)
|
||||
self.con.add_s(dn,ldif)
|
||||
return True
|
||||
except ldap.LDAPError as e:
|
||||
self._error("LdapServer - Error adding %s : %s" % (dn,e), logging.ERROR)
|
||||
|
||||
return False
|
||||
return False
|
||||
|
||||
def update_object(self, dn, old, new, ignore_attrs=[]):
|
||||
ldif = modlist.modifyModlist(old, new, ignore_attr_types=ignore_attrs)
|
||||
if ldif == []:
|
||||
return True
|
||||
try:
|
||||
self.con.modify_s(dn,ldif)
|
||||
return True
|
||||
except ldap.LDAPError as e:
|
||||
self._error("LdapServer - Error updating %s : %s\nOld : %s\nNew : %s" % (dn, e, old, new), logging.ERROR)
|
||||
return False
|
||||
def update_object(self, dn, old, new, ignore_attrs=[]):
|
||||
ldif = modlist.modifyModlist(old, new, ignore_attr_types=ignore_attrs)
|
||||
if ldif == []:
|
||||
return True
|
||||
try:
|
||||
self.con.modify_s(dn,ldif)
|
||||
return True
|
||||
except ldap.LDAPError as e:
|
||||
self._error("LdapServer - Error updating %s : %s\nOld : %s\nNew : %s" % (dn, e, old, new), logging.ERROR)
|
||||
return False
|
||||
|
||||
def update_need(self, old, new, ignore_attrs=[]):
|
||||
ldif = modlist.modifyModlist(old, new, ignore_attr_types=ignore_attrs)
|
||||
if ldif == []:
|
||||
return False
|
||||
return True
|
||||
def update_need(self, old, new, ignore_attrs=[]):
|
||||
ldif = modlist.modifyModlist(old, new, ignore_attr_types=ignore_attrs)
|
||||
if ldif == []:
|
||||
return False
|
||||
return True
|
||||
|
||||
def get_changes(self, old, new, ignore_attrs=[]):
|
||||
return modlist.modifyModlist(old, new, ignore_attr_types=ignore_attrs)
|
||||
def get_changes(self, old, new, ignore_attrs=[]):
|
||||
return modlist.modifyModlist(old, new, ignore_attr_types=ignore_attrs)
|
||||
|
||||
def format_changes(self, old, new, ignore_attrs=[], prefix=''):
|
||||
msg = []
|
||||
for (op, attr, val) in modlist.modifyModlist(old, new, ignore_attr_types=ignore_attrs):
|
||||
if op == ldap.MOD_ADD:
|
||||
op = 'ADD'
|
||||
elif op == ldap.MOD_DELETE:
|
||||
op = 'DELETE'
|
||||
elif op == ldap.MOD_REPLACE:
|
||||
op = 'REPLACE'
|
||||
else:
|
||||
op = 'UNKNOWN (=%s)' % op
|
||||
if val is None and op == 'DELETE':
|
||||
msg.append('%s - %s %s' % (prefix, op, attr))
|
||||
else:
|
||||
msg.append('%s - %s %s: %s' % (prefix, op, attr, val))
|
||||
return '\n'.join(msg)
|
||||
def format_changes(self, old, new, ignore_attrs=[], prefix=''):
|
||||
msg = []
|
||||
for (op, attr, val) in modlist.modifyModlist(old, new, ignore_attr_types=ignore_attrs):
|
||||
if op == ldap.MOD_ADD:
|
||||
op = 'ADD'
|
||||
elif op == ldap.MOD_DELETE:
|
||||
op = 'DELETE'
|
||||
elif op == ldap.MOD_REPLACE:
|
||||
op = 'REPLACE'
|
||||
else:
|
||||
op = 'UNKNOWN (=%s)' % op
|
||||
if val is None and op == 'DELETE':
|
||||
msg.append('%s - %s %s' % (prefix, op, attr))
|
||||
else:
|
||||
msg.append('%s - %s %s: %s' % (prefix, op, attr, val))
|
||||
return '\n'.join(msg)
|
||||
|
||||
def rename_object(self, dn, new_rdn, new_sup=None, delete_old=True):
|
||||
# If new_rdn is a complete DN, split new RDN and new superior DN
|
||||
if len(new_rdn.split(',')) > 1:
|
||||
self.logger.debug(
|
||||
"LdapServer - Rename with a full new DN detected (%s): split new RDN and new superior DN",
|
||||
new_rdn
|
||||
)
|
||||
assert new_sup is None, "You can't provide a complete DN as new_rdn and also provide new_sup parameter"
|
||||
new_dn_parts = new_rdn.split(',')
|
||||
new_sup = ','.join(new_dn_parts[1:])
|
||||
new_rdn = new_dn_parts[0]
|
||||
try:
|
||||
self.logger.debug(
|
||||
"LdapServer - Rename %s in %s (new superior: %s, delete old: %s)",
|
||||
dn,
|
||||
new_rdn,
|
||||
"same" if new_sup is None else new_sup,
|
||||
delete_old
|
||||
)
|
||||
self.con.rename_s(dn, new_rdn, newsuperior=new_sup, delold=delete_old)
|
||||
return True
|
||||
except ldap.LDAPError as e:
|
||||
self._error(
|
||||
"LdapServer - Error renaming %s in %s (new superior: %s, delete old: %s): %s" % (
|
||||
dn,
|
||||
new_rdn,
|
||||
"same" if new_sup is None else new_sup,
|
||||
delete_old,
|
||||
e
|
||||
),
|
||||
logging.ERROR
|
||||
)
|
||||
def rename_object(self, dn, new_rdn, new_sup=None, delete_old=True):
|
||||
# If new_rdn is a complete DN, split new RDN and new superior DN
|
||||
if len(new_rdn.split(',')) > 1:
|
||||
self.logger.debug(
|
||||
"LdapServer - Rename with a full new DN detected (%s): split new RDN and new superior DN",
|
||||
new_rdn
|
||||
)
|
||||
assert new_sup is None, "You can't provide a complete DN as new_rdn and also provide new_sup parameter"
|
||||
new_dn_parts = new_rdn.split(',')
|
||||
new_sup = ','.join(new_dn_parts[1:])
|
||||
new_rdn = new_dn_parts[0]
|
||||
try:
|
||||
self.logger.debug(
|
||||
"LdapServer - Rename %s in %s (new superior: %s, delete old: %s)",
|
||||
dn,
|
||||
new_rdn,
|
||||
"same" if new_sup is None else new_sup,
|
||||
delete_old
|
||||
)
|
||||
self.con.rename_s(dn, new_rdn, newsuperior=new_sup, delold=delete_old)
|
||||
return True
|
||||
except ldap.LDAPError as e:
|
||||
self._error(
|
||||
"LdapServer - Error renaming %s in %s (new superior: %s, delete old: %s): %s" % (
|
||||
dn,
|
||||
new_rdn,
|
||||
"same" if new_sup is None else new_sup,
|
||||
delete_old,
|
||||
e
|
||||
),
|
||||
logging.ERROR
|
||||
)
|
||||
|
||||
return False
|
||||
return False
|
||||
|
||||
def drop_object(self,dn):
|
||||
try:
|
||||
self.logger.debug("LdapServer - Delete %s" % dn)
|
||||
self.con.delete_s(dn)
|
||||
return True
|
||||
except ldap.LDAPError as e:
|
||||
self._error("LdapServer - Error deleting %s : %s" % (dn,e), logging.ERROR)
|
||||
def drop_object(self,dn):
|
||||
try:
|
||||
self.logger.debug("LdapServer - Delete %s" % dn)
|
||||
self.con.delete_s(dn)
|
||||
return True
|
||||
except ldap.LDAPError as e:
|
||||
self._error("LdapServer - Error deleting %s : %s" % (dn,e), logging.ERROR)
|
||||
|
||||
return False
|
||||
return False
|
||||
|
||||
def get_dn(self,obj):
|
||||
return obj[0][0]
|
||||
def get_dn(self,obj):
|
||||
return obj[0][0]
|
||||
|
||||
def get_attr(self,obj,attr,all=None,default=None):
|
||||
if attr not in obj:
|
||||
for k in obj:
|
||||
if k.lower() == attr.lower():
|
||||
attr = k
|
||||
break
|
||||
if all is not None:
|
||||
if attr in obj:
|
||||
return obj[attr]
|
||||
else:
|
||||
return default or []
|
||||
else:
|
||||
if attr in obj:
|
||||
return obj[attr][0]
|
||||
else:
|
||||
return default
|
||||
def get_attr(self,obj,attr,all=None,default=None):
|
||||
if attr not in obj:
|
||||
for k in obj:
|
||||
if k.lower() == attr.lower():
|
||||
attr = k
|
||||
break
|
||||
if all is not None:
|
||||
if attr in obj:
|
||||
return obj[attr]
|
||||
else:
|
||||
return default or []
|
||||
else:
|
||||
if attr in obj:
|
||||
return obj[attr][0]
|
||||
else:
|
||||
return default
|
||||
|
||||
class LdapServerException(BaseException):
|
||||
def __init__(self,msg):
|
||||
BaseException.__init__(self, msg)
|
||||
def __init__(self,msg):
|
||||
BaseException.__init__(self, msg)
|
||||
|
||||
#
|
||||
# Helpers
|
||||
#
|
||||
def parse_datetime(value, to_timezone=None, default_timezone=None, naive=None):
|
||||
"""
|
||||
Convert LDAP date string to datetime.datetime object
|
||||
"""
|
||||
Convert LDAP date string to datetime.datetime object
|
||||
|
||||
:param value: The LDAP date string to convert
|
||||
:param to_timezone: If specified, the return datetime will be converted to this
|
||||
specific timezone (optional, default : timezone of the LDAP date string)
|
||||
:param default_timezone: The timezone used if LDAP date string does not specified
|
||||
the timezone (optional, default : server local timezone)
|
||||
:param naive: Use naive datetime : return naive datetime object (without timezone conversion from LDAP)
|
||||
"""
|
||||
assert to_timezone is None or isinstance(to_timezone, datetime.tzinfo) or isinstance(to_timezone, str), 'to_timezone must be None, a datetime.tzinfo object or a string (not %s)' % type(to_timezone)
|
||||
assert default_timezone is None or isinstance(default_timezone, datetime.tzinfo) or isinstance(default_timezone, pytz.tzinfo.DstTzInfo) or isinstance(default_timezone, str), 'default_timezone parameter must be None, a string, a pytz.tzinfo.DstTzInfo or a datetime.tzinfo object (not %s)' % type(default_timezone)
|
||||
date = dateutil.parser.parse(value, dayfirst=False)
|
||||
if not date.tzinfo:
|
||||
if naive:
|
||||
return date
|
||||
if not default_timezone:
|
||||
default_timezone = pytz.utc
|
||||
elif default_timezone == 'local':
|
||||
default_timezone = dateutil.tz.tzlocal()
|
||||
elif isinstance(default_timezone, str):
|
||||
default_timezone = pytz.timezone(default_timezone)
|
||||
if isinstance(default_timezone, pytz.tzinfo.DstTzInfo):
|
||||
date = default_timezone.localize(date)
|
||||
elif isinstance(default_timezone, datetime.tzinfo):
|
||||
date = date.replace(tzinfo=default_timezone)
|
||||
else:
|
||||
raise Exception("It's not supposed to happen!")
|
||||
elif naive:
|
||||
return date.replace(tzinfo=None)
|
||||
if to_timezone:
|
||||
if to_timezone == 'local':
|
||||
to_timezone = dateutil.tz.tzlocal()
|
||||
elif isinstance(to_timezone, str):
|
||||
to_timezone = pytz.timezone(to_timezone)
|
||||
return date.astimezone(to_timezone)
|
||||
return date
|
||||
:param value: The LDAP date string to convert
|
||||
:param to_timezone: If specified, the return datetime will be converted to this
|
||||
specific timezone (optional, default : timezone of the LDAP date string)
|
||||
:param default_timezone: The timezone used if LDAP date string does not specified
|
||||
the timezone (optional, default : server local timezone)
|
||||
:param naive: Use naive datetime : return naive datetime object (without timezone conversion from LDAP)
|
||||
"""
|
||||
assert to_timezone is None or isinstance(to_timezone, datetime.tzinfo) or isinstance(to_timezone, str), 'to_timezone must be None, a datetime.tzinfo object or a string (not %s)' % type(to_timezone)
|
||||
assert default_timezone is None or isinstance(default_timezone, datetime.tzinfo) or isinstance(default_timezone, pytz.tzinfo.DstTzInfo) or isinstance(default_timezone, str), 'default_timezone parameter must be None, a string, a pytz.tzinfo.DstTzInfo or a datetime.tzinfo object (not %s)' % type(default_timezone)
|
||||
date = dateutil.parser.parse(value, dayfirst=False)
|
||||
if not date.tzinfo:
|
||||
if naive:
|
||||
return date
|
||||
if not default_timezone:
|
||||
default_timezone = pytz.utc
|
||||
elif default_timezone == 'local':
|
||||
default_timezone = dateutil.tz.tzlocal()
|
||||
elif isinstance(default_timezone, str):
|
||||
default_timezone = pytz.timezone(default_timezone)
|
||||
if isinstance(default_timezone, pytz.tzinfo.DstTzInfo):
|
||||
date = default_timezone.localize(date)
|
||||
elif isinstance(default_timezone, datetime.tzinfo):
|
||||
date = date.replace(tzinfo=default_timezone)
|
||||
else:
|
||||
raise Exception("It's not supposed to happen!")
|
||||
elif naive:
|
||||
return date.replace(tzinfo=None)
|
||||
if to_timezone:
|
||||
if to_timezone == 'local':
|
||||
to_timezone = dateutil.tz.tzlocal()
|
||||
elif isinstance(to_timezone, str):
|
||||
to_timezone = pytz.timezone(to_timezone)
|
||||
return date.astimezone(to_timezone)
|
||||
return date
|
||||
|
||||
def parse_date(value, to_timezone=None, default_timezone=None, naive=None):
|
||||
"""
|
||||
Convert LDAP date string to datetime.date object
|
||||
"""
|
||||
Convert LDAP date string to datetime.date object
|
||||
|
||||
:param value: The LDAP date string to convert
|
||||
:param to_timezone: If specified, the return datetime will be converted to this
|
||||
specific timezone (optional, default : timezone of the LDAP date string)
|
||||
:param default_timezone: The timezone used if LDAP date string does not specified
|
||||
the timezone (optional, default : server local timezone)
|
||||
:param naive: Use naive datetime : do not handle timezone conversion from LDAP
|
||||
"""
|
||||
return parse_datetime(value, to_timezone, default_timezone, naive).date()
|
||||
:param value: The LDAP date string to convert
|
||||
:param to_timezone: If specified, the return datetime will be converted to this
|
||||
specific timezone (optional, default : timezone of the LDAP date string)
|
||||
:param default_timezone: The timezone used if LDAP date string does not specified
|
||||
the timezone (optional, default : server local timezone)
|
||||
:param naive: Use naive datetime : do not handle timezone conversion from LDAP
|
||||
"""
|
||||
return parse_datetime(value, to_timezone, default_timezone, naive).date()
|
||||
|
||||
def format_datetime(value, from_timezone=None, to_timezone=None, naive=None):
|
||||
"""
|
||||
Convert datetime.datetime object to LDAP date string
|
||||
"""
|
||||
Convert datetime.datetime object to LDAP date string
|
||||
|
||||
:param value: The datetime.datetime object to convert
|
||||
:param from_timezone: The timezone used if datetime.datetime object is naive (no tzinfo)
|
||||
(optional, default : server local timezone)
|
||||
:param to_timezone: The timezone used in LDAP (optional, default : UTC)
|
||||
:param naive: Use naive datetime : datetime store as UTC in LDAP (without conversion)
|
||||
"""
|
||||
assert isinstance(value, datetime.datetime), 'First parameter must be an datetime.datetime object (not %s)' % type(value)
|
||||
assert from_timezone is None or isinstance(from_timezone, datetime.tzinfo) or isinstance(from_timezone, pytz.tzinfo.DstTzInfo) or isinstance(from_timezone, str), 'from_timezone parameter must be None, a string, a pytz.tzinfo.DstTzInfo or a datetime.tzinfo object (not %s)' % type(from_timezone)
|
||||
assert to_timezone is None or isinstance(to_timezone, datetime.tzinfo) or isinstance(to_timezone, str), 'to_timezone must be None, a datetime.tzinfo object or a string (not %s)' % type(to_timezone)
|
||||
if not value.tzinfo and not naive:
|
||||
if not from_timezone or from_timezone == 'local':
|
||||
from_timezone = dateutil.tz.tzlocal()
|
||||
elif isinstance(from_timezone, str):
|
||||
from_timezone = pytz.timezone(from_timezone)
|
||||
if isinstance(from_timezone, pytz.tzinfo.DstTzInfo):
|
||||
from_value = from_timezone.localize(value)
|
||||
elif isinstance(from_timezone, datetime.tzinfo):
|
||||
from_value = value.replace(tzinfo=from_timezone)
|
||||
else:
|
||||
raise Exception("It's not supposed to happen!")
|
||||
elif naive:
|
||||
from_value = value.replace(tzinfo=pytz.utc)
|
||||
else:
|
||||
from_value = copy.deepcopy(value)
|
||||
if not to_timezone:
|
||||
to_timezone = pytz.utc
|
||||
elif to_timezone == 'local':
|
||||
to_timezone = dateutil.tz.tzlocal()
|
||||
elif isinstance(to_timezone, str):
|
||||
to_timezone = pytz.timezone(to_timezone)
|
||||
to_value = from_value.astimezone(to_timezone) if not naive else from_value
|
||||
datestring = to_value.strftime('%Y%m%d%H%M%S%z')
|
||||
if datestring.endswith('+0000'):
|
||||
datestring = datestring.replace('+0000', 'Z')
|
||||
return datestring
|
||||
:param value: The datetime.datetime object to convert
|
||||
:param from_timezone: The timezone used if datetime.datetime object is naive (no tzinfo)
|
||||
(optional, default : server local timezone)
|
||||
:param to_timezone: The timezone used in LDAP (optional, default : UTC)
|
||||
:param naive: Use naive datetime : datetime store as UTC in LDAP (without conversion)
|
||||
"""
|
||||
assert isinstance(value, datetime.datetime), 'First parameter must be an datetime.datetime object (not %s)' % type(value)
|
||||
assert from_timezone is None or isinstance(from_timezone, datetime.tzinfo) or isinstance(from_timezone, pytz.tzinfo.DstTzInfo) or isinstance(from_timezone, str), 'from_timezone parameter must be None, a string, a pytz.tzinfo.DstTzInfo or a datetime.tzinfo object (not %s)' % type(from_timezone)
|
||||
assert to_timezone is None or isinstance(to_timezone, datetime.tzinfo) or isinstance(to_timezone, str), 'to_timezone must be None, a datetime.tzinfo object or a string (not %s)' % type(to_timezone)
|
||||
if not value.tzinfo and not naive:
|
||||
if not from_timezone or from_timezone == 'local':
|
||||
from_timezone = dateutil.tz.tzlocal()
|
||||
elif isinstance(from_timezone, str):
|
||||
from_timezone = pytz.timezone(from_timezone)
|
||||
if isinstance(from_timezone, pytz.tzinfo.DstTzInfo):
|
||||
from_value = from_timezone.localize(value)
|
||||
elif isinstance(from_timezone, datetime.tzinfo):
|
||||
from_value = value.replace(tzinfo=from_timezone)
|
||||
else:
|
||||
raise Exception("It's not supposed to happen!")
|
||||
elif naive:
|
||||
from_value = value.replace(tzinfo=pytz.utc)
|
||||
else:
|
||||
from_value = copy.deepcopy(value)
|
||||
if not to_timezone:
|
||||
to_timezone = pytz.utc
|
||||
elif to_timezone == 'local':
|
||||
to_timezone = dateutil.tz.tzlocal()
|
||||
elif isinstance(to_timezone, str):
|
||||
to_timezone = pytz.timezone(to_timezone)
|
||||
to_value = from_value.astimezone(to_timezone) if not naive else from_value
|
||||
datestring = to_value.strftime('%Y%m%d%H%M%S%z')
|
||||
if datestring.endswith('+0000'):
|
||||
datestring = datestring.replace('+0000', 'Z')
|
||||
return datestring
|
||||
|
||||
def format_date(value, from_timezone=None, to_timezone=None, naive=None):
|
||||
"""
|
||||
Convert datetime.date object to LDAP date string
|
||||
"""
|
||||
Convert datetime.date object to LDAP date string
|
||||
|
||||
:param value: The datetime.date object to convert
|
||||
:param from_timezone: The timezone used if datetime.datetime object is naive (no tzinfo)
|
||||
(optional, default : server local timezone)
|
||||
:param to_timezone: The timezone used in LDAP (optional, default : UTC)
|
||||
:param naive: Use naive datetime : do not handle timezone conversion before formating
|
||||
and return datetime as UTC (because LDAP required a timezone)
|
||||
"""
|
||||
assert isinstance(value, datetime.date), 'First parameter must be an datetime.date object (not %s)' % type(value)
|
||||
return format_datetime(datetime.datetime.combine(value, datetime.datetime.min.time()), from_timezone, to_timezone, naive)
|
||||
:param value: The datetime.date object to convert
|
||||
:param from_timezone: The timezone used if datetime.datetime object is naive (no tzinfo)
|
||||
(optional, default : server local timezone)
|
||||
:param to_timezone: The timezone used in LDAP (optional, default : UTC)
|
||||
:param naive: Use naive datetime : do not handle timezone conversion before formating
|
||||
and return datetime as UTC (because LDAP required a timezone)
|
||||
"""
|
||||
assert isinstance(value, datetime.date), 'First parameter must be an datetime.date object (not %s)' % type(value)
|
||||
return format_datetime(datetime.datetime.combine(value, datetime.datetime.min.time()), from_timezone, to_timezone, naive)
|
||||
|
||||
#
|
||||
# Tests
|
||||
#
|
||||
if __name__ == '__main__':
|
||||
now = datetime.datetime.now().replace(tzinfo=dateutil.tz.tzlocal())
|
||||
print("Now = %s" % now)
|
||||
now = datetime.datetime.now().replace(tzinfo=dateutil.tz.tzlocal())
|
||||
print("Now = %s" % now)
|
||||
|
||||
datestring_now = format_datetime(now)
|
||||
print("format_datetime : %s" % datestring_now)
|
||||
print("format_datetime (from_timezone=utc) : %s" % format_datetime(now.replace(tzinfo=None), from_timezone=pytz.utc))
|
||||
print("format_datetime (from_timezone=local) : %s" % format_datetime(now.replace(tzinfo=None), from_timezone=dateutil.tz.tzlocal()))
|
||||
print("format_datetime (from_timezone='local') : %s" % format_datetime(now.replace(tzinfo=None), from_timezone='local'))
|
||||
print("format_datetime (from_timezone=Paris) : %s" % format_datetime(now.replace(tzinfo=None), from_timezone='Europe/Paris'))
|
||||
print("format_datetime (to_timezone=utc) : %s" % format_datetime(now, to_timezone=pytz.utc))
|
||||
print("format_datetime (to_timezone=local) : %s" % format_datetime(now, to_timezone=dateutil.tz.tzlocal()))
|
||||
print("format_datetime (to_timezone='local') : %s" % format_datetime(now, to_timezone='local'))
|
||||
print("format_datetime (to_timezone=Tokyo) : %s" % format_datetime(now, to_timezone='Asia/Tokyo'))
|
||||
print("format_datetime (naive=True) : %s" % format_datetime(now, naive=True))
|
||||
datestring_now = format_datetime(now)
|
||||
print("format_datetime : %s" % datestring_now)
|
||||
print("format_datetime (from_timezone=utc) : %s" % format_datetime(now.replace(tzinfo=None), from_timezone=pytz.utc))
|
||||
print("format_datetime (from_timezone=local) : %s" % format_datetime(now.replace(tzinfo=None), from_timezone=dateutil.tz.tzlocal()))
|
||||
print("format_datetime (from_timezone='local') : %s" % format_datetime(now.replace(tzinfo=None), from_timezone='local'))
|
||||
print("format_datetime (from_timezone=Paris) : %s" % format_datetime(now.replace(tzinfo=None), from_timezone='Europe/Paris'))
|
||||
print("format_datetime (to_timezone=utc) : %s" % format_datetime(now, to_timezone=pytz.utc))
|
||||
print("format_datetime (to_timezone=local) : %s" % format_datetime(now, to_timezone=dateutil.tz.tzlocal()))
|
||||
print("format_datetime (to_timezone='local') : %s" % format_datetime(now, to_timezone='local'))
|
||||
print("format_datetime (to_timezone=Tokyo) : %s" % format_datetime(now, to_timezone='Asia/Tokyo'))
|
||||
print("format_datetime (naive=True) : %s" % format_datetime(now, naive=True))
|
||||
|
||||
print("format_date : %s" % format_date(now))
|
||||
print("format_date (from_timezone=utc) : %s" % format_date(now.replace(tzinfo=None), from_timezone=pytz.utc))
|
||||
print("format_date (from_timezone=local) : %s" % format_date(now.replace(tzinfo=None), from_timezone=dateutil.tz.tzlocal()))
|
||||
print("format_date (from_timezone='local') : %s" % format_date(now.replace(tzinfo=None), from_timezone='local'))
|
||||
print("format_date (from_timezone=Paris) : %s" % format_date(now.replace(tzinfo=None), from_timezone='Europe/Paris'))
|
||||
print("format_date (to_timezone=utc) : %s" % format_date(now, to_timezone=pytz.utc))
|
||||
print("format_date (to_timezone=local) : %s" % format_date(now, to_timezone=dateutil.tz.tzlocal()))
|
||||
print("format_date (to_timezone='local') : %s" % format_date(now, to_timezone='local'))
|
||||
print("format_date (to_timezone=Tokyo) : %s" % format_date(now, to_timezone='Asia/Tokyo'))
|
||||
print("format_date (naive=True) : %s" % format_date(now, naive=True))
|
||||
print("format_date : %s" % format_date(now))
|
||||
print("format_date (from_timezone=utc) : %s" % format_date(now.replace(tzinfo=None), from_timezone=pytz.utc))
|
||||
print("format_date (from_timezone=local) : %s" % format_date(now.replace(tzinfo=None), from_timezone=dateutil.tz.tzlocal()))
|
||||
print("format_date (from_timezone='local') : %s" % format_date(now.replace(tzinfo=None), from_timezone='local'))
|
||||
print("format_date (from_timezone=Paris) : %s" % format_date(now.replace(tzinfo=None), from_timezone='Europe/Paris'))
|
||||
print("format_date (to_timezone=utc) : %s" % format_date(now, to_timezone=pytz.utc))
|
||||
print("format_date (to_timezone=local) : %s" % format_date(now, to_timezone=dateutil.tz.tzlocal()))
|
||||
print("format_date (to_timezone='local') : %s" % format_date(now, to_timezone='local'))
|
||||
print("format_date (to_timezone=Tokyo) : %s" % format_date(now, to_timezone='Asia/Tokyo'))
|
||||
print("format_date (naive=True) : %s" % format_date(now, naive=True))
|
||||
|
||||
|
||||
print("parse_datetime : %s" % parse_datetime(datestring_now))
|
||||
print("parse_datetime (default_timezone=utc) : %s" % parse_datetime(datestring_now[0:-1], default_timezone=pytz.utc))
|
||||
print("parse_datetime (default_timezone=local) : %s" % parse_datetime(datestring_now[0:-1], default_timezone=dateutil.tz.tzlocal()))
|
||||
print("parse_datetime (default_timezone='local') : %s" % parse_datetime(datestring_now[0:-1], default_timezone='local'))
|
||||
print("parse_datetime (default_timezone=Paris) : %s" % parse_datetime(datestring_now[0:-1], default_timezone='Europe/Paris'))
|
||||
print("parse_datetime (to_timezone=utc) : %s" % parse_datetime(datestring_now, to_timezone=pytz.utc))
|
||||
print("parse_datetime (to_timezone=local) : %s" % parse_datetime(datestring_now, to_timezone=dateutil.tz.tzlocal()))
|
||||
print("parse_datetime (to_timezone='local') : %s" % parse_datetime(datestring_now, to_timezone='local'))
|
||||
print("parse_datetime (to_timezone=Tokyo) : %s" % parse_datetime(datestring_now, to_timezone='Asia/Tokyo'))
|
||||
print("parse_datetime (naive=True) : %s" % parse_datetime(datestring_now, naive=True))
|
||||
print("parse_datetime : %s" % parse_datetime(datestring_now))
|
||||
print("parse_datetime (default_timezone=utc) : %s" % parse_datetime(datestring_now[0:-1], default_timezone=pytz.utc))
|
||||
print("parse_datetime (default_timezone=local) : %s" % parse_datetime(datestring_now[0:-1], default_timezone=dateutil.tz.tzlocal()))
|
||||
print("parse_datetime (default_timezone='local') : %s" % parse_datetime(datestring_now[0:-1], default_timezone='local'))
|
||||
print("parse_datetime (default_timezone=Paris) : %s" % parse_datetime(datestring_now[0:-1], default_timezone='Europe/Paris'))
|
||||
print("parse_datetime (to_timezone=utc) : %s" % parse_datetime(datestring_now, to_timezone=pytz.utc))
|
||||
print("parse_datetime (to_timezone=local) : %s" % parse_datetime(datestring_now, to_timezone=dateutil.tz.tzlocal()))
|
||||
print("parse_datetime (to_timezone='local') : %s" % parse_datetime(datestring_now, to_timezone='local'))
|
||||
print("parse_datetime (to_timezone=Tokyo) : %s" % parse_datetime(datestring_now, to_timezone='Asia/Tokyo'))
|
||||
print("parse_datetime (naive=True) : %s" % parse_datetime(datestring_now, naive=True))
|
||||
|
||||
print("parse_date : %s" % parse_date(datestring_now))
|
||||
print("parse_date (default_timezone=utc) : %s" % parse_date(datestring_now[0:-1], default_timezone=pytz.utc))
|
||||
print("parse_date (default_timezone=local) : %s" % parse_date(datestring_now[0:-1], default_timezone=dateutil.tz.tzlocal()))
|
||||
print("parse_date (default_timezone='local') : %s" % parse_date(datestring_now[0:-1], default_timezone='local'))
|
||||
print("parse_date (default_timezone=Paris) : %s" % parse_date(datestring_now[0:-1], default_timezone='Europe/Paris'))
|
||||
print("parse_date (to_timezone=utc) : %s" % parse_date(datestring_now, to_timezone=pytz.utc))
|
||||
print("parse_date (to_timezone=local) : %s" % parse_date(datestring_now, to_timezone=dateutil.tz.tzlocal()))
|
||||
print("parse_date (to_timezone='local') : %s" % parse_date(datestring_now, to_timezone='local'))
|
||||
print("parse_date (to_timezone=Tokyo) : %s" % parse_date(datestring_now, to_timezone='Asia/Tokyo'))
|
||||
print("parse_date (naive=True) : %s" % parse_date(datestring_now, naive=True))
|
||||
print("parse_date : %s" % parse_date(datestring_now))
|
||||
print("parse_date (default_timezone=utc) : %s" % parse_date(datestring_now[0:-1], default_timezone=pytz.utc))
|
||||
print("parse_date (default_timezone=local) : %s" % parse_date(datestring_now[0:-1], default_timezone=dateutil.tz.tzlocal()))
|
||||
print("parse_date (default_timezone='local') : %s" % parse_date(datestring_now[0:-1], default_timezone='local'))
|
||||
print("parse_date (default_timezone=Paris) : %s" % parse_date(datestring_now[0:-1], default_timezone='Europe/Paris'))
|
||||
print("parse_date (to_timezone=utc) : %s" % parse_date(datestring_now, to_timezone=pytz.utc))
|
||||
print("parse_date (to_timezone=local) : %s" % parse_date(datestring_now, to_timezone=dateutil.tz.tzlocal()))
|
||||
print("parse_date (to_timezone='local') : %s" % parse_date(datestring_now, to_timezone='local'))
|
||||
print("parse_date (to_timezone=Tokyo) : %s" % parse_date(datestring_now, to_timezone='Asia/Tokyo'))
|
||||
print("parse_date (naive=True) : %s" % parse_date(datestring_now, naive=True))
|
||||
|
|
Loading…
Reference in a new issue