From 7715acf0fb81286d3091b7b7210e6480ad5d1552 Mon Sep 17 00:00:00 2001 From: shreyaskommuri Date: Tue, 9 Jun 2026 09:01:32 -0700 Subject: [PATCH 1/2] Add machine-readable scenario summary report Generate cloudai-summary.json as a default scenario reporter so automation can discover scenario status, report artifacts, test run artifacts, and configured metrics without scraping workload-specific files. Issue: NVIDIA/cloudai#917 Tested: uv run ruff check src/cloudai/reporter.py src/cloudai/registration.py src/cloudai/_core/registry.py src/cloudai/core.py tests/test_reporter.py tests/test_init.py Tested: uv run ruff format --check src/cloudai/reporter.py src/cloudai/registration.py src/cloudai/_core/registry.py src/cloudai/core.py tests/test_reporter.py tests/test_init.py Tested: uv run pytest Signed-off-by: shreyaskommuri --- src/cloudai/_core/registry.py | 3 +- src/cloudai/core.py | 3 +- src/cloudai/registration.py | 3 +- src/cloudai/reporter.py | 96 ++++++++++++++++++++++++++++++++++- tests/test_init.py | 5 +- tests/test_reporter.py | 53 +++++++++++++++++-- 6 files changed, 154 insertions(+), 9 deletions(-) diff --git a/src/cloudai/_core/registry.py b/src/cloudai/_core/registry.py index 2e2adf6b7..c7ee157e7 100644 --- a/src/cloudai/_core/registry.py +++ b/src/cloudai/_core/registry.py @@ -229,7 +229,8 @@ def report_order(k: str) -> int: "per_test": 0, # first "status": 2, "dse": 3, - "tarball": 4, # last + "summary": 4, + "tarball": 5, # last }.get(k, 1) return sorted(self.scenario_reports.items(), key=lambda kv: report_order(kv[0])) diff --git a/src/cloudai/core.py b/src/cloudai/core.py index 752d24972..d94595ed8 100644 --- a/src/cloudai/core.py +++ b/src/cloudai/core.py @@ -54,7 +54,7 @@ from .configurator.grid_search import GridSearchAgent from .models.workload import CmdArgs, NsysConfiguration, PredictorConfig, TestDefinition from .parser import Parser -from .reporter import PerTestReporter, StatusReporter, TarballReporter +from .reporter import PerTestReporter, StatusReporter, SummaryReporter, TarballReporter from .test_parser import TestParser from .test_scenario_parser import TestScenarioParser @@ -96,6 +96,7 @@ "RewardOverrides", "Runner", "StatusReporter", + "SummaryReporter", "System", "SystemConfigParsingError", "TarballReporter", diff --git a/src/cloudai/registration.py b/src/cloudai/registration.py index e31bc7273..c341a986e 100644 --- a/src/cloudai/registration.py +++ b/src/cloudai/registration.py @@ -46,7 +46,7 @@ def register_all(): ) from cloudai.core import Registry from cloudai.models.scenario import ReportConfig - from cloudai.reporter import DSEReporter, PerTestReporter, StatusReporter, TarballReporter + from cloudai.reporter import DSEReporter, PerTestReporter, StatusReporter, SummaryReporter, TarballReporter # Import systems from cloudai.systems.kubernetes import KubernetesInstaller, KubernetesRunner, KubernetesSystem @@ -316,6 +316,7 @@ def register_all(): Registry().add_scenario_report("per_test", PerTestReporter, ReportConfig(enable=True)) Registry().add_scenario_report("status", StatusReporter, ReportConfig(enable=True)) Registry().add_scenario_report("dse", DSEReporter, ReportConfig(enable=True)) + Registry().add_scenario_report("summary", SummaryReporter, ReportConfig(enable=True)) Registry().add_scenario_report("tarball", TarballReporter, ReportConfig(enable=True)) Registry().add_scenario_report( "nixl_bench_summary", diff --git a/src/cloudai/reporter.py b/src/cloudai/reporter.py index a897015c3..4e5620d43 100644 --- a/src/cloudai/reporter.py +++ b/src/cloudai/reporter.py @@ -15,11 +15,12 @@ # limitations under the License. import contextlib +import json import logging import tarfile from dataclasses import dataclass from pathlib import Path -from typing import Optional +from typing import Any, Optional import jinja2 import toml @@ -31,7 +32,7 @@ from cloudai.report_generator.util import load_system_metadata from cloudai.util.lazy_imports import lazy -from .core import CommandGenStrategy, Reporter, TestRun, case_name +from .core import METRIC_ERROR, CommandGenStrategy, Reporter, TestRun, case_name from .models.scenario import TestRunDetails @@ -207,6 +208,97 @@ def report_best_dse_config(self): toml.dump(trd.test_definition.model_dump(), f) +class SummaryReporter(Reporter): + """Generate a machine-readable scenario summary for automation.""" + + SUMMARY_FILE_NAME = "cloudai-summary.json" + + def generate(self) -> None: + self.load_test_runs() + report_path = self.results_root / self.SUMMARY_FILE_NAME + with report_path.open("w") as f: + json.dump(self.build_summary(), f, indent=2) + f.write("\n") + + logging.info("Generated scenario summary at %s", report_path) + + def build_summary(self) -> dict[str, Any]: + test_runs = [self._test_run_summary(tr) for tr in self.trs] + return { + "schema_version": "1.0", + "scenario": self.test_scenario.name, + "status": self._scenario_status(test_runs), + "system": { + "name": self.system.name, + "scheduler": self.system.scheduler, + }, + "result_dir": self._relative_path(self.results_root), + "reports": self._scenario_artifacts(), + "test_runs": test_runs, + } + + def _scenario_status(self, test_runs: list[dict[str, Any]]) -> str: + if not test_runs: + return "unknown" + if all(tr["status"] == "completed" for tr in test_runs): + return "completed" + return "failed" + + def _test_run_summary(self, tr: TestRun) -> dict[str, Any]: + status = tr.test.was_run_successful(tr) + summary = { + "name": tr.name, + "case": case_name(tr), + "description": tr.test.description, + "iteration": tr.current_iteration, + "step": tr.step, + "status": "completed" if status.is_successful else "failed", + "error_message": status.error_message, + "output_path": self._relative_path(tr.output_path), + "artifacts": self._artifacts(tr.output_path), + "metrics": self._metrics(tr), + } + return summary + + def _metrics(self, tr: TestRun) -> dict[str, float]: + metrics = {} + for metric in tr.test.agent_metrics: + value = tr.get_metric_value(self.system, metric) + if value is METRIC_ERROR: + continue + metrics[metric] = float(value) + + return metrics + + def _scenario_artifacts(self) -> list[dict[str, str]]: + if not self.results_root.is_dir(): + return [] + + return [ + self._artifact(path) + for path in sorted(self.results_root.iterdir()) + if path.is_file() and path.name != self.SUMMARY_FILE_NAME + ] + + def _artifacts(self, root: Path) -> list[dict[str, str]]: + if not root.is_dir(): + return [] + + return [self._artifact(path) for path in sorted(root.rglob("*")) if path.is_file()] + + def _artifact(self, path: Path) -> dict[str, str]: + return { + "path": self._relative_path(path), + "format": path.suffix.removeprefix(".") or "unknown", + } + + def _relative_path(self, path: Path) -> str: + try: + return str(path.relative_to(self.results_root)) + except ValueError: + return str(path) + + class TarballReporter(Reporter): """Creates tarballs of results for failed test runs.""" diff --git a/tests/test_init.py b/tests/test_init.py index 47b486110..b996d920d 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -16,7 +16,7 @@ from cloudai.core import Registry -from cloudai.reporter import DSEReporter, PerTestReporter, StatusReporter, TarballReporter +from cloudai.reporter import DSEReporter, PerTestReporter, StatusReporter, SummaryReporter, TarballReporter from cloudai.systems.kubernetes import KubernetesInstaller, KubernetesSystem from cloudai.systems.lsf import LSFInstaller, LSFSystem from cloudai.systems.runai import RunAISystem @@ -270,6 +270,7 @@ def test_scenario_reports(): "per_test", "status", "dse", + "summary", "tarball", "nixl_bench_summary", "nixl_ep_comparison", @@ -282,6 +283,7 @@ def test_scenario_reports(): PerTestReporter, StatusReporter, DSEReporter, + SummaryReporter, TarballReporter, NIXLBenchComparisonReport, NixlEPComparisonReport, @@ -298,6 +300,7 @@ def test_report_configs(): "per_test", "status", "dse", + "summary", "tarball", "nixl_bench_summary", "nixl_ep_comparison", diff --git a/tests/test_reporter.py b/tests/test_reporter.py index 95acd8ac9..e6aed3624 100644 --- a/tests/test_reporter.py +++ b/tests/test_reporter.py @@ -16,6 +16,7 @@ import copy import csv +import json import tarfile from dataclasses import asdict from pathlib import Path @@ -29,7 +30,7 @@ from cloudai.core import CommandGenStrategy, Registry, Reporter, System from cloudai.models.scenario import ReportConfig, TestRunDetails from cloudai.report_generator.dse_report import build_dse_summaries -from cloudai.reporter import DSEReporter, PerTestReporter, ReportItem, StatusReporter, TarballReporter +from cloudai.reporter import DSEReporter, PerTestReporter, ReportItem, StatusReporter, SummaryReporter, TarballReporter from cloudai.systems.slurm.slurm_metadata import ( MetadataCUDA, MetadataMPI, @@ -339,11 +340,57 @@ def test_metadata_for_single_sbatch(self, slurm_system: SlurmSystem, slurm_metad def test_report_order() -> None: reports = Registry().ordered_scenario_reports() assert reports[0][0] == "per_test" - assert reports[-3][0] == "status" - assert reports[-2][0] == "dse" + assert reports[-4][0] == "status" + assert reports[-3][0] == "dse" + assert reports[-2][0] == "summary" assert reports[-1][0] == "tarball" +def test_summary_reporter_writes_machine_readable_summary( + slurm_system: SlurmSystem, + benchmark_tr: TestRun, +) -> None: + report_path = slurm_system.output_path / "test_scenario.html" + report_path.write_text("") + output_path = slurm_system.output_path / benchmark_tr.name / "0" + (output_path / "stdout.txt").write_text("# Out of bounds values# Avg bus bandwidth") + (output_path / "cloudai_nccl_test_csv_report.csv").write_text("size,bw\n1,2\n") + + scenario = TestScenario(name="test_scenario", test_runs=[benchmark_tr]) + reporter = SummaryReporter(slurm_system, scenario, slurm_system.output_path, ReportConfig()) + reporter.generate() + + summary_path = slurm_system.output_path / SummaryReporter.SUMMARY_FILE_NAME + summary = json.loads(summary_path.read_text()) + + assert summary["schema_version"] == "1.0" + assert summary["scenario"] == "test_scenario" + assert summary["status"] == "failed" + assert summary["system"] == {"name": "test_system", "scheduler": "slurm"} + assert summary["result_dir"] == "." + assert summary["reports"] == [{"path": "test_scenario.html", "format": "html"}] + assert len(summary["test_runs"]) == 3 + + first_run = summary["test_runs"][0] + assert first_run["name"] == "benchmark" + assert first_run["case"] == "benchmark" + assert first_run["description"] == "NCCL test" + assert first_run["iteration"] == 0 + assert first_run["step"] == 0 + assert first_run["status"] == "completed" + assert first_run["error_message"] == "" + assert first_run["output_path"] == "benchmark/0" + assert first_run["artifacts"] == [ + {"path": "benchmark/0/cloudai_nccl_test_csv_report.csv", "format": "csv"}, + {"path": "benchmark/0/stdout.txt", "format": "txt"}, + ] + assert first_run["metrics"] == {} + + failed_run = summary["test_runs"][1] + assert failed_run["status"] == "failed" + assert "stdout.txt file not found" in failed_run["error_message"] + + def _write_slurm_job(step_dir: Path, elapsed_time_sec: int) -> None: metadata = SlurmJobMetadata( job_id=12345, From 62d2ef026ffccb4bfdf9dce0ef4814302665ba1d Mon Sep 17 00:00:00 2001 From: shreyaskommuri Date: Fri, 12 Jun 2026 11:10:20 -0700 Subject: [PATCH 2/2] Keep summary JSON focused on automation Trim the default scenario summary to the fields external tools need and group DSE/sweep outputs under their parent test run so nested sweep artifacts remain discoverable. Constraint: Reviewer requested a smaller JSON surface, a full-object regression assertion, and sweep-aware artifact reporting. Rejected: Keeping iteration, step, description, schema_version, and system metadata in the first summary contract | they widen the surface before consumers have asked for them. Confidence: high Scope-risk: narrow Directive: Treat cloudai-summary.json as an automation entry point; add fields only when they have clear consumer value. Tested: uv run ruff check src/cloudai/reporter.py tests/test_reporter.py tests/test_init.py src/cloudai/registration.py src/cloudai/_core/registry.py src/cloudai/core.py Tested: uv run ruff format --check src/cloudai/reporter.py tests/test_reporter.py tests/test_init.py src/cloudai/registration.py src/cloudai/_core/registry.py src/cloudai/core.py Tested: uv run pytest tests/test_reporter.py tests/test_init.py tests/test_registry.py -q Tested: uv run pytest -q Not-tested: Live CloudAI sweep run on an HPC system. --- src/cloudai/reporter.py | 62 ++++++++++++++++---- tests/test_reporter.py | 121 ++++++++++++++++++++++++++++++---------- 2 files changed, 141 insertions(+), 42 deletions(-) diff --git a/src/cloudai/reporter.py b/src/cloudai/reporter.py index 4e5620d43..56bc5d57b 100644 --- a/src/cloudai/reporter.py +++ b/src/cloudai/reporter.py @@ -223,15 +223,10 @@ def generate(self) -> None: logging.info("Generated scenario summary at %s", report_path) def build_summary(self) -> dict[str, Any]: - test_runs = [self._test_run_summary(tr) for tr in self.trs] + test_runs = self._test_runs_summary() return { - "schema_version": "1.0", "scenario": self.test_scenario.name, "status": self._scenario_status(test_runs), - "system": { - "name": self.system.name, - "scheduler": self.system.scheduler, - }, "result_dir": self._relative_path(self.results_root), "reports": self._scenario_artifacts(), "test_runs": test_runs, @@ -244,20 +239,46 @@ def _scenario_status(self, test_runs: list[dict[str, Any]]) -> str: return "completed" return "failed" + def _test_runs_summary(self) -> list[dict[str, Any]]: + loaded_by_name: dict[str, list[TestRun]] = {} + for tr in self.trs: + loaded_by_name.setdefault(tr.name, []).append(tr) + + summary: list[dict[str, Any]] = [] + for test_run in self.test_scenario.test_runs: + loaded_runs = loaded_by_name.get(test_run.name, []) + if test_run.is_dse_job: + summary.append(self._sweep_test_run_summary(test_run, loaded_runs)) + else: + summary.extend(self._test_run_summary(tr) for tr in loaded_runs) + + return summary + + def _sweep_test_run_summary(self, tr: TestRun, sweeps: list[TestRun]) -> dict[str, Any]: + sweep_summaries = [self._test_run_summary(sweep) for sweep in sweeps] + summary = { + "name": tr.name, + "status": self._scenario_status(sweep_summaries), + "output_path": self._relative_path(self.results_root / tr.name), + "artifacts": self._artifacts_excluding( + self.results_root / tr.name, [sweep.output_path for sweep in sweeps] + ), + "metrics": {}, + "sweeps": sweep_summaries, + } + return summary + def _test_run_summary(self, tr: TestRun) -> dict[str, Any]: status = tr.test.was_run_successful(tr) summary = { - "name": tr.name, - "case": case_name(tr), - "description": tr.test.description, - "iteration": tr.current_iteration, - "step": tr.step, + "name": case_name(tr), "status": "completed" if status.is_successful else "failed", - "error_message": status.error_message, "output_path": self._relative_path(tr.output_path), "artifacts": self._artifacts(tr.output_path), "metrics": self._metrics(tr), } + if status.error_message: + summary["error_message"] = status.error_message return summary def _metrics(self, tr: TestRun) -> dict[str, float]: @@ -286,6 +307,23 @@ def _artifacts(self, root: Path) -> list[dict[str, str]]: return [self._artifact(path) for path in sorted(root.rglob("*")) if path.is_file()] + def _artifacts_excluding(self, root: Path, excluded_roots: list[Path]) -> list[dict[str, str]]: + if not root.is_dir(): + return [] + + return [ + self._artifact(path) + for path in sorted(root.rglob("*")) + if path.is_file() and not any(self._is_relative_to(path, excluded_root) for excluded_root in excluded_roots) + ] + + def _is_relative_to(self, path: Path, root: Path) -> bool: + try: + path.relative_to(root) + except ValueError: + return False + return True + def _artifact(self, path: Path) -> dict[str, str]: return { "path": self._relative_path(path), diff --git a/tests/test_reporter.py b/tests/test_reporter.py index e6aed3624..f6514a105 100644 --- a/tests/test_reporter.py +++ b/tests/test_reporter.py @@ -352,43 +352,104 @@ def test_summary_reporter_writes_machine_readable_summary( ) -> None: report_path = slurm_system.output_path / "test_scenario.html" report_path.write_text("") - output_path = slurm_system.output_path / benchmark_tr.name / "0" - (output_path / "stdout.txt").write_text("# Out of bounds values# Avg bus bandwidth") - (output_path / "cloudai_nccl_test_csv_report.csv").write_text("size,bw\n1,2\n") - scenario = TestScenario(name="test_scenario", test_runs=[benchmark_tr]) + for iteration in range(benchmark_tr.iterations): + output_path = slurm_system.output_path / benchmark_tr.name / str(iteration) + (output_path / "stdout.txt").write_text("# Out of bounds values# Avg bus bandwidth") + (slurm_system.output_path / benchmark_tr.name / "0" / "cloudai_nccl_test_csv_report.csv").write_text( + "size,bw\n1,2\n" + ) + + sweep_tr = TestRun( + name="sweep", + test=NCCLTestDefinition( + name="nccl", + description="NCCL sweep", + test_template_name="NcclTest", + cmd_args=NCCLCmdArgs(docker_image_url="fake://url/nccl"), + extra_env_vars={"VAR1": ["value1", "value2"]}, + agent_steps=2, + ), + num_nodes=1, + nodes=["node1"], + ) + sweep_iteration = slurm_system.output_path / sweep_tr.name / "0" + sweep_iteration.mkdir(parents=True) + (sweep_iteration / "trajectory.csv").write_text("step,action,reward,observation\n") + for step in range(sweep_tr.test.agent_steps): + step_path = sweep_iteration / str(step) + step_path.mkdir() + (step_path / "stdout.txt").write_text("# Out of bounds values# Avg bus bandwidth") + (step_path / "some-report.html").write_text("") + + scenario = TestScenario(name="test_scenario", test_runs=[benchmark_tr, sweep_tr]) reporter = SummaryReporter(slurm_system, scenario, slurm_system.output_path, ReportConfig()) reporter.generate() summary_path = slurm_system.output_path / SummaryReporter.SUMMARY_FILE_NAME summary = json.loads(summary_path.read_text()) - assert summary["schema_version"] == "1.0" - assert summary["scenario"] == "test_scenario" - assert summary["status"] == "failed" - assert summary["system"] == {"name": "test_system", "scheduler": "slurm"} - assert summary["result_dir"] == "." - assert summary["reports"] == [{"path": "test_scenario.html", "format": "html"}] - assert len(summary["test_runs"]) == 3 - - first_run = summary["test_runs"][0] - assert first_run["name"] == "benchmark" - assert first_run["case"] == "benchmark" - assert first_run["description"] == "NCCL test" - assert first_run["iteration"] == 0 - assert first_run["step"] == 0 - assert first_run["status"] == "completed" - assert first_run["error_message"] == "" - assert first_run["output_path"] == "benchmark/0" - assert first_run["artifacts"] == [ - {"path": "benchmark/0/cloudai_nccl_test_csv_report.csv", "format": "csv"}, - {"path": "benchmark/0/stdout.txt", "format": "txt"}, - ] - assert first_run["metrics"] == {} - - failed_run = summary["test_runs"][1] - assert failed_run["status"] == "failed" - assert "stdout.txt file not found" in failed_run["error_message"] + assert summary == { + "scenario": "test_scenario", + "status": "completed", + "result_dir": ".", + "reports": [{"path": "test_scenario.html", "format": "html"}], + "test_runs": [ + { + "name": "benchmark", + "status": "completed", + "output_path": "benchmark/0", + "artifacts": [ + {"path": "benchmark/0/cloudai_nccl_test_csv_report.csv", "format": "csv"}, + {"path": "benchmark/0/stdout.txt", "format": "txt"}, + ], + "metrics": {}, + }, + { + "name": "benchmark iter=1", + "status": "completed", + "output_path": "benchmark/1", + "artifacts": [{"path": "benchmark/1/stdout.txt", "format": "txt"}], + "metrics": {}, + }, + { + "name": "benchmark iter=2", + "status": "completed", + "output_path": "benchmark/2", + "artifacts": [{"path": "benchmark/2/stdout.txt", "format": "txt"}], + "metrics": {}, + }, + { + "name": "sweep", + "status": "completed", + "output_path": "sweep", + "artifacts": [{"path": "sweep/0/trajectory.csv", "format": "csv"}], + "metrics": {}, + "sweeps": [ + { + "name": "sweep", + "status": "completed", + "output_path": "sweep/0/0", + "artifacts": [ + {"path": "sweep/0/0/some-report.html", "format": "html"}, + {"path": "sweep/0/0/stdout.txt", "format": "txt"}, + ], + "metrics": {}, + }, + { + "name": "sweep step=1", + "status": "completed", + "output_path": "sweep/0/1", + "artifacts": [ + {"path": "sweep/0/1/some-report.html", "format": "html"}, + {"path": "sweep/0/1/stdout.txt", "format": "txt"}, + ], + "metrics": {}, + }, + ], + }, + ], + } def _write_slurm_job(step_dir: Path, elapsed_time_sec: int) -> None: