Source code for tools.tcomputing

from __future__ import annotations
import typing
import os
import os.path as op
import subprocess
from definitions import dcomputing
from tools import envvar


def add_environment_variables_to_copy(
    environment_variables: typing.Dict[str, str],
    backend: str,
):
    """Add in-place the environment variables to copy.

    Args:
        environment_variables: dictionary of environment variables that is
            going to be altered in place.
        backend: backend used for the computation
    """
    for environment_variable in dcomputing.environment_variables_to_copy.get(
        backend, dcomputing.environment_variables_to_copy.get("default", [])
    ):
        environment_variables[environment_variable] = envvar.get_environment_variable(
            environment_variable
        )
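
# Illustrative sketch (not part of the original module): how the in-place
# update above is typically used. Which variables get copied depends on the
# ``dcomputing.environment_variables_to_copy`` mapping defined elsewhere;
# the "condor" key used here is an assumption for illustration.
def _example_add_environment_variables_to_copy():
    env: typing.Dict[str, str] = {}
    # Copies every variable listed for the "condor" backend (or the "default"
    # list when no dedicated entry exists) from the current environment.
    add_environment_variables_to_copy(env, backend="condor")
    return env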
def _create_ganga_job(backend: str, wrapper_path: str):
    """Create a ganga job given the backend.

    Args:
        backend: backend used for the computation
        wrapper_path: path to the wrapper script that sources the environment
            before running the actual script

    Returns:
        ganga.Job: job to run, without any arguments
    """
    import ganga.ganga

    job = ganga.Job()
    job.application = ganga.Executable()
    # job.application.exe = ganga.File(algo_path)
    job.application.exe = wrapper_path
    job.application.platform = "x86_64-centos7-gcc8-opt"
    job.application.env = {}
    add_environment_variables_to_copy(job.application.env, backend)

    # Configure backend
    if backend == "ganga-local":
        job.backend = ganga.Local()
    elif backend == "ganga-dirac":
        job.backend = ganga.Dirac()
        job.backend.settings["Destination"] = "LCG.CERN.cern"  # to have AFS mounted
        job.backend.settings["CPUTime"] = 50
    elif backend == "ganga-condor":
        job.backend = ganga.Condor()
        job.backend.requirements.memory = 1200
    return job
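
# Illustrative sketch (not part of the original module): building an
# unsubmitted ganga job with the helper above. It only works where the
# ``ganga`` package is importable; the wrapper path reuses the repository
# layout assumed by ``submit_job`` below.
def _example_create_ganga_job():
    wrapper = op.join(envvar.get_repo_path(), "run", "wrapper.sh")
    job = _create_ganga_job(backend="ganga-local", wrapper_path=wrapper)
    # The returned job carries no arguments yet; a splitter (or explicit
    # application arguments) must be configured before calling ``job.submit()``.
    return job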
def write_condor_job(
    job: typing.Dict[str, str], path: str, items: typing.Optional[list] = None
):
    """Write an HTCondor submit file.

    Args:
        job: associates an argument with its value in the submit file
        path: where to save the file
        items: list of items, used after the ``queue`` statement
    """
    os.makedirs(op.dirname(path), exist_ok=True)
    with open(path, "w") as submit_file:
        for key, value in job.items():
            submit_file.write(f"{key} = {value}\n")
        submit_file.write("queue")
        if items is not None:
            submit_file.write(" 1 in (" + ", ".join(map(str, items)) + ")")
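
# Illustrative sketch (not part of the original module): writing a minimal
# submit file with the helper above. The paths and values are hypothetical.
# With ``items=[0, 10, 20]`` the file ends with ``queue 1 in (0, 10, 20)``,
# i.e. one job per listed item.
def _example_write_condor_job():
    job = {
        "executable": "/path/to/wrapper.sh",
        "universe": "vanilla",
        "output": "log/stdout.$(Item).log",
        "error": "log/stderr.$(Item).log",
        "log": "log/log.log",
    }
    write_condor_job(job, path="log/condor.submit", items=[0, 10, 20])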
def submit_job(
    script_path: str,
    args: list,
    nb_files: typing.Optional[int] = None,
    nb_files_per_job: int = -1,
    backend: str = "condor",
    logdir: typing.Optional[str] = None,
    max_runtime: typing.Optional[int] = None,
    max_transfer_output_mb: typing.Optional[int] = None,
):
    """Submit a job to HTCondor or ganga.

    Args:
        script_path: path to the script to run
        args: list of the arguments to pass to the script
        nb_files: total number of files
        nb_files_per_job: number of files per job
        backend: ``condor``, ``ganga-local`` or ``ganga-dirac``. Only the
            ``condor`` backend is expected to work properly
        logdir: path where to save the logs
        max_runtime: time within which the job should run. Only used by the
            ``condor`` backend
        max_transfer_output_mb: ``MAX_TRANSFER_OUTPUT_MB`` HTCondor parameter.
            Only used by the ``condor`` backend
    """
    wrapper_path = op.join(envvar.get_repo_path(), "run", "wrapper.sh")
    if nb_files_per_job != -1:
        start_indices = range(0, nb_files, nb_files_per_job)

    if backend in dcomputing.ganga_backends:
        import ganga.ganga

        job = _create_ganga_job(backend=backend, wrapper_path=wrapper_path)
        if backend == "ganga-local":
            # disable parallelisation as it might not work locally
            job.parallel_submit = False
        if nb_files_per_job != -1:
            assert isinstance(nb_files, int)
            splitted_args = []
            for start_index in start_indices:
                end_index = start_index + nb_files_per_job
                # clamp the last chunk to the total number of files
                end_index = min(end_index, nb_files)
                splitted_args.append(
                    [script_path]
                    + args
                    + ["--start_index", str(start_index), "--suboutdir", str(start_index)]
                )
            job.splitter = ganga.ArgSplitter(args=splitted_args)
        job.submit()
        ganga.runMonitoring()
    elif backend == "condor":
        if logdir is None:
            logdir = "log/"
        submit_path = op.join(logdir, "condor.submit")
        job = {
            "executable": wrapper_path,
            "universe": "vanilla",
            "notification": "never",
            "log": op.join(logdir, "log.log"),
        }
        if max_runtime is not None:
            job["+MaxRuntime"] = str(int(max_runtime))
        if max_transfer_output_mb is not None:
            job["+MaxTransferOutputMB"] = str(int(max_transfer_output_mb))
        environment_variables = {}
        add_environment_variables_to_copy(environment_variables, backend)
        job["environment"] = (
            '"'
            + " ".join(
                [f"{name}={value}" for name, value in environment_variables.items()]
            )
            + '"'
        )
        if nb_files_per_job != -1:
            job["output"] = op.join(logdir, "stdout.$(Item).log")
            job["error"] = op.join(logdir, "stderr.$(Item).log")
            job["arguments"] = (
                f"{script_path} "
                + " ".join(args)
                + " --start_index $(Item) --suboutdir $(Item)"
            )
        else:
            job["output"] = op.join(logdir, "stdout.log")
            job["error"] = op.join(logdir, "stderr.log")
            job["arguments"] = script_path + " " + " ".join(args)
        write_condor_job(
            job, submit_path, items=start_indices if nb_files_per_job != -1 else None
        )
        subprocess.run(["condor_submit", submit_path])
    else:
        raise ValueError(
            "Backend is " + str(backend) + " which is not recognised as a valid backend."
        )
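
# Illustrative sketch (not part of the original module): submitting a script
# to the ``condor`` backend. The script path, arguments and counts are
# hypothetical. The call writes ``condor.submit`` under ``logdir`` and then
# runs ``condor_submit`` on it, queuing one job per start index
# (0, 10, 20, ... here).
def _example_submit_job():
    submit_job(
        script_path="run/process.py",  # hypothetical script
        args=["--mode", "fast"],       # hypothetical arguments
        nb_files=100,
        nb_files_per_job=10,           # one HTCondor job per block of 10 files
        backend="condor",
        logdir="log/process",
        max_runtime=3600,              # assuming +MaxRuntime is in seconds
    )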