python-mylib/mylib/config.py

1446 lines
52 KiB
Python

# pylint: disable=too-many-lines
""" Configuration & options parser """
import argparse
import logging
import os
import re
import sys
import textwrap
import traceback
from configparser import ConfigParser
from getpass import getpass
from logging.config import fileConfig
import argcomplete
import keyring
from systemd.journal import JournalHandler
# Module-level logger, named after this module
log = logging.getLogger(__name__)

# Constants
DEFAULT_ENCODING = "utf-8"
DEFAULT_CONFIG_FILE_MODE = 0o600
DEFAULT_CONFIG_DIRPATH = os.path.expanduser("./")
DEFAULT_LOG_FORMAT = "%(asctime)s - %(module)s:%(lineno)d - %(levelname)s - %(message)s"
# Console and file logs share the generic log format
DEFAULT_CONSOLE_LOG_FORMAT = DEFAULT_FILELOG_FORMAT = DEFAULT_LOG_FORMAT
class ConfigException(Exception):
    """
    Configuration exception

    It derives from Exception (and not directly from BaseException) so that regular
    ``except Exception`` handlers catch it, as expected for an application-level error.
    """
class BaseOption:  # pylint: disable=too-many-instance-attributes
    """
    Base configuration option class

    The value of an option is looked up, in this order, in the command-line options registered
    on the Config object, in the loaded configuration file and finally in its default value.
    """

    def __init__(
        self,
        config,
        section,
        name,
        default=None,
        comment=None,
        arg=None,
        short_arg=None,
        arg_help=None,
        no_arg=False,
    ):
        """
        :param config: The Config object this option belongs to
        :param section: The section object this option belongs to (its name attribute is used)
        :param name: The option name
        :param default: The option default value (default: None)
        :param comment: Comment printed before the option in the exported configuration
        :param arg: Command-line argument name (default: computed from section & option names)
        :param short_arg: Optional additional command-line argument name
        :param arg_help: Command-line help message (default: the comment)
        :param no_arg: If True, the option is not added to the arguments parser
        """
        self.config = config
        self.section = section
        self.name = name
        self.default = default
        self.comment = comment
        self.no_arg = no_arg
        self.arg = arg
        self.short_arg = short_arg
        self.arg_help = arg_help if arg_help else comment
        # Turned on by set(): from then on, get() ignores the command-line value
        self._set = False

    @property
    def _isset_in_options(self):
        """Check if option is defined in registered arguments parser options"""
        return self.config.options and not self.no_arg and self._from_options != self.default

    @property
    def _from_options(self):
        """Get option from arguments parser options"""
        value = (
            getattr(self.config.options, self.parser_dest)
            if self.config.options and not self.no_arg
            else None
        )
        log.debug("_from_options(%s, %s) = %s", self.section.name, self.name, value)
        return value

    @property
    def _isset_in_config_file(self):
        """Check if option is defined in the loaded configuration file"""
        return self.config.config_filepath and self.config.config_parser.has_option(
            self.section.name, self.name
        )

    @property
    def _from_config(self):
        """Get option value from ConfigParser"""
        return self.config.config_parser.get(self.section.name, self.name)

    @property
    def _default_in_config(self):
        """Get option default value considering current value from configuration"""
        return self._from_config if self._isset_in_config_file else self.default

    def isset(self):
        """Check if option is defined in the loaded configuration file or in the options"""
        return self._isset_in_config_file or self._isset_in_options

    def get(self):
        """
        Get option value from options, config or default

        The command-line value is only considered as long as set() has not been called.
        """
        if self._isset_in_options and not self._set:
            return self._from_options
        if self._isset_in_config_file:
            return self._from_config
        return self.default

    def set(self, value):
        """
        Set option value in the configuration parser

        An empty string is handled as None. Setting None or the default value removes the
        option from the configuration parser.
        """
        if value == "":
            value = None
        parser = self.config.config_parser
        if value == self.default or value is None:
            # Remove option from config (if section exists)
            if parser.has_section(self.section.name):
                parser.remove_option(self.section.name, self.name)
        else:
            # Store option to config
            if not parser.has_section(self.section.name):
                parser.add_section(self.section.name)
            parser.set(self.section.name, self.name, self.to_config(value))
        self._set = True

    def set_default(self, default_value):
        """Set option default value"""
        self.default = default_value

    @property
    def parser_action(self):
        """Get action as accepted by argparse.ArgumentParser"""
        return "store"

    @property
    def parser_type(self):
        """Get type as handled by argparse.ArgumentParser"""
        return str

    @property
    def parser_dest(self):
        """Get option name in arguments parser options"""
        return f"{self.section.name}_{self.name}"

    @property
    def parser_help(self):
        """
        Get option help message in arguments parser options

        :return: The help message (with the default value when there is one) or None
        """
        if not self.arg_help:
            return None
        if self.default is None:
            return self.arg_help
        # argparse applies %-formatting to help messages: every percent sign of the
        # displayed value must be doubled, including a trailing or repeated one
        displayed_default = str(self._default_in_config).replace("%", "%%")
        return f"{self.arg_help} (Default: {displayed_default})"

    @property
    def parser_argument_name(self):
        """Get option argument name in parser options"""
        return (
            self.arg if self.arg else f"--{self.section.name}-{self.name}".lower().replace("_", "-")
        )

    def add_option_to_parser(self, section_opts):
        """
        Add option to arguments parser

        :param section_opts: The parser (or arguments group) the argument is added to
        """
        if self.no_arg:
            return
        args = [self.parser_argument_name]
        if self.short_arg:
            args.append(self.short_arg)
        kwargs = {
            "action": self.parser_action,
            "dest": self.parser_dest,
            "help": self.parser_help,
            "default": self.default,
        }
        if self.parser_type:  # pylint: disable=using-constant-test
            kwargs["type"] = self.parser_type
        log.debug(
            "add_option_to_parser(%s, %s): argument name(s)=%s / kwargs=%s",
            self.section.name,
            self.name,
            ", ".join(args),
            kwargs,
        )
        section_opts.add_argument(*args, **kwargs)

    def to_config(self, value=None):
        """
        Format value as stored in configuration file

        :param value: The value to format (default: the current option value)
        """
        value = value if value is not None else self.get()
        return "" if value is None else str(value)

    def export_to_config(self):
        """
        Export option to configuration file

        :return: The text block of the option: comment, default value (as comment) and
                 the option itself, commented out when its value is unset or the default one
        """
        lines = []
        if self.comment:
            lines.append(f"# {self.comment}")
        value = self.to_config()
        default_value = "" if self.default is None else self.to_config(self.default)
        log.debug(
            "export_to_config(%s, %s): value=%s / default=%s",
            self.section.name,
            self.name,
            value,
            default_value,
        )
        if default_value:
            if isinstance(default_value, str) and "\n" in default_value:
                lines.append("# Default:")
                lines.extend([f"# {line}" for line in default_value.split("\n")])
            else:
                lines.append(f"# Default: {default_value}")
        if value and value != default_value:
            if isinstance(value, str) and "\n" in value:
                value_lines = value.split("\n")
                # The first line carries the option name, next ones are indented
                # continuation lines (their "#" are escaped to not be read as comments)
                lines.append(f"{self.name} = {value_lines[0]}")
                lines.extend([f' {line.replace("#", "%(hash)s")}' for line in value_lines[1:]])
            else:
                lines.append(f"{self.name} = {value}")
        else:
            lines.append(f"# {self.name} =")
        lines.append("")
        return "\n".join(lines)

    @staticmethod
    def _get_user_input(prompt):
        """
        Get user console input
        Note: do not use directly input() to allow to mock it in tests
        """
        return input(prompt)

    def _ask_value(self, prompt=None, **kwargs):
        """
        Ask to user to enter value of this option and return it

        :param prompt: The prompt to display (default: the option name)
        :param default_value: Optional keyword giving the value returned on empty input
                              (default: the current option value)
        """
        if self.comment:
            print(f"# {self.comment}")
        default_value = kwargs.get("default_value", self.get())
        if not prompt:
            prompt = f"{self.name}: "
        if default_value is not None:
            if isinstance(default_value, str) and "\n" in default_value:
                prompt += "[\n %s\n] " % "\n ".join(
                    self.to_config(default_value).split("\n")
                )
            else:
                prompt += f"[{self.to_config(default_value)}] "
        value = self._get_user_input(prompt)
        return default_value if value == "" else value

    def ask_value(self, set_it=True):
        """
        Ask to user to enter value of this option and set it if set_it parameter is True
        :param set_it: If True (default), option value will be updated with user input
        :return: The configuration option value.
        :rtype: mixed
        """
        value = self._ask_value()
        if set_it:
            self.set(value)
        return value
