Source code for tools.inoutconfig
"""Tools to configure the input and output paths."""
from __future__ import annotations
import typing
import os
import tempfile
import subprocess
from definitions import dpaths, dconfig
from . import envvar, tpaths
def get_outdir(
config: dict,
datatype: typing.Optional[str] = None,
) -> str:
"""Get the output path according to the output mode that was chosen.
Args:
config: current configuration
datatype: Type of the data (e.g., ``csv``). This is used if ``auto_output_mode``
is set to ``eos``.
Returns:
output path
"""
output_config = config["output"]
auto_output_mode = output_config["auto_output_mode"]
if auto_output_mode == "outdir":
assert isinstance(output_config["outdir"], str)
outdir = output_config["outdir"]
elif auto_output_mode == "eos":
assert datatype is not None
assert output_config["version"] is not None
assert output_config["dataname"] is not None
outdir = output_config["wildcard_data_outdir"].format(
datadir=config["global"]["datadir"],
datatype=datatype,
version=output_config["version"],
dataname=output_config["dataname"],
)
else:
raise ValueError(
"`auto_output_mode` is `" + str(auto_output_mode) + "`, which is unknown."
)
suboutdir = output_config["suboutdir"]
if suboutdir is not None:
return os.path.join(outdir, suboutdir)
else:
return outdir
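# Illustrative sketch (not part of the module): resolving the output directory
# for a hypothetical configuration in ``outdir`` mode. The keys mirror the ones
# read above; the concrete values are made up.
#
#     config = {
#         "global": {"datadir": "/eos/somewhere/data"},
#         "output": {
#             "auto_output_mode": "outdir",
#             "outdir": "/tmp/myjob",
#             "suboutdir": "run1",
#         },
#     }
#     get_outdir(config)  # -> "/tmp/myjob/run1"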
def generate_catalog(
paths: typing.Iterable[str], use_ganga: bool = False
) -> tempfile.NamedTemporaryFile | tempfile.TemporaryDirectory | None:
"""Get the PFNs given LFNs.
Args:
paths: list of paths, that can contain LFNs (Logical File Name).
LFNs must start with ``LFN:``
use_ganga: whether to use ``ganga``
Returns:
Temporary file (with ``use_ganga``) or temporary directory containing
``catalog.xml`` (without ``use_ganga``) that holds the XML catalog of
the LFNs, or ``None`` if no LFNs were found.
Notes:
This function uses ``ganga`` only if ``use_ganga`` is ``True``;
otherwise it falls back to ``lb-dirac``.
"""
lfns = list(filter(lambda path: path.startswith("LFN:"), paths))
if lfns:
if use_ganga:
import ganga.ganga
print("Generating the XML catalog associated with the LFN paths...")
dataset = ganga.LHCbDataset(files=lfns)
catalog_content = dataset.getCatalog()
catalog_tempfile = tempfile.NamedTemporaryFile(
suffix=".xml", mode="wb", delete=False
)
catalog_tempfile.write(catalog_content)
catalog_tempfile.close()  # Need to close it, otherwise Moore can't read it
return catalog_tempfile
else:
catalog_tempdir = tempfile.TemporaryDirectory()
catalog_temppath = os.path.join(catalog_tempdir.name, "catalog.xml")
# Workaround in order not to use ``ganga``
to_run = [
"lb-dirac",
"dirac-bookkeeping-genXMLCatalog",
"--Ignore",
"--Catalog=" + catalog_temppath,
"-l",
",".join(lfns),
]
print("Generating the XML catalog associated with the LFNs paths...")
print("Run", " ".join(to_run))
result = subprocess.run(to_run)
assert result.returncode == 0
return catalog_tempdir
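# Illustrative sketch (not part of the module): a hypothetical call mixing a
# local path and an LFN (both made up). Only the ``LFN:`` entries go into the
# catalog; with ``use_ganga=False`` the returned object is a temporary
# directory whose ``catalog.xml`` can be handed to Moore.
#
#     catalog_tmpdir = generate_catalog(
#         [
#             "/eos/somewhere/local_file.xdigi",
#             "LFN:/lhcb/MC/someprod/00000000_00000001_1.xdigi",
#         ],
#         use_ganga=False,
#     )
#     # returns None if the list contains no LFN at all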
def ban_storage_elements(
banned_storage_elements: typing.List[str],
paths: typing.List[str],
xml_catalog_path: str,
) -> typing.List[str]:
"""Remove the physical links to banned storage elements in the XML catalog file.
Args:
banned_storage_elements: storage elements that must not be used
paths: list of paths that may include LFN paths
xml_catalog_path: path to the XML catalog to alter
Returns:
List of LFNs to remove, either because they are only stored on banned
storage elements or because they do not appear in the catalog at all
Notes:
All this is EXTREMELY ugly but I couldn't find another way of removing storage
elements while keeping XML files.
"""
import xml.etree.ElementTree as ET
xml_content = ET.parse(xml_catalog_path)
catalog = xml_content.getroot()
lfns_to_remove = []
catalog_modified = False
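# Start from every LFN in ``paths``; entries still left after the loop were
# never found in the catalog and are reported as missing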
missing_lfns = [path for path in paths if path.startswith("LFN:")]
for catalog_file in catalog[:]:
logical_elements = catalog_file.findall("logical")
assert len(logical_elements) == 1
lfn_elements = logical_elements[0].findall("lfn")
assert len(lfn_elements) == 1
lfn = "LFN:" + lfn_elements[0].attrib["name"]
if lfn in paths:
assert lfn in missing_lfns, (
"LFN was already removed, which means it appears twice "
"in the catalog. This should not happen."
)
missing_lfns.remove(lfn)
physical_elements = catalog_file.findall("physical")
assert len(physical_elements) == 1
pfn_elements = physical_elements[0].findall("pfn")
no_pfn_element_left = True
for pfn_element in pfn_elements[:]:
if pfn_element.attrib["se"] in banned_storage_elements:
physical_elements[0].remove(pfn_element)
else:
no_pfn_element_left = False
if no_pfn_element_left:
catalog.remove(catalog_file)
lfns_to_remove.append(lfn)
# Also remove missing LFNs
lfns_to_remove += missing_lfns
if catalog_modified:
with open(xml_catalog_path, "wb") as xml_catalog_file:
xml_catalog_file.write(
bytes(
'<?xml version="1.0" encoding="UTF-8" standalone="no" ?>\n',
encoding="utf-8",
)
)
xml_catalog_file.write(
bytes(
'<!DOCTYPE POOLFILECATALOG SYSTEM "InMemory">\n', encoding="utf-8"
)
)
xml_content.write(xml_catalog_file, encoding="utf-8")
return lfns_to_remove
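# Illustrative sketch (not part of the module): pruning a freshly generated
# catalog, mirroring what ``MooreInputConfig.__enter__`` does below. The storage
# element name is made up, and ``paths`` is assumed to contain LFNs so that a
# catalog directory is returned.
#
#     catalog_tmpdir = generate_catalog(paths)
#     removed = ban_storage_elements(
#         banned_storage_elements=["SOME-BANNED-SE"],
#         paths=paths,
#         xml_catalog_path=os.path.join(catalog_tmpdir.name, "catalog.xml"),
#     )
#     paths = [path for path in paths if path not in removed]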
def get_bookkeeping_lfns(
bookkeeping_path: str, start_index: int = 0, nb_files: int = -1
) -> typing.List[str]:
"""Get the LFNs associated with a bookkeeping path.
Args:
bookkeeping_path: path in the Dirac Bookkeeping browser
start_index: index of the first LFN to retrieve
nb_files: number of LFNs to retrieve
Returns:
List of LFNs from ``start_index`` to ``start_index + nb_files`` (all
remaining LFNs if ``nb_files`` is ``-1``), each prefixed with ``LFN:``
Notes:
This function uses ganga.
"""
import ganga.ganga
print("The bookkeeping path is", bookkeeping_path)
print("Starting index:", start_index)
print("Number of files:", nb_files)
print("Getting the LFNs associated with the bookkeeping path...")
dataset = ganga.BKQuery(bookkeeping_path).getDataset()
if nb_files != -1:
sliced_dataset = dataset[start_index : start_index + nb_files]
else:
sliced_dataset = dataset[start_index:]
lfns = sliced_dataset.getLFNs()
lfns = ["LFN:" + lfn for lfn in lfns]
print("The LFNs are")
for lfn in lfns:
print("-", lfn)
return lfns
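# Illustrative sketch (not part of the module): the bookkeeping path below is
# purely hypothetical.
#
#     lfns = get_bookkeeping_lfns(
#         bookkeeping_path="/MC/SomeSample/SomeConditions/XDIGI",
#         start_index=0,
#         nb_files=5,
#     )
#     # -> at most 5 entries, each prefixed with "LFN:"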
def get_moore_input_config(input_config: dict) -> dict | None:
"""Return the configuration dictionnary of the input file for Moore,
in the case where the ``python_input`` is not used, and then, the generic
python moore input file will be used.
Args:
input_config: section ``moore_input`` of the configuration
Returns:
configuration dictionnary of the input file for Moore, or ``None``
if a python file is used as input.
"""
python_input = input_config["python_input"]
bookkeeping_path = input_config["bookkeeping_path"]
paths = input_config["paths"]
number_of_inputs = sum(
[python_input is not None, bookkeeping_path is not None, len(paths) > 0]
)
assert number_of_inputs == 1, (
"You must use exactly one of `python_input`, `bookkeeping_path` or `paths`: "
"not none of them, and not more than one at the same time.\n"
"python_input: " + str(python_input) + "\n"
"bookkeeping_path: " + str(bookkeeping_path) + "\n"
"paths: " + str(paths)
)
if python_input:
return
else:
moore_input_config = {
param: input_config[param] for param in dconfig.input_config_params
}
start_index = input_config["start_index"]
nb_files = input_config["nb_files"]
if bookkeeping_path:
paths = get_bookkeeping_lfns(
bookkeeping_path=bookkeeping_path,
start_index=start_index,
nb_files=nb_files,
)
else:
if isinstance(paths, str):
paths = [paths]
paths = tpaths.expand_paths(paths)
if start_index != 0 or nb_files != -1:
if nb_files != -1:
paths = paths[start_index : start_index + nb_files]
else:
paths = paths[start_index:]
moore_input_config["paths"] = paths
return moore_input_config
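# Illustrative sketch (not part of the module): a hypothetical ``moore_input``
# section using explicit paths. Which of these keys end up in the returned
# dictionary is decided by ``dconfig.input_config_params``, so the section must
# also provide whatever keys are listed there.
#
#     input_config = {
#         "python_input": None,
#         "bookkeeping_path": None,
#         "paths": ["/eos/somewhere/*.xdigi"],
#         "start_index": 0,
#         "nb_files": -1,
#         # plus the keys listed in ``dconfig.input_config_params``
#     }
#     moore_input_config = get_moore_input_config(input_config)
#     # -> dictionary with the expanded "paths" plus the copied parameters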
class MooreInputConfig:
"""This context manager allows the configure the input needed for Moore,
given the configuration.
It returns the path to the python file that is needed, that is, ``python_input``
in the configuration, or a custom python file that allows the configure
the input in the case where ``bookkeeping_path`` or ``paths`` are used.
In the latter, it creates the necessary temporary configuration file that
the custom python input file will read to configure the input.
When the context manager is exited, the environment variables and temporary
files are deleted.
Attributes:
config: configuration of ``moore_input``
repo: path to the root of the repository
return_paths: whether to return the paths as well
"""
def __init__(self, config: dict, repo: str, return_paths: bool = False):
self.config = config
self.repo = repo
self._input_config_tmpfile = None
self._catalog_tmpdir = None
self.return_paths = return_paths
def __enter__(
self,
) -> str | typing.Tuple[str, typing.List[str] | None, typing.List[str] | None]:
"""
Returns:
Input python path. If the ``return_paths`` attribute is set to ``True``,
also returns the list of input paths and the list of removed LFNs
(each possibly ``None``).
"""
moore_input_config = get_moore_input_config(self.config)
# python input file
if moore_input_config is not None:
paths = moore_input_config["paths"]
python_input_to_use = dpaths.generic_moore_input.format(repo=self.repo)
self._catalog_tmpdir = generate_catalog(paths)
lfns_to_remove = None
if self._catalog_tmpdir is not None:
xml_catalog_path = os.path.join(
self._catalog_tmpdir.name, "catalog.xml"
)
moore_input_config["xml_catalog"] = xml_catalog_path
banned_storage_elements = self.config["banned_storage_elements"]
if banned_storage_elements:
lfns_to_remove = ban_storage_elements(
banned_storage_elements=banned_storage_elements,
paths=paths,
xml_catalog_path=xml_catalog_path,
)
if lfns_to_remove:
paths = [path for path in paths if path not in lfns_to_remove]
moore_input_config["paths"] = paths
print(
"The following LFNs were removed as they are hosted in "
+ "banned storage elements:",
lfns_to_remove,
)
self._input_config_tmpfile = tpaths.write_yaml_temp_file(moore_input_config)
envvar.set_environment_variable(
"XDIGI2CSV_INPUT_CONFIG", self._input_config_tmpfile.name
)
else:
python_input_to_use = self.config["python_input"]
paths = None
lfns_to_remove = None
if self.return_paths:
return python_input_to_use, paths, lfns_to_remove
else:
return python_input_to_use
def __exit__(self, type, value, traceback):
"""Close the temporary file(s) and delete the environment variable."""
if self._catalog_tmpdir is not None:
self._catalog_tmpdir.cleanup()
if self._input_config_tmpfile is not None:
self._input_config_tmpfile.close()
os.environ.pop("XDIGI2CSV_INPUT_CONFIG", None)
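# Illustrative sketch (not part of the module): typical use as a context
# manager. ``config`` is assumed to be the full configuration dictionary and
# ``repo`` the path to the repository root.
#
#     with MooreInputConfig(config["moore_input"], repo=repo) as python_input:
#         # ``python_input`` is the options file to pass to Moore; while inside
#         # the block, XDIGI2CSV_INPUT_CONFIG points to the temporary input
#         # configuration (when the generic input file is used)
#         ...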
def get_allen_input(
indir: typing.Optional[str] = None,
mdf_filename: typing.Optional[str] = None,
geo_dirname: typing.Optional[str] = None,
paths: typing.Optional[typing.List[str]] = None,
geodir: typing.Optional[str] = None,
) -> typing.Tuple[typing.List[str], str | None]:
"""Get the MDF input paths and the geometry directory from the configuration.
There are 2 ways of specifying an Allen input
* With ``indir``, ``mdf_filename`` and ``geo_dirname``. This is practical for \
files generated by the ``xdigi2mdf`` program because you just need to specify \
the input directory
* With ``paths`` and ``geodir``
Args:
indir: input directory where the MDF files are
mdf_filename: MDF file name in ``indir``
geo_dirname: geometry directory name in ``indir``
paths: list of MDF paths
geodir: input geometry directory
Returns:
List of MDF input files and the geometry directory.
"""
load_with_indir = (indir is not None) and (mdf_filename is not None)
load_with_paths = bool(paths)
assert not (load_with_indir and load_with_paths), (
"Both an input directory and MDF paths were specified, which are "
"2 distinct ways of providing an Allen input."
)
if load_with_indir:
inpaths = [
os.path.join(indir, mdf_filename if (mdf_filename is not None) else "*.mdf")
]
geodir = os.path.join(indir, geo_dirname) if (geo_dirname is not None) else None
return inpaths, geodir
elif load_with_paths:
return paths, geodir
else:
raise Exception("No input.")
def get_moore_build(moore_build: str, platform: str | None = None) -> typing.List[str]:
"""Get what to run in order to have access to the Moore build
Args:
moore_build: value of the ``build/moore`` option
platform: Platform of the build to use (within lb-run or the local stack)
Returns:
If ``moore_build`` starts with ``lb-run``, it is interpreted as
``lb-run ...:Moore/{version}`` and the corresponding ``lb-run`` command
is returned (with ``-c {platform}`` if ``platform`` is given).
Otherwise, ``moore_build`` is treated as the path to a local stack and
returned as is, or as ``{moore_build}/build.{platform}/run`` if
``platform`` is given.
"""
if moore_build.startswith("lb-run"):
lb_run, moore = moore_build.split(":")
lb_runs = lb_run.split(" ")
assert moore.startswith("Moore/")
if platform is None:
return lb_runs + [moore]
else:
return lb_runs + ["-c", str(platform), moore]
else:
if platform is None:
return [moore_build]
else:
return [os.path.join(moore_build, f"build.{platform}", "run")]
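# Illustrative sketch (not part of the module): the two forms accepted by
# ``get_moore_build``. The Moore version, platform and stack path are made up.
#
#     get_moore_build("lb-run:Moore/v55r1")
#     # -> ["lb-run", "Moore/v55r1"]
#     get_moore_build("lb-run:Moore/v55r1", platform="x86_64_v3-el9-gcc13-opt")
#     # -> ["lb-run", "-c", "x86_64_v3-el9-gcc13-opt", "Moore/v55r1"]
#     get_moore_build("/path/to/stack/Moore", platform="x86_64_v3-el9-gcc13-opt")
#     # -> ["/path/to/stack/Moore/build.x86_64_v3-el9-gcc13-opt/run"]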