diff --git a/doc/changes/DM-53494.feature.rst b/doc/changes/DM-53494.feature.rst new file mode 100644 index 00000000..605b107f --- /dev/null +++ b/doc/changes/DM-53494.feature.rst @@ -0,0 +1 @@ +Added ability to run submit processes as batch jobs on same cluster with shared filesystems. diff --git a/doc/lsst.ctrl.bps/quickstart.rst b/doc/lsst.ctrl.bps/quickstart.rst index f83365b7..67a7bc81 100644 --- a/doc/lsst.ctrl.bps/quickstart.rst +++ b/doc/lsst.ctrl.bps/quickstart.rst @@ -1608,6 +1608,49 @@ Parsl). responsibility to remove them once no longer needed. The removal should be done regularly to avoid too many in single directory. +.. _bps_submit_as_batch: + +Submit Stages as Batch Jobs +--------------------------- + +.. warning:: + + This feature is part of the ongoing work for submission to remote sites. + So details may change. Also currently only the HTCondor plugin supports + this. + +In cases where one cannot run ``bps submit`` interactively (e.g., needs +too much memory), BPS can run the submit processes as batch jobs after +which the payload workflow will start running. + +The interactive submission process, ``bps batch-submit ``, +is much shorter. It will create a workflow with two jobs: + +- ``buildQuantumGraph`` which creates the quantum graph. +- ``preparePayloadWorkflow`` which does the rest of the submission stages seen + when running ``bps submit``. These include clustering, creation of the payload + workflow, and preparing the WMS-specific workflow. + +One can set runtime values specific to those jobs (e.g., ``requestMemory``) in +sections with corresponding names similar to ``finalJob``. Currently the +logging-related command-line arguments aren't passed from ``bps batch-submit`` +to these jobs. Instead, one can set ``bpsPreCommandOpts``, which has the +same default as the payload job. + +Even with the new jobs, there is only one output run collection, one submit +directory and one top level WMS ID to be used with BPS commands. 
+ +``bps report`` will show these two new jobs the same as the payload jobs. They +will be the only lines to appear in the report until the ``preparePayloadWorkflow`` +has finished, at which time the expected payload lines should appear (from +``pipetaskInit`` through ``finalJob``). + +``bps cancel`` can be used to abort the run during these new jobs or later +during the running of the payload jobs. + +See the corresponding section in the WMS-plugin documentation for additional +information. + .. _bps-troubleshooting: Troubleshooting diff --git a/python/lsst/ctrl/bps/batch_submit.py b/python/lsst/ctrl/bps/batch_submit.py new file mode 100644 index 00000000..0b45b881 --- /dev/null +++ b/python/lsst/ctrl/bps/batch_submit.py @@ -0,0 +1,238 @@ +# This file is part of ctrl_bps. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This software is dual licensed under the GNU General Public License and also +# under a 3-clause BSD license. Recipients may choose which of these licenses +# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, +# respectively. If you choose the GPL option then the following text applies +# (but note that there is still no warranty even if you opt for BSD instead): +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Driver to run submit stages as batch jobs.""" + +__all__ = ["batch_payload_prepare", "create_batch_stages"] + +import logging +import os + +from lsst.resources import ResourcePath, ResourcePathExpression +from lsst.utils.logging import VERBOSE +from lsst.utils.timer import time_this, timeMethod + +from . import ( + DEFAULT_MEM_FMT, + DEFAULT_MEM_UNIT, + BpsConfig, + GenericWorkflow, + GenericWorkflowJob, + GenericWorkflowLazyGroup, +) +from .pre_transform import cluster_quanta, read_quantum_graph +from .prepare import prepare +from .transform import _get_job_values, transform + +_LOG = logging.getLogger(__name__) + + +@timeMethod(logger=_LOG, logLevel=VERBOSE) +def create_batch_stages( + config: BpsConfig, prefix: ResourcePathExpression +) -> tuple[GenericWorkflow, BpsConfig]: + """Create a GenericWorkflow that performs the submit stages as a workflow. + + Parameters + ---------- + config : `lsst.ctrl.bps.BpsConfig` + BPS configuration. + prefix : `lsst.resources.ResourcePathExpression` + Root path for any output files. + + Returns + ------- + generic_workflow : `lsst.ctrl.bps.GenericWorkflow` + The generic workflow transformed from the clustered quantum graph. + generic_workflow_config : `lsst.ctrl.bps.BpsConfig` + Configuration to accompany GenericWorkflow. + """ + prefix = ResourcePath(prefix) + generic_workflow: GenericWorkflow = GenericWorkflow(name=f"{config['uniqProcName']}_ctrl") + cmd_line_key = "jobCommand" + + # build QuantumGraph job + search_opt = {} + if "buildQuantumGraph" in config: + search_opt["searchobj"] = config.get("buildQuantumGraph") + build_job = GenericWorkflowJob( + name="buildQuantumGraph", + label="buildQuantumGraph", + ) + job_values = _get_job_values(config, search_opt, cmd_line_key) + if not job_values["executable"]: + raise RuntimeError( + f"Missing executable for buildQuantumGraph. 
Double check submit yaml for {cmd_line_key}" + ) + for key, value in job_values.items(): + if key not in {"name", "label"}: + setattr(build_job, key, value) + + generic_workflow.add_job(build_job) + generic_workflow.run_attrs.update( + { + "bps_isjob": "True", + "bps_project": config["project"], + "bps_campaign": config["campaign"], + "bps_run": generic_workflow.name, + "bps_operator": config["operator"], + "bps_payload": config["payloadName"], + "bps_runsite": config["computeSite"], + } + ) + + # cluster/transform/prepare job + search_opt = {} + if "preparePayloadWorkflow" in config: + search_opt["searchobj"] = config.get("preparePayloadWorkflow") + prepare_job = GenericWorkflowLazyGroup( + name="preparePayloadWorkflow", + label="preparePayloadWorkflow", + ) + job_values = _get_job_values(config, search_opt, cmd_line_key) + if not job_values["executable"]: + raise RuntimeError( + f"Missing executable for preparePayloadWorkflow. Double check submit yaml for {cmd_line_key}" + ) + for key, value in job_values.items(): + if key not in {"name", "label"}: + setattr(prepare_job, key, value) + + generic_workflow.add_job(prepare_job, parent_names=["buildQuantumGraph"]) + + _, save_workflow = config.search("saveGenericWorkflow", opt={"default": False}) + if save_workflow: + with prefix.join("bps_stages_generic_workflow.pickle").open("wb") as outfh: + generic_workflow.save(outfh, "pickle") + _, save_dot = config.search("saveDot", opt={"default": False}) + if save_dot: + with prefix.join("bps_stages_generic_workflow.dot").open("w") as outfh: + generic_workflow.draw(outfh, "dot") + + return generic_workflow, config + + +@timeMethod(logger=_LOG, logLevel=VERBOSE) +def batch_payload_prepare(config: BpsConfig, prefix: ResourcePathExpression) -> None: + """Run the cluster, transform, and prepare submit stages in a batch job. + + Parameters + ---------- + config : `lsst.ctrl.bps.BpsConfig` + BPS configuration. 
+ + prefix : `lsst.resources.ResourcePathExpression` + Root path for any output files. + + Returns + ------- + generic_workflow : `lsst.ctrl.bps.GenericWorkflow` + The generic workflow transformed from the clustered quantum graph. + generic_workflow_config : `lsst.ctrl.bps.BpsConfig` + Configuration to accompany GenericWorkflow. + """ + prefix = ResourcePath(prefix) + # Read existing QuantumGraph + qgraph_filename = prefix.join(config["qgraphFileTemplate"]) + qgraph = read_quantum_graph(qgraph_filename) + config[".bps_defined.runQgraphFile"] = str(qgraph_filename) + + # Cluster + _LOG.info("Starting cluster stage (grouping quanta into jobs)") + with time_this( + log=_LOG, + level=logging.INFO, + prefix=None, + msg="Cluster stage completed", + mem_usage=True, + mem_unit=DEFAULT_MEM_UNIT, + mem_fmt=DEFAULT_MEM_FMT, + ): + clustered_qgraph = cluster_quanta(config, qgraph, config["uniqProcName"]) + + _LOG.info("ClusteredQuantumGraph contains %d cluster(s)", len(clustered_qgraph)) + + submit_path = config[".bps_defined.submitPath"] + _, save_clustered_qgraph = config.search("saveClusteredQgraph", opt={"default": False}) + if save_clustered_qgraph: + clustered_qgraph.save(os.path.join(submit_path, "bps_clustered_qgraph.pickle")) + _, save_dot = config.search("saveDot", opt={"default": False}) + if save_dot: + clustered_qgraph.draw(os.path.join(submit_path, "bps_clustered_qgraph.dot")) + + # Transform + _LOG.info("Starting transform stage (creating generic workflow)") + with time_this( + log=_LOG, + level=logging.INFO, + prefix=None, + msg="Transform stage completed", + mem_usage=True, + mem_unit=DEFAULT_MEM_UNIT, + mem_fmt=DEFAULT_MEM_FMT, + ): + generic_workflow, generic_workflow_config = transform(config, clustered_qgraph, submit_path) + _LOG.info("Generic workflow name '%s'", generic_workflow.name) + + num_jobs = sum(generic_workflow.job_counts.values()) + _LOG.info("GenericWorkflow contains %d job(s) (including final)", num_jobs) + + _, save_workflow = 
config.search("saveGenericWorkflow", opt={"default": False}) + if save_workflow: + with open(os.path.join(submit_path, "bps_generic_workflow.pickle"), "wb") as outfh: + generic_workflow.save(outfh, "pickle") + _, save_dot = config.search("saveDot", opt={"default": False}) + if save_dot: + with open(os.path.join(submit_path, "bps_generic_workflow.dot"), "w") as outfh: + generic_workflow.draw(outfh, "dot") + + # Prepare + _LOG.info("Starting prepare stage (creating specific implementation of workflow)") + with time_this( + log=_LOG, + level=logging.INFO, + prefix=None, + msg="Prepare stage completed", + mem_usage=True, + mem_unit=DEFAULT_MEM_UNIT, + mem_fmt=DEFAULT_MEM_FMT, + ): + wms_workflow = prepare(generic_workflow_config, generic_workflow, submit_path) + + # Add payload workflow to currently running workflow + _LOG.info("Starting update workflow") + with time_this( + log=_LOG, + level=logging.INFO, + prefix=None, + msg="Workflow update completed", + mem_usage=True, + mem_unit=DEFAULT_MEM_UNIT, + mem_fmt=DEFAULT_MEM_FMT, + ): + # Assuming submit_path for ctrl workflow is visible by this job. + wms_workflow.add_to_parent_workflow(generic_workflow_config, submit_path) diff --git a/python/lsst/ctrl/bps/cli/cmd/__init__.py b/python/lsst/ctrl/bps/cli/cmd/__init__.py index 49c97190..aedb5323 100644 --- a/python/lsst/ctrl/bps/cli/cmd/__init__.py +++ b/python/lsst/ctrl/bps/cli/cmd/__init__.py @@ -27,6 +27,9 @@ __all__ = [ "acquire", + "batch_acquire", + "batch_prepare", + "batch_submit", "cluster", "transform", "prepare", @@ -41,6 +44,9 @@ from .commands import ( acquire, + batch_acquire, + batch_prepare, + batch_submit, cancel, cluster, ping, diff --git a/python/lsst/ctrl/bps/cli/cmd/commands.py b/python/lsst/ctrl/bps/cli/cmd/commands.py index 46ffb32a..e412edf8 100644 --- a/python/lsst/ctrl/bps/cli/cmd/commands.py +++ b/python/lsst/ctrl/bps/cli/cmd/commands.py @@ -36,6 +36,9 @@ from ... 
import BpsSubprocessError from ...drivers import ( acquire_qgraph_driver, + batch_acquire_driver, + batch_prepare_driver, + batch_submit_driver, cancel_driver, cluster_qgraph_driver, ping_driver, @@ -219,3 +222,30 @@ def ping(*args, **kwargs): def submitcmd(*args, **kwargs): """Submit a command for execution.""" submitcmd_driver(*args, **kwargs) + + +@click.command(cls=BpsCommand) +@opt.config_file_argument(required=True) +@opt.submission_options() +def batch_acquire(*args, **kwargs): + """Run inside a batch job to create a new quantum graph.""" + with catch_errors(): + batch_acquire_driver(*args, **kwargs) + + +@click.command(cls=BpsCommand) +@opt.config_file_argument(required=True) +@opt.submission_options() +def batch_prepare(*args, **kwargs): + """Run payload workflow preparation inside a batch job.""" + with catch_errors(): + batch_prepare_driver(*args, **kwargs) + + +@click.command(cls=BpsCommand) +@opt.config_file_argument(required=True) +@opt.submission_options() +def batch_submit(*args, **kwargs): + """Submit a workflow, with the preparation stages run inside batch jobs.""" + with catch_errors(): + batch_submit_driver(*args, **kwargs) diff --git a/python/lsst/ctrl/bps/drivers.py b/python/lsst/ctrl/bps/drivers.py index 8debefdc..1b92e45a 100644 --- a/python/lsst/ctrl/bps/drivers.py +++ b/python/lsst/ctrl/bps/drivers.py @@ -33,6 +33,9 @@ __all__ = [ "acquire_qgraph_driver", + "batch_acquire_driver", + "batch_prepare_driver", + "batch_submit_driver", "cancel_driver", "cluster_qgraph_driver", "ping_driver", @@ -49,12 +52,22 @@ import logging import os from pathlib import Path +from typing import Any from lsst.pipe.base.quantum_graph import PredictedQuantumGraph from lsst.utils.timer import time_this from lsst.utils.usage import get_peak_mem_usage -from . import BPS_DEFAULTS, BPS_SEARCH_ORDER, DEFAULT_MEM_FMT, DEFAULT_MEM_UNIT, BpsConfig +from . 
import ( + BPS_DEFAULTS, + BPS_SEARCH_ORDER, + DEFAULT_MEM_FMT, + DEFAULT_MEM_UNIT, + BpsConfig, + ClusteredQuantumGraph, + GenericWorkflow, +) +from .batch_submit import batch_payload_prepare, create_batch_stages from .bps_reports import compile_code_summary, compile_job_summary from .bps_utils import _dump_env_info, _dump_pkg_info, _make_id_link from .cancel import cancel @@ -67,7 +80,7 @@ submit_path_validator, ) from .ping import ping -from .pre_transform import acquire_quantum_graph, cluster_quanta +from .pre_transform import acquire_quantum_graph, cluster_quanta, read_quantum_graph from .prepare import prepare from .report import display_report, retrieve_report from .restart import restart @@ -142,14 +155,16 @@ def acquire_qgraph_driver(config_file: str, **kwargs) -> tuple[BpsConfig, Predic mem_unit=DEFAULT_MEM_UNIT, mem_fmt=DEFAULT_MEM_FMT, ): - qgraph_file, qgraph = acquire_quantum_graph(config, out_prefix=submit_path) + qgraph_file = acquire_quantum_graph(config, out_prefix=submit_path) + qgraph = read_quantum_graph(qgraph_file) + _log_mem_usage() config[".bps_defined.runQgraphFile"] = qgraph_file return config, qgraph -def cluster_qgraph_driver(config_file, **kwargs): +def cluster_qgraph_driver(config_file: str, **kwargs: Any) -> tuple[BpsConfig, ClusteredQuantumGraph]: """Group quanta into clusters. Parameters @@ -193,7 +208,7 @@ def cluster_qgraph_driver(config_file, **kwargs): return config, clustered_qgraph -def transform_driver(config_file, **kwargs): +def transform_driver(config_file: str, **kwargs: Any) -> tuple[BpsConfig, GenericWorkflow]: """Create a workflow for a specific workflow management system. Parameters @@ -207,7 +222,7 @@ def transform_driver(config_file, **kwargs): ------- generic_workflow_config : `lsst.ctrl.bps.BpsConfig` Configuration to use when creating the workflow. 
- generic_workflow : `lsst.ctrl.bps.BaseWmsWorkflow` + generic_workflow : `lsst.ctrl.bps.GenericWorkflow` Representation of the abstract/scientific workflow specific to a given workflow management system. """ @@ -687,3 +702,135 @@ def _log_mem_usage() -> None: "Peak memory usage for bps process %s (main), %s (largest child process)", *tuple(f"{val.to(DEFAULT_MEM_UNIT):{DEFAULT_MEM_FMT}}" for val in get_peak_mem_usage()), ) + + +def batch_acquire_driver(config_file: str, **kwargs: Any) -> None: + """Create a quantum graph from pipeline definition in a batch job. + + Parameters + ---------- + config_file : `str` + Name of the configuration file. + **kwargs : `~typing.Any` + Additional modifiers to the configuration. + """ + config = BpsConfig(config_file) + submit_path = config[".bps_defined.submitPath"] + + _LOG.info("Starting acquire stage (generating and/or reading quantum graph)") + with time_this( + log=_LOG, + level=logging.INFO, + prefix=None, + msg="Acquire stage completed", + mem_usage=True, + mem_unit=DEFAULT_MEM_UNIT, + mem_fmt=DEFAULT_MEM_FMT, + ): + qgraph_file = acquire_quantum_graph(config, out_prefix=submit_path) + _log_mem_usage() + + config[".bps_defined.runQgraphFile"] = qgraph_file + + +def batch_prepare_driver(config_file: str, **kwargs: Any) -> None: + """Run workflow preparation in a batch job for an existing QuantumGraph. + + Parameters + ---------- + config_file : `str` + Name of the configuration file. + **kwargs : `~typing.Any` + Additional modifiers to the configuration. 
+ """ + config = BpsConfig(config_file) + submit_path = config[".bps_defined.submitPath"] + + with time_this( + log=_LOG, + level=logging.INFO, + prefix=None, + msg="Batch preparation completed", + mem_usage=True, + mem_unit=DEFAULT_MEM_UNIT, + mem_fmt=DEFAULT_MEM_FMT, + ): + batch_payload_prepare(config, prefix=submit_path) + _log_mem_usage() + + +def batch_submit_driver(config_file: str, **kwargs: Any) -> None: + """Submit a workflow for execution with preparation done in batch jobs. + + Parameters + ---------- + config_file : `str` + Name of the configuration file. + **kwargs : `~typing.Any` + Additional modifiers to the configuration. + """ + kwargs.setdefault("runWmsSubmissionChecks", True) + + _LOG.info( + "DISCLAIMER: All values regarding memory consumption reported below are approximate and may " + "not accurately reflect actual memory usage by the bps process." + ) + + config = _init_submission_driver(config_file, **kwargs) + submit_path = config[".bps_defined.submitPath"] + + _LOG.info("Starting batch submission") + with time_this( + log=_LOG, + level=logging.INFO, + prefix=None, + msg="Batch submission completed", + mem_usage=True, + mem_unit=DEFAULT_MEM_UNIT, + mem_fmt=DEFAULT_MEM_FMT, + ): + _LOG.info("Starting to create control workflow") + with time_this( + log=_LOG, + level=logging.INFO, + prefix=None, + msg="Creation completed", + mem_usage=True, + mem_unit=DEFAULT_MEM_UNIT, + mem_fmt=DEFAULT_MEM_FMT, + ): + generic_workflow, config = create_batch_stages(config, submit_path) + + _LOG.info("Starting to prepare control workflow") + with time_this( + log=_LOG, + level=logging.INFO, + prefix=None, + msg="Preparation completed", + mem_usage=True, + mem_unit=DEFAULT_MEM_UNIT, + mem_fmt=DEFAULT_MEM_FMT, + ): + wms_workflow = prepare(config, generic_workflow, submit_path) + + if kwargs.get("dry_run", False): + return + + _LOG.info("Starting to submit control workflow") + with time_this( + log=_LOG, + level=logging.INFO, + prefix=None, + msg="Submission 
completed", + mem_usage=True, + mem_unit=DEFAULT_MEM_UNIT, + mem_fmt=DEFAULT_MEM_FMT, + ): + submit(config, wms_workflow, **kwargs) + _LOG.info("Run '%s' submitted for execution with id '%s'", wms_workflow.name, wms_workflow.run_id) + _log_mem_usage() + + _make_id_link(config, wms_workflow.run_id) + + print(f"Run Id: {wms_workflow.run_id}") + print(f"Run Name: {wms_workflow.name}") diff --git a/python/lsst/ctrl/bps/etc/bps_defaults.yaml b/python/lsst/ctrl/bps/etc/bps_defaults.yaml index 7b1e39a0..a38fe32d 100644 --- a/python/lsst/ctrl/bps/etc/bps_defaults.yaml +++ b/python/lsst/ctrl/bps/etc/bps_defaults.yaml @@ -150,3 +150,11 @@ memoryLimit: 491520 # Default values for making id soft link to submit directory makeIdLink: False idLinkPath: "${PWD}/bps_links" + + +# Running submit stages as batch jobs +bpsPreCommandOpts: "{defaultPreCmdOpts}" +buildQuantumGraph: + jobCommand: "${CTRL_BPS_DIR}/bin/bps {bpsPreCommandOpts} batch-acquire {configFile}" +preparePayloadWorkflow: + jobCommand: "${CTRL_BPS_DIR}/bin/bps {bpsPreCommandOpts} batch-prepare {configFile}" diff --git a/python/lsst/ctrl/bps/generic_workflow.py b/python/lsst/ctrl/bps/generic_workflow.py index 65889d95..3539cf0d 100644 --- a/python/lsst/ctrl/bps/generic_workflow.py +++ b/python/lsst/ctrl/bps/generic_workflow.py @@ -33,6 +33,7 @@ "GenericWorkflowFile", "GenericWorkflowGroup", "GenericWorkflowJob", + "GenericWorkflowLazyGroup", "GenericWorkflowNode", "GenericWorkflowNodeType", "GenericWorkflowNoopJob", @@ -127,6 +128,9 @@ class GenericWorkflowNodeType(IntEnum): GROUP = auto() """A special group (subdag) of jobs.""" + LAZY_GROUP = auto() + """When run will generate sub-workflow of jobs.""" + @dataclasses.dataclass(slots=True) class GenericWorkflowNode: @@ -474,7 +478,7 @@ def add_job( super().add_node(job.name, job=job) self.add_job_relationships(parent_names, job.name) self.add_job_relationships(job.name, child_names) - if job.node_type == GenericWorkflowNodeType.PAYLOAD: + if job.node_type in 
[GenericWorkflowNodeType.PAYLOAD, GenericWorkflowNodeType.LAZY_GROUP]: job = cast(GenericWorkflowJob, job) self.add_executable(job.executable) self._job_labels.add_job( @@ -1335,6 +1339,18 @@ def __init__(self, name: str, label: str, blocking: bool = False) -> None: self.blocking = blocking +@dataclasses.dataclass(slots=True) +class GenericWorkflowLazyGroup(GenericWorkflowJob): + """Node representing a group of jobs to be generated when run.""" + + # Docstring inherited. + + @property + def node_type(self) -> GenericWorkflowNodeType: + """Indicate this is a lazy group job.""" + return GenericWorkflowNodeType.LAZY_GROUP + + class GenericWorkflowLabels: """Label-oriented representation of the GenericWorkflowJobs.""" diff --git a/python/lsst/ctrl/bps/initialize.py b/python/lsst/ctrl/bps/initialize.py index 166c9ac3..f4d466c9 100644 --- a/python/lsst/ctrl/bps/initialize.py +++ b/python/lsst/ctrl/bps/initialize.py @@ -140,7 +140,10 @@ def init_submission( # save copy of configs (orig and expanded config) shutil.copy2(config_file, submit_path) - with open(f"{submit_path}/{config['uniqProcName']}_config.yaml", "w") as fh: + expanded_config_file = f"{submit_path}/{config['uniqProcName']}_config.yaml" + config[".bps_defined.configFile"] = expanded_config_file + + with open(expanded_config_file, "w") as fh: config.dump(fh) # Dump information about runtime environment and software versions in use. 
diff --git a/python/lsst/ctrl/bps/pre_transform.py b/python/lsst/ctrl/bps/pre_transform.py index e4cef200..c712c9ba 100644 --- a/python/lsst/ctrl/bps/pre_transform.py +++ b/python/lsst/ctrl/bps/pre_transform.py @@ -41,7 +41,7 @@ from lsst.pipe.base import QuantumGraph from lsst.pipe.base.pipeline_graph import TaskImportMode from lsst.pipe.base.quantum_graph import PredictedQuantumGraph -from lsst.resources import ResourcePath +from lsst.resources import ResourcePath, ResourcePathExpression from lsst.utils import doImport from lsst.utils.logging import VERBOSE from lsst.utils.timer import time_this, timeMethod @@ -50,7 +50,7 @@ @timeMethod(logger=_LOG, logLevel=VERBOSE) -def acquire_quantum_graph(config: BpsConfig, out_prefix: str = "") -> tuple[str, PredictedQuantumGraph]: +def acquire_quantum_graph(config: BpsConfig, out_prefix: str = "") -> str: """Read a quantum graph from a file or create one from scratch. Parameters @@ -64,10 +64,7 @@ def acquire_quantum_graph(config: BpsConfig, out_prefix: str = "") -> tuple[str, Returns ------- qgraph_filename : `str` - Name of file containing QuantumGraph that was read into qgraph. - qgraph : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph` - A quantum graph read in from pre-generated file or one that is the - result of running code that generates it. + Name of file containing the quantum graph. """ # Check to see if user provided pre-generated QuantumGraph. found, input_qgraph_filename = config.search("qgraphFile") @@ -91,6 +88,24 @@ def acquire_quantum_graph(config: BpsConfig, out_prefix: str = "") -> tuple[str, with time_this(log=_LOG, level=logging.INFO, prefix=None, msg="Completed creating quantum graph"): qgraph_filename = create_quantum_graph(config, out_prefix) + return qgraph_filename + + +@timeMethod(logger=_LOG, logLevel=VERBOSE) +def read_quantum_graph(qgraph_filename: ResourcePathExpression) -> PredictedQuantumGraph: + """Read a quantum graph from a file. 
+ + Parameters + ---------- + qgraph_filename : `lsst.resources.ResourcePathExpression` + Name of file containing PredictedQuantumGraph to be read. + + Returns + ------- + qgraph : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph` + A quantum graph read in from pre-generated file or one that is the + result of running code that generates it. + """ _LOG.info("Reading quantum graph from '%s'", qgraph_filename) with time_this(log=_LOG, level=logging.INFO, prefix=None, msg="Completed reading quantum graph"): qgraph_path = ResourcePath(qgraph_filename) @@ -102,7 +117,7 @@ def acquire_quantum_graph(config: BpsConfig, out_prefix: str = "") -> tuple[str, qgraph = PredictedQuantumGraph.from_old_quantum_graph(QuantumGraph.loadUri(qgraph_path)) else: raise ValueError(f"Unrecognized extension for quantum graph file: {qgraph_filename}.") - return qgraph_filename, qgraph + return qgraph def execute(command: str, filename: str, write_buffering: int = 1) -> int: diff --git a/python/lsst/ctrl/bps/wms_service.py b/python/lsst/ctrl/bps/wms_service.py index 7665526d..94156f28 100644 --- a/python/lsst/ctrl/bps/wms_service.py +++ b/python/lsst/ctrl/bps/wms_service.py @@ -43,6 +43,8 @@ from enum import Enum from typing import Any +from . import BpsConfig + _LOG = logging.getLogger(__name__) @@ -576,3 +578,17 @@ def write(self, out_prefix): as well as internal WMS files. """ raise NotImplementedError + + @abstractmethod + def add_to_parent_workflow(self, config: BpsConfig, submit_path: str): + """Add self to parent workflow. + + Parameters + ---------- + config : `lsst.ctrl.bps.BpsConfig` + Configuration. + submit_path : `str` + Root directory to be used for WMS workflow inputs and outputs + as well as internal WMS files. + """ + raise NotImplementedError