class StringOption(BaseOption):
    """Configuration option holding a string value (BaseOption behavior, unchanged)"""
class BooleanOption(BaseOption):
    """
    Boolean configuration option class

    On the command line, the option is a flag that toggles the default value (or the one
    from the configuration file): its name contains "enable" or "disable" accordingly.
    """

    @property
    def _from_config(self):
        """Get option value from ConfigParser"""
        return self.config.config_parser.getboolean(self.section.name, self.name)

    def to_config(self, value=None):
        """Format value as stored in configuration file"""
        return super().to_config(value).lower()

    @property
    def _isset_in_options(self):
        """Check if option is defined in registered arguments parser options"""
        return (
            self.config.options
            and not self.no_arg
            and getattr(self.config.options, self.parser_dest)
        )

    @property
    def _from_options(self):
        """Get option from arguments parser options"""
        return not self._default_in_config if self._isset_in_options else self._default_in_config

    @property
    def parser_action(self):
        """Get action as accepted by argparse.ArgumentParser"""
        return "store_true"

    @property
    def parser_type(self):
        """Get type as handled by argparse.ArgumentParser (none for a flag)"""
        return None

    @property
    def parser_argument_name(self):
        """Get option argument name in parser options"""
        # pylint: disable=consider-using-f-string
        return (
            self.arg
            if self.arg
            else "--{}-{}-{}".format(
                self.section.name, "enable" if not self._default_in_config else "disable", self.name
            )
            .lower()
            .replace("_", "-")
        )

    def add_option_to_parser(self, section_opts):
        """
        Add option to arguments parser

        The parser default is always False: with a "store_true" action, a True default
        would make the flag look present on every run and the option value would always be
        the opposite of its default/configured value.

        :param section_opts: The parser (or arguments group) the argument is added to
        """
        if self.no_arg:
            return
        args = [self.parser_argument_name]
        if self.short_arg:
            args.append(self.short_arg)
        kwargs = {
            "action": self.parser_action,
            "dest": self.parser_dest,
            "help": self.parser_help,
            "default": False,
        }
        log.debug(
            "add_option_to_parser(%s, %s): argument name(s)=%s / kwargs=%s",
            self.section.name,
            self.name,
            ", ".join(args),
            kwargs,
        )
        section_opts.add_argument(*args, **kwargs)

    def _ask_value(self, prompt=None, **kwargs):
        """
        Ask to user to enter value of this option and return it

        :param prompt: The prompt to display (default: the option name)
        :return: True or False according to the answer, or the current value on empty input
        """
        default_value = self.get()
        if not prompt:
            prompt = f"{self.name}: "
        if default_value:
            prompt += "[Y/n] "
        else:
            prompt += "[y/N] "
        while True:
            value = super()._ask_value(prompt, **kwargs)
            if value in ["", None, default_value]:
                return default_value
            if value.lower() == "y":
                return True
            if value.lower() == "n":
                return False
            print("Invalid answer. Possible values: Y or N (case insensitive)")
class FloatOption(BaseOption):
    """Float configuration option class"""

    @property
    def _from_config(self):
        """Get option value from ConfigParser"""
        return self.config.config_parser.getfloat(self.section.name, self.name)

    @property
    def parser_type(self):
        """Get type as handled by argparse.ArgumentParser"""
        return float

    def _ask_value(self, prompt=None, **kwargs):
        """
        Ask to user to enter value of this option and return it

        :param prompt: The prompt to display (default: the option name)
        :param default_value: Optional keyword giving the value returned on empty input
                              (default: the current option value)
        """
        # Honor a caller-provided default value, as IntegerOption and OctalOption do
        default_value = kwargs.pop("default_value", self.get())
        while True:
            value = super()._ask_value(prompt, default_value=default_value, **kwargs)
            if value in ["", None, default_value]:
                return default_value
            try:
                return float(value)
            except ValueError:
                print('Invalid answer. Must a numeric value, for instance "12" or "12.5"')
class IntegerOption(BaseOption):
    """Configuration option holding an integer value"""

    @property
    def _from_config(self):
        """Read the option from the configuration parser as an integer"""
        parser = self.config.config_parser
        return parser.getint(self.section.name, self.name)

    def to_config(self, value=None):
        """Return the configuration file representation of the value (current one if None)"""
        if value is None:
            value = self.get()
        if value is None:
            return ""
        return str(int(value))

    @property
    def parser_type(self):
        """Type used by argparse to convert the command-line value"""
        return int

    def _ask_value(self, prompt=None, **kwargs):
        """Prompt the user until an integer (or an empty answer) is entered"""
        fallback = kwargs.pop("default_value", self.get())
        while True:
            answer = super()._ask_value(prompt, default_value=fallback, **kwargs)
            if answer in ["", None, fallback]:
                return fallback
            try:
                converted = int(answer)
            except ValueError:
                print("Invalid answer. Must a integer value")
            else:
                return converted
class OctalOption(BaseOption):
    """Configuration option holding an integer written in octal notation"""

    @staticmethod
    def octal(value):
        """Parse the string form of value as a base 8 number"""
        text = str(value)
        return int(text, 8)

    @staticmethod
    def octal_string(value):
        """Return the octal digits of an integer (without the "0o" prefix)"""
        prefixed = oct(value)
        return prefixed[2:]

    @property
    def _from_config(self):
        """Read the option from the configuration parser and decode it as octal"""
        raw = self.config.config_parser.getint(self.section.name, self.name)
        return self.octal(raw)

    def to_config(self, value=None):
        """Return the configuration file representation of the value (current one if None)"""
        if value is None:
            value = self.get()
        if value is None:
            return ""
        return self.octal_string(value)

    @property
    def parser_type(self):
        """Converter used by argparse for the command-line value"""
        return self.octal

    @property
    def parser_help(self):
        """Help message of the command-line argument, with the default shown in octal"""
        if not self.arg_help:
            return None
        if self.default is None:
            return self.arg_help
        escaped_default = re.sub(
            r"%([^%])", r"%%\1", self.octal_string(self._default_in_config)
        )
        # pylint: disable=consider-using-f-string
        return "{} (Default: {})".format(self.arg_help, escaped_default)

    def _ask_value(self, prompt=None, **kwargs):
        """Prompt the user until an octal value (or an empty answer) is entered"""
        fallback = kwargs.pop("default_value", self.get())
        while True:
            answer = super()._ask_value(prompt, default_value=fallback, **kwargs)
            if answer in ["", None, fallback]:
                return fallback
            try:
                converted = self.octal(answer)
            except ValueError:
                print("Invalid answer. Must an octal value")
            else:
                return converted
class PasswordOption(StringOption):
    """
    Password configuration option class

    When the stored value equals the keyring marker (keyring_value), the real password is
    read from (and written to) the system keyring instead of the configuration file.
    """

    def __init__(self, *arg, username_option=None, keyring_value=None, **kwargs):
        """
        :param username_option: Name of the option of the same section holding the username
                                used as keyring username (default: the option name is used)
        :param keyring_value: Marker stored in configuration when the password is kept in
                              the keyring (default: "keyring")
        """
        super().__init__(*arg, **kwargs)
        self.username_option = username_option
        self.keyring_value = keyring_value if keyring_value is not None else "keyring"

    @property
    def _keyring_service_name(self):
        """Return keyring service name"""
        return ".".join([self.config.shortname, self.section.name, self.name])

    @property
    def _keyring_username(self):
        """Return keyring username"""
        return self.section.get(self.username_option) if self.username_option else self.name

    def get(self):
        """
        Get option value

        If the stored value is the keyring marker, the password is retrieved from the
        keyring. When the keyring has no entry, the user is asked for the password, which
        is then saved in the keyring.
        """
        value = super().get()
        if value != self.keyring_value:
            return value
        service_name = self._keyring_service_name
        username = self._keyring_username
        log.debug("Retrieve password %s for username=%s from keyring", service_name, username)
        value = keyring.get_password(service_name, username)
        if value is None:
            # pylint: disable=consider-using-f-string
            value = getpass(
                "Please enter {}{}: ".format(
                    f"{self.section.name} {self.name}",
                    f" for {username}" if username != self.name else "",
                )
            )
            keyring.set_password(service_name, username, value)
        return value

    def to_config(self, value=None):
        """
        Format value as stored in configuration file

        The keyring marker is only substituted for the *current* value (value is None):
        an explicitly provided value is always formatted as given, otherwise set() could
        never replace a keyring-stored password by a plain one.
        """
        if value is None and super().get() == self.keyring_value:
            return self.keyring_value
        return super().to_config(value)

    def set(self, value, use_keyring=None):  # pylint: disable=arguments-differ
        """
        Set option value to config file

        :param value: The password
        :param use_keyring: If True, the password is saved in the keyring and only the
                            keyring marker is stored in configuration. If None (default),
                            the keyring is used only if it is already the case.
        """
        if (use_keyring is None and super().get() == self.keyring_value) or use_keyring:
            keyring.set_password(self._keyring_service_name, self._keyring_username, value)
            value = self.keyring_value
        super().set(value)

    def _ask_value(self, prompt=None, **kwargs):
        """Ask to user to enter value of this option and return it (input is not echoed)"""
        if self.comment:
            print("# " + self.comment)
        default_value = kwargs.pop("default_value", self.get())
        if not prompt:
            prompt = f"{self.name}: "
        if default_value is not None:
            # Hide value only if it differed from default value
            if default_value == self.default:
                prompt += f"[{default_value}] "
            else:
                prompt += "[secret defined, leave to empty to keep it as unchange] "
        value = getpass(prompt)
        return default_value if value == "" else value

    def ask_value(self, set_it=True):
        """
        Ask to user to enter value of this option and set it if set_it parameter is True
        :param set_it: If True (default), option value will be updated with user input
        :return: The configuration option value.
        :rtype: mixed
        """
        value = self._ask_value()
        if set_it:
            use_keyring = None
            default_use_keyring = super().get() == self.keyring_value
            while use_keyring is None:
                prompt = (
                    f"Do you want to use XDG keyring ? [{'Y/n' if default_use_keyring else 'y/N'}] "
                )
                # Go through the input hook of BaseOption so that it can be mocked in tests
                result = self._get_user_input(prompt).lower()
                if result == "":
                    use_keyring = default_use_keyring
                elif result == "y":
                    use_keyring = True
                elif result == "n":
                    use_keyring = False
                else:
                    print("Invalid answer. Possible values: Y or N (case insensitive)")
            self.set(value, use_keyring=use_keyring)
        return value
class ConfigSection:
    """A named group of configuration options"""

    def __init__(self, config, name, comment=None, order=None):
        """
        :param config: The Config object owning the section
        :param name: The section name
        :param comment: Optional comment of the section
        :param order: Sorting key of the section (10 is used when it is not an integer)
        """
        self.config = config
        self.name = name
        self.comment = comment
        self.order = 10 if not isinstance(order, int) else order
        self.options = {}

    def add_option(self, _type, name, **kwargs):
        """
        Register a new option in the section
        :param _type: Option class (derived from BaseOption)
        :param name: Option name
        :param **kwargs: Keyword arguments forwarded to the option class
        :return: The created option object
        """
        assert not self.defined(name), f"Duplicated option {name}"
        new_option = _type(self.config, self, name, **kwargs)
        self.options[name] = new_option
        return new_option

    def defined(self, option):
        """Tell if an option with this name is registered in the section"""
        return option in self.options

    def isset(self, option):
        """Tell if the option is registered and has a value set"""
        if not self.defined(option):
            return False
        return self.options[option].isset()

    def get(self, option):
        """Return the value of the option"""
        assert self.defined(option), f"Option {option} unknown"
        target = self.options[option]
        return target.get()

    def set(self, option, value):
        """Set the value of the option"""
        assert self.defined(option), f"Option {option} unknown"
        target = self.options[option]
        return target.set(value)

    def set_default(self, option, default_value):
        """Set the default value of the option"""
        assert self.defined(option), f"Option {option} unknown"
        target = self.options[option]
        return target.set_default(default_value)

    def set_defaults(self, **default_values):
        """Set the default value of several options (one keyword per option)"""
        for option_name in default_values:
            self.set_default(option_name, default_values[option_name])

    def add_options_to_parser(self, parser):
        """Add an arguments group for the section and its options to the parser"""
        assert isinstance(parser, argparse.ArgumentParser)
        title = self.comment if self.comment else self.name.capitalize()
        group = parser.add_argument_group(title)
        for option in self.options.values():
            option.add_option_to_parser(group)

    def export_to_config(self):
        """Return the section and its options as configuration file text"""
        header = [f"# {self.comment}"] if self.comment else []
        header.append(f"[{self.name}]")
        body = [option.export_to_config() for option in self.options.values()]
        return "\n".join(header + body)

    def ask_values(self, set_it=True):
        """
        Ask user to enter a value for each option of the section
        :param set_it: If True (default), option values will be updated with user input
        :return: a dict of the option names and their value.
        :rtype: dict
        """
        if self.comment:
            print(f"# {self.comment}")
        print(f"[{self.name}]\n")
        answers = {}
        for option_name, option in self.options.items():
            answers[option_name] = option.ask_value(set_it=set_it)
            print()
        print()
        return answers

    def ask_value(self, option, set_it=True):
        """
        Ask user to enter a value for one option of the section
        :param option: The configuration option name
        :param set_it: If True (default), option value will be updated with user input
        :return: The configuration option value.
        :rtype: mixed
        """
        assert self.defined(option), f"Option {option} unknown"
        target = self.options[option]
        return target.ask_value(set_it=set_it)
class RawWrappedTextHelpFormatter(argparse.RawDescriptionHelpFormatter):
    """
    Custom TextHelpFormatter for argparse.ArgumentParser that keeps the line returns and the
    indentation of help messages while wrapping their lines
    """

    def _split_lines(self, text, width):
        """
        Split a help message in lines of at most width characters

        :param text: The help message
        :param width: The maximum width of a line
        :return: The list of lines: empty lines are kept and each wrapped line keeps the
                 leading spaces of the line it comes from
        """
        result = []
        for line in textwrap.dedent(text).splitlines():
            # Keep empty line
            if line == "":
                result.append(line)
                continue
            # Split indent prefix and line text
            line_text = line.lstrip(" ")
            indent = line[: len(line) - len(line_text)]
            # textwrap.wrap() raises ValueError on a width lower than 1, which would occur
            # when the indent is as large as the available width
            wrap_width = max(width - len(indent), 1)
            # Wrap the text and add each resulting line with the indent prefix
            result.extend(indent + subline for subline in textwrap.wrap(line_text, wrap_width))
        return result
class Config: # pylint: disable=too-many-instance-attributes
"""Configuration helper"""
def __init__(
    self,
    appname,
    shortname=None,
    version=None,
    encoding=None,
    config_file_env_variable=None,
    default_config_dirpath=None,
    default_config_filename=None,
    default_config_file_mode=None,
):
    """
    Build the configuration helper, register the logging sections and create the
    configuration parser

    :param appname: The application name
    :param shortname: The application short name
    :param version: The application version (default: "0.0")
    :param encoding: The configuration file encoding (default: "utf-8")
    :param config_file_env_variable: Name of the environment variable that may hold the
                                     configuration file path
    :param default_config_dirpath: Default configuration directory path
    :param default_config_filename: Default configuration file name
    :param default_config_file_mode: Mode applied to the saved configuration file
                                     (default: DEFAULT_CONFIG_FILE_MODE)
    """
    # Application identity
    self.appname = appname
    self.shortname = shortname
    self.version = version or "0.0"
    self.encoding = encoding or "utf-8"

    # Configuration file location settings
    self.config_file_env_variable = config_file_env_variable
    self.default_config_dirpath = default_config_dirpath
    self.default_config_filename = default_config_filename
    self.default_config_file_mode = default_config_file_mode or DEFAULT_CONFIG_FILE_MODE

    # Runtime state
    self._filepath = None
    self.config_parser = None
    self.options_parser = None
    self.options = None
    self.sections = {}
    self._loaded_callbacks = []
    self._loaded_callbacks_executed = []

    self.add_logging_sections()
    self._init_config_parser()
