python-mylib/mylib/config.py
Benjamin Renard 8438f95e68 Config: Add config_file_env_variable parameter
The new config_file_env_variable parameter allows specifying the name of an
environment variable that will be used to determine the configuration file
path when it is not explicitly provided using the -c/--config parameter.
2021-11-19 16:16:46 +01:00

697 lines
23 KiB
Python

# -*- coding: utf-8 -*-
""" Configuration & options parser """
import argparse
from configparser import ConfigParser
from getpass import getpass
from logging.config import fileConfig
import logging
import os
import re
import stat
import sys
import textwrap
import argcomplete
import keyring
from systemd.journal import JournalHandler
# Module-level logger, named after this module
log = logging.getLogger(__name__)
# Constants
# Name of the default text encoding
DEFAULT_ENCODING = 'utf-8'
# Directory used by Config.config_dir when no configuration file path is known.
# NOTE(review): './' does not start with '~', so expanduser() should return it
# unchanged (i.e. the current working directory) - confirm this is intended.
DEFAULT_CONFIG_DIRPATH = os.path.expanduser('./')
class BaseOption:
    """
    Base configuration option class

    An option belongs to a section of a configuration object. Its value is
    looked up, in this order, in the registered arguments parser options, in
    the loaded configuration file and finally in its default value.
    """

    def __init__(self, config, section, name, default=None, comment=None,
                 arg=None, short_arg=None, arg_help=None, no_arg=False):
        """
        :param config: The configuration object owning this option (its
                       ``options`` and ``config_parser`` attributes are used)
        :param section: The section object owning this option (its ``name``
                        attribute is used)
        :param name: The option name
        :param default: The option default value
        :param comment: The option comment, written in exported configuration
        :param arg: Explicit long argument name (default: generated from
                    section and option names)
        :param short_arg: Optional additional (short) argument name
        :param arg_help: Argument help message (default: the comment)
        :param no_arg: If True, the option is not exposed as an argument
        """
        self.config = config
        self.section = section
        self.name = name
        self.default = default
        self.comment = comment
        self.no_arg = no_arg
        self.arg = arg
        self.short_arg = short_arg
        self.arg_help = arg_help if arg_help else comment

    @property
    def _isset_in_options(self):
        """ Check if option is defined in registered arguments parser options """
        # An option left at its default value is considered as not set
        return bool(
            self.config.options
            and not self.no_arg
            and self._from_options != self.default
        )

    @property
    def _from_options(self):
        """ Get option from arguments parser options """
        value = (
            getattr(self.config.options, self.parser_dest)
            if self.config.options and not self.no_arg else None
        )
        log.debug(
            '_from_options(%s, %s) = %s',
            self.section.name, self.name, value
        )
        return value

    @property
    def _isset_in_config_file(self):
        """ Check if option is defined in the loaded configuration file """
        return bool(
            self.config.config_parser
            and self.config.config_parser.has_option(self.section.name, self.name)
        )

    @property
    def _from_config(self):
        """ Get option value from ConfigParser """
        return self.config.config_parser.get(self.section.name, self.name)

    def isset(self):
        """
        Check if option is set in the loaded configuration file or in the
        registered arguments parser options
        """
        return self._isset_in_config_file or self._isset_in_options

    def get(self):
        """ Get option value from options, config or default """
        if self._isset_in_options:
            return self._from_options
        if self._isset_in_config_file:
            return self._from_config
        return self.default

    def set(self, value):
        """
        Set option value to config file

        An empty string, None or the default value removes the option from
        the configuration instead of storing it.

        :param value: The new option value
        """
        assert self.config.config_parser, "Can set option value only if configuration file is configured"
        parser = self.config.config_parser
        section_name = self.section.name
        if value == '':
            value = None
        if value == self.default or value is None:
            # Remove option from config. remove_option() raises NoSectionError
            # on an unknown section, so only call it if the option is present.
            if parser.has_option(section_name, self.name):
                parser.remove_option(section_name, self.name)
            return
        # Store option to config
        if not parser.has_section(section_name):
            parser.add_section(section_name)
        parser.set(section_name, self.name, self.to_config(value))

    @property
    def parser_action(self):
        """ Get action as accept by argparse.ArgumentParser """
        return 'store'

    @property
    def parser_type(self):
        """ Get type as handle by argparse.ArgumentParser """
        return str

    @property
    def parser_dest(self):
        """ Get option name in arguments parser options """
        return '{0}_{1}'.format(self.section.name, self.name)

    @property
    def parser_help(self):
        """ Get option help message in arguments parser options """
        if self.arg_help and self.default is not None:
            return '{0} (Default: {1})'.format(self.arg_help, self.default)
        if self.arg_help:
            return self.arg_help
        return None

    @property
    def parser_argument_name(self):
        """ Get option argument name in parser options """
        return (
            self.arg if self.arg else
            '--{0}-{1}'.format(
                self.section.name, self.name
            ).lower().replace('_', '-')
        )

    def add_option_to_parser(self, section_opts):
        """
        Add option to arguments parser

        :param section_opts: The object to register the argument on, through
                             its add_argument() method
        """
        if self.no_arg:
            return
        args = [self.parser_argument_name]
        if self.short_arg:
            args.append(self.short_arg)
        kwargs = dict(
            action=self.parser_action,
            dest=self.parser_dest,
            help=self.parser_help,
            default=self.default,
        )
        # Subclasses may return None to omit the type keyword
        if self.parser_type:  # pylint: disable=using-constant-test
            kwargs['type'] = self.parser_type
        log.debug(
            'add_option_to_parser(%s, %s): argument name(s)=%s / kwargs=%s',
            self.section.name, self.name, ', '.join(args), kwargs
        )
        section_opts.add_argument(*args, **kwargs)

    def to_config(self, value=None):
        """
        Format value as stored in configuration file

        :param value: The value to format (default: the current option value)
        """
        value = value if value is not None else self.get()
        return '' if value is None else str(value)

    def export_to_config(self):
        """
        Export option to configuration file

        :return: The option lines (comment, default and value) as one string
        """
        lines = []
        if self.comment:
            lines.append('# ' + self.comment)
        value = self.to_config()
        default_value = (
            '' if self.default is None else
            self.to_config(self.default)
        )
        log.debug(
            'export_to_config(%s, %s): value=%s / default=%s',
            self.section.name, self.name, value, default_value)
        if default_value:
            lines.append(
                '# Default: %s' % default_value
            )
        if value and value != default_value:
            lines.append(
                '%s = %s' %
                (self.name, value)
            )
        else:
            # Unset or default value: only write a commented placeholder
            lines.append('# %s =' % self.name)
        lines.append('')
        return '\n'.join(lines)
