Source code for tools.configmanager

"""Module to handle the configuration of the repository
(the YAML files in the ``setup/`` folder)
"""
from __future__ import annotations
import typing
import os.path as op
import argparse
import copy
import yaml
from definitions import dconfig
from . import tpaths, envvar


@typing.overload
def override_nested_dict(
    dict_to_insert: dict, dict_to_override: dict, inplace: bool = True
) -> None:
    ...


@typing.overload
def override_nested_dict(
    dict_to_insert: dict, dict_to_override: dict, inplace: bool = False
) -> dict:
    ...


[docs] def override_nested_dict( dict_to_insert: dict, dict_to_override: dict, inplace: bool = False ) -> dict | None: """Override a nested dictionnary by another nested dictionnary Args: dict_to_insert: dictionnary to add to ``dict_to_override`` dict_to_override: base dictionnary which ``dict_to_insert`` is added to inplace: whether to override in place Returns: overriden dictionnary if not ``inplace`` """ dict_to_override_ = dict_to_override if inplace else copy.deepcopy(dict_to_override) for key, value in dict_to_insert.items(): if isinstance(value, dict): if key in dict_to_override_: assert isinstance(dict_to_override_[key], dict) override_nested_dict(value, dict_to_override_[key], inplace=True) else: dict_to_override_[key] = value else: dict_to_override_[key] = value if not inplace: return dict_to_override_
def resolve_paths_in_config(config: dict, config_path: str):
    """Resolve, in place, the paths stored in a configuration.

    Every location listed in ``dconfig.path_list_location`` is looked up in
    ``config``. The value found is first passed to
    ``envvar.resolve_wildcards`` and the result is then passed to
    ``tpaths.resolve_relative_paths`` together with the directory that contains
    the YAML configuration file.

    Args:
        config: configuration in ``config_path``
        config_path: path to the YAML file that contained ``config``

    Notes:
        A location is either a string (a top-level key of ``config``) or a
        dictionary that associates a section with the names of the parameters
        to resolve. Missing keys and parameters set to ``None`` are skipped.
    """
    for location in dconfig.path_list_location:
        if isinstance(location, str):
            # The location is a top-level entry of the configuration
            if location not in config:
                continue
            expanded = envvar.resolve_wildcards(config[location])
            config[location] = tpaths.resolve_relative_paths(
                expanded, op.dirname(config_path)
            )
            continue

        # Otherwise it must be a dictionary ``{section: parameter names}``
        assert isinstance(location, dict)
        for section, param_names in location.items():
            if section not in config:
                continue
            section_config = config[section]
            for param_name in param_names:
                if param_name not in section_config:
                    continue
                if section_config[param_name] is None:
                    continue
                section_config[param_name] = tpaths.resolve_relative_paths(
                    paths=envvar.resolve_wildcards(section_config[param_name]),
                    reference=op.dirname(config_path),
                )
def assert_config_validity(config: dict):
    """Check that the configuration is correctly structured.

    The configuration must be a dictionary of sections. Every section must be
    listed in ``dconfig.config_sections`` and be a dictionary itself, and every
    parameter must be described in ``dconfig.config_description`` for its
    section.

    Args:
        config: configuration as configured in YAML files.

    Raises:
        AssertionError: if one of the checks above fails.
    """
    assert isinstance(
        config, dict
    ), "Configuration should be represented by a dictionnary"

    for section_name, params in config.items():
        assert (
            section_name in dconfig.config_sections
        ), f"Section `{section_name!s}` not recognised"
        assert isinstance(params, dict)

        # Parameters that are known for this section (none if undescribed)
        known_params = dconfig.config_description.get(section_name, {})
        for param_name in params:
            assert param_name in known_params, (
                f"Parameter `{param_name!s}` is not in the section "
                f"`{section_name!s}`"
            )
def import_config(path: str, resolve_relative_paths: bool = True) -> dict:
    """Import the configuration and assert its format is valid.

    Also supports an ``include:`` entry that names one YAML file (string) or
    several YAML files (list) to include. The included files are merged first,
    in order, and the content of ``path`` is applied on top of them.

    Args:
        path: path to the YAML configuration file
        resolve_relative_paths: whether to resolve the relative paths
            using the :py:func:`resolve_paths_in_config` function

    Returns:
        Configuration. An empty YAML file yields an empty configuration.

    Raises:
        AssertionError: if the resulting configuration is not valid according
            to :py:func:`assert_config_validity`.
    """
    with open(path) as config_file:
        # SafeLoader restricts the document to the standard YAML tags
        config = yaml.load(config_file, Loader=yaml.SafeLoader)

    if config is None:
        # An empty YAML document is loaded as ``None``: treat it as an empty
        # configuration instead of failing on the lookups below.
        config = {}

    if resolve_relative_paths:
        resolve_paths_in_config(config, path)

    if "include" in config:
        included_config_paths = config["include"]
        del config["include"]
        if included_config_paths is None:
            # ``include:`` written without any value
            included_config_paths = []
        elif isinstance(included_config_paths, str):
            included_config_paths = [included_config_paths]
        # ``merge_configs`` needs at least two configurations, so only call it
        # when there really is something to include.
        if included_config_paths:
            config = merge_configs(list(included_config_paths) + [config])

    assert_config_validity(config)
    return config
@typing.overload
def update_config(
    config: str | dict, base_config: dict, inplace: typing.Literal[True]
) -> None:
    ...


@typing.overload
def update_config(
    config: str | dict,
    base_config: str | dict,
    inplace: typing.Literal[False] = False,
) -> dict:
    ...


@typing.overload
def update_config(
    config: str | dict, base_config: str | dict, inplace: bool = False
) -> dict | None:
    ...


def update_config(
    config: str | dict, base_config: str | dict, inplace: bool = False
) -> dict | None:
    """Update the configuration with another configuration.

    Args:
        config: path to the new YAML configuration or actual configuration
        base_config: base configuration that will be updated, or path to it.
            A path is only accepted when ``inplace`` is ``False``.
        inplace: whether to modify ``base_config`` directly instead of
            returning an updated copy

    Returns:
        Updated configuration if ``inplace`` is set to ``False``,
        ``None`` otherwise

    Raises:
        AssertionError: if ``inplace`` is ``True`` and ``base_config`` is not
            a dictionary (a path cannot be updated in place).
    """
    if isinstance(config, str):
        config = import_config(config)

    if inplace:
        assert isinstance(base_config, dict)

    if isinstance(base_config, str):
        base_config = import_config(base_config)

    return override_nested_dict(config, base_config, inplace=inplace)
[docs] def merge_configs(configs: typing.List[str | dict]) -> dict: """Merge configurations. Configurations are either given by their path ( in this case they are loaded) or the actual dictionnary. Args: configs: list of configs or their paths Returns: Merged configuration, by override the configuration one by one """ assert len(configs) >= 2 base_config = configs[0] other_configs = configs[1:] merged_config = copy.deepcopy(base_config) for other_config in other_configs: merged_config = update_config(other_config, merged_config, inplace=False) return merged_config
def filter_config(
    config: dict,
    sections: typing.Optional[typing.Iterable[str]] = None,
    constraints: typing.Optional[typing.Dict[str, typing.Iterable[str]]] = None,
) -> dict:
    """Filter the configuration to keep only what is necessary for the
    algorithm to run.

    Args:
        config: actual configuration
        sections: parameter sections to keep. ``None`` means all the sections
            of ``config``.
        constraints: associates a section with the parameters to include.
            If the section is not in this dictionary, all the parameters
            are included.

    Returns:
        Filtered configuration. If neither ``sections`` nor ``constraints`` is
        given, ``config`` itself is returned (not a copy). Otherwise a new
        dictionary holding deep copies of the selected values is returned.

    Raises:
        KeyError: if a requested section or parameter is not in ``config``.
    """
    if constraints is None:
        constraints = {}

    if sections is None:
        if not constraints:
            # Nothing to filter
            return config
        # Constraints without an explicit list of sections apply to the whole
        # configuration (previously this silently produced an empty result).
        sections = config.keys()

    filtered_config = {}
    for section in sections:
        section_params = (
            constraints[section] if section in constraints else config[section]
        )
        filtered_config[section] = {
            param: copy.deepcopy(config[section][param]) for param in section_params
        }
    return filtered_config
def import_default_config(
    repo: str,
    sections: typing.Optional[typing.Iterable[str]] = None,
    constraints: typing.Optional[typing.Dict[str, typing.Iterable[str]]] = None,
) -> dict:
    """Import the default configuration.

    The default configuration is loaded and, if a custom configuration file
    exists, overridden by it. The result is then filtered with
    :py:func:`filter_config`.

    Args:
        repo: path to the root of the repository
        sections: parameter sections to keep, passed to
            :py:func:`filter_config`
        constraints: associates a section with the parameters to include,
            passed to :py:func:`filter_config`

    Returns:
        Default configuration, overridden by the custom configuration when
        there is one, and filtered

    Notes:
        The default configuration is in ``<repo>/setup/config_default.yaml``.
        The custom configuration is in ``<repo>/setup/config.yaml``
    """
    default_path = op.join(repo, "setup", "config_default.yaml")
    custom_path = op.join(repo, "setup", "config.yaml")

    config = import_config(default_path)
    if op.exists(custom_path):
        # The custom configuration is optional and takes precedence
        config = update_config(custom_path, config)

    return filter_config(config, sections=sections, constraints=constraints)
