"""Module to handle the configuration of the repository
(the YAML files in the ``setup/`` folder)
"""
from __future__ import annotations
import typing
import os.path as op
import argparse
import copy
import yaml
from definitions import dconfig
from . import tpaths, envvar
@typing.overload
def override_nested_dict(
    dict_to_insert: dict, dict_to_override: dict, inplace: typing.Literal[True]
) -> None:
    ...


@typing.overload
def override_nested_dict(
    dict_to_insert: dict,
    dict_to_override: dict,
    inplace: typing.Literal[False] = False,
) -> dict:
    ...


@typing.overload
def override_nested_dict(
    dict_to_insert: dict, dict_to_override: dict, inplace: bool
) -> typing.Optional[dict]:
    ...


def override_nested_dict(
    dict_to_insert: dict, dict_to_override: dict, inplace: bool = False
) -> dict | None:
    """Override a nested dictionary with another nested dictionary.

    Keys of ``dict_to_insert`` whose value is a dictionary are merged
    recursively into the matching dictionary of ``dict_to_override``.
    Any other value replaces the existing one.

    Args:
        dict_to_insert: dictionary whose content is added to
            ``dict_to_override``. It is never modified.
        dict_to_override: base dictionary which ``dict_to_insert`` is added to
        inplace: whether to modify ``dict_to_override`` directly instead of
            working on a deep copy of it

    Returns:
        The overridden dictionary if not ``inplace``, ``None`` otherwise.

    Raises:
        AssertionError: if a dictionary is meant to be merged into an existing
            value that is not a dictionary.
    """
    target = dict_to_override if inplace else copy.deepcopy(dict_to_override)
    for key, value in dict_to_insert.items():
        if not isinstance(value, dict):
            target[key] = value
        elif key in target:
            assert isinstance(target[key], dict), (
                "Cannot merge a dictionary into the non-dictionary value "
                "stored under `" + str(key) + "`"
            )
            override_nested_dict(value, target[key], inplace=True)
        else:
            # Copy the inserted sub-dictionary so that later modifications of
            # the result cannot leak back into ``dict_to_insert``.
            target[key] = copy.deepcopy(value)
    return None if inplace else target
def resolve_paths_in_config(config: dict, config_path: str):
    """Resolve, in place, the path options of a configuration.

    For every path option found in ``config``:

    * the wildcards are substituted through ``envvar.resolve_wildcards``;
    * the result is passed to ``tpaths.resolve_relative_paths`` together with
      the directory that contains the YAML configuration file.

    Args:
        config: configuration read from ``config_path``
        config_path: path to the YAML file that contained ``config``

    Notes:
        The options that are processed are the ones listed in
        ``definitions.dconfig.path_list_location``. An entry of that list is
        either the name of a top-level entry of the configuration, or a
        dictionary that associates a section with the names of its path
        parameters.
    """
    for location in dconfig.path_list_location:
        if isinstance(location, str):
            # The top-level entry itself holds the path(s)
            if location in config:
                config[location] = tpaths.resolve_relative_paths(
                    envvar.resolve_wildcards(config[location]),
                    op.dirname(config_path),
                )
            continue

        # It must be a dictionary
        assert isinstance(location, dict)
        for section, param_names in location.items():
            if section not in config:
                continue
            section_config = config[section]
            for param_name in param_names:
                if param_name not in section_config:
                    continue
                raw_value = section_config[param_name]
                if raw_value is None:
                    # Unset path parameters are left untouched
                    continue
                section_config[param_name] = tpaths.resolve_relative_paths(
                    paths=envvar.resolve_wildcards(raw_value),
                    reference=op.dirname(config_path),
                )
def assert_config_validity(config: dict):
    """Check that the configuration has the expected structure.

    The configuration must be a dictionary of sections. Every section must be
    declared in ``dconfig.config_sections`` and be a dictionary itself, and
    every parameter must be declared for its section in
    ``dconfig.config_description``.

    Args:
        config: configuration as configured in YAML files.

    Raises:
        AssertionError: if one of the checks above fails.
    """
    assert isinstance(
        config, dict
    ), "Configuration should be represented by a dictionnary"
    for section, section_config in config.items():
        assert (
            section in dconfig.config_sections
        ), f"Section `{section!s}` not recognised"
        assert isinstance(section_config, dict)
        # Parameters that are declared for this section
        known_params = dconfig.config_description.get(section, {})
        for param in section_config:
            assert (
                param in known_params
            ), f"Parameter `{param!s}` is not in the section `{section!s}`"