class StringOption(BaseOption):
    """
    String configuration option class

    Adds nothing to BaseOption.
    """
class BooleanOption(BaseOption):
    """
    Boolean configuration option class

    The generated argument is a flag that switches the option to the
    opposite of its default value.
    """

    @property
    def _from_config(self):
        """ Get option value from ConfigParser, as a boolean """
        parser = self.config.config_parser
        return parser.getboolean(self.section.name, self.name)

    def to_config(self, value=None):
        """ Format value as stored in configuration file (lowercased) """
        formatted = super().to_config(value)
        return formatted.lower()

    @property
    def parser_action(self):
        """ Get action as accept by argparse.ArgumentParser """
        # The flag stores the opposite of the default value
        flag_value = not self.default
        return "store_" + str(flag_value).lower()

    @property
    def parser_type(self):
        """ No type: the returned None makes the base class omit it """
        return None

    @property
    def parser_argument_name(self):
        """ Get option argument name in parser options """
        if self.arg:
            return self.arg
        verb = 'disable' if self.default else 'enable'
        generated = '--{0}-{1}-{2}'.format(self.section.name, verb, self.name)
        return generated.lower().replace('_', '-')
class FloatOption(BaseOption):
    """ Float configuration option class """

    @property
    def _from_config(self):
        """ Get option value from ConfigParser, as a float """
        parser = self.config.config_parser
        return parser.getfloat(self.section.name, self.name)

    @property
    def parser_type(self):
        """ Get type as handle by argparse.ArgumentParser """
        return float
class IntegerOption(BaseOption):
    """ Integer configuration option class """

    @property
    def _from_config(self):
        """ Get option value from ConfigParser, as an integer """
        parser = self.config.config_parser
        return parser.getint(self.section.name, self.name)

    def to_config(self, value=None):
        """
        Format value as stored in configuration file

        :param value: The value to format (default: the current option value)
        """
        if value is None:
            value = self.get()
        if value is None:
            return ''
        return str(int(value))

    @property
    def parser_type(self):
        """ Get type as handle by argparse.ArgumentParser """
        return int
class PasswordOption(StringOption):
    """
    Password configuration option class

    When the configured value equals the keyring marker value, the real
    password is read from (and stored in) the keyring instead of the
    configuration file.
    """

    def __init__(self, *arg, username_option=None, keyring_value=None, **kwargs):
        """
        :param username_option: Name of the option of the same section that
                                holds the username used as keyring username
                                (default: this option name is used)
        :param keyring_value: Marker value meaning "password is stored in
                              keyring" (default: 'keyring')
        """
        super().__init__(*arg, **kwargs)
        self.username_option = username_option
        self.keyring_value = keyring_value if keyring_value is not None else 'keyring'

    @property
    def _keyring_service_name(self):
        """ Return keyring service name """
        return '.'.join([
            self.config.shortname,
            self.section.name, self.name
        ])

    @property
    def _keyring_username(self):
        """ Return keyring username """
        return self.section.get(self.username_option) if self.username_option else self.name

    def get(self):
        """
        Get option value

        If the configured value is the keyring marker, the password is read
        from the keyring. When the keyring has no password yet, the user is
        prompted for it and the entered value is stored in the keyring.
        """
        value = super().get()
        if value != self.keyring_value:
            return value
        service_name = self._keyring_service_name
        username = self._keyring_username
        log.debug(
            'Retreive password %s for username=%s from keyring',
            service_name, username
        )
        value = keyring.get_password(service_name, username)
        if value is None:
            value = getpass(
                'Please enter %s%s: ' % (
                    self.comment if self.comment else
                    '%s %s' % (self.section.name, self.name),
                    (' for %s' % username)
                    if username != self.name else ''
                )
            )
            keyring.set_password(service_name, username, value)
        return value

    def to_config(self, value=None):
        """
        Format value as stored in configuration file

        :param value: The value to format (default: the current option value)
        """
        # Only the *current* value is replaced by the keyring marker, so the
        # real password never reaches the configuration file. An explicitly
        # provided value (e.g. the default value formatted on export) must be
        # formatted as is: otherwise the default would be rendered as the
        # marker too and the option would be exported as unset.
        if value is None and super().get() == self.keyring_value:
            return self.keyring_value
        return super().to_config(value)

    def set(self, value):
        """
        Set option value to config file

        If the option is currently stored in keyring, the new password is
        stored in keyring and the marker value is kept in configuration.
        """
        if super().get() == self.keyring_value:
            keyring.set_password(
                self._keyring_service_name, self._keyring_username,
                value)
            value = self.keyring_value
        super().set(value)