def add_logging_sections(self):
    """Register the "console" and "logfile" sections and their logging options"""
    # logging.getLevelNamesMapping() not available in python 3.9
    # pylint: disable=protected-access
    level_names = ", ".join(logging._nameToLevel)

    console = self.add_section("console", comment="Console logging", order=998)
    console.add_option(
        BooleanOption,
        "enabled",
        default=False,
        arg="--console",
        short_arg="-C",
        comment="Enable/disable console log",
    )
    console.add_option(
        BooleanOption,
        "force_stderr",
        default=False,
        arg="--console-stderr",
        comment="Force console log on stderr",
    )
    console.add_option(
        StringOption,
        "log_format",
        default=DEFAULT_CONSOLE_LOG_FORMAT,
        arg="--console-log-format",
        comment="Console log format",
    )
    console.add_option(
        StringOption,
        "log_level",
        comment=(
            "Console log level limit : by default, all logged messages (according to main log "
            "level) will be logged to the console, but you can set a minimal level if you "
            f"want. Possible values: {level_names}."
        ),
    )

    logfile = self.add_section("logfile", comment="Logging file", order=999)
    logfile.add_option(StringOption, "path", comment="File log path")
    logfile.add_option(
        StringOption,
        "format",
        default=DEFAULT_FILELOG_FORMAT,
        comment="File log format",
    )
    logfile.add_option(
        StringOption,
        "level",
        comment=(
            "File log level limit : by default, all logged messages (according to main log "
            "level) will be logged to the log file, but you can set a minimal level if you "
            f"want. Possible values: {level_names}."
        ),
    )
def add_section(self, name, loaded_callback=None, **kwargs):
    """
    Register a new section
    :param name: The section name
    :param loaded_callback: An optional callback executed after the configuration is
                            loaded. It receives the Config object as parameter.
    :param **kwargs: Keyword arguments forwarded to ConfigSection __init__() method
    :return: The created section object
    """
    assert name not in self.sections, f"Duplicated section {name}"
    section = ConfigSection(self, name, **kwargs)
    self.sections[name] = section
    if loaded_callback:
        self._loaded_callbacks.append(loaded_callback)
        # Configuration already loaded: run the callback right now
        already_loaded = self._filepath or self.options
        if already_loaded:
            self._loaded()
    return section
def defined(self, section, option):
"""Check option is defined in specified section"""
return section in self.sections and self.sections[section].defined(option)
def isset(self, section, option):
"""Check option is set in specified section"""
return section in self.sections and self.sections[section].isset(option)
def get(self, section, option):
"""Get option value"""
assert self.defined(section, option), f"Unknown option {section}.{option}"
value = self.sections[section].get(option)
log.debug("get(%s, %s): %s (%s)", section, option, value, type(value))
return value
def get_option(self, option, default=None):
"""Get an argument parser option value"""
if self.options and hasattr(self.options, option):
return getattr(self.options, option)
return default
def __getitem__(self, key):
    """Give access to a section through a ConfigSectionAsDictWrapper object"""
    assert key in self.sections, f"Unknown section {key}"
    section = self.sections[key]
    return ConfigSectionAsDictWrapper(section)
def set(self, section, option, value):
"""Set option value"""
assert self.defined(section, option), f"Unknown option {section}.{option}"
self._init_config_parser()
self.sections[section].set(option, value)
def set_default(self, section, option, default_value):
"""Set default option value"""
assert self.defined(section, option), f"Unknown option {section}.{option}"
self._init_config_parser()
self.sections[section].set_default(option, default_value)
def set_defaults(self, section, **default_values):
"""Set default options value"""
assert section in self.sections, f"Unknown section {section}"
self._init_config_parser()
self.sections[section].set_defaults(**default_values)
def _init_config_parser(self, force=False):
"""Initialize ConfigParser object"""
if not self.config_parser or force:
self.config_parser = ConfigParser(defaults={"hash": "#"})
def load_file(self, filepath, execute_callback=True):
"""Read configuration file"""
self._init_config_parser(force=True)
self._filepath = filepath
# Checking access of configuration file
if not os.path.isfile(filepath):
return True
if not os.access(filepath, os.R_OK):
return False
try:
self.config_parser.read(filepath, encoding=self.encoding)
except Exception: # pylint: disable=broad-except
self._init_config_parser(force=True)
log.exception("Failed to read configuration file %s", filepath)
return False
# Logging initialization
if self.config_parser.has_section("loggers"):
fileConfig(filepath)
else:
# Otherwise, use systemd journal handler
handler = JournalHandler(SYSLOG_IDENTIFIER=self.shortname)
handler.setFormatter(logging.Formatter("%(levelname)s | %(name)s | %(message)s"))
logging.getLogger().addHandler(handler)
self._filepath = filepath
if execute_callback:
self._loaded()
return True
def _loaded(self):
"""Execute loaded callbacks"""
error = False
for callback in self._loaded_callbacks:
if callback in self._loaded_callbacks_executed:
continue
if not callback(self):
error = True
self._loaded_callbacks_executed.append(callback)
return not error
def save(self, filepath=None, reload=True):
"""Save configuration file"""
filepath = filepath if filepath else self._filepath
assert filepath, "Configuration filepath is not set or provided"
# Checking access of target file/directory
dirpath = os.path.dirname(filepath)
if os.path.isfile(filepath):
if not os.access(filepath, os.W_OK):
log.error('Configuration file "%s" is not writable', filepath)
return False
elif not os.path.isdir(dirpath) or not os.access(dirpath, os.R_OK | os.W_OK | os.X_OK):
log.error('Configuration directory "%s" does not exist (or not writable)', dirpath)
return False
lines = [f"#\n# {self.appname} configuration\n#\n"]
for section_name in self._ordered_section_names:
lines.append("")
lines.append(self.sections[section_name].export_to_config())
try:
with open(filepath, "wb") as fd:
fd.write("\n".join(lines).encode(self.encoding))
# Privacy!
os.chmod(filepath, self.default_config_file_mode)
except Exception: # pylint: disable=broad-except
log.exception("Failed to write generated configuration file %s", filepath)
return False
if reload:
return self.load_file(filepath)
return True
@property
def _ordered_section_names(self):
"""Get ordered list of section names"""
return sorted(self.sections.keys(), key=lambda section: self.sections[section].order)
def get_arguments_parser(self, reconfigure=False, **kwargs):
    """
    Return the arguments parser, building it on first call

    :param reconfigure: If True, the --reconfigure argument is added to the parser
    :param **kwargs: Keyword arguments forwarded to argparse.ArgumentParser ("description"
                     defaults to the application name)
    :return: The arguments parser (the one already built, if any: parameters are then
             not considered)
    """
    if self.options_parser:
        return self.options_parser

    description = kwargs.pop("description", self.appname)
    parser = argparse.ArgumentParser(
        description=description,
        formatter_class=RawWrappedTextHelpFormatter,
        **kwargs,
    )
    self.options_parser = parser

    config_file_help = f"Configuration file to use (default: {self.config_filepath})"
    if self.config_file_env_variable:
        config_file_help += (
            "\n\nYou also could set "
            f"{self.config_file_env_variable} environment variable to "
            "specify your configuration file path."
        )
    parser.add_argument("-c", "--config", default=self.config_filepath, help=config_file_help)
    parser.add_argument(
        "--save",
        action="store_true",
        dest="save",
        help="Save current configuration to file",
    )
    if reconfigure:
        parser.add_argument(
            "--reconfigure",
            action="store_true",
            dest="mylib_config_reconfigure",
            help="Reconfigure and update configuration file",
        )
    parser.add_argument("-d", "--debug", action="store_true", help="Show debug messages")
    parser.add_argument("-v", "--verbose", action="store_true", help="Show verbose messages")

    # Finally, add the arguments of the options of all sections
    self.add_options_to_parser(parser)
    return parser
def parse_arguments_options(
    self,
    argv=None,
    parser=None,
    create=True,
    ask_values=True,
    exit_after_created=True,
    execute_callback=True,
    hardcoded_options=None,
):
    """
    Parse arguments options, load the configuration file and set up logging handlers

    :param argv: Optional arguments list to parse (default: sys.argv[1:])
    :param parser: Optional argparse.ArgumentParser to use
                   (default: generated by self.get_arguments_parser())
    :param create: If True, configuration file will be created if it does not exist
                   (default: True)
    :param ask_values: If True, ask user to enter the value of each configuration option
                       when creating the configuration file (default: True)
    :param exit_after_created: If True, script will end after configuration file
                               creation (default: True)
    :param execute_callback: Sections' loaded callbacks will be executed only if True
                             (default: True)
    :param hardcoded_options: Optional hard-coded options to set after loading arguments
                              and configuration file. These options are passed using a
                              list of tuples of 3 elements: the section and the option
                              names and the value.
                              [('section1', 'option1', value), ...]
    :return: The parsed options (argparse.Namespace). Note: the script exits instead of
             returning when the configuration file is created (see exit_after_created),
             saved or reconfigured, or when a configured log level is invalid.
    """
    parser = parser if parser else self.get_arguments_parser()
    argcomplete.autocomplete(parser)
    options = parser.parse_args(argv if argv is not None else sys.argv[1:])
    self.load_options(options, execute_callback=False)

    if options.config:
        options.config = os.path.abspath(options.config)

        already_saved = False
        if not os.path.isfile(options.config) and (create or options.save):
            log.warning("Configuration file is missing, generate it (%s)", options.config)
            if ask_values:
                self.ask_values(set_it=True)
            self.save(options.config)
            if exit_after_created:
                sys.exit(0)
            already_saved = True

        # Load configuration file
        if os.path.isfile(options.config) and not self.load_file(
            options.config, execute_callback=False
        ):
            parser.error(f"Failed to load configuration from file {options.config}")

        if options.save and not already_saved:
            self.save()
            sys.exit(0)

    root_logger = logging.getLogger()
    if options.debug:
        root_logger.setLevel(logging.DEBUG)
    elif options.verbose:
        root_logger.setLevel(logging.INFO)

    if hardcoded_options:
        assert isinstance(hardcoded_options, list), (
            "hardcoded_options must be a list of tuple of 3 elements: "
            "the section and the option names and the value."
        )
        for opt_info in hardcoded_options:
            assert isinstance(opt_info, tuple) and len(opt_info) == 3, (
                "Invalid hard-coded option value: it must be a tuple of 3 "
                "elements: the section and the option names and the value."
            )
            self.set(*opt_info)

    if self.get("console", "enabled"):
        console_level_name = self.get("console", "log_level")
        console_log_level = (
            # logging.getLevelNamesMapping() not available in python 3.9
            # pylint: disable=protected-access
            logging._nameToLevel.get(console_level_name)
            if console_level_name
            else logging.DEBUG
        )
        # An unknown level name gives None: fail explicitly (as done for the log file
        # level) rather than on the comparisons below
        if console_log_level is None:
            log.fatal("Invalid console log level specified (%s)", console_level_name)
            sys.exit(1)

        console_handlers = []
        # Messages under the WARNING level go through a dedicated handler (on stdout,
        # unless stderr is forced)
        if console_log_level < logging.WARNING:
            stdout_console_handler = logging.StreamHandler(
                sys.stderr if self.get("console", "force_stderr") else sys.stdout
            )
            stdout_console_handler.addFilter(StdoutInfoFilter())
            stdout_console_handler.setLevel(console_log_level)
            console_handlers.append(stdout_console_handler)

        # Other messages always go to stderr
        stderr_console_handler = logging.StreamHandler(sys.stderr)
        stderr_console_handler.setLevel(max(console_log_level, logging.WARNING))
        console_handlers.append(stderr_console_handler)

        console_log_format = self.get("console", "log_format")
        if console_log_format:
            console_formater = logging.Formatter(console_log_format)
            for console_handler in console_handlers:
                console_handler.setFormatter(console_formater)

        for console_handler in console_handlers:
            root_logger.addHandler(console_handler)

    logfile_path = self.get("logfile", "path")
    if logfile_path:
        logfile_level_name = self.get("logfile", "level")
        logfile_level = (
            # logging.getLevelNamesMapping() not available in python 3.9
            # pylint: disable=protected-access
            logging._nameToLevel.get(logfile_level_name)
            if logfile_level_name
            else logging.DEBUG
        )
        if logfile_level is None:
            log.fatal("Invalid log file level specified (%s)", logfile_level_name)
            sys.exit(1)

        # The handler is created after the level check to not open the file for nothing
        logfile_handler = logging.FileHandler(logfile_path)
        logfile_handler.setLevel(logfile_level)
        logfile_format = self.get("logfile", "format")
        if logfile_format:
            logfile_handler.setFormatter(logging.Formatter(logfile_format))
        root_logger.addHandler(logfile_handler)

    if execute_callback:
        self._loaded()

    if self.get_option("mylib_config_reconfigure", default=False):
        # Reconfiguration only makes sense if the entered values are written back: the
        # exit status reflects the result of the save
        self.ask_values(set_it=True)
        sys.exit(0 if self.save(options.config or None) else 1)

    return options
