Add config lib
This commit is contained in:
parent
439a1e9b33
commit
dc0f17dd20
2 changed files with 635 additions and 0 deletions
629
mylib/config.py
Normal file
629
mylib/config.py
Normal file
|
@ -0,0 +1,629 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
""" Configuration & options parser """
|
||||
|
||||
import argparse
|
||||
from configparser import ConfigParser
|
||||
from getpass import getpass
|
||||
from logging.config import fileConfig
|
||||
import logging
|
||||
import os
|
||||
import stat
|
||||
import sys
|
||||
|
||||
import argcomplete
|
||||
import keyring
|
||||
from systemd.journal import JournalHandler
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Constants
|
||||
DEFAULT_ENCODING = 'utf-8'
|
||||
DEFAULT_CONFIG_DIRPATH = os.path.expanduser('./')
|
||||
|
||||
|
||||
class BaseOption:
|
||||
""" Base configuration option class """
|
||||
|
||||
def __init__(self, config, section, name, default=None, comment=None,
|
||||
arg=None, short_arg=None, arg_help=None):
|
||||
self.config = config
|
||||
self.section = section
|
||||
self.name = name
|
||||
self.default = default
|
||||
self.comment = comment
|
||||
self.arg = arg
|
||||
self.short_arg = short_arg
|
||||
self.arg_help = arg_help if arg_help else comment
|
||||
|
||||
@property
|
||||
def _isset_in_options(self):
|
||||
""" Check if option is defined in registered arguments parser options """
|
||||
return (
|
||||
self.config.options
|
||||
and self._from_options != self.default
|
||||
)
|
||||
|
||||
@property
|
||||
def _from_options(self):
|
||||
""" Get option from arguments parser options """
|
||||
value = (
|
||||
getattr(self.config.options, self.parser_dest)
|
||||
if self.config.options else None
|
||||
)
|
||||
log.debug(
|
||||
'_from_options(%s, %s) = %s',
|
||||
self.section.name, self.name, value
|
||||
)
|
||||
return value
|
||||
|
||||
@property
|
||||
def _isset_in_config_file(self):
|
||||
""" Check if option is defined in the loaded configuration file """
|
||||
return (
|
||||
self.config.config_parser
|
||||
and self.config.config_parser.has_option(self.section.name, self.name)
|
||||
)
|
||||
|
||||
@property
|
||||
def _from_config(self):
|
||||
""" Get option value from ConfigParser """
|
||||
return self.config.config_parser.get(self.section.name, self.name)
|
||||
|
||||
def isset(self):
|
||||
""" Check if option is defined in the loaded configuration file """
|
||||
return self._isset_in_config_file or self._isset_in_options
|
||||
|
||||
def get(self):
|
||||
""" Get option value from options, config or default """
|
||||
if self._isset_in_options:
|
||||
return self._from_options
|
||||
if self._isset_in_config_file:
|
||||
return self._from_config
|
||||
return self.default
|
||||
|
||||
def set(self, value):
|
||||
""" Set option value to config file """
|
||||
assert self.config.config_parser, "Can set option value only if configuration file is configured"
|
||||
if value == '':
|
||||
value = None
|
||||
|
||||
if value == self.default or value is None:
|
||||
# Remove option from config
|
||||
self.config.config_parser.remove_option(
|
||||
self.section.name, self.name)
|
||||
else:
|
||||
# Store option to config
|
||||
if not self.config.config_parser.has_section(self.section.name):
|
||||
self.config.config_parser.add_section(self.section.name)
|
||||
|
||||
self.config.config_parser.set(
|
||||
self.section.name, self.name, self.to_config(value)
|
||||
)
|
||||
|
||||
@property
|
||||
def parser_action(self):
|
||||
""" Get action as accept by argparse.ArgumentParser """
|
||||
return 'store'
|
||||
|
||||
@property
|
||||
def parser_type(self):
|
||||
""" Get type as handle by argparse.ArgumentParser """
|
||||
return str
|
||||
|
||||
@property
|
||||
def parser_dest(self):
|
||||
""" Get option name in arguments parser options """
|
||||
return '{0}_{1}'.format(self.section.name, self.name)
|
||||
|
||||
@property
|
||||
def parser_help(self):
|
||||
""" Get option help message in arguments parser options """
|
||||
if self.arg_help and self.default is not None:
|
||||
return '{0} (Default: {1})'.format(self.arg_help, self.default)
|
||||
if self.arg_help:
|
||||
return self.arg_help
|
||||
return None
|
||||
|
||||
@property
|
||||
def parser_argument_name(self):
|
||||
""" Get option argument name in parser options """
|
||||
return (
|
||||
self.arg if self.arg else
|
||||
'--{0}-{1}'.format(
|
||||
self.section.name, self.name
|
||||
).lower().replace('_', '-')
|
||||
)
|
||||
|
||||
def add_option_to_parser(self, section_opts):
|
||||
""" Add option to arguments parser """
|
||||
args = [self.parser_argument_name]
|
||||
if self.short_arg:
|
||||
args.append(self.short_arg)
|
||||
kwargs = dict(
|
||||
action=self.parser_action,
|
||||
dest=self.parser_dest,
|
||||
help=self.parser_help,
|
||||
default=self.default,
|
||||
)
|
||||
if self.parser_type: # pylint: disable=using-constant-test
|
||||
kwargs['type'] = self.parser_type
|
||||
|
||||
log.debug(
|
||||
'add_option_to_parser(%s, %s): argument name(s)=%s / kwargs=%s',
|
||||
self.section.name, self.name, ', '.join(args), kwargs
|
||||
)
|
||||
section_opts.add_argument(*args, **kwargs)
|
||||
|
||||
def to_config(self, value=None):
|
||||
""" Format value as stored in configuration file """
|
||||
value = value if value is not None else self.get()
|
||||
return '' if value is None else str(value)
|
||||
|
||||
def export_to_config(self):
|
||||
""" Export option to configuration file """
|
||||
lines = []
|
||||
if self.comment:
|
||||
lines.append('# ' + self.comment)
|
||||
value = self.to_config()
|
||||
default_value = (
|
||||
'' if self.default is None else
|
||||
self.to_config(self.default)
|
||||
)
|
||||
log.debug(
|
||||
'export_to_config(%s, %s): value=%s / default=%s',
|
||||
self.section.name, self.name, value, default_value)
|
||||
if default_value:
|
||||
lines.append(
|
||||
'# Default: %s' % default_value
|
||||
)
|
||||
if value and value != default_value:
|
||||
lines.append(
|
||||
'%s = %s' %
|
||||
(self.name, value)
|
||||
)
|
||||
else:
|
||||
lines.append('# %s =' % self.name)
|
||||
lines.append('')
|
||||
return '\n'.join(lines)
|
||||
|
||||
|
||||
class StringOption(BaseOption):
|
||||
""" String configuration option class """
|
||||
|
||||
|
||||
class BooleanOption(BaseOption):
|
||||
""" Boolean configuration option class """
|
||||
|
||||
@property
|
||||
def _from_config(self):
|
||||
""" Get option value from ConfigParser """
|
||||
return self.config.config_parser.getboolean(self.section.name, self.name)
|
||||
|
||||
def to_config(self, value=None):
|
||||
""" Format value as stored in configuration file """
|
||||
return super().to_config(value).lower()
|
||||
|
||||
@property
|
||||
def parser_action(self):
|
||||
return "store_" + str(bool(not self.default)).lower()
|
||||
|
||||
@property
|
||||
def parser_type(self):
|
||||
return None
|
||||
|
||||
|
||||
class FloatOption(BaseOption):
|
||||
""" Float configuration option class """
|
||||
|
||||
@property
|
||||
def _from_config(self):
|
||||
""" Get option value from ConfigParser """
|
||||
return self.config.config_parser.getfloat(self.section.name, self.name)
|
||||
|
||||
@property
|
||||
def parser_type(self):
|
||||
return float
|
||||
|
||||
|
||||
class IntegerOption(BaseOption):
|
||||
""" Integer configuration option class """
|
||||
|
||||
@property
|
||||
def _from_config(self):
|
||||
""" Get option value from ConfigParser """
|
||||
return self.config.config_parser.getint(self.section.name, self.name)
|
||||
|
||||
def to_config(self, value=None):
|
||||
""" Format value as stored in configuration file """
|
||||
value = value if value is not None else self.get()
|
||||
return str(int(value)) if value is not None else ''
|
||||
|
||||
@property
|
||||
def parser_type(self):
|
||||
return int
|
||||
|
||||
|
||||
class PasswordOption(StringOption):
|
||||
""" Password configuration option class """
|
||||
|
||||
def __init__(self, *arg, username_option=None, keyring_value=None, **kwargs):
|
||||
super().__init__(*arg, **kwargs)
|
||||
self.username_option = username_option
|
||||
self.keyring_value = keyring_value if keyring_value is not None else 'keyring'
|
||||
|
||||
@property
|
||||
def _keyring_service_name(self):
|
||||
""" Return keyring service name """
|
||||
return '.'.join([
|
||||
self.config.shortname,
|
||||
self.section.name, self.name
|
||||
])
|
||||
|
||||
@property
|
||||
def _keyring_username(self):
|
||||
""" Return keyring username """
|
||||
return self.section.get(self.username_option) if self.username_option else self.name
|
||||
|
||||
def get(self):
|
||||
""" Get option value """
|
||||
value = super().get()
|
||||
|
||||
if value != self.keyring_value:
|
||||
return value
|
||||
|
||||
service_name = self._keyring_service_name
|
||||
username = self._keyring_username
|
||||
log.debug(
|
||||
'Retreive password %s for username=%s from keyring',
|
||||
service_name, username
|
||||
)
|
||||
value = keyring.get_password(service_name, username)
|
||||
|
||||
if value is None:
|
||||
value = getpass(
|
||||
'Please enter %s%s: ' % (
|
||||
self.comment if self.comment else
|
||||
'%s %s' % (self.section.name, self.name),
|
||||
(' for %s' % username)
|
||||
if username != self.name else ''
|
||||
)
|
||||
)
|
||||
keyring.set_password(service_name, username, value)
|
||||
return value
|
||||
|
||||
def to_config(self, value=None):
|
||||
""" Format value as stored in configuration file """
|
||||
if super().get() == self.keyring_value:
|
||||
return self.keyring_value
|
||||
return super().to_config(value)
|
||||
|
||||
def set(self, value):
|
||||
""" Set option value to config file """
|
||||
if super().get() == self.keyring_value:
|
||||
keyring.set_password(
|
||||
self._keyring_service_name, self._keyring_username,
|
||||
value)
|
||||
value = self.keyring_value
|
||||
super().set(value)
|
||||
|
||||
|
||||
class ConfigSection:
|
||||
""" Configuration section class """
|
||||
|
||||
def __init__(self, config, name, comment=None, order=None):
|
||||
self.config = config
|
||||
self.name = name
|
||||
self.options = dict()
|
||||
self.comment = comment
|
||||
self.order = order if isinstance(order, int) else 10
|
||||
|
||||
def add_option(self, _type, name, **kwargs):
|
||||
"""
|
||||
Add option
|
||||
|
||||
:param _type: Option type, derivated from BaseOption
|
||||
:param name: Option name
|
||||
:param **kwargs: Dict of raw option for type class
|
||||
"""
|
||||
assert not self.defined(name), "Duplicated option %s" % name
|
||||
self.options[name] = _type(self.config, self, name, **kwargs)
|
||||
return self.options[name]
|
||||
|
||||
def defined(self, option):
|
||||
""" Check if option is defined """
|
||||
return option in self.options
|
||||
|
||||
def isset(self, option):
|
||||
""" Check if option is set """
|
||||
return self.defined(option) and self.options[option].isset()
|
||||
|
||||
def get(self, option):
|
||||
""" Get option value """
|
||||
assert self.defined(option), "Option %s unknown" % option
|
||||
return self.options[option].get()
|
||||
|
||||
def set(self, option, value):
|
||||
""" Set option value """
|
||||
assert self.defined(option), "Option %s unknown" % option
|
||||
return self.options[option].set(value)
|
||||
|
||||
def add_options_to_parser(self, parser):
|
||||
""" Add section to argparse.ArgumentParser """
|
||||
assert isinstance(parser, argparse.ArgumentParser)
|
||||
section_opts = parser.add_argument_group(
|
||||
self.comment if self.comment
|
||||
else self.name.capitalize()
|
||||
)
|
||||
|
||||
for option in self.options:
|
||||
self.options[option].add_option_to_parser(section_opts)
|
||||
|
||||
def export_to_config(self):
|
||||
""" Export section and their options to configuration file """
|
||||
lines = []
|
||||
if self.comment:
|
||||
lines.append('# %s' % self.comment)
|
||||
lines.append('[%s]' % self.name)
|
||||
for option in self.options:
|
||||
lines.append(self.options[option].export_to_config())
|
||||
return '\n'.join(lines)
|
||||
|
||||
|
||||
class Config:
|
||||
""" Configuration helper """
|
||||
|
||||
def __init__(self, appname, shortname=None, version=None, encoding=None):
|
||||
self.appname = appname
|
||||
self.shortname = shortname
|
||||
self.version = version if version else '0.0'
|
||||
self.encoding = encoding if encoding else 'utf-8'
|
||||
self.config_parser = None
|
||||
self.options_parser = None
|
||||
self.options = None
|
||||
self.sections = {}
|
||||
self._loaded_callbacks = []
|
||||
self._filepath = None
|
||||
|
||||
def add_section(self, name, loaded_callback=None, **kwargs):
|
||||
"""
|
||||
Add section
|
||||
|
||||
: param name: The section name
|
||||
: param loaded_callback: An optional callback method that will be executed after configuration is loaded
|
||||
The specified callback method will receive Config object as parameter.
|
||||
: param ** kwargs: Raw parameters dict pass to ConfigSection __init__() method
|
||||
"""
|
||||
assert name not in self.sections, "Duplicated section %s" % name
|
||||
|
||||
self.sections[name] = ConfigSection(self, name, **kwargs)
|
||||
if loaded_callback:
|
||||
self._loaded_callbacks.append(loaded_callback)
|
||||
# If configuration is already loaded, execute callback immediatly
|
||||
if self._filepath:
|
||||
loaded_callback(self)
|
||||
return self.sections[name]
|
||||
|
||||
def defined(self, section, option):
|
||||
""" Check option is defined in specified section """
|
||||
return section in self.sections and self.sections[section].defined(option)
|
||||
|
||||
def isset(self, section, option):
|
||||
""" Check option is set in specified section """
|
||||
return section in self.sections and self.sections[section].isset(option)
|
||||
|
||||
def get(self, section, option):
|
||||
""" Get option value """
|
||||
assert self.config_parser or self.options, 'Unconfigured options parser'
|
||||
assert self.defined(
|
||||
section, option), 'Unknown option %s.%s' % (section, option)
|
||||
value = self.sections[section].get(option)
|
||||
log.debug('get(%s, %s): %s (%s)', section, option, value, type(value))
|
||||
return value
|
||||
|
||||
def set(self, section, option, value):
|
||||
""" Set option value """
|
||||
assert self.config_parser, 'Unconfigured options parser'
|
||||
assert self.defined(
|
||||
section, option), 'Unknown option %s.%s' % (section, option)
|
||||
self.sections[section].set(option, value)
|
||||
|
||||
def load_file(self, filepath):
|
||||
""" Read configuration file """
|
||||
|
||||
self.config_parser = ConfigParser()
|
||||
self._filepath = filepath
|
||||
|
||||
# Checking access of configuration file
|
||||
if not os.path.isfile(filepath):
|
||||
return True
|
||||
|
||||
if not os.access(filepath, os.R_OK):
|
||||
return False
|
||||
|
||||
try:
|
||||
self.config_parser.read(filepath, encoding=self.encoding)
|
||||
except Exception: # pylint: disable=broad-except
|
||||
self.config_parser = None
|
||||
log.exception('Failed to read configuration file %s', filepath)
|
||||
return False
|
||||
|
||||
# Logging initialization
|
||||
if self.config_parser.has_section('loggers'):
|
||||
fileConfig(filepath)
|
||||
else:
|
||||
# Otherwise, use systemd journal handler
|
||||
handler = JournalHandler(SYSLOG_IDENTIFIER=self.shortname)
|
||||
handler.setFormatter(
|
||||
logging.Formatter(
|
||||
'%(levelname)s | %(name)s | %(message)s'
|
||||
)
|
||||
)
|
||||
logging.getLogger().addHandler(handler)
|
||||
|
||||
self._filepath = filepath
|
||||
|
||||
# Execute loaded callbacks
|
||||
for callback in self._loaded_callbacks:
|
||||
callback(self)
|
||||
|
||||
return True
|
||||
|
||||
def save(self, filepath=None):
|
||||
""" Save configuration file """
|
||||
filepath = filepath if filepath else self._filepath
|
||||
assert filepath, 'Configuration filepath is not set or provided'
|
||||
|
||||
# Checking access of target directory
|
||||
dirpath = os.path.dirname(filepath)
|
||||
if not os.path.isdir(dirpath) or not os.access(dirpath, os.R_OK | os.W_OK | os.X_OK):
|
||||
log.error(
|
||||
'Configuration directory "%s" does not exist (or not writable)', dirpath)
|
||||
return False
|
||||
|
||||
if os.path.isfile(filepath) and not os.access(filepath, os.W_OK):
|
||||
log.error('Configuration file "%s" is not writable', filepath)
|
||||
return False
|
||||
|
||||
lines = ['#\n# %s configuration\n#\n' % self.appname]
|
||||
|
||||
for section_name in self._ordered_section_names:
|
||||
lines.append('')
|
||||
lines.append(self.sections[section_name].export_to_config())
|
||||
|
||||
try:
|
||||
with open(filepath, 'wb') as fd:
|
||||
fd.write(
|
||||
'\n'.join(lines).encode(self.encoding)
|
||||
)
|
||||
|
||||
# Privacy!
|
||||
os.chmod(filepath, stat.S_IRUSR | stat.S_IWUSR)
|
||||
except Exception: # pylint: disable=broad-except
|
||||
log.exception(
|
||||
'Failed to write generated configuration file %s', filepath)
|
||||
return False
|
||||
return True
|
||||
|
||||
@property
|
||||
def _ordered_section_names(self):
|
||||
""" Get ordered list of section names """
|
||||
return sorted(self.sections.keys(), key=lambda section: self.sections[section].order)
|
||||
|
||||
def get_arguments_parser(self, **kwargs):
|
||||
""" Get arguments parser """
|
||||
if self.options_parser:
|
||||
return self.options_parser
|
||||
|
||||
self.options_parser = argparse.ArgumentParser(
|
||||
description=self.appname, **kwargs)
|
||||
|
||||
self.options_parser.add_argument(
|
||||
'-c',
|
||||
'--config',
|
||||
default=self.config_filepath,
|
||||
help='Configuration file to use (default: %s)' %
|
||||
self.config_filepath
|
||||
)
|
||||
|
||||
self.options_parser.add_argument(
|
||||
'--save',
|
||||
action='store_true',
|
||||
dest='save',
|
||||
help='Save current configuration to file',
|
||||
)
|
||||
|
||||
self.options_parser.add_argument(
|
||||
'-d',
|
||||
'--debug',
|
||||
action='store_true',
|
||||
help='Show debug messages'
|
||||
)
|
||||
|
||||
self.options_parser.add_argument(
|
||||
'-v',
|
||||
'--verbose',
|
||||
action='store_true',
|
||||
help='Show verbose messages'
|
||||
)
|
||||
|
||||
self.add_options_to_parser(self.options_parser)
|
||||
|
||||
return self.options_parser
|
||||
|
||||
def parse_arguments_options(self, argv=None, parser=None, create=True, exit_after_created=True):
|
||||
"""
|
||||
Parse arguments options
|
||||
|
||||
:param argv: Optional arguments list to parse (default: sys.argv[1:])
|
||||
:param parser: Optional argparse.ArgumentParser use
|
||||
(default: generated by self.get_arguments_parser())
|
||||
:param create: If True, configuration file will be created if it does not exits
|
||||
(default: True)
|
||||
:param exit_after_created: If True, script will end after configuration file
|
||||
creation (default: True)
|
||||
"""
|
||||
parser = parser if parser else self.get_arguments_parser()
|
||||
argcomplete.autocomplete(parser)
|
||||
options = parser.parse_args(argv if argv else sys.argv[1:])
|
||||
|
||||
if options.config:
|
||||
options.config = os.path.abspath(options.config)
|
||||
|
||||
already_saved = False
|
||||
if not os.path.isfile(options.config) and (create or options.save):
|
||||
log.warning(
|
||||
"Configuration file is missing, generate it (%s)",
|
||||
options.config
|
||||
)
|
||||
self.save(options.config)
|
||||
if exit_after_created:
|
||||
sys.exit(0)
|
||||
already_saved = True
|
||||
|
||||
# Load configuration file
|
||||
if os.path.isfile(options.config) and not self.load_file(options.config):
|
||||
parser.error(
|
||||
'Failed to load configuration from file %s' % options.config
|
||||
)
|
||||
|
||||
if options.save and not already_saved:
|
||||
self.save()
|
||||
sys.exit(0)
|
||||
|
||||
if options.debug:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
elif options.verbose:
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
|
||||
self.load_options(options)
|
||||
return options
|
||||
|
||||
def load_options(self, options):
|
||||
""" Register arguments parser options """
|
||||
assert isinstance(options, argparse.Namespace)
|
||||
self.options = options
|
||||
log.debug('Argument options: %s', options)
|
||||
|
||||
def add_options_to_parser(self, parser):
|
||||
""" Add sections and their options to parser """
|
||||
for section in self._ordered_section_names:
|
||||
self.sections[section].add_options_to_parser(parser)
|
||||
|
||||
@property
|
||||
def config_dir(self):
|
||||
""" Retrieve configuration directory path """
|
||||
return os.path.dirname(self._filepath) if self._filepath else DEFAULT_CONFIG_DIRPATH
|
||||
|
||||
@property
|
||||
def config_filepath(self):
|
||||
""" Retrieve configuration file path """
|
||||
if self._filepath:
|
||||
return self._filepath
|
||||
return os.path.join(
|
||||
self.config_dir,
|
||||
("%s.ini" % self.shortname) if self.shortname
|
||||
else "config.ini"
|
||||
)
|
6
setup.py
6
setup.py
|
@ -25,6 +25,12 @@ setup(
|
|||
'pytest',
|
||||
'pylint',
|
||||
],
|
||||
'config': [
|
||||
'configparser',
|
||||
'argcomplete',
|
||||
'keyring',
|
||||
'systemd-python',
|
||||
],
|
||||
'ldap': [
|
||||
'python-ldap',
|
||||
'python-dateutil',
|
||||
|
|
Loading…
Reference in a new issue