class ConfigSection:
    """ A named group of configuration options """

    def __init__(self, config, name, comment=None, order=None):
        """
        :param config: The configuration object, handed to each option
        :param name: The section name
        :param comment: Optional section comment
        :param order: Section sort order (any non-integer value means 10)
        """
        self.config = config
        self.name = name
        self.options = {}
        self.comment = comment
        if isinstance(order, int):
            self.order = order
        else:
            self.order = 10

    def add_option(self, _type, name, **kwargs):
        """
        Add option

        :param _type: Option type, derivated from BaseOption
        :param name: Option name
        :param **kwargs: Dict of raw option for type class
        :return: The newly created option object
        """
        assert not self.defined(name), "Duplicated option %s" % name
        new_option = _type(self.config, self, name, **kwargs)
        self.options[name] = new_option
        return new_option

    def defined(self, option):
        """ Tell whether an option of that name was added to the section """
        return option in self.options

    def isset(self, option):
        """ Tell whether the option is defined and reports itself as set """
        if not self.defined(option):
            return False
        return self.options[option].isset()

    def get(self, option):
        """ Return the value of a defined option """
        assert self.defined(option), "Option %s unknown" % option
        target = self.options[option]
        return target.get()

    def set(self, option, value):
        """ Change the value of a defined option """
        assert self.defined(option), "Option %s unknown" % option
        target = self.options[option]
        return target.set(value)

    def add_options_to_parser(self, parser):
        """ Register the section options in an argparse.ArgumentParser group """
        assert isinstance(parser, argparse.ArgumentParser)
        # The group is titled with the comment, or else the capitalized name
        title = self.comment if self.comment else self.name.capitalize()
        group = parser.add_argument_group(title)
        for current in self.options.values():
            current.add_option_to_parser(group)

    def export_to_config(self):
        """ Render the section header and its options as configuration text """
        output = []
        if self.comment:
            output.append('# %s' % self.comment)
        output.append('[%s]' % self.name)
        output.extend(
            current.export_to_config() for current in self.options.values()
        )
        return '\n'.join(output)
class RawWrappedTextHelpFormatter(argparse.RawDescriptionHelpFormatter):
    """
    Help formatter that wraps help text line by line, keeping the empty
    lines and the indentation of each line
    """

    def _split_lines(self, text, width):
        """
        Split a help text in lines of at most width characters

        :param text: The help text (its common indentation is removed)
        :param width: The maximum width of the returned lines
        :return: The list of wrapped lines
        """
        result = []
        for line in textwrap.dedent(text).splitlines():
            # Keep empty line
            if line == "":
                result.append(line)
                continue
            # Split ident prefix and line text
            line_text = line.lstrip(' ')
            ident = line[:len(line) - len(line_text)]
            available = width - len(ident)
            if available < 1:
                # No room left after the ident prefix: textwrap.wrap() would
                # raise ValueError on such width, so keep the line unwrapped
                result.append(line)
                continue
            # Wrap each lines and add in result with ident prefix
            result.extend(
                ident + wrapped
                for wrapped in textwrap.wrap(line_text, available)
            )
        return result
class Config:
    """
    Configuration helper

    Holds the configuration sections and their options, loads/saves the
    configuration file and builds the command line arguments parser.
    """

    def __init__(self, appname, shortname=None, version=None, encoding=None,
                 config_file_env_variable=None):
        """
        :param appname: The application name
        :param shortname: The application short name (used for the default
                          configuration file name and as syslog identifier)
        :param version: The application version (default: '0.0')
        :param encoding: The configuration file encoding (default: 'utf-8')
        :param config_file_env_variable: Name of the environment variable
                                         that may hold the configuration
                                         file path
        """
        self.appname = appname
        self.shortname = shortname
        self.version = version if version else '0.0'
        self.encoding = encoding if encoding else 'utf-8'
        self.config_parser = None
        self.options_parser = None
        self.options = None
        self.sections = {}
        self._loaded_callbacks = []
        self._loaded_callbacks_executed = []
        self._filepath = None
        self.config_file_env_variable = config_file_env_variable

    def add_section(self, name, loaded_callback=None, **kwargs):
        """
        Add section

        :param name: The section name
        :param loaded_callback: An optional callback method that will be
                                executed after configuration is loaded.
                                The specified callback method will receive
                                Config object as parameter.
        :param **kwargs: Raw parameters dict pass to ConfigSection
                         __init__() method
        :return: The newly created section object
        """
        assert name not in self.sections, "Duplicated section %s" % name
        self.sections[name] = ConfigSection(self, name, **kwargs)
        if loaded_callback:
            self._loaded_callbacks.append(loaded_callback)
            # If configuration is already loaded, execute callback immediatly
            if self.config_parser or self.options:
                self._loaded()
        return self.sections[name]

    def defined(self, section, option):
        """ Check option is defined in specified section """
        return section in self.sections and self.sections[section].defined(option)

    def isset(self, section, option):
        """ Check option is set in specified section """
        return section in self.sections and self.sections[section].isset(option)

    def get(self, section, option):
        """ Get option value """
        assert self.config_parser or self.options, 'Unconfigured options parser'
        assert self.defined(
            section, option), 'Unknown option %s.%s' % (section, option)
        value = self.sections[section].get(option)
        log.debug('get(%s, %s): %s (%s)', section, option, value, type(value))
        return value

    def set(self, section, option, value):
        """ Set option value """
        assert self.config_parser, 'Unconfigured options parser'
        assert self.defined(
            section, option), 'Unknown option %s.%s' % (section, option)
        self.sections[section].set(option, value)

    def load_file(self, filepath, execute_callback=True):
        """
        Read configuration file

        A missing file is not an error: the configuration stays empty.

        :param filepath: The configuration file path
        :param execute_callback: If True, run the loaded callbacks once the
                                 file is read (default: True)
        :return: True on success (or missing file), False otherwise
        """
        self.config_parser = ConfigParser()
        self._filepath = filepath

        # Checking access of configuration file
        if not os.path.isfile(filepath):
            return True
        if not os.access(filepath, os.R_OK):
            log.error('Configuration file "%s" is not readable', filepath)
            return False

        try:
            self.config_parser.read(filepath, encoding=self.encoding)
        except Exception:  # pylint: disable=broad-except
            self.config_parser = None
            log.exception('Failed to read configuration file %s', filepath)
            return False

        # Logging initialization
        if self.config_parser.has_section('loggers'):
            fileConfig(filepath)
        else:
            # Otherwise, use systemd journal handler
            handler = JournalHandler(SYSLOG_IDENTIFIER=self.shortname)
            handler.setFormatter(
                logging.Formatter(
                    '%(levelname)s | %(name)s | %(message)s'
                )
            )
            logging.getLogger().addHandler(handler)

        if execute_callback:
            self._loaded()

        return True

    def _loaded(self):
        """ Execute loaded callbacks (each callback only once) """
        for callback in self._loaded_callbacks:
            if callback in self._loaded_callbacks_executed:
                continue
            callback(self)
            self._loaded_callbacks_executed.append(callback)

    def save(self, filepath=None):
        """
        Save configuration file

        :param filepath: The target file path (default: the path of the
                         loaded configuration file)
        :return: True on success, False otherwise
        """
        filepath = filepath if filepath else self._filepath
        assert filepath, 'Configuration filepath is not set or provided'

        # Checking access of target directory. The path is made absolute
        # first: the dirname of a bare file name is an empty string, which is
        # never an existing directory.
        dirpath = os.path.dirname(os.path.abspath(filepath))
        if not os.path.isdir(dirpath) or not os.access(dirpath, os.R_OK | os.W_OK | os.X_OK):
            log.error(
                'Configuration directory "%s" does not exist (or not writable)', dirpath)
            return False

        if os.path.isfile(filepath) and not os.access(filepath, os.W_OK):
            log.error('Configuration file "%s" is not writable', filepath)
            return False

        lines = ['#\n# %s configuration\n#\n' % self.appname]
        for section_name in self._ordered_section_names:
            lines.append('')
            lines.append(self.sections[section_name].export_to_config())

        try:
            with open(filepath, 'wb') as fd:
                fd.write(
                    '\n'.join(lines).encode(self.encoding)
                )
            # Privacy!
            os.chmod(filepath, stat.S_IRUSR | stat.S_IWUSR)
        except Exception:  # pylint: disable=broad-except
            log.exception(
                'Failed to write generated configuration file %s', filepath)
            return False
        return True

    @property
    def _ordered_section_names(self):
        """ Get ordered list of section names """
        return sorted(self.sections.keys(), key=lambda section: self.sections[section].order)

    def get_arguments_parser(self, **kwargs):
        """
        Get arguments parser

        The parser is built on first call and then reused.

        :param **kwargs: Raw parameters dict pass to argparse.ArgumentParser
                         (description defaults to the application name)
        """
        if self.options_parser:
            return self.options_parser

        # setdefault() lets the caller override these keywords without
        # passing them twice to ArgumentParser
        kwargs.setdefault('description', self.appname)
        kwargs.setdefault('formatter_class', RawWrappedTextHelpFormatter)
        self.options_parser = argparse.ArgumentParser(**kwargs)

        config_file_help = 'Configuration file to use (default: %s)' % self.config_filepath
        if self.config_file_env_variable:
            config_file_help += '\n\nYou also could set %s environment variable to specify your configuration file path.' % self.config_file_env_variable
        self.options_parser.add_argument(
            '-c',
            '--config',
            default=self.config_filepath,
            help=config_file_help
        )

        self.options_parser.add_argument(
            '--save',
            action='store_true',
            dest='save',
            help='Save current configuration to file',
        )

        self.options_parser.add_argument(
            '-d',
            '--debug',
            action='store_true',
            help='Show debug messages'
        )

        self.options_parser.add_argument(
            '-v',
            '--verbose',
            action='store_true',
            help='Show verbose messages'
        )

        self.options_parser.add_argument(
            '-C',
            '--console',
            action='store_true',
            help='Log on console'
        )

        self.add_options_to_parser(self.options_parser)

        return self.options_parser

    def parse_arguments_options(self, argv=None, parser=None, create=True, exit_after_created=True):
        """
        Parse arguments options

        :param argv: Optional arguments list to parse (default: sys.argv[1:])
        :param parser: Optional argparse.ArgumentParser use
                       (default: generated by self.get_arguments_parser())
        :param create: If True, configuration file will be created if it does not exits
                       (default: True)
        :param exit_after_created: If True, script will end after configuration file
                                   creation (default: True)
        :return: The parsed options (argparse.Namespace)
        """
        parser = parser if parser else self.get_arguments_parser()
        argcomplete.autocomplete(parser)
        options = parser.parse_args(argv if argv is not None else sys.argv[1:])
        self.load_options(options, execute_callback=False)

        # Initialized unconditionally: it is read below even when no
        # configuration file path is provided
        already_saved = False
        if options.config:
            options.config = os.path.abspath(options.config)

            if not os.path.isfile(options.config) and (create or options.save):
                log.warning(
                    "Configuration file is missing, generate it (%s)",
                    options.config
                )
                saved = self.save(options.config)
                if exit_after_created:
                    # Report a failed generation in the exit status
                    sys.exit(0 if saved else 1)
                already_saved = True

            # Load configuration file
            if os.path.isfile(options.config) and not self.load_file(options.config, execute_callback=False):
                parser.error(
                    'Failed to load configuration from file %s' % options.config
                )

        if options.save and not already_saved:
            # Report a failed save in the exit status
            sys.exit(0 if self.save() else 1)

        if options.debug:
            logging.getLogger().setLevel(logging.DEBUG)
        elif options.verbose:
            logging.getLogger().setLevel(logging.INFO)

        if options.console:
            console_handler = logging.StreamHandler(sys.stdout)
            logging.getLogger().addHandler(console_handler)

        self._loaded()

        return options

    def load_options(self, options, execute_callback=True):
        """
        Register arguments parser options

        :param options: The parsed options (argparse.Namespace)
        :param execute_callback: If True, run the loaded callbacks
                                 (default: True)
        """
        assert isinstance(options, argparse.Namespace)
        self.options = options
        log.debug('Argument options: %s', options)
        if execute_callback:
            self._loaded()

    def add_options_to_parser(self, parser):
        """ Add sections and their options to parser """
        for section in self._ordered_section_names:
            self.sections[section].add_options_to_parser(parser)

    @property
    def config_dir(self):
        """ Retrieve configuration directory path """
        return os.path.dirname(self._filepath) if self._filepath else DEFAULT_CONFIG_DIRPATH

    @property
    def config_filepath(self):
        """
        Retrieve configuration file path

        The path of the loaded file is used first, then the one provided by
        the configured environment variable (if set and not empty) and
        finally a default file name in the configuration directory.
        """
        if self._filepath:
            return self._filepath
        if self.config_file_env_variable:
            env_filepath = os.environ.get(self.config_file_env_variable)
            if env_filepath:
                return env_filepath
        return os.path.join(
            self.config_dir,
            ("%s.ini" % self.shortname) if self.shortname
            else "config.ini"
        )