Source code for rail.projects.pipeline_holder

from __future__ import annotations

import os
from typing import TYPE_CHECKING, Any

import yaml
from ceci.config import StageParameter
from rail.core.stage import RailPipeline
from rail.utils import catalog_utils

from .configurable import Configurable

if TYPE_CHECKING:
    from .project import RailProject


[docs] def inform_input_callback( project: RailProject, pipeline_name: str, sink_dir: str, # pylint: disable=unused-argument **kwargs: Any, ) -> dict[str, str]: """Make dict of input tags and paths for the inform pipeline Parameters ---------- project: RailProject Object with project configuration pipeline_name: str Name of the pipeline to run sink_dir: str Path to output directory kwargs: Any Additional parameters to specify pipeline, e.g., flavor, selection, ... Returns ------- dict[str, str]: Dictionary of input file tags and paths """ pipeline_info = project.get_pipeline(pipeline_name) input_files = {} input_file_tags = pipeline_info["input_file_templates"] flavor = kwargs.pop("flavor", "baseline") for key, val in input_file_tags.items(): input_file_flavor = val.get("flavor", flavor) input_files[key] = project.get_file_for_flavor( input_file_flavor, val["tag"], **kwargs ) return input_files
[docs] def inform_sompz_input_callback( project: RailProject, pipeline_name: str, sink_dir: str, # pylint: disable=unused-argument **kwargs: Any, ) -> dict[str, str]: """Make dict of input tags and paths for the inform pipeline Parameters ---------- project: RailProject Object with project configuration pipeline_name: str Name of the pipeline to run sink_dir: str Path to output directory kwargs: Any Additional parameters to specify pipeline, e.g., flavor, selection, ... Returns ------- dict[str, str]: Dictionary of input file tags and paths """ pipeline_info = project.get_pipeline(pipeline_name) input_files = {} input_file_tags = pipeline_info["input_file_templates"] flavor = kwargs.pop("flavor", "baseline") for key, val in input_file_tags.items(): input_file_flavor = val.get("flavor", flavor) input_files[key] = project.get_file_for_flavor( input_file_flavor, val["tag"], **kwargs ) return input_files
[docs] def inform_recalib_input_callback( project: RailProject, pipeline_name: str, sink_dir: str, # pylint: disable=unused-argument **kwargs: Any, ) -> dict[str, str]: """Make dict of input tags and paths for the inform pipeline Parameters ---------- project: RailProject Object with project configuration pipeline_name: str Name of the pipeline to run sink_dir: str Path to output directory kwargs: Any Additional parameters to specify pipeline, e.g., flavor, selection, ... Returns ------- dict[str, str]: Dictionary of input file tags and paths """ pipeline_info = project.get_pipeline(pipeline_name) input_files = {} input_file_tags = pipeline_info["input_file_templates"] flavor = kwargs.pop("flavor", "baseline") for key, val in input_file_tags.items(): input_file_flavor = val.get("flavor", flavor) input_files[key] = project.get_file_for_flavor( input_file_flavor, val["tag"], **kwargs ) pdfs_dir = sink_dir pz_algorithms = project.get_pzalgorithms() for pz_algo_ in pz_algorithms.keys(): input_files[f"input_{pz_algo_}"] = os.path.join( pdfs_dir, f"output_estimate_{pz_algo_}.hdf5" ) return input_files
[docs] def inform_somlike_input_callback( project: RailProject, pipeline_name: str, sink_dir: str, # pylint: disable=unused-argument **kwargs: Any, ) -> dict[str, str]: """Make dict of input tags and paths for the inform pipeline Parameters ---------- project: RailProject Object with project configuration pipeline_name: str Name of the pipeline to run sink_dir: str Path to output directory kwargs: Any Additional parameters to specify pipeline, e.g., flavor, selection, ... Returns ------- dict[str, str]: Dictionary of input file tags and paths """ pipeline_info = project.get_pipeline(pipeline_name) input_files = {} input_file_tags = pipeline_info["input_file_templates"] flavor = kwargs.pop("flavor", "baseline") for key, val in input_file_tags.items(): input_file_flavor = val.get("flavor", flavor) input_files[key] = project.get_file_for_flavor( input_file_flavor, val["tag"], **kwargs ) return input_files
[docs] def estimate_input_callback( project: RailProject, pipeline_name: str, sink_dir: str, **kwargs: Any, ) -> dict[str, str]: """Make dict of input tags and paths for the estimate pipeline Parameters ---------- project: RailProject Object with project configuration pipeline_name: str Name of the pipeline to run sink_dir: str Path to output directory kwargs: Any Additional parameters to specify pipeline, e.g., flavor, selection, ... Returns ------- dict[str, str]: Dictionary of input file tags and paths """ pipeline_info = project.get_pipeline(pipeline_name) input_files = {} input_file_tags = pipeline_info["input_file_templates"] kwcopy = kwargs.copy() flavor = kwcopy.pop("flavor", "baseline") local_input_tag = kwcopy.pop("input_tag", None) if local_input_tag: input_files["sink_dir"] = os.path.join(sink_dir, local_input_tag) else: input_files["sink_dir"] = sink_dir for key, val in input_file_tags.items(): input_file_flavor = kwargs.get("flavor", val.get("flavor", flavor)) input_tag = kwargs.get("input_tag", val["tag"]) input_files[key] = project.get_file_for_flavor( input_file_flavor, input_tag, **kwcopy, ) pz_algorithms = project.get_pzalgorithms() for pz_algo_ in pz_algorithms.keys(): input_files[f"model_{pz_algo_}"] = os.path.join( project.get_path("ceci_output_dir", flavor=input_file_flavor, **kwcopy), f"model_inform_{pz_algo_}.pkl", ) return input_files
[docs] def estimate_sompz_input_callback( project: RailProject, pipeline_name: str, sink_dir: str, **kwargs: Any, ) -> dict[str, str]: """Make dict of input tags and paths for the estimate pipeline Parameters ---------- project: RailProject Object with project configuration pipeline_name: str Name of the pipeline to run sink_dir: str Path to output directory kwargs: Any Additional parameters to specify pipeline, e.g., flavor, selection, ... Returns ------- dict[str, str]: Dictionary of input file tags and paths """ pipeline_info = project.get_pipeline(pipeline_name) input_files = {} input_file_tags = pipeline_info["input_file_templates"] kwcopy = kwargs.copy() flavor = kwcopy.pop("flavor", "baseline") local_input_tag = kwcopy.pop("input_tag", None) if local_input_tag: input_files["sink_dir"] = os.path.join(sink_dir, local_input_tag) else: input_files["sink_dir"] = sink_dir for key, val in input_file_tags.items(): input_file_flavor = kwargs.get("flavor", val.get("flavor", flavor)) input_tag = kwargs.get("input_tag", val["tag"]) input_files[key] = project.get_file_for_flavor( input_file_flavor, input_tag, **kwcopy, ) for field_ in ["wide", "deep"]: input_files[f"{field_}_model"] = os.path.join( project.get_path("ceci_output_dir", flavor=input_file_flavor, **kwcopy), f"model_som_informer_{field_}.pkl", ) return input_files
[docs] def estimate_recalib_input_callback( project: RailProject, pipeline_name: str, sink_dir: str, **kwargs: Any, ) -> dict[str, str]: """Make dict of input tags and paths for the estimate pipeline Parameters ---------- project: RailProject Object with project configuration pipeline_name: str Name of the pipeline to run sink_dir: str Path to output directory kwargs: Any Additional parameters to specify pipeline, e.g., flavor, selection, ... Returns ------- dict[str, str]: Dictionary of input file tags and paths """ pipeline_info = project.get_pipeline(pipeline_name) input_files = {} input_file_tags = pipeline_info["input_file_templates"] kwcopy = kwargs.copy() flavor = kwcopy.pop("flavor", "baseline") local_input_tag = kwcopy.pop("input_tag", None) if local_input_tag: input_files["sink_dir"] = os.path.join(sink_dir, local_input_tag) else: input_files["sink_dir"] = sink_dir for key, val in input_file_tags.items(): input_file_flavor = kwargs.get("flavor", val.get("flavor", flavor)) input_tag = kwargs.get("input_tag", val["tag"]) input_files[key] = project.get_file_for_flavor( input_file_flavor, input_tag, **kwcopy, ) pz_algorithms = project.get_pzalgorithms() ceci_dir = project.get_path("ceci_output_dir", flavor=input_file_flavor, **kwcopy) for pz_algo_ in pz_algorithms.keys(): input_files[f"input_{pz_algo_}"] = os.path.join( ceci_dir, f"output_estimate_{pz_algo_}.hdf5" ) for recalib_algo_ in ["pz_max_cell_p", "pz_mode"]: input_files[f"model_{pz_algo_}_{recalib_algo_}"] = os.path.join( ceci_dir, f"model_inform_{pz_algo_}_{recalib_algo_}.pkl", ) return input_files
[docs] def somlike_recalib_input_callback( project: RailProject, pipeline_name: str, sink_dir: str, **kwargs: Any, ) -> dict[str, str]: """Make dict of input tags and paths for the somlike recalib pipeline Parameters ---------- project: RailProject Object with project configuration pipeline_name: str Name of the pipeline to run sink_dir: str Path to output directory kwargs: Any Additional parameters to specify pipeline, e.g., flavor, selection, ... Returns ------- dict[str, str]: Dictionary of input file tags and paths """ pipeline_info = project.get_pipeline(pipeline_name) input_files = {} input_file_tags = pipeline_info["input_file_templates"] kwcopy = kwargs.copy() flavor = kwcopy.pop("flavor", "baseline") models_dir = sink_dir pz_algorithms = project.get_pzalgorithms() for pz_algo_ in pz_algorithms.keys(): for field_ in ["deep", "wide"]: model_file = f"model_pz_informer_{pz_algo_}_{field_}.pkl" input_files[f"model_{pz_algo_}_{field_}"] = os.path.join( models_dir, model_file ) local_input_tag = kwcopy.pop("input_tag", None) if local_input_tag: input_files["sink_dir"] = os.path.join(sink_dir, local_input_tag) else: input_files["sink_dir"] = sink_dir for key, val in input_file_tags.items(): input_file_flavor = kwargs.get("flavor", val.get("flavor", flavor)) input_tag = kwargs.get("input_tag", val["tag"]) input_files[key] = project.get_file_for_flavor( input_file_flavor, input_tag, **kwcopy, ) return input_files
[docs] def evaluate_input_callback( project: RailProject, pipeline_name: str, sink_dir: str, **kwargs: Any, ) -> dict[str, str]: """Make dict of input tags and paths for the evalute pipeline Parameters ---------- project: RailProject Object with project configuration pipeline_name: str Name of the pipeline to run sink_dir: str Path to output directory kwargs: Any Additional parameters to specify pipeline, e.g., flavor, selection, ... Returns ------- dict[str, str]: Dictionary of input file tags and paths """ pipeline_info = project.get_pipeline(pipeline_name) input_files = {} input_file_tags = pipeline_info["input_file_templates"] flavor = kwargs.pop("flavor", "baseline") for key, val in input_file_tags.items(): input_file_flavor = val.get("flavor", flavor) input_files[key] = project.get_file_for_flavor( input_file_flavor, val["tag"], **kwargs ) pdfs_dir = sink_dir pz_algorithms = project.get_pzalgorithms() for pz_algo_ in pz_algorithms.keys(): input_files[f"input_evaluate_{pz_algo_}"] = os.path.join( pdfs_dir, f"estimate_output_{pz_algo_}.hdf5" ) return input_files
[docs] def pz_input_callback( project: RailProject, pipeline_name: str, sink_dir: str, # pylint: disable=unused-argument **kwargs: Any, ) -> dict[str, str]: """Make dict of input tags and paths for the pz pipeline Parameters ---------- project: RailProject Object with project configuration pipeline_name: str Name of the pipeline to run sink_dir: str Path to output directory kwargs: Any Additional parameters to specify pipeline, e.g., flavor, selection, ... Returns ------- dict[str, str]: Dictionary of input file tags and paths """ pipeline_info = project.get_pipeline(pipeline_name) input_files = {} input_file_tags = pipeline_info["input_file_templates"] flavor = kwargs.pop("flavor") for key, val in input_file_tags.items(): input_file_flavor = val.get("flavor", flavor) if input_file_flavor!=flavor: input_file_flavor=flavor input_files[key] = project.get_file_for_flavor( input_file_flavor, val["tag"], **kwargs ) return input_files
[docs] def tomography_input_callback( project: RailProject, pipeline_name: str, sink_dir: str, **kwargs: Any, ) -> dict[str, str]: """Make dict of input tags and paths for the tomography pipeline Parameters ---------- project: RailProject Object with project configuration pipeline_name: str Name of the pipeline to run sink_dir: str Path to output directory kwargs: Any Additional parameters to specify pipeline, e.g., flavor, selection, ... Returns ------- dict[str, str]: Dictionary of input file tags and paths """ pipeline_info = project.get_pipeline(pipeline_name) input_files = {} input_file_tags = pipeline_info["input_file_templates"] flavor = kwargs.pop("flavor") selection = kwargs.get("selection") for key, val in input_file_tags.items(): input_file_flavor = val.get("flavor", flavor) input_files[key] = project.get_file_for_flavor( input_file_flavor, val["tag"], selection=selection ) pdfs_dir = sink_dir pz_algorithms = project.get_pzalgorithms() for pz_algo_ in pz_algorithms.keys(): input_files[f"input_{pz_algo_}"] = os.path.join( pdfs_dir, f"output_estimate_{pz_algo_}.hdf5" ) return input_files
[docs] def truth_to_observed_convert_commands( sink_dir: str, **_kwargs: Any ) -> list[list[str]]: convert_command = [ "tables-io", "convert", "--input", f"{sink_dir}/output_dereddener_errors.pq", "--output", f"{sink_dir}/output.hdf5", ] convert_commands = [convert_command] return convert_commands
[docs] def prepare_convert_commands(sink_dir: str, **_kwargs: Any) -> list[list[str]]: convert_command = [ "tables-io", "convert", "--input", f"{sink_dir}/output_deredden.pq", "--output", f"{sink_dir}/output.hdf5", ] convert_commands = [convert_command] return convert_commands
[docs] def photometric_errors_convert_commands( sink_dir: str, **_kwargs: Any ) -> list[list[str]]: convert_command = [ "tables-io", "convert", "--input", f"{sink_dir}/output_dereddener_errors.pq", "--output", f"{sink_dir}/output.hdf5", ] convert_commands = [convert_command] return convert_commands
[docs] def spectroscopic_selection_convert_commands( sink_dir: str, **kwargs: Any ) -> list[list[str]]: convert_commands = [] spec_selections = kwargs.get("spec_selections") assert isinstance(spec_selections, list) for spec_selection_ in spec_selections: convert_command = [ "tables-io", "convert", "--input", f"{sink_dir}/output_select_{spec_selection_}.pq", "--output", f"{sink_dir}/output_select_{spec_selection_}.hdf5", ] convert_commands.append(convert_command) return convert_commands
[docs] def blending_convert_commands(sink_dir: str, **_kwargs: Any) -> list[list[str]]: convert_command = [ "tables-io", "convert", "--input", f"{sink_dir}/output_blended.pq", "--output", f"{sink_dir}/output_blended.hdf5", ] convert_commands = [convert_command] return convert_commands
INPUT_CALLBACK_DICT = dict( inform=inform_input_callback, estimate=estimate_input_callback, evaluate=evaluate_input_callback, inform_sompz=inform_sompz_input_callback, estimate_sompz=estimate_sompz_input_callback, inform_recalib=inform_recalib_input_callback, estimate_recalib=estimate_recalib_input_callback, inform_somlike=inform_somlike_input_callback, somlike_recalib=somlike_recalib_input_callback, pz=pz_input_callback, tomography=tomography_input_callback, ) CATALOG_CONVERT_COMMANDS_DICT = dict( truth_to_observed=truth_to_observed_convert_commands, prepare=prepare_convert_commands, photometric_errors=photometric_errors_convert_commands, spec_selection=spectroscopic_selection_convert_commands, blending=blending_convert_commands, )
[docs] class RailPipelineTemplate(Configurable): """Simple class for holding a pipeline configuraiton""" config_options: dict[str, StageParameter] = dict( name=StageParameter(str, None, fmt="%s", required=True, msg="Pipeline name"), pipeline_class=StageParameter( str, None, fmt="%s", required=True, msg="Full class name for Pipeline", ), input_catalog_template=StageParameter( str, "", fmt="%s", msg="Template to use for input catalog", ), output_catalog_template=StageParameter( str, "", fmt="%s", msg="Template to use for output catalog", ), input_catalog_basename=StageParameter( str, "", fmt="%s", msg="Basename to use for input catalog", ), input_file_templates=StageParameter( dict, {}, fmt="%s", msg="Templates to use for input files", ), kwargs=StageParameter( dict, {}, fmt="%s", msg="Keywords to provide Pipeline c'tor", ), ) yaml_tag = "PipelineTemplate" def __init__(self, **kwargs: Any) -> None: """C'tor Parameters ---------- **kwargs: Any Configuration parameters for this RailPipelineTemplate, must match class.config_options data members """ Configurable.__init__(self, **kwargs) def __repr__(self) -> str: return f"{self.config.pipeline_class}"
[docs] def make_instance( self, project: RailProject, flavor: str, pipeline_overrides: dict[str, Any], ) -> RailPipelineInstance: kwargs_copy = self.config.kwargs.copy() if "kwargs" in pipeline_overrides: # pragma: no cover kwargs_update = pipeline_overrides.pop("kwargs", {}) kwargs_copy.update(**kwargs_update) path = project.get_path( "pipeline_path", pipeline=self.config.name, flavor=flavor ) return RailPipelineInstance( name=f"{self.config.name}_{flavor}", path=path, pipeline_template=self.config.name, flavor=flavor, kwargs=kwargs_copy, pipeline_overrides=pipeline_overrides, )
[docs] class RailPipelineInstance(Configurable): """Simple class for holding a pipeline configuraiton""" config_options: dict[str, StageParameter] = dict( name=StageParameter(str, None, fmt="%s", required=True, msg="Pipeline name"), path=StageParameter(str, None, fmt="%s", msg="Path to pipeline file"), pipeline_template=StageParameter( str, None, fmt="%s", required=True, msg="Name of PipelineTemplate to use", ), flavor=StageParameter(str, None, fmt="%s", msg="Pipeline flavor"), kwargs=StageParameter( dict, {}, fmt="%s", msg="Keywords to provide Pipeline c'tor", ), pipeline_overrides=StageParameter( dict, {}, fmt="%s", msg="Parameters to override from template", ), ) yaml_tag = "PipelineInstance" def __init__(self, **kwargs: Any): """C'tor Parameters ---------- kwargs: Any Configuration parameters for this RailPipelineInstance, must match class.config_options data members """ Configurable.__init__(self, **kwargs) def __repr__(self) -> str: return f"{self.config.pipeline_template} {self.config.path}"
[docs] def build( self, project: RailProject, ) -> int: output_dir = project.get_common_path("project_scratch_dir") pipeline_template = project.get_pipeline(self.config.pipeline_template) pipeline_class = pipeline_template.config.pipeline_class pipe_out_dir = os.path.dirname(self.config.path) try: os.makedirs(pipe_out_dir) except FileExistsError: pass pipeline_kwargs = self.config.kwargs.copy() if self.config.pipeline_overrides: # pragma: no cover copy_overrides = self.config.pipeline_overrides.copy() stages_config = os.path.join( pipe_out_dir, f"{self.config.name}_overrides.yml" ) kwarg_overrides = copy_overrides.pop("kwargs", {}) pipeline_kwargs.update(**kwarg_overrides) with open(stages_config, "w", encoding="utf-8") as fout: yaml.dump(copy_overrides, fout) else: stages_config = None for key, val in pipeline_kwargs.items(): if key == "selectors": temp_dict = project.get_spec_selections() elif key == "algorithms": temp_dict = project.get_pzalgorithms() elif key == "classifiers": temp_dict = project.get_classifiers() elif key == "summarizers": temp_dict = project.get_summarizers() elif key == "error_models": temp_dict = project.get_error_models() else: continue if "all" in val: pipeline_kwargs[key] = temp_dict else: pipeline_kwargs[key] = { algo_name_: temp_dict[algo_name_] for algo_name_ in val } catalog_tag = project.get_flavor(self.config.flavor).get("catalog_tag", None) if catalog_tag: try: catalog_utils.apply_defaults(catalog_tag) except KeyError: tokens = catalog_tag.split(".") module_name = ".".join(tokens[:-1]) class_name = tokens[-1] __import__(module_name) catalog_utils.CatalogConfigBase.apply_class(class_name) tokens = pipeline_class.split(".") module = ".".join(tokens[:-1]) class_name = tokens[-1] log_dir = f"{pipe_out_dir}/logs/{self.config.name}" print(f"Writing {self.config.path}") __import__(module) RailPipeline.build_and_write( class_name, self.config.path, None, stages_config, output_dir, log_dir, **pipeline_kwargs, ) return 0
[docs] def get_input_files( self, project: RailProject, **kwargs: Any, ) -> dict[str, str]: """Get the input files needed to run this instandce Parameters ---------- project: RailProject Object with project configuration **kwargs: Additional parameters to specify pipeline, e.g., flavor, selection, ... Returns ------- dict[str, str] Input files, keyed by label """ pipeline_name = self.config.pipeline_template sink_dir = project.get_path( "ceci_output_dir", flavor=self.config.flavor, **kwargs ) input_callback = INPUT_CALLBACK_DICT[pipeline_name] input_files = input_callback( project, pipeline_name, sink_dir, flavor=self.config.flavor, **kwargs ) input_files.setdefault("sink_dir", sink_dir) return input_files
[docs] def make_pipeline_single_input_command( self, project: RailProject, **kwargs: Any, ) -> list[str]: """Make the command to run a single instance Parameters ---------- project: RailProject Object with project configuration **kwargs: Additional parameters to specify pipeline, e.g., flavor, selection, ... Returns ------- list[str] Commands to run """ pipeline_path = self.config.path pipeline_config = pipeline_path.replace(".yaml", "_config.yml") input_files = self.get_input_files(project, **kwargs) sink_dir = input_files.pop("sink_dir") command_line = project.generate_ceci_command( pipeline_path=pipeline_path, config=pipeline_config, inputs=input_files, output_dir=sink_dir, log_dir=f"{sink_dir}/logs", ) return command_line
[docs] def make_pipeline_catalog_commands( self, project: RailProject, **kwargs: Any, ) -> list[tuple[list[list[str]], str]]: """Make the command to run a pipeline on an entire catalog Parameters ---------- project: RailProject Object with project configuration kwargs: Any Additional parameters to specify pipeline, e.g., flavor, selection, ... Returns ------- commands: list[tuple[list[list[str]], str]] Commands to run """ pipeline_name = self.config.pipeline_template pipeline_info = project.get_pipeline(pipeline_name) flavor = self.config.flavor pipeline_path = project.get_path( "pipeline_path", pipeline=pipeline_name, flavor=flavor, **kwargs ) catalog_convert_commands_function = CATALOG_CONVERT_COMMANDS_DICT[pipeline_name] source_catalog_files = project.get_catalog_files( pipeline_info.config.input_catalog_template, basename=pipeline_info.config.input_catalog_basename, flavor=self.config.flavor, **kwargs, ) sink_catalog_files = project.get_catalog_files( pipeline_info.config.output_catalog_template, basename="output.hdf5", flavor=self.config.flavor, **kwargs, ) all_commands: list[tuple[list[list[str]], str]] = [] selection = kwargs["selection"] for source_catalog, sink_catalog in zip( source_catalog_files, sink_catalog_files ): sink_dir = os.path.dirname(sink_catalog) script_path = os.path.join( sink_dir, f"submit_{pipeline_name}_{selection}_{flavor}.sh", ) ceci_commands = project.generate_ceci_command( pipeline_path=pipeline_path, config=pipeline_path.replace(".yaml", "_config.yml"), inputs=dict(input=source_catalog), output_dir=sink_dir, log_dir=sink_dir, ) convert_commands = catalog_convert_commands_function( sink_dir, **kwargs, ) iter_commands = [ ["mkdir", "-p", f"{sink_dir}"], ceci_commands, *convert_commands, ] all_commands.append((iter_commands, script_path)) return all_commands