from __future__ import annotations
import typing
import os
import os.path as op
import subprocess
from definitions import dcomputing
from tools import envvar
def add_environment_variables_to_copy(
environment_variables: typing.Dict[str, str],
backend: str,
):
"""Add in-place the environment variables to copy.
Args:
environment_variables: dictionnary of environemnt variables that is going to
be altered in place.
backend: backend used for the computation
"""
for environment_variable in dcomputing.environment_variables_to_copy.get(
backend, dcomputing.environment_variables_to_copy.get("default", [])
):
environment_variables[environment_variable] = envvar.get_environment_variable(
environment_variable
)
def _create_ganga_job(backend: str, wrapper_path: str):
"""Create a ganga job given the backend
Args:
backend: backend used for the computation
wrapper_path: path to the wrapper script to source the environment before
running the actual script
Returns:
ganga.Job: job to run, without any arguments
"""
    # importing ganga.ganga bootstraps the ganga GPI, which exposes Job,
    # Executable and the backend classes on the ganga package
    import ganga.ganga

    job = ganga.Job()
job.application = ganga.Executable()
job.application.exe = wrapper_path
job.application.platform = "x86_64-centos7-gcc8-opt"
job.application.env = {}
add_environment_variables_to_copy(job.application.env, backend)
# Configure backend
if backend == "ganga-local":
job.backend = ganga.Local()
elif backend == "ganga-dirac":
job.backend = ganga.Dirac()
job.backend.settings["Destination"] = "LCG.CERN.cern" # to have AFS mounted
job.backend.settings["CPUTime"] = 50
elif backend == "ganga-condor":
job.backend = ganga.Condor()
job.backend.requirements.memory = 1200
return job
def write_condor_job(
job: typing.Dict[str, str], path: str, items: typing.Optional[list] = None
):
"""Write a HTCondor submit file.
Args:
job: associates an argument with its value in the submit file
path: where to save the file
items: list of items, used after the ``queue`` statement
"""
os.makedirs(op.dirname(path), exist_ok=True)
with open(path, "w") as submit_file:
for key, value in job.items():
submit_file.write(f"{key} = {value}\n")
submit_file.write("queue")
if items is not None:
submit_file.write(" 1 in (" + ", ".join(map(str, items)) + ")")
def submit_job(
script_path: str,
args: list,
nb_files: typing.Optional[int] = None,
nb_files_per_job: int = -1,
backend: str = "condor",
logdir: typing.Optional[str] = None,
max_runtime: typing.Optional[int] = None,
max_transfer_output_mb: typing.Optional[int] = None,
):
"""Submit a job in HTCondor or ganga.
Args:
script_path: path to the script to run
args: list of the arguments to pass to the script
nb_files: number of files in total
nb_files_per_job: number of files per job
backend: ``condor``, ``ganga-local`` or ``ganga-dirac``. Only the
``condor`` backend is suppose to work properly
logdir: path where to save the log
max_runtime: time within which the job should be run. This is used
by the ``condor`` backend
max_transfer_output_mb: ``MAX_TRANSFER_OUTPUT_MB`` HTCondor parameter. This
is used by the ``condor`` backend
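
    Example:
        A hedged sketch; the script, arguments and log directory are only
        illustrative::

            submit_job(
                "scripts/analyse.py",
                args=["--year", "2018"],
                nb_files=100,
                nb_files_per_job=10,
                backend="condor",
                logdir="log/analyse",
            )
            # writes log/analyse/condor.submit with ten queued items
            # (start indices 0, 10, ..., 90) and runs condor_submit on it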
"""
wrapper_path = op.join(envvar.get_repo_path(), "run", "wrapper.sh")
    if nb_files_per_job != -1:
        # one job per block of nb_files_per_job files
        assert isinstance(nb_files, int)
        start_indices = range(0, nb_files, nb_files_per_job)
if backend in dcomputing.ganga_backends:
import ganga.ganga
job = _create_ganga_job(backend=backend, wrapper_path=wrapper_path)
if backend == "ganga-local":
# disable parallelisation as it might not work locally
job.parallel_submit = False
        if nb_files_per_job != -1:
            # one subjob per block of files: ArgSplitter creates a subjob
            # for every list of arguments
            split_args = []
            for start_index in start_indices:
                split_args.append(
                    [script_path]
                    + args
                    + [
                        "--start_index",
                        str(start_index),
                        "--suboutdir",
                        str(start_index),
                    ]
                )
            job.splitter = ganga.ArgSplitter(args=split_args)
job.submit()
ganga.runMonitoring()
elif backend == "condor":
submit_path = os.path.join(logdir, "condor.submit")
if logdir is None:
logdir = "log/"
job = {
"executable": wrapper_path,
"universe": "vanilla",
"notification": "never",
"log": op.join(logdir, "log.log"),
}
        if max_runtime is not None:
            job["+MaxRuntime"] = str(int(max_runtime))
        if max_transfer_output_mb is not None:
            job["+MaxTransferOutputMB"] = str(int(max_transfer_output_mb))
environment_variables = {}
add_environment_variables_to_copy(environment_variables, backend)
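        # HTCondor "new" environment syntax: a single double-quoted string of
        # whitespace-separated name=value pairs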
job["environment"] = (
'"'
+ " ".join(
[f"{name}={value}" for name, value in environment_variables.items()]
)
+ '"'
)
if nb_files_per_job != -1:
job["output"] = op.join(logdir, "stdout.$(Item).log")
job["error"] = op.join(logdir, "stderr.$(Item).log")
job["arguments"] = (
f"{script_path} "
+ " ".join(args)
+ " "
+ "--start_index $(Item) "
+ "--suboutdir $(Item)"
)
else:
job["output"] = op.join(logdir, "stdout.log")
job["error"] = op.join(logdir, "stderr.log")
job["arguments"] = script_path + " " + " ".join(args)
write_condor_job(
job, submit_path, items=start_indices if nb_files_per_job != -1 else None
)
subprocess.run(["condor_submit", submit_path])
    else:
        raise ValueError(
            f"Backend is {backend}, which is not recognised as a valid backend."
        )