bca3f4f347
The argparse option now reverse the default value set in configuration file instead of only the default value set on adding the configuration option.
1071 lines
38 KiB
Python
1071 lines
38 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
""" Configuration & options parser """
|
|
|
|
import argparse
|
|
from configparser import ConfigParser
|
|
from getpass import getpass
|
|
from logging.config import fileConfig
|
|
import logging
|
|
import os
|
|
import re
|
|
import stat
|
|
import sys
|
|
import textwrap
|
|
import traceback
|
|
|
|
import argcomplete
|
|
import keyring
|
|
from systemd.journal import JournalHandler
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
# Constants
|
|
DEFAULT_ENCODING = 'utf-8'
|
|
DEFAULT_CONFIG_DIRPATH = os.path.expanduser('./')
|
|
DEFAULT_CONSOLE_LOG_FORMAT = '%(asctime)s - %(module)s:%(lineno)d - %(levelname)s - %(message)s'
|
|
|
|
|
|
class BaseOption: # pylint: disable=too-many-instance-attributes
|
|
""" Base configuration option class """
|
|
|
|
def __init__(self, config, section, name, default=None, comment=None,
|
|
arg=None, short_arg=None, arg_help=None, no_arg=False):
|
|
self.config = config
|
|
self.section = section
|
|
self.name = name
|
|
self.default = default
|
|
self.comment = comment
|
|
self.no_arg = no_arg
|
|
self.arg = arg
|
|
self.short_arg = short_arg
|
|
self.arg_help = arg_help if arg_help else comment
|
|
|
|
@property
|
|
def _isset_in_options(self):
|
|
""" Check if option is defined in registered arguments parser options """
|
|
return (
|
|
self.config.options
|
|
and not self.no_arg
|
|
and self._from_options != self.default
|
|
)
|
|
|
|
@property
|
|
def _from_options(self):
|
|
""" Get option from arguments parser options """
|
|
value = (
|
|
getattr(self.config.options, self.parser_dest)
|
|
if self.config.options and not self.no_arg else None
|
|
)
|
|
log.debug(
|
|
'_from_options(%s, %s) = %s',
|
|
self.section.name, self.name, value
|
|
)
|
|
return value
|
|
|
|
@property
|
|
def _isset_in_config_file(self):
|
|
""" Check if option is defined in the loaded configuration file """
|
|
return (
|
|
self.config.config_parser
|
|
and self.config.config_parser.has_option(self.section.name, self.name)
|
|
)
|
|
|
|
@property
|
|
def _from_config(self):
|
|
""" Get option value from ConfigParser """
|
|
return self.config.config_parser.get(self.section.name, self.name)
|
|
|
|
@property
|
|
def _default_in_config(self):
|
|
""" Get option default value considering current value from configuration """
|
|
return (
|
|
self._from_config if self._isset_in_config_file
|
|
else self.default
|
|
)
|
|
|
|
def isset(self):
|
|
""" Check if option is defined in the loaded configuration file """
|
|
return self._isset_in_config_file or self._isset_in_options
|
|
|
|
def get(self):
|
|
""" Get option value from options, config or default """
|
|
if self._isset_in_options:
|
|
return self._from_options
|
|
if self._isset_in_config_file:
|
|
return self._from_config
|
|
return self.default
|
|
|
|
def set(self, value):
|
|
""" Set option value to config file """
|
|
assert self.config.config_parser or (self.config.options and not self.no_arg), (
|
|
"Can't set option value: configuration file not configured, not options (or no argument)")
|
|
if value == '':
|
|
value = None
|
|
|
|
if self.config.config_parser:
|
|
if value == self.default or value is None:
|
|
# Remove option from config (is section exists)
|
|
if self.config.config_parser.has_section(self.section.name):
|
|
self.config.config_parser.remove_option(
|
|
self.section.name, self.name)
|
|
else:
|
|
# Store option to config
|
|
if not self.config.config_parser.has_section(self.section.name):
|
|
self.config.config_parser.add_section(self.section.name)
|
|
|
|
self.config.config_parser.set(
|
|
self.section.name, self.name, self.to_config(value)
|
|
)
|
|
|
|
if self.config.options and not self.no_arg:
|
|
setattr(self.config.options, self.parser_dest, value)
|
|
|
|
@property
|
|
def parser_action(self):
|
|
""" Get action as accept by argparse.ArgumentParser """
|
|
return 'store'
|
|
|
|
@property
|
|
def parser_type(self):
|
|
""" Get type as handle by argparse.ArgumentParser """
|
|
return str
|
|
|
|
@property
|
|
def parser_dest(self):
|
|
""" Get option name in arguments parser options """
|
|
return '{0}_{1}'.format(self.section.name, self.name)
|
|
|
|
@property
|
|
def parser_help(self):
|
|
""" Get option help message in arguments parser options """
|
|
if self.arg_help and self.default is not None:
|
|
return '{0} (Default: {1})'.format(
|
|
self.arg_help, re.sub(r'%([^%])', r'%%\1', str(self.default)))
|
|
if self.arg_help:
|
|
return self.arg_help
|
|
return None
|
|
|
|
@property
|
|
def parser_argument_name(self):
|
|
""" Get option argument name in parser options """
|
|
return (
|
|
self.arg if self.arg else
|
|
'--{0}-{1}'.format(
|
|
self.section.name, self.name
|
|
).lower().replace('_', '-')
|
|
)
|
|
|
|
def add_option_to_parser(self, section_opts):
|
|
""" Add option to arguments parser """
|
|
if self.no_arg:
|
|
return
|
|
args = [self.parser_argument_name]
|
|
if self.short_arg:
|
|
args.append(self.short_arg)
|
|
kwargs = dict(
|
|
action=self.parser_action,
|
|
dest=self.parser_dest,
|
|
help=self.parser_help,
|
|
default=self.default,
|
|
)
|
|
if self.parser_type: # pylint: disable=using-constant-test
|
|
kwargs['type'] = self.parser_type
|
|
|
|
log.debug(
|
|
'add_option_to_parser(%s, %s): argument name(s)=%s / kwargs=%s',
|
|
self.section.name, self.name, ', '.join(args), kwargs
|
|
)
|
|
section_opts.add_argument(*args, **kwargs)
|
|
|
|
def to_config(self, value=None):
|
|
""" Format value as stored in configuration file """
|
|
value = value if value is not None else self.get()
|
|
return '' if value is None else str(value)
|
|
|
|
def export_to_config(self):
|
|
""" Export option to configuration file """
|
|
lines = []
|
|
if self.comment:
|
|
lines.append('# ' + self.comment)
|
|
value = self.to_config()
|
|
default_value = (
|
|
'' if self.default is None else
|
|
self.to_config(self.default)
|
|
)
|
|
log.debug(
|
|
'export_to_config(%s, %s): value=%s / default=%s',
|
|
self.section.name, self.name, value, default_value)
|
|
if default_value:
|
|
lines.append(
|
|
'# Default: %s' % default_value
|
|
)
|
|
if value and value != default_value:
|
|
lines.append(
|
|
'%s = %s' %
|
|
(self.name, value)
|
|
)
|
|
else:
|
|
lines.append('# %s =' % self.name)
|
|
lines.append('')
|
|
return '\n'.join(lines)
|
|
|
|
def _ask_value(self, prompt=None, **kwargs):
|
|
""" Ask to user to enter value of this option and return it """
|
|
if self.comment:
|
|
print('# ' + self.comment)
|
|
default_value = kwargs.get('default_value', self.get())
|
|
if not prompt:
|
|
prompt = "%s: " % self.name
|
|
if default_value is not None:
|
|
prompt += "[%s] " % self.to_config(default_value)
|
|
value = input(prompt)
|
|
return default_value if value == '' else value
|
|
|
|
def ask_value(self, set_it=True):
|
|
"""
|
|
Ask to user to enter value of this option and set or
|
|
return it regarding set parameter
|
|
"""
|
|
value = self._ask_value()
|
|
if set_it:
|
|
return self.set(value)
|
|
return value
|
|
|
|
|
|
class StringOption(BaseOption):
|
|
""" String configuration option class """
|
|
|
|
|
|
class BooleanOption(BaseOption):
|
|
""" Boolean configuration option class """
|
|
|
|
@property
|
|
def _from_config(self):
|
|
""" Get option value from ConfigParser """
|
|
return self.config.config_parser.getboolean(self.section.name, self.name)
|
|
|
|
def to_config(self, value=None):
|
|
""" Format value as stored in configuration file """
|
|
return super().to_config(value).lower()
|
|
|
|
@property
|
|
def _isset_in_options(self):
|
|
""" Check if option is defined in registered arguments parser options """
|
|
return (
|
|
self.config.options
|
|
and not self.no_arg
|
|
and getattr(self.config.options, self.parser_dest)
|
|
)
|
|
|
|
@property
|
|
def _from_options(self):
|
|
""" Get option from arguments parser options """
|
|
return (
|
|
not self._default_in_config if self._isset_in_options
|
|
else self._default_in_config
|
|
)
|
|
|
|
@property
|
|
def parser_action(self):
|
|
return "store_true"
|
|
|
|
@property
|
|
def parser_type(self):
|
|
return None
|
|
|
|
@property
|
|
def parser_argument_name(self):
|
|
""" Get option argument name in parser options """
|
|
return (
|
|
self.arg if self.arg else
|
|
'--{0}-{1}-{2}'.format(
|
|
self.section.name,
|
|
'enable' if not self._default_in_config else 'disable',
|
|
self.name
|
|
).lower().replace('_', '-')
|
|
)
|
|
|
|
def _ask_value(self, prompt=None, **kwargs):
|
|
""" Ask to user to enter value of this option and return it """
|
|
default_value = self.get()
|
|
prompt = "%s: " % self.name
|
|
if default_value:
|
|
prompt += '[Y/n] '
|
|
else:
|
|
prompt += '[y/N] '
|
|
while True:
|
|
value = super()._ask_value(prompt, **kwargs)
|
|
if value in ['', None, default_value]:
|
|
return default_value
|
|
if value.lower() == 'y':
|
|
return True
|
|
if value.lower() == 'n':
|
|
return False
|
|
|
|
print('Invalid answer. Possible values: Y or N (case insensitive)')
|
|
|
|
|
|
class FloatOption(BaseOption):
|
|
""" Float configuration option class """
|
|
|
|
@property
|
|
def _from_config(self):
|
|
""" Get option value from ConfigParser """
|
|
return self.config.config_parser.getfloat(self.section.name, self.name)
|
|
|
|
@property
|
|
def parser_type(self):
|
|
return float
|
|
|
|
def _ask_value(self, prompt=None, **kwargs):
|
|
""" Ask to user to enter value of this option and return it """
|
|
default_value = self.get()
|
|
while True:
|
|
value = super()._ask_value(prompt, **kwargs)
|
|
if value in ['', None, default_value]:
|
|
return default_value
|
|
try:
|
|
return float(value)
|
|
except ValueError:
|
|
print('Invalid answer. Must a numeric value, for instance "12" or "12.5"')
|
|
|
|
|
|
class IntegerOption(BaseOption):
|
|
""" Integer configuration option class """
|
|
|
|
@property
|
|
def _from_config(self):
|
|
""" Get option value from ConfigParser """
|
|
return self.config.config_parser.getint(self.section.name, self.name)
|
|
|
|
def to_config(self, value=None):
|
|
""" Format value as stored in configuration file """
|
|
value = value if value is not None else self.get()
|
|
return str(int(value)) if value is not None else ''
|
|
|
|
@property
|
|
def parser_type(self):
|
|
return int
|
|
|
|
def _ask_value(self, prompt=None, **kwargs):
|
|
""" Ask to user to enter value of this option and return it """
|
|
default_value = kwargs.pop('default_value', self.get())
|
|
while True:
|
|
value = super()._ask_value(prompt, default_value=default_value, **kwargs)
|
|
if value in ['', None, default_value]:
|
|
return default_value
|
|
try:
|
|
return int(value)
|
|
except ValueError:
|
|
print('Invalid answer. Must a integer value')
|
|
|
|
|
|
class PasswordOption(StringOption):
|
|
""" Password configuration option class """
|
|
|
|
def __init__(self, *arg, username_option=None, keyring_value=None, **kwargs):
|
|
super().__init__(*arg, **kwargs)
|
|
self.username_option = username_option
|
|
self.keyring_value = keyring_value if keyring_value is not None else 'keyring'
|
|
|
|
@property
|
|
def _keyring_service_name(self):
|
|
""" Return keyring service name """
|
|
return '.'.join([
|
|
self.config.shortname,
|
|
self.section.name, self.name
|
|
])
|
|
|
|
@property
|
|
def _keyring_username(self):
|
|
""" Return keyring username """
|
|
return self.section.get(self.username_option) if self.username_option else self.name
|
|
|
|
def get(self):
|
|
""" Get option value """
|
|
value = super().get()
|
|
|
|
if value != self.keyring_value:
|
|
return value
|
|
|
|
service_name = self._keyring_service_name
|
|
username = self._keyring_username
|
|
log.debug(
|
|
'Retreive password %s for username=%s from keyring',
|
|
service_name, username
|
|
)
|
|
value = keyring.get_password(service_name, username)
|
|
|
|
if value is None:
|
|
value = getpass(
|
|
'Please enter %s%s: ' % (
|
|
self.comment if self.comment else
|
|
'%s %s' % (self.section.name, self.name),
|
|
(' for %s' % username)
|
|
if username != self.name else ''
|
|
)
|
|
)
|
|
keyring.set_password(service_name, username, value)
|
|
return value
|
|
|
|
def to_config(self, value=None):
|
|
""" Format value as stored in configuration file """
|
|
if super().get() == self.keyring_value:
|
|
return self.keyring_value
|
|
return super().to_config(value)
|
|
|
|
def set(self, value, use_keyring=None): # pylint: disable=arguments-differ
|
|
""" Set option value to config file """
|
|
if (use_keyring is None and super().get() == self.keyring_value) or use_keyring:
|
|
keyring.set_password(
|
|
self._keyring_service_name, self._keyring_username,
|
|
value)
|
|
value = self.keyring_value
|
|
super().set(value)
|
|
|
|
def _ask_value(self, prompt=None, **kwargs):
|
|
""" Ask to user to enter value of this option and return it """
|
|
if self.comment:
|
|
print('# ' + self.comment)
|
|
default_value = kwargs.pop('default_value', self.get())
|
|
if not prompt:
|
|
prompt = '%s: ' % self.name
|
|
if default_value is not None:
|
|
# Hide value only if it differed from default value
|
|
if default_value == self.default:
|
|
prompt += '[%s] ' % default_value
|
|
else:
|
|
prompt += '[secret defined, leave to empty to keep it as unchange] '
|
|
value = getpass(prompt)
|
|
return default_value if value == '' else value
|
|
|
|
def ask_value(self, set_it=True):
|
|
"""
|
|
Ask to user to enter value of this option and set or
|
|
return it regarding set parameter
|
|
"""
|
|
value = self._ask_value()
|
|
if set_it:
|
|
use_keyring = None
|
|
default_use_keyring = (super().get() == self.keyring_value)
|
|
while use_keyring is None:
|
|
prompt = (
|
|
'Do you want to use XDG keyring ? [%s] ' %
|
|
('Y/n' if default_use_keyring else 'y/N')
|
|
)
|
|
result = input(prompt).lower()
|
|
if result == '':
|
|
use_keyring = default_use_keyring
|
|
elif result == 'y':
|
|
use_keyring = True
|
|
elif result == 'n':
|
|
use_keyring = False
|
|
else:
|
|
print('Invalid answer. Possible values: Y or N (case insensitive)')
|
|
return self.set(value, use_keyring=use_keyring)
|
|
return value
|
|
|
|
|
|
class ConfigSection:
|
|
""" Configuration section class """
|
|
|
|
def __init__(self, config, name, comment=None, order=None):
|
|
self.config = config
|
|
self.name = name
|
|
self.options = dict()
|
|
self.comment = comment
|
|
self.order = order if isinstance(order, int) else 10
|
|
|
|
def add_option(self, _type, name, **kwargs):
|
|
"""
|
|
Add option
|
|
|
|
:param _type: Option type, derivated from BaseOption
|
|
:param name: Option name
|
|
:param **kwargs: Dict of raw option for type class
|
|
"""
|
|
assert not self.defined(name), "Duplicated option %s" % name
|
|
self.options[name] = _type(self.config, self, name, **kwargs)
|
|
return self.options[name]
|
|
|
|
def defined(self, option):
|
|
""" Check if option is defined """
|
|
return option in self.options
|
|
|
|
def isset(self, option):
|
|
""" Check if option is set """
|
|
return self.defined(option) and self.options[option].isset()
|
|
|
|
def get(self, option):
|
|
""" Get option value """
|
|
assert self.defined(option), "Option %s unknown" % option
|
|
return self.options[option].get()
|
|
|
|
def set(self, option, value):
|
|
""" Set option value """
|
|
assert self.defined(option), "Option %s unknown" % option
|
|
return self.options[option].set(value)
|
|
|
|
def add_options_to_parser(self, parser):
|
|
""" Add section to argparse.ArgumentParser """
|
|
assert isinstance(parser, argparse.ArgumentParser)
|
|
section_opts = parser.add_argument_group(
|
|
self.comment if self.comment
|
|
else self.name.capitalize()
|
|
)
|
|
|
|
for option in self.options:
|
|
self.options[option].add_option_to_parser(section_opts)
|
|
|
|
def export_to_config(self):
|
|
""" Export section and their options to configuration file """
|
|
lines = []
|
|
if self.comment:
|
|
lines.append('# %s' % self.comment)
|
|
lines.append('[%s]' % self.name)
|
|
for option in self.options:
|
|
lines.append(self.options[option].export_to_config())
|
|
return '\n'.join(lines)
|
|
|
|
def ask_values(self, set_it=True):
|
|
"""
|
|
Ask user to enter value for each configuration option of the section
|
|
|
|
:param set_it: If True (default), option value will be updated with user input
|
|
|
|
:return: If set_it is True, return True if valid value for each configuration
|
|
option have been retrieved and set. If False, return a dict of configuration
|
|
options and their value.
|
|
:rtype: bool of dict
|
|
"""
|
|
if self.comment:
|
|
print('# %s' % self.comment)
|
|
print('[%s]\n' % self.name)
|
|
result = dict()
|
|
error = False
|
|
for name, option in self.options.items():
|
|
option_result = option.ask_value(set_it=set_it)
|
|
if set_it:
|
|
result[name] = option_result
|
|
elif not option_result:
|
|
error = True
|
|
print()
|
|
print()
|
|
if set_it:
|
|
return not error
|
|
return result
|
|
|
|
|
|
class RawWrappedTextHelpFormatter(argparse.RawDescriptionHelpFormatter):
|
|
"""
|
|
Custom TextHelpFormatter for argparse.ArgumentParser that allowing line to keep line return
|
|
"""
|
|
|
|
def _split_lines(self, text, width):
|
|
result = []
|
|
for line in textwrap.dedent(text).splitlines():
|
|
# Keep empty line
|
|
if line == "":
|
|
result.append(line)
|
|
continue
|
|
# Split ident prefix and line text
|
|
m = re.match('^( *)(.*)$', line)
|
|
ident = m.group(1)
|
|
line_text = m.group(2)
|
|
# Wrap each lines and add in result with ident prefix
|
|
for subline in textwrap.wrap(line_text, width - len(ident)):
|
|
result.append(ident + subline)
|
|
return result
|
|
|
|
|
|
class Config: # pylint: disable=too-many-instance-attributes
|
|
""" Configuration helper """
|
|
|
|
def __init__(self, appname, shortname=None, version=None, encoding=None,
|
|
config_file_env_variable=None, default_config_dirpath=None):
|
|
self.appname = appname
|
|
self.shortname = shortname
|
|
self.version = version if version else '0.0'
|
|
self.encoding = encoding if encoding else 'utf-8'
|
|
self.config_parser = None
|
|
self.options_parser = None
|
|
self.options = None
|
|
self.sections = {}
|
|
self._loaded_callbacks = []
|
|
self._loaded_callbacks_executed = []
|
|
self._filepath = None
|
|
self.config_file_env_variable = config_file_env_variable
|
|
self.default_config_dirpath = default_config_dirpath
|
|
|
|
def add_section(self, name, loaded_callback=None, **kwargs):
|
|
"""
|
|
Add section
|
|
|
|
: param name: The section name
|
|
: param loaded_callback: An optional callback method that will be executed after configuration is loaded
|
|
The specified callback method will receive Config object as parameter.
|
|
: param ** kwargs: Raw parameters dict pass to ConfigSection __init__() method
|
|
"""
|
|
assert name not in self.sections, "Duplicated section %s" % name
|
|
|
|
self.sections[name] = ConfigSection(self, name, **kwargs)
|
|
if loaded_callback:
|
|
self._loaded_callbacks.append(loaded_callback)
|
|
# If configuration is already loaded, execute callback immediatly
|
|
if self.config_parser or self.options:
|
|
self._loaded()
|
|
return self.sections[name]
|
|
|
|
def defined(self, section, option):
|
|
""" Check option is defined in specified section """
|
|
return section in self.sections and self.sections[section].defined(option)
|
|
|
|
def isset(self, section, option):
|
|
""" Check option is set in specified section """
|
|
return section in self.sections and self.sections[section].isset(option)
|
|
|
|
def get(self, section, option):
|
|
""" Get option value """
|
|
assert self.config_parser or self.options, 'Unconfigured options parser'
|
|
assert self.defined(
|
|
section, option), 'Unknown option %s.%s' % (section, option)
|
|
value = self.sections[section].get(option)
|
|
log.debug('get(%s, %s): %s (%s)', section, option, value, type(value))
|
|
return value
|
|
|
|
def set(self, section, option, value):
|
|
""" Set option value """
|
|
assert self.config_parser, 'Unconfigured options parser'
|
|
assert self.defined(
|
|
section, option), 'Unknown option %s.%s' % (section, option)
|
|
self.sections[section].set(option, value)
|
|
|
|
def load_file(self, filepath, execute_callback=True):
|
|
""" Read configuration file """
|
|
|
|
self.config_parser = ConfigParser()
|
|
self._filepath = filepath
|
|
|
|
# Checking access of configuration file
|
|
if not os.path.isfile(filepath):
|
|
return True
|
|
|
|
if not os.access(filepath, os.R_OK):
|
|
return False
|
|
|
|
try:
|
|
self.config_parser.read(filepath, encoding=self.encoding)
|
|
except Exception: # pylint: disable=broad-except
|
|
self.config_parser = None
|
|
log.exception('Failed to read configuration file %s', filepath)
|
|
return False
|
|
|
|
# Logging initialization
|
|
if self.config_parser.has_section('loggers'):
|
|
fileConfig(filepath)
|
|
else:
|
|
# Otherwise, use systemd journal handler
|
|
handler = JournalHandler(SYSLOG_IDENTIFIER=self.shortname)
|
|
handler.setFormatter(
|
|
logging.Formatter(
|
|
'%(levelname)s | %(name)s | %(message)s'
|
|
)
|
|
)
|
|
logging.getLogger().addHandler(handler)
|
|
|
|
self._filepath = filepath
|
|
|
|
if execute_callback:
|
|
self._loaded()
|
|
|
|
return True
|
|
|
|
def _loaded(self):
|
|
""" Execute loaded callbacks """
|
|
error = False
|
|
for callback in self._loaded_callbacks:
|
|
if callback in self._loaded_callbacks_executed:
|
|
continue
|
|
if not callback(self):
|
|
error = True
|
|
self._loaded_callbacks_executed.append(callback)
|
|
return not error
|
|
|
|
def save(self, filepath=None):
|
|
""" Save configuration file """
|
|
filepath = filepath if filepath else self._filepath
|
|
assert filepath, 'Configuration filepath is not set or provided'
|
|
|
|
# Checking access of target file/directory
|
|
dirpath = os.path.dirname(filepath)
|
|
if os.path.isfile(filepath):
|
|
if not os.access(filepath, os.W_OK):
|
|
log.error('Configuration file "%s" is not writable', filepath)
|
|
return False
|
|
elif not os.path.isdir(dirpath) or not os.access(dirpath, os.R_OK | os.W_OK | os.X_OK):
|
|
log.error(
|
|
'Configuration directory "%s" does not exist (or not writable)', dirpath)
|
|
return False
|
|
|
|
lines = ['#\n# %s configuration\n#\n' % self.appname]
|
|
|
|
for section_name in self._ordered_section_names:
|
|
lines.append('')
|
|
lines.append(self.sections[section_name].export_to_config())
|
|
|
|
try:
|
|
with open(filepath, 'wb') as fd:
|
|
fd.write(
|
|
'\n'.join(lines).encode(self.encoding)
|
|
)
|
|
|
|
# Privacy!
|
|
os.chmod(filepath, stat.S_IRUSR | stat.S_IWUSR)
|
|
except Exception: # pylint: disable=broad-except
|
|
log.exception(
|
|
'Failed to write generated configuration file %s', filepath)
|
|
return False
|
|
return True
|
|
|
|
@property
|
|
def _ordered_section_names(self):
|
|
""" Get ordered list of section names """
|
|
return sorted(self.sections.keys(), key=lambda section: self.sections[section].order)
|
|
|
|
def get_arguments_parser(self, **kwargs):
|
|
""" Get arguments parser """
|
|
if self.options_parser:
|
|
return self.options_parser
|
|
|
|
self.options_parser = argparse.ArgumentParser(
|
|
description=kwargs.pop('description', self.appname),
|
|
formatter_class=RawWrappedTextHelpFormatter,
|
|
**kwargs)
|
|
|
|
config_file_help = 'Configuration file to use (default: %s)' % self.config_filepath
|
|
if self.config_file_env_variable:
|
|
config_file_help += '\n\nYou also could set %s environment variable to specify your configuration file path.' % self.config_file_env_variable
|
|
self.options_parser.add_argument(
|
|
'-c',
|
|
'--config',
|
|
default=self.config_filepath,
|
|
help=config_file_help
|
|
)
|
|
|
|
self.options_parser.add_argument(
|
|
'--save',
|
|
action='store_true',
|
|
dest='save',
|
|
help='Save current configuration to file',
|
|
)
|
|
|
|
self.options_parser.add_argument(
|
|
'-d',
|
|
'--debug',
|
|
action='store_true',
|
|
help='Show debug messages'
|
|
)
|
|
|
|
self.options_parser.add_argument(
|
|
'-v',
|
|
'--verbose',
|
|
action='store_true',
|
|
help='Show verbose messages'
|
|
)
|
|
|
|
section = self.add_section('console', comment='Console logging')
|
|
section.add_option(
|
|
BooleanOption, 'enabled', default=False,
|
|
arg='--console', short_arg='-C',
|
|
comment='Enable/disable console log')
|
|
section.add_option(
|
|
StringOption, 'log_format', default=DEFAULT_CONSOLE_LOG_FORMAT,
|
|
arg='--console-log-format', comment='Console log format')
|
|
|
|
self.add_options_to_parser(self.options_parser)
|
|
|
|
return self.options_parser
|
|
|
|
def parse_arguments_options(self, argv=None, parser=None, create=True, ask_values=True,
|
|
exit_after_created=True, execute_callback=True):
|
|
"""
|
|
Parse arguments options
|
|
|
|
:param argv: Optional arguments list to parse (default: sys.argv[1:])
|
|
:param parser: Optional argparse.ArgumentParser use
|
|
(default: generated by self.get_arguments_parser())
|
|
:param create: If True, configuration file will be created if it does not exits
|
|
(default: True)
|
|
:param ask_values: If True, ask user to enter valor of each configuration options
|
|
(default: True)
|
|
:param exit_after_created: If True, script will end after configuration file
|
|
creation (default: True)
|
|
:param execute_callback: Sections's loaded callbacks will be executed only if True
|
|
(default: True)
|
|
"""
|
|
parser = parser if parser else self.get_arguments_parser()
|
|
argcomplete.autocomplete(parser)
|
|
options = parser.parse_args(argv if argv is not None else sys.argv[1:])
|
|
self.load_options(options, execute_callback=False)
|
|
|
|
if options.config:
|
|
options.config = os.path.abspath(options.config)
|
|
|
|
already_saved = False
|
|
if not os.path.isfile(options.config) and (create or options.save):
|
|
log.warning(
|
|
"Configuration file is missing, generate it (%s)",
|
|
options.config
|
|
)
|
|
if ask_values:
|
|
self.ask_values(set_it=True)
|
|
self.save(options.config)
|
|
if exit_after_created:
|
|
sys.exit(0)
|
|
already_saved = True
|
|
|
|
# Load configuration file
|
|
if os.path.isfile(options.config) and not self.load_file(options.config, execute_callback=False):
|
|
parser.error(
|
|
'Failed to load configuration from file %s' % options.config
|
|
)
|
|
|
|
if options.save and not already_saved:
|
|
self.save()
|
|
sys.exit(0)
|
|
|
|
if options.debug:
|
|
logging.getLogger().setLevel(logging.DEBUG)
|
|
elif options.verbose:
|
|
logging.getLogger().setLevel(logging.INFO)
|
|
|
|
if self.get('console', 'enabled'):
|
|
console_handler = logging.StreamHandler(sys.stdout)
|
|
if self.get('console', 'log_format'):
|
|
console_formater = logging.Formatter(self.get('console', 'log_format'))
|
|
console_handler.setFormatter(console_formater)
|
|
logging.getLogger().addHandler(console_handler)
|
|
|
|
if execute_callback:
|
|
self._loaded()
|
|
|
|
return options
|
|
|
|
def load_options(self, options, execute_callback=True):
|
|
""" Register arguments parser options """
|
|
assert isinstance(options, argparse.Namespace)
|
|
self.options = options
|
|
log.debug('Argument options: %s', options)
|
|
if execute_callback:
|
|
self._loaded()
|
|
|
|
def add_options_to_parser(self, parser):
|
|
""" Add sections and their options to parser """
|
|
for section in self._ordered_section_names:
|
|
self.sections[section].add_options_to_parser(parser)
|
|
|
|
def ask_values(self, set_it=True, execute_callback=False):
|
|
"""
|
|
Ask user to enter value for each configuration option
|
|
|
|
:param set_it: If True (default), option value will be updated with user input
|
|
:param execute_callback: Sections's loaded callbacks will be finally executed
|
|
(only if set_it is True, default: False)
|
|
|
|
:return: If set_it is True, return True if valid value for each configuration
|
|
option have been retrieved and set. If False, return a dict of configuration
|
|
section and their options value.
|
|
:rtype: bool of dict
|
|
"""
|
|
# On set it mode, ensure configuration file parser is initialized
|
|
if set_it and not self.config_parser:
|
|
self.config_parser = ConfigParser()
|
|
result = dict()
|
|
error = False
|
|
for name, section in self.sections.items():
|
|
section_result = section.ask_values(set_it=set_it)
|
|
if not set_it:
|
|
result[name] = section_result
|
|
elif not section_result:
|
|
error = True
|
|
if set_it:
|
|
if error:
|
|
return False
|
|
if execute_callback:
|
|
self._loaded()
|
|
return True
|
|
return result
|
|
|
|
def configure(self, argv=None, description=False):
|
|
"""
|
|
Entry point of a script you could use to created your configuration file
|
|
|
|
Note: make sure to load to define all your configuration section and options
|
|
before running it.
|
|
"""
|
|
parser = self.get_arguments_parser(
|
|
description=description if description else "Generate configuration file"
|
|
)
|
|
|
|
parser.add_argument(
|
|
'-i', '--interactive',
|
|
action='store_true', dest='interactive',
|
|
help="Enable configuration interactive mode"
|
|
)
|
|
|
|
parser.add_argument(
|
|
'-O', '--overwrite',
|
|
action='store_true', dest='overwrite',
|
|
help="Overwrite configuration file if exists"
|
|
)
|
|
|
|
parser.add_argument(
|
|
'-V', '--validate',
|
|
action='store_true', dest='validate',
|
|
help=(
|
|
"Validate configuration: initialize application to test if provided parameters works.\n\n"
|
|
"Note: Validation will occured after configuration file creation or update. On error, "
|
|
"re-run with -O/--overwrite parameter to fix it."
|
|
)
|
|
)
|
|
|
|
options = self.parse_arguments_options(
|
|
argv, create=False, execute_callback=False)
|
|
|
|
if os.path.exists(options.config) and not options.overwrite:
|
|
print('Configuration file %s already exists' % options.config)
|
|
sys.exit(1)
|
|
|
|
if options.interactive:
|
|
self.ask_values(set_it=True)
|
|
|
|
if self.save(options.config):
|
|
print('Configuration file %s created.' % options.config)
|
|
if options.validate:
|
|
print('Validate your configuration...')
|
|
try:
|
|
if self._loaded():
|
|
print('Your configuration seem valid.')
|
|
else:
|
|
print('Error(s) occurred validating your configuration. See logs for details.')
|
|
sys.exit(1)
|
|
except Exception:
|
|
print(
|
|
'Exception occurred validating your configuration: %s\n\nSee logs for details.' %
|
|
traceback.format_exc())
|
|
sys.exit(2)
|
|
else:
|
|
print('Error occured creating configuration file %s' %
|
|
options.config)
|
|
sys.exit(1)
|
|
|
|
sys.exit(0)
|
|
|
|
@property
|
|
def config_dir(self):
|
|
""" Retrieve configuration directory path """
|
|
if self._filepath:
|
|
return os.path.dirname(self._filepath)
|
|
if self.default_config_dirpath:
|
|
return self.default_config_dirpath
|
|
return DEFAULT_CONFIG_DIRPATH
|
|
|
|
@property
|
|
def config_filepath(self):
|
|
""" Retrieve configuration file path """
|
|
if self._filepath:
|
|
return self._filepath
|
|
if self.config_file_env_variable and os.environ.get(self.config_file_env_variable):
|
|
return os.environ.get(self.config_file_env_variable)
|
|
return os.path.join(
|
|
self.config_dir,
|
|
("%s.ini" % self.shortname) if self.shortname
|
|
else "config.ini"
|
|
)
|
|
|
|
|
|
class ConfigurableObject:
|
|
"""
|
|
Base class of configurable object
|
|
|
|
This class provide a way to configure an object using :
|
|
- mylib.config.Config object
|
|
- argparse.Namespace object
|
|
- kwargs passed to __init__ method
|
|
"""
|
|
|
|
# Configuration object name (used for default options prefix and config section)
|
|
# Note: required if options_prefix or/and config_section parameters not provided
|
|
# to __init__ method.
|
|
_config_name = None
|
|
|
|
# Configuration comment (used for config section)
|
|
_config_comment = None
|
|
|
|
# Default options value
|
|
# Important: all supported options MUST HAVE a default value defined
|
|
_defaults = {}
|
|
|
|
# Store options passed throuht __init__ method
|
|
_kwargs = {}
|
|
_options = {}
|
|
_options_prefix = None
|
|
_config = None
|
|
_config_section = None
|
|
|
|
def __init__(self, options=None, options_prefix=None, config=None, config_section=None,
|
|
**kwargs):
|
|
|
|
for key, value in kwargs.items():
|
|
assert key in self._defaults, "Unknown %s option" % key
|
|
self._kwargs[key] = value
|
|
|
|
if options:
|
|
self._options = options
|
|
if options_prefix is not None:
|
|
self._options_prefix = options_prefix
|
|
elif self._config_name:
|
|
self._options_prefix = self._config_name + '_'
|
|
else:
|
|
raise Exception('No configuration name defined for %s' % __name__)
|
|
|
|
if config:
|
|
self._config = config
|
|
if config_section:
|
|
self._config_section = config_section
|
|
elif self._config_name:
|
|
self._config_section = self._config_name
|
|
else:
|
|
raise Exception('No configuration name defined for %s' % __name__)
|
|
|
|
def _get_option(self, option, default=None, required=False):
|
|
""" Retreive option value """
|
|
if self._kwargs and option in self._kwargs:
|
|
return self._kwargs[option]
|
|
|
|
if self._options and hasattr(self._options, self._options_prefix + option):
|
|
return getattr(self._options, self._options_prefix + option)
|
|
|
|
if self._config and self._config.defined(self._config_section, option):
|
|
return self._config.get(self._config_section, option)
|
|
|
|
assert not required, "Options %s not defined" % option
|
|
|
|
return default if default is not None else self._defaults.get(option)
|
|
|
|
def configure(self, comment=None, ** kwargs):
|
|
""" Configure options on registered mylib.Config object """
|
|
assert self._config, "mylib.Config object not registered. Must be passed to __init__ as config keyword argument."
|
|
|
|
return self._config.add_section(
|
|
self._config_section,
|
|
comment=comment if comment else self._config_comment,
|
|
loaded_callback=self.initialize, **kwargs)
|
|
|
|
def initialize(self, loaded_config=None):
|
|
""" Configuration initialized hook """
|
|
if loaded_config:
|
|
self.config = loaded_config # pylint: disable=attribute-defined-outside-init
|