check_slapdd_crc32/check_slapdd_crc32
Benjamin Renard d246980abc
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
Add CI for testing and publishing (gitea version & debian package)
2022-05-01 20:51:15 +02:00

181 lines
4.7 KiB
Python
Executable file

#!/usr/bin/python3
"""
OpenLDAP tool to check CRC32 of LDIF files of slapd.d directory
"""
import argparse
import binascii
import logging
import os
import re
import sys
# Script version, shown in the --help description.
version = '0.0'
# Directory scanned when -p/--path is not given.
default_slapdd_path = '/etc/ldap/slapd.d'

# Main

# Command-line interface: logging controls (-d, -v, -l, -C), the optional
# repair mode (-f) and the slapd.d directory to scan (-p).
parser = argparse.ArgumentParser(
    description=f'{__doc__} (version: {version})'
)

parser.add_argument(
    '-d', '--debug',
    action='store_true',
    help='Show debug messages'
)

parser.add_argument(
    '-v', '--verbose',
    action='store_true',
    help='Show verbose messages'
)

parser.add_argument(
    '-l',
    '--log-file',
    action="store",
    type=str,
    dest="logfile",
    help="Log file path"
)

parser.add_argument(
    '-C', '--console',
    action='store_true',
    help='Also log on console (even if log file is provided)'
)

parser.add_argument(
    '-f', '--fix',
    action='store_true',
    help='Fix CRC32 value in LDIF files'
)

parser.add_argument(
    '-p', '--path',
    action='store',
    type=str,
    dest='slapdd_path',
    help=f'Default slapd.d directory path (default: {default_slapdd_path})',
    default=default_slapdd_path
)
# Parse the command line once; fix_crc32() and the main loop read `options`.
options = parser.parse_args()

# Initialize log
# The root logger is configured, so the handlers attached below also receive
# records emitted through the module-level logging.* helpers.
log = logging.getLogger()
logformat = logging.Formatter(
    f'%(asctime)s - {os.path.basename(sys.argv[0])} - %(levelname)s - '
    '%(message)s')

# Verbosity: --debug takes precedence over --verbose; without either, only
# warnings and errors are reported.
log.setLevel(
    logging.DEBUG if options.debug
    else logging.INFO if options.verbose
    else logging.WARNING
)

# Optional log file handler.
if options.logfile:
    logfile = logging.FileHandler(options.logfile)
    logfile.setFormatter(logformat)
    log.addHandler(logfile)

# The console is the fallback when no log file is given, and can be forced in
# addition to the log file with --console.
if options.console or not options.logfile:
    logconsole = logging.StreamHandler()
    logconsole.setFormatter(logformat)
    log.addHandler(logconsole)
def check_file(dir_path, file_name):
    """
    Check CRC32 of an LDIF file

    Lines starting with the "# AUTO-GENERATED FILE" or "# CRC32 " header
    prefixes are left out of the checksum; all other lines are hashed as raw
    bytes. The value found on the "# CRC32 " line (if any) is compared to the
    computed one, and fix_crc32() is called when it is missing or different.

    :param dir_path: The directory path
    :param file_name: The file name
    :return: False if the file can not be read or if the fix could not be
             written, True otherwise
    """
    path = os.path.join(dir_path, file_name)
    auto_generated_prefix = (
        b'# AUTO-GENERATED FILE - DO NOT EDIT!! Use ldapmodify.'
    )
    crc32_prefix = b'# CRC32 '

    lines = []
    current_crc32 = None
    try:
        with open(path, 'rb') as fd:
            for line in fd:
                if line.startswith(auto_generated_prefix):
                    log.debug(
                        '%s: AUTO-GENERATED line detected, pass (%s)',
                        path, line)
                    continue
                if line.startswith(crc32_prefix):
                    log.debug(
                        '%s: CRC32 line detected, retrieve current CRC32 '
                        'value (%s)',
                        path, line)
                    # Extract the value on bytes and decode leniently: a
                    # corrupted header must be reported as an invalid CRC32,
                    # not abort the whole run with a UnicodeDecodeError.
                    current_crc32 = (
                        line[len(crc32_prefix):]
                        .rstrip(b'\n')
                        .decode(errors='replace')
                    )
                    log.debug(
                        '%s: current CRC32 found is "%s"',
                        path, current_crc32)
                    continue
                lines.append(line)
    except OSError as err:
        log.error('%s: fail to read file content (%s)', path, err)
        return False

    # Unsigned 32-bit CRC rendered as 8 lowercase hexadecimal digits.
    crc32 = format(binascii.crc32(b''.join(lines)) & 0xFFFFFFFF, '08x')

    if current_crc32 == crc32:
        log.info('%s: current CRC32 value is correct (%s)', path, crc32)
        return True

    if current_crc32:
        log.warning(
            '%s: invalid CRC32 value found (%s != %s)',
            path, current_crc32, crc32)
    else:
        # Header line missing, or present with an empty value.
        log.warning(
            '%s: no CRC32 value found. Correct CRC32 value is "%s".',
            path, crc32)

    # Propagate the outcome of the fix so a failed write is not reported as
    # a success (fix_crc32() returns True when fixing is not requested).
    return fix_crc32(path, crc32, lines)
def fix_crc32(path, crc32, lines):
    """
    Rewrite an LDIF file with a regenerated header holding the given CRC32

    The file is only rewritten when the --fix option was given; otherwise
    the function does nothing.

    :param path: The file path
    :param crc32: The CRC32 value of the file
    :param lines: Array of file lines without headers
    :return: False if the file could not be written, True otherwise
             (including when fixing is not enabled)
    """
    if options.fix:
        crc32_header = b'# CRC32 ' + crc32.encode() + b'\n'
        try:
            # The file is truncated and rewritten in place: both header
            # lines first, then the original content.
            with open(path, 'wb') as fd:
                fd.write(
                    b'# AUTO-GENERATED FILE - DO NOT EDIT!! Use ldapmodify.\n'
                )
                fd.write(crc32_header)
                fd.writelines(lines)
        except IOError as err:
            logging.error(
                '%s: fail to write new file content (%s)', path, err)
            return False
    return True
def _report_walk_error(err):
    """
    Log a directory that os.walk() could not list

    :param err: The OSError raised while listing the directory
    """
    log.error('%s: fail to list directory content (%s)', err.filename, err)


log.info('Checking CRC32 in slapd directory "%s"', options.slapdd_path)

# By default os.walk() silently skips directories it can not list, including
# a missing or unreadable top directory. Report them so that a run which
# checked nothing is not mistaken for a clean one.
for dirpath, dnames, fnames in os.walk(
        options.slapdd_path, onerror=_report_walk_error):
    log.debug(
        '%s: sub-dirs = "%s", files = "%s"',
        dirpath, '", "'.join(dnames), '", "'.join(fnames))
    # Only LDIF files are checked.
    for fname in fnames:
        if fname.endswith('.ldif'):
            check_file(dirpath, fname)