Source code for rail.projects.execution

"""Functions to execute pipeline and other shell commands"""

import enum
import os
import subprocess
import time


[docs] class RunMode(enum.Enum): """Choose the run mode""" dry_run = 0 bash = 1 slurm = 2
S3DF_SLURM_OPTIONS: list[str] = [ "-p", "milano", "--account", "rubin:commissioning@milano", "--mem", "16G", "--parsable", ] PERLMUTTER_SLURM_OPTIONS: list[str] = [ "--account", "m1727", "--constraint", "cpu", "--qos", "regular", "--parsable", ] SLURM_OPTIONS = { "s3df": S3DF_SLURM_OPTIONS, "perlmutter": PERLMUTTER_SLURM_OPTIONS, }
[docs] def handle_command( run_mode: RunMode, command_line: list[str], ) -> int: """Run a single command in the mode requested Parameters ---------- run_mode: RunMode How to run the command, e.g., dry_run, bash or slurm command_line: list[str] Tokens in the command line Returns ------- int: Status returned by the command. 0 for success, exit code otherwise """ print("subprocess:", *command_line) _start_time = time.time() print(">>>>>>>>") if run_mode == RunMode.dry_run: # print(command_line) command_line.insert(0, "echo") finished = subprocess.run(command_line, check=False) elif run_mode == RunMode.bash: # pragma: no cover # return os.system(command_line) finished = subprocess.run(command_line, check=False) elif run_mode == RunMode.slurm: # pragma: no cover raise RuntimeError( "handle_command should not be called with run_mode == RunMode.slurm" ) returncode = finished.returncode _end_time = time.time() _elapsed_time = _end_time - _start_time print("<<<<<<<<") print(f"subprocess completed with status {returncode} in {_elapsed_time} seconds\n") return returncode
[docs] def handle_commands( run_mode: RunMode, command_lines: list[list[str]], script_path: str | None = None, ) -> int: # pragma: no cover """Run a multiple commands in the mode requested Parameters ---------- run_mode: RunMode How to run the command, e.g., dry_run, bash or slurm command_lines: list[list[str]] List of commands to run, each one is the list of tokens in the command line script_path: str | None Path to write the slurm submit script to Returns ------- int: Status returned by the commands. 0 for success, exit code otherwise """ if run_mode in [RunMode.dry_run, RunMode.bash]: for command_ in command_lines: retcode = handle_command(run_mode, command_) if retcode: return retcode return 0 # At this point we are using slurm and need a script to send to batch if script_path is None: raise ValueError( "handle_commands with run_mode == RunMode.slurm requires a path to a script to write", ) try: os.makedirs(os.path.dirname(script_path)) except FileExistsError: pass with open(script_path, "w", encoding="utf-8") as fout: fout.write("#!/usr/bin/bash\n\n") for command_ in command_lines: com_line = " ".join(command_) fout.write(f"{com_line}\n") script_out = script_path.replace(".sh", ".out") command_line = ["srun", "--output", script_out, "--error", script_path] try: with subprocess.Popen( command_line, stdout=subprocess.PIPE, ) as srun: assert srun.stdout line = srun.stdout.read().decode().strip() ret_val = int(line.split("|")[0]) except TypeError as msg: raise TypeError(f"Bad slurm submit: {msg}") from msg return ret_val
[docs] def sbatch_wrap( run_mode: RunMode, site: str, args: list[str] ) -> int: # pragma: no cover """Wrap a rail_pipe command with site-based arguements Parameters ---------- run_mode: RunMode How to run the command, e.g., dry_run, bash or slurm site: str Execution site, used to set sbatch options args: list[str] Additional arguments Returns ------- int Status. 0 for success, exit code otherwise """ try: slurm_options = SLURM_OPTIONS[site] except KeyError as msg: raise KeyError( f"{site} is not a recognized site, options are {SLURM_OPTIONS.keys()}" ) from msg command_line = ( ["sbatch"] + slurm_options + ["rail_pipe", "--run_mode", "slurm"] + list(args) ) return handle_command(run_mode, command_line)