def load_options(self, options, execute_callback=True):
"""Register arguments parser options"""
assert isinstance(options, argparse.Namespace)
self.options = options
log.debug("Argument options: %s", options)
if execute_callback:
self._loaded()
def add_options_to_parser(self, parser):
"""Add sections and their options to parser"""
for section in self._ordered_section_names:
self.sections[section].add_options_to_parser(parser)
def ask_values(self, set_it=True, execute_callback=False):
    """
    Ask user to enter a value for each option of each configuration section

    :param set_it: If True (default), option value will be updated with user input
    :param execute_callback: If True (and set_it is True too), self._loaded() is called
                             once all sections have been asked (default: False)

    :return: a dict mapping each section name to the result of its ask_values() method
    :rtype: dict
    """
    answers = {
        section_name: section.ask_values(set_it=set_it)
        for section_name, section in self.sections.items()
    }
    if not (set_it and execute_callback):
        return answers
    self._loaded()
    return answers
def ask_value(self, section, option, set_it=True):
    """
    Ask user to enter a value for one configuration option

    The request is delegated to the ask_value() method of the matching section.

    :param section: The configuration section name
    :param option: The configuration option name
    :param set_it: If True (default), option value will be updated with user input

    :return: The value returned by the section for this option
    :rtype: mixed
    """
    assert self.defined(section, option), f"Unknown option {section}.{option}"
    owning_section = self.sections[section]
    return owning_section.ask_value(option, set_it=set_it)
def configure(self, argv=None, description=False):
    """
    Entry point of a script you could use to create your configuration file

    Note: make sure all your configuration sections and options are defined
    before running it.

    This method always terminates the process using sys.exit():
    - 0 when the configuration file was saved (and validated, if requested), or when
      an existing file was validated without being overwritten
    - 1 when the file already exists and neither overwrite nor validation was
      requested, when saving failed or when validation reported errors
    - 2 when an exception was raised during validation

    :param argv: Command line arguments, passed to self.parse_arguments_options()
    :param description: The parser description (default: "Generate configuration file")
    """
    parser = self.get_arguments_parser(description=description or "Generate configuration file")

    # Boolean flags specific to this entry point: (short, long, dest, help)
    flags = (
        ("-i", "--interactive", "interactive", "Enable configuration interactive mode"),
        ("-O", "--overwrite", "overwrite", "Overwrite configuration file if exists"),
        (
            "-V",
            "--validate",
            "validate",
            (
                "Validate configuration: initialize application to test if provided parameters"
                " works.\n\nNote: Validation will occurred after configuration file creation or"
                " update. On error, re-run with -O/--overwrite parameter to fix it."
            ),
        ),
    )
    for short_flag, long_flag, dest, help_text in flags:
        parser.add_argument(
            short_flag, long_flag, action="store_true", dest=dest, help=help_text
        )

    options = self.parse_arguments_options(argv, create=False, execute_callback=False)

    def check_configuration():
        """Load the configuration file and exit on failure"""
        print("Validate your configuration...")
        try:
            loaded = self.load_file(options.config)
            if not loaded:
                print("Error(s) occurred validating your configuration. See logs for details.")
                # SystemExit is not an Exception subclass: not caught below
                sys.exit(1)
            print("Your configuration seem valid.")
        except Exception:  # pylint: disable=broad-except
            print(
                "Exception occurred validating your configuration:\n"
                f"{traceback.format_exc()}"
                "\n\nSee logs for details."
            )
            sys.exit(2)

    # Never touch an existing file unless overwriting was explicitly requested
    if os.path.exists(options.config) and not options.overwrite:
        print(
            f"Configuration file {options.config} already exists. "
            "Use -O/--overwrite parameter to overwrite it."
        )
        if not options.validate:
            sys.exit(1)
        check_configuration()
        sys.exit(0)

    if options.interactive:
        self.ask_values(set_it=True)

    if not self.save(options.config, reload=False):
        print(f"Error occurred creating configuration file {options.config}")
        sys.exit(1)

    print(f"Configuration file {options.config} created.")
    if options.validate:
        check_configuration()
    sys.exit(0)
@property
def config_dir(self):
    """
    Retrieve configuration directory path

    Resolution order: the directory of self._filepath when set, then
    self.default_config_dirpath when set, otherwise DEFAULT_CONFIG_DIRPATH.
    """
    explicit_filepath = self._filepath
    if explicit_filepath:
        return os.path.dirname(explicit_filepath)
    return self.default_config_dirpath or DEFAULT_CONFIG_DIRPATH
@property
def config_filepath(self):
    """
    Retrieve configuration file path

    Resolution order:
    - self._filepath when set
    - the value of the environment variable named by self.config_file_env_variable,
      when both are set
    - a file located in self.config_dir, named after self.default_config_filename,
      or "<shortname>.ini" when self.shortname is set, or "config.ini"
    """
    if self._filepath:
        return self._filepath

    env_variable = self.config_file_env_variable
    if env_variable:
        from_environment = os.environ.get(env_variable)
        if from_environment:
            return from_environment

    filename = self.default_config_filename
    if not filename:
        filename = f"{self.shortname}.ini" if self.shortname else "config.ini"
    return os.path.join(self.config_dir, filename)
class ConfigurableObject:
    """
    Base class of configurable object

    This class provide a way to configure an object using :
    - mylib.config.Config object
    - argparse.Namespace object
    - kwargs passed to __init__ method

    Option values are resolved by _get_option() in this order: values passed as kwargs
    to __init__ (or stored using _set_option()), attributes of the options object named
    "<options_prefix><option>", the configuration section, then the default values.
    """

    # Configuration object name (used for default options prefix and config section)
    # Note: required if options_prefix or/and config_section parameters not provided
    # to __init__ method.
    _config_name = None

    # Configuration comment (used for config section)
    _config_comment = None

    # Default options value
    # Important: all supported options MUST HAVE a default value defined
    # Note: __init__ copies this dict on the instance, so that set_default() and
    # configure() never alter the defaults of other instances.
    _defaults = {
        "just_try": None,
    }

    # Store options passed through __init__ method
    # Note: class-level value is only a fallback, __init__ creates a dict per instance.
    _kwargs = {}
    _options = {}
    _options_prefix = None
    _config = None
    _config_section = None

    def __init__(
        self, options=None, options_prefix=None, config=None, config_section=None, **kwargs
    ):
        """
        :param options: Object holding options as attributes (ex: argparse.Namespace)
        :param options_prefix: Prefix of option attributes names in options object
                               (default: "<_config_name>_")
        :param config: The mylib.config.Config object
        :param config_section: The configuration section name (default: _config_name)
        :param kwargs: Options values (each key must be declared in _defaults)

        :raises ConfigException: if options_prefix or config_section is not provided
                                 and _config_name is not defined
        """
        # Work on per-instance dicts: mutating the class-level ones would leak
        # options and defaults between all instances.
        self._defaults = dict(self._defaults)
        self._kwargs = {}

        for key, value in kwargs.items():
            assert key in self._defaults, f"Unknown {key} option"
            self._kwargs[key] = value

        if options:
            self._options = options
        if options_prefix is not None:
            self._options_prefix = options_prefix
        elif self._config_name:
            self._options_prefix = self._config_name + "_"
        else:
            raise ConfigException(
                f"No configuration name defined for {type(self).__name__}"
            )

        if config:
            self._config = config
        if config_section:
            self._config_section = config_section
        elif self._config_name:
            self._config_section = self._config_name
        else:
            raise ConfigException(
                f"No configuration name defined for {type(self).__name__}"
            )

    def _get_option(self, option, default=None, required=False):
        """
        Retrieve option value

        :param option: The option name
        :param default: Value returned if the option is not defined anywhere
                        (default: the value declared in _defaults)
        :param required: If True, fail if the option is not defined in kwargs,
                         options or configuration (default: False)
        """
        if self._kwargs and option in self._kwargs:
            return self._kwargs[option]

        if self._options and hasattr(self._options, self._options_prefix + option):
            return getattr(self._options, self._options_prefix + option)

        if self._config and self._config.defined(self._config_section, option):
            return self._config.get(self._config_section, option)

        assert not required, f"Options {option} not defined"

        return default if default is not None else self._defaults.get(option)

    def _set_option(self, option, value):
        """Set option value (takes precedence over options and configuration)"""
        self._kwargs[option] = value

    def set_default(self, option, default_value):
        """Set option default value"""
        assert option in self._defaults, f"Unknown option {option}"
        self._defaults[option] = default_value

    def set_defaults(self, **default_values):
        """Set options default value"""
        for option, default_value in default_values.items():
            self.set_default(option, default_value)

    def configure(
        self,
        comment=None,
        just_try=False,
        just_try_default=False,
        just_try_help="Just-try mode",
        **kwargs,
    ):
        """
        Configure options on registered mylib.Config object

        :param comment: Configuration section comment (default: self._config_comment)
        :param just_try: Add just-try mode option (default: False)
        :param just_try_default: Default just-try mode option value (default: False)
        :param just_try_help: Default just-try mode option help message (default: "Just-try mode")
        :param kwargs: Other provided parameters are directly passed to Config.add_section() method

        :return: The section returned by Config.add_section()
        """
        assert self._config, (
            "mylib.Config object not registered. Must be passed to __init__ as config keyword"
            " argument."
        )

        section = self._config.add_section(
            self._config_section,
            comment=comment if comment else self._config_comment,
            loaded_callback=self.initialize,
            **kwargs,
        )

        if just_try:
            self._defaults["just_try"] = just_try_default
            section.add_option(
                BooleanOption,
                "just_try",
                default=self._defaults["just_try"],
                comment=just_try_help if just_try_help else "Just-try mode",
            )

        return section

    def initialize(self, loaded_config=None):
        """
        Configuration initialized hook

        Registered by configure() as loaded callback of the configuration section.

        :param loaded_config: The loaded configuration object (optional)
        """
        if loaded_config:
            # NOTE(review): stored as self.config while other methods read self._config
            # - confirm this is intended.
            self.config = loaded_config  # pylint: disable=attribute-defined-outside-init

    @property
    def _just_try(self):
        """Check if just-try mode is enabled"""
        # If "just_try" provided to constructor, use it value
        if "just_try" in self._kwargs:
            log.debug(
                "Just-try mode is %s by value passed to constructor",
                "enabled" if self._kwargs["just_try"] else "disabled",
            )
            return self._kwargs["just_try"]

        # If options provided and just-try option exist and is enabled, just-try mode enabled
        if (
            self._options
            and hasattr(self._options, f"{self._options_prefix}just_try")
            and getattr(self._options, f"{self._options_prefix}just_try")
        ):
            # type(self) and not __class__: log the concrete class name, not this base class
            log.debug("Just-try mode for %s is enabled", type(self).__name__)
            return True

        # If options provided and a just_try option exist and is enabled, just-try mode enabled
        if (
            self._options
            and hasattr(self._options, "just_try")
            and getattr(self._options, "just_try")
        ):
            log.debug("Just-try mode is globally enabled")
            return True

        # If Config provided, config section defined and just-try enabled in config, just-try mode
        # enabled
        if (
            self._config
            and self._config.defined(self._config_section, "just_try")
            and self._config.get(self._config_section, "just_try")
        ):
            log.debug("Just-try mode for %s is enabled in configuration", self._config_section)
            return True

        # If Config provided, use it's get_option() method to obtain a global just_try parameter
        # value with a default to False, otherwise always false
        return self._config.get_option("just_try", default=False) if self._config else False
class ConfigSectionAsDictWrapper:
    """
    Wrapper for ConfigSection that offer __getitem__ and __setitem__ methods
    to allow access to section options as with a dictionary.

    Deleting an item is not supported and raises a ConfigException.
    """

    # The wrapped section (set by __init__)
    __section = None

    def __init__(self, section):
        """
        :param section: The wrapped section, its get() and set() methods are used
        """
        self.__section = section

    def __getitem__(self, key):
        """Retrieve the value of option *key* using the get() method of the section"""
        wrapped = self.__section
        return wrapped.get(key)

    def __setitem__(self, key, value):
        """Set the value of option *key* using the set() method of the section"""
        wrapped = self.__section
        wrapped.set(key, value)

    def __delitem__(self, key):
        """Always raise a ConfigException: options can not be deleted"""
        raise ConfigException("Deleting a configuration option is not supported")
# pylint: disable=too-few-public-methods
class StdoutInfoFilter(logging.Filter):
    """
    Logging filter to keep only messages below logging.WARNING

    Meant for a handler writing on stdout while another handler, whose level is at
    least logging.WARNING, writes on stderr: every record is then handled by exactly
    one of them, including records using a custom level (ex: 15 or 25).
    """

    def filter(self, record):
        """
        :param record: The log record to check

        :return: True if the record level is lower than logging.WARNING, False otherwise
        :rtype: bool
        """
        # Compare against WARNING instead of listing DEBUG and INFO: a custom level
        # below WARNING would otherwise be dropped here and by the stderr handler.
        return record.levelno < logging.WARNING