1403 lines
50 KiB
Python
1403 lines
50 KiB
Python
# pylint: disable=too-many-lines
|
|
|
|
""" Configuration & options parser """
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import re
|
|
import stat
|
|
import sys
|
|
import textwrap
|
|
import traceback
|
|
from configparser import ConfigParser
|
|
from getpass import getpass
|
|
from logging.config import fileConfig
|
|
|
|
import argcomplete
|
|
import keyring
|
|
from systemd.journal import JournalHandler
|
|
|
|
# Logger of this module
log = logging.getLogger(__name__)

# Constants
DEFAULT_ENCODING = "utf-8"
# Note: there is no "~" to expand in "./", so this stays relative to the current directory
DEFAULT_CONFIG_DIRPATH = os.path.expanduser("./")
DEFAULT_LOG_FORMAT = (
    "%(asctime)s - "
    "%(module)s:%(lineno)d - "
    "%(levelname)s - "
    "%(message)s"
)
# Console and file logs share the same format by default
DEFAULT_CONSOLE_LOG_FORMAT = DEFAULT_FILELOG_FORMAT = DEFAULT_LOG_FORMAT
|
|
|
|
|
|
class ConfigException(Exception):
    """
    Configuration exception

    It derives from Exception (and not directly from BaseException, which is reserved
    for system-exiting exceptions) so that generic ``except Exception`` handlers are
    able to catch it.
    """
|
|
|
|
|
|
class BaseOption:  # pylint: disable=too-many-instance-attributes
    """
    Base configuration option class

    The value of an option is resolved, in this order, from the parsed command line
    arguments, from the loaded configuration file and finally from its default value.
    """

    def __init__(
        self,
        config,
        section,
        name,
        default=None,
        comment=None,
        arg=None,
        short_arg=None,
        arg_help=None,
        no_arg=False,
    ):
        """
        :param config: Configuration object owning the option (this class reads its
                       options, config_filepath and config_parser attributes)
        :param section: Section object owning the option (this class reads its name)
        :param name: Option name
        :param default: Option default value
        :param comment: Comment describing the option (exported in configuration file)
        :param arg: Command line argument name (default: computed from section and
                    option names)
        :param short_arg: Optional short command line argument name
        :param arg_help: Command line argument help message (default: the comment)
        :param no_arg: If True, the option is not exposed as command line argument
        """
        self.config = config
        self.section = section
        self.name = name
        self.default = default
        self.comment = comment
        self.no_arg = no_arg
        self.arg = arg
        self.short_arg = short_arg
        self.arg_help = arg_help if arg_help else comment
        # Turned on by set(): from then on, get() ignores the command line value
        self._set = False

    @property
    def _isset_in_options(self):
        """Check if option is defined in registered arguments parser options"""
        return self.config.options and not self.no_arg and self._from_options != self.default

    @property
    def _from_options(self):
        """Get option from arguments parser options"""
        value = (
            getattr(self.config.options, self.parser_dest)
            if self.config.options and not self.no_arg
            else None
        )
        log.debug("_from_options(%s, %s) = %s", self.section.name, self.name, value)
        return value

    @property
    def _isset_in_config_file(self):
        """Check if option is defined in the loaded configuration file"""
        return self.config.config_filepath and self.config.config_parser.has_option(
            self.section.name, self.name
        )

    @property
    def _from_config(self):
        """Get option value from ConfigParser"""
        return self.config.config_parser.get(self.section.name, self.name)

    @property
    def _default_in_config(self):
        """Get option default value considering current value from configuration"""
        return self._from_config if self._isset_in_config_file else self.default

    def isset(self):
        """Check if option is defined in the loaded configuration file or in options"""
        return self._isset_in_config_file or self._isset_in_options

    def get(self):
        """Get option value from options, config or default"""
        # A value explicitly set with set() takes precedence over the command line one
        if self._isset_in_options and not self._set:
            return self._from_options
        if self._isset_in_config_file:
            return self._from_config
        return self.default

    def set(self, value):
        """
        Set option value to config file and options

        An empty string, None or the default value remove the option from the
        configuration parser instead of storing it.
        """
        if value == "":
            value = None

        if value == self.default or value is None:
            # Remove option from config (if section exists)
            if self.config.config_parser.has_section(self.section.name):
                self.config.config_parser.remove_option(self.section.name, self.name)
        else:
            # Store option to config
            if not self.config.config_parser.has_section(self.section.name):
                self.config.config_parser.add_section(self.section.name)

            self.config.config_parser.set(self.section.name, self.name, self.to_config(value))

        self._set = True

    def set_default(self, default_value):
        """Set option default value"""
        self.default = default_value

    @property
    def parser_action(self):
        """Get action as accepted by argparse.ArgumentParser"""
        return "store"

    @property
    def parser_type(self):
        """Get type as handled by argparse.ArgumentParser"""
        return str

    @property
    def parser_dest(self):
        """Get option name in arguments parser options"""
        return f"{self.section.name}_{self.name}"

    @property
    def parser_help(self):
        """Get option help message in arguments parser options"""
        if self.arg_help and self.default is not None:
            # argparse %-formats help messages: "%" of the default value are doubled
            # pylint: disable=consider-using-f-string
            return "{} (Default: {})".format(
                self.arg_help, re.sub(r"%([^%])", r"%%\1", str(self._default_in_config))
            )
        if self.arg_help:
            return self.arg_help
        return None

    @property
    def parser_argument_name(self):
        """Get option argument name in parser options"""
        return (
            self.arg if self.arg else f"--{self.section.name}-{self.name}".lower().replace("_", "-")
        )

    def add_option_to_parser(self, section_opts):
        """
        Add option to arguments parser

        :param section_opts: Object exposing an add_argument() method on which
                             the option is registered (nothing is done if no_arg is set)
        """
        if self.no_arg:
            return
        args = [self.parser_argument_name]
        if self.short_arg:
            args.append(self.short_arg)
        kwargs = {
            "action": self.parser_action,
            "dest": self.parser_dest,
            "help": self.parser_help,
            "default": self.default,
        }
        if self.parser_type:  # pylint: disable=using-constant-test
            kwargs["type"] = self.parser_type

        log.debug(
            "add_option_to_parser(%s, %s): argument name(s)=%s / kwargs=%s",
            self.section.name,
            self.name,
            ", ".join(args),
            kwargs,
        )
        section_opts.add_argument(*args, **kwargs)

    def to_config(self, value=None):
        """
        Format value as stored in configuration file

        :param value: The value to format (default: the current option value)
        """
        value = value if value is not None else self.get()
        return "" if value is None else str(value)

    def export_to_config(self):
        """
        Export option to configuration file

        :return: The lines describing the option (comment, default value and the
                 value itself, commented if unset or equal to default), joined by line return
        """
        lines = []
        if self.comment:
            lines.append(f"# {self.comment}")
        value = self.to_config()
        default_value = "" if self.default is None else self.to_config(self.default)
        log.debug(
            "export_to_config(%s, %s): value=%s / default=%s",
            self.section.name,
            self.name,
            value,
            default_value,
        )
        if default_value:
            if isinstance(default_value, str) and "\n" in default_value:
                lines.append("# Default:")
                lines.extend([f"# {line}" for line in default_value.split("\n")])
            else:
                lines.append(f"# Default: {default_value}")
        if value and value != default_value:
            if isinstance(value, str) and "\n" in value:
                value_lines = value.split("\n")
                # The first line carries the option name, the next ones are continuation
                # lines (it was previously written as a "# Default:" comment, which lost
                # the value when the file was loaded back)
                lines.append(f"{self.name} = {value_lines[0]}")
                # A continuation line starting with "#" would be read as a comment:
                # "#" are replaced by an interpolation reference to the "hash" key
                lines.extend([f' {line.replace("#", "%(hash)s")}' for line in value_lines[1:]])
            else:
                lines.append(f"{self.name} = {value}")
        else:
            lines.append(f"# {self.name} =")
        lines.append("")
        return "\n".join(lines)

    @staticmethod
    def _get_user_input(prompt):
        """
        Get user console input
        Note: do not use directly input() to allow to mock it in tests
        """
        return input(prompt)

    def _ask_value(self, prompt=None, **kwargs):
        """
        Ask to user to enter value of this option and return it

        :param prompt: Optional prompt (default: the option name)
        :param default_value: Optional keyword argument: the value proposed to the user
                              and returned on empty input (default: current option value)
        """
        if self.comment:
            print(f"# {self.comment}")
        default_value = kwargs.get("default_value", self.get())
        if not prompt:
            prompt = f"{self.name}: "
        if default_value is not None:
            if isinstance(default_value, str) and "\n" in default_value:
                prompt += "[\n %s\n] " % "\n ".join(
                    self.to_config(default_value).split("\n")
                )
            else:
                prompt += f"[{self.to_config(default_value)}] "
        value = self._get_user_input(prompt)
        return default_value if value == "" else value

    def ask_value(self, set_it=True):
        """
        Ask to user to enter value of this option and set or
        return it regarding set_it parameter

        :param set_it: If True (default), the entered value is passed to set() and the
                       result of set() is returned. Otherwise, the entered value is returned.
        """
        value = self._ask_value()
        if set_it:
            return self.set(value)
        return value
|
|
|
|
|
|
class StringOption(BaseOption):
    """
    String configuration option class

    It adds nothing to BaseOption, which already handles values as strings.
    """
|
|
|
|
|
|
class BooleanOption(BaseOption):
    """
    Boolean configuration option class

    On command line, the option is a flag that toggles the current default value (the
    one from the configuration file if set, the option default otherwise).
    """

    @property
    def _from_config(self):
        """Get option value from ConfigParser"""
        return self.config.config_parser.getboolean(self.section.name, self.name)

    def to_config(self, value=None):
        """Format value as stored in configuration file"""
        return super().to_config(value).lower()

    @property
    def _isset_in_options(self):
        """Check if option is defined in registered arguments parser options"""
        return (
            self.config.options
            and not self.no_arg
            and getattr(self.config.options, self.parser_dest)
        )

    @property
    def _from_options(self):
        """Get option from arguments parser options"""
        return not self._default_in_config if self._isset_in_options else self._default_in_config

    @property
    def parser_action(self):
        """Get action as accepted by argparse.ArgumentParser"""
        return "store_true"

    @property
    def parser_type(self):
        """Get type as handled by argparse.ArgumentParser (none for a flag)"""
        return None

    @property
    def parser_argument_name(self):
        """Get option argument name in parser options"""
        if self.arg:
            return self.arg
        # The flag toggles the default value: name it after what it does
        action = "disable" if self._default_in_config else "enable"
        return f"--{self.section.name}-{action}-{self.name}".lower().replace("_", "-")

    def add_option_to_parser(self, section_opts):
        """
        Add option to arguments parser

        The flag is registered with False as parser default (and not with the option
        default): _isset_in_options considers a true parsed value as "flag given on
        command line", so using a True option default as parser default made the
        option always look toggled, and get() returned the opposite of the default.
        """
        if self.no_arg:
            return
        args = [self.parser_argument_name]
        if self.short_arg:
            args.append(self.short_arg)
        kwargs = {
            "action": self.parser_action,
            "dest": self.parser_dest,
            "help": self.parser_help,
            "default": False,
        }
        log.debug(
            "add_option_to_parser(%s, %s): argument name(s)=%s / kwargs=%s",
            self.section.name,
            self.name,
            ", ".join(args),
            kwargs,
        )
        section_opts.add_argument(*args, **kwargs)

    def _ask_value(self, prompt=None, **kwargs):
        """
        Ask to user to enter value of this option and return it

        :param prompt: Accepted for signature compatibility: the prompt is always built
                       from the option name
        :param default_value: Optional keyword argument: the value returned on empty
                              input (default: current option value)
        """
        # Honor the default_value keyword argument as other option types do (it was
        # forwarded to the parent but ignored here)
        default_value = kwargs.pop("default_value", self.get())
        prompt = f"{self.name}: "
        if default_value:
            prompt += "[Y/n] "
        else:
            prompt += "[y/N] "
        while True:
            value = super()._ask_value(prompt, default_value=default_value, **kwargs)
            if value in ["", None, default_value]:
                return default_value
            if value.lower() == "y":
                return True
            if value.lower() == "n":
                return False

            print("Invalid answer. Possible values: Y or N (case insensitive)")
|
|
|
|
|
|
class FloatOption(BaseOption):
    """Float configuration option class"""

    @property
    def _from_config(self):
        """Get option value from ConfigParser"""
        return self.config.config_parser.getfloat(self.section.name, self.name)

    @property
    def parser_type(self):
        """Get type as handled by argparse.ArgumentParser"""
        return float

    def _ask_value(self, prompt=None, **kwargs):
        """
        Ask to user to enter value of this option and return it

        The question is asked again until the answer is empty (default value is then
        returned) or a valid float.

        :param prompt: Optional prompt (default: the option name)
        :param default_value: Optional keyword argument: the value returned on empty
                              input (default: current option value)
        """
        # Honor the default_value keyword argument as IntegerOption and OctalOption do
        # (it was forwarded to the parent but ignored here)
        default_value = kwargs.pop("default_value", self.get())
        while True:
            value = super()._ask_value(prompt, default_value=default_value, **kwargs)
            if value in ["", None, default_value]:
                return default_value
            try:
                return float(value)
            except ValueError:
                print('Invalid answer. Must a numeric value, for instance "12" or "12.5"')
|
|
|
|
|
|
class IntegerOption(BaseOption):
    """Integer configuration option class"""

    @property
    def _from_config(self):
        """Read the option from the configuration parser, as an integer"""
        parser = self.config.config_parser
        return parser.getint(self.section.name, self.name)

    def to_config(self, value=None):
        """
        Format value as stored in configuration file

        :param value: The value to format (default: the current option value)
        """
        if value is None:
            value = self.get()
        if value is None:
            return ""
        return str(int(value))

    @property
    def parser_type(self):
        """Type used by argparse.ArgumentParser to convert the argument"""
        return int

    def _ask_value(self, prompt=None, **kwargs):
        """
        Ask the user for the option value until the answer is empty (the default
        value is then returned) or a valid integer
        """
        fallback = kwargs.pop("default_value", self.get())
        while True:
            answer = super()._ask_value(prompt, default_value=fallback, **kwargs)
            if answer in ("", None, fallback):
                return fallback
            try:
                return int(answer)
            except ValueError:
                print("Invalid answer. Must a integer value")
|
|
|
|
|
|
class OctalOption(BaseOption):
    """
    Octal configuration option class

    The value is handled as an integer and written in configuration file (and read
    from it or from command line) as its octal digits, without prefix.
    """

    @staticmethod
    def octal(value):
        """Convert octal digits (given as integer or string, for instance 755) to integer"""
        return int(str(value), 8)

    @property
    def _from_config(self):
        """Get option value from ConfigParser"""
        value = self.config.config_parser.getint(self.section.name, self.name)
        # Note: a leftover print() used to write the raw value on stdout on each read
        log.debug("_from_config(%s, %s) = %s", self.section.name, self.name, value)
        return self.octal(value)

    def to_config(self, value=None):
        """
        Format value as stored in configuration file

        :param value: The value to format (default: the current option value)
        """
        value = value if value is not None else self.get()
        # Strip the "0o" prefix added by oct()
        return oct(value)[2:] if value is not None else ""

    @property
    def parser_type(self):
        """Get type as handled by argparse.ArgumentParser"""
        return self.octal

    @property
    def parser_help(self):
        """Get option help message in arguments parser options"""
        if self.arg_help and self.default is not None:
            # pylint: disable=consider-using-f-string
            return "{} (Default: {})".format(
                self.arg_help, re.sub(r"%([^%])", r"%%\1", oct(self._default_in_config)[2:])
            )
        if self.arg_help:
            return self.arg_help
        return None

    def _ask_value(self, prompt=None, **kwargs):
        """
        Ask to user to enter value of this option and return it

        The question is asked again until the answer is empty (default value is then
        returned) or valid octal digits.
        """
        default_value = kwargs.pop("default_value", self.get())
        while True:
            value = super()._ask_value(prompt, default_value=default_value, **kwargs)
            if value in ["", None, default_value]:
                return default_value
            try:
                return self.octal(value)
            except ValueError:
                print("Invalid answer. Must an octal value")
|
|
|
|
|
|
class PasswordOption(StringOption):
    """
    Password configuration option class

    The password is stored either directly in the configuration file or in the
    keyring: in that case, the configuration file only contains a marker (keyring_value).
    """

    def __init__(self, *arg, username_option=None, keyring_value=None, **kwargs):
        """
        :param username_option: Optional name of the option of the same section that
                                provides the username used as keyring username
                                (default: the option name is used)
        :param keyring_value: The marker stored in configuration file when the
                              password is kept in the keyring (default: "keyring")
        """
        super().__init__(*arg, **kwargs)
        self.username_option = username_option
        self.keyring_value = keyring_value if keyring_value is not None else "keyring"

    @property
    def _keyring_service_name(self):
        """Return keyring service name"""
        return ".".join([self.config.shortname, self.section.name, self.name])

    @property
    def _keyring_username(self):
        """Return keyring username"""
        return self.section.get(self.username_option) if self.username_option else self.name

    def get(self):
        """
        Get option value

        If the stored value is the keyring marker, the password is retrieved from the
        keyring. If the keyring does not know it, it is asked to the user and stored
        in the keyring.
        """
        value = super().get()

        if value != self.keyring_value:
            return value

        service_name = self._keyring_service_name
        username = self._keyring_username
        log.debug("Retreive password %s for username=%s from keyring", service_name, username)
        value = keyring.get_password(service_name, username)

        if value is None:
            # pylint: disable=consider-using-f-string
            value = getpass(
                "Please enter {}{}: ".format(
                    f"{self.section.name} {self.name}",
                    f" for {username}" if username != self.name else "",
                )
            )
            keyring.set_password(service_name, username, value)
        return value

    def to_config(self, value=None):
        """
        Format value as stored in configuration file

        :param value: The value to format (default: the current option value, or the
                      keyring marker if the password is stored in the keyring)
        """
        # The marker only stands for the *current* value: an explicit value has to be
        # formatted as is. Otherwise, a password could not be moved out of the keyring
        # and the default value was exported as the marker.
        if value is None and super().get() == self.keyring_value:
            return self.keyring_value
        return super().to_config(value)

    def set(self, value, use_keyring=None):  # pylint: disable=arguments-differ
        """
        Set option value to config file

        :param value: The password
        :param use_keyring: If True, store the password in the keyring and the marker
                            in configuration. If None (default), do so only if the
                            password is currently stored in the keyring.
        """
        if (use_keyring is None and super().get() == self.keyring_value) or use_keyring:
            keyring.set_password(self._keyring_service_name, self._keyring_username, value)
            value = self.keyring_value
        super().set(value)

    def _ask_value(self, prompt=None, **kwargs):
        """
        Ask to user to enter value of this option and return it

        The input is read with getpass() and a non-default current value is never shown.
        """
        if self.comment:
            print("# " + self.comment)
        default_value = kwargs.pop("default_value", self.get())
        if not prompt:
            prompt = f"{self.name}: "
        if default_value is not None:
            # Hide value only if it differed from default value
            if default_value == self.default:
                prompt += f"[{default_value}] "
            else:
                prompt += "[secret defined, leave to empty to keep it as unchange] "
        value = getpass(prompt)
        return default_value if value == "" else value

    def ask_value(self, set_it=True):
        """
        Ask to user to enter value of this option and set or
        return it regarding set_it parameter

        :param set_it: If True (default), the user is also asked if the keyring has to
                       be used, the value is set and the result of set() is returned.
                       Otherwise, the entered value is returned.
        """
        value = self._ask_value()
        if set_it:
            use_keyring = None
            default_use_keyring = super().get() == self.keyring_value
            while use_keyring is None:
                prompt = (
                    f"Do you want to use XDG keyring ? [{'Y/n' if default_use_keyring else 'y/N'}] "
                )
                # Go through _get_user_input() (and not input() directly) like the
                # other prompts, to allow to mock it in tests
                result = self._get_user_input(prompt).lower()
                if result == "":
                    use_keyring = default_use_keyring
                elif result == "y":
                    use_keyring = True
                elif result == "n":
                    use_keyring = False
                else:
                    print("Invalid answer. Possible values: Y or N (case insensitive)")
            return self.set(value, use_keyring=use_keyring)
        return value
|
|
|
|
|
|
class ConfigSection:
    """Configuration section class: a named and ordered group of options"""

    def __init__(self, config, name, comment=None, order=None):
        """
        :param config: Configuration object owning the section (passed to options)
        :param name: Section name
        :param comment: Optional comment describing the section
        :param order: Optional integer used to sort sections (default: 10)
        """
        self.config = config
        self.name = name
        self.options = {}
        self.comment = comment
        self.order = order if isinstance(order, int) else 10

    def add_option(self, _type, name, **kwargs):
        """
        Add option

        :param _type: Option type, derivated from BaseOption
        :param name: Option name
        :param **kwargs: Dict of raw option for type class
        :return: The created option object
        """
        assert not self.defined(name), f"Duplicated option {name}"
        self.options[name] = _type(self.config, self, name, **kwargs)
        return self.options[name]

    def defined(self, option):
        """Check if option is defined"""
        return option in self.options

    def isset(self, option):
        """Check if option is set"""
        return self.defined(option) and self.options[option].isset()

    def get(self, option):
        """Get option value"""
        assert self.defined(option), f"Option {option} unknown"
        return self.options[option].get()

    def set(self, option, value):
        """Set option value"""
        assert self.defined(option), f"Option {option} unknown"
        return self.options[option].set(value)

    def set_default(self, option, default_value):
        """Set default option value"""
        assert self.defined(option), f"Option {option} unknown"
        return self.options[option].set_default(default_value)

    def set_defaults(self, **default_values):
        """Set default options value"""
        for option, default_value in default_values.items():
            self.set_default(option, default_value)

    def add_options_to_parser(self, parser):
        """Add section to argparse.ArgumentParser, as an argument group"""
        assert isinstance(parser, argparse.ArgumentParser)
        section_opts = parser.add_argument_group(
            self.comment if self.comment else self.name.capitalize()
        )

        for option in self.options.values():
            option.add_option_to_parser(section_opts)

    def export_to_config(self):
        """Export section and their options to configuration file"""
        lines = []
        if self.comment:
            lines.append(f"# {self.comment}")
        lines.append(f"[{self.name}]")
        lines.extend(option.export_to_config() for option in self.options.values())
        return "\n".join(lines)

    def ask_values(self, set_it=True):
        """
        Ask user to enter value for each configuration option of the section

        :param set_it: If True (default), option value will be updated with user input

        :return: If set_it is True, return True unless an option explicitly reported a
                 failure (by returning False). If False, return a dict of configuration
                 options and their value.
        :rtype: bool or dict
        """
        if self.comment:
            print(f"# {self.comment}")
        print(f"[{self.name}]\n")
        result = {}
        error = False
        for name, option in self.options.items():
            option_result = option.ask_value(set_it=set_it)
            if not set_it:
                # Collect entered values (the two branches used to be inverted: the
                # returned dict was always empty)
                result[name] = option_result
            elif option_result is False:
                # Only an explicit False is a failure: an option whose set() method
                # returns nothing must not be considered as failed
                error = True
            print()
        print()
        if set_it:
            return not error
        return result
|
|
|
|
|
|
class RawWrappedTextHelpFormatter(argparse.RawDescriptionHelpFormatter):
    """
    Custom TextHelpFormatter for argparse.ArgumentParser that keeps the line returns
    and the indentation of help messages
    """

    def _split_lines(self, text, width):
        """
        Split help message in lines of at most width characters

        Each line of the (dedented) text is wrapped separately: empty lines are kept
        and the leading spaces of a line are repeated on each of its wrapped lines.
        """
        result = []
        for line in textwrap.dedent(text).splitlines():
            # Keep empty line
            if line == "":
                result.append(line)
                continue
            # Split indent prefix and line text
            line_text = line.lstrip(" ")
            ident = line[: len(line) - len(line_text)]
            # Wrap each line and add in result with indent prefix. The wrapping width
            # is kept positive: textwrap refuses a null or negative width, which
            # happened as soon as the indentation reached the available width.
            for subline in textwrap.wrap(line_text, max(width - len(ident), 1)):
                result.append(ident + subline)
        return result
|
|
|
|
|
|
class Config: # pylint: disable=too-many-instance-attributes
|
|
"""Configuration helper"""
|
|
|
|
def __init__(
|
|
self,
|
|
appname,
|
|
shortname=None,
|
|
version=None,
|
|
encoding=None,
|
|
config_file_env_variable=None,
|
|
default_config_dirpath=None,
|
|
):
|
|
self.appname = appname
|
|
self.shortname = shortname
|
|
self.version = version if version else "0.0"
|
|
self.encoding = encoding if encoding else "utf-8"
|
|
self.config_parser = None
|
|
self.options_parser = None
|
|
self.options = None
|
|
self.sections = {}
|
|
self._loaded_callbacks = []
|
|
self._loaded_callbacks_executed = []
|
|
self._filepath = None
|
|
self.config_file_env_variable = config_file_env_variable
|
|
self.default_config_dirpath = default_config_dirpath
|
|
self._init_config_parser()
|
|
|
|
def add_section(self, name, loaded_callback=None, **kwargs):
    """
    Register a new section and return it

    :param name: The section name (must not be already used)
    :param loaded_callback: An optional callback method that will be executed after
                            configuration is loaded. The specified callback method
                            will receive Config object as parameter.
    :param **kwargs: Raw parameters dict pass to ConfigSection __init__() method
    """
    assert name not in self.sections, f"Duplicated section {name}"

    section = ConfigSection(self, name, **kwargs)
    self.sections[name] = section

    if not loaded_callback:
        return section

    self._loaded_callbacks.append(loaded_callback)
    # Configuration already loaded: pending callbacks are executed right now
    if self._filepath or self.options:
        self._loaded()
    return section
|
|
|
|
def defined(self, section, option):
|
|
"""Check option is defined in specified section"""
|
|
return section in self.sections and self.sections[section].defined(option)
|
|
|
|
def isset(self, section, option):
|
|
"""Check option is set in specified section"""
|
|
return section in self.sections and self.sections[section].isset(option)
|
|
|
|
def get(self, section, option):
|
|
"""Get option value"""
|
|
assert self.defined(section, option), f"Unknown option {section}.{option}"
|
|
value = self.sections[section].get(option)
|
|
log.debug("get(%s, %s): %s (%s)", section, option, value, type(value))
|
|
return value
|
|
|
|
def get_option(self, option, default=None):
|
|
"""Get an argument parser option value"""
|
|
if self.options and hasattr(self.options, option):
|
|
return getattr(self.options, option)
|
|
return default
|
|
|
|
def __getitem__(self, key):
    """Give a dict-like access to a section (the section must exist)"""
    assert key in self.sections, f"Unknown section {key}"
    section = self.sections[key]
    return ConfigSectionAsDictWrapper(section)
|
|
|
|
def set(self, section, option, value):
|
|
"""Set option value"""
|
|
assert self.defined(section, option), f"Unknown option {section}.{option}"
|
|
self._init_config_parser()
|
|
self.sections[section].set(option, value)
|
|
|
|
def set_default(self, section, option, default_value):
|
|
"""Set default option value"""
|
|
assert self.defined(section, option), f"Unknown option {section}.{option}"
|
|
self._init_config_parser()
|
|
self.sections[section].set_default(option, default_value)
|
|
|
|
def set_defaults(self, section, **default_values):
|
|
"""Set default options value"""
|
|
assert section in self.sections, f"Unknown section {section}"
|
|
self._init_config_parser()
|
|
self.sections[section].set_defaults(**default_values)
|
|
|
|
def _init_config_parser(self, force=False):
|
|
"""Initialize ConfigParser object"""
|
|
if not self.config_parser or force:
|
|
self.config_parser = ConfigParser(defaults={"hash": "#"})
|
|
|
|
def load_file(self, filepath, execute_callback=True):
|
|
"""Read configuration file"""
|
|
self._init_config_parser(force=True)
|
|
self._filepath = filepath
|
|
|
|
# Checking access of configuration file
|
|
if not os.path.isfile(filepath):
|
|
return True
|
|
|
|
if not os.access(filepath, os.R_OK):
|
|
return False
|
|
|
|
try:
|
|
self.config_parser.read(filepath, encoding=self.encoding)
|
|
except Exception: # pylint: disable=broad-except
|
|
self._init_config_parser(force=True)
|
|
log.exception("Failed to read configuration file %s", filepath)
|
|
return False
|
|
|
|
# Logging initialization
|
|
if self.config_parser.has_section("loggers"):
|
|
fileConfig(filepath)
|
|
else:
|
|
# Otherwise, use systemd journal handler
|
|
handler = JournalHandler(SYSLOG_IDENTIFIER=self.shortname)
|
|
handler.setFormatter(logging.Formatter("%(levelname)s | %(name)s | %(message)s"))
|
|
logging.getLogger().addHandler(handler)
|
|
|
|
self._filepath = filepath
|
|
|
|
if execute_callback:
|
|
self._loaded()
|
|
|
|
return True
|
|
|
|
def _loaded(self):
|
|
"""Execute loaded callbacks"""
|
|
error = False
|
|
for callback in self._loaded_callbacks:
|
|
if callback in self._loaded_callbacks_executed:
|
|
continue
|
|
if not callback(self):
|
|
error = True
|
|
self._loaded_callbacks_executed.append(callback)
|
|
return not error
|
|
|
|
def save(self, filepath=None):
|
|
"""Save configuration file"""
|
|
filepath = filepath if filepath else self._filepath
|
|
assert filepath, "Configuration filepath is not set or provided"
|
|
|
|
# Checking access of target file/directory
|
|
dirpath = os.path.dirname(filepath)
|
|
if os.path.isfile(filepath):
|
|
if not os.access(filepath, os.W_OK):
|
|
log.error('Configuration file "%s" is not writable', filepath)
|
|
return False
|
|
elif not os.path.isdir(dirpath) or not os.access(dirpath, os.R_OK | os.W_OK | os.X_OK):
|
|
log.error('Configuration directory "%s" does not exist (or not writable)', dirpath)
|
|
return False
|
|
|
|
lines = [f"#\n# {self.appname} configuration\n#\n"]
|
|
|
|
for section_name in self._ordered_section_names:
|
|
lines.append("")
|
|
lines.append(self.sections[section_name].export_to_config())
|
|
|
|
try:
|
|
with open(filepath, "wb") as fd:
|
|
fd.write("\n".join(lines).encode(self.encoding))
|
|
|
|
# Privacy!
|
|
os.chmod(filepath, stat.S_IRUSR | stat.S_IWUSR)
|
|
except Exception: # pylint: disable=broad-except
|
|
log.exception("Failed to write generated configuration file %s", filepath)
|
|
return False
|
|
self.load_file(filepath)
|
|
return True
|
|
|
|
@property
|
|
def _ordered_section_names(self):
|
|
"""Get ordered list of section names"""
|
|
return sorted(self.sections.keys(), key=lambda section: self.sections[section].order)
|
|
|
|
def get_arguments_parser(self, reconfigure=False, **kwargs):
    """
    Return the arguments parser, building it on first call

    On first call, the parser is created with the common arguments (configuration
    file, save, debug and verbose), the console and logfile sections are added and
    the options of all sections are registered in the parser.

    :param reconfigure: If True, a --reconfigure argument is also added
    :param **kwargs: Raw parameters pass to argparse.ArgumentParser (description
                     defaults to the application name)
    """
    if self.options_parser:
        return self.options_parser

    description = kwargs.pop("description", self.appname)
    parser = argparse.ArgumentParser(
        description=description,
        formatter_class=RawWrappedTextHelpFormatter,
        **kwargs,
    )
    self.options_parser = parser

    # Common arguments
    help_parts = [f"Configuration file to use (default: {self.config_filepath})"]
    if self.config_file_env_variable:
        help_parts.append(
            "\n\nYou also could set "
            f"{self.config_file_env_variable} environment variable to "
            "specify your configuration file path."
        )
    parser.add_argument(
        "-c", "--config", default=self.config_filepath, help="".join(help_parts)
    )
    parser.add_argument(
        "--save",
        action="store_true",
        dest="save",
        help="Save current configuration to file",
    )
    if reconfigure:
        parser.add_argument(
            "--reconfigure",
            action="store_true",
            dest="mylib_config_reconfigure",
            help="Reconfigure and update configuration file",
        )
    parser.add_argument("-d", "--debug", action="store_true", help="Show debug messages")
    parser.add_argument("-v", "--verbose", action="store_true", help="Show verbose messages")

    # Shared text of the log level options comment
    # logging.getLevelNamesMapping() not available in python 3.9
    level_names = ", ".join(logging._nameToLevel)  # pylint: disable=protected-access
    level_comment = (
        "{} log level limit : by default, all logged messages (according to main log "
        "level) will be logged to the {}, but you can set a minimal level if you "
        "want. Possible values: {}."
    )

    # Console logging section
    console = self.add_section("console", comment="Console logging")
    console.add_option(
        BooleanOption,
        "enabled",
        default=False,
        arg="--console",
        short_arg="-C",
        comment="Enable/disable console log",
    )
    console.add_option(
        BooleanOption,
        "force_stderr",
        default=False,
        arg="--console-stderr",
        comment="Force console log on stderr",
    )
    console.add_option(
        StringOption,
        "log_format",
        default=DEFAULT_CONSOLE_LOG_FORMAT,
        arg="--console-log-format",
        comment="Console log format",
    )
    console.add_option(
        StringOption,
        "log_level",
        comment=level_comment.format("Console", "console", level_names),
    )

    # File logging section
    logfile = self.add_section("logfile", comment="Logging file")
    logfile.add_option(StringOption, "path", comment="File log path")
    logfile.add_option(
        StringOption,
        "format",
        default=DEFAULT_FILELOG_FORMAT,
        comment="File log format",
    )
    logfile.add_option(
        StringOption,
        "level",
        comment=level_comment.format("File", "log file", level_names),
    )

    self.add_options_to_parser(parser)

    return parser
|
|
|
|
def parse_arguments_options(
    self,
    argv=None,
    parser=None,
    create=True,
    ask_values=True,
    exit_after_created=True,
    execute_callback=True,
    hardcoded_options=None,
):
    """
    Parse arguments options, load the configuration file and set up logging handlers

    Note: this method may end the process with sys.exit(): after creating or saving the
    configuration file, after a reconfiguration, or when an invalid log level is
    configured.

    :param argv: Optional arguments list to parse (default: sys.argv[1:])
    :param parser: Optional argparse.ArgumentParser to use
                   (default: generated by self.get_arguments_parser())
    :param create: If True, configuration file will be created if it does not exist
                   (default: True)
    :param ask_values: If True, ask user to enter value of each configuration option
                       when the configuration file is created (default: True)
    :param exit_after_created: If True, script will end after configuration file
                               creation (default: True)
    :param execute_callback: Sections's loaded callbacks will be executed only if True
                             (default: True)
    :param hardcoded_options: Optional hard-coded options to set after loading arguments
                              and configuration file. These options are passed using a
                              list of tuples of 3 elements: the section and the option
                              names and the value.
                              [('section1', 'option1', value), ...]

    :return: The parsed options
    :rtype: argparse.Namespace
    """
    parser = parser if parser else self.get_arguments_parser()
    argcomplete.autocomplete(parser)
    options = parser.parse_args(argv if argv is not None else sys.argv[1:])
    # Callbacks are deferred until the file, hard-coded options and logging are handled
    self.load_options(options, execute_callback=False)

    if options.config:
        options.config = os.path.abspath(options.config)

    already_saved = False
    if not os.path.isfile(options.config) and (create or options.save):
        log.warning("Configuration file is missing, generate it (%s)", options.config)
        if ask_values:
            self.ask_values(set_it=True)
        self.save(options.config)
        if exit_after_created:
            sys.exit(0)
        already_saved = True

    # Load configuration file
    if os.path.isfile(options.config) and not self.load_file(
        options.config, execute_callback=False
    ):
        parser.error(f"Failed to load configuration from file {options.config}")

    if options.save and not already_saved:
        self.save()
        sys.exit(0)

    if options.debug:
        logging.getLogger().setLevel(logging.DEBUG)
    elif options.verbose:
        logging.getLogger().setLevel(logging.INFO)

    if hardcoded_options:
        assert isinstance(hardcoded_options, list), (
            "hardcoded_options must be a list of tuple of 3 elements: "
            "the section and the option names and the value."
        )
        for opt_info in hardcoded_options:
            assert isinstance(opt_info, tuple) and len(opt_info) == 3, (
                "Invalid hard-coded option value: it must be a tuple of 3 "
                "elements: the section and the option names and the value."
            )
            self.set(*opt_info)

    if self.get("console", "enabled"):
        console_level_name = self.get("console", "log_level")
        console_log_level = (
            # logging.getLevelNamesMapping() not available in python 3.9
            # pylint: disable=protected-access
            logging._nameToLevel.get(console_level_name)
            if console_level_name
            else logging.DEBUG
        )
        # An unknown level name resolves to None: fail explicitly (as done for the
        # log file level) instead of crashing on the comparisons below
        if console_log_level is None:
            log.fatal("Invalid console log level specified (%s)", console_level_name)
            sys.exit(1)

        console_handlers = []
        # A first handler (with StdoutInfoFilter) is only needed if the configured
        # level is below WARNING
        if console_log_level < logging.WARNING:
            stdout_console_handler = logging.StreamHandler(
                sys.stderr if self.get("console", "force_stderr") else sys.stdout
            )
            stdout_console_handler.addFilter(StdoutInfoFilter())
            stdout_console_handler.setLevel(console_log_level)
            console_handlers.append(stdout_console_handler)

        # The stderr handler never goes below WARNING
        stderr_console_handler = logging.StreamHandler(sys.stderr)
        stderr_console_handler.setLevel(max(console_log_level, logging.WARNING))
        console_handlers.append(stderr_console_handler)

        console_log_format = self.get("console", "log_format")
        console_formater = (
            logging.Formatter(console_log_format) if console_log_format else None
        )
        for console_handler in console_handlers:
            if console_formater:
                console_handler.setFormatter(console_formater)
            logging.getLogger().addHandler(console_handler)

    logfile_path = self.get("logfile", "path")
    if logfile_path:
        logfile_level_name = self.get("logfile", "level")
        logfile_level = (
            # logging.getLevelNamesMapping() not available in python 3.9
            # pylint: disable=protected-access
            logging._nameToLevel.get(logfile_level_name)
            if logfile_level_name
            else logging.DEBUG
        )
        # Validate the level before opening the log file
        if logfile_level is None:
            log.fatal("Invalid log file level specified (%s)", logfile_level_name)
            sys.exit(1)

        logfile_handler = logging.FileHandler(logfile_path)
        logfile_handler.setLevel(logfile_level)

        logfile_format = self.get("logfile", "format")
        if logfile_format:
            logfile_handler.setFormatter(logging.Formatter(logfile_format))

        logging.getLogger().addHandler(logfile_handler)

    if execute_callback:
        self._loaded()

    if self.get_option("mylib_config_reconfigure", default=False):
        # Reconfiguration always ends the script: 0 on success, 1 otherwise
        if self.ask_values(set_it=True) and self.save():
            sys.exit(0)
        sys.exit(1)

    return options
|
|
|
|
def load_options(self, options, execute_callback=True):
    """
    Register the parsed arguments parser options

    :param options: The argparse.Namespace object to register
    :param execute_callback: If True (default), self._loaded() is called afterwards
    """
    assert isinstance(options, argparse.Namespace)
    self.options = options
    log.debug("Argument options: %s", options)
    if not execute_callback:
        return
    self._loaded()
|
|
|
|
def add_options_to_parser(self, parser):
    """
    Add the options of every section to the given parser

    Sections are handled following the order of self._ordered_section_names.

    :param parser: The parser passed to each section's add_options_to_parser() method
    """
    ordered_sections = (self.sections[name] for name in self._ordered_section_names)
    for current_section in ordered_sections:
        current_section.add_options_to_parser(parser)
|
|
|
|
def ask_values(self, set_it=True, execute_callback=False):
    """
    Ask user to enter value for each configuration option

    Every section is asked, even if a previous one failed.

    :param set_it: If True (default), option value will be updated with user input
    :param execute_callback: Sections's loaded callbacks will be finally executed
                             (only if set_it is True, default: False)

    :return: If set_it is True, return True if valid value for each configuration
             option have been retrieved and set, False otherwise. If set_it is False,
             return a dict of configuration sections and their options value.
    :rtype: bool or dict
    """
    answers = {
        section_name: section.ask_values(set_it=set_it)
        for section_name, section in self.sections.items()
    }

    if not set_it:
        return answers

    # In set mode, each section result tells if this section succeeded
    if not all(answers.values()):
        return False

    if execute_callback:
        self._loaded()
    return True
|
|
|
|
def configure(self, argv=None, description=False):
    """
    Entry point of a script that creates your configuration file

    Note: make sure all your configuration sections and options are defined before
    running it. This method always ends the script using sys.exit().

    :param argv: Optional arguments list, passed to self.parse_arguments_options()
    :param description: Optional parser description
                        (default: "Generate configuration file")
    """
    parser = self.get_arguments_parser(
        description=description if description else "Generate configuration file"
    )

    # Flags specific to the configuration file generation: (short, long, dest, help)
    extra_flags = (
        ("-i", "--interactive", "interactive", "Enable configuration interactive mode"),
        ("-O", "--overwrite", "overwrite", "Overwrite configuration file if exists"),
        (
            "-V",
            "--validate",
            "validate",
            (
                "Validate configuration: initialize application to test if provided parameters"
                " works.\n\nNote: Validation will occured after configuration file creation or"
                " update. On error, re-run with -O/--overwrite parameter to fix it."
            ),
        ),
    )
    for short_flag, long_flag, dest, help_text in extra_flags:
        parser.add_argument(
            short_flag, long_flag, action="store_true", dest=dest, help=help_text
        )

    options = self.parse_arguments_options(argv, create=False, execute_callback=False)
    config_path = options.config

    # Refuse to replace an existing file unless explicitly requested
    if not options.overwrite and os.path.exists(config_path):
        print(f"Configuration file {config_path} already exists")
        sys.exit(1)

    if options.interactive:
        self.ask_values(set_it=True)

    if not self.save(config_path):
        print(f"Error occured creating configuration file {config_path}")
        sys.exit(1)

    print(f"Configuration file {config_path} created.")
    if options.validate:
        print("Validate your configuration...")
        try:
            if not self._loaded():
                print(
                    "Error(s) occurred validating your configuration. See logs for details."
                )
                sys.exit(1)
            else:
                print("Your configuration seem valid.")
        except Exception:  # pylint: disable=broad-except
            print(
                "Exception occurred validating your configuration:\n"
                f"{traceback.format_exc()}"
                "\n\nSee logs for details."
            )
            sys.exit(2)

    sys.exit(0)
|
|
|
|
@property
def config_dir(self):
    """
    Retrieve configuration directory path

    The directory of the configuration file path (self._filepath) is used if set,
    then self.default_config_dirpath, and finally DEFAULT_CONFIG_DIRPATH.
    """
    if not self._filepath:
        return self.default_config_dirpath or DEFAULT_CONFIG_DIRPATH
    return os.path.dirname(self._filepath)
|
|
|
|
@property
def config_filepath(self):
    """
    Retrieve configuration file path

    The path is looked up in this order: self._filepath, the environment variable named
    by self.config_file_env_variable (if not empty), then a file in self.config_dir
    named after self.shortname ("config.ini" if no shortname is set).
    """
    if self._filepath:
        return self._filepath

    env_variable = self.config_file_env_variable
    if env_variable:
        path_from_env = os.environ.get(env_variable)
        if path_from_env:
            return path_from_env

    filename = f"{self.shortname}.ini" if self.shortname else "config.ini"
    return os.path.join(self.config_dir, filename)
|
|
|
|
|
|
class ConfigurableObject:
    """
    Base class of configurable object

    This class provide a way to configure an object using :
    - mylib.config.Config object
    - argparse.Namespace object
    - kwargs passed to __init__ method
    """

    # Configuration object name (used for default options prefix and config section)
    # Note: required if options_prefix or/and config_section parameters not provided
    # to __init__ method.
    _config_name = None

    # Configuration comment (used for config section)
    _config_comment = None

    # Default options value
    # Important: all supported options MUST HAVE a default value defined
    # Note: copied per instance by __init__, so set_default() never alters the class
    _defaults = {
        "just_try": None,
    }

    # Store options passed through __init__ method
    # Note: only a class-level fallback, __init__ binds a dict owned by the instance
    _kwargs = {}
    _options = {}
    _options_prefix = None
    _config = None
    _config_section = None

    def __init__(
        self, options=None, options_prefix=None, config=None, config_section=None, **kwargs
    ):
        """
        Register the configuration sources of the object

        :param options: Optional arguments parser options object: option values are read
                        from its attributes named <options_prefix><option>
        :param options_prefix: Optional options attributes prefix
                               (default: self._config_name + "_")
        :param config: Optional configuration object
        :param config_section: Optional configuration section name
                               (default: self._config_name)
        :param kwargs: Explicit option values (each key must be declared in _defaults)

        :raises ConfigException: if options (or config) is provided without prefix
                                 (or section name) while _config_name is not defined
        :raises AssertionError: if an unknown option is passed as keyword argument
        """
        # Work on per-instance copies: mutating the class-level dicts would leak the
        # options and defaults of this instance to all other instances
        self._defaults = dict(self._defaults)
        self._kwargs = {}
        for key, value in kwargs.items():
            assert key in self._defaults, f"Unknown {key} option"
            self._kwargs[key] = value

        if options:
            self._options = options
            if options_prefix is not None:
                self._options_prefix = options_prefix
            elif self._config_name:
                self._options_prefix = self._config_name + "_"
            else:
                raise ConfigException(
                    f"No configuration name defined for {type(self).__name__}"
                )

        if config:
            self._config = config
            if config_section:
                self._config_section = config_section
            elif self._config_name:
                self._config_section = self._config_name
            else:
                raise ConfigException(
                    f"No configuration name defined for {type(self).__name__}"
                )

    def _get_option(self, option, default=None, required=False):
        """
        Retrieve option value

        The value is looked up in this order: keyword arguments passed to __init__,
        registered options object, registered configuration section, and finally the
        default parameter (if not None) or the default value of the option.

        :param option: The option name
        :param default: Optional default value (default: the one from self._defaults)
        :param required: If True, the option must be found in one of the sources

        :raises AssertionError: if the option is required and not defined
        """
        if self._kwargs and option in self._kwargs:
            return self._kwargs[option]

        if self._options and hasattr(self._options, self._options_prefix + option):
            return getattr(self._options, self._options_prefix + option)

        if self._config and self._config.defined(self._config_section, option):
            return self._config.get(self._config_section, option)

        assert not required, f"Options {option} not defined"

        return default if default is not None else self._defaults.get(option)

    def set_default(self, option, default_value):
        """
        Set option default value (for this instance only)

        :raises AssertionError: if the option is unknown
        """
        assert option in self._defaults, f"Unkown option {option}"
        self._defaults[option] = default_value

    def set_defaults(self, **default_values):
        """Set options default value (see set_default())"""
        for option, default_value in default_values.items():
            self.set_default(option, default_value)

    def configure(
        self,
        comment=None,
        just_try=False,
        just_try_default=False,
        just_try_help="Just-try mode",
        **kwargs,
    ):
        """
        Configure options on registered mylib.Config object

        :param comment: Configuration section comment (default: self._config_comment)
        :param just_try: Add just-try mode option (default: False)
        :param just_try_default: Default just-try mode option value (default: False)
        :param just_try_help: Default just-try mode option help message (default: "Just-try mode")
        :param kwargs: Other provided parameters are directly passed to Config.add_section() method

        :return: The section returned by the add_section() method of the registered
                 configuration object
        :raises AssertionError: if no configuration object is registered
        """
        assert self._config, (
            "mylib.Config object not registered. Must be passed to __init__ as config keyword"
            " argument."
        )

        section = self._config.add_section(
            self._config_section,
            comment=comment if comment else self._config_comment,
            loaded_callback=self.initialize,
            **kwargs,
        )

        if just_try:
            self._defaults["just_try"] = just_try_default
            section.add_option(
                BooleanOption,
                "just_try",
                default=self._defaults["just_try"],
                comment=just_try_help if just_try_help else "Just-try mode",
            )

        return section

    def initialize(self, loaded_config=None):
        """
        Configuration initialized hook

        Registered by configure() as the loaded callback of the configuration section.

        :param loaded_config: Optional loaded configuration, stored in self.config if set
        """
        if loaded_config:
            self.config = loaded_config  # pylint: disable=attribute-defined-outside-init

    @property
    def _just_try(self):
        """Check if just-try mode is enabled"""
        # If "just_try" provided to constructor, use it value
        if "just_try" in self._kwargs:
            log.debug(
                "Just-try mode is %s by value passed to constructor",
                "enabled" if self._kwargs["just_try"] else "disabled",
            )
            return self._kwargs["just_try"]

        # If options provided and just-try option exist and is enabled, just-try mode enabled
        if (
            self._options
            and hasattr(self._options, f"{self._options_prefix}just_try")
            and getattr(self._options, f"{self._options_prefix}just_try")
        ):
            # type(self) gives the concrete class, __class__ always gives this base class
            log.debug("Just-try mode for %s is enabled", type(self).__name__)
            return True

        # If options provided and a just_try option exist and is enabled, just-try mode enabled
        if (
            self._options
            and hasattr(self._options, "just_try")
            and getattr(self._options, "just_try")
        ):
            log.debug("Just-try mode is globally enabled")
            return True

        # If Config provided, config section defined and just-try enabled in config, just-try mode
        # enabled
        if (
            self._config
            and self._config.defined(self._config_section, "just_try")
            and self._config.get(self._config_section, "just_try")
        ):
            log.debug("Just-try mode for %s is enabled in configuration", self._config_section)
            return True

        # If Config provided, use its get_option() method to obtain a global just_try parameter
        # value with a default to False, otherwise always false
        return self._config.get_option("just_try", default=False) if self._config else False
|
|
|
|
|
|
class ConfigSectionAsDictWrapper:
    """
    Dictionary-like access to the options of a configuration section

    Reading an item calls the get() method of the wrapped section and assigning an item
    calls its set() method. Deleting an item is not supported.
    """

    __section = None

    def __init__(self, section):
        """Wrap the given section object"""
        self.__section = section

    def __getitem__(self, key):
        """Return the value of the option as provided by the section's get() method"""
        wrapped = self.__section
        return wrapped.get(key)

    def __setitem__(self, key, value):
        """Set the value of the option using the section's set() method"""
        wrapped = self.__section
        wrapped.set(key, value)

    def __delitem__(self, key):
        """Always raise ConfigException: options can not be deleted"""
        raise ConfigException("Deleting a configuration option is not supported")
|
|
|
|
|
|
# pylint: disable=too-few-public-methods
class StdoutInfoFilter(logging.Filter):
    """
    Logging filter to keep only messages below logging.WARNING

    Records at logging.WARNING or above are rejected. Every lower level is kept,
    including custom levels that are neither logging.DEBUG nor logging.INFO.
    """

    def filter(self, record):
        """Return True if the record level is lower than logging.WARNING"""
        return record.levelno < logging.WARNING
|