diff --git a/src/cloudai/_core/registry.py b/src/cloudai/_core/registry.py index 2e2adf6b7..c7ee157e7 100644 --- a/src/cloudai/_core/registry.py +++ b/src/cloudai/_core/registry.py @@ -229,7 +229,8 @@ def report_order(k: str) -> int: "per_test": 0, # first "status": 2, "dse": 3, - "tarball": 4, # last + "summary": 4, + "tarball": 5, # last }.get(k, 1) return sorted(self.scenario_reports.items(), key=lambda kv: report_order(kv[0])) diff --git a/src/cloudai/core.py b/src/cloudai/core.py index 752d24972..d94595ed8 100644 --- a/src/cloudai/core.py +++ b/src/cloudai/core.py @@ -54,7 +54,7 @@ from .configurator.grid_search import GridSearchAgent from .models.workload import CmdArgs, NsysConfiguration, PredictorConfig, TestDefinition from .parser import Parser -from .reporter import PerTestReporter, StatusReporter, TarballReporter +from .reporter import PerTestReporter, StatusReporter, SummaryReporter, TarballReporter from .test_parser import TestParser from .test_scenario_parser import TestScenarioParser @@ -96,6 +96,7 @@ "RewardOverrides", "Runner", "StatusReporter", + "SummaryReporter", "System", "SystemConfigParsingError", "TarballReporter", diff --git a/src/cloudai/registration.py b/src/cloudai/registration.py index a6a7d737a..accef86cf 100644 --- a/src/cloudai/registration.py +++ b/src/cloudai/registration.py @@ -46,7 +46,7 @@ def register_all(): ) from cloudai.core import Registry from cloudai.models.scenario import ReportConfig - from cloudai.reporter import DSEReporter, PerTestReporter, StatusReporter, TarballReporter + from cloudai.reporter import DSEReporter, PerTestReporter, StatusReporter, SummaryReporter, TarballReporter # Import systems from cloudai.systems.kubernetes import KubernetesInstaller, KubernetesRunner, KubernetesSystem @@ -338,6 +338,7 @@ def register_all(): ) Registry().add_scenario_report("status", StatusReporter, ReportConfig(enable=True)) Registry().add_scenario_report("dse", DSEReporter, ReportConfig(enable=True)) + 
class SummaryReporter(Reporter):
    """Generate a machine-readable scenario summary for automation.

    The summary is written as JSON to ``<results_root>/cloudai-summary.json``. It holds the scenario
    name, an aggregated status, the files located directly in the results root, and one entry per
    loaded test run. DSE jobs get a single entry with their individual runs nested under ``"sweeps"``.
    """

    SUMMARY_FILE_NAME = "cloudai-summary.json"

    def generate(self) -> None:
        """Load the test runs and write the JSON summary into the results root."""
        self.load_test_runs()
        report_path = self.results_root / self.SUMMARY_FILE_NAME
        # Collect everything before opening the file: opening with "w" truncates it, so a failure
        # while building the summary would otherwise leave an empty/partial JSON file behind.
        summary = self.build_summary()
        with report_path.open("w", encoding="utf-8") as f:
            json.dump(summary, f, indent=2)
            f.write("\n")

        logging.info("Generated scenario summary at %s", report_path)

    def build_summary(self) -> dict[str, Any]:
        """Return the complete summary as a JSON-serializable dictionary."""
        test_runs = self._test_runs_summary()
        return {
            "scenario": self.test_scenario.name,
            "status": self._scenario_status(test_runs),
            "result_dir": self._relative_path(self.results_root),
            "reports": self._scenario_artifacts(),
            "test_runs": test_runs,
        }

    def _scenario_status(self, test_runs: list[dict[str, Any]]) -> str:
        """Aggregate entry statuses: ``unknown`` if empty, ``completed`` if all completed, else ``failed``."""
        if not test_runs:
            return "unknown"
        if all(tr["status"] == "completed" for tr in test_runs):
            return "completed"
        return "failed"

    def _test_runs_summary(self) -> list[dict[str, Any]]:
        """Build summary entries in scenario order, grouping loaded runs by test run name."""
        loaded_by_name: dict[str, list[TestRun]] = {}
        for tr in self.trs:
            loaded_by_name.setdefault(tr.name, []).append(tr)

        summary: list[dict[str, Any]] = []
        for test_run in self.test_scenario.test_runs:
            loaded_runs = loaded_by_name.get(test_run.name, [])
            if test_run.is_dse_job:
                summary.append(self._sweep_test_run_summary(test_run, loaded_runs))
            else:
                # NOTE(review): a non-DSE test run without loaded results adds no entry at all, so it
                # does not influence the aggregated scenario status - confirm this is intended.
                summary.extend(self._test_run_summary(tr) for tr in loaded_runs)

        return summary

    def _sweep_test_run_summary(self, tr: TestRun, sweeps: list[TestRun]) -> dict[str, Any]:
        """Summarize a DSE job: one parent entry with every loaded run nested under ``"sweeps"``.

        The parent's artifacts are the files below ``results_root / tr.name`` that do not belong to
        any of the nested runs' output directories.
        """
        sweep_summaries = [self._test_run_summary(sweep) for sweep in sweeps]
        sweep_root = self.results_root / tr.name
        return {
            "name": tr.name,
            "status": self._scenario_status(sweep_summaries),
            "output_path": self._relative_path(sweep_root),
            "artifacts": self._artifacts_excluding(sweep_root, [sweep.output_path for sweep in sweeps]),
            "metrics": {},
            "sweeps": sweep_summaries,
        }

    def _test_run_summary(self, tr: TestRun) -> dict[str, Any]:
        """Summarize a single run; ``error_message`` is only present when the status reports one."""
        status = tr.test.was_run_successful(tr)
        summary: dict[str, Any] = {
            "name": case_name(tr),
            "status": "completed" if status.is_successful else "failed",
            "output_path": self._relative_path(tr.output_path),
            "artifacts": self._artifacts(tr.output_path),
            "metrics": self._metrics(tr),
        }
        if status.error_message:
            summary["error_message"] = status.error_message
        return summary

    def _metrics(self, tr: TestRun) -> dict[str, float]:
        """Return the run's agent metrics as floats, leaving out metrics that report ``METRIC_ERROR``."""
        metrics: dict[str, float] = {}
        for metric in tr.test.agent_metrics:
            value = tr.get_metric_value(self.system, metric)
            # The value is numeric (it is passed to float() below), so the error sentinel has to be
            # matched by equality: an identity check misses an equal value held by another object.
            if value == METRIC_ERROR:
                continue
            metrics[metric] = float(value)

        return metrics

    def _scenario_artifacts(self) -> list[dict[str, str]]:
        """List the files located directly in the results root, except the summary file itself."""
        if not self.results_root.is_dir():
            return []

        return [
            self._artifact(path)
            for path in sorted(self.results_root.iterdir())
            if path.is_file() and path.name != self.SUMMARY_FILE_NAME
        ]

    def _artifacts(self, root: Path) -> list[dict[str, str]]:
        """List every file below ``root`` (recursively), sorted by path."""
        return self._artifacts_excluding(root, [])

    def _artifacts_excluding(self, root: Path, excluded_roots: list[Path]) -> list[dict[str, str]]:
        """List every file below ``root`` that is not located under any of ``excluded_roots``."""
        if not root.is_dir():
            return []

        return [
            self._artifact(path)
            for path in sorted(root.rglob("*"))
            if path.is_file() and not any(self._is_relative_to(path, excluded) for excluded in excluded_roots)
        ]

    def _is_relative_to(self, path: Path, root: Path) -> bool:
        """Return whether ``path`` is ``root`` itself or located below it."""
        return path.is_relative_to(root)

    def _artifact(self, path: Path) -> dict[str, str]:
        """Describe a file by its relative path and its suffix without the dot (``unknown`` if none)."""
        return {
            "path": self._relative_path(path),
            "format": path.suffix.removeprefix(".") or "unknown",
        }

    def _relative_path(self, path: Path) -> str:
        """Return ``path`` relative to the results root, or unchanged if it lies outside of it."""
        try:
            return str(path.relative_to(self.results_root))
        except ValueError:
            return str(path)
def test_report_order() -> None:
    """The built-in scenario reports are ordered with per_test first and tarball last."""
    names = [name for name, _ in Registry().ordered_scenario_reports()]
    assert names[0] == "per_test"
    assert names[-4:] == ["status", "dse", "summary", "tarball"]


def test_summary_reporter_writes_machine_readable_summary(
    slurm_system: SlurmSystem,
    benchmark_tr: TestRun,
) -> None:
    """The JSON summary lists scenario reports, regular runs, and DSE runs nested under their job."""

    def artifact(path: str, fmt: str) -> dict[str, str]:
        return {"path": path, "format": fmt}

    def completed_run(name: str, output_path: str, artifacts: list[dict[str, str]]) -> dict[str, object]:
        return {
            "name": name,
            "status": "completed",
            "output_path": output_path,
            "artifacts": artifacts,
            "metrics": {},
        }

    results_root = slurm_system.output_path
    stdout_content = "# Out of bounds values# Avg bus bandwidth"

    # Scenario-level report placed directly in the results root.
    (results_root / "test_scenario.html").write_text("")

    # Regular test run: one stdout per iteration, plus a CSV report in the first iteration only.
    benchmark_root = results_root / benchmark_tr.name
    for iteration in range(benchmark_tr.iterations):
        (benchmark_root / str(iteration) / "stdout.txt").write_text(stdout_content)
    (benchmark_root / "0" / "cloudai_nccl_test_csv_report.csv").write_text("size,bw\n1,2\n")

    # DSE test run: a trajectory file next to one directory per agent step.
    sweep_definition = NCCLTestDefinition(
        name="nccl",
        description="NCCL sweep",
        test_template_name="NcclTest",
        cmd_args=NCCLCmdArgs(docker_image_url="fake://url/nccl"),
        extra_env_vars={"VAR1": ["value1", "value2"]},
        agent_steps=2,
    )
    sweep_tr = TestRun(name="sweep", test=sweep_definition, num_nodes=1, nodes=["node1"])
    sweep_iteration = results_root / sweep_tr.name / "0"
    sweep_iteration.mkdir(parents=True)
    (sweep_iteration / "trajectory.csv").write_text("step,action,reward,observation\n")
    for step in range(sweep_tr.test.agent_steps):
        step_dir = sweep_iteration / str(step)
        step_dir.mkdir()
        (step_dir / "stdout.txt").write_text(stdout_content)
        (step_dir / "some-report.html").write_text("")

    scenario = TestScenario(name="test_scenario", test_runs=[benchmark_tr, sweep_tr])
    SummaryReporter(slurm_system, scenario, results_root, ReportConfig()).generate()

    summary = json.loads((results_root / SummaryReporter.SUMMARY_FILE_NAME).read_text())

    expected_sweep = {
        "name": "sweep",
        "status": "completed",
        "output_path": "sweep",
        "artifacts": [artifact("sweep/0/trajectory.csv", "csv")],
        "metrics": {},
        "sweeps": [
            completed_run(
                "sweep",
                "sweep/0/0",
                [artifact("sweep/0/0/some-report.html", "html"), artifact("sweep/0/0/stdout.txt", "txt")],
            ),
            completed_run(
                "sweep step=1",
                "sweep/0/1",
                [artifact("sweep/0/1/some-report.html", "html"), artifact("sweep/0/1/stdout.txt", "txt")],
            ),
        ],
    }
    expected = {
        "scenario": "test_scenario",
        "status": "completed",
        "result_dir": ".",
        "reports": [artifact("test_scenario.html", "html")],
        "test_runs": [
            completed_run(
                "benchmark",
                "benchmark/0",
                [
                    artifact("benchmark/0/cloudai_nccl_test_csv_report.csv", "csv"),
                    artifact("benchmark/0/stdout.txt", "txt"),
                ],
            ),
            completed_run("benchmark iter=1", "benchmark/1", [artifact("benchmark/1/stdout.txt", "txt")]),
            completed_run("benchmark iter=2", "benchmark/2", [artifact("benchmark/2/stdout.txt", "txt")]),
            expected_sweep,
        ],
    }
    assert summary == expected