Source code for tools.inoutconfig

"""Tools to configure the input and output paths."""
from __future__ import annotations
import typing
import os
import tempfile
import subprocess
from definitions import dpaths, dconfig
from . import envvar, tpaths


def get_outdir(
    config: dict,
    datatype: typing.Optional[str] = None,
) -> str:
    """Get the output path according to the output mode that was chosen.

    Args:
        config: current configuration
        datatype: Type of the data (e.g., ``csv``). This is used if
            ``auto_output_mode`` is set to ``eos``.

    Returns:
        output path

    Raises:
        ValueError: if ``auto_output_mode`` is neither ``outdir`` nor ``eos``.
    """
    output_config = config["output"]
    auto_output_mode = output_config["auto_output_mode"]
    if auto_output_mode == "outdir":
        # Fixed output directory, given directly in the configuration.
        assert isinstance(output_config["outdir"], str)
        outdir = output_config["outdir"]
    elif auto_output_mode == "eos":
        # Build the path from the wildcard pattern; every placeholder
        # must be available.
        assert datatype is not None
        assert output_config["version"] is not None
        assert output_config["dataname"] is not None
        outdir = output_config["wildcard_data_outdir"].format(
            datadir=config["global"]["datadir"],
            datatype=datatype,
            version=output_config["version"],
            dataname=output_config["dataname"],
        )
    else:
        # Fixed: the backticks in the original message were unbalanced
        # ("`auto_output_mode is `...").
        raise ValueError(
            f"`auto_output_mode` is `{auto_output_mode}`, which is unknown."
        )
    # Optionally nest the output inside a sub-directory.
    suboutdir = output_config["suboutdir"]
    if suboutdir is not None:
        return os.path.join(outdir, suboutdir)
    else:
        return outdir
def generate_catalog(
    paths: typing.Iterable[str], use_ganga: bool = False
) -> tempfile.NamedTemporaryFile | tempfile.TemporaryDirectory | None:
    """Get the PFNs given LFNs.

    Args:
        paths: list of paths, that can contain LFNs (Logical File Name).
            LFNs must start with ``LFN:``
        use_ganga: whether to use ``ganga``. If ``False``, the catalog is
            produced by running ``lb-dirac dirac-bookkeeping-genXMLCatalog``
            in a subprocess instead.

    Returns:
        Temporary file (with ``use_ganga``) or temporary directory
        containing ``catalog.xml`` (without) that holds the XML catalog of
        the LFNs, or ``None`` if no LFNs were found.

    Notes:
        This function uses ganga.
    """
    # Only LFN paths need a catalog entry; other (local/PFN) paths are ignored.
    lfns = list(filter(lambda path: path.startswith("LFN:"), paths))
    if lfns:
        if use_ganga:
            # Importing ``ganga.ganga`` makes the top-level ``ganga`` name usable.
            import ganga.ganga

            dataset = ganga.LHCbDataset(files=lfns)
            catalog_content = dataset.getCatalog()
            # ``delete=False`` so the file survives the ``close()`` below.
            # NOTE(review): nothing in this function deletes the file afterwards;
            # presumably the caller is responsible — confirm.
            catalog_tempfile = tempfile.NamedTemporaryFile(
                suffix=".xml", mode="wb", delete=False
            )
            catalog_tempfile.write(catalog_content)
            catalog_tempfile.close()  # Need to close it, otherwise Moore can't read it
            print("Generating the XML catalog associated with the LFNs paths...")
            return catalog_tempfile
        else:
            # The directory (not the file) is returned so its lifetime is
            # controlled by the caller via ``cleanup()``.
            catalog_tempdir = tempfile.TemporaryDirectory()
            catalog_temppath = os.path.join(catalog_tempdir.name, "catalog.xml")
            # Workaround in order not to use ``ganga``
            to_run = [
                "lb-dirac",
                "dirac-bookkeeping-genXMLCatalog",
                "--Ignore",
                "--Catalog=" + catalog_temppath,
                "-l",
                ",".join(lfns),
            ]
            print("Generating the XML catalog associated with the LFNs paths...")
            print("Run", " ".join(to_run))
            result = subprocess.run(to_run)
            # Fail fast if the catalog-generation command failed.
            assert result.returncode == 0
            return catalog_tempdir
    # Implicitly returns ``None`` when ``paths`` contains no LFN.
