diff --git a/src/cloudai/models/workload.py b/src/cloudai/models/workload.py index 863250e2c..c976f7f83 100644 --- a/src/cloudai/models/workload.py +++ b/src/cloudai/models/workload.py @@ -89,6 +89,15 @@ def __hash__(self) -> int: return self.git_repo.__hash__() +class TrainingReportConfig(BaseModel): + """Training-report aggregation window: steps excluded before computing per-metric stats.""" + + model_config = ConfigDict(extra="forbid") + + exclude_start_steps: int = Field(default=5, ge=0) + exclude_post_profiling_steps: int = Field(default=2, ge=0) + + class TestDefinition(BaseModel, ABC): """Base Test object.""" @@ -106,6 +115,7 @@ class TestDefinition(BaseModel, ABC): git_repos: list[GitRepo] = [] nsys: Optional[NsysConfiguration] = None predictor: Optional[PredictorConfig] = None + training_report: Optional[TrainingReportConfig] = None agent: str = "grid_search" agent_steps: int = 1 diff --git a/src/cloudai/report_generator/training/mappings.py b/src/cloudai/report_generator/training/mappings.py index f554ac493..2588d5c29 100644 --- a/src/cloudai/report_generator/training/mappings.py +++ b/src/cloudai/report_generator/training/mappings.py @@ -52,8 +52,9 @@ } -# (world_size, num_nodes, model_name) and the computed data_parallel_size are not mapped here. -NEMO_CONFIG: dict[str, str] = { +# Framework's resolved config artifact. (world_size, num_nodes, model_name) and computed data_parallel_size are not +# mapped here. +NEMO_MODEL_CONFIG: dict[str, str] = { "micro_batch_size": "data.micro_batch_size", "global_batch_size": "data.global_batch_size", "seq_length": "data.seq_length", @@ -77,7 +78,7 @@ "moe_grouped_gemm": "model.moe_grouped_gemm", } -MEGATRON_CONFIG: dict[str, str] = { +MEGATRON_MODEL_CONFIG: dict[str, str] = { "micro_batch_size": "micro_batch_size", "global_batch_size": "global_batch_size", "seq_length": "seq_length", @@ -101,7 +102,7 @@ "moe_grouped_gemm": "moe_grouped_gemm", } -MEGATRON_BRIDGE_CONFIG: dict[str, str] = { +MEGATRON_BRIDGE_MODEL_CONFIG: dict[str, str] = { "micro_batch_size": "train.micro_batch_size", "global_batch_size": "train.global_batch_size", "seq_length": "model.seq_length", @@ -124,3 +125,29 @@ "moe_ffn_hidden_size": "model.moe_ffn_hidden_size", "moe_grouped_gemm": "model.moe_grouped_gemm", } + + +# CloudAI TestDefinition (user TOML + defaults). TrainingConfig field -> dotted path in TestDefinition.model_dump(). +NEMO_TEST_CONFIG: dict[str, str] = { + "profiling_enabled": "nsys.enable", + "profiling_start_step": "extra_cmd_args.*start_step", + "profiling_stop_step": "extra_cmd_args.*end_step", + "exclude_start_steps": "training_report.exclude_start_steps", + "exclude_post_profiling_steps": "training_report.exclude_post_profiling_steps", +} + +MEGATRON_TEST_CONFIG: dict[str, str] = { + "profiling_enabled": "nsys.enable", + "profiling_start_step": "cmd_args.profile_step_start", + "profiling_stop_step": "cmd_args.profile_step_end", + "exclude_start_steps": "training_report.exclude_start_steps", + "exclude_post_profiling_steps": "training_report.exclude_post_profiling_steps", +} + +MEGATRON_BRIDGE_TEST_CONFIG: dict[str, str] = { + "profiling_enabled": "cmd_args.enable_nsys", + "profiling_start_step": "cmd_args.profiling_start_step", + "profiling_stop_step": "cmd_args.profiling_stop_step", + "exclude_start_steps": "training_report.exclude_start_steps", + "exclude_post_profiling_steps": "training_report.exclude_post_profiling_steps", +} diff --git a/src/cloudai/report_generator/training/models.py b/src/cloudai/report_generator/training/models.py index 5877c0401..43b9d64da 100644 --- a/src/cloudai/report_generator/training/models.py +++ b/src/cloudai/report_generator/training/models.py @@ -16,11 +16,43 @@ """Data models for training parsers.""" +import statistics from collections.abc import Hashable from dataclasses import MISSING, dataclass, fields from typing import Any, List, Optional +@dataclass +class MetricStats: + """Aggregated statistics for one metric over the filtered steps.""" + + mean: float + min: float + max: float + std: float + t99: float + t95: float + + @classmethod + def from_values(cls, values: list[float]) -> "MetricStats": + """Build stats from a non-empty list of values (population std; inclusive percentiles).""" + return cls( + mean=statistics.mean(values), + min=min(values), + max=max(values), + std=statistics.pstdev(values), + t99=cls._percentile(values, 99), + t95=cls._percentile(values, 95), + ) + + @staticmethod + def _percentile(values: list[float], p: int) -> float: + """Inclusive, linearly-interpolated p-th percentile; returns the sole value for a single sample.""" + if len(values) == 1: + return float(values[0]) + return statistics.quantiles(values, n=100, method="inclusive")[p - 1] + + @dataclass(frozen=True) class Scalar: """A single scalar event from a training run (source-agnostic: TensorBoard today, others later).""" @@ -51,6 +83,29 @@ class TrainingStep: OPTIONAL_STEP_FIELDS = {f.name for f in fields(TrainingStep) if f.default is not MISSING} +@dataclass(kw_only=True) +class StepAggregation: + """Per-metric aggregated statistics over the filtered steps.""" + + step_time_sec: MetricStats + loss: MetricStats + memory_reserved_bytes: MetricStats + memory_allocated_bytes: MetricStats + tflops_per_gpu: Optional[MetricStats] = None + + @classmethod + def from_steps(cls, steps: list["TrainingStep"]) -> "StepAggregation": + """Build per-metric stats from a non-empty list of already-filtered steps.""" + tflops = [s.tflops_per_gpu for s in steps if s.tflops_per_gpu is not None] + return cls( + step_time_sec=MetricStats.from_values([s.step_time_sec for s in steps]), + loss=MetricStats.from_values([s.loss for s in steps]), + memory_reserved_bytes=MetricStats.from_values([s.memory_reserved_bytes for s in steps]), + memory_allocated_bytes=MetricStats.from_values([s.memory_allocated_bytes for s in steps]), + tflops_per_gpu=MetricStats.from_values(tflops) if tflops else None, + ) + + @dataclass(kw_only=True) class TrainingConfig: """ @@ -95,6 +150,15 @@ class TrainingConfig: world_size: Optional[int] = None # CloudAI-computed (None when gpus_per_node is unavailable) num_nodes: int = 0 # CloudAI-computed + # Profiling (CloudAI-computed from the run's nsys/profiler settings) + profiling_enabled: bool = False + profiling_start_step: Optional[int] = None + profiling_stop_step: Optional[int] = None + + # Aggregation window (steps dropped before computing the top-level aggregation) + exclude_start_steps: int = 5 + exclude_post_profiling_steps: int = 2 + # Identity test_template_name: str = "" # CloudAI-computed @@ -105,3 +169,4 @@ class TrainingResults: config: TrainingConfig steps: List[TrainingStep] + aggregation: Optional[StepAggregation] = None # None when no steps remain after exclusions diff --git a/src/cloudai/report_generator/training/parser.py b/src/cloudai/report_generator/training/parser.py index f4b271535..3bb70a77a 100644 --- a/src/cloudai/report_generator/training/parser.py +++ b/src/cloudai/report_generator/training/parser.py @@ -21,10 +21,10 @@ """ import ast +import fnmatch import json import logging from abc import ABC, abstractmethod -from functools import reduce from pathlib import Path from typing import Any, ClassVar, Optional @@ -33,15 +33,18 @@ from cloudai.core import System, TestRun from .mappings import ( - MEGATRON_BRIDGE_CONFIG, + MEGATRON_BRIDGE_MODEL_CONFIG, MEGATRON_BRIDGE_SCALE, MEGATRON_BRIDGE_STEPS, - MEGATRON_CONFIG, + MEGATRON_BRIDGE_TEST_CONFIG, + MEGATRON_MODEL_CONFIG, MEGATRON_STEPS, - NEMO_CONFIG, + MEGATRON_TEST_CONFIG, + NEMO_MODEL_CONFIG, NEMO_STEPS, + NEMO_TEST_CONFIG, ) -from .models import OPTIONAL_STEP_FIELDS, Scalar, TrainingConfig, TrainingResults, TrainingStep +from .models import OPTIONAL_STEP_FIELDS, Scalar, StepAggregation, TrainingConfig, TrainingResults, TrainingStep from .tb_reader import read_scalars, read_text @@ -49,8 +52,9 @@ class TrainingParser(ABC): """Base parser: a workload's run artifacts -> TrainingResults.""" STEP_MAPPING: ClassVar[dict[str, str]] # TrainingStep field -> TB scalar tag (read from the value column) - CONFIG_MAPPING: ClassVar[dict[str, str]] SCALE: ClassVar[dict[str, float]] = {} # TrainingStep field -> unit factor (e.g. m-bridge gigabytes -> bytes) + MODEL_CONFIG_MAPPING: ClassVar[dict[str, str]] # TrainingConfig field -> path in the framework's resolved config + TEST_CONFIG_MAPPING: ClassVar[dict[str, str]] = {} # TrainingConfig field -> dotted path in the TestDefinition @abstractmethod def get_tb_dir(self, tr: TestRun) -> Path: @@ -61,8 +65,8 @@ def get_config_path(self, tr: TestRun) -> Optional[Path]: """Config artifact path used by the parser, or None when it cannot be found.""" @abstractmethod - def get_config(self, tr: TestRun) -> dict: - """Read the workload's config artifact into a (possibly nested) dict.""" + def get_model_config(self, tr: TestRun) -> dict: + """Read the framework's resolved config artifact into a (possibly nested) dict.""" @abstractmethod def get_model_name(self, tr: TestRun) -> str: @@ -79,8 +83,27 @@ def can_parse(self, tr: TestRun) -> bool: def parse(self, tr: TestRun, system: System) -> TrainingResults: """Read TB scalars + the config artifact and assemble TrainingResults.""" steps: list[TrainingStep] = self._build_steps(self._read_scalars(tr)) - config: TrainingConfig = self._build_config(self.get_config(tr), tr, system) - return TrainingResults(config=config, steps=steps) + config: TrainingConfig = self._build_config(tr, system) + aggregation = self._aggregate(steps, config) + return TrainingResults(config=config, steps=steps, aggregation=aggregation) + + def _aggregate(self, steps: list[TrainingStep], config: TrainingConfig) -> Optional[StepAggregation]: + """Per-metric stats over the filtered steps (first N + profiling window dropped); None if none remain.""" + filtered = self._filter_steps(steps, config) + return StepAggregation.from_steps(filtered) if filtered else None + + @staticmethod + def _filter_steps(steps: list[TrainingStep], config: TrainingConfig) -> list[TrainingStep]: + """Drop the first exclude_start_steps, then the profiling window + exclude_post_profiling_steps after it.""" + steps = steps[config.exclude_start_steps :] + if ( + config.profiling_enabled + and config.profiling_start_step is not None + and config.profiling_stop_step is not None + ): + lo, hi = config.profiling_start_step, config.profiling_stop_step + config.exclude_post_profiling_steps + steps = [s for s in steps if not (lo <= s.iteration <= hi)] + return steps def _build_steps(self, scalars: list[Scalar]) -> list[TrainingStep]: """list[Scalar] -> list[TrainingStep] via STEP_MAPPING; drop a step only if a required tag is missing.""" @@ -105,11 +128,11 @@ def _build_step(self, step: int, step_scalars: list[Scalar]) -> Optional[Trainin } return TrainingStep(iteration=step, **field_values) - def _build_config(self, raw: dict, tr: TestRun, system: System) -> TrainingConfig: - """Map the raw config via CONFIG_MAPPING, then fill the CloudAI-computed fields.""" - field_values: dict[str, Any] = { - field: self._get_from_dict(raw, path) for field, path in self.CONFIG_MAPPING.items() - } + def _build_config(self, tr: TestRun, system: System) -> TrainingConfig: + """Map the framework + test config into TrainingConfig, then fill the CloudAI-computed fields.""" + field_values: dict[str, Any] = {} + field_values.update(self._resolve_model_config(tr)) + field_values.update(self._resolve_test_config(tr)) config = TrainingConfig(**field_values) config.test_template_name = tr.test.test_template_name config.num_nodes = tr.nnodes @@ -141,9 +164,20 @@ def _group_by_step(scalars: list[Scalar]) -> dict[int, list[Scalar]]: return dict(sorted(by_step.items())) @staticmethod - def _get_from_dict(raw: dict, path: str) -> Any: - """Read a dotted path (e.g. 'model.num_layers') from a nested dict; None if any key is missing.""" - return reduce(lambda value, key: value.get(key) if isinstance(value, dict) else None, path.split("."), raw) + def _get_from_dict(source: dict, path: str) -> Any: + """Read a dotted path; a '*' segment globs that key against the current dict's keys (first match).""" + value: Any = source + for part in path.split("."): + if not isinstance(value, dict): + return None + if "*" not in part: + value = value.get(part) + continue + matches = fnmatch.filter(value, part) + if len(matches) > 1: + logging.warning(f"Multiple config values match '{part}' ({matches}); using '{matches[0]}'.") + value = value[matches[0]] if matches else None + return value def _compute_data_parallel_size(self, config: TrainingConfig, world_size: int) -> int: """world_size / (tensor*pipeline*context parallel). EP is carved out of DP, not world_size.""" @@ -158,12 +192,37 @@ def _compute_data_parallel_size(self, config: TrainingConfig, world_size: int) - raise ValueError(f"{cls}: world_size {world_size} not a multiple of tp*pp*cp={parallel}; check topology") return world_size // parallel + def _resolve_model_config(self, tr: TestRun) -> dict[str, Any]: + """Resolve MODEL_CONFIG_MAPPING against the framework's resolved config (values already native).""" + source = self.get_model_config(tr) + return {field: self._get_from_dict(source, path) for field, path in self.MODEL_CONFIG_MAPPING.items()} + + def _resolve_test_config(self, tr: TestRun) -> dict[str, Any]: + """Resolve TEST_CONFIG_MAPPING against the run's TestDefinition; coerce values, drop misses (keep defaults).""" + source = tr.test.model_dump() + return { + field: self._parse_literal(value) + for field, path in self.TEST_CONFIG_MAPPING.items() + if (value := self._get_from_dict(source, path)) is not None + } + + @staticmethod + def _parse_literal(value: Any) -> Any: + """Parse a string value to its native type ("20" -> 20, "True" -> True); pass non-strings through.""" + if not isinstance(value, str): + return value + try: + return ast.literal_eval(value) + except (ValueError, SyntaxError): + return value + class NeMoRunParser(TrainingParser): """NeMoRun: config from nemo_config.json, steps from TB scalars.""" STEP_MAPPING = NEMO_STEPS - CONFIG_MAPPING = NEMO_CONFIG + MODEL_CONFIG_MAPPING = NEMO_MODEL_CONFIG + TEST_CONFIG_MAPPING = NEMO_TEST_CONFIG CONFIG_FILE = "nemo_config.json" def get_tb_dir(self, tr: TestRun) -> Path: @@ -172,7 +231,7 @@ def get_tb_dir(self, tr: TestRun) -> Path: def get_config_path(self, tr: TestRun) -> Path: return tr.output_path / self.CONFIG_FILE - def get_config(self, tr: TestRun) -> dict: + def get_model_config(self, tr: TestRun) -> dict: return json.loads(self.get_config_path(tr).read_text()) def get_model_name(self, tr: TestRun) -> str: @@ -187,7 +246,8 @@ class MegatronParser(TrainingParser): """MegatronRun: config from TB text summaries, steps from TB scalars.""" STEP_MAPPING = MEGATRON_STEPS - CONFIG_MAPPING = MEGATRON_CONFIG + MODEL_CONFIG_MAPPING = MEGATRON_MODEL_CONFIG + TEST_CONFIG_MAPPING = MEGATRON_TEST_CONFIG def get_config_path(self, tr: TestRun) -> Optional[Path]: # Megatron's config lives in the TB text stream, not a separate file; return any TB entry so the @@ -197,18 +257,11 @@ def get_config_path(self, tr: TestRun) -> Optional[Path]: def _read_scalars(self, tr: TestRun) -> list[Scalar]: return [s for s in super()._read_scalars(tr) if not s.tag.endswith(SAMPLE_AXIS_SUFFIX)] - @staticmethod - def _parse_literal(value: str) -> Any: - # TB text config is all strings; parse "64" -> 64, "True" -> True, leave bare words ("rope") as str. - try: - return ast.literal_eval(value) - except (ValueError, SyntaxError): - return value - def get_tb_dir(self, tr: TestRun) -> Path: return tr.output_path / "tensorboard" - def get_config(self, tr: TestRun) -> dict: + def get_model_config(self, tr: TestRun) -> dict: + # TB text config is all strings; _parse_literal turns "64" -> 64, "True" -> True, leaves bare words as str. return {tag: self._parse_literal(value) for tag, value in read_text(self.get_tb_dir(tr)).items()} def get_model_name(self, tr: TestRun) -> str: @@ -220,7 +273,8 @@ class MegatronBridgeParser(TrainingParser): STEP_MAPPING = MEGATRON_BRIDGE_STEPS SCALE = MEGATRON_BRIDGE_SCALE - CONFIG_MAPPING = MEGATRON_BRIDGE_CONFIG + MODEL_CONFIG_MAPPING = MEGATRON_BRIDGE_MODEL_CONFIG + TEST_CONFIG_MAPPING = MEGATRON_BRIDGE_TEST_CONFIG CONFIG_FILE = "ConfigContainer.yaml" def _read_scalars(self, tr: TestRun) -> list[Scalar]: @@ -234,7 +288,7 @@ def get_config_path(self, tr: TestRun) -> Optional[Path]: hits = sorted(tr.output_path.glob(f"**/configs/{self.CONFIG_FILE}")) return hits[0] if hits else None - def get_config(self, tr: TestRun) -> dict: + def get_model_config(self, tr: TestRun) -> dict: path = self.get_config_path(tr) return yaml.safe_load(path.read_text()) if path else {} diff --git a/tests/report_generator/training/test_training_parser.py b/tests/report_generator/training/test_training_parser.py index 548ad1e22..2e8e112d3 100644 --- a/tests/report_generator/training/test_training_parser.py +++ b/tests/report_generator/training/test_training_parser.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import types from pathlib import Path from typing import Any @@ -22,7 +23,7 @@ from cloudai.report_generator.training import parser as parser_mod from cloudai.report_generator.training import tb_reader -from cloudai.report_generator.training.models import Scalar +from cloudai.report_generator.training.models import Scalar, TrainingStep from cloudai.report_generator.training.parser import MegatronBridgeParser, MegatronParser, NeMoRunParser from cloudai.report_generator.training.report_generation_strategy import TrainingReportGenerationStrategy @@ -35,13 +36,41 @@ def _system(gpus_per_node: int | None = 4, ntasks_per_node: int | None = None) - return types.SimpleNamespace(gpus_per_node=gpus_per_node, ntasks_per_node=ntasks_per_node) +class _Test(types.SimpleNamespace): + """Stand-in for the pydantic TestDefinition: model_dump() returns nested cmd_args/nsys dicts.""" + + def model_dump(self) -> dict[str, Any]: + dumped = dict(vars(self)) + dumped["cmd_args"] = dict(vars(self.cmd_args)) + dumped["nsys"] = dict(vars(self.nsys)) if self.nsys is not None else None + return dumped + + def _tr( - output_path: str | Path = "run", nnodes: int = 8, name: str = "t", template: str = "NeMoRun", **cmd_args: Any + output_path: str | Path = "run", + nnodes: int = 8, + name: str = "t", + template: str = "NeMoRun", + nsys: Any = None, + extra_cmd_args: dict[str, Any] | None = None, + training_report: dict[str, Any] | None = None, + **cmd_args: Any, ) -> Any: - test = types.SimpleNamespace(name=name, test_template_name=template, cmd_args=types.SimpleNamespace(**cmd_args)) + test = _Test( + name=name, + test_template_name=template, + cmd_args=types.SimpleNamespace(**cmd_args), + nsys=nsys, + extra_cmd_args=extra_cmd_args or {}, + training_report=training_report, + ) return types.SimpleNamespace(output_path=Path(output_path), nnodes=nnodes, test=test) +def _nsys(enable: bool) -> Any: + return types.SimpleNamespace(enable=enable) + + # --- steps --------------------------------------------------------------------------------------- @@ -207,9 +236,9 @@ def test_build_config_resolves_paths_and_computes_fields(): "parallelism": {"tensor_model_parallel_size": 4, "pipeline_model_parallel_size": 1, "context_parallel_size": 1}, "model": {"num_layers": 30}, } - config = NeMoRunParser()._build_config( - raw, _tr(nnodes=8, template="NeMoRun", recipe_name="gpt3"), _system(gpus_per_node=4) - ) + parser = NeMoRunParser() + parser.get_model_config = lambda tr: raw + config = parser._build_config(_tr(nnodes=8, template="NeMoRun", recipe_name="gpt3"), _system(gpus_per_node=4)) assert config.test_template_name == "NeMoRun" # CloudAI-computed assert config.micro_batch_size == 1 # nested dotted-path resolve @@ -223,7 +252,9 @@ def test_build_config_resolves_paths_and_computes_fields(): def test_build_config_leaves_world_size_none_without_gpus_per_node(): # No gpus_per_node/ntasks_per_node on the system: world_size/data_parallel_size stay None, rest still resolves. raw = {"parallelism": {"tensor_model_parallel_size": 4, "pipeline_model_parallel_size": 1}} - config = NeMoRunParser()._build_config(raw, _tr(nnodes=8, recipe_name="gpt3"), _system(gpus_per_node=None)) + parser = NeMoRunParser() + parser.get_model_config = lambda tr: raw + config = parser._build_config(_tr(nnodes=8, recipe_name="gpt3"), _system(gpus_per_node=None)) assert config.world_size is None assert config.data_parallel_size is None @@ -243,7 +274,7 @@ def test_megatron_config_parses_string_literals(monkeypatch): monkeypatch.setattr(parser_mod, "read_text", lambda _tb_dir: text) parser = MegatronParser() - config = parser._build_config(parser.get_config(_tr()), _tr(nnodes=16, name="dsv3"), _system(gpus_per_node=4)) + config = parser._build_config(_tr(nnodes=16, name="dsv3"), _system(gpus_per_node=4)) assert config.micro_batch_size == 1 # "1" -> int assert config.sequence_parallel is True # "True" -> bool @@ -254,10 +285,182 @@ def test_megatron_config_parses_string_literals(monkeypatch): def test_compute_data_parallel_size_rejects_invalid_topology(): parser = NeMoRunParser() parallel = {"tensor_model_parallel_size": 4, "pipeline_model_parallel_size": 1, "context_parallel_size": 1} + parser.get_model_config = lambda tr: {"parallelism": parallel} with pytest.raises(ValueError, match="world_size"): # world_size 34 is not a multiple of tp*pp*cp=4 - parser._build_config({"parallelism": parallel}, _tr(nnodes=17, recipe_name="x"), _system(gpus_per_node=2)) + parser._build_config(_tr(nnodes=17, recipe_name="x"), _system(gpus_per_node=2)) + parser.get_model_config = lambda tr: {"parallelism": {}} with pytest.raises(ValueError, match="tensor_parallel_size"): # tp missing from the parsed config - parser._build_config({"parallelism": {}}, _tr(recipe_name="x"), _system()) + parser._build_config(_tr(recipe_name="x"), _system()) + + +# --- profiling ----------------------------------------------------------------------------------- + + +def test_nemo_profiling_reads_nsys_and_callback_steps(): + # enable maps from [nsys]; the step bounds are extracted from extra_cmd_args by suffix (index-agnostic). + tr = _tr( + nsys=_nsys(True), + extra_cmd_args={"trainer.callbacks[2].start_step": "20", "trainer.callbacks[2].end_step": "25"}, + ) + assert NeMoRunParser()._resolve_test_config(tr) == { + "profiling_enabled": True, + "profiling_start_step": 20, + "profiling_stop_step": 25, + } + + +def test_nemo_profiling_disabled_and_no_steps(): + tr = _tr(nsys=_nsys(False)) + assert NeMoRunParser()._resolve_test_config(tr) == {"profiling_enabled": False} + + +def test_nemo_profiling_step_extraction_is_index_agnostic(): + # a different user-chosen callback index still resolves via the suffix match + tr = _tr(nsys=_nsys(True), extra_cmd_args={"trainer.callbacks[0].start_step": "7"}) + resolved = NeMoRunParser()._resolve_test_config(tr) + assert resolved["profiling_start_step"] == 7 + assert "profiling_stop_step" not in resolved + + +def test_megatron_profiling_reads_profile_steps(): + tr = _tr(nsys=_nsys(True), profile_step_start=50, profile_step_end=55) + assert MegatronParser()._resolve_test_config(tr) == { + "profiling_enabled": True, + "profiling_start_step": 50, + "profiling_stop_step": 55, + } + + +def test_megatron_profiling_absent_is_dropped(): + # no [nsys] section and no profile_step_* args: everything is dropped, so config keeps model defaults. + tr = _tr(nsys=None) + assert MegatronParser()._resolve_test_config(tr) == {} + + +def test_megatron_bridge_profiling_reads_typed_fields(): + tr = _tr(enable_nsys=True, profiling_start_step=10, profiling_stop_step=12) + assert MegatronBridgeParser()._resolve_test_config(tr) == { + "profiling_enabled": True, + "profiling_start_step": 10, + "profiling_stop_step": 12, + } + + +def test_build_config_sets_profiling_fields(): + # M-Bridge exposes enable + step bounds as typed cmd_args, so _build_config folds all three into the config. + raw = {"model": {"tensor_model_parallel_size": 4, "pipeline_model_parallel_size": 1}} + parser = MegatronBridgeParser() + parser.get_model_config = lambda tr: raw + tr = _tr(model_recipe_name="gpt3", enable_nsys=True, profiling_start_step=3, profiling_stop_step=7) + config = parser._build_config(tr, _system(gpus_per_node=4)) + + assert config.profiling_enabled is True + assert (config.profiling_start_step, config.profiling_stop_step) == (3, 7) + + +def test_build_config_reads_aggregation_flags_from_toml(): + parser = NeMoRunParser() + parser.get_model_config = lambda tr: {} + tr = _tr(recipe_name="gpt3", training_report={"exclude_start_steps": 10, "exclude_post_profiling_steps": 3}) + config = parser._build_config(tr, _system(gpus_per_node=None)) + assert (config.exclude_start_steps, config.exclude_post_profiling_steps) == (10, 3) + + +def test_build_config_aggregation_flags_default_when_absent(): + parser = NeMoRunParser() + parser.get_model_config = lambda tr: {} + config = parser._build_config(_tr(recipe_name="gpt3"), _system(gpus_per_node=None)) + assert (config.exclude_start_steps, config.exclude_post_profiling_steps) == (5, 2) + + +# --- aggregation --------------------------------------------------------------------------------- + + +def _agg_config(**overrides: Any) -> Any: + base = { + "profiling_enabled": False, + "profiling_start_step": None, + "profiling_stop_step": None, + "exclude_start_steps": 0, + "exclude_post_profiling_steps": 0, + } + base.update(overrides) + return types.SimpleNamespace(**base) + + +def _agg_steps(step_times: list[float]) -> list[TrainingStep]: + return [ + TrainingStep( + iteration=i, + step_time_sec=t, + loss=1.0, + memory_reserved_bytes=1.0, + memory_allocated_bytes=1.0, + tflops_per_gpu=None, + ) + for i, t in enumerate(step_times) + ] + + +def test_filter_steps_drops_start_then_profiling_window(): + steps = _agg_steps([1.0] * 10) # iterations 0..9 + config = _agg_config( + exclude_start_steps=2, + profiling_enabled=True, + profiling_start_step=4, + profiling_stop_step=5, + exclude_post_profiling_steps=1, + ) + # drop first 2 (0,1) then the profiling window [4, 5+1] -> 4,5,6 + assert [s.iteration for s in NeMoRunParser._filter_steps(steps, config)] == [2, 3, 7, 8, 9] + + +def test_filter_steps_ignores_profiling_window_when_disabled(): + steps = _agg_steps([1.0] * 6) + config = _agg_config(exclude_start_steps=2, profiling_enabled=False, profiling_start_step=3, profiling_stop_step=4) + assert [s.iteration for s in NeMoRunParser._filter_steps(steps, config)] == [2, 3, 4, 5] + + +def test_aggregate_computes_per_metric_stats(): + agg = NeMoRunParser()._aggregate(_agg_steps([10.0, 20.0, 30.0]), _agg_config()) + assert agg is not None + st = agg.step_time_sec + assert st is not None + assert st.mean == 20.0 + assert (st.min, st.max) == (10.0, 30.0) + assert st.std == pytest.approx(8.16496, rel=1e-4) # population stdev + assert st.mean <= st.t95 <= st.max + + +def test_aggregate_tflops_is_none_when_absent(): + agg = NeMoRunParser()._aggregate(_agg_steps([1.0, 2.0]), _agg_config()) + assert agg is not None + assert agg.tflops_per_gpu is None # tflops is None on every step + assert agg.step_time_sec is not None + + +def test_aggregate_returns_none_when_all_filtered_out(): + # only 2 steps but exclude_start_steps=5 -> nothing remains -> aggregation is None + assert NeMoRunParser()._aggregate(_agg_steps([1.0, 2.0]), _agg_config(exclude_start_steps=5)) is None + + +# --- glob lookup --------------------------------------------------------------------------------- + + +def test_get_from_dict_warns_on_multiple_glob_matches(caplog): + source = {"extra_cmd_args": {"a.start_step": "1", "b.start_step": "2"}} + with caplog.at_level(logging.WARNING): + result = NeMoRunParser._get_from_dict(source, "extra_cmd_args.*start_step") + assert result == "1" # first match kept + assert "Multiple config values match" in caplog.text + + +def test_get_from_dict_single_glob_match_does_not_warn(caplog): + source = {"extra_cmd_args": {"trainer.callbacks[2].start_step": "20"}} + with caplog.at_level(logging.WARNING): + result = NeMoRunParser._get_from_dict(source, "extra_cmd_args.*start_step") + assert result == "20" + assert "Multiple config values match" not in caplog.text # --- can_parse ----------------------------------------------------------------------------------- diff --git a/tests/test_test_definitions.py b/tests/test_test_definitions.py index 986fae491..e7ac59778 100644 --- a/tests/test_test_definitions.py +++ b/tests/test_test_definitions.py @@ -32,6 +32,7 @@ TestRun, ) from cloudai.models.scenario import TestRunDetails +from cloudai.models.workload import TrainingReportConfig from cloudai.systems.slurm.slurm_system import SlurmSystem from cloudai.workloads.chakra_replay import ChakraReplayCmdArgs, ChakraReplayTestDefinition from cloudai.workloads.jax_toolbox import ( @@ -238,6 +239,17 @@ def test_extra_args(self): assert nsys.cmd_args == ["nsys", "profile", "--extra", "args"] +class TestTrainingReportConfig: + def test_defaults(self): + cfg = TrainingReportConfig() + assert (cfg.exclude_start_steps, cfg.exclude_post_profiling_steps) == (5, 2) + + @pytest.mark.parametrize("field", ["exclude_start_steps", "exclude_post_profiling_steps"]) + def test_rejects_negative(self, field: str): + with pytest.raises(ValidationError): + TrainingReportConfig(**{field: -1}) + + class TestLoadTestDefinition: @pytest.fixture def test_parser(self) -> TestParser: