Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b1e3225
Initial commit for adding NERSC IRI-API support alongside SFAPI for j…
davramov Mar 16, 2026
924c4a0
Removed NERSCLoginMethod(Enum) from nersc.py. Created a temporary tes…
davramov Mar 17, 2026
5b0607e
Updating pytests
davramov Mar 17, 2026
ce35832
successfully ran reconstruction using the IRI-API
davramov Mar 30, 2026
1b98b7c
removing token.py and moving the logic to get_globus_token.py
davramov Apr 1, 2026
e7c0eec
moving get_globus_token.py to orchestration/globus/ to be used as a m…
davramov Apr 1, 2026
252199d
Updating unit tests
davramov Apr 1, 2026
c472f9d
Rebasing and including segmentation flows as part of iri/sfapi abstra…
davramov Apr 7, 2026
1b4624d
Making IRIAPI the default login method for now
davramov Apr 13, 2026
c36d9c4
Making the IRI job submission read sbatch settings
davramov Apr 14, 2026
c3d3b1e
Fixing IRIAPI bugs, also commenting out Globus transfers for now
davramov Apr 14, 2026
33bb1f3
Updating logger comments
davramov Apr 23, 2026
5192ecd
connecting to AmSC MLflow service
davramov Apr 24, 2026
fbfaac7
removing old commented code
davramov Apr 24, 2026
6c1837d
adjusting import in pytest to avoid error on github that did not occu…
davramov Apr 24, 2026
8055eae
Getting NERSC reservations working with IRI API
davramov May 7, 2026
cbd7b7e
fixing globus token race condition when jobs are launch simultaneously
davramov May 7, 2026
34b51c0
updating config with confab reservation
davramov May 12, 2026
91dfa00
moving nersc iri/sf-api resource definitions to config (no longer glo…
davramov May 20, 2026
284de47
Updating nersc.py to pull iri/sf-api parameters from the config, rath…
davramov May 20, 2026
d864e80
itial commit for a new orchestration/jobs/ structure
davramov May 21, 2026
afb3fe2
Large refactoring of job submission code. Moving ALCF/NERSC generic j…
davramov May 26, 2026
2806e9a
Adding pytests mirroring the jobs/ directory structure
davramov May 26, 2026
31de0f1
renaming dummy/fake --> mock in unit tests
davramov May 26, 2026
d583480
polishing up post-rebase
davramov May 27, 2026
e4e1b5a
removing reservation from config
davramov May 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 36 additions & 36 deletions orchestration/_tests/test_bl832/test_mlflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,13 @@ class TestLoadJobOptionsMLflowLayer:

def _patch_variable_defaults(self, mocker):
mocker.patch(
"orchestration.flows.bl832.nersc.Variable.get",
"orchestration.jobs.options.Variable.get",
return_value={"defaults": True},
)

def test_mlflow_nersc_path_mapped_to_checkpoint_key(self, mocker, mock_config832):
"""When MLflow returns a checkpoint, nersc_path is written to mlflow_checkpoint_key."""
from orchestration.flows.bl832.nersc import _load_job_options
from orchestration.jobs.options import load_job_options
from orchestration.mlflow import ModelCheckpointInfo

self._patch_variable_defaults(mocker)
Expand All @@ -246,12 +246,12 @@ def test_mlflow_nersc_path_mapped_to_checkpoint_key(self, mocker, mock_config832
inference_params={},
)
mocker.patch(
"orchestration.flows.bl832.nersc.get_checkpoint_info",
"orchestration.jobs.options.get_checkpoint_info",
return_value=checkpoint_info,
)

base_settings = dict(mock_config832.nersc_segment_sam3_settings)
opts = _load_job_options(
opts = load_job_options(
"nersc-segmentation-options",
base_settings,
config=mock_config832,
Expand All @@ -263,7 +263,7 @@ def test_mlflow_nersc_path_mapped_to_checkpoint_key(self, mocker, mock_config832

def test_mlflow_inference_params_overlay_config_defaults(self, mocker, mock_config832):
"""inference_params from MLflow overwrite matching config keys."""
from orchestration.flows.bl832.nersc import _load_job_options
from orchestration.jobs.options import load_job_options
from orchestration.mlflow import ModelCheckpointInfo

self._patch_variable_defaults(mocker)
Expand All @@ -281,12 +281,12 @@ def test_mlflow_inference_params_overlay_config_defaults(self, mocker, mock_conf
},
)
mocker.patch(
"orchestration.flows.bl832.nersc.get_checkpoint_info",
"orchestration.jobs.options.get_checkpoint_info",
return_value=checkpoint_info,
)

base_settings = dict(mock_config832.nersc_segment_sam3_settings)
opts = _load_job_options(
opts = load_job_options(
"nersc-segmentation-options",
base_settings,
config=mock_config832,
Expand All @@ -300,13 +300,13 @@ def test_mlflow_inference_params_overlay_config_defaults(self, mocker, mock_conf

def test_mlflow_layer_skipped_when_config_is_none(self, mocker, mock_config832):
"""Passing config=None skips the MLflow layer entirely."""
from orchestration.flows.bl832.nersc import _load_job_options
from orchestration.jobs.options import load_job_options

self._patch_variable_defaults(mocker)
spy = mocker.patch("orchestration.flows.bl832.nersc.get_checkpoint_info")
spy = mocker.patch("orchestration.jobs.options.get_checkpoint_info")

base_settings = dict(mock_config832.nersc_segment_sam3_settings)
opts = _load_job_options(
opts = load_job_options(
"nersc-segmentation-options",
base_settings,
config=None,
Expand All @@ -320,13 +320,13 @@ def test_mlflow_layer_skipped_when_config_is_none(self, mocker, mock_config832):

def test_mlflow_layer_skipped_when_model_name_is_none(self, mocker, mock_config832):
"""Passing mlflow_model_name=None skips the MLflow layer."""
from orchestration.flows.bl832.nersc import _load_job_options
from orchestration.jobs.options import load_job_options

self._patch_variable_defaults(mocker)
spy = mocker.patch("orchestration.flows.bl832.nersc.get_checkpoint_info")
spy = mocker.patch("orchestration.jobs.options.get_checkpoint_info")

base_settings = dict(mock_config832.nersc_segment_sam3_settings)
_load_job_options(
load_job_options(
"nersc-segmentation-options",
base_settings,
config=mock_config832,
Expand All @@ -337,16 +337,16 @@ def test_mlflow_layer_skipped_when_model_name_is_none(self, mocker, mock_config8

def test_config_defaults_used_when_mlflow_returns_none(self, mocker, mock_config832):
"""get_checkpoint_info returning None → config defaults unchanged."""
from orchestration.flows.bl832.nersc import _load_job_options
from orchestration.jobs.options import load_job_options

self._patch_variable_defaults(mocker)
mocker.patch(
"orchestration.flows.bl832.nersc.get_checkpoint_info",
"orchestration.jobs.options.get_checkpoint_info",
return_value=None,
)

base_settings = dict(mock_config832.nersc_segment_sam3_settings)
opts = _load_job_options(
opts = load_job_options(
"nersc-segmentation-options",
base_settings,
config=mock_config832,
Expand All @@ -358,16 +358,16 @@ def test_config_defaults_used_when_mlflow_returns_none(self, mocker, mock_config

def test_config_defaults_used_when_mlflow_raises(self, mocker, mock_config832):
"""An exception from get_checkpoint_info is caught; config defaults are used."""
from orchestration.flows.bl832.nersc import _load_job_options
from orchestration.jobs.options import load_job_options

self._patch_variable_defaults(mocker)
mocker.patch(
"orchestration.flows.bl832.nersc.get_checkpoint_info",
"orchestration.jobs.options.get_checkpoint_info",
side_effect=RuntimeError("Network timeout"),
)

base_settings = dict(mock_config832.nersc_segment_sam3_settings)
opts = _load_job_options(
opts = load_job_options(
"nersc-segmentation-options",
base_settings,
config=mock_config832,
Expand All @@ -379,12 +379,12 @@ def test_config_defaults_used_when_mlflow_raises(self, mocker, mock_config832):

def test_prefect_variable_wins_over_mlflow(self, mocker, mock_config832):
"""Prefect Variable overrides take priority over MLflow inference params (layer 3 > layer 2)."""
from orchestration.flows.bl832.nersc import _load_job_options
from orchestration.jobs.options import load_job_options
from orchestration.mlflow import ModelCheckpointInfo

# MLflow says batch_size=8; Prefect Variable says batch_size=16 → 16 wins
mocker.patch(
"orchestration.flows.bl832.nersc.Variable.get",
"orchestration.jobs.options.Variable.get",
return_value={"defaults": False, "batch_size": 16},
)

Expand All @@ -397,12 +397,12 @@ def test_prefect_variable_wins_over_mlflow(self, mocker, mock_config832):
inference_params={"batch_size": 8},
)
mocker.patch(
"orchestration.flows.bl832.nersc.get_checkpoint_info",
"orchestration.jobs.options.get_checkpoint_info",
return_value=checkpoint_info,
)

base_settings = dict(mock_config832.nersc_segment_sam3_settings)
opts = _load_job_options(
opts = load_job_options(
"nersc-segmentation-options",
base_settings,
config=mock_config832,
Expand Down Expand Up @@ -437,7 +437,7 @@ def test_mlflow_checkpoint_appears_in_job_script(self, mocker, mock_config832):
resolved_settings["finetuned_checkpoint_path"] = mlflow_checkpoint

mocker.patch(
"orchestration.flows.bl832.nersc._load_job_options",
"orchestration.flows.bl832.nersc.load_job_options",
return_value=resolved_settings,
)

Expand All @@ -455,16 +455,16 @@ def capture_script(script, *args, **kwargs):
config=mock_config832,
login_method=NERSCLoginMethod.IRIAPI,
)
mocker.patch.object(controller, "_submit_job", side_effect=capture_script)
mocker.patch.object(controller, "_wait_for_job", return_value=True)
mocker.patch.object(controller, "_mkdir_remote", return_value=None)
mocker.patch.object(controller, "submit_job", side_effect=capture_script)
mocker.patch.object(controller, "wait_for_job", return_value=True)
mocker.patch.object(controller, "mkdir_remote", return_value=None)
mocker.patch.object(controller, "_fetch_seg_timing_from_output", return_value=None)
# _get_nersc_username reads NERSC_USERNAME for IRIAPI; stub it
mocker.patch.object(controller, "_get_nersc_username", return_value="testuser")
mocker.patch.object(controller, "get_nersc_username", return_value="testuser")

result = controller.segmentation_sam3(recon_folder_path="folder/recfile")

assert captured, "_submit_job was never called"
assert captured, "submit_job was never called"
assert mlflow_checkpoint in captured[0], (
"The MLflow checkpoint path must appear in the SLURM job script"
)
Expand All @@ -475,11 +475,11 @@ def test_config_default_checkpoint_used_when_mlflow_unavailable(self, mocker, mo

mocker.patch("orchestration.flows.bl832.nersc.time.sleep")
mocker.patch(
"orchestration.flows.bl832.nersc.Variable.get",
"orchestration.jobs.options.Variable.get",
return_value={"defaults": True},
)
mocker.patch(
"orchestration.flows.bl832.nersc.get_checkpoint_info",
"orchestration.jobs.options.get_checkpoint_info",
return_value=None,
)

Expand All @@ -494,16 +494,16 @@ def capture_script(script, *args, **kwargs):
config=mock_config832,
login_method=NERSCLoginMethod.IRIAPI,
)
mocker.patch.object(controller, "_submit_job", side_effect=capture_script)
mocker.patch.object(controller, "_wait_for_job", return_value=True)
mocker.patch.object(controller, "_mkdir_remote", return_value=None)
mocker.patch.object(controller, "submit_job", side_effect=capture_script)
mocker.patch.object(controller, "wait_for_job", return_value=True)
mocker.patch.object(controller, "mkdir_remote", return_value=None)
mocker.patch.object(controller, "_fetch_seg_timing_from_output", return_value=None)
mocker.patch.object(controller, "_get_nersc_username", return_value="testuser")
mocker.patch.object(controller, "get_nersc_username", return_value="testuser")

controller.segmentation_sam3(recon_folder_path="folder/recfile")

config_default = mock_config832.nersc_segment_sam3_settings["finetuned_checkpoint_path"]
assert captured, "_submit_job was never called"
assert captured, "submit_job was never called"
assert config_default in captured[0], (
"Config default checkpoint path must be used when MLflow is unavailable"
)
74 changes: 10 additions & 64 deletions orchestration/_tests/test_bl832/test_nersc.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,60 +201,6 @@ def mock_iriapi_client(mocker):

return client

# ---------------------------------------------------------------------------
# _create_sfapi_client
# ---------------------------------------------------------------------------


def test_create_sfapi_client_success(mocker):
"""Valid credentials produce a Client instance."""
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController

mocker.patch("orchestration.flows.bl832.nersc.os.getenv", side_effect=lambda x: {
"PATH_NERSC_CLIENT_ID": "/path/to/client_id",
"PATH_NERSC_PRI_KEY": "/path/to/client_secret",
}.get(x))
mocker.patch("orchestration.flows.bl832.nersc.os.path.isfile", return_value=True)
mocker.patch(
"builtins.open",
side_effect=[
mocker.mock_open(read_data="my-client-id")(),
mocker.mock_open(read_data='{"kty": "RSA", "n": "x", "e": "y"}')(),
]
)
mocker.patch("orchestration.flows.bl832.nersc.JsonWebKey.import_key", return_value="mock_secret")
mock_client_cls = mocker.patch("orchestration.flows.bl832.nersc.Client")

client = NERSCTomographyHPCController._create_sfapi_client()

mock_client_cls.assert_called_once_with("my-client-id", "mock_secret")
assert client is mock_client_cls.return_value


def test_create_sfapi_client_missing_paths(mocker):
"""Unset env vars raise ValueError."""
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController

mocker.patch("orchestration.flows.bl832.nersc.os.getenv", return_value=None)

with pytest.raises(ValueError, match="Missing NERSC credentials paths."):
NERSCTomographyHPCController._create_sfapi_client()


def test_create_sfapi_client_missing_files(mocker):
"""Env vars set but files absent raise FileNotFoundError."""
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController

mocker.patch("orchestration.flows.bl832.nersc.os.getenv", side_effect=lambda x: {
"PATH_NERSC_CLIENT_ID": "/path/to/client_id",
"PATH_NERSC_PRI_KEY": "/path/to/client_secret",
}.get(x))
mocker.patch("orchestration.flows.bl832.nersc.os.path.isfile", return_value=False)

with pytest.raises(FileNotFoundError, match="NERSC credential files are missing."):
NERSCTomographyHPCController._create_sfapi_client()


# ──────────────────────────────────────────────────────────────────────────────
# build_multi_resolution
# ──────────────────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -304,7 +250,7 @@ def test_segmentation_sam3_success(mocker, mock_sfapi_client, mock_config832):
from sfapi_client.compute import Machine

mocker.patch("orchestration.flows.bl832.nersc.time.sleep")
mocker.patch("orchestration.flows.bl832.nersc.Variable.get", return_value={"defaults": True})
mocker.patch("orchestration.jobs.options.Variable.get", return_value={"defaults": True})
controller = NERSCTomographyHPCController(
client=mock_sfapi_client,
config=mock_config832,
Expand All @@ -326,7 +272,7 @@ def test_segmentation_sam3_submission_failure(mocker, mock_sfapi_client, mock_co
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController, NERSCLoginMethod

mocker.patch("orchestration.flows.bl832.nersc.time.sleep")
mocker.patch("orchestration.flows.bl832.nersc.Variable.get", return_value={"defaults": True})
mocker.patch("orchestration.jobs.options.Variable.get", return_value={"defaults": True})
mock_sfapi_client.compute.return_value.submit_job.side_effect = Exception("GPU queue full")
controller = NERSCTomographyHPCController(
client=mock_sfapi_client,
Expand All @@ -346,7 +292,7 @@ def test_segmentation_sam3_uses_variable_options(mocker, mock_sfapi_client, mock
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController, NERSCLoginMethod

mocker.patch("orchestration.flows.bl832.nersc.time.sleep")
mocker.patch("orchestration.flows.bl832.nersc.Variable.get", return_value={
mocker.patch("orchestration.jobs.options.Variable.get", return_value={
"defaults": False,
"batch_size": 8,
"patch_size": 512,
Expand Down Expand Up @@ -395,7 +341,7 @@ def test_segmentation_dinov3_success(mocker, mock_sfapi_client, mock_config832):
from sfapi_client.compute import Machine

mocker.patch("orchestration.flows.bl832.nersc.time.sleep")
mocker.patch("orchestration.flows.bl832.nersc.Variable.get", return_value={"defaults": True})
mocker.patch("orchestration.jobs.options.Variable.get", return_value={"defaults": True})
controller = NERSCTomographyHPCController(
client=mock_sfapi_client,
config=mock_config832,
Expand All @@ -414,7 +360,7 @@ def test_segmentation_dinov3_submission_failure(mocker, mock_sfapi_client, mock_
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController, NERSCLoginMethod

mocker.patch("orchestration.flows.bl832.nersc.time.sleep")
mocker.patch("orchestration.flows.bl832.nersc.Variable.get", return_value={"defaults": True})
mocker.patch("orchestration.jobs.options.Variable.get", return_value={"defaults": True})
mock_sfapi_client.compute.return_value.submit_job.side_effect = Exception("No GPU nodes")
controller = NERSCTomographyHPCController(
client=mock_sfapi_client,
Expand Down Expand Up @@ -522,7 +468,7 @@ def test_reconstruct_iriapi_job_failed(mocker, mock_iriapi_client, mock_config83

monkeypatch.setenv("NERSC_USERNAME", "alsdev")
mocker.patch("orchestration.flows.bl832.nersc.time.sleep")
mock_iriapi_client.get.return_value.json.return_value = {"status": {"state": "failed"}} # was {"state": "FAILED"}
mock_iriapi_client.get.return_value.json.return_value = {"status": {"state": "failed"}}

controller = NERSCTomographyHPCController(
client=mock_iriapi_client,
Expand Down Expand Up @@ -605,7 +551,7 @@ def test_segmentation_dinov3_output_paths(mocker, mock_sfapi_client, mock_config
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController, NERSCLoginMethod

mocker.patch("orchestration.flows.bl832.nersc.time.sleep")
mocker.patch("orchestration.flows.bl832.nersc.Variable.get", return_value={"defaults": True})
mocker.patch("orchestration.jobs.options.Variable.get", return_value={"defaults": True})

captured_scripts = []
original_return = mock_sfapi_client.compute.return_value.submit_job.return_value
Expand Down Expand Up @@ -636,7 +582,7 @@ def test_combine_segmentations_success(mocker, mock_sfapi_client, mock_config832
from sfapi_client.compute import Machine

mocker.patch("orchestration.flows.bl832.nersc.time.sleep")
mocker.patch("orchestration.flows.bl832.nersc.Variable.get", return_value={"defaults": True})
mocker.patch("orchestration.jobs.options.Variable.get", return_value={"defaults": True})
controller = NERSCTomographyHPCController(
client=mock_sfapi_client,
config=mock_config832,
Expand All @@ -655,7 +601,7 @@ def test_combine_segmentations_submission_failure(mocker, mock_sfapi_client, moc
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController, NERSCLoginMethod

mocker.patch("orchestration.flows.bl832.nersc.time.sleep")
mocker.patch("orchestration.flows.bl832.nersc.Variable.get", return_value={"defaults": True})
mocker.patch("orchestration.jobs.options.Variable.get", return_value={"defaults": True})
mock_sfapi_client.compute.return_value.submit_job.side_effect = Exception("Cluster down")
controller = NERSCTomographyHPCController(
client=mock_sfapi_client,
Expand All @@ -673,7 +619,7 @@ def test_combine_segmentations_script_references_sam3_and_dino(mocker, mock_sfap
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController, NERSCLoginMethod

mocker.patch("orchestration.flows.bl832.nersc.time.sleep")
mocker.patch("orchestration.flows.bl832.nersc.Variable.get", return_value={"defaults": True})
mocker.patch("orchestration.jobs.options.Variable.get", return_value={"defaults": True})

captured_scripts = []
original_return = mock_sfapi_client.compute.return_value.submit_job.return_value
Expand Down
Empty file.
Empty file.
Loading
Loading