def ban_storage_elements(
    banned_storage_elements: typing.List[str],
    paths: typing.List[str],
    xml_catalog_path: str,
) -> typing.List[str]:
    """Remove the physical links to banned storage elements in the XML
    catalog file.

    Args:
        banned_storage_elements: storage elements that must not be used
        paths: list of paths that may include LFN paths
        xml_catalog_path: path to the XML catalog to alter

    Returns:
        List of LFNs to remove as they are only stored on banned storage
        elements, plus the LFNs of ``paths`` that do not appear in the
        catalog at all.

    Notes:
        All this is EXTREMELY ugly but I couldn't find another way of
        removing storage elements while keeping XML files.
    """
    import xml.etree.ElementTree as ET

    xml_content = ET.parse(xml_catalog_path)
    catalog = xml_content.getroot()
    lfns_to_remove = []
    # LFNs of ``paths`` not (yet) found in the catalog.
    missing_lfns = [path for path in paths if path.startswith("LFN:")]
    # Iterate over a copy, since entries may be removed from ``catalog``.
    for catalog_file in catalog[:]:
        logical_elements = catalog_file.findall("logical")
        assert len(logical_elements) == 1
        lfn_elements = logical_elements[0].findall("lfn")
        assert len(lfn_elements) == 1
        lfn = "LFN:" + lfn_elements[0].attrib["name"]
        if lfn in paths:
            # Fixed typo in the message ("already remove" -> "already removed").
            assert lfn in missing_lfns, (
                "LFN was already removed, which means it appears 2 times "
                "in the catalog. This shouldn't be the case?"
            )
            missing_lfns.remove(lfn)
            physical_elements = catalog_file.findall("physical")
            assert len(physical_elements) == 1
            pfn_elements = physical_elements[0].findall("pfn")
            no_pfn_element_left = True
            for pfn_element in pfn_elements[:]:
                if pfn_element.attrib["se"] in banned_storage_elements:
                    physical_elements[0].remove(pfn_element)
                else:
                    no_pfn_element_left = False
            if no_pfn_element_left:
                # Every replica was banned: drop the whole catalog entry.
                catalog.remove(catalog_file)
                lfns_to_remove.append(lfn)
    # Also remove missing LFNs
    lfns_to_remove += missing_lfns
    if lfns_to_remove:
        # Rewrite the catalog. ``ElementTree.write`` emits no XML declaration
        # for utf-8, so the declaration and DOCTYPE are written by hand.
        with open(xml_catalog_path, "wb") as xml_catalog_file:
            xml_catalog_file.write(
                bytes(
                    '<?xml version="1.0" encoding="UTF-8" standalone="no" ?>\n',
                    encoding="utf-8",
                )
            )
            xml_catalog_file.write(
                bytes(
                    '<!DOCTYPE POOLFILECATALOG SYSTEM "InMemory">\n', encoding="utf-8"
                )
            )
            xml_content.write(xml_catalog_file, encoding="utf-8")
    return lfns_to_remove
def get_bookkeeping_lfns(
    bookkeeping_path: str, start_index: int = 0, nb_files: int = -1
) -> typing.List[str]:
    """Get the LFNs associated with a bookkeeping path.

    Args:
        bookkeeping_path: path in the Dirac Bookkeeping browser
        start_index: index of the first LFN to retrieve
        nb_files: number of LFNs to retrieve (``-1`` retrieves all of them)

    Returns:
        List of LFNs from ``start_index`` to ``start_index + nb_files``

    Notes:
        This function uses ganga.
    """
    import ganga.ganga

    print("The bookkeeping path is", bookkeeping_path)
    print("Starting index:", start_index)
    print("Number of files:", nb_files)
    print("Getting the LFNs associated with the bookkeeping path...")
    dataset = ganga.BKQuery(bookkeeping_path).getDataset()
    # Keep only the requested slice of the dataset.
    if nb_files == -1:
        selected_dataset = dataset[start_index:]
    else:
        selected_dataset = dataset[start_index : start_index + nb_files]
    # Prefix each LFN so downstream code recognises it as such.
    lfns = ["LFN:" + lfn for lfn in selected_dataset.getLFNs()]
    print("The LFNs are")
    for lfn in lfns:
        print("-", lfn)
    return lfns