def import_config(path: str, resolve_relative_paths: bool = True) -> dict:
    """Import the configuration and assert its format is valid.

    Also supports an ``include`` entry, which holds the path (or the list of
    paths) of YAML file(s) to include. The included configurations are merged
    in the order they are listed, and the content of the current file is
    applied last, so it takes precedence.

    Args:
        path: path to the YAML configuration file
        resolve_relative_paths: whether to resolve the relative paths using
            the :py:func:`resolve_paths_in_config` function

    Returns:
        Configuration. An empty YAML file yields an empty configuration.

    Raises:
        AssertionError: if the configuration is not valid according to
            :py:func:`assert_config_validity`.
    """
    with open(path) as config_file:
        config = yaml.load(config_file, Loader=yaml.SafeLoader)

    if config is None:
        # An empty YAML document is loaded as ``None``: treat it as a
        # configuration that does not set anything.
        config = {}

    if resolve_relative_paths:
        resolve_paths_in_config(config, path)

    if "include" in config:
        included_config_paths = config.pop("include")
        if isinstance(included_config_paths, str):
            included_config_paths = [included_config_paths]
        # An empty or null ``include`` entry means there is nothing to merge
        if included_config_paths:
            config = merge_configs(list(included_config_paths) + [config])

    assert_config_validity(config)
    return config
@typing.overload
def update_config(
    config: str | dict, base_config: dict, inplace: typing.Literal[True]
) -> None:
    ...


@typing.overload
def update_config(
    config: str | dict,
    base_config: str | dict,
    inplace: typing.Literal[False] = False,
) -> dict:
    ...


@typing.overload
def update_config(
    config: str | dict, base_config: str | dict, inplace: bool
) -> typing.Optional[dict]:
    ...


def update_config(
    config: str | dict, base_config: str | dict, inplace: bool = False
) -> dict | None:
    """Update the configuration with another configuration.

    Args:
        config: path to the new YAML configuration or actual configuration
        base_config: base configuration that will be updated. It may be given
            by its path only if ``inplace`` is ``False``.
        inplace: whether to modify ``base_config`` directly instead of
            returning an updated copy

    Returns:
        Updated configuration if ``inplace`` is set to ``False``,
        ``None`` otherwise.

    Raises:
        AssertionError: if ``inplace`` is ``True`` and ``base_config`` is not
            a dictionary (a path cannot be updated in place).
    """
    if isinstance(config, str):
        config = import_config(config)
    if inplace:
        assert isinstance(
            base_config, dict
        ), "`base_config` must be a dictionary to be updated in place"
    if isinstance(base_config, str):
        base_config = import_config(base_config)
    return override_nested_dict(config, base_config, inplace=inplace)
def merge_configs(configs: typing.List[str | dict]) -> dict:
    """Merge configurations.

    Configurations are either given by their path (in this case they are
    loaded) or by the actual dictionary.

    Args:
        configs: non-empty list of configurations or of their paths. The first
            one is the base; the following ones override it one by one, so
            the last configuration has the highest priority.

    Returns:
        Merged configuration. The input dictionaries are not modified.

    Raises:
        AssertionError: if ``configs`` is empty.
    """
    assert len(configs) >= 1, "At least one configuration is required"
    base_config = configs[0]
    if isinstance(base_config, str):
        # Load the base explicitly so that a single path is also supported
        base_config = import_config(base_config)
    merged_config = copy.deepcopy(base_config)
    for other_config in configs[1:]:
        merged_config = update_config(other_config, merged_config, inplace=False)
    return merged_config
def filter_config(
    config: dict,
    sections: typing.Optional[typing.Iterable[str]] = None,
    constraints: typing.Optional[typing.Dict[str, typing.Iterable[str]]] = None,
) -> dict:
    """Filter the configuration to keep only what is necessary for the algorithm
    to run.

    Args:
        config: actual configuration
        sections: parameter sections to keep. If ``None``, all the sections
            of ``config`` are kept.
        constraints: associates a section with the parameters to include. If the
            section is not in this dictionary, all the parameters are included.

    Returns:
        Filtered configuration. If neither ``sections`` nor ``constraints``
        is provided, ``config`` itself is returned (not a copy). Otherwise,
        a new dictionary holding deep copies of the selected values is returned.

    Raises:
        KeyError: if a requested section or parameter is not in ``config``.
    """
    if constraints is None:
        constraints = {}
    if sections is None:
        if not constraints:
            return config
        # Only constraints were given: apply them to every section
        sections = list(config)

    filtered_config = {}
    for section in sections:
        if section in constraints:
            filtered_config[section] = {
                param: copy.deepcopy(config[section][param])
                for param in constraints[section]
            }
        else:
            filtered_config[section] = copy.deepcopy(config[section])
    return filtered_config
def import_default_config(
    repo: str,
    sections: typing.Optional[typing.Iterable[str]] = None,
    constraints: typing.Optional[typing.Dict[str, typing.Iterable[str]]] = None,
) -> dict:
    """Import the default configuration.

    The default configuration is loaded first. If a custom configuration file
    exists, it is applied on top of the default one. The result is then
    filtered with :py:func:`filter_config`.

    Args:
        repo: path to the root of the repository
        sections: parameter sections to keep, passed to
            :py:func:`filter_config`
        constraints: associates a section with the parameters to include,
            passed to :py:func:`filter_config`

    Returns:
        Default configuration, overridden by the custom configuration if there
        is one, and filtered according to ``sections`` and ``constraints``.

    Notes:
        The default configuration is in ``<repo>/setup/config_default.yaml``.
        The custom configuration is in ``<repo>/setup/config.yaml``
    """
    setup_dir = op.join(repo, "setup")
    config = import_config(op.join(setup_dir, "config_default.yaml"))

    custom_path = op.join(setup_dir, "config.yaml")
    if op.exists(custom_path):
        config = update_config(custom_path, config)

    return filter_config(config, sections=sections, constraints=constraints)
def str2bool(value: str) -> bool:
    """Parse a boolean given as a string (case-insensitive).

    Args:
        value: value to parse. A value that is already a boolean is returned
            unchanged.

    Returns:
        ``True`` if ``value`` is one of ``yes``, ``true``, ``t``, ``y``, ``1``.
        ``False`` if ``value`` is one of ``no``, ``false``, ``f``, ``n``, ``0``.

    Raises:
        argparse.ArgumentTypeError: if ``value`` is none of the strings above.

    Notes:
        Taken from https://stackoverflow.com/questions/15008758
    """
    if isinstance(value, bool):
        return value

    lowered = value.lower()
    if lowered in ("yes", "true", "t", "y", "1"):
        return True
    if lowered in ("no", "false", "f", "n", "0"):
        return False
    raise argparse.ArgumentTypeError("Boolean value expected.")
@typing.overload
def prepare_parser(
    parser: str,
    sections: typing.Iterable[str],
    program_args: typing.Optional[dict] = None,
    constraints: typing.Optional[typing.Dict[str, typing.Iterable[str]]] = None,
    with_config: bool = True,
) -> argparse.ArgumentParser:
    ...


@typing.overload
def prepare_parser(
    parser: argparse.ArgumentParser,
    sections: typing.Iterable[str],
    program_args: typing.Optional[dict] = None,
    constraints: typing.Optional[typing.Dict[str, typing.Iterable[str]]] = None,
    with_config: bool = True,
) -> None:
    ...