def str2bool(value: str) -> bool:
    """Parse a boolean given as text.

    Args:
        value: value to parse. A value that already is a boolean is returned
            unchanged.

    Returns:
        ``True`` if ``value`` is one of ``yes``, ``true``, ``t``, ``y``, ``1``.
        ``False`` if ``value`` is one of ``no``, ``false``, ``f``, ``n``, ``0``.
        The comparison ignores the case.

    Raises:
        argparse.ArgumentTypeError: if ``value`` is none of the above.

    Notes:
        Taken from https://stackoverflow.com/questions/15008758
    """
    if isinstance(value, bool):
        return value

    lowered = value.lower()
    if lowered in ("yes", "true", "t", "y", "1"):
        return True
    if lowered in ("no", "false", "f", "n", "0"):
        return False
    raise argparse.ArgumentTypeError("Boolean value expected.")
@typing.overload def prepare_parser( parser: str, sections: typing.Iterable[str], default_config: dict ) -> argparse.ArgumentParser: ... @typing.overload def prepare_parser( parser: argparse.ArgumentParser, sections: typing.Iterable[str], default_config: dict, ) -> None: ...
[docs] def prepare_parser( parser: argparse.ArgumentParser | str, sections: typing.Iterable[str], program_args: typing.Optional[dict] = None, constraints: typing.Optional[typing.Dict[str, typing.Iterable[str]]] = None, with_config: bool = True, ) -> argparse.ArgumentParser | None: """Add sections to a parser. Args: parser: parser to complete (type ``argparse.ArgumentParser``) or description of the parser to create (if type ``str``) sections: parameter sections to add program_args: Dictionnary that allows to specify how to configure the subparsers that define which program to uses. It contains the key-value ``dest`` that specifies which program to run, and the possible programs ``sections`` (as a program is also associated with a section.) The other key-value couples (such as ``help``) are passed to the :py:func:`add_subparsers` method. constraints: associates a section with the parameters to include. If the section is not in this dictionnary, all the parameters are included. with_config: whether to add the ``--config`` argument. Returns: The parser that was created, if ``parser`` of type ``str`` """ if constraints is None: constraints = {} if isinstance(parser, str): parser_ = argparse.ArgumentParser(description=parser) else: parser_ = parser if program_args: subparser_sections = program_args["sections"] subparser_args_ = copy.deepcopy(program_args) del subparser_args_["sections"] subparsers = parser_.add_subparsers(**subparser_args_) subparsers.required = True for subparser_section in subparser_sections: subparser = subparsers.add_parser(subparser_section) prepare_parser( parser=subparser, sections=[subparser_section] + sections, constraints=constraints, ) else: if with_config: parser_.add_argument( "-c", "--config", help="YAML file(s) that configures the algorithm", nargs="+", ) for section in sections: assert section in dconfig.config_sections, ( "The section " + str(section) + " is not defined." 
) section_description = dconfig.config_sections[section] group = parser_.add_argument_group(section, section_description) section_params = ( dconfig.config_description.get(section, {}).keys() if section not in constraints else constraints[section] ) for param in section_params: param_config = copy.deepcopy(dconfig.config_description[section][param]) if ( "type" in param_config and param_config["type"] is bool and "const" not in param_config and "nargs" not in param_config ): # If ``True`` is not provided to a boolean flag, it is implicitly # provided param_config["nargs"] = "?" param_config["const"] = True param_config["type"] = str2bool elif ( param_config.get("type", str) is str and "const" not in param_config ): # For a string, no value means ``None`` param_config["const"] = None group.add_argument( "--" + param, default=argparse.SUPPRESS, **param_config ) if isinstance(parser, str): return parser_
[docs] def group_argparse_args( parser_args: argparse.Namespace | dict, sections: typing.Iterable[str], constraints: typing.Optional[typing.Dict[str, typing.Iterable[str]]] = None, ignore: typing.Optional[typing.List[str]] = None, ) -> dict: """Group the argparse parameters by section according to as it is supposed to be for a configuration Args: parser_args: result of the ``parse_args`` method sections: parameter sections to include constraints: associates a section with the parameters to include. If the section is not in this dictionnary, all the parameters are included. ignore: list of variables to ignore in the grouping Returns: Configuration described by ``parser_args`` """ if ignore is None: ignore = [] if constraints is None: constraints = {} parser_args_ = copy.deepcopy(parser_args) grouped_argparse_args = {} if not isinstance(parser_args_, dict): parser_args_ = vars(parser_args_) for section in sections: section_params = ( dconfig.config_description.get(section, {}).keys() if section not in constraints else constraints[section] ) for param in section_params: if param in parser_args: if section not in grouped_argparse_args: grouped_argparse_args[section] = {} grouped_argparse_args[section][param] = parser_args_[param] del parser_args_[param] for remaining_param, value in parser_args_.items(): if remaining_param not in ignore: raise Exception(str(remaining_param) + " does not belong to any section.") return grouped_argparse_args
@typing.overload
def return_config_from_parser(
    repo: str,
    sections: typing.Optional[typing.Iterable[str]],
    program_args: typing.Optional[dict] = None,
    constraints: typing.Optional[typing.Dict[str, typing.Iterable[str]]] = None,
    parse_known_args: typing.Literal[False] = False,
) -> dict:
    ...


