From dc0f17dd2000c78af918018094c183301ba30077 Mon Sep 17 00:00:00 2001 From: Benjamin Renard Date: Wed, 3 Nov 2021 17:38:31 +0100 Subject: [PATCH] Add config lib --- mylib/config.py | 629 ++++++++++++++++++++++++++++++++++++++++++++++++ setup.py | 6 + 2 files changed, 635 insertions(+) create mode 100644 mylib/config.py diff --git a/mylib/config.py b/mylib/config.py new file mode 100644 index 0000000..ab94297 --- /dev/null +++ b/mylib/config.py @@ -0,0 +1,629 @@ +# -*- coding: utf-8 -*- + +""" Configuration & options parser """ + +import argparse +from configparser import ConfigParser +from getpass import getpass +from logging.config import fileConfig +import logging +import os +import stat +import sys + +import argcomplete +import keyring +from systemd.journal import JournalHandler + + +log = logging.getLogger(__name__) + + +# Constants +DEFAULT_ENCODING = 'utf-8' +DEFAULT_CONFIG_DIRPATH = os.path.expanduser('./') + + +class BaseOption: + """ Base configuration option class """ + + def __init__(self, config, section, name, default=None, comment=None, + arg=None, short_arg=None, arg_help=None): + self.config = config + self.section = section + self.name = name + self.default = default + self.comment = comment + self.arg = arg + self.short_arg = short_arg + self.arg_help = arg_help if arg_help else comment + + @property + def _isset_in_options(self): + """ Check if option is defined in registered arguments parser options """ + return ( + self.config.options + and self._from_options != self.default + ) + + @property + def _from_options(self): + """ Get option from arguments parser options """ + value = ( + getattr(self.config.options, self.parser_dest) + if self.config.options else None + ) + log.debug( + '_from_options(%s, %s) = %s', + self.section.name, self.name, value + ) + return value + + @property + def _isset_in_config_file(self): + """ Check if option is defined in the loaded configuration file """ + return ( + self.config.config_parser + and self.config.config_parser.has_option(self.section.name, self.name) + ) + + @property + def _from_config(self): + """ Get option value from ConfigParser """ + return self.config.config_parser.get(self.section.name, self.name) + + def isset(self): + """ Check if option is defined in the loaded configuration file """ + return self._isset_in_config_file or self._isset_in_options + + def get(self): + """ Get option value from options, config or default """ + if self._isset_in_options: + return self._from_options + if self._isset_in_config_file: + return self._from_config + return self.default + + def set(self, value): + """ Set option value to config file """ + assert self.config.config_parser, "Can set option value only if configuration file is configured" + if value == '': + value = None + + if value == self.default or value is None: + # Remove option from config + self.config.config_parser.remove_option( + self.section.name, self.name) + else: + # Store option to config + if not self.config.config_parser.has_section(self.section.name): + self.config.config_parser.add_section(self.section.name) + + self.config.config_parser.set( + self.section.name, self.name, self.to_config(value) + ) + + @property + def parser_action(self): + """ Get action as accept by argparse.ArgumentParser """ + return 'store' + + @property + def parser_type(self): + """ Get type as handle by argparse.ArgumentParser """ + return str + + @property + def parser_dest(self): + """ Get option name in arguments parser options """ + return '{0}_{1}'.format(self.section.name, self.name) + + @property + def parser_help(self): + """ Get option help message in arguments parser options """ + if self.arg_help and self.default is not None: + return '{0} (Default: {1})'.format(self.arg_help, self.default) + if self.arg_help: + return self.arg_help + return None + + @property + def parser_argument_name(self): + """ Get option argument name in parser options """ + return ( + self.arg if self.arg else + '--{0}-{1}'.format( + self.section.name, self.name + ).lower().replace('_', '-') + ) + + def add_option_to_parser(self, section_opts): + """ Add option to arguments parser """ + args = [self.parser_argument_name] + if self.short_arg: + args.append(self.short_arg) + kwargs = dict( + action=self.parser_action, + dest=self.parser_dest, + help=self.parser_help, + default=self.default, + ) + if self.parser_type: # pylint: disable=using-constant-test + kwargs['type'] = self.parser_type + + log.debug( + 'add_option_to_parser(%s, %s): argument name(s)=%s / kwargs=%s', + self.section.name, self.name, ', '.join(args), kwargs + ) + section_opts.add_argument(*args, **kwargs) + + def to_config(self, value=None): + """ Format value as stored in configuration file """ + value = value if value is not None else self.get() + return '' if value is None else str(value) + + def export_to_config(self): + """ Export option to configuration file """ + lines = [] + if self.comment: + lines.append('# ' + self.comment) + value = self.to_config() + default_value = ( + '' if self.default is None else + self.to_config(self.default) + ) + log.debug( + 'export_to_config(%s, %s): value=%s / default=%s', + self.section.name, self.name, value, default_value) + if default_value: + lines.append( + '# Default: %s' % default_value + ) + if value and value != default_value: + lines.append( + '%s = %s' % + (self.name, value) + ) + else: + lines.append('# %s =' % self.name) + lines.append('') + return '\n'.join(lines) + + +class StringOption(BaseOption): + """ String configuration option class """ + + +class BooleanOption(BaseOption): + """ Boolean configuration option class """ + + @property + def _from_config(self): + """ Get option value from ConfigParser """ + return self.config.config_parser.getboolean(self.section.name, self.name) + + def to_config(self, value=None): + """ Format value as stored in configuration file """ + return super().to_config(value).lower() + + @property + def parser_action(self): + return "store_" + str(bool(not self.default)).lower() + + @property + def parser_type(self): + return None + + +class FloatOption(BaseOption): + """ Float configuration option class """ + + @property + def _from_config(self): + """ Get option value from ConfigParser """ + return self.config.config_parser.getfloat(self.section.name, self.name) + + @property + def parser_type(self): + return float + + +class IntegerOption(BaseOption): + """ Integer configuration option class """ + + @property + def _from_config(self): + """ Get option value from ConfigParser """ + return self.config.config_parser.getint(self.section.name, self.name) + + def to_config(self, value=None): + """ Format value as stored in configuration file """ + value = value if value is not None else self.get() + return str(int(value)) if value is not None else '' + + @property + def parser_type(self): + return int + + +class PasswordOption(StringOption): + """ Password configuration option class """ + + def __init__(self, *arg, username_option=None, keyring_value=None, **kwargs): + super().__init__(*arg, **kwargs) + self.username_option = username_option + self.keyring_value = keyring_value if keyring_value is not None else 'keyring' + + @property + def _keyring_service_name(self): + """ Return keyring service name """ + return '.'.join([ + self.config.shortname, + self.section.name, self.name + ]) + + @property + def _keyring_username(self): + """ Return keyring username """ + return self.section.get(self.username_option) if self.username_option else self.name + + def get(self): + """ Get option value """ + value = super().get() + + if value != self.keyring_value: + return value + + service_name = self._keyring_service_name + username = self._keyring_username + log.debug( + 'Retreive password %s for username=%s from keyring', + service_name, username + ) + value = keyring.get_password(service_name, username) + + if value is None: + value = getpass( + 'Please enter %s%s: ' % ( + self.comment if self.comment else + '%s %s' % (self.section.name, self.name), + (' for %s' % username) + if username != self.name else '' + ) + ) + keyring.set_password(service_name, username, value) + return value + + def to_config(self, value=None): + """ Format value as stored in configuration file """ + if super().get() == self.keyring_value: + return self.keyring_value + return super().to_config(value) + + def set(self, value): + """ Set option value to config file """ + if super().get() == self.keyring_value: + keyring.set_password( + self._keyring_service_name, self._keyring_username, + value) + value = self.keyring_value + super().set(value) + + +class ConfigSection: + """ Configuration section class """ + + def __init__(self, config, name, comment=None, order=None): + self.config = config + self.name = name + self.options = dict() + self.comment = comment + self.order = order if isinstance(order, int) else 10 + + def add_option(self, _type, name, **kwargs): + """ + Add option + + :param _type: Option type, derivated from BaseOption + :param name: Option name + :param **kwargs: Dict of raw option for type class + """ + assert not self.defined(name), "Duplicated option %s" % name + self.options[name] = _type(self.config, self, name, **kwargs) + return self.options[name] + + def defined(self, option): + """ Check if option is defined """ + return option in self.options + + def isset(self, option): + """ Check if option is set """ + return self.defined(option) and self.options[option].isset() + + def get(self, option): + """ Get option value """ + assert self.defined(option), "Option %s unknown" % option + return self.options[option].get() + + def set(self, option, value): + """ Set option value """ + assert self.defined(option), "Option %s unknown" % option + return self.options[option].set(value) + + def add_options_to_parser(self, parser): + """ Add section to argparse.ArgumentParser """ + assert isinstance(parser, argparse.ArgumentParser) + section_opts = parser.add_argument_group( + self.comment if self.comment + else self.name.capitalize() + ) + + for option in self.options: + self.options[option].add_option_to_parser(section_opts) + + def export_to_config(self): + """ Export section and their options to configuration file """ + lines = [] + if self.comment: + lines.append('# %s' % self.comment) + lines.append('[%s]' % self.name) + for option in self.options: + lines.append(self.options[option].export_to_config()) + return '\n'.join(lines) + + +class Config: + """ Configuration helper """ + + def __init__(self, appname, shortname=None, version=None, encoding=None): + self.appname = appname + self.shortname = shortname + self.version = version if version else '0.0' + self.encoding = encoding if encoding else 'utf-8' + self.config_parser = None + self.options_parser = None + self.options = None + self.sections = {} + self._loaded_callbacks = [] + self._filepath = None + + def add_section(self, name, loaded_callback=None, **kwargs): + """ + Add section + + : param name: The section name + : param loaded_callback: An optional callback method that will be executed after configuration is loaded + The specified callback method will receive Config object as parameter. + : param ** kwargs: Raw parameters dict pass to ConfigSection __init__() method + """ + assert name not in self.sections, "Duplicated section %s" % name + + self.sections[name] = ConfigSection(self, name, **kwargs) + if loaded_callback: + self._loaded_callbacks.append(loaded_callback) + # If configuration is already loaded, execute callback immediatly + if self._filepath: + loaded_callback(self) + return self.sections[name] + + def defined(self, section, option): + """ Check option is defined in specified section """ + return section in self.sections and self.sections[section].defined(option) + + def isset(self, section, option): + """ Check option is set in specified section """ + return section in self.sections and self.sections[section].isset(option) + + def get(self, section, option): + """ Get option value """ + assert self.config_parser or self.options, 'Unconfigured options parser' + assert self.defined( + section, option), 'Unknown option %s.%s' % (section, option) + value = self.sections[section].get(option) + log.debug('get(%s, %s): %s (%s)', section, option, value, type(value)) + return value + + def set(self, section, option, value): + """ Set option value """ + assert self.config_parser, 'Unconfigured options parser' + assert self.defined( + section, option), 'Unknown option %s.%s' % (section, option) + self.sections[section].set(option, value) + + def load_file(self, filepath): + """ Read configuration file """ + + self.config_parser = ConfigParser() + self._filepath = filepath + + # Checking access of configuration file + if not os.path.isfile(filepath): + return True + + if not os.access(filepath, os.R_OK): + return False + + try: + self.config_parser.read(filepath, encoding=self.encoding) + except Exception: # pylint: disable=broad-except + self.config_parser = None + log.exception('Failed to read configuration file %s', filepath) + return False + + # Logging initialization + if self.config_parser.has_section('loggers'): + fileConfig(filepath) + else: + # Otherwise, use systemd journal handler + handler = JournalHandler(SYSLOG_IDENTIFIER=self.shortname) + handler.setFormatter( + logging.Formatter( + '%(levelname)s | %(name)s | %(message)s' + ) + ) + logging.getLogger().addHandler(handler) + + self._filepath = filepath + + # Execute loaded callbacks + for callback in self._loaded_callbacks: + callback(self) + + return True + + def save(self, filepath=None): + """ Save configuration file """ + filepath = filepath if filepath else self._filepath + assert filepath, 'Configuration filepath is not set or provided' + + # Checking access of target directory + dirpath = os.path.dirname(filepath) + if not os.path.isdir(dirpath) or not os.access(dirpath, os.R_OK | os.W_OK | os.X_OK): + log.error( + 'Configuration directory "%s" does not exist (or not writable)', dirpath) + return False + + if os.path.isfile(filepath) and not os.access(filepath, os.W_OK): + log.error('Configuration file "%s" is not writable', filepath) + return False + + lines = ['#\n# %s configuration\n#\n' % self.appname] + + for section_name in self._ordered_section_names: + lines.append('') + lines.append(self.sections[section_name].export_to_config()) + + try: + with open(filepath, 'wb') as fd: + fd.write( + '\n'.join(lines).encode(self.encoding) + ) + + # Privacy! + os.chmod(filepath, stat.S_IRUSR | stat.S_IWUSR) + except Exception: # pylint: disable=broad-except + log.exception( + 'Failed to write generated configuration file %s', filepath) + return False + return True + + @property + def _ordered_section_names(self): + """ Get ordered list of section names """ + return sorted(self.sections.keys(), key=lambda section: self.sections[section].order) + + def get_arguments_parser(self, **kwargs): + """ Get arguments parser """ + if self.options_parser: + return self.options_parser + + self.options_parser = argparse.ArgumentParser( + description=self.appname, **kwargs) + + self.options_parser.add_argument( + '-c', + '--config', + default=self.config_filepath, + help='Configuration file to use (default: %s)' % + self.config_filepath + ) + + self.options_parser.add_argument( + '--save', + action='store_true', + dest='save', + help='Save current configuration to file', + ) + + self.options_parser.add_argument( + '-d', + '--debug', + action='store_true', + help='Show debug messages' + ) + + self.options_parser.add_argument( + '-v', + '--verbose', + action='store_true', + help='Show verbose messages' + ) + + self.add_options_to_parser(self.options_parser) + + return self.options_parser + + def parse_arguments_options(self, argv=None, parser=None, create=True, exit_after_created=True): + """ + Parse arguments options + + :param argv: Optional arguments list to parse (default: sys.argv[1:]) + :param parser: Optional argparse.ArgumentParser use + (default: generated by self.get_arguments_parser()) + :param create: If True, configuration file will be created if it does not exits + (default: True) + :param exit_after_created: If True, script will end after configuration file + creation (default: True) + """ + parser = parser if parser else self.get_arguments_parser() + argcomplete.autocomplete(parser) + options = parser.parse_args(argv if argv else sys.argv[1:]) + + if options.config: + options.config = os.path.abspath(options.config) + + already_saved = False + if not os.path.isfile(options.config) and (create or options.save): + log.warning( + "Configuration file is missing, generate it (%s)", + options.config + ) + self.save(options.config) + if exit_after_created: + sys.exit(0) + already_saved = True + + # Load configuration file + if os.path.isfile(options.config) and not self.load_file(options.config): + parser.error( + 'Failed to load configuration from file %s' % options.config + ) + + if options.save and not already_saved: + self.save() + sys.exit(0) + + if options.debug: + logging.getLogger().setLevel(logging.DEBUG) + elif options.verbose: + logging.getLogger().setLevel(logging.INFO) + + self.load_options(options) + return options + + def load_options(self, options): + """ Register arguments parser options """ + assert isinstance(options, argparse.Namespace) + self.options = options + log.debug('Argument options: %s', options) + + def add_options_to_parser(self, parser): + """ Add sections and their options to parser """ + for section in self._ordered_section_names: + self.sections[section].add_options_to_parser(parser) + + @property + def config_dir(self): + """ Retrieve configuration directory path """ + return os.path.dirname(self._filepath) if self._filepath else DEFAULT_CONFIG_DIRPATH + + @property + def config_filepath(self): + """ Retrieve configuration file path """ + if self._filepath: + return self._filepath + return os.path.join( + self.config_dir, + ("%s.ini" % self.shortname) if self.shortname + else "config.ini" + ) diff --git a/setup.py b/setup.py index 53425ec..0135e34 100644 --- a/setup.py +++ b/setup.py @@ -25,6 +25,12 @@ setup( 'pytest', 'pylint', ], + 'config': [ + 'configparser', + 'argcomplete', + 'keyring', + 'systemd-python', + ], 'ldap': [ 'python-ldap', 'python-dateutil',