def prepare_parser(
    parser: argparse.ArgumentParser | str,
    sections: typing.Iterable[str],
    program_args: typing.Optional[dict] = None,
    constraints: typing.Optional[typing.Dict[str, typing.Iterable[str]]] = None,
    with_config: bool = True,
) -> argparse.ArgumentParser | None:
    """Add sections to a parser.

    Args:
        parser: parser to complete (type ``argparse.ArgumentParser``)
            or description of the parser to create (if type ``str``)
        sections: parameter sections to add
        program_args: Dictionary that allows to specify how to configure the
            subparsers that define which program to use. It contains the
            key-value ``dest`` that specifies which program to run, and the
            possible programs ``sections`` (as a program is also associated
            with a section.) The other key-value couples (such as ``help``)
            are passed to the :py:func:`add_subparsers` method.
            If provided, one subparser is created per program, and the
            arguments are added to the subparsers instead of ``parser``.
        constraints: associates a section with the parameters to include. If the
            section is not in this dictionary, all the parameters are included.
        with_config: whether to add the ``--config`` argument.

    Returns:
        The parser that was created, if ``parser`` of type ``str``.
        ``None`` otherwise.

    Raises:
        AssertionError: if a section is not in ``dconfig.config_sections``.
    """
    if constraints is None:
        constraints = {}
    # ``sections`` is reused for every subparser: materialise it once so that
    # any iterable (tuple, generator, ...) is supported.
    sections = list(sections)

    if isinstance(parser, str):
        parser_ = argparse.ArgumentParser(description=parser)
    else:
        parser_ = parser

    if program_args:
        subparser_args = copy.deepcopy(program_args)
        # ``sections`` lists the programs; it is not an ``add_subparsers`` option
        subparser_sections = subparser_args.pop("sections")
        subparsers = parser_.add_subparsers(**subparser_args)
        subparsers.required = True
        for subparser_section in subparser_sections:
            # Each program gets its own section in addition to the common ones
            prepare_parser(
                parser=subparsers.add_parser(subparser_section),
                sections=[subparser_section, *sections],
                constraints=constraints,
                with_config=with_config,
            )
    else:
        if with_config:
            parser_.add_argument(
                "-c",
                "--config",
                help="YAML file(s) that configures the algorithm",
                nargs="+",
            )
        for section in sections:
            assert section in dconfig.config_sections, (
                "The section " + str(section) + " is not defined."
            )
            section_description = dconfig.config_sections[section]
            group = parser_.add_argument_group(section, section_description)
            section_params = (
                constraints[section]
                if section in constraints
                else dconfig.config_description.get(section, {}).keys()
            )
            for param in section_params:
                # Copy: the description is altered below before being passed
                # to ``add_argument``
                param_config = copy.deepcopy(dconfig.config_description[section][param])
                if (
                    "type" in param_config
                    and param_config["type"] is bool
                    and "const" not in param_config
                    and "nargs" not in param_config
                ):
                    # If ``True`` is not provided to a boolean flag, it is implicitly
                    # provided
                    param_config["nargs"] = "?"
                    param_config["const"] = True
                    param_config["type"] = str2bool
                elif (
                    param_config.get("type", str) is str and "const" not in param_config
                ):
                    # For a string, no value means ``None``
                    param_config["const"] = None
                # ``SUPPRESS``: an argument that is not given on the command
                # line does not appear in the parsed namespace
                group.add_argument(
                    "--" + param, default=argparse.SUPPRESS, **param_config
                )

    return parser_ if isinstance(parser, str) else None
def group_argparse_args(
    parser_args: argparse.Namespace | dict,
    sections: typing.Iterable[str],
    constraints: typing.Optional[typing.Dict[str, typing.Iterable[str]]] = None,
    ignore: typing.Optional[typing.List[str]] = None,
) -> dict:
    """Group the argparse parameters by section, as they are supposed to be
    in a configuration.

    Args:
        parser_args: result of the ``parse_args`` method. It is not modified.
        sections: parameter sections to include
        constraints: associates a section with the parameters to include. If the
            section is not in this dictionary, all the parameters are included.
        ignore: list of variables to ignore in the grouping

    Returns:
        Configuration described by ``parser_args``. Only the sections that
        have at least one parsed parameter are present. A parameter that is
        declared by several sections is stored in each of them.

    Raises:
        Exception: if a parsed argument does not belong to any of the
            ``sections`` and is not in ``ignore``.
    """
    if ignore is None:
        ignore = []
    if constraints is None:
        constraints = {}

    # Work on a plain dictionary that is independent from the input
    remaining_args = copy.deepcopy(parser_args)
    if not isinstance(remaining_args, dict):
        remaining_args = vars(remaining_args)

    grouped_argparse_args = {}
    # Arguments that were attributed to at least one section
    consumed = set()
    for section in sections:
        section_params = (
            constraints[section]
            if section in constraints
            else dconfig.config_description.get(section, {}).keys()
        )
        for param in section_params:
            if param in remaining_args:
                grouped_argparse_args.setdefault(section, {})[param] = remaining_args[
                    param
                ]
                consumed.add(param)

    for remaining_param in remaining_args:
        if remaining_param not in consumed and remaining_param not in ignore:
            raise Exception(str(remaining_param) + " does not belong to any section.")
    return grouped_argparse_args
@typing.overload
def return_config_from_parser(
    repo: str,
    sections: typing.Optional[typing.Iterable[str]],
    program_args: typing.Optional[dict] = None,
    constraints: typing.Optional[typing.Dict[str, typing.Iterable[str]]] = None,
    parse_known_args: typing.Literal[False] = False,
) -> dict:
    ...


@typing.overload
def return_config_from_parser(
    repo: str,
    sections: typing.Optional[typing.Iterable[str]],
    program_args: typing.Optional[dict] = None,
    constraints: typing.Optional[typing.Dict[str, typing.Iterable[str]]] = None,
    *,
    parse_known_args: typing.Literal[True],
) -> typing.Tuple[dict, typing.List[str]]:
    ...


def return_config_from_parser(
    repo: str,
    sections: typing.Optional[typing.Iterable[str]],
    program_args: typing.Optional[dict] = None,
    constraints: typing.Optional[typing.Dict[str, typing.Iterable[str]]] = None,
    parse_known_args: bool = False,
) -> dict | typing.Tuple[dict, typing.List[str]]:
    """Load the default configuration and alter it with the command line.

    Create a parser that allows to parse (a) YAML configuration file(s)
    as well as arguments to change the configuration. Returns the configuration
    altered by the arguments that are parsed. The priority is, from lowest to
    highest: the default configuration, the YAML file(s) given with
    ``--config`` (in the order they are given), the other command-line
    arguments.

    Args:
        repo: path to the root of the repository
        sections: parameter sections to add. It is iterated to build the
            parser, so an iterable must actually be provided.
        program_args: Dictionary that allows to specify how to configure the
            subparsers that define which program to use. It contains the
            key-value ``dest`` that specifies which program to run, and the
            possible programs ``sections`` (as a program is also associated
            with a section.) The other key-value couples (such as ``help``)
            are passed to the :py:func:`add_subparsers` method.
        constraints: associates a section with the parameters to include. If the
            section is not in this dictionary, all the parameters are included.
        parse_known_args: whether to only parse known arguments. The other
            arguments, that are unknown given ``sections`` and ``constraints``,
            are returned so that they can be passed to other scripts.

    Returns:
        Configuration. If ``program_args`` is provided, the chosen program is
        also stored in the configuration, under the key ``program_args["dest"]``.
        If ``parse_known_args`` is set to ``True``, also returns
        the arguments that were not parsed.
    """
    # Reused several times below: materialise any iterable once
    sections = list(sections)

    # Setup the parser
    parser = argparse.ArgumentParser(
        description=(
            "Save the hits and MC particles stored in (a) (X)DIGI file(s) "
            "into CSV files"
        )
    )
    prepare_parser(
        parser=parser,
        sections=sections,
        program_args=program_args,
        constraints=constraints,
        with_config=True,
    )

    unknown_args: typing.List[str] = []
    if parse_known_args:
        namespace, unknown_args = parser.parse_known_args()
    else:
        namespace = parser.parse_args()
    parser_args = vars(namespace)

    if program_args:
        program_type = program_args["dest"]
        chosen_program = parser_args[program_type]
        # The chosen program is associated with a section of its own
        sections = [*sections, chosen_program]

    config = import_default_config(
        repo=repo, sections=sections, constraints=constraints
    )

    # Overwrite the configuration according to what has been parsed
    for config_path in parser_args["config"] or []:
        print("Load config", config_path)
        override_nested_dict(import_config(config_path), config, inplace=True)

    # Arguments to ignore when grouping by section
    ignore = ["config"]
    if program_args:
        ignore.append(program_type)

    grouped_argparse_args = group_argparse_args(
        parser_args, sections=sections, constraints=constraints, ignore=ignore
    )
    override_nested_dict(grouped_argparse_args, config, inplace=True)

    if program_args:
        config[program_type] = chosen_program

    if parse_known_args:
        return config, unknown_args
    return config