@typing.overload
def return_config_from_parser(
    repo: str,
    sections: typing.Optional[typing.Iterable[str]],
    program_args: typing.Optional[dict] = None,
    constraints: typing.Optional[typing.Dict[str, typing.Iterable[str]]] = None,
    *,
    parse_known_args: typing.Literal[True],
) -> typing.Tuple[dict, typing.List[str]]:
    ...


@typing.overload
def return_config_from_parser(
    repo: str,
    sections: typing.Optional[typing.Iterable[str]],
    program_args: typing.Optional[dict] = None,
    constraints: typing.Optional[typing.Dict[str, typing.Iterable[str]]] = None,
    parse_known_args: bool = False,
) -> dict | typing.Tuple[dict, typing.List[str]]:
    ...


def return_config_from_parser(
    repo: str,
    sections: typing.Optional[typing.Iterable[str]],
    program_args: typing.Optional[dict] = None,
    constraints: typing.Optional[typing.Dict[str, typing.Iterable[str]]] = None,
    parse_known_args: bool = False,
) -> dict | typing.Tuple[dict, typing.List[str]]:
    """Load the default configuration. Create a parser that allows to parse
    (a) YAML configuration file(s) as well as arguments to change the
    configuration. Returns the configuration altered by the arguments that are
    parsed.

    The configuration is built in this order, each step overriding the
    previous one: default configuration of the repository, YAML files given
    with ``--config`` (in the order they are given), command-line arguments.

    Args:
        repo: path to the root of the repository
        sections: parameter sections to add. ``None`` adds no section to the
            parser and does not restrict the sections of the configuration.
        program_args: Dictionary that allows to specify how to configure the
            subparsers that define which program to use.
            It contains the key-value ``dest`` that specifies which
            program to run, and the possible programs ``sections``
            (as a program is also associated with a section.)
            The other key-value couples (such as ``help``) are passed to the
            :py:func:`add_subparsers` method.
        constraints: associates a section with the parameters to include.
            If the section is not in this dictionary, all the parameters
            are included.
        parse_known_args: whether to only parse known arguments. The other
            arguments that are unknown given ``sections`` and ``constraints``
            will be passed to the other scripts.

    Returns:
        Configuration. If ``parse_known_args`` is set to ``True``, also returns
        the arguments that were not parsed. If ``program_args`` is given, the
        chosen program is stored in the configuration under the key
        ``program_args["dest"]``.
    """
    # ``sections`` may be ``None`` or any iterable: work on a list
    cli_sections = [] if sections is None else list(sections)

    # Setup the parser
    parser = argparse.ArgumentParser(
        description=(
            "Save the hits and MC particles stored in (a) (X)DIGI file(s) "
            "into CSV files"
        )
    )
    prepare_parser(
        parser=parser,
        sections=cli_sections,
        program_args=program_args,
        constraints=constraints,
        with_config=True,
    )

    unknown_args: typing.List[str] = []
    if parse_known_args:
        namespace, unknown_args = parser.parse_known_args()
    else:
        namespace = parser.parse_args()
    parser_args = vars(namespace)

    if program_args:
        # The chosen program is also a section of the configuration
        program_type = program_args["dest"]
        chosen_program = parser_args[program_type]
        cli_sections = cli_sections + [chosen_program]  # (copy)

    default_config = import_default_config(
        repo=repo,
        sections=None if sections is None else cli_sections,
        constraints=constraints,
    )

    # Overwrite the configuration according to what has been parsed
    config_paths = parser_args["config"]
    config = default_config  # /!\ not a copy
    if config_paths:
        for config_path in config_paths:
            print("Load config", config_path)
            # If relative paths are given as input, express them relative to
            # the directory where the YAML configuration file is
            new_config = import_config(config_path)
            override_nested_dict(new_config, config, inplace=True)

    # Arguments to ignore when grouping by section
    ignore = ["config"]
    if program_args:
        ignore.append(program_type)

    grouped_argparse_args = group_argparse_args(
        parser_args, sections=cli_sections, constraints=constraints, ignore=ignore
    )
    override_nested_dict(grouped_argparse_args, config, inplace=True)

    if program_args:
        config[program_type] = chosen_program

    # Return
    if parse_known_args:
        return config, unknown_args
    return config