From 72562158f4653616027abdfa5cb2e89341bc7acf Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 26 May 2026 14:08:59 -0700 Subject: [PATCH 1/7] Adding IEC check from Prefect variable --- orchestration/flows/bl832/dispatcher.py | 79 ++++++++++++++----------- 1 file changed, 46 insertions(+), 33 deletions(-) diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py index d28a02eb..b88158d2 100644 --- a/orchestration/flows/bl832/dispatcher.py +++ b/orchestration/flows/bl832/dispatcher.py @@ -143,6 +143,17 @@ async def dispatcher( logger.error(f"Invalid input parameters: {e}") raise + # Override is_export_control from Prefect Variable (safety net for IEC checkbox bug) + iec_override = Variable.get("is_export_control", default=False, _sync=True) + if iec_override: + logger.warning( + "is_export_control Prefect Variable is True — forcing IEC mode " + "(skipping NERSC transfer, reconstruction, and segmentation)" + ) + else: + logger.info("is_export_control Prefect Variable is False — proceeding with normal flow") + inputs.is_export_control = inputs.is_export_control or iec_override + # Run new_file_832 first (synchronously) available_params = inputs.model_dump() try: @@ -162,39 +173,42 @@ async def dispatcher( # Optionally, raise a specific ValueError raise ValueError("new_file_832 task Failed") from e - # Prepare ALCF and NERSC flows to run asynchronously, based on settings - tasks = [] - if decision_settings.get("alcf_recon_flow/alcf_recon_flow"): - alcf_params = FlowParameterMapper.get_flow_parameters("alcf_recon_flow/alcf_recon_flow", available_params) - tasks.append(run_recon_flow_async("alcf_recon_flow/alcf_recon_flow", alcf_params)) - - if decision_settings.get("nersc_recon_flow/nersc_recon_flow"): - nersc_params = FlowParameterMapper.get_flow_parameters("nersc_recon_flow/nersc_recon_flow", available_params) - tasks.append(run_recon_flow_async("nersc_recon_flow/nersc_recon_flow", nersc_params)) - - if decision_settings.get("nersc_petiole_segment_flow/nersc_petiole_segment_flow"): - nersc_petiole_segment_params = FlowParameterMapper.get_flow_parameters( - "nersc_petiole_segment_flow/nersc_petiole_segment_flow", available_params - ) - tasks.append( - run_recon_flow_async("nersc_petiole_segment_flow/nersc_petiole_segment_flow", nersc_petiole_segment_params) - ) - - if decision_settings.get("nersc_moon_segment_flow/nersc_moon_segment_flow"): - moon_params = FlowParameterMapper.get_flow_parameters( - "nersc_moon_segment_flow/nersc_moon_segment_flow", available_params - ) - tasks.append(run_recon_flow_async("nersc_moon_segment_flow/nersc_moon_segment_flow", moon_params)) - - # Run ALCF and NERSC flows in parallel, if any - if tasks: - try: - await asyncio.gather(*tasks) - except Exception as e: - logger.error(f"Failed to run one or more tasks: {e}") - raise + if inputs.is_export_control: + logger.info("is_export_control=True — skipping ALCF recon, NERSC recon, and segmentation flows.") else: - logger.info("No ALCF or NERSC tasks to run based on decision settings.") + # Prepare ALCF and NERSC flows to run asynchronously, based on settings + tasks = [] + if decision_settings.get("alcf_recon_flow/alcf_recon_flow"): + alcf_params = FlowParameterMapper.get_flow_parameters("alcf_recon_flow/alcf_recon_flow", available_params) + tasks.append(run_recon_flow_async("alcf_recon_flow/alcf_recon_flow", alcf_params)) + + if decision_settings.get("nersc_recon_flow/nersc_recon_flow"): + nersc_params = FlowParameterMapper.get_flow_parameters("nersc_recon_flow/nersc_recon_flow", available_params) + tasks.append(run_recon_flow_async("nersc_recon_flow/nersc_recon_flow", nersc_params)) + + if decision_settings.get("nersc_petiole_segment_flow/nersc_petiole_segment_flow"): + nersc_petiole_segment_params = FlowParameterMapper.get_flow_parameters( + "nersc_petiole_segment_flow/nersc_petiole_segment_flow", available_params + ) + tasks.append( + run_recon_flow_async("nersc_petiole_segment_flow/nersc_petiole_segment_flow", nersc_petiole_segment_params) + ) + + if decision_settings.get("nersc_moon_segment_flow/nersc_moon_segment_flow"): + moon_params = FlowParameterMapper.get_flow_parameters( + "nersc_moon_segment_flow/nersc_moon_segment_flow", available_params + ) + tasks.append(run_recon_flow_async("nersc_moon_segment_flow/nersc_moon_segment_flow", moon_params)) + + # Run ALCF and NERSC flows in parallel, if any + if tasks: + try: + await asyncio.gather(*tasks) + except Exception as e: + logger.error(f"Failed to run one or more tasks: {e}") + raise + else: + logger.info("No ALCF or NERSC tasks to run based on decision settings.") return None @@ -209,7 +223,6 @@ async def dispatcher( setup_decision_settings( alcf_recon=True, nersc_recon=True, - nersc_recon_multinode=True, new_file_832=True ) # Run the main decision flow with the specified parameters From 50f9552671d64649b69de3816661fe49b5247262 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 26 May 2026 14:10:21 -0700 Subject: [PATCH 2/7] Adding logger message when IEC is true, that NERSC transfer and Scicat ingestion is skipped --- orchestration/flows/bl832/move.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py index 1d9ea158..71009e78 100644 --- a/orchestration/flows/bl832/move.py +++ b/orchestration/flows/bl832/move.py @@ -157,6 +157,8 @@ def process_new_832_file_task( scicat_ingest_flow(dataset_path=Path(file_path), ingester_spec=TOMO_INGESTOR_SPEC) except Exception as e: logger.error(f"SciCat ingest failed with {e}") + elif is_export_control: + logger.info("File is export controlled. Skipping NERSC transfer and SciCat ingest.") # Holding off from moving and registering Raw Data to Beegfs Tiled for storage concerns. From ce1fa161459c8dfedae738de825f641e1f4e371c Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 26 May 2026 14:11:03 -0700 Subject: [PATCH 3/7] Moving dispatcher flow test down a level --- .../_tests/test_bl832/test_dispatcher.py | 280 ++++++++++++++++++ orchestration/_tests/test_globus_flow.py | 56 ---- 2 files changed, 280 insertions(+), 56 deletions(-) create mode 100644 orchestration/_tests/test_bl832/test_dispatcher.py diff --git a/orchestration/_tests/test_bl832/test_dispatcher.py b/orchestration/_tests/test_bl832/test_dispatcher.py new file mode 100644 index 00000000..c71458b8 --- /dev/null +++ b/orchestration/_tests/test_bl832/test_dispatcher.py @@ -0,0 +1,280 @@ +"""Tests for the bl832 dispatcher flow. +""" +import asyncio +import warnings +from uuid import UUID, uuid4, uuid5 + +import pytest +from prefect.blocks.system import Secret +from prefect.testing.utilities import prefect_test_harness +from prefect.variables import Variable +from pytest_mock import MockFixture + +from orchestration._tests.test_globus import MockTransferClient + +warnings.filterwarnings("ignore", category=DeprecationWarning) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True, scope="session") +def bl832_dispatcher_prefect_fixture(): + """Set up Prefect test harness and bl832 variables/secrets for the session.""" + with prefect_test_harness(): + Secret(value=str(uuid4())).save(name="globus-client-id", overwrite=True) + Secret(value=str(uuid4())).save(name="globus-client-secret", overwrite=True) + Secret(value=str(uuid4())).save(name="globus-compute-endpoint", overwrite=True) + + Variable.set( + name="pruning-config", + value={"max_wait_seconds": 600}, + overwrite=True, + _sync=True, + ) + Variable.set( + name="decision-settings", + value={ + "alcf_recon_flow/alcf_recon_flow": True, + "nersc_recon_flow/nersc_recon_flow": True, + "nersc_petiole_segment_flow/nersc_petiole_segment_flow": True, + "nersc_moon_segment_flow/nersc_moon_segment_flow": True, + "new_832_file_flow/new_file_832": True, + }, + overwrite=True, + _sync=True, + ) + Variable.set( + name="alcf-allocation-root-path", + value={"alcf-allocation-root-path": "/eagle/IRIProd/ALS"}, + overwrite=True, + _sync=True, + ) + Variable.set( + name="bl832-settings", + value={ + "delete_spot832_files_after_days": 1, + "delete_data832_files_after_days": 35, + }, + overwrite=True, + _sync=True, + ) + + yield + + +@pytest.fixture(autouse=True) +def reset_iec_variable(): + """Reset is_export_control Prefect Variable to False before each test. + + Tests that need a different starting state (True, or missing) should + override this within the test body after the fixture runs. + """ + Variable.set(name="is_export_control", value=False, overwrite=True, _sync=True) + yield + + +# --------------------------------------------------------------------------- +# Mocks +# --------------------------------------------------------------------------- + + +class MockEndpoint: + """Mock Globus endpoint.""" + + def __init__(self, root_path: str, uuid_value: str = None, name: str = None) -> None: + self.root_path = root_path + self.uuid = uuid_value or str(uuid4()) + self.uri = f"mock_endpoint_uri_{self.uuid}" + self.name = name or f"mock_endpoint_{self.uuid[:8]}" + + +class MockSecret: + """Deterministic secret values for tests.""" + + value = "550e8400-e29b-41d4-a716-446655440000" + + @staticmethod + def for_endpoint(endpoint_name: str) -> str: + """Generate a deterministic UUID string based on endpoint name.""" + namespace = UUID("12345678-1234-5678-1234-123456789012") + return str(uuid5(namespace, endpoint_name)) + + +class MockConfig832: + """Mock Config832 to avoid real Globus initialization.""" + + def __init__(self) -> None: + self.endpoints = { + "spot832": MockEndpoint("mock_spot832_path", MockSecret.for_endpoint("spot832")), + "data832": MockEndpoint("mock_data832_path", MockSecret.for_endpoint("data832")), + "nersc832": MockEndpoint("mock_nersc832_path", MockSecret.for_endpoint("nersc832")), + "alcf832_raw": MockEndpoint( + "mock_alcf832_raw_path", MockSecret.for_endpoint("alcf832_raw") + ), + "alcf832_scratch": MockEndpoint( + "mock_alcf832_scratch_path", MockSecret.for_endpoint("alcf832_scratch") + ), + "beegfs_raw": MockEndpoint( + "mock_beegfs_raw_path", MockSecret.for_endpoint("beegfs_raw") + ), + "beegfs_scratch": MockEndpoint( + "mock_beegfs_scratch_path", MockSecret.for_endpoint("beegfs_scratch") + ), + } + self.tc = MockTransferClient() + self.spot832 = self.endpoints["spot832"] + self.data832 = self.endpoints["data832"] + self.nersc832 = self.endpoints["nersc832"] + self.alcf832_raw = self.endpoints["alcf832_raw"] + self.alcf832_scratch = self.endpoints["alcf832_scratch"] + self.beegfs_raw = self.endpoints["beegfs_raw"] + self.beegfs_scratch = self.endpoints["beegfs_scratch"] + self.scicat = "mock_scicat_value" + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +# Deployment names the dispatcher launches when not in IEC mode +DOWNSTREAM_DEPLOYMENTS = { + "alcf_recon_flow/alcf_recon_flow", + "nersc_recon_flow/nersc_recon_flow", + "nersc_petiole_segment_flow/nersc_petiole_segment_flow", + "nersc_moon_segment_flow/nersc_moon_segment_flow", +} + + +def _setup_dispatcher_mocks(mocker: MockFixture): + """Patch external dependencies of the dispatcher flow. + + Returns: + Tuple of (mock_run_deployment, mock_process_new_832_file_task) + for tests to assert against. + """ + mocker.patch("prefect.blocks.system.Secret.load", return_value=MockSecret()) + + mock_process = mocker.patch( + "orchestration.flows.bl832.dispatcher.process_new_832_file_task", + return_value=None, + ) + + mock_run_deployment = mocker.patch( + "orchestration.flows.bl832.dispatcher.run_deployment", + new=mocker.AsyncMock(return_value=None), + ) + + return mock_run_deployment, mock_process + + +def _called_deployment_names(mock_run_deployment) -> set: + """Extract the set of deployment names that run_deployment was called with.""" + return { + call.kwargs.get("name") or (call.args[0] if call.args else None) + for call in mock_run_deployment.call_args_list + } + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +def test_dispatcher_happy_path(mocker: MockFixture): + """Non-IEC param, Variable=False: process runs, all 4 downstream deployments launch.""" + mock_run_deployment, mock_process = _setup_dispatcher_mocks(mocker) + + from orchestration.flows.bl832.dispatcher import dispatcher + + result = asyncio.run( + dispatcher( + file_path="/global/raw/transfer_tests/test.txt", + is_export_control=False, + config=MockConfig832(), + ) + ) + + assert result is None + mock_process.assert_called_once() + assert mock_process.call_args.kwargs["is_export_control"] is False + assert _called_deployment_names(mock_run_deployment) == DOWNSTREAM_DEPLOYMENTS + + +def test_dispatcher_iec_param_true_skips_downstream(mocker: MockFixture): + """IEC param=True, Variable=False: process runs with IEC=True, no downstream deployments.""" + mock_run_deployment, mock_process = _setup_dispatcher_mocks(mocker) + + from orchestration.flows.bl832.dispatcher import dispatcher + + result = asyncio.run( + dispatcher( + file_path="/global/raw/transfer_tests/test.txt", + is_export_control=True, + config=MockConfig832(), + ) + ) + + assert result is None + mock_process.assert_called_once() + assert mock_process.call_args.kwargs["is_export_control"] is True + mock_run_deployment.assert_not_called() + + +def test_dispatcher_iec_variable_overrides(mocker: MockFixture): + """IEC param=False, Variable=True: OR-merge forces IEC mode, no downstream deployments.""" + Variable.set(name="is_export_control", value=True, overwrite=True, _sync=True) + + mock_run_deployment, mock_process = _setup_dispatcher_mocks(mocker) + + from orchestration.flows.bl832.dispatcher import dispatcher + + result = asyncio.run( + dispatcher( + file_path="/global/raw/transfer_tests/test.txt", + is_export_control=False, + config=MockConfig832(), + ) + ) + + assert result is None + mock_process.assert_called_once() + assert mock_process.call_args.kwargs["is_export_control"] is True + mock_run_deployment.assert_not_called() + + +def test_dispatcher_iec_variable_missing_defaults_safe(mocker: MockFixture): + """is_export_control Variable absent: Variable.get's default=False keeps normal flow.""" + real_variable_get = Variable.get + + def selective_get(name, default=None, *args, **kwargs): + if name == "is_export_control": + # Simulate the variable not existing: return the default + return default + # Pass other variables through to the real implementation + return real_variable_get(name, *args, **kwargs) + + mocker.patch( + "orchestration.flows.bl832.dispatcher.Variable.get", + side_effect=selective_get, + ) + + mock_run_deployment, mock_process = _setup_dispatcher_mocks(mocker) + + from orchestration.flows.bl832.dispatcher import dispatcher + + result = asyncio.run( + dispatcher( + file_path="/global/raw/transfer_tests/test.txt", + is_export_control=False, + config=MockConfig832(), + ) + ) + + assert result is None + mock_process.assert_called_once() + assert mock_process.call_args.kwargs["is_export_control"] is False + assert _called_deployment_names(mock_run_deployment) == DOWNSTREAM_DEPLOYMENTS diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py index beb21965..7e9147fa 100644 --- a/orchestration/_tests/test_globus_flow.py +++ b/orchestration/_tests/test_globus_flow.py @@ -1,5 +1,3 @@ -import asyncio -# import uuid from unittest.mock import MagicMock from uuid import UUID, uuid4, uuid5 import warnings @@ -176,60 +174,6 @@ def __init__(self) -> None: self.scicat = config["scicat"] -def test_832_dispatcher(mocker: MockFixture): - """Test 832 uber decision flow.""" - - # Mock the Secret block load using a simple manual mock class - - mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()) - - mock_prune_controller = mocker.MagicMock() - mock_prune_controller.prune.return_value = True - mocker.patch('orchestration.flows.bl832.move.get_prune_controller', return_value=mock_prune_controller) - - # Mock read_deployment_by_name with a manually defined mock class - class MockDeployment: - def __init__(self): - self.id = str(uuid4()) # Add this line - self.version = "1.0.0" - self.flow_id = str(uuid4()) - self.name = "test_deployment" - - mocker.patch('prefect.client.orchestration.PrefectClient.read_deployment_by_name', - return_value=MockDeployment()) - - # Mock run_deployment to avoid executing any Prefect workflows - async def mock_run_deployment(*args, **kwargs): - return None - - mocker.patch('prefect.deployments.run_deployment', new=mock_run_deployment) - - # Mock asyncio.gather to avoid actual async task execution - async def mock_gather(*args, **kwargs): - # Await any coroutines so we don't leak warnings, but don't care about results - for arg in args: - if asyncio.iscoroutine(arg): - try: - await arg - except Exception: - pass - return [None] - - mocker.patch('asyncio.gather', new=mock_gather) - # Import decision flow after mocking the necessary components - from orchestration.flows.bl832.dispatcher import dispatcher - - # Run the decision flow - result = asyncio.run(dispatcher( - file_path="/global/raw/transfer_tests/test.txt", - is_export_control=False, - config=MockConfig832() - )) - - # Ensure the flow runs without throwing an error - assert result is None, "The decision flow did not complete successfully." - - def test_alcf_recon_flow(mocker: MockFixture): """ Test the alcf_recon_flow in one function, covering: From cce6722bafeacc3e693ecd66cd87449c447e9a66 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 26 May 2026 14:16:08 -0700 Subject: [PATCH 4/7] Adding test cases for dispatcher --- .../_tests/test_bl832/test_dispatcher.py | 92 +++++++++++++++++-- 1 file changed, 85 insertions(+), 7 deletions(-) diff --git a/orchestration/_tests/test_bl832/test_dispatcher.py b/orchestration/_tests/test_bl832/test_dispatcher.py index c71458b8..cb6b7555 100644 --- a/orchestration/_tests/test_bl832/test_dispatcher.py +++ b/orchestration/_tests/test_bl832/test_dispatcher.py @@ -20,6 +20,15 @@ # --------------------------------------------------------------------------- +DEFAULT_DECISION_SETTINGS = { + "alcf_recon_flow/alcf_recon_flow": True, + "nersc_recon_flow/nersc_recon_flow": True, + "nersc_petiole_segment_flow/nersc_petiole_segment_flow": True, + "nersc_moon_segment_flow/nersc_moon_segment_flow": True, + "new_832_file_flow/new_file_832": True, +} + + @pytest.fixture(autouse=True, scope="session") def bl832_dispatcher_prefect_fixture(): """Set up Prefect test harness and bl832 variables/secrets for the session.""" @@ -36,13 +45,7 @@ def bl832_dispatcher_prefect_fixture(): ) Variable.set( name="decision-settings", - value={ - "alcf_recon_flow/alcf_recon_flow": True, - "nersc_recon_flow/nersc_recon_flow": True, - "nersc_petiole_segment_flow/nersc_petiole_segment_flow": True, - "nersc_moon_segment_flow/nersc_moon_segment_flow": True, - "new_832_file_flow/new_file_832": True, - }, + value=DEFAULT_DECISION_SETTINGS, overwrite=True, _sync=True, ) @@ -76,6 +79,21 @@ def reset_iec_variable(): yield +@pytest.fixture(autouse=True) +def reset_decision_settings(): + """Reset decision-settings Prefect Variable before each test. + + Tests that mutate decision-settings won't leak state to subsequent tests. + """ + Variable.set( + name="decision-settings", + value=DEFAULT_DECISION_SETTINGS, + overwrite=True, + _sync=True, + ) + yield + + # --------------------------------------------------------------------------- # Mocks # --------------------------------------------------------------------------- @@ -278,3 +296,63 @@ def selective_get(name, default=None, *args, **kwargs): mock_process.assert_called_once() assert mock_process.call_args.kwargs["is_export_control"] is False assert _called_deployment_names(mock_run_deployment) == DOWNSTREAM_DEPLOYMENTS + + +def test_dispatcher_respects_decision_settings(mocker: MockFixture): + """Only deployments enabled in decision-settings should launch. + + Verifies the per-deployment `if decision_settings.get(...):` branches in dispatcher. + """ + Variable.set( + name="decision-settings", + value={ + "alcf_recon_flow/alcf_recon_flow": True, + "nersc_recon_flow/nersc_recon_flow": False, + "nersc_petiole_segment_flow/nersc_petiole_segment_flow": False, + "nersc_moon_segment_flow/nersc_moon_segment_flow": True, + "new_832_file_flow/new_file_832": True, + }, + overwrite=True, + _sync=True, + ) + + mock_run_deployment, mock_process = _setup_dispatcher_mocks(mocker) + + from orchestration.flows.bl832.dispatcher import dispatcher + + asyncio.run( + dispatcher( + file_path="/global/raw/transfer_tests/test.txt", + is_export_control=False, + config=MockConfig832(), + ) + ) + + assert _called_deployment_names(mock_run_deployment) == { + "alcf_recon_flow/alcf_recon_flow", + "nersc_moon_segment_flow/nersc_moon_segment_flow", + } + mock_process.assert_called_once() + + +def test_dispatcher_raises_when_process_task_fails(mocker: MockFixture): + """If process_new_832_file_task raises, dispatcher wraps it in ValueError. + + Also verifies downstream deployments are NOT launched when the upstream + move task fails — confirming the synchronous-first ordering matters. + """ + mock_run_deployment, mock_process = _setup_dispatcher_mocks(mocker) + mock_process.side_effect = RuntimeError("disk full") + + from orchestration.flows.bl832.dispatcher import dispatcher + + with pytest.raises(ValueError, match="new_file_832 task Failed"): + asyncio.run( + dispatcher( + file_path="/global/raw/transfer_tests/test.txt", + is_export_control=False, + config=MockConfig832(), + ) + ) + + mock_run_deployment.assert_not_called() From 36d257f929dc219021efad039a5ef9f0e68a739e Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 26 May 2026 14:25:41 -0700 Subject: [PATCH 5/7] Adding a comment --- orchestration/flows/bl832/dispatcher.py | 1 + 1 file changed, 1 insertion(+) diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py index b88158d2..1560eb80 100644 --- a/orchestration/flows/bl832/dispatcher.py +++ b/orchestration/flows/bl832/dispatcher.py @@ -152,6 +152,7 @@ async def dispatcher( ) else: logger.info("is_export_control Prefect Variable is False — proceeding with normal flow") + # OR-merge so either the caller param OR the Prefect Variable can force IEC mode. inputs.is_export_control = inputs.is_export_control or iec_override # Run new_file_832 first (synchronously) From e1fbe4328fc72ccdb7db3885cd4c9109ed870bd7 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 27 May 2026 10:25:34 -0700 Subject: [PATCH 6/7] removing stale reservations from config, and making sure NERSC jobs are set up for realtime/als/4 nodes --- config.yml | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/config.yml b/config.yml index 13572c53..fb0b9989 100644 --- a/config.yml +++ b/config.yml @@ -215,28 +215,28 @@ hpc_submission_settings832: # ── RECON + MULTIRES SETTINGS ─────────────────────────────────────────────── nersc_reconstruction: # ── SLURM resource allocation ───────────────────────────────────────────── - qos: regular - account: amsc006 - reservation: "_CAP_SYNAPS_LIVEDEMO_CPU2" - num_nodes: 16 + qos: realtime + account: als + reservation: "" + num_nodes: 4 cpus-per-task: 128 walltime: "0:30:00" nersc_multiresolution: # ── SLURM resource allocation ───────────────────────────────────────────── - qos: debug + qos: realtime account: als - reservation: "_CAP_SYNAPS_LIVEDEMO_CPU2" + reservation: "" cpus-per-task: 128 walltime: "0:15:00" # ── PETIOLE SEGMENTATION SETTINGS ─────────────────────────────────────────── nersc_segmentation_sam3: # ── SLURM resource allocation ───────────────────────────────────────────── - qos: regular - account: amsc006 + qos: realtime + account: als constraint: gpu - reservation: "_CAP_SYNAPS_LIVEDEMO_GPU2" - num_nodes: 32 + reservation: "" + num_nodes: 4 ntasks-per-node: 1 gpus-per-node: 4 cpus-per-task: 128 @@ -263,11 +263,11 @@ hpc_submission_settings832: finetuned_checkpoint_path: /global/cfs/cdirs/als/data_mover/8.3.2/tomography_segmentation_scripts/sam3_finetune/sam3/checkpoint_v6.pt nersc_segmentation_dinov3: # ── SLURM resource allocation ───────────────────────────────────────────── - qos: regular - account: amsc006 + qos: realtime + account: als constraint: gpu - reservation: "_CAP_SYNAPS_LIVEDEMO_GPU2" - num_nodes: 8 + reservation: "" + num_nodes: 4 ntasks-per-node: 1 nproc_per_node: 4 gpus-per-node: 4 @@ -283,10 +283,10 @@ hpc_submission_settings832: dino_checkpoint_path: /global/cfs/cdirs/als/data_mover/8.3.2/tomography_segmentation_scripts/dino/best.ckpt nersc_combine_segmentations: # ── SLURM resource allocation ───────────────────────────────────────────── - qos: regular - account: amsc006 + qos: realtime + account: als constraint: cpu - reservation: "_CAP_SYNAPS_LIVEDEMO_CPU2" + reservation: "" num_nodes: 4 ntasks: 128 cpus-per-task: 1 @@ -302,10 +302,10 @@ hpc_submission_settings832: # ── MOON SEGMENTATION SETTINGS ─────────────────────────────────────────── nersc_segmentation_dinov3_moon: # ── SLURM resource allocation ───────────────────────────────────────────── - qos: regular + qos: realtime account: als constraint: gpu - reservation: "_CAP_TOMO_MOON_GPU2" + reservation: "" num_nodes: 4 ntasks-per-node: 1 nproc_per_node: 4 From b6de429874e64eb957ea5f8915fa72c609b1123e Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 27 May 2026 10:26:01 -0700 Subject: [PATCH 7/7] Making SFAPI the default for job submission until IRIAPI support service accounts --- orchestration/_tests/test_bl832/test_nersc.py | 7 +- orchestration/flows/bl832/nersc.py | 268 ++++++++++-------- 2 files changed, 160 insertions(+), 115 deletions(-) diff --git a/orchestration/_tests/test_bl832/test_nersc.py b/orchestration/_tests/test_bl832/test_nersc.py index b8de0129..abf616fd 100644 --- a/orchestration/_tests/test_bl832/test_nersc.py +++ b/orchestration/_tests/test_bl832/test_nersc.py @@ -933,7 +933,7 @@ def test_petiole_segment_flow_recon_failure(mocker, mock_config832): def test_moon_segment_flow_succeeds(mocker, mock_config832, mock_recon_success): """Recon + DINOv3-moon both succeed → flow returns True.""" - from orchestration.flows.bl832.nersc import nersc_moon_segment_flow + from orchestration.flows.bl832.nersc import nersc_moon_segment_flow, NERSCLoginMethod mock_controller = mocker.MagicMock() mock_controller.reconstruct.return_value = mock_recon_success @@ -952,7 +952,10 @@ def test_moon_segment_flow_succeeds(mocker, mock_config832, mock_recon_success): assert result is True mock_controller.reconstruct.assert_called_once() mock_dinov3_task.submit.assert_called_once_with( - recon_folder_path="folder/recfile", config=mock_config832, project="moon" + recon_folder_path="folder/recfile", + config=mock_config832, + project="moon", + login_method=NERSCLoginMethod.SFAPI, ) diff --git a/orchestration/flows/bl832/nersc.py b/orchestration/flows/bl832/nersc.py index b8701ded..f5850a61 100644 --- a/orchestration/flows/bl832/nersc.py +++ b/orchestration/flows/bl832/nersc.py @@ -164,7 +164,7 @@ def __init__( self, config: Config832, client: Client | httpx.Client | None = None, - login_method: NERSCLoginMethod = NERSCLoginMethod.IRIAPI, + login_method: NERSCLoginMethod = NERSCLoginMethod.SFAPI, ) -> None: TomographyHPCController.__init__(self, config) self.client = client @@ -179,7 +179,7 @@ def __init__( @staticmethod def create_nersc_client( config: Config832, - login_method: NERSCLoginMethod = NERSCLoginMethod.IRIAPI, + login_method: NERSCLoginMethod = NERSCLoginMethod.SFAPI, ) -> Client | httpx.Client: """Create and return a NERSC client for the requested login method. @@ -196,7 +196,7 @@ def create_nersc_client( Args: config: Config832 instance for accessing config settings needed during client creation. login_method: Which NERSC API to authenticate against. - Defaults to :attr:`NERSCLoginMethod.IRIAPI`. + Defaults to :attr:`NERSCLoginMethod.SFAPI`. Returns: An authenticated :class:`sfapi_client.Client` instance. @@ -1768,6 +1768,7 @@ def nersc_recon_flow( file_path: str, num_nodes: Optional[int] = 4, config: Optional[Config832] = None, + login_method: Optional[NERSCLoginMethod] = NERSCLoginMethod.SFAPI, ) -> bool: """ Perform multi-node tomography reconstruction on NERSC. @@ -1775,6 +1776,7 @@ def nersc_recon_flow( :param file_path: Path to the file to reconstruct. :param num_nodes: Number of nodes to use for reconstruction. :param config: Configuration object (if None, a default Config832 will be created). + :param login_method: Method to use for logging into NERSC (SFAPI or IRIAPI). :return: True if successful, False otherwise. """ logger = get_run_logger() @@ -1787,7 +1789,7 @@ def nersc_recon_flow( controller = get_controller( hpc_type=HPC.NERSC, config=config, - login_method=NERSCLoginMethod.SFAPI + login_method=login_method ) logger.info("NERSC reconstruction controller initialized") @@ -1943,7 +1945,7 @@ def nersc_petiole_segment_flow( file_path: str, config: Optional[Config832] = None, num_nodes: Optional[int] = None, - login_method: Optional[NERSCLoginMethod] = NERSCLoginMethod.IRIAPI + login_method: Optional[NERSCLoginMethod] = NERSCLoginMethod.SFAPI ) -> bool: """ Transfer raw data to NERSC, run reconstruction, then run SAM3 and DINOv3 @@ -1994,6 +1996,7 @@ def nersc_petiole_segment_flow( file_path=file_path, num_nodes=num_nodes, config=config, + login_method=login_method ) if isinstance(recon_result, dict): @@ -2030,24 +2033,24 @@ def nersc_petiole_segment_flow( logger.info("Reconstruction Successful.") # ── STEP 2: Transfer TIFFs to data832 ──────────────────────────────────── - # logger.info("Transferring reconstructed TIFFs from NERSC pscratch to data832") - # try: - # data832_tiff_future = globus_transfer_task.submit( - # file_path=scratch_path_tiff, - # source=config.nersc832_alsdev_pscratch_scratch, - # destination=config.data832_scratch, - # config=config, - # ) - # logger.info("TIFF transfer to data832 submitted.") - # except Exception as e: - # logger.error(f"Failed to transfer TIFFs to data832: {e}") - # data832_tiff_transfer_success = False + logger.info("Transferring reconstructed TIFFs from NERSC pscratch to data832") + try: + data832_tiff_future = globus_transfer_task.submit( + file_path=scratch_path_tiff, + source=config.nersc832_alsdev_pscratch_scratch, + destination=config.data832_scratch, + config=config, + ) + logger.info("TIFF transfer to data832 submitted.") + except Exception as e: + logger.error(f"Failed to transfer TIFFs to data832: {e}") + data832_tiff_transfer_success = False # ── STEP 3: SAM3 / DINOv3 ────────────────────────── logger.info("Submitting SAM3 and DINOv3 segmentation tasks concurrently.") sam3_future = nersc_segmentation_sam3_task.submit( - recon_folder_path=scratch_path_tiff, config=config + recon_folder_path=scratch_path_tiff, config=config, login_method=login_method ) dinov3_future = nersc_segmentation_dinov3_task.submit( recon_folder_path=scratch_path_tiff, config=config, project="petiole", login_method=login_method @@ -2059,15 +2062,15 @@ def nersc_petiole_segment_flow( logger.info(f"SAM3 segmentation result: {sam3_success}") if sam3_success: logger.info("Transferring SAM3 segmentation outputs to data832") - # sam3_segment_path = f"{folder_name}/seg{file_name}/sam3" + sam3_segment_path = f"{folder_name}/seg{file_name}/sam3" try: - # data832_sam3_future = globus_transfer_task.submit( - # file_path=sam3_segment_path, - # source=config.nersc832_alsdev_pscratch_scratch, - # destination=config.data832_scratch, - # config=config, - # ) - # logger.info("SAM3 transfer to data832 submitted") + data832_sam3_future = globus_transfer_task.submit( + file_path=sam3_segment_path, + source=config.nersc832_alsdev_pscratch_scratch, + destination=config.data832_scratch, + config=config, + ) + logger.info("SAM3 transfer to data832 submitted") data832_sam3_transfer_success = True logger.info(f"SAM3 transfer to data832 success: {data832_sam3_transfer_success}") except Exception as e: @@ -2077,15 +2080,15 @@ def nersc_petiole_segment_flow( logger.info(f"DINOv3 segmentation result: {dinov3_success}") if dinov3_success: logger.info("Transferring DINOv3 segmentation outputs to data832") - # dinov3_segment_path = f"{folder_name}/seg{file_name}/dino" + dinov3_segment_path = f"{folder_name}/seg{file_name}/dino" try: - # data832_dinov3_future = globus_transfer_task.submit( - # file_path=dinov3_segment_path, - # source=config.nersc832_alsdev_pscratch_scratch, - # destination=config.data832_scratch, - # config=config, - # ) - # logger.info("DINOv3 transfer to data832 submitted") + data832_dinov3_future = globus_transfer_task.submit( + file_path=dinov3_segment_path, + source=config.nersc832_alsdev_pscratch_scratch, + destination=config.data832_scratch, + config=config, + ) + logger.info("DINOv3 transfer to data832 submitted") data832_dinov3_transfer_success = True logger.info(f"DINOv3 transfer to data832 success: {data832_dinov3_transfer_success}") except Exception as e: @@ -2100,22 +2103,22 @@ def nersc_petiole_segment_flow( logger.info("Running segmentation combination.") combine_future = nersc_combine_segmentations_task.submit( - recon_folder_path=scratch_path_tiff, config=config + recon_folder_path=scratch_path_tiff, config=config, login_method=login_method ) combine_success = combine_future.result() logger.info(f"Combination result: {combine_success}") if combine_success: logger.info("Transferring combined segmentation outputs to data832") - # combined_segment_path = f"{folder_name}/seg{file_name}/combined/sam_dino" + combined_segment_path = f"{folder_name}/seg{file_name}/combined/sam_dino" try: - # data832_combined_future = globus_transfer_task.submit( - # file_path=combined_segment_path, - # source=config.nersc832_alsdev_pscratch_scratch, - # destination=config.data832_scratch, - # config=config, - # ) - # logger.info("Combined transfer to data832 submitted") + data832_combined_future = globus_transfer_task.submit( + file_path=combined_segment_path, + source=config.nersc832_alsdev_pscratch_scratch, + destination=config.data832_scratch, + config=config, + ) + logger.info("Combined transfer to data832 submitted") data832_combined_transfer_success = True logger.info(f"Combined transfer to data832 success: {data832_combined_transfer_success}") except Exception as e: @@ -2150,67 +2153,67 @@ def nersc_petiole_segment_flow( ) # ── STEP 6: Pruning ─────────────────────────────────────────────────────── - # logger.info("Scheduling file pruning tasks.") - # prune_controller = get_prune_controller(prune_type=PruneMethod.GLOBUS, config=config) - - # try: - # prune_controller.prune( - # file_path=f"{folder_name}/{path.name}", - # source_endpoint=config.nersc832_alsdev_pscratch_raw, - # check_endpoint=None, - # days_from_now=1.0 - # ) - # except Exception as e: - # logger.warning(f"Failed to schedule raw data pruning: {e}") - - # if nersc_reconstruction_success: - # try: - # prune_controller.prune( - # file_path=scratch_path_tiff, - # source_endpoint=config.nersc832_alsdev_pscratch_scratch, - # check_endpoint=config.data832_scratch if data832_tiff_transfer_success else None, - # days_from_now=1.0 - # ) - # except Exception as e: - # logger.warning(f"Failed to schedule reconstruction data pruning: {e}") - - # if any_seg_success: - # try: - # prune_controller.prune( - # file_path=scratch_path_segment, - # source_endpoint=config.nersc832_alsdev_pscratch_scratch, - # check_endpoint=config.data832_scratch if any([ - # data832_sam3_transfer_success, - # data832_dinov3_transfer_success, - # ]) else None, - # days_from_now=1.0 - # ) - # except Exception as e: - # logger.warning(f"Failed to schedule segmentation data pruning: {e}") - - # if data832_tiff_transfer_success: - # try: - # prune_controller.prune( - # file_path=scratch_path_tiff, - # source_endpoint=config.data832_scratch, - # check_endpoint=None, - # days_from_now=30.0 - # ) - # except Exception as e: - # logger.warning(f"Failed to schedule data832 tiff pruning: {e}") - - # if any([data832_sam3_transfer_success, - # data832_dinov3_transfer_success, - # data832_combined_transfer_success]): - # try: - # prune_controller.prune( - # file_path=scratch_path_segment, - # source_endpoint=config.data832_scratch, - # check_endpoint=None, - # days_from_now=30.0 - # ) - # except Exception as e: - # logger.warning(f"Failed to schedule data832 segment pruning: {e}") + logger.info("Scheduling file pruning tasks.") + prune_controller = get_prune_controller(prune_type=PruneMethod.GLOBUS, config=config) + + try: + prune_controller.prune( + file_path=f"{folder_name}/{path.name}", + source_endpoint=config.nersc832_alsdev_pscratch_raw, + check_endpoint=None, + days_from_now=1.0 + ) + except Exception as e: + logger.warning(f"Failed to schedule raw data pruning: {e}") + + if nersc_reconstruction_success: + try: + prune_controller.prune( + file_path=scratch_path_tiff, + source_endpoint=config.nersc832_alsdev_pscratch_scratch, + check_endpoint=config.data832_scratch if data832_tiff_transfer_success else None, + days_from_now=1.0 + ) + except Exception as e: + logger.warning(f"Failed to schedule reconstruction data pruning: {e}") + + if any_seg_success: + try: + prune_controller.prune( + file_path=scratch_path_segment, + source_endpoint=config.nersc832_alsdev_pscratch_scratch, + check_endpoint=config.data832_scratch if any([ + data832_sam3_transfer_success, + data832_dinov3_transfer_success, + ]) else None, + days_from_now=1.0 + ) + except Exception as e: + logger.warning(f"Failed to schedule segmentation data pruning: {e}") + + if data832_tiff_transfer_success: + try: + prune_controller.prune( + file_path=scratch_path_tiff, + source_endpoint=config.data832_scratch, + check_endpoint=None, + days_from_now=30.0 + ) + except Exception as e: + logger.warning(f"Failed to schedule data832 tiff pruning: {e}") + + if any([data832_sam3_transfer_success, + data832_dinov3_transfer_success, + data832_combined_transfer_success]): + try: + prune_controller.prune( + file_path=scratch_path_segment, + source_endpoint=config.data832_scratch, + check_endpoint=None, + days_from_now=30.0 + ) + except Exception as e: + logger.warning(f"Failed to schedule data832 segment pruning: {e}") if nersc_reconstruction_success and any_seg_success: logger.info("NERSC reconstruction + multi-segmentation flow completed successfully.") @@ -2228,6 +2231,7 @@ def nersc_moon_segment_flow( file_path: str, config: Config832 | None = None, num_nodes: int | None = None, + login_method: Optional[NERSCLoginMethod] = NERSCLoginMethod.SFAPI ) -> bool: """Reconstruct a lunar regolith scan and run DINOv3-moon segmentation. @@ -2238,6 +2242,7 @@ def nersc_moon_segment_flow( :param file_path: Path to the raw .h5 file to be processed. :param config: Configuration object for the flow. :param num_nodes: Number of nodes for reconstruction. + :param login_method: Method to use for logging into NERSC (SFAPI or IRIAPI). :return: True if reconstruction and segmentation both succeeded. """ logger = get_run_logger() @@ -2254,7 +2259,7 @@ def nersc_moon_segment_flow( logger.info(f"Starting NERSC reconstruction + DINOv3-moon flow for {file_path=}") - controller = get_controller(hpc_type=HPC.NERSC, config=config) + controller = get_controller(hpc_type=HPC.NERSC, config=config, login_method=login_method) if num_nodes is None: num_nodes = config.nersc_recon_settings.get("num_nodes", 4) @@ -2310,7 +2315,7 @@ def nersc_moon_segment_flow( # ── STEP 3: DINOv3-moon segmentation ───────────────────────────────────── logger.info("Submitting DINOv3-moon segmentation task.") moon_future = nersc_segmentation_dinov3_task.submit( - recon_folder_path=scratch_path_tiff, config=config, project="moon" + recon_folder_path=scratch_path_tiff, config=config, project="moon", login_method=login_method ) moon_success = moon_future.result() @@ -2499,11 +2504,19 @@ def nersc_streaming_flow( def pull_shifter_image_flow( image: Optional[str] = None, config: Optional[Config832] = None, + login_method: Optional[NERSCLoginMethod] = NERSCLoginMethod.SFAPI ) -> bool: """ Pull a container image into NERSC's Shifter cache. Run this once when the container image is updated. + + Args: + image: The name of the container image to pull. If None, uses the default recon image from the config. + config: Configuration object for the flow. If None, a default Config832 will be created. + login_method: Method to use for logging into NERSC (SFAPI or IRIAPI). + Returns: + True if the image was pulled successfully, False otherwise. """ logger = get_run_logger() @@ -2517,7 +2530,8 @@ def pull_shifter_image_flow( controller = get_controller( hpc_type=HPC.NERSC, - config=config + config=config, + login_method=login_method ) # Check if already cached @@ -2535,7 +2549,7 @@ def nersc_reconstruction_task( file_path: str, num_nodes: int = 4, config: Optional[Config832] = None, - login_method: Optional[NERSCLoginMethod] = NERSCLoginMethod.IRIAPI + login_method: Optional[NERSCLoginMethod] = NERSCLoginMethod.SFAPI ) -> dict: """ Run tomography reconstruction at NERSC Perlmutter. @@ -2543,6 +2557,7 @@ def nersc_reconstruction_task( :param file_path: Path to the raw HDF5 file to reconstruct. :param num_nodes: Number of nodes to use for reconstruction. :param config: Configuration object for the flow. + :param login_method: NERSC API to authenticate against. :return: Dict with keys 'success', 'job_id', 'timing'. """ logger = get_run_logger() @@ -2559,13 +2574,14 @@ def nersc_reconstruction_task( def nersc_multiresolution_task( file_path: str, config: Optional[Config832] = None, - login_method: Optional[NERSCLoginMethod] = NERSCLoginMethod.IRIAPI + login_method: Optional[NERSCLoginMethod] = NERSCLoginMethod.SFAPI ) -> bool: """ Run multiresolution task at NERSC. :param file_path: Path to the reconstructed data folder to be processed. :param config: Configuration object for the flow. + :param login_method: NERSC API to authenticate against. :return: True if the task completed successfully, False otherwise. """ logger = get_run_logger() @@ -2613,12 +2629,14 @@ def nersc_multiresolution_integration_test() -> bool: def nersc_segmentation_sam3_task( recon_folder_path: str, config: Optional[Config832] = None, + login_method: Optional[NERSCLoginMethod] = NERSCLoginMethod.SFAPI, ) -> bool: """ Run segmentation task at NERSC. :param recon_folder_path: Path to the reconstructed data folder to be processed. :param config: Configuration object for the flow. + :param login_method: NERSC API to authenticate against. :return: True if the task completed successfully, False otherwise. """ logger = get_run_logger() @@ -2631,7 +2649,7 @@ def nersc_segmentation_sam3_task( tomography_controller = get_controller( hpc_type=HPC.NERSC, config=config, - login_method=NERSCLoginMethod.IRIAPI + login_method=login_method ) logger.info(f"Starting NERSC segmentation task for {recon_folder_path=}") nersc_segmentation_success = tomography_controller.segmentation_sam3( @@ -2652,13 +2670,25 @@ def nersc_segmentation_dinov3_task( recon_folder_path: str, config: Optional[Config832] = None, project: Optional[str] = "petiole", - login_method: Optional[NERSCLoginMethod] = NERSCLoginMethod.IRIAPI + login_method: Optional[NERSCLoginMethod] = NERSCLoginMethod.SFAPI ) -> bool: + """ + Run DINOv3 segmentation task at NERSC. + + Args: + recon_folder_path (str): Path to the reconstructed data folder to be processed. + config (Optional[Config832], optional): Configuration object for the flow. Defaults to None. + project (Optional[str], optional): Project name. Defaults to "petiole". + login_method (Optional[NERSCLoginMethod], optional): NERSC API to authenticate against. Defaults to SFAPI. + + Returns: + bool: True if the segmentation task completed successfully, False otherwise. + """ logger = get_run_logger() if config is None: logger.info("No config provided, using default Config832.") config = Config832() - tomography_controller = get_controller(hpc_type=HPC.NERSC, config=config, login_method=NERSCLoginMethod.IRIAPI) + tomography_controller = get_controller(hpc_type=HPC.NERSC, config=config, login_method=login_method) logger.info(f"Starting NERSC DINOv3 segmentation task for {recon_folder_path=}, {project=}") success = tomography_controller.segmentation_dinov3(recon_folder_path=recon_folder_path, project=project) if not success: @@ -2672,12 +2702,24 @@ def nersc_segmentation_dinov3_task( def nersc_combine_segmentations_task( recon_folder_path: str, config: Optional[Config832] = None, + login_method: Optional[NERSCLoginMethod] = NERSCLoginMethod.SFAPI, ) -> bool: + """ + Run combine segmentations task at NERSC + + Args: + recon_folder_path (str): Path to the reconstructed data folder to be processed. + config (Optional[Config832], optional): Configuration object for the flow. Defaults to None. + login_method (Optional[NERSCLoginMethod], optional): NERSC API to authenticate against. Defaults to SFAPI. + + Returns: + bool: True if the combine segmentations task completed successfully, False otherwise. + """ logger = get_run_logger() if config is None: logger.info("No config provided, using default Config832.") config = Config832() - tomography_controller = get_controller(hpc_type=HPC.NERSC, config=config, login_method=NERSCLoginMethod.IRIAPI) + tomography_controller = get_controller(hpc_type=HPC.NERSC, config=config, login_method=login_method) logger.info(f"Starting NERSC combine segmentations task for {recon_folder_path=}") success = tomography_controller.combine_segmentations(recon_folder_path=recon_folder_path) if not success: