Compare commits

...

2 commits

Author SHA1 Message Date
Benjamin Renard
eb183b0d3b config: split console logging between stdout & stderr base on level
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2023-01-06 18:19:48 +01:00
Benjamin Renard
55782df47c config: code cleaning 2023-01-06 18:19:48 +01:00
2 changed files with 96 additions and 51 deletions

View file

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# pylint: disable=too-many-lines
""" Configuration & options parser """ """ Configuration & options parser """
@ -136,14 +137,17 @@ class BaseOption: # pylint: disable=too-many-instance-attributes
@property @property
def parser_dest(self): def parser_dest(self):
""" Get option name in arguments parser options """ """ Get option name in arguments parser options """
return '{0}_{1}'.format(self.section.name, self.name) return f'{self.section.name}_{self.name}'
@property @property
def parser_help(self): def parser_help(self):
""" Get option help message in arguments parser options """ """ Get option help message in arguments parser options """
if self.arg_help and self.default is not None: if self.arg_help and self.default is not None:
# pylint: disable=consider-using-f-string
return '{0} (Default: {1})'.format( return '{0} (Default: {1})'.format(
self.arg_help, re.sub(r'%([^%])', r'%%\1', str(self.default))) self.arg_help,
re.sub(r'%([^%])', r'%%\1', str(self._default_in_config))
)
if self.arg_help: if self.arg_help:
return self.arg_help return self.arg_help
return None return None
@ -153,9 +157,7 @@ class BaseOption: # pylint: disable=too-many-instance-attributes
""" Get option argument name in parser options """ """ Get option argument name in parser options """
return ( return (
self.arg if self.arg else self.arg if self.arg else
'--{0}-{1}'.format( f'--{self.section.name}-{self.name}'.lower().replace('_', '-')
self.section.name, self.name
).lower().replace('_', '-')
) )
def add_option_to_parser(self, section_opts): def add_option_to_parser(self, section_opts):
@ -292,6 +294,7 @@ class BooleanOption(BaseOption):
@property @property
def parser_argument_name(self): def parser_argument_name(self):
""" Get option argument name in parser options """ """ Get option argument name in parser options """
# pylint: disable=consider-using-f-string
return ( return (
self.arg if self.arg else self.arg if self.arg else
'--{0}-{1}-{2}'.format( '--{0}-{1}-{2}'.format(
@ -304,7 +307,7 @@ class BooleanOption(BaseOption):
def _ask_value(self, prompt=None, **kwargs): def _ask_value(self, prompt=None, **kwargs):
""" Ask to user to enter value of this option and return it """ """ Ask to user to enter value of this option and return it """
default_value = self.get() default_value = self.get()
prompt = "%s: " % self.name prompt = f'{self.name}: '
if default_value: if default_value:
prompt += '[Y/n] ' prompt += '[Y/n] '
else: else:
@ -413,12 +416,11 @@ class PasswordOption(StringOption):
value = keyring.get_password(service_name, username) value = keyring.get_password(service_name, username)
if value is None: if value is None:
# pylint: disable=consider-using-f-string
value = getpass( value = getpass(
'Please enter %s%s: ' % ( 'Please enter {0}{1}: '.format(
self.comment if self.comment else f'{self.section.name} {self.name}',
'%s %s' % (self.section.name, self.name), f' for {username}' if username != self.name else ''
(' for %s' % username)
if username != self.name else ''
) )
) )
keyring.set_password(service_name, username, value) keyring.set_password(service_name, username, value)
@ -445,11 +447,11 @@ class PasswordOption(StringOption):
print('# ' + self.comment) print('# ' + self.comment)
default_value = kwargs.pop('default_value', self.get()) default_value = kwargs.pop('default_value', self.get())
if not prompt: if not prompt:
prompt = '%s: ' % self.name prompt = f'{self.name}: '
if default_value is not None: if default_value is not None:
# Hide value only if it differed from default value # Hide value only if it differed from default value
if default_value == self.default: if default_value == self.default:
prompt += '[%s] ' % default_value prompt += f'[{default_value}] '
else: else:
prompt += '[secret defined, leave to empty to keep it as unchange] ' prompt += '[secret defined, leave to empty to keep it as unchange] '
value = getpass(prompt) value = getpass(prompt)
@ -466,8 +468,8 @@ class PasswordOption(StringOption):
default_use_keyring = (super().get() == self.keyring_value) default_use_keyring = (super().get() == self.keyring_value)
while use_keyring is None: while use_keyring is None:
prompt = ( prompt = (
'Do you want to use XDG keyring ? [%s] ' % 'Do you want to use XDG keyring ? '
('Y/n' if default_use_keyring else 'y/N') f"[{'Y/n' if default_use_keyring else 'y/N'}] "
) )
result = input(prompt).lower() result = input(prompt).lower()
if result == '': if result == '':
@ -488,7 +490,7 @@ class ConfigSection:
def __init__(self, config, name, comment=None, order=None): def __init__(self, config, name, comment=None, order=None):
self.config = config self.config = config
self.name = name self.name = name
self.options = dict() self.options = {}
self.comment = comment self.comment = comment
self.order = order if isinstance(order, int) else 10 self.order = order if isinstance(order, int) else 10
@ -500,7 +502,7 @@ class ConfigSection:
:param name: Option name :param name: Option name
:param **kwargs: Dict of raw option for type class :param **kwargs: Dict of raw option for type class
""" """
assert not self.defined(name), "Duplicated option %s" % name assert not self.defined(name), f'Duplicated option {name}'
self.options[name] = _type(self.config, self, name, **kwargs) self.options[name] = _type(self.config, self, name, **kwargs)
return self.options[name] return self.options[name]
@ -514,12 +516,12 @@ class ConfigSection:
def get(self, option): def get(self, option):
""" Get option value """ """ Get option value """
assert self.defined(option), "Option %s unknown" % option assert self.defined(option), f'Option {option} unknown'
return self.options[option].get() return self.options[option].get()
def set(self, option, value): def set(self, option, value):
""" Set option value """ """ Set option value """
assert self.defined(option), "Option %s unknown" % option assert self.defined(option), f'Option {option} unknown'
return self.options[option].set(value) return self.options[option].set(value)
def add_options_to_parser(self, parser): def add_options_to_parser(self, parser):
@ -530,16 +532,16 @@ class ConfigSection:
else self.name.capitalize() else self.name.capitalize()
) )
for option in self.options: for option in self.options: # pylint: disable=consider-using-dict-items
self.options[option].add_option_to_parser(section_opts) self.options[option].add_option_to_parser(section_opts)
def export_to_config(self): def export_to_config(self):
""" Export section and their options to configuration file """ """ Export section and their options to configuration file """
lines = [] lines = []
if self.comment: if self.comment:
lines.append('# %s' % self.comment) lines.append(f'# {self.comment}')
lines.append('[%s]' % self.name) lines.append(f'[{self.name}]')
for option in self.options: for option in self.options: # pylint: disable=consider-using-dict-items
lines.append(self.options[option].export_to_config()) lines.append(self.options[option].export_to_config())
return '\n'.join(lines) return '\n'.join(lines)
@ -555,9 +557,9 @@ class ConfigSection:
:rtype: bool of dict :rtype: bool of dict
""" """
if self.comment: if self.comment:
print('# %s' % self.comment) print(f'# {self.comment}')
print('[%s]\n' % self.name) print(f'[{self.name}]''\n')
result = dict() result = {}
error = False error = False
for name, option in self.options.items(): for name, option in self.options.items():
option_result = option.ask_value(set_it=set_it) option_result = option.ask_value(set_it=set_it)
@ -622,7 +624,7 @@ class Config: # pylint: disable=too-many-instance-attributes
The specified callback method will receive Config object as parameter. The specified callback method will receive Config object as parameter.
: param ** kwargs: Raw parameters dict pass to ConfigSection __init__() method : param ** kwargs: Raw parameters dict pass to ConfigSection __init__() method
""" """
assert name not in self.sections, "Duplicated section %s" % name assert name not in self.sections, f'Duplicated section {name}'
self.sections[name] = ConfigSection(self, name, **kwargs) self.sections[name] = ConfigSection(self, name, **kwargs)
if loaded_callback: if loaded_callback:
@ -644,7 +646,7 @@ class Config: # pylint: disable=too-many-instance-attributes
""" Get option value """ """ Get option value """
assert self.config_parser or self.options, 'Unconfigured options parser' assert self.config_parser or self.options, 'Unconfigured options parser'
assert self.defined( assert self.defined(
section, option), 'Unknown option %s.%s' % (section, option) section, option), f'Unknown option {section}.{option}'
value = self.sections[section].get(option) value = self.sections[section].get(option)
log.debug('get(%s, %s): %s (%s)', section, option, value, type(value)) log.debug('get(%s, %s): %s (%s)', section, option, value, type(value))
return value return value
@ -664,7 +666,7 @@ class Config: # pylint: disable=too-many-instance-attributes
""" Set option value """ """ Set option value """
assert self.config_parser, 'Unconfigured options parser' assert self.config_parser, 'Unconfigured options parser'
assert self.defined( assert self.defined(
section, option), 'Unknown option %s.%s' % (section, option) section, option), f'Unknown option {section}.{option}'
self.sections[section].set(option, value) self.sections[section].set(option, value)
def load_file(self, filepath, execute_callback=True): def load_file(self, filepath, execute_callback=True):
@ -734,7 +736,11 @@ class Config: # pylint: disable=too-many-instance-attributes
'Configuration directory "%s" does not exist (or not writable)', dirpath) 'Configuration directory "%s" does not exist (or not writable)', dirpath)
return False return False
lines = ['#\n# %s configuration\n#\n' % self.appname] lines = [
'#\n'
f'# {self.appname} configuration'
'\n#\n'
]
for section_name in self._ordered_section_names: for section_name in self._ordered_section_names:
lines.append('') lines.append('')
@ -769,9 +775,15 @@ class Config: # pylint: disable=too-many-instance-attributes
formatter_class=RawWrappedTextHelpFormatter, formatter_class=RawWrappedTextHelpFormatter,
**kwargs) **kwargs)
config_file_help = 'Configuration file to use (default: %s)' % self.config_filepath config_file_help = (
f'Configuration file to use (default: {self.config_filepath})'
)
if self.config_file_env_variable: if self.config_file_env_variable:
config_file_help += '\n\nYou also could set %s environment variable to specify your configuration file path.' % self.config_file_env_variable config_file_help += (
'\n\nYou also could set '
f'{self.config_file_env_variable} environment variable to '
'specify your configuration file path.'
)
self.options_parser.add_argument( self.options_parser.add_argument(
'-c', '-c',
'--config', '--config',
@ -854,7 +866,7 @@ class Config: # pylint: disable=too-many-instance-attributes
# Load configuration file # Load configuration file
if os.path.isfile(options.config) and not self.load_file(options.config, execute_callback=False): if os.path.isfile(options.config) and not self.load_file(options.config, execute_callback=False):
parser.error( parser.error(
'Failed to load configuration from file %s' % options.config f'Failed to load configuration from file {options.config}'
) )
if options.save and not already_saved: if options.save and not already_saved:
@ -867,11 +879,20 @@ class Config: # pylint: disable=too-many-instance-attributes
logging.getLogger().setLevel(logging.INFO) logging.getLogger().setLevel(logging.INFO)
if self.get('console', 'enabled'): if self.get('console', 'enabled'):
console_handler = logging.StreamHandler(sys.stdout) stdout_console_handler = logging.StreamHandler(sys.stdout)
stdout_console_handler.addFilter(StdoutInfoFilter())
stdout_console_handler.setLevel(logging.DEBUG)
stderr_console_handler = logging.StreamHandler(sys.stderr)
stderr_console_handler.setLevel(logging.WARNING)
if self.get('console', 'log_format'): if self.get('console', 'log_format'):
console_formater = logging.Formatter(self.get('console', 'log_format')) console_formater = logging.Formatter(self.get('console', 'log_format'))
console_handler.setFormatter(console_formater) stdout_console_handler.setFormatter(console_formater)
logging.getLogger().addHandler(console_handler) stderr_console_handler.setFormatter(console_formater)
logging.getLogger().addHandler(stdout_console_handler)
logging.getLogger().addHandler(stderr_console_handler)
if execute_callback: if execute_callback:
self._loaded() self._loaded()
@ -907,7 +928,7 @@ class Config: # pylint: disable=too-many-instance-attributes
# On set it mode, ensure configuration file parser is initialized # On set it mode, ensure configuration file parser is initialized
if set_it and not self.config_parser: if set_it and not self.config_parser:
self.config_parser = ConfigParser() self.config_parser = ConfigParser()
result = dict() result = {}
error = False error = False
for name, section in self.sections.items(): for name, section in self.sections.items():
section_result = section.ask_values(set_it=set_it) section_result = section.ask_values(set_it=set_it)
@ -960,14 +981,14 @@ class Config: # pylint: disable=too-many-instance-attributes
argv, create=False, execute_callback=False) argv, create=False, execute_callback=False)
if os.path.exists(options.config) and not options.overwrite: if os.path.exists(options.config) and not options.overwrite:
print('Configuration file %s already exists' % options.config) print(f'Configuration file {options.config} already exists')
sys.exit(1) sys.exit(1)
if options.interactive: if options.interactive:
self.ask_values(set_it=True) self.ask_values(set_it=True)
if self.save(options.config): if self.save(options.config):
print('Configuration file %s created.' % options.config) print(f'Configuration file {options.config} created.')
if options.validate: if options.validate:
print('Validate your configuration...') print('Validate your configuration...')
try: try:
@ -976,14 +997,15 @@ class Config: # pylint: disable=too-many-instance-attributes
else: else:
print('Error(s) occurred validating your configuration. See logs for details.') print('Error(s) occurred validating your configuration. See logs for details.')
sys.exit(1) sys.exit(1)
except Exception: except Exception: # pylint: disable=broad-except
print( print(
'Exception occurred validating your configuration: %s\n\nSee logs for details.' % 'Exception occurred validating your configuration:\n'
traceback.format_exc()) f'{traceback.format_exc()}'
'\n\nSee logs for details.'
)
sys.exit(2) sys.exit(2)
else: else:
print('Error occured creating configuration file %s' % print(f'Error occured creating configuration file {options.config}')
options.config)
sys.exit(1) sys.exit(1)
sys.exit(0) sys.exit(0)
@ -1006,8 +1028,8 @@ class Config: # pylint: disable=too-many-instance-attributes
return os.environ.get(self.config_file_env_variable) return os.environ.get(self.config_file_env_variable)
return os.path.join( return os.path.join(
self.config_dir, self.config_dir,
("%s.ini" % self.shortname) if self.shortname f'{self.shortname}.ini' if self.shortname
else "config.ini" else 'config.ini'
) )
@ -1044,7 +1066,7 @@ class ConfigurableObject:
**kwargs): **kwargs):
for key, value in kwargs.items(): for key, value in kwargs.items():
assert key in self._defaults, "Unknown %s option" % key assert key in self._defaults, f'Unknown {key} option'
self._kwargs[key] = value self._kwargs[key] = value
if options: if options:
@ -1054,7 +1076,7 @@ class ConfigurableObject:
elif self._config_name: elif self._config_name:
self._options_prefix = self._config_name + '_' self._options_prefix = self._config_name + '_'
else: else:
raise Exception('No configuration name defined for %s' % __name__) raise Exception(f'No configuration name defined for {__name__}')
if config: if config:
self._config = config self._config = config
@ -1063,7 +1085,7 @@ class ConfigurableObject:
elif self._config_name: elif self._config_name:
self._config_section = self._config_name self._config_section = self._config_name
else: else:
raise Exception('No configuration name defined for %s' % __name__) raise Exception(f'No configuration name defined for {__name__}')
def _get_option(self, option, default=None, required=False): def _get_option(self, option, default=None, required=False):
""" Retreive option value """ """ Retreive option value """
@ -1076,7 +1098,7 @@ class ConfigurableObject:
if self._config and self._config.defined(self._config_section, option): if self._config and self._config.defined(self._config_section, option):
return self._config.get(self._config_section, option) return self._config.get(self._config_section, option)
assert not required, "Options %s not defined" % option assert not required, f'Options {option} not defined'
return default if default is not None else self._defaults.get(option) return default if default is not None else self._defaults.get(option)
@ -1114,3 +1136,10 @@ class ConfigSectionAsDictWrapper:
def __delitem__(self, key): def __delitem__(self, key):
raise Exception('Deleting a configuration option is not supported') raise Exception('Deleting a configuration option is not supported')
# pylint: disable=too-few-public-methods
class StdoutInfoFilter(logging.Filter):
""" Logging filter to keep messages only >= logging.INFO """
def filter(self, record):
return record.levelno in (logging.DEBUG, logging.INFO)

View file

@ -1,10 +1,12 @@
# pylint: disable=redefined-outer-name,missing-function-docstring,protected-access,global-statement # pylint: disable=redefined-outer-name,missing-function-docstring,protected-access,global-statement
""" Tests on config lib """ """ Tests on config lib """
import logging
from mylib.config import Config, ConfigSection from mylib.config import Config, ConfigSection
from mylib.config import StringOption from mylib.config import StringOption
runned = dict() runned = {}
def test_config_init_default_args(): def test_config_init_default_args():
@ -208,3 +210,17 @@ def test_get_default():
config.parse_arguments_options(argv=[], create=False) config.parse_arguments_options(argv=[], create=False)
assert config.get(section_name, opt_name) == opt_default_value assert config.get(section_name, opt_name) == opt_default_value
def test_logging_splited_stdout_stderr(capsys):
config = Config('Test app')
config.parse_arguments_options(argv=['-C', '-v'], create=False)
info_msg = "[info]"
err_msg = "[error]"
logging.getLogger().info(info_msg)
logging.getLogger().error(err_msg)
captured = capsys.readouterr()
assert info_msg in captured.out
assert info_msg not in captured.err
assert err_msg in captured.err
assert err_msg not in captured.out