def get_moore_input_config(input_config: dict) -> dict | None:
    """Return the configuration dictionary of the input file for Moore, in the
    case where the ``python_input`` is not used, and then, the generic python
    moore input file will be used.

    Args:
        input_config: section ``moore_input`` of the configuration

    Returns:
        configuration dictionary of the input file for Moore,
        or ``None`` if a python file is used as input.

    Raises:
        AssertionError: if not exactly one of ``python_input``,
            ``bookkeeping_path`` and ``paths`` is provided.
    """
    python_input = input_config["python_input"]
    bookkeeping_path = input_config["bookkeeping_path"]
    paths = input_config["paths"]
    # Exactly one input source must be provided. Fixed: the previous
    # triple-XOR check wrongly accepted all three being given at once
    # (True ^ True ^ True == True).
    nb_input_sources = sum(
        [
            python_input is not None,
            bookkeeping_path is not None,
            len(paths) > 0,
        ]
    )
    assert nb_input_sources == 1, (
        "You can either use `python_input`, `bookkeeping_path` or `paths` "
        "but neither two of them at the same time nor none of them. \n"
        "python_input: " + str(python_input) + "\n"
        "bookkeeping_path: " + str(bookkeeping_path) + "\n"
        "paths: " + str(paths)
    )
    if python_input:
        # The user-provided python file configures the input itself:
        # there is nothing to build here.
        return None
    moore_input_config = {
        param: input_config[param] for param in dconfig.input_config_params
    }
    start_index = input_config["start_index"]
    nb_files = input_config["nb_files"]
    if bookkeeping_path:
        # The bookkeeping query already applies the slicing.
        paths = get_bookkeeping_lfns(
            bookkeeping_path=bookkeeping_path,
            start_index=start_index,
            nb_files=nb_files,
        )
    else:
        if isinstance(paths, str):
            paths = [paths]
        paths = tpaths.expand_paths(paths)
        # Apply the slicing only when a non-default slice was requested.
        if start_index != 0 or nb_files != -1:
            if nb_files != -1:
                paths = paths[start_index : start_index + nb_files]
            else:
                paths = paths[start_index:]
    moore_input_config["paths"] = paths
    return moore_input_config
class MooreInputConfig:
    """This context manager configures the input needed for Moore, given the
    configuration.

    It returns the path to the python file that is needed, that is,
    ``python_input`` in the configuration, or a custom python file that
    configures the input in the case where ``bookkeeping_path`` or ``paths``
    are used. In the latter case, it creates the necessary temporary
    configuration file that the custom python input file will read to
    configure the input.

    When the context manager is exited, the environment variables and
    temporary files are deleted.

    Attributes:
        config: configuration of ``moore_input``
        repo: path to the root of the repository
        return_paths: whether to return the paths as well
    """

    def __init__(self, config: dict, repo: str, return_paths: bool = False):
        self.config = config
        self.repo = repo
        # Temporary YAML file holding the input configuration (if created).
        self._input_config_tmpfile = None
        # Temporary directory holding the XML catalog (if created).
        self._catalog_tmpdir = None
        self.return_paths = return_paths

    def __enter__(
        self,
    ) -> str | typing.Tuple[str, typing.List[str] | None, typing.List[str] | None]:
        """
        Returns:
            Input python path. If ``return_paths`` attribute set to ``True``,
            also returns the list of paths and the list of removed LFNs.
        """
        moore_input_config = get_moore_input_config(self.config)
        # Fixed: initialise ``lfns_to_remove`` before the branches; otherwise
        # the ``python_input`` branch hits a NameError when ``return_paths``
        # is True.
        lfns_to_remove = None
        if moore_input_config is not None:
            paths = moore_input_config["paths"]
            python_input_to_use = dpaths.generic_moore_input.format(repo=self.repo)
            self._catalog_tmpdir = generate_catalog(paths)
            if self._catalog_tmpdir is not None:
                xml_catalog_path = os.path.join(
                    self._catalog_tmpdir.name, "catalog.xml"
                )
                moore_input_config["xml_catalog"] = xml_catalog_path
                banned_storage_elements = self.config["banned_storage_elements"]
                if banned_storage_elements:
                    lfns_to_remove = ban_storage_elements(
                        banned_storage_elements=banned_storage_elements,
                        paths=paths,
                        xml_catalog_path=xml_catalog_path,
                    )
                    if lfns_to_remove:
                        # Drop the LFNs that lost all their replicas.
                        paths = [path for path in paths if path not in lfns_to_remove]
                        moore_input_config["paths"] = paths
                        print(
                            "The following LFNs were removed as they are hosted in "
                            + "banned storage elements:",
                            lfns_to_remove,
                        )
            self._input_config_tmpfile = tpaths.write_yaml_temp_file(moore_input_config)
            envvar.set_environment_variable(
                "XDIGI2CSV_INPUT_CONFIG", self._input_config_tmpfile.name
            )
        else:
            # python input file: the user-provided python file is used as-is.
            python_input_to_use = self.config["python_input"]
            paths = None
        if self.return_paths:
            return python_input_to_use, paths, lfns_to_remove
        else:
            return python_input_to_use

    def __exit__(self, type, value, traceback):
        """Close the temporary file(s) and delete the environment variable."""
        if self._catalog_tmpdir is not None:
            self._catalog_tmpdir.cleanup()
        if self._input_config_tmpfile is not None:
            self._input_config_tmpfile.close()
            # The environment variable is only set when the temporary
            # configuration file was created.
            del os.environ["XDIGI2CSV_INPUT_CONFIG"]
def get_allen_input(
    indir: typing.Optional[str] = None,
    mdf_filename: typing.Optional[str] = None,
    geo_dirname: typing.Optional[str] = None,
    paths: typing.Optional[typing.List[str]] = None,
    geodir: typing.Optional[str] = None,
) -> typing.Tuple[typing.List[str], str | None]:
    """Get the MDF input paths and the geometry directory from the configuration.

    There are 2 ways of specifying an Allen input

    * With ``indir``, and optionally ``mdf_filename`` and ``geo_dirname``. \
    This is practical for files generated by the ``xdigi2mdf`` program \
    because you just need to specify the input directory
    * With ``paths`` and ``geodir``

    Args:
        indir: Input directory where the MDF files are
        mdf_filename: MDF file name in ``indir`` (defaults to the ``*.mdf``
            wildcard)
        geo_dirname: geometry directory name in ``indir``
        paths: list of MDF paths
        geodir: input geometry directory

    Returns:
        List of MDF input files and the geometry directory.

    Raises:
        ValueError: if no input was specified at all.
    """
    # Fixed: ``load_with_indir`` used to also require ``mdf_filename``, which
    # made the ``"*.mdf"`` fallback below dead code and contradicted the
    # documented "just specify the input directory" usage.
    load_with_indir = indir is not None
    load_with_paths = bool(paths)
    assert not (load_with_indir and load_with_paths), (
        "Both an input directory and MDF paths were specified, which are "
        "2 distinct ways of providing an Allen input."
    )
    if load_with_indir:
        inpaths = [
            os.path.join(indir, mdf_filename if (mdf_filename is not None) else "*.mdf")
        ]
        geodir = os.path.join(indir, geo_dirname) if (geo_dirname is not None) else None
        return inpaths, geodir
    elif load_with_paths:
        return paths, geodir
    else:
        # ValueError is a subclass of Exception, so existing handlers still work.
        raise ValueError("No input.")
def get_moore_build(moore_build: str, platform: str | None = None) -> typing.List[str]:
    """Get what to run in order to have access to the Moore build

    Args:
        moore_build: value of the ``build/moore`` option
        platform: Platform of the build to use (within lb-run or the local stack)

    Returns:
        if ``moore_build`` starts with ``lb-run:``, it is intepreted as
        ``lb-run Moore/{version}`` and the latter is returned.
        Otherwise, ``moore_build`` is returned as it is
    """
    if not moore_build.startswith("lb-run"):
        # Local stack: run directly, or through the platform's ``run`` script.
        if platform is None:
            return [moore_build]
        return [os.path.join(moore_build, f"build.{platform}", "run")]
    # ``lb-run [options]:Moore/{version}`` form.
    lb_run_part, moore_version = moore_build.split(":")
    command = lb_run_part.split(" ")
    assert moore_version.startswith("Moore/")
    if platform is not None:
        command += ["-c", str(platform)]
    return command + [moore_version]