diff --git a/conf/experimental/test/moe_benchmark_standard.toml b/conf/experimental/test/moe_benchmark_HT.toml similarity index 53% rename from conf/experimental/test/moe_benchmark_standard.toml rename to conf/experimental/test/moe_benchmark_HT.toml index 9af13a971..12db60c19 100644 --- a/conf/experimental/test/moe_benchmark_standard.toml +++ b/conf/experimental/test/moe_benchmark_HT.toml @@ -14,19 +14,28 @@ # See the License for the specific language governing permissions and # limitations under the License. -name = "moe_benchmark_standard" -description = "MoE Benchmark - DeepEP standard mode plus matrix export" +name = "moe_benchmark_HT" +description = "MoE Benchmark - high-throughput (HT) mode plus matrix export" test_template_name = "MoEBenchmark" [cmd_args] docker_image_url = "/your/path/to/the/container" -benchmark_root = "/workspace/dp-benchmark/benchmark" -mode = "standard" +benchmark_root = "/workspace/DeepEP/benchmark" +mode = "standard" # benchmark CLI value for high-throughput; LL lives in moe_benchmark_low_latency.toml +deepep_versions = [ + "legacy", + "elastic", + "nixl_ep", + "uccl_ep", + "nccl_ep", + "deepep_hybrid", +] tokens = 4096 num_experts = 256 num_topk = 8 hidden_size = 7168 data_type = "bfloat16" +benchmark_combine = true allow_nvlink_for_low_latency = false allow_mnnvl = false round_scale = false @@ -37,8 +46,19 @@ shuffle_columns = false use_kineto_profiler = false enable_tuning = false config_file_path = "/tmp/config.yaml" -results_dir = "/workspace/dp-benchmark/results" +results_dir = "/workspace/DeepEP/results" [extra_env_vars] +UCX_LOG_LEVEL = "error" NUM_QPS_PER_RANK = "12" NUM_SMS = "24" +# nixl_ep imports the CUDA-versioned package from the meson build dir. Set it +# explicitly (no trailing colon) so it's present regardless of --export behavior; +# harmless for v1/v2 (deep_ep is in site-packages). +PYTHONPATH = "/workspace/nixl/build/examples/device/ep" +# uccl_ep: UCCL RDMA env (mirror NCCL's GID/iface) + intranode hint. UCCL's +# Buffer falls back to torch.cuda.current_device() when LOCAL_RANK is unset, so +# init_dist_slurm's per-rank device is used; LOCAL_WORLD_SIZE aids its topo detect. +UCCL_SOCKET_IFNAME = "eno3" +UCCL_IB_GID_INDEX = "3" +LOCAL_WORLD_SIZE = "8" diff --git a/conf/experimental/test/moe_benchmark_low_latency.toml b/conf/experimental/test/moe_benchmark_low_latency.toml index fcab8d319..5ae3e6ad1 100644 --- a/conf/experimental/test/moe_benchmark_low_latency.toml +++ b/conf/experimental/test/moe_benchmark_low_latency.toml @@ -15,18 +15,20 @@ # limitations under the License. name = "moe_benchmark_low_latency" -description = "MoE Benchmark - DeepEP low-latency mode plus matrix export" +description = "MoE Benchmark - low-latency (LL) decode mode plus matrix export" test_template_name = "MoEBenchmark" [cmd_args] docker_image_url = "/your/path/to/the/container" -benchmark_root = "/path/in/the/container/to/the/tests/folder" +benchmark_root = "/workspace/DeepEP/benchmark" mode = "low_latency" +deepep_versions = ["legacy", "nixl_ep", "uccl_ep", "nccl_ep"] tokens = 128 -num_experts = 288 +num_experts = 256 num_topk = 8 hidden_size = 7168 data_type = "bfloat16" +benchmark_combine = true allow_nvlink_for_low_latency = false allow_mnnvl = false round_scale = false @@ -37,8 +39,18 @@ shuffle_columns = false use_kineto_profiler = false enable_tuning = false config_file_path = "/tmp/config.yaml" -results_dir = "/workspace/dp-benchmark/results" +results_dir = "/workspace/DeepEP/results" [extra_env_vars] +UCX_LOG_LEVEL = "error" NUM_QPS_PER_RANK = "12" NUM_SMS = "24" +NCCL_P2P_DISABLE = "1" +NCCL_NVLS_DISABLE = "1" +NVSHMEM_DISABLE_P2P = "1" +# nixl_ep imports the CUDA-versioned package from the meson build dir. +PYTHONPATH = "/workspace/nixl/build/examples/device/ep" +# uccl_ep: UCCL RDMA env (mirror NCCL's GID/iface) + intranode hint. +UCCL_SOCKET_IFNAME = "eno3" +UCCL_IB_GID_INDEX = "3" +LOCAL_WORLD_SIZE = "8" diff --git a/conf/experimental/test/nccl_test_alltoallv.toml b/conf/experimental/test/nccl_test_alltoallv.toml index 8bb21eb82..d66509a7b 100644 --- a/conf/experimental/test/nccl_test_alltoallv.toml +++ b/conf/experimental/test/nccl_test_alltoallv.toml @@ -31,6 +31,7 @@ warmup_iters = 1 check = 1 blocking = 0 use_deepep_matrix = true +average = 3 [extra_env_vars] NCCL_P2P_DISABLE = "1" diff --git a/conf/experimental/test/ucc_alltoallv_deepep.toml b/conf/experimental/test/ucc_alltoallv_deepep.toml index 8f5554870..cb888c084 100644 --- a/conf/experimental/test/ucc_alltoallv_deepep.toml +++ b/conf/experimental/test/ucc_alltoallv_deepep.toml @@ -26,6 +26,7 @@ e = "8M" use_deepep_matrix = true [extra_env_vars] +UCX_LOG_LEVEL = "error" UCX_IB_GID_INDEX = "auto" UCX_TLS = "cuda_copy,rc" UCX_RNDV_THRESH = "0" diff --git a/conf/experimental/test_scenario/moe_benchmark.toml b/conf/experimental/test_scenario/moe_benchmark_HT.toml similarity index 97% rename from conf/experimental/test_scenario/moe_benchmark.toml rename to conf/experimental/test_scenario/moe_benchmark_HT.toml index ac4a33ac6..554cccd64 100644 --- a/conf/experimental/test_scenario/moe_benchmark.toml +++ b/conf/experimental/test_scenario/moe_benchmark_HT.toml @@ -18,7 +18,7 @@ name = "moe-benchmark" [[Tests]] id = "Tests.moe_benchmark" -test_name = "moe_benchmark_standard" +test_name = "moe_benchmark_HT" num_nodes = 2 time_limit = "00:30:00" diff --git a/conf/experimental/test_scenario/moe_benchmark_low_latency.toml b/conf/experimental/test_scenario/moe_benchmark_low_latency.toml new file mode 100644 index 000000000..6073ff2d1 --- /dev/null +++ b/conf/experimental/test_scenario/moe_benchmark_low_latency.toml @@ -0,0 +1,46 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Low-latency (decode) counterpart of moe_benchmark.toml: runs the LL MoE test +# (legacy/nixl_ep/uccl_ep/nccl_ep) then the UCC/NCCL all-to-all-v baselines. Uses the +# dedicated no-NVLink baseline variants (ucc_alltoallv_deepep_nonvlink / +# nccl_test_alltoallv_nonvlink) so the baselines match the RDMA-only LL EP backends — +# no need to toggle the NVLink-ON HT baselines. +name = "moe-benchmark-ll" + +[[Tests]] +id = "Tests.moe_benchmark" +test_name = "moe_benchmark_low_latency" +num_nodes = 2 +time_limit = "00:30:00" + +[[Tests]] +id = "Tests.ucc_alltoallv" +test_name = "ucc_alltoallv_deepep_nonvlink" +num_nodes = 2 +time_limit = "00:30:00" + [[Tests.dependencies]] + type = "start_post_comp" + id = "Tests.moe_benchmark" + +[[Tests]] +id = "Tests.nccl_alltoallv" +test_name = "nccl_test_alltoallv_nonvlink" +num_nodes = 2 +time_limit = "00:30:00" + [[Tests.dependencies]] + type = "start_post_comp" + id = "Tests.ucc_alltoallv" diff --git a/src/cloudai/systems/slurm/single_sbatch_runner.py b/src/cloudai/systems/slurm/single_sbatch_runner.py index 567e88f10..c855bb138 100644 --- a/src/cloudai/systems/slurm/single_sbatch_runner.py +++ b/src/cloudai/systems/slurm/single_sbatch_runner.py @@ -141,7 +141,27 @@ def get_global_env_vars(self) -> str: vars.append(f"export {key}={value}") return "\n".join(vars) + def _reject_single_sbatch_incompatible(self) -> None: + """ + Fail fast on tests that cannot run in single-sbatch mode, with an actionable message. + + Single-sbatch builds the whole script up front, before anything runs, so a test that needs an + artifact produced by an earlier test at runtime cannot work here. Each command-gen strategy decides + whether it is compatible via single_sbatch_unsupported_reason() (workloads that don't define it are + always allowed); this runner only aggregates the reasons and raises, staying workload-agnostic. + """ + problems: list[str] = [] + for tr in self.all_trs: + cmd_gen = self.get_cmd_gen_strategy(self.system, tr) + reason_fn = getattr(cmd_gen, "single_sbatch_unsupported_reason", None) + reason = reason_fn() if callable(reason_fn) else None + if reason: + problems.append(f" - {tr.name}: {reason}") + if problems: + raise ValueError("These tests cannot run in single-sbatch mode:\n" + "\n".join(problems)) + def gen_sbatch_content(self) -> str: + self._reject_single_sbatch_incompatible() content: list[str] = ["#!/bin/bash", *self.get_sbatch_directives(), ""] content.extend(self.aux_commands()) content.append("") @@ -149,6 +169,20 @@ def gen_sbatch_content(self) -> str: content.append(self.get_global_env_vars()) content.append("") + # Job-scoped prologue (head-node detection / etcd rendezvous). Only workloads that + # define gen_job_prologue() emit anything (e.g. MoE); others contribute nothing. + # Emit once, deduped. + seen_prologues: set[str] = set() + for tr in self.all_trs: + cmd_gen = self.get_cmd_gen_strategy(self.system, tr) + prologue_fn = getattr(cmd_gen, "gen_job_prologue", None) + prologue: list[str] = cast("list[str]", prologue_fn()) if callable(prologue_fn) else [] + key = "\n".join(prologue) + if prologue and key not in seen_prologues: + seen_prologues.add(key) + content.extend(prologue) + content.append("") + tr = self.test_scenario.test_runs[0] if tr.pre_test: content.append(self.add_pre_tests(tr.pre_test, tr)) diff --git a/src/cloudai/workloads/common/moe_benchmark_report.py b/src/cloudai/workloads/common/moe_benchmark_report.py index 510f6f8ab..35aa59e1c 100644 --- a/src/cloudai/workloads/common/moe_benchmark_report.py +++ b/src/cloudai/workloads/common/moe_benchmark_report.py @@ -24,6 +24,14 @@ MOE_BENCHMARK_PREV_MOUNT = "/cloudai_moe_benchmark_prev" +# Why a use_deepep_matrix=True baseline cannot run in single-sbatch mode. Shared by the UCC and +# NCCL command-gen strategies, surfaced to the runner via single_sbatch_unsupported_reason(). +DEEPEP_MATRIX_SINGLE_SBATCH_REASON = ( + "use_deepep_matrix=True replays the MoE benchmark's runtime traffic matrix, which is produced only after " + "the MoE test runs. Single-sbatch builds the whole script before anything runs, so the matrix is " + "unavailable. Run this scenario in multi-sbatch mode (drop --single-sbatch), or set use_deepep_matrix=false." +) + def start_post_comp_chain(test_run: TestRun) -> list[TestRun]: """Follow ``start_post_comp`` (e.g. UCC -> NCCL -> MoE benchmark).""" diff --git a/src/cloudai/workloads/moe_benchmark/moe_benchmark.py b/src/cloudai/workloads/moe_benchmark/moe_benchmark.py index 9ba797c67..a0feebd8b 100644 --- a/src/cloudai/workloads/moe_benchmark/moe_benchmark.py +++ b/src/cloudai/workloads/moe_benchmark/moe_benchmark.py @@ -16,6 +16,8 @@ from typing import Literal, Optional +from pydantic import Field + from cloudai.core import DockerImage, Installable from cloudai.models.workload import CmdArgs, TestDefinition @@ -24,8 +26,9 @@ class MoEBenchmarkCmdArgs(CmdArgs): """Command arguments for the custom MoE benchmark that compares EP/alltoallv backends.""" docker_image_url: str - benchmark_root: str = "/workspace/dp-benchmark/benchmark" + benchmark_root: str = "/workspace/DeepEP/benchmark" mode: Literal["standard", "low_latency"] = "standard" + deepep_versions: list[str] = Field(default_factory=lambda: ["legacy", "elastic"]) tokens: int = 1024 num_experts: int = 256 num_topk: int = 8 @@ -35,6 +38,7 @@ class MoEBenchmarkCmdArgs(CmdArgs): allow_mnnvl: bool = False round_scale: bool = False use_ue8m0: bool = False + benchmark_combine: bool = True num_warmups: int = 20 num_iterations: int = 50 shuffle_columns: bool = False @@ -42,8 +46,12 @@ class MoEBenchmarkCmdArgs(CmdArgs): enable_tuning: bool = False num_sms: int = 24 num_qps_per_rank: int = 12 + + v2_num_sms: int = 12 + v2_num_qps: int = 0 + v2_prefer_overlap_with_compute: bool = False config_file_path: str = "/tmp/config.yaml" - results_dir: str = "/workspace/dp-benchmark/results" + results_dir: str = "/workspace/DeepEP/results" class MoEBenchmarkTestDefinition(TestDefinition): @@ -72,6 +80,7 @@ def cmd_args_dict(self) -> dict: "docker_image_url", "benchmark_root", "mode", + "deepep_versions", "num_sms", "num_qps_per_rank", "config_file_path", diff --git a/src/cloudai/workloads/moe_benchmark/report_generation_strategy.py b/src/cloudai/workloads/moe_benchmark/report_generation_strategy.py index b87a161e1..c816e748d 100644 --- a/src/cloudai/workloads/moe_benchmark/report_generation_strategy.py +++ b/src/cloudai/workloads/moe_benchmark/report_generation_strategy.py @@ -82,7 +82,7 @@ def generate_report(self) -> None: "num_ranks", "num_tokens", "hidden", - "deepep_time", + "time_s", "bus_bw_avg", "bus_bw_min", "bus_bw_max", diff --git a/src/cloudai/workloads/moe_benchmark/slurm_command_gen_strategy.py b/src/cloudai/workloads/moe_benchmark/slurm_command_gen_strategy.py index f3943ad2f..536c86d4f 100644 --- a/src/cloudai/workloads/moe_benchmark/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/moe_benchmark/slurm_command_gen_strategy.py @@ -15,7 +15,7 @@ # limitations under the License. from pathlib import Path, PurePosixPath -from typing import List, cast +from typing import ClassVar, List, cast from cloudai.systems.slurm import SlurmCommandGenStrategy @@ -25,28 +25,106 @@ class MoEBenchmarkSlurmCommandGenStrategy(SlurmCommandGenStrategy): """Command generation strategy for the custom MoE benchmark on Slurm systems.""" - def _append_head_node_detection(self, batch_script_content: List[str]) -> None: - batch_script_content.extend( - [ - "", - "nodes=( $( scontrol show hostnames $SLURM_JOB_NODELIST ) )", - "nodes_array=($nodes)", - "head_node=${nodes_array[0]}", - 'head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)', - "", - "echo Nodes: $SLURM_JOB_NODELIST", - "echo Num Nodes: ${#nodes[@]}", - "echo Head Node IP: $head_node_ip", - "", - "export MASTER_ADDR=$head_node_ip", - "export MASTER_PORT=29500", - "", - ] - ) + # Per-backend env overrides. The backends are chained in ONE srun and share its + # env, but each EP lib needs different NCCL/UCX knobs, so we emit these as inline + # `VAR=val` assignments before that backend's `python` — they apply to ONLY that + # process. v1/v2/uccl_ep need nothing extra (they use the shared env). Mirrors + # csrc/gaia-run-allbackends.sh in the dp-benchmark repo. + # * nixl_ep: GID=auto + GIN(type 3) + NCCL net-plugin off (so deep_ep's + # duplicate-NCCL check doesn't trip when it loads the hpcx plugin). + # * nccl_ep: same + EP_SKIP_DEEPEP_PREIMPORT=1 (don't import deep_ep) and + # prepend the ISOLATED v0.1.0 NCCL prefix baked into the unified image, so + # libnccl_ep/nccl.ep resolve it without colliding with deepep's NCCL. + _BACKEND_ENV: ClassVar[dict[str, list[str]]] = { + "nixl_ep": [ + "NCCL_IB_GID_INDEX=auto", + "NCCL_NET_PLUGIN=none", + "NCCL_IB_QPS_PER_CONNECTION=4", + "NCCL_IB_SPLIT_DATA_ON_QPS=0", + "NCCL_IBEXT_DISABLE=0", + "OMPI_MCA_btl=tcp,self", + "EP_SUPPRESS_NCCL_CHECK=1", + "NCCL_GIN_TYPE=3", + ], + "nccl_ep": [ + "NCCL_IB_GID_INDEX=auto", + "NCCL_NET_PLUGIN=none", + "NCCL_IB_QPS_PER_CONNECTION=4", + "NCCL_IB_SPLIT_DATA_ON_QPS=0", + "NCCL_IBEXT_DISABLE=0", + "OMPI_MCA_btl=tcp,self", + "NCCL_GIN_TYPE=3", + "EP_SKIP_DEEPEP_PREIMPORT=1", + "LD_LIBRARY_PATH=/opt/nccl-ep-build/lib:$LD_LIBRARY_PATH", + ], + # Hybrid-EP: NIXL transport (etcd rendezvous started in the sbatch script, + # NIXL_ETCD_ENDPOINTS exported globally there). Its kernels are JIT-compiled + # at runtime, so nvcc needs the UCX(device API)+NIXL includes on CPATH. + "deepep_hybrid": [ + "NCCL_IB_GID_INDEX=auto", + "NUM_OF_TOKENS_PER_CHUNK_DISPATCH_API=64", + "NUM_OF_TOKENS_PER_CHUNK_COMBINE_API=64", + "NUM_OF_TOKENS_PER_CHUNK_PREPROCESSING_API=64", + "CPATH=/usr/local/ucx-nixl/include:/usr/local/nixl/include:/usr/local/nixl/include/gpu/ucx:$CPATH", + "CPLUS_INCLUDE_PATH=/usr/local/ucx-nixl/include:/usr/local/nixl/include:/usr/local/nixl/include/gpu/ucx:$CPATH", + ], + } + + def gen_job_prologue(self) -> List[str]: + """ + Head-node detection + MASTER_ADDR/PORT (+ etcd rendezvous for Hybrid-EP). + + Emitted once per job in BOTH paths: the per-test (multi-sbatch) header via + _append_sbatch_directives, and the single-sbatch script via SingleSbatchRunner. + """ + lines: List[str] = [ + "", + "nodes=( $( scontrol show hostnames $SLURM_JOB_NODELIST ) )", + "nodes_array=($nodes)", + "head_node=${nodes_array[0]}", + 'head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)', + "", + "echo Nodes: $SLURM_JOB_NODELIST", + "echo Num Nodes: ${#nodes[@]}", + "echo Head Node IP: $head_node_ip", + "", + "export MASTER_ADDR=$head_node_ip", + "export MASTER_PORT=29500", + "", + ] + # Hybrid-EP's NIXL path needs an etcd server reachable by all ranks. If a hybrid + # backend is in the run, start etcd on the head node (background, in the unified + # container) and export NIXL_ETCD_ENDPOINTS for every step (only hybrid reads it). + # The trap kills it when the script exits (single- AND multi-sbatch); Slurm also + # reclaims the step at job end. + tdef: MoEBenchmarkTestDefinition = cast(MoEBenchmarkTestDefinition, self.test_run.test) + versions = tdef.cmd_args.deepep_versions or [] + if any(v in ("deepep_hybrid", "hybrid", "hybrid_ep") for v in versions): + image = self.image_path() + lines.extend( + [ + "# Hybrid-EP NIXL rendezvous: etcd on the head node (background).", + f'srun --overlap --nodes=1 --ntasks=1 -w "$head_node" --container-image={image} ' + 'bash -c "etcd --log-level error --listen-client-urls http://0.0.0.0:2379 ' + '--advertise-client-urls http://$head_node_ip:2379 --data-dir /tmp/etcd-cloudai-$$" &', + "_MOE_ETCD_PID=$!", + "trap 'kill ${_MOE_ETCD_PID} 2>/dev/null || true' EXIT", + "export NIXL_ETCD_ENDPOINTS=http://$head_node_ip:2379", + "# Poll etcd's client endpoint until it accepts connections (replaces a", + "# fixed sleep that could race a not-yet-ready endpoint); fail fast otherwise.", + "for _i in $(seq 1 60); do", + " (exec 3<>/dev/tcp/$head_node_ip/2379) 2>/dev/null && break", + ' [ "$_i" -eq 60 ] && { echo "ERROR: etcd $head_node_ip:2379 not ready after 60s" >&2; exit 1; }', + " sleep 1", + "done", + "", + ] + ) + return lines def _append_sbatch_directives(self, batch_script_content: List[str]) -> None: super()._append_sbatch_directives(batch_script_content) - self._append_head_node_detection(batch_script_content) + batch_script_content.extend(self.gen_job_prologue()) def _container_mounts(self) -> List[str]: """Return container mounts specific to the MoE benchmark.""" @@ -69,15 +147,29 @@ def generate_test_command(self) -> List[str]: tdef: MoEBenchmarkTestDefinition = cast(MoEBenchmarkTestDefinition, self.test_run.test) cmd_args: MoEBenchmarkCmdArgs = tdef.cmd_args - script_name = "benchmark.py" if cmd_args.mode == "standard" else "benchmark_ll.py" - benchmark_script = str(PurePosixPath(cmd_args.benchmark_root) / script_name) + benchmark_script = str(PurePosixPath(cmd_args.benchmark_root) / "benchmark.py") + + # standard AND low_latency chain one process per backend (each backend is the + # 3rd CLI arg -> benchmark.py runs only that one; backends can't coexist in a + # process). Other modes fall back to a single call. + if cmd_args.mode not in ("standard", "low_latency"): + return ["python", benchmark_script, cmd_args.config_file_path] - return ["python", benchmark_script, cmd_args.config_file_path] + versions = cmd_args.deepep_versions or ["legacy"] + parts: List[str] = [] + for version in versions: + if parts: + parts.append("&&") + env_key = "deepep_hybrid" if version in ("hybrid", "hybrid_ep") else version + parts.extend(self._BACKEND_ENV.get(env_key, [])) + parts.extend(["python", benchmark_script, cmd_args.config_file_path, version]) + return parts def _generate_config_yaml(self, config_path: Path) -> None: tdef: MoEBenchmarkTestDefinition = cast(MoEBenchmarkTestDefinition, self.test_run.test) config_lines = ["# MoE Benchmark Configuration", "# Generated by CloudAI", ""] + config_lines.append(f'benchmark_type: "{tdef.cmd_args.mode}"') for key, value in tdef.cmd_args_dict.items(): if isinstance(value, bool): config_lines.append(f"{key}: {str(value).lower()}") diff --git a/src/cloudai/workloads/moe_benchmark/throughput_reporter.py b/src/cloudai/workloads/moe_benchmark/throughput_reporter.py index 1bcdff17b..5398a5638 100644 --- a/src/cloudai/workloads/moe_benchmark/throughput_reporter.py +++ b/src/cloudai/workloads/moe_benchmark/throughput_reporter.py @@ -14,15 +14,22 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Scenario-level MoE throughput summary: standalone SVG file.""" +""" +Scenario-level MoE throughput summary: one standalone SVG with a 2x2 panel grid. + +TL Bus BW — dispatch TR Bus BW — combine (all backends + UCC/NCCL) +BL NVLink vs RDMA — disp. BR NVLink vs RDMA — comb. (EP backends only) +""" from __future__ import annotations import html import json import logging +import math import re from pathlib import Path +from typing import Any from cloudai.core import Reporter from cloudai.workloads.common.moe_benchmark_report import moe_benchmark_results_json_files @@ -32,20 +39,51 @@ from cloudai.workloads.ucc_test.ucc import UCCTestDefinition -def _read_latest_moe_results_rows(test_output: Path) -> list[object]: - paths = moe_benchmark_results_json_files(test_output) - if not paths: - return [] - latest = max(paths, key=lambda p: p.stat().st_mtime) - try: - data = json.loads(latest.read_text(encoding="utf-8")) - except (OSError, json.JSONDecodeError) as e: - logging.debug("MoE benchmark results.json unreadable %s: %s", latest, e) - return [] - return data if isinstance(data, list) else [] - +def _read_moe_results_rows(test_output: Path) -> list[object]: + """ + Read and concatenate rows from ALL results.json under the test output. -def _extract_moe_bus_bw(row: object) -> tuple[str, float] | None: + One row per backend x operation; backends run as separate processes that append/merge. + """ + rows: list[object] = [] + for path in moe_benchmark_results_json_files(test_output): + try: + data = json.loads(path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError) as e: + logging.debug("MoE benchmark results.json unreadable %s: %s", path, e) + continue + if isinstance(data, list): + rows.extend(data) + return rows + + +_DEEPEP_BACKEND_LABEL = { + "legacy": "legacy", + "elastic": "elastic", + "nixl_ep": "nixl_ep", + "uccl_ep": "uccl_ep", + "nccl_ep": "nccl_ep", + "deepep_hybrid": "hybrid_ep", +} +# Per-backend colors (bus panels): legacy green, elastic purple, nixl_ep red, +# uccl_ep cyan, nccl_ep brown, hybrid_ep pink. UCC/NCCL baselines = gray. +_DEEPEP_BACKEND_COLOR = { + "legacy": "#2ca02c", + "elastic": "#9467bd", + "nixl_ep": "#d62728", + "uccl_ep": "#17becf", + "nccl_ep": "#8c564b", + "deepep_hybrid": "#e377c2", +} +_BASELINE_COLOR = "#9a9a9a" # UCC / NCCL all-to-all-v baselines + +# NVLink vs RDMA split colors (bottom panels), one hue per component. +_NVL_COLOR = "#1baf7a" # NVLink (intra-node, scale-up) +_RDMA_COLOR = "#2a78d6" # RDMA (inter-node, scale-out) + + +def _extract_moe_bus_bw(row: object) -> tuple[str, str, float] | None: + """Return ``(backend, operation, bus_bw_avg)`` for a dispatch/combine row.""" if not isinstance(row, dict): return None op = row.get("operation") @@ -54,28 +92,84 @@ def _extract_moe_bus_bw(row: object) -> tuple[str, float] | None: op_l = op.lower() if op_l not in ("dispatch", "combine"): return None + backend = row.get("backend") + backend_l = backend if isinstance(backend, str) else "" try: - return op_l, float(row["bus_bw_avg"]) + val = float(row["bus_bw_avg"]) except (TypeError, ValueError): return None + if not math.isfinite(val): # skip NaN/Inf so gmax stays finite for the int() in the SVG path + return None + return backend_l, op_l, val -def _moe_benchmark_dispatch_combine_bars(test_output: Path) -> list[tuple[str, float, str]]: - """From latest ``results.json``: one bar per ``dispatch`` / ``combine`` row (``bus_bw_avg``).""" - by_op: dict[str, float] = {} - for row in _read_latest_moe_results_rows(test_output): - extracted = _extract_moe_bus_bw(row) - if extracted is not None: - by_op[extracted[0]] = extracted[1] +def _extract_moe_separate_bw(row: object) -> tuple[str, str, float, float] | None: + """ + Return ``(backend, operation, nvl_bw, rdma_bw)`` for a dispatch/combine row. + Only EP backends populate ``separate_nvl_bw`` / ``separate_rdma_bw``, so this + naturally excludes the UCC / NCCL all-to-all-v baselines (single bus bw, no split). + """ + if not isinstance(row, dict): + return None + op = row.get("operation") + if not isinstance(op, str): + return None + op_l = op.lower() + if op_l not in ("dispatch", "combine"): + return None + if "separate_nvl_bw" not in row or "separate_rdma_bw" not in row: + return None + backend = row.get("backend") + backend_l = backend if isinstance(backend, str) else "" + try: + nvl = float(row["separate_nvl_bw"]) + rdma = float(row["separate_rdma_bw"]) + except (TypeError, ValueError): + return None + if not (math.isfinite(nvl) and math.isfinite(rdma)): # skip NaN/Inf (keeps gmax finite) + return None + return backend_l, op_l, nvl, rdma + + +def _moe_bus_bars(rows: list[object], op_filter: str) -> list[tuple[str, float, str]]: + """For one operation: ordered ``(backend_label, bus_bw, color)`` per EP backend.""" + order: list[str] = [] + data: dict[str, float] = {} + for row in rows: + ex = _extract_moe_bus_bw(row) + if ex is None: + continue + backend, op, val = ex + if op != op_filter: + continue + if backend not in data: + order.append(backend) + data[backend] = val out: list[tuple[str, float, str]] = [] - if "dispatch" in by_op: - out.append(("MoE dispatch", by_op["dispatch"], "#2ca02c")) - if "combine" in by_op: - out.append(("MoE combine", by_op["combine"], "#31a354")) + for b in order: + base = b[:-3] if b.endswith("_ll") else b # low-latency tags are "_ll" + out.append((_DEEPEP_BACKEND_LABEL.get(base, base), data[b], _DEEPEP_BACKEND_COLOR.get(base, "#2ca02c"))) return out +def _moe_nvl_rdma_bars(rows: list[object], op_filter: str) -> list[tuple[str, float, float]]: + """For one operation: ordered ``(backend_label, nvl_bw, rdma_bw)`` per EP backend.""" + order: list[str] = [] + data: dict[str, tuple[float, float]] = {} + for row in rows: + ex = _extract_moe_separate_bw(row) + if ex is None: + continue + backend, op, nvl, rdma = ex + if op != op_filter: + continue + if backend not in data: + order.append(backend) + data[backend] = (nvl, rdma) + return [(_DEEPEP_BACKEND_LABEL.get(b[:-3] if b.endswith("_ll") else b, b), data[b][0], data[b][1]) for b in order] + + def _mean_ucc_bus_bw_gb_s(test_output: Path) -> float | None: for name in ("stdout.txt", "ucc_perftest_capture.log"): path = test_output / name @@ -105,7 +199,7 @@ def _parse_ucc_perftest_mean_bus_avg(path: Path) -> float | None: bavg = float(parts[5]) except ValueError: continue - if sz < 1048576: + if sz < 1048576 or not math.isfinite(bavg): continue avgs.append(bavg) if not avgs: @@ -120,105 +214,194 @@ def _mean_nccl_oop_busbw_gb_s(test_output: Path) -> float | None: vals: list[float] = [] for parts in rows: try: - vals.append(float(parts[7])) + v = float(parts[7]) except (IndexError, ValueError): continue + if not math.isfinite(v): + continue + vals.append(v) if not vals: return None return float(sum(vals) / len(vals)) -def _write_moe_throughput_svg( - path: Path, - *, - scenario_name: str, - labels: list[str], - values: list[float], - colors: list[str], - y_axis_label: str, -) -> None: - """Bar chart + value markers; standalone SVG.""" - n = len(labels) - ml, mr, mt = 72, 44, 72 - ih = 300 - mb = max(100, 36 + n * 18) - h = mt + ih + mb - w = max(720, min(1280, ml + mr + max(1, n) * 92)) - - iw = w - ml - mr - y0 = mt + ih - vmin, vmax = 0.0, max(values) * 1.12 if values else 1.0 - if vmax <= vmin: - vmax = vmin + 1.0 - - def ypx(v: float) -> float: - return y0 - (v - vmin) / (vmax - vmin) * ih +# --------------------------------------------------------------------------- +# Panel renderers — each draws one sub-chart inside the rect (x0,y0,pw,ph) of the +# parent SVG, returning a list of SVG fragment strings. Shared internal margins. +# --------------------------------------------------------------------------- +_PML, _PMT, _PMR, _PMB = 48, 46, 16, 54 + + +def _panel_frame( + x0: float, y0: float, pw: float, ph: float, title: str, subtitle: str, vmax: float +) -> tuple[list[str], float, float, float, float, float]: + """Title/subtitle + axes + gridlines for a panel; returns (frags, ax0, ay0, ax1, ay1, vmax).""" + ax0, ay0 = x0 + _PML, y0 + _PMT + ax1, ay1 = x0 + pw - _PMR, y0 + ph - _PMB + if vmax <= 0.0: + vmax = 1.0 + f = [ + f'{html.escape(title)}', + f'{html.escape(subtitle)}', + f'', + f'', + ] + ih = ay1 - ay0 + for g in (0.25, 0.5, 0.75, 1.0): + gy = ay1 - g * ih + f.append(f'') + f.append( + f'{g * vmax:.0f}' + ) + return f, ax0, ay0, ax1, ay1, vmax - slot = iw / max(n, 1) - bar_w = min(56.0, slot * 0.55) - centers = [ml + (i + 0.5) * slot for i in range(n)] - pts = [(cx, ypx(v)) for cx, v in zip(centers, values, strict=True)] - parts: list[str] = [ - '', - f'', - '', - f'{html.escape(scenario_name)}', - f'{html.escape(y_axis_label)}', - f'', - f'', - ] +def _xlabel(cx: float, y: float, label: str) -> str: + return ( + f'{html.escape(label)}' + ) - for g in (0.25, 0.5, 0.75): - gy = y0 - g * ih - parts.append(f'') - gv = vmin + g * (vmax - vmin) - parts.append( - f'{gv:.1f}' - ) - for cx, val, col, _ in zip(centers, values, colors, labels, strict=True): - top = ypx(val) - x1 = cx - bar_w / 2 - hbar = y0 - top - parts.append( - f'' - ) +def _panel_single(x0, y0, pw, ph, title, subtitle, entries, vmax) -> list[str]: + """ + Single-series bar panel: entries = [(label, value, color)]. - for (cx, cy), val, col, lab in zip(pts, values, colors, labels, strict=True): - col_esc = html.escape(col) - parts.append(f'') - parts.append( - f'{val:.2f}' + `vmax` is shared across all panels so bar heights are directly comparable between charts. + """ + f, ax0, ay0, ax1, ay1, vmax = _panel_frame(x0, y0, pw, ph, title, subtitle, vmax) + iw, ih = ax1 - ax0, ay1 - ay0 + n = len(entries) + slot = iw / max(n, 1) + bw = min(30.0, slot * 0.6) + for i, (lab, val, col) in enumerate(entries): + cx = ax0 + (i + 0.5) * slot + top = ay1 - (val / vmax) * ih + f.append( + f'' ) - parts.append( - f'' - f"{html.escape(lab)}" + f.append( + f'{val:.0f}' ) - - y_axis_mid = mt + ih / 2 - parts.append( - f'{html.escape(y_axis_label)}' + f.append(_xlabel(cx, ay1 + 12, lab)) + return f + + +def _panel_grouped(x0, y0, pw, ph, title, subtitle, entries, vmax) -> list[str]: + """ + Two-series grouped bar panel (NVLink + RDMA): entries = [(label, nvl, rdma)]. + + `vmax` is shared across all panels for comparable bar heights. + """ + f, ax0, ay0, ax1, ay1, vmax = _panel_frame(x0, y0, pw, ph, title, subtitle, vmax) + iw, ih = ax1 - ax0, ay1 - ay0 + # Legend on the subtitle line (under the panel title), right-aligned. + ly = y0 + 33 + lx = x0 + pw - _PMR - 188 + f.append(f'') + f.append(f'NVLink') + f.append( + f'' ) + f.append(f'RDMA') + n = len(entries) + slot = iw / max(n, 1) + bw = min(18.0, slot * 0.34) + for i, (lab, nvl, rdma) in enumerate(entries): + gc = ax0 + (i + 0.5) * slot + for val, fill, stroke, dx in ( + (nvl, _NVL_COLOR, _NVL_COLOR, -0.56), + (rdma, "url(#rdmaHatch)", _RDMA_COLOR, 0.56), + ): + bx = gc + dx * bw - bw / 2 + top = ay1 - (val / vmax) * ih + f.append( + f'' + ) + f.append( + f'{val:.0f}' + ) + f.append(_xlabel(gc, ay1 + 12, lab)) + return f - leg_y = y0 + 38 - parts.append(f'Summary') - for i, (lab, val, col) in enumerate(zip(labels, values, colors, strict=True)): - parts.append( - f'' - f'{html.escape(lab)}' - f": {val:.4f} GB/s" - ) +def _write_dashboard_svg( + path: Path, + *, + scenario_name: str, + bus_dispatch: list[tuple[str, float, str]], + bus_combine: list[tuple[str, float, str]], + nvl_dispatch: list[tuple[str, float, float]], + nvl_combine: list[tuple[str, float, float]], + subtitle: str = "", +) -> None: + """ + One SVG dashboard, laid out as a grid of ONLY the panels that have data. + + Columns = operations present (dispatch, and combine if measured); rows = metrics + present ("Bus BW" always; "NVLink vs RDMA" only when the backend reports the split — + e.g. standard mode does, low-latency mode does not). So HT renders 2x2, a + dispatch-only HT run renders 2x1, and a low-latency run renders a single bus row. + All panels share ONE y-axis ceiling (multiple of 50, above the global max) so bars + are comparable. `subtitle` carries the run config (tokens/hidden/top-k/cluster/nodes). + """ + sub_bus = "all backends · UCC/NCCL = baselines (gray)" + sub_split = "EP backends only" + # Rows present: bus always (if any bus data); nvl/rdma only if the split exists. + rows: list[tuple[str, Any, dict[str, Any], str]] = [ + ("Bus BW", _panel_single, {"dispatch": bus_dispatch, "combine": bus_combine}, sub_bus) + ] + if nvl_dispatch or nvl_combine: + rows.append(("NVLink vs RDMA", _panel_grouped, {"dispatch": nvl_dispatch, "combine": nvl_combine}, sub_split)) + # Columns present: dispatch always; combine only if measured. + cols = ["dispatch"] + if bus_combine or nvl_combine: + cols.append("combine") + + pw, ph, gap, top = 480.0, 340.0, 26.0, 58.0 + ncol, nrow = len(cols), len(rows) + w = gap * (ncol + 1) + pw * ncol + h = top + gap * (nrow + 1) + ph * nrow + + allv = ( + [v for _, v, _ in bus_dispatch] + + [v for _, v, _ in bus_combine] + + [v for _, n, r in nvl_dispatch for v in (n, r)] + + [v for _, n, r in nvl_combine for v in (n, r)] + ) + gmax = max(allv) if allv else 1.0 + vmax = (int(gmax * 1.1 // 50) + 1) * 50.0 # nice ceiling, multiple of 50, ~10% headroom + + parts: list[str] = [ + '', + f'', + ( + '' + f'' + '' + "" + ), + '', + f'' + f"{html.escape(scenario_name)} — MoE EP bandwidth", + f'{html.escape(subtitle)}', + ] + for ri, (row_title, panel_fn, data_by_op, sub) in enumerate(rows): + for ci, op in enumerate(cols): + x0 = gap + ci * (pw + gap) + y0 = top + gap + ri * (ph + gap) + parts += panel_fn(x0, y0, pw, ph, f"{row_title} — {op}", sub, data_by_op[op], vmax) parts.append("") path.write_text("\n".join(parts), encoding="utf-8") class MoEBenchmarkThroughputReporter(Reporter): - """After the scenario finishes, write one standalone SVG chart under the results root.""" + """After the scenario finishes, write one standalone 2x2 SVG under the results root.""" def generate(self) -> None: self.load_test_runs() @@ -227,48 +410,62 @@ def generate(self) -> None: logging.debug("Skipping moe_benchmark_throughput: no MoEBenchmark test in scenario.") return - categories: list[str] = [] - values: list[float] = [] - colors: list[str] = [] + rows = _read_moe_results_rows(moe_trs[0].output_path) + bus_dispatch = _moe_bus_bars(rows, "dispatch") + bus_combine = _moe_bus_bars(rows, "combine") + nvl_dispatch = _moe_nvl_rdma_bars(rows, "dispatch") + nvl_combine = _moe_nvl_rdma_bars(rows, "combine") - moe_bars = _moe_benchmark_dispatch_combine_bars(moe_trs[0].output_path) - if not moe_bars: + if not bus_dispatch and not bus_combine: logging.warning( "Skipping moe_benchmark_throughput: no dispatch/combine bus_bw_avg in results.json under %s", moe_trs[0].output_path, ) return - for lab, val, col in moe_bars: - categories.append(lab) - values.append(val) - colors.append(col) + # Whether the EP backends actually MEASURED combine (benchmark_combine=true). Decide + # this from EP data ONLY, before injecting baselines — otherwise the baselines below + # would make bus_combine non-empty and resurrect an (otherwise empty) combine column + # for a dispatch-only run. + has_combine = bool(bus_combine or nvl_combine) + + # Append the UCC / NCCL all-to-all-v baselines (single value each, repeated as a + # reference). They mirror only the operations the EP backends measured: always + # dispatch, and combine ONLY when there's real EP combine data. No NVLink/RDMA + # split -> not added to the bottom panels. ucc_trs = [tr for tr in self.trs if isinstance(tr.test, UCCTestDefinition)] if ucc_trs: uval = _mean_ucc_bus_bw_gb_s(ucc_trs[0].output_path) if uval is not None: - categories.append("UCC") - values.append(uval) - colors.append("#1f77b4") - else: - logging.debug("UCC test present but bus bandwidth not parsed from outputs.") - + bus_dispatch.append(("UCC", uval, _BASELINE_COLOR)) + if has_combine: + bus_combine.append(("UCC", uval, _BASELINE_COLOR)) nccl_trs = [tr for tr in self.trs if isinstance(tr.test, NCCLTestDefinition)] if nccl_trs: nval = _mean_nccl_oop_busbw_gb_s(nccl_trs[0].output_path) if nval is not None: - categories.append("NCCL") - values.append(nval) - colors.append("#ff7f0e") - else: - logging.debug("NCCL test present but perf table not parsed from stdout.") + bus_dispatch.append(("NCCL", nval, _BASELINE_COLOR)) + if has_combine: + bus_combine.append(("NCCL", nval, _BASELINE_COLOR)) + + # Run config for the header subtitle: tokens / hidden / top-k / cluster / nodes. + ca = moe_trs[0].test.cmd_args + nn = moe_trs[0].num_nodes + if isinstance(nn, list): + nn = nn[0] if nn else "?" + cluster = getattr(self.system, "name", "?") + subtitle = ( + f"{ca.mode} · tokens={ca.tokens} · hidden={ca.hidden_size} · top-k={ca.num_topk} · " + f"{ca.data_type} · {cluster} · {nn} nodes" + ) out = self.results_root / f"{self.test_scenario.name}-moe-throughput.svg" - _write_moe_throughput_svg( + _write_dashboard_svg( out, scenario_name=self.test_scenario.name, - labels=categories, - values=values, - colors=colors, - y_axis_label="Mean bus bandwidth (GB/s)", + subtitle=subtitle, + bus_dispatch=bus_dispatch, + bus_combine=bus_combine, + nvl_dispatch=nvl_dispatch, + nvl_combine=nvl_combine, ) diff --git a/src/cloudai/workloads/nccl_test/slurm_command_gen_strategy.py b/src/cloudai/workloads/nccl_test/slurm_command_gen_strategy.py index ce1fffeb1..9920e2caf 100644 --- a/src/cloudai/workloads/nccl_test/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/nccl_test/slurm_command_gen_strategy.py @@ -18,7 +18,11 @@ from typing import List, cast from cloudai.systems.slurm import SlurmCommandGenStrategy -from cloudai.workloads.common.moe_benchmark_report import MOE_BENCHMARK_PREV_MOUNT, moe_benchmark_root +from cloudai.workloads.common.moe_benchmark_report import ( + DEEPEP_MATRIX_SINGLE_SBATCH_REASON, + MOE_BENCHMARK_PREV_MOUNT, + moe_benchmark_root, +) from .nccl import NCCLTestDefinition @@ -56,6 +60,10 @@ def _nccl_matrix_path_under_deepep_output(dep_out: Path) -> Path | None: class NcclTestSlurmCommandGenStrategy(SlurmCommandGenStrategy): """Command generation strategy for NCCL tests on Slurm systems.""" + def single_sbatch_unsupported_reason(self) -> str | None: + tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test) + return DEEPEP_MATRIX_SINGLE_SBATCH_REASON if tdef.cmd_args.use_deepep_matrix else None + def _deepep_nccl_matrix_host_path(self) -> Path | None: tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test) if not tdef.cmd_args.use_deepep_matrix: diff --git a/src/cloudai/workloads/ucc_test/slurm_command_gen_strategy.py b/src/cloudai/workloads/ucc_test/slurm_command_gen_strategy.py index 682b799eb..53f983f05 100644 --- a/src/cloudai/workloads/ucc_test/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/ucc_test/slurm_command_gen_strategy.py @@ -18,7 +18,11 @@ from typing import List, cast from cloudai.systems.slurm import SlurmCommandGenStrategy -from cloudai.workloads.common.moe_benchmark_report import MOE_BENCHMARK_PREV_MOUNT, moe_benchmark_root +from cloudai.workloads.common.moe_benchmark_report import ( + DEEPEP_MATRIX_SINGLE_SBATCH_REASON, + MOE_BENCHMARK_PREV_MOUNT, + moe_benchmark_root, +) from .ucc import UCCCmdArgs, UCCTestDefinition @@ -41,6 +45,10 @@ def _ucc_matrix_path_under_deepep_output(dep_out: Path) -> Path | None: class UCCTestSlurmCommandGenStrategy(SlurmCommandGenStrategy): """Command generation strategy for UCC tests on Slurm systems.""" + def single_sbatch_unsupported_reason(self) -> str | None: + tdef: UCCTestDefinition = cast(UCCTestDefinition, self.test_run.test) + return DEEPEP_MATRIX_SINGLE_SBATCH_REASON if tdef.cmd_args.use_deepep_matrix else None + def _deepep_ucc_matrix_host_path(self) -> Path | None: tdef: UCCTestDefinition = cast(UCCTestDefinition, self.test_run.test) if not tdef.cmd_args.use_deepep_matrix: diff --git a/tests/ref_data/moe-benchmark.sbatch b/tests/ref_data/moe-benchmark.sbatch index 65a5ed89d..9b5d2fb09 100644 --- a/tests/ref_data/moe-benchmark.sbatch +++ b/tests/ref_data/moe-benchmark.sbatch @@ -22,9 +22,22 @@ echo Head Node IP: $head_node_ip export MASTER_ADDR=$head_node_ip export MASTER_PORT=29500 +# Hybrid-EP NIXL rendezvous: etcd on the head node (background). +srun --overlap --nodes=1 --ntasks=1 -w "$head_node" --container-image=docker/image:url bash -c "etcd --log-level error --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls http://$head_node_ip:2379 --data-dir /tmp/etcd-cloudai-$$" & +_MOE_ETCD_PID=$! +trap 'kill ${_MOE_ETCD_PID} 2>/dev/null || true' EXIT +export NIXL_ETCD_ENDPOINTS=http://$head_node_ip:2379 +# Poll etcd's client endpoint until it accepts connections (replaces a +# fixed sleep that could race a not-yet-ready endpoint); fail fast otherwise. +for _i in $(seq 1 60); do + (exec 3<>/dev/tcp/$head_node_ip/2379) 2>/dev/null && break + [ "$_i" -eq 60 ] && { echo "ERROR: etcd $head_node_ip:2379 not ready after 60s" >&2; exit 1; } + sleep 1 +done -srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__OUTPUT_DIR__/output/config.yaml:/tmp/config.yaml,__OUTPUT_DIR__/output:/workspace/dp-benchmark/results --output=__OUTPUT_DIR__/output/mapping-stdout.txt --error=__OUTPUT_DIR__/output/mapping-stderr.txt bash -c "echo \$(date): \$(hostname):node \${SLURM_NODEID}:rank \${SLURM_PROCID}." -srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__OUTPUT_DIR__/output/config.yaml:/tmp/config.yaml,__OUTPUT_DIR__/output:/workspace/dp-benchmark/results --ntasks=2 --ntasks-per-node=1 --output=__OUTPUT_DIR__/output/metadata/node-%N.toml --error=__OUTPUT_DIR__/output/metadata/nodes.err bash /cloudai_install/slurm-metadata.sh +srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__OUTPUT_DIR__/output/config.yaml:/tmp/config.yaml,__OUTPUT_DIR__/output:/workspace/DeepEP/results --output=__OUTPUT_DIR__/output/mapping-stdout.txt --error=__OUTPUT_DIR__/output/mapping-stderr.txt bash -c "echo \$(date): \$(hostname):node \${SLURM_NODEID}:rank \${SLURM_PROCID}." -srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__OUTPUT_DIR__/output/config.yaml:/tmp/config.yaml,__OUTPUT_DIR__/output:/workspace/dp-benchmark/results bash -c "source __OUTPUT_DIR__/output/env_vars.sh; python /workspace/dp-benchmark/benchmark/benchmark.py /tmp/config.yaml" +srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__OUTPUT_DIR__/output/config.yaml:/tmp/config.yaml,__OUTPUT_DIR__/output:/workspace/DeepEP/results --ntasks=2 --ntasks-per-node=1 --output=__OUTPUT_DIR__/output/metadata/node-%N.toml --error=__OUTPUT_DIR__/output/metadata/nodes.err bash /cloudai_install/slurm-metadata.sh + +srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__OUTPUT_DIR__/output/config.yaml:/tmp/config.yaml,__OUTPUT_DIR__/output:/workspace/DeepEP/results bash -c "source __OUTPUT_DIR__/output/env_vars.sh; python /workspace/DeepEP/benchmark/benchmark.py /tmp/config.yaml legacy && python /workspace/DeepEP/benchmark/benchmark.py /tmp/config.yaml elastic && NCCL_IB_GID_INDEX=auto NCCL_NET_PLUGIN=none NCCL_IB_QPS_PER_CONNECTION=4 NCCL_IB_SPLIT_DATA_ON_QPS=0 NCCL_IBEXT_DISABLE=0 OMPI_MCA_btl=tcp,self EP_SUPPRESS_NCCL_CHECK=1 NCCL_GIN_TYPE=3 python /workspace/DeepEP/benchmark/benchmark.py /tmp/config.yaml nixl_ep && python /workspace/DeepEP/benchmark/benchmark.py /tmp/config.yaml uccl_ep && NCCL_IB_GID_INDEX=auto NCCL_NET_PLUGIN=none NCCL_IB_QPS_PER_CONNECTION=4 NCCL_IB_SPLIT_DATA_ON_QPS=0 NCCL_IBEXT_DISABLE=0 OMPI_MCA_btl=tcp,self NCCL_GIN_TYPE=3 EP_SKIP_DEEPEP_PREIMPORT=1 LD_LIBRARY_PATH=/opt/nccl-ep-build/lib:$LD_LIBRARY_PATH python /workspace/DeepEP/benchmark/benchmark.py /tmp/config.yaml nccl_ep && NCCL_IB_GID_INDEX=auto NUM_OF_TOKENS_PER_CHUNK_DISPATCH_API=64 NUM_OF_TOKENS_PER_CHUNK_COMBINE_API=64 NUM_OF_TOKENS_PER_CHUNK_PREPROCESSING_API=64 CPATH=/usr/local/ucx-nixl/include:/usr/local/nixl/include:/usr/local/nixl/include/gpu/ucx:$CPATH CPLUS_INCLUDE_PATH=/usr/local/ucx-nixl/include:/usr/local/nixl/include:/usr/local/nixl/include/gpu/ucx:$CPATH python /workspace/DeepEP/benchmark/benchmark.py /tmp/config.yaml deepep_hybrid" diff --git a/tests/test_acceptance.py b/tests/test_acceptance.py index 74d37c917..397c52f7e 100644 --- a/tests/test_acceptance.py +++ b/tests/test_acceptance.py @@ -588,6 +588,7 @@ def test_req(request, slurm_system: SlurmSystem, partial_tr: partial[TestRun]) - test_template_name="MoEBenchmark", cmd_args=MoEBenchmarkCmdArgs( docker_image_url="docker/image:url", + deepep_versions=["legacy", "elastic", "nixl_ep", "uccl_ep", "nccl_ep", "deepep_hybrid"], ), ), ), diff --git a/tests/test_single_sbatch_runner.py b/tests/test_single_sbatch_runner.py index 91d3cdf27..d17320648 100644 --- a/tests/test_single_sbatch_runner.py +++ b/tests/test_single_sbatch_runner.py @@ -487,6 +487,20 @@ def test_dse_and_non_dse(self, nccl_tr: TestRun, slurm_system: SlurmSystem) -> N assert runner.on_job_submit.call_count == 3 # 2 dse runs + 1 non-dse run +def test_rejects_single_sbatch_incompatible(nccl_tr: TestRun, slurm_system: SlurmSystem) -> None: + cast(NCCLTestDefinition, nccl_tr.test).cmd_args.use_deepep_matrix = True + tc = TestScenario(name="tc", test_runs=[nccl_tr]) + runner = SingleSbatchRunner(mode="run", system=slurm_system, test_scenario=tc, output_path=slurm_system.output_path) + + with pytest.raises(ValueError) as exc_info: + runner.gen_sbatch_content() + + msg = str(exc_info.value) + assert "single-sbatch mode" in msg + assert nccl_tr.name in msg + assert "use_deepep_matrix" in msg + + def test_store_job_metadata(nccl_tr: TestRun, slurm_system: SlurmSystem) -> None: tc = TestScenario(name="tc", test_runs=[nccl_tr]) runner = SingleSbatchRunner(mode="run", system=slurm_system, test_scenario=tc, output_path=slurm_system.output_path)