From d67dfb3ab154cc98216543c2b543321a74fc230e Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 21 Apr 2026 10:55:14 -0700 Subject: [PATCH 01/33] adding beegfs globus endpoint to config.yaml for bl832 --- orchestration/flows/bl832/config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/orchestration/flows/bl832/config.py b/orchestration/flows/bl832/config.py index 8bbbf78c..f92bc5ce 100644 --- a/orchestration/flows/bl832/config.py +++ b/orchestration/flows/bl832/config.py @@ -27,6 +27,7 @@ def _beam_specific_config(self) -> None: self.nersc832_alsdev_recon_scripts = self.endpoints["nersc832_alsdev_recon_scripts"] self.alcf832_raw = self.endpoints["alcf832_raw"] self.alcf832_scratch = self.endpoints["alcf832_scratch"] + self.beegfs_raw = self.endpoints["bl832-beegfs-raw"] # SciCat self.scicat = self.config["scicat"] # MLflow From 15ac2d9d095ecaf7020549445d4186d569af480b Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 21 Apr 2026 10:55:49 -0700 Subject: [PATCH 02/33] updating move.py to copy to beegfs --- orchestration/flows/bl832/move.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py index e5e10b02..3d0a5983 100644 --- a/orchestration/flows/bl832/move.py +++ b/orchestration/flows/bl832/move.py @@ -12,6 +12,7 @@ from orchestration.globus.transfer import GlobusEndpoint, start_transfer from orchestration.prune_controller import get_prune_controller, PruneMethod from orchestration.prometheus_utils import PrometheusMetrics +from splash_flows.orchestration.transfer_controller import CopyMethod, get_transfer_controller API_KEY = os.getenv("API_KEY") @@ -156,6 +157,19 @@ def process_new_832_file_task( except Exception as e: logger.error(f"SciCat ingest failed with {e}") + transfer_controller = get_transfer_controller( + transfer_type=CopyMethod.GLOBUS, + config=config, + prometheus_metrics=None + ) + + transfer_controller.copy( + file_path=relative_path, + source=config.data832, + destination=config.beegfs_raw + ) + logger.info(f"File successfully transferred from data832 to beegfs {file_path}") + logger.info("Initializing prune controller") prune_controller = get_prune_controller( prune_type=PruneMethod.GLOBUS, From 4be93ed4fef11390935704132aa9f9cef43be165 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 21 Apr 2026 10:56:15 -0700 Subject: [PATCH 03/33] adding tiled[client] to requirements --- pyproject.toml | 4 +++- requirements.txt | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 1b1f7010..a5b3a470 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,7 +23,9 @@ dependencies = [ "python-dotenv", "pyyaml", "scicat_beamline @ git+https://github.com/als-computing/scicat_beamline.git@4828273f5f49ba4eba5442728729e0545b3f5b79", - "sfapi_client" + "sfapi_client", + "tiled[client]", + "watchfiles" ] [build-system] diff --git a/requirements.txt b/requirements.txt index 7fdf24af..272693e4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,3 +18,5 @@ python-dotenv pyyaml scicat-beamline @ git+https://github.com/als-computing/scicat_beamline.git@4828273f5f49ba4eba5442728729e0545b3f5b79 sfapi_client +tiled[client] +watchfiles \ No newline at end of file From ad329db1d2a3c8df5a0799fae810cf904facc7ca Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 21 Apr 2026 10:56:40 -0700 Subject: [PATCH 04/33] adding beegfs globus endpoint to config.yaml for bl832 --- config.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/config.yml b/config.yml index ea576b54..8159544c 100644 --- a/config.yml +++ b/config.yml @@ -139,6 +139,12 @@ globus: uuid: df82346e-9a15-11ea-b3c4-0ae144191ee3 name: nersc832 + bl832-beegfs-raw: + root_path: /beamline_staging/bl832/raw/ + uri: beegfs.als.lbl.gov + uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a + name: bl832-beegfs-raw + globus_apps: als_transfer: client_id: ${GLOBUS_CLIENT_ID} From 6b5f297bd67d0c5385b070447499dc3bf4e3b975 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 21 Apr 2026 10:57:02 -0700 Subject: [PATCH 05/33] adding orchestration/tiled.py for ingesting data on beegfs --- orchestration/tiled.py | 69 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 orchestration/tiled.py diff --git a/orchestration/tiled.py b/orchestration/tiled.py new file mode 100644 index 00000000..e871d7a8 --- /dev/null +++ b/orchestration/tiled.py @@ -0,0 +1,69 @@ +"""Register data files and Zarr/HDF5 stores to the Tiled catalog. + +Intended to run on a Ride worker. The file path must be accessible +to the Tiled server's filesystem. +""" +from dotenv import load_dotenv +import os +from pathlib import Path + +from prefect import flow, get_run_logger, task +from tiled.client import from_uri +from tiled.client.register import register + + +@task(name="register-file-to-tiled", task_run_name="register-{file_path}") +async def register_file_to_tiled( + file_path: Path, + catalog_path: str | None = None, + tiled_path: Path | None = None, +) -> None: + """Register a file or Zarr store to the Tiled catalog. + + Args: + file_path: Absolute path on the client filesystem (used for logging). + catalog_path: Optional sub-path within the Tiled catalog. + tiled_path: Path as seen by the Tiled server. Defaults to file_path. + """ + logger = get_run_logger() + load_dotenv() + tiled_uri = os.environ["TILED_URI"] + api_key = os.environ["TILED_SINGLE_USER_API_KEY"] + + server_path = tiled_path if tiled_path is not None else file_path + + client = from_uri(tiled_uri, api_key=api_key) + catalog = client[catalog_path] if catalog_path else client + + logger.info(f"Registering {file_path} → {server_path}") + await register(catalog, server_path, overwrite=False) + logger.info(f"Registered {server_path} to Tiled catalog") + + +@flow(name="register-to-tiled", flow_run_name="register-{file_path}") +async def register_to_tiled( + file_path: Path | str, + catalog_path: str | None = None, + tiled_path: Path | str | None = None, +) -> None: + """Register a file or Zarr store to the Tiled server. + + Args: + file_path: Path to the file or Zarr store (client filesystem). + catalog_path: Optional sub-path within the Tiled catalog. + tiled_path: Path as seen by the Tiled server. Defaults to file_path. + """ + logger = get_run_logger() + file_path = Path(file_path) + tiled_path = Path(tiled_path) if tiled_path else None + logger.info(f"Registering {file_path} to Tiled (catalog_path={catalog_path!r})") + await register_file_to_tiled(file_path, catalog_path=catalog_path, tiled_path=tiled_path) + + +if __name__ == "__main__": + import asyncio + + zarr = Path("/Users/david/Documents/data/tomo/scratch/rec20230606_152011_jong-seto_fungal-mycelia.zarr") + h5 = Path("/Users/david/Documents/data/tomo/raw/20241216_154449_ddd.h5") + asyncio.run(register_to_tiled(zarr)) + asyncio.run(register_to_tiled(h5)) From 8b36f10a8bcc3075a7a23d649354e478121d81b4 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 21 Apr 2026 14:45:28 -0700 Subject: [PATCH 06/33] cleaning up --- orchestration/tiled.py | 59 +++++++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 26 deletions(-) diff --git a/orchestration/tiled.py b/orchestration/tiled.py index e871d7a8..849d567a 100644 --- a/orchestration/tiled.py +++ b/orchestration/tiled.py @@ -12,52 +12,59 @@ from tiled.client.register import register -@task(name="register-file-to-tiled", task_run_name="register-{file_path}") +@task(name="register-file-to-tiled", task_run_name="register-{path}") async def register_file_to_tiled( - file_path: Path, - catalog_path: str | None = None, - tiled_path: Path | None = None, + path: Path, + prefix: str | None = None, + overwrite: bool = False, ) -> None: """Register a file or Zarr store to the Tiled catalog. Args: - file_path: Absolute path on the client filesystem (used for logging). - catalog_path: Optional sub-path within the Tiled catalog. - tiled_path: Path as seen by the Tiled server. Defaults to file_path. + path: Absolute path on the client filesystem (used for logging). + prefix: Optional sub-path within the Tiled catalog. + overwrite: Whether to overwrite existing entries in the Tiled catalog. """ logger = get_run_logger() load_dotenv() tiled_uri = os.environ["TILED_URI"] api_key = os.environ["TILED_SINGLE_USER_API_KEY"] - server_path = tiled_path if tiled_path is not None else file_path - client = from_uri(tiled_uri, api_key=api_key) - catalog = client[catalog_path] if catalog_path else client - logger.info(f"Registering {file_path} → {server_path}") - await register(catalog, server_path, overwrite=False) - logger.info(f"Registered {server_path} to Tiled catalog") + logger.info(f"Registering {path} to Tiled catalog at {tiled_uri} with prefix {prefix!r}") + try: + await register( + node=client, + path=path, + prefix=prefix or "/", + overwrite=overwrite + ) + except Exception as e: + raise RuntimeError( + f"Failed to register {path} to Tiled catalog at {tiled_uri} " + f"(prefix={prefix!r}): {e}" + ) from e + logger.info(f"Registered {path} to Tiled catalog") -@flow(name="register-to-tiled", flow_run_name="register-{file_path}") +@flow(name="register-to-tiled", flow_run_name="register-{path}") async def register_to_tiled( - file_path: Path | str, - catalog_path: str | None = None, - tiled_path: Path | str | None = None, + path: Path | str, + prefix: str | None = None, + overwrite: bool = False, ) -> None: """Register a file or Zarr store to the Tiled server. Args: - file_path: Path to the file or Zarr store (client filesystem). - catalog_path: Optional sub-path within the Tiled catalog. - tiled_path: Path as seen by the Tiled server. Defaults to file_path. + path: Path to the file or Zarr store (client filesystem). + prefix: Optional sub-path within the Tiled catalog. + overwrite: Whether to overwrite existing entries in the Tiled catalog. """ logger = get_run_logger() - file_path = Path(file_path) - tiled_path = Path(tiled_path) if tiled_path else None - logger.info(f"Registering {file_path} to Tiled (catalog_path={catalog_path!r})") - await register_file_to_tiled(file_path, catalog_path=catalog_path, tiled_path=tiled_path) + path = Path(path) + logger.info(f"Submitting task: register {path} to Tiled (prefix={prefix!r})") + await register_file_to_tiled(path, prefix=prefix, overwrite=overwrite) if __name__ == "__main__": @@ -65,5 +72,5 @@ async def register_to_tiled( zarr = Path("/Users/david/Documents/data/tomo/scratch/rec20230606_152011_jong-seto_fungal-mycelia.zarr") h5 = Path("/Users/david/Documents/data/tomo/raw/20241216_154449_ddd.h5") - asyncio.run(register_to_tiled(zarr)) - asyncio.run(register_to_tiled(h5)) + asyncio.run(register_to_tiled(zarr, prefix="scratch")) + asyncio.run(register_to_tiled(h5, prefix="raw")) From 6f8ff1d1e8c08b1b2f31fedfb7ab20075fb5861e Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 21 Apr 2026 14:46:32 -0700 Subject: [PATCH 07/33] Adding todo note --- orchestration/flows/bl832/move.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py index 3d0a5983..b72d2d8d 100644 --- a/orchestration/flows/bl832/move.py +++ b/orchestration/flows/bl832/move.py @@ -170,6 +170,8 @@ def process_new_832_file_task( ) logger.info(f"File successfully transferred from data832 to beegfs {file_path}") + # TODO: we should trigger the tiled ingestion flow in orchestration.tiled, but that flow will be set up on Ride/beegfs + logger.info("Initializing prune controller") prune_controller = get_prune_controller( prune_type=PruneMethod.GLOBUS, From b1db5f56c8f7cf0be49c0d9d922a135c0498415a Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 21 Apr 2026 14:53:42 -0700 Subject: [PATCH 08/33] fixing bad import statement --- orchestration/flows/bl832/move.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py index b72d2d8d..de5d30f9 100644 --- a/orchestration/flows/bl832/move.py +++ b/orchestration/flows/bl832/move.py @@ -12,7 +12,7 @@ from orchestration.globus.transfer import GlobusEndpoint, start_transfer from orchestration.prune_controller import get_prune_controller, PruneMethod from orchestration.prometheus_utils import PrometheusMetrics -from splash_flows.orchestration.transfer_controller import CopyMethod, get_transfer_controller +from orchestration.transfer_controller import CopyMethod, get_transfer_controller API_KEY = os.getenv("API_KEY") From 3af7f2fe104a6bd2261de2ad6653871fdf7a29f1 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 21 Apr 2026 14:53:57 -0700 Subject: [PATCH 09/33] adding beegfs endpoint to unit tests --- orchestration/_tests/test_globus_flow.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py index c75317e1..352c09cb 100644 --- a/orchestration/_tests/test_globus_flow.py +++ b/orchestration/_tests/test_globus_flow.py @@ -150,6 +150,7 @@ def __init__(self) -> None: MockSecret.for_endpoint("nersc832_alsdev_scratch")), "alcf832_raw": MockEndpoint("mock_alcf832_raw_path", MockSecret.for_endpoint("alcf832_raw")), "alcf832_scratch": MockEndpoint("mock_alcf832_scratch_path", MockSecret.for_endpoint("alcf832_scratch")), + "beegfs_raw": MockEndpoint("mock_beegfs_raw_path", MockSecret.for_endpoint("beegfs_raw")) } # Mock apps @@ -169,6 +170,7 @@ def __init__(self) -> None: self.data832_raw = self.endpoints["data832_raw"] self.data832_scratch = self.endpoints["data832_scratch"] self.nersc832_alsdev_scratch = self.endpoints["nersc832_alsdev_scratch"] + self.beegfs_raw = self.endpoints["beegfs_raw"] self.scicat = config["scicat"] @@ -250,6 +252,7 @@ def test_alcf_recon_flow(mocker: MockFixture): "nersc832_alsdev_recon_scripts": mocker.MagicMock(), "alcf832_raw": mocker.MagicMock(), "alcf832_scratch": mocker.MagicMock(), + "bl832-beegfs-raw": mocker.MagicMock(), } ) mocker.patch( From d6ba15330041e151a2004ae1822ffe555db02532 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 24 Apr 2026 14:14:40 -0700 Subject: [PATCH 10/33] pointing beegfs root_path to the correct directory --- config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.yml b/config.yml index 8159544c..6415328d 100644 --- a/config.yml +++ b/config.yml @@ -140,7 +140,7 @@ globus: name: nersc832 bl832-beegfs-raw: - root_path: /beamline_staging/bl832/raw/ + root_path: /global/beegfs/beamlines/bl832/raw/ uri: beegfs.als.lbl.gov uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a name: bl832-beegfs-raw From 1502dc6e110e9cd6af2907b34c572f2d4d68f4f2 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 24 Apr 2026 14:16:51 -0700 Subject: [PATCH 11/33] adding tags to ingested data --- orchestration/tiled.py | 83 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 68 insertions(+), 15 deletions(-) diff --git a/orchestration/tiled.py b/orchestration/tiled.py index 849d567a..8f9d68f4 100644 --- a/orchestration/tiled.py +++ b/orchestration/tiled.py @@ -17,20 +17,14 @@ async def register_file_to_tiled( path: Path, prefix: str | None = None, overwrite: bool = False, + tags: list[str] | None = None, ) -> None: - """Register a file or Zarr store to the Tiled catalog. - - Args: - path: Absolute path on the client filesystem (used for logging). - prefix: Optional sub-path within the Tiled catalog. - overwrite: Whether to overwrite existing entries in the Tiled catalog. - """ logger = get_run_logger() load_dotenv() tiled_uri = os.environ["TILED_URI"] - api_key = os.environ["TILED_SINGLE_USER_API_KEY"] + tiled_api_key = os.environ["TILED_API_KEY"] - client = from_uri(tiled_uri, api_key=api_key) + client = from_uri(tiled_uri, api_key=tiled_api_key) logger.info(f"Registering {path} to Tiled catalog at {tiled_uri} with prefix {prefix!r}") try: @@ -38,14 +32,54 @@ async def register_file_to_tiled( node=client, path=path, prefix=prefix or "/", - overwrite=overwrite + overwrite=overwrite, ) except Exception as e: raise RuntimeError( f"Failed to register {path} to Tiled catalog at {tiled_uri} " f"(prefix={prefix!r}): {e}" ) from e - logger.info(f"Registered {path} to Tiled catalog") + + if not tags: + return + + def _apply_tags(entry_node): + existing_blob = entry_node.access_blob + existing_tags = (existing_blob or {}).get("tags", []) + merged_tags = list(set(existing_tags) | set(tags)) + op = "replace" if existing_blob is not None else "add" + try: + entry_node.patch_metadata( + access_blob_patch=[{"op": op, "path": "", "value": {"tags": merged_tags}}], + ) + logger.info(f"Tagged {entry_node.uri} with {merged_tags}") + except Exception as e: + logger.warning(f"Could not tag {entry_node.uri}: {e}") + + # Navigate to prefix node after registration + node = client + for segment in (prefix or "").strip("/").split("/"): + if segment: + node = node[segment] + + if path.is_dir() and not path.suffix: + # TIFF directory: Tiled registers each file flat into the prefix node + for key in node: + _apply_tags(node[key]) + else: + # .h5 or .zarr: registered as single node, keyed by stem + # Even on COLLISION the entry exists — just try it directly + entry_key = path.stem + logger.info(f"Looking up entry key {entry_key!r} under {prefix!r}") + try: + _apply_tags(node[entry_key]) + except KeyError: + # Key not found even after registration — log all available keys to diagnose + available = sorted(node) + logger.warning( + f"Entry {entry_key!r} not found under {prefix!r}. " + f"Available keys: {available}" + ) @flow(name="register-to-tiled", flow_run_name="register-{path}") @@ -53,6 +87,7 @@ async def register_to_tiled( path: Path | str, prefix: str | None = None, overwrite: bool = False, + tags: list[str] | None = None, ) -> None: """Register a file or Zarr store to the Tiled server. @@ -64,13 +99,31 @@ async def register_to_tiled( logger = get_run_logger() path = Path(path) logger.info(f"Submitting task: register {path} to Tiled (prefix={prefix!r})") - await register_file_to_tiled(path, prefix=prefix, overwrite=overwrite) + await register_file_to_tiled(path, prefix=prefix, overwrite=overwrite, tags=tags) if __name__ == "__main__": import asyncio - zarr = Path("/Users/david/Documents/data/tomo/scratch/rec20230606_152011_jong-seto_fungal-mycelia.zarr") h5 = Path("/Users/david/Documents/data/tomo/raw/20241216_154449_ddd.h5") - asyncio.run(register_to_tiled(zarr, prefix="scratch")) - asyncio.run(register_to_tiled(h5, prefix="raw")) + tiffs = Path("/Users/david/Documents/data/tomo/rec20230224_132553_sea_shell/") + zarr = Path("/Users/david/Documents/data/tomo/scratch/rec20230606_152011_jong-seto_fungal-mycelia_flat-AQ_fungi2_fast.zarr") + + asyncio.run(register_to_tiled(path=h5, prefix="beamlines/bl832/raw/", tags=["bl832"], overwrite=False)) + asyncio.run(register_to_tiled(path=tiffs, prefix="beamlines/bl832/scratch", tags=["bl832", "dabramov"], overwrite=False)) + asyncio.run(register_to_tiled(path=zarr, prefix="beamlines/bl832/scratch", tags=["bl832"], overwrite=False)) + + load_dotenv() + client = from_uri(os.environ["TILED_URI"]) + checks = [ + (client["beamlines"]["bl832"]["raw"][h5.stem], ["bl832"], h5), + (client["beamlines"]["bl832"]["scratch"], ["bl832", "dabramov"], tiffs), + (client["beamlines"]["bl832"]["scratch"][zarr.stem], ["bl832"], zarr), + ] + for node, expected_tags, check_path in checks: + if check_path.is_dir() and not check_path.suffix: + key = next(iter(node)) + node = node[key] + actual = node.access_blob.get("tags", []) + status = "✓" if set(expected_tags) <= set(actual) else "✗" + print(f"{status} {node.uri}: tags={actual}") # prefix should be beamlines/bl832/raw// From 1e7422fc44a497170c523ce5d8d9d9ab6f6a007c Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 4 May 2026 13:10:21 -0700 Subject: [PATCH 12/33] removing TILED_API_KEY from tiled.py --- orchestration/tiled.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/orchestration/tiled.py b/orchestration/tiled.py index 8f9d68f4..692d1b91 100644 --- a/orchestration/tiled.py +++ b/orchestration/tiled.py @@ -22,9 +22,8 @@ async def register_file_to_tiled( logger = get_run_logger() load_dotenv() tiled_uri = os.environ["TILED_URI"] - tiled_api_key = os.environ["TILED_API_KEY"] - client = from_uri(tiled_uri, api_key=tiled_api_key) + client = from_uri(tiled_uri) logger.info(f"Registering {path} to Tiled catalog at {tiled_uri} with prefix {prefix!r}") try: From 38c0d660a9f92aeef5bcf8ac413fb7d51ff72cdd Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 4 May 2026 15:38:42 -0700 Subject: [PATCH 13/33] removing check tags logic from main, moving it to a new method (check_tags). Also removing file paths from main, and setting as .env variables --- orchestration/tiled.py | 92 +++++++++++++++++++++++++++++++++--------- 1 file changed, 72 insertions(+), 20 deletions(-) diff --git a/orchestration/tiled.py b/orchestration/tiled.py index 692d1b91..99d12eb9 100644 --- a/orchestration/tiled.py +++ b/orchestration/tiled.py @@ -81,6 +81,62 @@ def _apply_tags(entry_node): ) +@task(name="check-tiled-tags", task_run_name="check-tags-{path}") +def check_tags( + path: Path | str, + prefix: str, + expected_tags: set[str], +) -> tuple[bool, list[str]]: + """Check whether a registered dataset has the expected tags applied. + + Navigates to the entry corresponding to ``path`` under ``prefix`` in the + Tiled catalog and compares its ``access_blob.tags`` against ``expected_tags``. + + For TIFF directories (a directory with no suffix), the first child entry + under the prefix node is checked, since ``register`` registers each TIFF + file flat into the prefix node. + + Args: + path: Path to the file or store that was registered. + prefix: Sub-path within the Tiled catalog where the entry was registered. + expected_tags: Tags that must be present on the entry. + + Returns: + A tuple ``(ok, actual_tags)`` where ``ok`` is True iff every tag in + ``expected_tags`` is present in the entry's ``access_blob.tags``. + + Raises: + KeyError: If the entry cannot be located under ``prefix``. + """ + logger = get_run_logger() + load_dotenv() + path = Path(path) + tiled_uri = os.environ["TILED_URI"] + client = from_uri(tiled_uri) + + # Navigate to the prefix node + node = client + for segment in prefix.strip("/").split("/"): + if segment: + node = node[segment] + + # For TIFF directories, register flattens files into the prefix node; + # for .h5 / .zarr, the entry is keyed by the path stem. + if path.is_dir() and not path.suffix: + key = next(iter(node)) + node = node[key] + else: + node = node[path.stem] + + actual = node.access_blob.get("tags", []) if node.access_blob else [] + ok = expected_tags <= set(actual) + logger.info( + f"{path.name} under {prefix!r}: " + f"expected={sorted(expected_tags)} actual={actual} ok={ok}" + ) + return ok, actual + + @flow(name="register-to-tiled", flow_run_name="register-{path}") async def register_to_tiled( path: Path | str, @@ -94,6 +150,7 @@ async def register_to_tiled( path: Path to the file or Zarr store (client filesystem). prefix: Optional sub-path within the Tiled catalog. overwrite: Whether to overwrite existing entries in the Tiled catalog. + tags: Optional list of tags to apply to the registered entry. """ logger = get_run_logger() path = Path(path) @@ -104,25 +161,20 @@ async def register_to_tiled( if __name__ == "__main__": import asyncio - h5 = Path("/Users/david/Documents/data/tomo/raw/20241216_154449_ddd.h5") - tiffs = Path("/Users/david/Documents/data/tomo/rec20230224_132553_sea_shell/") - zarr = Path("/Users/david/Documents/data/tomo/scratch/rec20230606_152011_jong-seto_fungal-mycelia_flat-AQ_fungi2_fast.zarr") - - asyncio.run(register_to_tiled(path=h5, prefix="beamlines/bl832/raw/", tags=["bl832"], overwrite=False)) - asyncio.run(register_to_tiled(path=tiffs, prefix="beamlines/bl832/scratch", tags=["bl832", "dabramov"], overwrite=False)) - asyncio.run(register_to_tiled(path=zarr, prefix="beamlines/bl832/scratch", tags=["bl832"], overwrite=False)) - load_dotenv() - client = from_uri(os.environ["TILED_URI"]) - checks = [ - (client["beamlines"]["bl832"]["raw"][h5.stem], ["bl832"], h5), - (client["beamlines"]["bl832"]["scratch"], ["bl832", "dabramov"], tiffs), - (client["beamlines"]["bl832"]["scratch"][zarr.stem], ["bl832"], zarr), + h5 = Path(os.environ["EXAMPLE_H5_PATH"]) + tiffs = Path(os.environ["EXAMPLE_TIFFS_PATH"]) + zarr = Path(os.environ["EXAMPLE_ZARR_PATH"]) + + cases = [ + (h5, "beamlines/bl832/raw/", {"bl832"}), + (tiffs, "beamlines/bl832/scratch", {"bl832", "dabramov"}), + (zarr, "beamlines/bl832/scratch", {"bl832"}), ] - for node, expected_tags, check_path in checks: - if check_path.is_dir() and not check_path.suffix: - key = next(iter(node)) - node = node[key] - actual = node.access_blob.get("tags", []) - status = "✓" if set(expected_tags) <= set(actual) else "✗" - print(f"{status} {node.uri}: tags={actual}") # prefix should be beamlines/bl832/raw// + + for path, prefix, tags in cases: + asyncio.run(register_to_tiled(path=path, prefix=prefix, tags=list(tags), overwrite=False)) + + for path, prefix, expected in cases: + ok, actual = check_tags.fn(path, prefix, expected) + print(f"{'✓' if ok else '✗'} {path.name}: expected={sorted(expected)} actual={actual}") From c4657f4c1ae72bda6a5a8479a674c9710a67e018 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 4 May 2026 16:50:39 -0700 Subject: [PATCH 14/33] Removing requirements.txt, moving any missing deps to pyproject.toml, updating github action --- .github/workflows/python-app.yml | 6 ++---- pyproject.toml | 21 +++++++++++---------- requirements-dev.txt | 5 ----- requirements.txt | 22 ---------------------- 4 files changed, 13 insertions(+), 41 deletions(-) delete mode 100644 requirements-dev.txt delete mode 100644 requirements.txt diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 1164dede..6345a494 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -17,13 +17,11 @@ jobs: with: python-version: 3.11 cache: 'pip' + cache-dependency-path: pyproject.toml - name: Install dependencies run: | python -m pip install --no-cache-dir --upgrade pip - pip install --no-cache-dir flake8 pytest - pip install --no-cache-dir . - if [ -f requirements.txt ]; then pip install -r requirements.txt; fi - if [ -f requirements-dev.txt ]; then pip install -r requirements-dev.txt; fi + pip install -e . --group dev - name: Lint with flake8 run: | # stop the build if there are Python syntax errors or undefined names diff --git a/pyproject.toml b/pyproject.toml index a5b3a470..7ff59ad6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,6 +6,7 @@ readme = "README.md" requires-python = ">=3.11" dependencies = [ "authlib", + "dynaconf", "globus-compute-sdk @ git+https://github.com/globus/globus-compute.git@d1731340074be56861ec91d732bdff44f8e2b46e#subdirectory=compute_sdk", "globus-sdk>=3.0", "griffe>=0.49.0,<2.0.0", @@ -28,15 +29,15 @@ dependencies = [ "watchfiles" ] -[build-system] -requires = [ - "black", - "flake8", - "freezegun", - "pytest", - "pytest-mock", - "setuptools" +[dependency-groups] +dev = [ + "black", + "flake8", + "freezegun", + "pytest", + "pytest-mock", ] -build-backend = "setuptools.build_meta" - +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" \ No newline at end of file diff --git a/requirements-dev.txt b/requirements-dev.txt deleted file mode 100644 index d5ac223e..00000000 --- a/requirements-dev.txt +++ /dev/null @@ -1,5 +0,0 @@ -pytest -pytest-mock -freezegun -flake8 -black \ No newline at end of file diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 272693e4..00000000 --- a/requirements.txt +++ /dev/null @@ -1,22 +0,0 @@ -authlib==1.6.1 -dynaconf -globus-compute-sdk @ git+https://github.com/globus/globus-compute.git@d1731340074be56861ec91d732bdff44f8e2b46e#subdirectory=compute_sdk -globus-sdk>=3.0 -griffe>=0.49.0,<2.0.0 -h5py -httpx>=0.22.0 -mkdocs -mkdocs-material -mkdocs-mermaid2-plugin -mlflow==2.22.0 -numpy>=1.26.4 -pillow -prefect==3.4.2 -prometheus_client==0.21.1 -pydantic==2.11 -python-dotenv -pyyaml -scicat-beamline @ git+https://github.com/als-computing/scicat_beamline.git@4828273f5f49ba4eba5442728729e0545b3f5b79 -sfapi_client -tiled[client] -watchfiles \ No newline at end of file From d3796129b7c20a8482518db8264e49f069a0273c Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 5 May 2026 09:47:07 -0700 Subject: [PATCH 15/33] moving load_dotenv() to top of the module --- orchestration/tiled.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/orchestration/tiled.py b/orchestration/tiled.py index 99d12eb9..5616e16b 100644 --- a/orchestration/tiled.py +++ b/orchestration/tiled.py @@ -12,6 +12,9 @@ from tiled.client.register import register +load_dotenv() + + @task(name="register-file-to-tiled", task_run_name="register-{path}") async def register_file_to_tiled( path: Path, @@ -20,7 +23,6 @@ async def register_file_to_tiled( tags: list[str] | None = None, ) -> None: logger = get_run_logger() - load_dotenv() tiled_uri = os.environ["TILED_URI"] client = from_uri(tiled_uri) @@ -109,7 +111,6 @@ def check_tags( KeyError: If the entry cannot be located under ``prefix``. """ logger = get_run_logger() - load_dotenv() path = Path(path) tiled_uri = os.environ["TILED_URI"] client = from_uri(tiled_uri) From 1d91f7ee66642880c829d881b9bc5910fa3ca31a Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 5 May 2026 14:50:44 -0700 Subject: [PATCH 16/33] moving _apply_tags() into a prefect task, logger.info -> logger.debug, cleanup. Files and tags register with tiled, and I can see them via the Tiled python api, but not in the Tiled UI for some reason (maybe a sign access tags are working...) --- orchestration/tiled.py | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/orchestration/tiled.py b/orchestration/tiled.py index 5616e16b..bd2f58af 100644 --- a/orchestration/tiled.py +++ b/orchestration/tiled.py @@ -44,19 +44,6 @@ async def register_file_to_tiled( if not tags: return - def _apply_tags(entry_node): - existing_blob = entry_node.access_blob - existing_tags = (existing_blob or {}).get("tags", []) - merged_tags = list(set(existing_tags) | set(tags)) - op = "replace" if existing_blob is not None else "add" - try: - entry_node.patch_metadata( - access_blob_patch=[{"op": op, "path": "", "value": {"tags": merged_tags}}], - ) - logger.info(f"Tagged {entry_node.uri} with {merged_tags}") - except Exception as e: - logger.warning(f"Could not tag {entry_node.uri}: {e}") - # Navigate to prefix node after registration node = client for segment in (prefix or "").strip("/").split("/"): @@ -66,14 +53,14 @@ def _apply_tags(entry_node): if path.is_dir() and not path.suffix: # TIFF directory: Tiled registers each file flat into the prefix node for key in node: - _apply_tags(node[key]) + _apply_tags(entry_node=node[key], tags=tags) else: # .h5 or .zarr: registered as single node, keyed by stem # Even on COLLISION the entry exists — just try it directly entry_key = path.stem logger.info(f"Looking up entry key {entry_key!r} under {prefix!r}") try: - _apply_tags(node[entry_key]) + _apply_tags(entry_node=node[entry_key], tags=tags) except KeyError: # Key not found even after registration — log all available keys to diagnose available = sorted(node) @@ -83,6 +70,24 @@ def _apply_tags(entry_node): ) +@task(name="apply-tags", task_run_name="apply-tags-{tags}") +def _apply_tags(entry_node, tags: list[str]) -> None: + logger = get_run_logger() + existing_blob = entry_node.access_blob + existing_tags = (existing_blob or {}).get("tags", []) + merged_tags = list(set(existing_tags) | set(tags)) + op = "replace" if existing_blob is not None else "add" + try: + # entry_node.update_metadata(access_tags=merged_tags) + entry_node.patch_metadata( + access_blob_patch=[{"op": op, "path": "", "value": {"tags": merged_tags}}], + ) + + logger.debug(f"Tagged {entry_node.uri} with {merged_tags}") + except Exception as e: + logger.debug(f"Could not tag {entry_node.uri}: {e}") + + @task(name="check-tiled-tags", task_run_name="check-tags-{path}") def check_tags( path: Path | str, @@ -162,7 +167,6 @@ async def register_to_tiled( if __name__ == "__main__": import asyncio - load_dotenv() h5 = Path(os.environ["EXAMPLE_H5_PATH"]) tiffs = Path(os.environ["EXAMPLE_TIFFS_PATH"]) zarr = Path(os.environ["EXAMPLE_ZARR_PATH"]) @@ -177,5 +181,5 @@ async def register_to_tiled( asyncio.run(register_to_tiled(path=path, prefix=prefix, tags=list(tags), overwrite=False)) for path, prefix, expected in cases: - ok, actual = check_tags.fn(path, prefix, expected) + ok, actual = check_tags(path, prefix, expected) print(f"{'✓' if ok else '✗'} {path.name}: expected={sorted(expected)} actual={actual}") From 187977cae2ba68117c5b2a5466433f031387fae9 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 6 May 2026 15:15:49 -0700 Subject: [PATCH 17/33] Adding register_file_to_tiled() call to move.py --- orchestration/flows/bl832/move.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py index de5d30f9..14b645d7 100644 --- a/orchestration/flows/bl832/move.py +++ b/orchestration/flows/bl832/move.py @@ -12,6 +12,7 @@ from orchestration.globus.transfer import GlobusEndpoint, start_transfer from orchestration.prune_controller import get_prune_controller, PruneMethod from orchestration.prometheus_utils import PrometheusMetrics +from orchestration.tiled import register_file_to_tiled from orchestration.transfer_controller import CopyMethod, get_transfer_controller @@ -170,7 +171,14 @@ def process_new_832_file_task( ) logger.info(f"File successfully transferred from data832 to beegfs {file_path}") - # TODO: we should trigger the tiled ingestion flow in orchestration.tiled, but that flow will be set up on Ride/beegfs + tiled_future = register_file_to_tiled( + path=config.beegfs_raw.root_path+relative_path, + prefix="beamlines/bl832/raw", + overwrite=False, + tags=["raw", "bl832"], + ) + # TODO: find proposal id in h5, make that a tag + tiled_future.result() # wait for registration to complete before scheduling deletes logger.info("Initializing prune controller") prune_controller = get_prune_controller( From 254a1b9962ff56cc52a7d4a212e235a1fed5a49d Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 6 May 2026 15:24:31 -0700 Subject: [PATCH 18/33] Adding calls to register_file_to_tiled() from nersc_recon_flow() for tiffs and zarr --- orchestration/flows/bl832/nersc.py | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/orchestration/flows/bl832/nersc.py b/orchestration/flows/bl832/nersc.py index f4ffc9fb..c27cabf4 100644 --- a/orchestration/flows/bl832/nersc.py +++ b/orchestration/flows/bl832/nersc.py @@ -18,12 +18,14 @@ from orchestration.flows.bl832.config import Config832 from orchestration.flows.bl832.job_controller import get_controller, HPC, TomographyHPCController from orchestration.mlflow import get_checkpoint_info -from orchestration.prune_controller import get_prune_controller, PruneMethod -from orchestration.transfer_controller import globus_transfer_task from orchestration.flows.bl832.streaming_mixin import ( NerscStreamingMixin, SlurmJobBlock, cancellation_hook, monitor_streaming_job, save_block ) from orchestration.prefect import schedule_prefect_flow +from orchestration.prune_controller import get_prune_controller, PruneMethod +from orchestration.tiled import register_file_to_tiled +from orchestration.transfer_controller import globus_transfer_task + logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -1710,6 +1712,24 @@ def nersc_recon_flow( zarr_file_path=zarr_file_path ) + # Register the reconstructed TIFFs in tiled + tiled_tiffs_future = register_file_to_tiled( + path=config.beegfs_raw.root_path+tiff_file_path, + prefix="beamlines/bl832/scratch", + overwrite=False, + tags=["scratch", "bl832"], + ) + tiled_tiffs_future = tiled_tiffs_future.result() + + # Register the reconstructed ZARRs in tiled + tiled_zarr_future = register_file_to_tiled( + path=config.beegfs_raw.root_path+zarr_file_path, + prefix="beamlines/bl832/scratch", + overwrite=False, + tags=["scratch", "bl832"], + ) + tiled_zarr_future = tiled_zarr_future.result() + # TODO: Ingest into SciCat if nersc_reconstruction_success: return True From 61840135922dc9ad591ac5f008a48653e8240e2d Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 6 May 2026 15:24:57 -0700 Subject: [PATCH 19/33] Adding TILED_URI to .env.example --- .env.example | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.env.example b/.env.example index e3728e89..458e71e0 100644 --- a/.env.example +++ b/.env.example @@ -4,4 +4,5 @@ PREFECT_API_URL= PREFECT_API_KEY= PUSHGATEWAY_URL= JOB_NAME= -INSTANCE_LABEL= \ No newline at end of file +INSTANCE_LABEL= +TILED_URI= \ No newline at end of file From 9b6579a9fe4b7d026aff40b82a66c8fe245bb5ef Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 6 May 2026 15:42:19 -0700 Subject: [PATCH 20/33] Adding BEAMLINE=<> to .env.example --- .env.example | 1 + 1 file changed, 1 insertion(+) diff --git a/.env.example b/.env.example index 458e71e0..9db497e7 100644 --- a/.env.example +++ b/.env.example @@ -1,3 +1,4 @@ +BEAMLINE=<832> GLOBUS_CLIENT_ID= GLOBUS_CLIENT_SECRET= PREFECT_API_URL= From 21536f89e7c3aa4e0e2613b1afb8422120cd3997 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 6 May 2026 16:13:16 -0700 Subject: [PATCH 21/33] Making register_file_to_tiled() sync to fix async/sync nesting. Example: dispatcher (async) -> move.py (sync) -> register_file_to_tiled (async). Using Prefect's built in helper from prefect.utilities.asyncutils import run_coro_as_sync to run the async Tiled register() function synchronously --- orchestration/flows/bl832/move.py | 5 ++--- orchestration/flows/bl832/nersc.py | 6 ++---- orchestration/tiled.py | 26 ++++++++++++++------------ 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py index 14b645d7..73709d04 100644 --- a/orchestration/flows/bl832/move.py +++ b/orchestration/flows/bl832/move.py @@ -171,14 +171,13 @@ def process_new_832_file_task( ) logger.info(f"File successfully transferred from data832 to beegfs {file_path}") - tiled_future = register_file_to_tiled( - path=config.beegfs_raw.root_path+relative_path, + register_file_to_tiled( + path=Path(config.beegfs_raw.root_path+relative_path), prefix="beamlines/bl832/raw", overwrite=False, tags=["raw", "bl832"], ) # TODO: find proposal id in h5, make that a tag - tiled_future.result() # wait for registration to complete before scheduling deletes logger.info("Initializing prune controller") prune_controller = get_prune_controller( diff --git a/orchestration/flows/bl832/nersc.py b/orchestration/flows/bl832/nersc.py index c27cabf4..5d54a4e1 100644 --- a/orchestration/flows/bl832/nersc.py +++ b/orchestration/flows/bl832/nersc.py @@ -1713,22 +1713,20 @@ def nersc_recon_flow( ) # Register the reconstructed TIFFs in tiled - tiled_tiffs_future = register_file_to_tiled( + register_file_to_tiled( path=config.beegfs_raw.root_path+tiff_file_path, prefix="beamlines/bl832/scratch", overwrite=False, tags=["scratch", "bl832"], ) - tiled_tiffs_future = tiled_tiffs_future.result() # Register the reconstructed ZARRs in tiled - tiled_zarr_future = register_file_to_tiled( + register_file_to_tiled( path=config.beegfs_raw.root_path+zarr_file_path, prefix="beamlines/bl832/scratch", overwrite=False, tags=["scratch", "bl832"], ) - tiled_zarr_future = tiled_zarr_future.result() # TODO: Ingest into SciCat if nersc_reconstruction_success: diff --git a/orchestration/tiled.py b/orchestration/tiled.py index bd2f58af..90687c7b 100644 --- a/orchestration/tiled.py +++ b/orchestration/tiled.py @@ -8,6 +8,7 @@ from pathlib import Path from prefect import flow, get_run_logger, task +from prefect.utilities.asyncutils import run_coro_as_sync from tiled.client import from_uri from tiled.client.register import register @@ -16,24 +17,27 @@ @task(name="register-file-to-tiled", task_run_name="register-{path}") -async def register_file_to_tiled( - path: Path, +def register_file_to_tiled( + path: Path | str, prefix: str | None = None, overwrite: bool = False, tags: list[str] | None = None, ) -> None: logger = get_run_logger() + path = Path(path) tiled_uri = os.environ["TILED_URI"] client = from_uri(tiled_uri) logger.info(f"Registering {path} to Tiled catalog at {tiled_uri} with prefix {prefix!r}") try: - await register( - node=client, - path=path, - prefix=prefix or "/", - overwrite=overwrite, + run_coro_as_sync( # Bridge synchronous Prefect task to async Tiled client method + register( + node=client, + path=path, + prefix=prefix or "/", + overwrite=overwrite, + ) ) except Exception as e: raise RuntimeError( @@ -144,7 +148,7 @@ def check_tags( @flow(name="register-to-tiled", flow_run_name="register-{path}") -async def register_to_tiled( +def register_to_tiled( path: Path | str, prefix: str | None = None, overwrite: bool = False, @@ -161,12 +165,10 @@ async def register_to_tiled( logger = get_run_logger() path = Path(path) logger.info(f"Submitting task: register {path} to Tiled (prefix={prefix!r})") - await register_file_to_tiled(path, prefix=prefix, overwrite=overwrite, tags=tags) + register_file_to_tiled(path, prefix=prefix, overwrite=overwrite, tags=tags) if __name__ == "__main__": - import asyncio - h5 = Path(os.environ["EXAMPLE_H5_PATH"]) tiffs = Path(os.environ["EXAMPLE_TIFFS_PATH"]) zarr = Path(os.environ["EXAMPLE_ZARR_PATH"]) @@ -178,7 +180,7 @@ async def register_to_tiled( ] for path, prefix, tags in cases: - asyncio.run(register_to_tiled(path=path, prefix=prefix, tags=list(tags), overwrite=False)) + register_to_tiled(path=path, prefix=prefix, tags=list(tags), overwrite=False) for path, prefix, expected in cases: ok, actual = check_tags(path, prefix, expected) From 9b7470a21967d7c1599d7b34802c33eca5ce15ce Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 6 May 2026 16:53:26 -0700 Subject: [PATCH 22/33] adding beegfs copy to nersc.py --- orchestration/flows/bl832/nersc.py | 35 +++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/orchestration/flows/bl832/nersc.py b/orchestration/flows/bl832/nersc.py index 5d54a4e1..adeb4cec 100644 --- a/orchestration/flows/bl832/nersc.py +++ b/orchestration/flows/bl832/nersc.py @@ -1697,24 +1697,31 @@ def nersc_recon_flow( config=config, ) + logger.info("Copy from NERSC /global/cfs/cdirs/als/data_mover/8.3.2/scratch to beegfs") + nersc_to_beegfs_tiff_future = globus_transfer_task.submit( + file_path=tiff_file_path, + source=config.nersc832_alsdev_pscratch_scratch, + destination=config.beegfs_scratch + ) + + nersc_to_beegfs_zarr_future = globus_transfer_task.submit( + file_path=zarr_file_path, + source=config.nersc832_alsdev_pscratch_scratch, + destination=config.beegfs_scratch + ) + # Resolve before pruning (which needs to know what landed where) pscratch_to_cfs_tiff_future.result() pscratch_to_cfs_zarr_future.result() pscratch_to_data832_tiff_future.result() pscratch_to_data832_zarr_future.result() + nersc_to_beegfs_tiff_future.result() + nersc_to_beegfs_zarr_future.result() logger.info("All transfers complete.") - logger.info("Scheduling pruning tasks.") - schedule_pruning( - config=config, - raw_file_path=file_path, - tiff_file_path=tiff_file_path, - zarr_file_path=zarr_file_path - ) - # Register the reconstructed TIFFs in tiled register_file_to_tiled( - path=config.beegfs_raw.root_path+tiff_file_path, + path=Path(config.beegfs_scratch.root_path+tiff_file_path), prefix="beamlines/bl832/scratch", overwrite=False, tags=["scratch", "bl832"], @@ -1722,12 +1729,20 @@ def nersc_recon_flow( # Register the reconstructed ZARRs in tiled register_file_to_tiled( - path=config.beegfs_raw.root_path+zarr_file_path, + path=Path(config.beegfs_scratch.root_path+zarr_file_path), prefix="beamlines/bl832/scratch", overwrite=False, tags=["scratch", "bl832"], ) + logger.info("Scheduling pruning tasks.") + schedule_pruning( + config=config, + raw_file_path=file_path, + tiff_file_path=tiff_file_path, + zarr_file_path=zarr_file_path + ) + # TODO: Ingest into SciCat if nersc_reconstruction_success: return True From 10bd2aa22c80e848e4b5359f65d6c16cd521babe Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 6 May 2026 16:55:22 -0700 Subject: [PATCH 23/33] create unit tests in test_tiled.py --- orchestration/_tests/test_tiled.py | 471 +++++++++++++++++++++++++++++ 1 file changed, 471 insertions(+) create mode 100644 orchestration/_tests/test_tiled.py diff --git a/orchestration/_tests/test_tiled.py b/orchestration/_tests/test_tiled.py new file mode 100644 index 00000000..deca5bc0 --- /dev/null +++ b/orchestration/_tests/test_tiled.py @@ -0,0 +1,471 @@ +"""Unit tests for ``orchestration.tiled``. + +Tests run inside a Prefect ephemeral server (``prefect_test_harness``) so the +real task engine is exercised, but Tiled itself is mocked — no real Tiled +server, no network calls. + +Coverage: + - ``register_file_to_tiled``: h5/zarr branch, TIFF directory branch, + no-tags skip, ``str`` path coercion, missing-entry warning, register failure. + - ``_apply_tags``: existing access_blob (replace), no blob (add), tag + deduplication, ``patch_metadata`` failure swallowed. + - ``check_tags``: h5 ok, h5 tag-missing, TIFF dir uses first child, + no access_blob, missing entry raises ``KeyError``. + - ``register_to_tiled`` flow: delegates with the right args, coerces ``str``. + +The async/sync bridge inside ``register_file_to_tiled`` (``run_coro_as_sync`` +calling Tiled's async ``register``) is exercised on every happy path: tests +patch ``register`` as an ``AsyncMock`` and let ``run_coro_as_sync`` actually +drive it. The error-path test patches ``run_coro_as_sync`` directly to inject +a failure. +""" +import warnings + +from prefect.testing.utilities import prefect_test_harness +import pytest +from pytest_mock import MockFixture + +from orchestration.tiled import ( + _apply_tags, + check_tags, + register_file_to_tiled, + register_to_tiled, +) + +warnings.filterwarnings("ignore", category=DeprecationWarning) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture(autouse=True, scope="session") +def prefect_test_fixture(): + """Provide an ephemeral Prefect server for the whole test session.""" + with prefect_test_harness(): + yield + + +@pytest.fixture(autouse=True) +def _tiled_uri_env(monkeypatch): + """Set ``TILED_URI`` so ``os.environ[...]`` reads in the module don't error.""" + monkeypatch.setenv("TILED_URI", "http://tiled.test") + + +# --------------------------------------------------------------------------- +# Mock Tiled node +# --------------------------------------------------------------------------- + +class MockNode: + """Stand-in for a Tiled node. + + Strict by design: ``node[missing_key]`` raises ``KeyError`` to match real + Tiled behavior, so tests for the missing-entry warning path actually work. + Build prefix chains explicitly with ``add_child`` or the + ``build_prefix_chain`` helper below. + """ + + def __init__(self, access_blob=None, uri="http://tiled.test/node"): + self.access_blob = access_blob + self.uri = uri + self._children: dict[str, "MockNode"] = {} + self.patch_calls: list[dict] = [] + + def add_child(self, key: str, node: "MockNode") -> "MockNode": + self._children[key] = node + return node + + def __getitem__(self, key): + # Strict: missing keys raise like a real Tiled node would + return self._children[key] + + def __iter__(self): + return iter(self._children) + + def patch_metadata(self, **kwargs): + self.patch_calls.append(kwargs) + + +def build_prefix_chain(segments: list[str], leaf: MockNode) -> MockNode: + """Build ``client[seg1][seg2]...[segN] -> leaf`` and return the root. + + Used to mirror the prefix-navigation loop in ``register_file_to_tiled`` + and ``check_tags`` (``for segment in prefix.strip("/").split("/")``). + """ + current = leaf + for segment in reversed(segments): + parent = MockNode() + parent.add_child(segment, current) + current = parent + return current + + +# --------------------------------------------------------------------------- +# _apply_tags +# --------------------------------------------------------------------------- + +def test_apply_tags_uses_add_op_when_no_existing_blob(): + """No access_blob present → JSON Patch op is ``add``.""" + node = MockNode(access_blob=None) + _apply_tags(entry_node=node, tags=["bl832"]) + + assert len(node.patch_calls) == 1 + patch = node.patch_calls[0]["access_blob_patch"][0] + assert patch["op"] == "add" + assert patch["path"] == "" + assert set(patch["value"]["tags"]) == {"bl832"} + + +def test_apply_tags_uses_replace_op_when_blob_exists(): + """Existing access_blob → op is ``replace`` and tags are merged.""" + node = MockNode(access_blob={"tags": ["existing"]}) + _apply_tags(entry_node=node, tags=["bl832"]) + + patch = node.patch_calls[0]["access_blob_patch"][0] + assert patch["op"] == "replace" + assert set(patch["value"]["tags"]) == {"existing", "bl832"} + + +def test_apply_tags_deduplicates_overlapping_tags(): + """Tag merging is a set union, so overlap doesn't produce duplicates.""" + node = MockNode(access_blob={"tags": ["bl832", "old"]}) + _apply_tags(entry_node=node, tags=["bl832", "new"]) + + patch = node.patch_calls[0]["access_blob_patch"][0] + merged = patch["value"]["tags"] + assert set(merged) == {"bl832", "old", "new"} + assert len(merged) == 3 # no duplicates + + +def test_apply_tags_swallows_patch_metadata_failure(mocker: MockFixture): + """If ``patch_metadata`` raises, ``_apply_tags`` logs but does not propagate.""" + node = MockNode(access_blob={"tags": []}) + mock_patch_metadata = mocker.patch.object( + node, "patch_metadata", side_effect=RuntimeError("permission denied") + ) + + # Should not raise + _apply_tags(entry_node=node, tags=["bl832"]) + + # Verify the call actually happened — the test is about swallowing, not skipping + mock_patch_metadata.assert_called_once() + + +# --------------------------------------------------------------------------- +# register_file_to_tiled — happy paths (exercise run_coro_as_sync bridge) +# --------------------------------------------------------------------------- + +def test_register_h5_with_tags_applies_to_stem(mocker: MockFixture, tmp_path): + """For an .h5 file, the entry keyed by ``path.stem`` should be tagged.""" + h5 = tmp_path / "scan.h5" + h5.touch() + + entry_node = MockNode(access_blob=None) + prefix_node = MockNode() + prefix_node.add_child("scan", entry_node) + client = build_prefix_chain(["beamlines", "bl832", "raw"], prefix_node) + + fake_register = mocker.AsyncMock(return_value=None) + mocker.patch("orchestration.tiled.from_uri", return_value=client) + mocker.patch("orchestration.tiled.register", fake_register) + + register_file_to_tiled( + path=h5, + prefix="beamlines/bl832/raw", + tags=["raw", "bl832"], + ) + + fake_register.assert_awaited_once() + assert len(entry_node.patch_calls) == 1 + tags_written = entry_node.patch_calls[0]["access_blob_patch"][0]["value"]["tags"] + assert set(tags_written) == {"raw", "bl832"} + + +def test_register_zarr_with_tags_applies_to_stem(mocker: MockFixture, tmp_path): + """A .zarr store follows the same stem-keyed lookup as .h5.""" + zarr = tmp_path / "sample.zarr" + # Real .zarr is a directory, but the code path is suffix-driven, so a file is fine + zarr.touch() + + entry_node = MockNode(access_blob=None) + prefix_node = MockNode() + prefix_node.add_child("sample", entry_node) + client = build_prefix_chain(["beamlines", "bl832", "scratch"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + mocker.patch("orchestration.tiled.register", mocker.AsyncMock(return_value=None)) + + register_file_to_tiled( + path=zarr, + prefix="beamlines/bl832/scratch", + tags=["bl832"], + ) + + assert len(entry_node.patch_calls) == 1 + + +def test_register_no_tags_skips_apply_tags(mocker: MockFixture, tmp_path): + """When ``tags`` is None/empty, the tag-application branch is skipped entirely.""" + h5 = tmp_path / "scan.h5" + h5.touch() + + prefix_node = MockNode() + mocker.patch("orchestration.tiled.from_uri", return_value=prefix_node) + fake_register = mocker.AsyncMock(return_value=None) + mocker.patch("orchestration.tiled.register", fake_register) + + register_file_to_tiled(path=h5) + + fake_register.assert_awaited_once() + assert prefix_node.patch_calls == [] + + +def test_register_string_path_is_coerced(mocker: MockFixture, tmp_path): + """Passing ``path`` as a ``str`` must be coerced to ``Path``. + + Regression test: without ``Path(path)``, the ``path.is_dir()`` and + ``path.suffix`` calls explode with ``AttributeError``. + """ + h5 = tmp_path / "scan.h5" + h5.touch() + + entry_node = MockNode(access_blob=None) + prefix_node = MockNode() + prefix_node.add_child("scan", entry_node) + client = build_prefix_chain(["beamlines", "bl832", "raw"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + mocker.patch("orchestration.tiled.register", mocker.AsyncMock(return_value=None)) + + # Pass a string, not a Path — would fail without the Path(path) coercion + register_file_to_tiled( + path=str(h5), + prefix="beamlines/bl832/raw", + tags=["raw"], + ) + + assert len(entry_node.patch_calls) == 1 + + +def test_register_tiff_dir_tags_each_child(mocker: MockFixture, tmp_path): + """A directory with no suffix should tag every child under the prefix node.""" + tiff_dir = tmp_path / "tiffs" + tiff_dir.mkdir() + (tiff_dir / "frame_0000.tiff").touch() + (tiff_dir / "frame_0001.tiff").touch() + + child_a = MockNode(access_blob=None) + child_b = MockNode(access_blob=None) + prefix_node = MockNode() + prefix_node.add_child("frame_0000", child_a) + prefix_node.add_child("frame_0001", child_b) + client = build_prefix_chain(["beamlines", "bl832", "scratch"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + mocker.patch("orchestration.tiled.register", mocker.AsyncMock(return_value=None)) + + register_file_to_tiled( + path=tiff_dir, + prefix="beamlines/bl832/scratch", + tags=["bl832"], + ) + + assert len(child_a.patch_calls) == 1 + assert len(child_b.patch_calls) == 1 + + +# --------------------------------------------------------------------------- +# register_file_to_tiled — edge cases +# --------------------------------------------------------------------------- + +def test_register_missing_entry_after_register_logs_warning(mocker: MockFixture, tmp_path): + """If ``node[stem]`` raises ``KeyError`` post-register, no exception leaks and a warning is logged.""" + h5 = tmp_path / "missing.h5" + h5.touch() + + # Prefix node has *some* entries but not the one we'll look up + prefix_node = MockNode() + prefix_node.add_child("other-entry", MockNode()) + client = build_prefix_chain(["beamlines", "bl832", "raw"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + mocker.patch("orchestration.tiled.register", mocker.AsyncMock(return_value=None)) + mock_logger = mocker.patch("orchestration.tiled.get_run_logger") + + # Should not raise — the KeyError is caught and logged with available keys + register_file_to_tiled( + path=h5, + prefix="beamlines/bl832/raw", + tags=["raw"], + ) + + mock_logger.return_value.warning.assert_called_once() + warning_msg = mock_logger.return_value.warning.call_args[0][0] + assert "missing" in warning_msg # stem of the file + assert "other-entry" in warning_msg # available keys listed in the message + + +def test_register_raises_runtime_error_on_failure(mocker: MockFixture, tmp_path): + """If the bridge or Tiled's ``register`` raises, wrap it in ``RuntimeError``.""" + h5 = tmp_path / "scan.h5" + h5.touch() + + mocker.patch("orchestration.tiled.from_uri", return_value=MockNode()) + mocker.patch( + "orchestration.tiled.run_coro_as_sync", + side_effect=Exception("connection refused"), + ) + + with pytest.raises(RuntimeError, match="Failed to register .* connection refused"): + register_file_to_tiled(path=h5) + + +# --------------------------------------------------------------------------- +# check_tags +# --------------------------------------------------------------------------- + +def test_check_tags_returns_true_when_all_expected_present(mocker: MockFixture, tmp_path): + """Subset semantics: ok iff expected_tags <= actual_tags.""" + h5 = tmp_path / "scan.h5" + h5.touch() + + entry_node = MockNode(access_blob={"tags": ["bl832", "raw", "extra"]}) + prefix_node = MockNode() + prefix_node.add_child("scan", entry_node) + client = build_prefix_chain(["beamlines"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + + ok, actual = check_tags( + path=h5, + prefix="beamlines", + expected_tags={"bl832", "raw"}, + ) + + assert ok is True + assert set(actual) == {"bl832", "raw", "extra"} + + +def test_check_tags_returns_false_when_tag_missing(mocker: MockFixture, tmp_path): + """A single missing expected tag flips ok to False.""" + h5 = tmp_path / "scan.h5" + h5.touch() + + entry_node = MockNode(access_blob={"tags": ["bl832"]}) + prefix_node = MockNode() + prefix_node.add_child("scan", entry_node) + client = build_prefix_chain(["beamlines"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + + ok, actual = check_tags( + path=h5, + prefix="beamlines", + expected_tags={"bl832", "raw"}, + ) + + assert ok is False + assert actual == ["bl832"] + + +def test_check_tags_tiff_dir_uses_first_child(mocker: MockFixture, tmp_path): + """For a TIFF dir, ``check_tags`` reads the first child of the prefix node.""" + tiff_dir = tmp_path / "tiffs" + tiff_dir.mkdir() + (tiff_dir / "frame_0000.tiff").touch() + + first_child = MockNode(access_blob={"tags": ["bl832"]}) + prefix_node = MockNode() + prefix_node.add_child("frame_0000", first_child) + client = build_prefix_chain(["beamlines", "bl832", "scratch"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + + ok, actual = check_tags( + path=tiff_dir, + prefix="beamlines/bl832/scratch", + expected_tags={"bl832"}, + ) + + assert ok is True + assert actual == ["bl832"] + + +def test_check_tags_no_access_blob_returns_empty(mocker: MockFixture, tmp_path): + """When ``access_blob`` is None, actual tags should be ``[]``.""" + h5 = tmp_path / "scan.h5" + h5.touch() + + entry_node = MockNode(access_blob=None) + prefix_node = MockNode() + prefix_node.add_child("scan", entry_node) + client = build_prefix_chain(["beamlines"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + + ok, actual = check_tags( + path=h5, + prefix="beamlines", + expected_tags={"bl832"}, + ) + + assert ok is False + assert actual == [] + + +def test_check_tags_missing_entry_raises_key_error(mocker: MockFixture, tmp_path): + """If the stem isn't present under the prefix, the lookup raises ``KeyError``.""" + h5 = tmp_path / "missing.h5" + h5.touch() + + prefix_node = MockNode() + prefix_node.add_child("other", MockNode()) + client = build_prefix_chain(["beamlines"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + + with pytest.raises(KeyError): + check_tags( + path=h5, + prefix="beamlines", + expected_tags={"bl832"}, + ) + + +# --------------------------------------------------------------------------- +# register_to_tiled flow +# --------------------------------------------------------------------------- + +def test_register_to_tiled_flow_delegates_to_task(mocker: MockFixture, tmp_path): + """The flow is a thin wrapper that just calls the task with the same args.""" + h5 = tmp_path / "scan.h5" + h5.touch() + + mock_task = mocker.patch("orchestration.tiled.register_file_to_tiled") + + register_to_tiled( + path=h5, + prefix="beamlines/bl832", + overwrite=True, + tags=["bl832"], + ) + + mock_task.assert_called_once_with( + h5, + prefix="beamlines/bl832", + overwrite=True, + tags=["bl832"], + ) + + +def test_register_to_tiled_flow_coerces_string_path(mocker: MockFixture): + """Passing a string should be coerced to ``Path`` before forwarding.""" + from pathlib import Path + + mock_task = mocker.patch("orchestration.tiled.register_file_to_tiled") + + register_to_tiled(path="/data/sample.h5", prefix="any") + + forwarded_path = mock_task.call_args.args[0] + assert isinstance(forwarded_path, Path) + assert forwarded_path == Path("/data/sample.h5") From 239269c2688c64eb4ffd05059847874a42482c7c Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 20 May 2026 10:48:18 -0700 Subject: [PATCH 24/33] Only transferring Zarrs to Beegfs and ingesting into Tiled; not transferring/ingesting raw or reconstructed tiffs --- orchestration/flows/bl832/move.py | 44 ++++++++++++++++-------------- orchestration/flows/bl832/nersc.py | 23 +++++++++++----- 2 files changed, 39 insertions(+), 28 deletions(-) diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py index 73709d04..1d9ea158 100644 --- a/orchestration/flows/bl832/move.py +++ b/orchestration/flows/bl832/move.py @@ -12,8 +12,8 @@ from orchestration.globus.transfer import GlobusEndpoint, start_transfer from orchestration.prune_controller import get_prune_controller, PruneMethod from orchestration.prometheus_utils import PrometheusMetrics -from orchestration.tiled import register_file_to_tiled -from orchestration.transfer_controller import CopyMethod, get_transfer_controller +# from orchestration.tiled import register_file_to_tiled +# from orchestration.transfer_controller import CopyMethod, get_transfer_controller API_KEY = os.getenv("API_KEY") @@ -158,25 +158,27 @@ def process_new_832_file_task( except Exception as e: logger.error(f"SciCat ingest failed with {e}") - transfer_controller = get_transfer_controller( - transfer_type=CopyMethod.GLOBUS, - config=config, - prometheus_metrics=None - ) - - transfer_controller.copy( - file_path=relative_path, - source=config.data832, - destination=config.beegfs_raw - ) - logger.info(f"File successfully transferred from data832 to beegfs {file_path}") - - register_file_to_tiled( - path=Path(config.beegfs_raw.root_path+relative_path), - prefix="beamlines/bl832/raw", - overwrite=False, - tags=["raw", "bl832"], - ) + # Holding off from moving and registering Raw Data to Beegfs Tiled for storage concerns. + + # transfer_controller = get_transfer_controller( + # transfer_type=CopyMethod.GLOBUS, + # config=config, + # prometheus_metrics=None + # ) + + # transfer_controller.copy( + # file_path=relative_path, + # source=config.data832, + # destination=config.beegfs_raw + # ) + # logger.info(f"File successfully transferred from data832 to beegfs {file_path}") + + # register_file_to_tiled( + # path=Path(config.beegfs_raw.root_path+relative_path), + # prefix="beamlines/bl832/raw", + # overwrite=False, + # tags=["raw", "8.3.2"], + # ) # TODO: find proposal id in h5, make that a tag logger.info("Initializing prune controller") diff --git a/orchestration/flows/bl832/nersc.py b/orchestration/flows/bl832/nersc.py index adeb4cec..f7b114d6 100644 --- a/orchestration/flows/bl832/nersc.py +++ b/orchestration/flows/bl832/nersc.py @@ -1698,11 +1698,20 @@ def nersc_recon_flow( ) logger.info("Copy from NERSC /global/cfs/cdirs/als/data_mover/8.3.2/scratch to beegfs") - nersc_to_beegfs_tiff_future = globus_transfer_task.submit( - file_path=tiff_file_path, - source=config.nersc832_alsdev_pscratch_scratch, - destination=config.beegfs_scratch - ) + + # Holding off on copying tiffs to beegfs for now since they are large and we may not need them all. + # nersc_to_beegfs_tiff_future = globus_transfer_task.submit( + # file_path=tiff_file_path, + # source=config.nersc832_alsdev_pscratch_scratch, + # destination=config.beegfs_scratch + # ) + # Register the reconstructed TIFFs in tiled + # register_file_to_tiled( + # path=Path(config.beegfs_scratch.root_path+tiff_file_path), + # prefix="beamlines/bl832/scratch", + # overwrite=False, + # tags=["scratch", "8.3.2", folder_name], + # ) nersc_to_beegfs_zarr_future = globus_transfer_task.submit( file_path=zarr_file_path, @@ -1715,7 +1724,7 @@ def nersc_recon_flow( pscratch_to_cfs_zarr_future.result() pscratch_to_data832_tiff_future.result() pscratch_to_data832_zarr_future.result() - nersc_to_beegfs_tiff_future.result() + # nersc_to_beegfs_tiff_future.result() nersc_to_beegfs_zarr_future.result() logger.info("All transfers complete.") @@ -1732,7 +1741,7 @@ def nersc_recon_flow( path=Path(config.beegfs_scratch.root_path+zarr_file_path), prefix="beamlines/bl832/scratch", overwrite=False, - tags=["scratch", "bl832"], + tags=["scratch", "8.3.2", folder_name], ) logger.info("Scheduling pruning tasks.") From 332a02baf69f3984513ca2e579bc4d86aa46d404 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 20 May 2026 11:07:15 -0700 Subject: [PATCH 25/33] pytest fixes to avoid a few warnings --- orchestration/_tests/test_globus_flow.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py index 352c09cb..4d7b2fca 100644 --- a/orchestration/_tests/test_globus_flow.py +++ b/orchestration/_tests/test_globus_flow.py @@ -204,10 +204,16 @@ async def mock_run_deployment(*args, **kwargs): # Mock asyncio.gather to avoid actual async task execution async def mock_gather(*args, **kwargs): + # Await any coroutines so we don't leak warnings, but don't care about results + for arg in args: + if asyncio.iscoroutine(arg): + try: + await arg + except Exception: + pass return [None] mocker.patch('asyncio.gather', new=mock_gather) - # Import decision flow after mocking the necessary components from orchestration.flows.bl832.dispatcher import dispatcher @@ -309,6 +315,11 @@ def test_alcf_recon_flow(mocker: MockFixture): return_value=True ) + mocker.patch( + "orchestration.flows.bl832.alcf.schedule_prefect_flow", + return_value=None + ) + file_path = "/global/raw/transfer_tests/test.h5" # ---------- CASE 1: SUCCESS PATH ---------- From 4d62c29e064730d942d39fb9e92b4cd916301d46 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 20 May 2026 11:07:44 -0700 Subject: [PATCH 26/33] Adding zarr->beegfs transfer and tiled ingestion to the ALCF reconstruction flow --- orchestration/flows/bl832/alcf.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index bdf96ac2..3b6539b4 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -14,6 +14,7 @@ from orchestration.flows.bl832.job_controller import get_controller, HPC, TomographyHPCController from orchestration.transfer_controller import get_transfer_controller, CopyMethod from orchestration.prefect import schedule_prefect_flow +from splash_flows.orchestration.tiled import register_file_to_tiled class ALCFTomographyHPCController(TomographyHPCController): @@ -456,6 +457,23 @@ def alcf_recon_flow( destination=config.data832_scratch ) + beegfs_zarr_transfer_success = transfer_controller.copy( + file_path=scratch_path_zarr, + source=config.alcf832_scratch, + destination=config.beegfs_scratch + ) + + if beegfs_zarr_transfer_success: + logger.info("Successfully transferred Zarr to beegfs. Now ingesting to Tiled.") + register_file_to_tiled( + path=Path(config.beegfs_scratch.root_path+scratch_path_zarr), + prefix="beamlines/bl832/scratch", + overwrite=False, + tags=["8.3.2", folder_name], + ) + else: + logger.error("Failed to transfer Zarr to beegfs, skipping registration to Tiled.") + # Place holder in case we want to transfer to NERSC for long term storage nersc_transfer_success = False From 2b7dabf63aa3374fd941eceda0777340d771309a Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 20 May 2026 11:08:05 -0700 Subject: [PATCH 27/33] removing the scratch tag for tiled ingestion --- orchestration/flows/bl832/nersc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/flows/bl832/nersc.py b/orchestration/flows/bl832/nersc.py index f7b114d6..6e008474 100644 --- a/orchestration/flows/bl832/nersc.py +++ b/orchestration/flows/bl832/nersc.py @@ -1741,7 +1741,7 @@ def nersc_recon_flow( path=Path(config.beegfs_scratch.root_path+zarr_file_path), prefix="beamlines/bl832/scratch", overwrite=False, - tags=["scratch", "8.3.2", folder_name], + tags=["8.3.2", folder_name], ) logger.info("Scheduling pruning tasks.") From 15028073c443c51388a6cb44fe5ad75f10696f31 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 20 May 2026 11:11:30 -0700 Subject: [PATCH 28/33] fixing broken import --- orchestration/flows/bl832/alcf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index 3b6539b4..28ac7813 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -14,7 +14,7 @@ from orchestration.flows.bl832.job_controller import get_controller, HPC, TomographyHPCController from orchestration.transfer_controller import get_transfer_controller, CopyMethod from orchestration.prefect import schedule_prefect_flow -from splash_flows.orchestration.tiled import register_file_to_tiled +from orchestration.tiled import register_file_to_tiled class ALCFTomographyHPCController(TomographyHPCController): From 6c703f08da0861d44353e48c09965bef102e420c Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 20 May 2026 11:31:59 -0700 Subject: [PATCH 29/33] Adding beegfs/bl832/scratch endpoint --- config.yml | 6 ++++++ orchestration/flows/bl832/config.py | 1 + 2 files changed, 7 insertions(+) diff --git a/config.yml b/config.yml index 6415328d..989609fd 100644 --- a/config.yml +++ b/config.yml @@ -145,6 +145,12 @@ globus: uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a name: bl832-beegfs-raw + bl832-beegfs-scratch: + root_path: /global/beegfs/beamlines/bl832/scratch/ + uri: beegfs.als.lbl.gov + uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a + name: bl832-beegfs-scratch + globus_apps: als_transfer: client_id: ${GLOBUS_CLIENT_ID} diff --git a/orchestration/flows/bl832/config.py b/orchestration/flows/bl832/config.py index f92bc5ce..319182e9 100644 --- a/orchestration/flows/bl832/config.py +++ b/orchestration/flows/bl832/config.py @@ -28,6 +28,7 @@ def _beam_specific_config(self) -> None: self.alcf832_raw = self.endpoints["alcf832_raw"] self.alcf832_scratch = self.endpoints["alcf832_scratch"] self.beegfs_raw = self.endpoints["bl832-beegfs-raw"] + self.beegfs_scratch = self.endpoints["bl832-beegfs-scratch"] # SciCat self.scicat = self.config["scicat"] # MLflow From ee355b4c22e58d70b103439f9a2dec8d7580607a Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 20 May 2026 11:33:02 -0700 Subject: [PATCH 30/33] adding beegfs endpoints to test_globus_flow.py --- orchestration/_tests/test_globus_flow.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py index 4d7b2fca..beb21965 100644 --- a/orchestration/_tests/test_globus_flow.py +++ b/orchestration/_tests/test_globus_flow.py @@ -150,7 +150,8 @@ def __init__(self) -> None: MockSecret.for_endpoint("nersc832_alsdev_scratch")), "alcf832_raw": MockEndpoint("mock_alcf832_raw_path", MockSecret.for_endpoint("alcf832_raw")), "alcf832_scratch": MockEndpoint("mock_alcf832_scratch_path", MockSecret.for_endpoint("alcf832_scratch")), - "beegfs_raw": MockEndpoint("mock_beegfs_raw_path", MockSecret.for_endpoint("beegfs_raw")) + "beegfs_raw": MockEndpoint("mock_beegfs_raw_path", MockSecret.for_endpoint("beegfs_raw")), + "beegfs_scratch": MockEndpoint("mock_beegfs_scratch_path", MockSecret.for_endpoint("beegfs_scratch")) } # Mock apps @@ -171,6 +172,7 @@ def __init__(self) -> None: self.data832_scratch = self.endpoints["data832_scratch"] self.nersc832_alsdev_scratch = self.endpoints["nersc832_alsdev_scratch"] self.beegfs_raw = self.endpoints["beegfs_raw"] + self.beegfs_scratch = self.endpoints["beegfs_scratch"] self.scicat = config["scicat"] @@ -259,6 +261,7 @@ def test_alcf_recon_flow(mocker: MockFixture): "alcf832_raw": mocker.MagicMock(), "alcf832_scratch": mocker.MagicMock(), "bl832-beegfs-raw": mocker.MagicMock(), + "bl832-beegfs-scratch": mocker.MagicMock() } ) mocker.patch( @@ -320,6 +323,12 @@ def test_alcf_recon_flow(mocker: MockFixture): return_value=None ) + # Patch ingestion to Tiled + mocker.patch( + "orchestration.flows.bl832.alcf.register_file_to_tiled", + return_value=None + ) + file_path = "/global/raw/transfer_tests/test.h5" # ---------- CASE 1: SUCCESS PATH ---------- @@ -329,7 +338,7 @@ def test_alcf_recon_flow(mocker: MockFixture): result = alcf_recon_flow(file_path=file_path, config=mock_config) assert result is True, "Flow should return True if HPC + Tiff->Zarr + transfers all succeed" - assert mock_transfer_controller.copy.call_count == 3, "Should do 3 transfers in success path" + assert mock_transfer_controller.copy.call_count == 4, "Should do 4 transfers in success path" mock_hpc_reconstruct.assert_called_once() mock_hpc_multires.assert_called_once() mock_schedule_pruning.assert_called_once() From 116bb97e60e0c30edbaaa14e9ad48d80277983da Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 20 May 2026 11:44:27 -0700 Subject: [PATCH 31/33] trying to reduce pytest warnings --- orchestration/_tests/test_tiled.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/orchestration/_tests/test_tiled.py b/orchestration/_tests/test_tiled.py index deca5bc0..58e1e2e8 100644 --- a/orchestration/_tests/test_tiled.py +++ b/orchestration/_tests/test_tiled.py @@ -279,38 +279,40 @@ def test_register_tiff_dir_tags_each_child(mocker: MockFixture, tmp_path): # --------------------------------------------------------------------------- def test_register_missing_entry_after_register_logs_warning(mocker: MockFixture, tmp_path): - """If ``node[stem]`` raises ``KeyError`` post-register, no exception leaks and a warning is logged.""" h5 = tmp_path / "missing.h5" h5.touch() - # Prefix node has *some* entries but not the one we'll look up prefix_node = MockNode() prefix_node.add_child("other-entry", MockNode()) client = build_prefix_chain(["beamlines", "bl832", "raw"], prefix_node) mocker.patch("orchestration.tiled.from_uri", return_value=client) mocker.patch("orchestration.tiled.register", mocker.AsyncMock(return_value=None)) - mock_logger = mocker.patch("orchestration.tiled.get_run_logger") + + # Patch the specific logger instance returned by get_run_logger + # rather than the function itself, to avoid Prefect engine confusion + import logging + mock_logger = mocker.MagicMock(spec=logging.Logger) + mocker.patch("orchestration.tiled.get_run_logger", return_value=mock_logger) - # Should not raise — the KeyError is caught and logged with available keys register_file_to_tiled( path=h5, prefix="beamlines/bl832/raw", tags=["raw"], ) - mock_logger.return_value.warning.assert_called_once() - warning_msg = mock_logger.return_value.warning.call_args[0][0] - assert "missing" in warning_msg # stem of the file - assert "other-entry" in warning_msg # available keys listed in the message + mock_logger.warning.assert_called_once() + warning_msg = mock_logger.warning.call_args[0][0] + assert "missing" in warning_msg + assert "other-entry" in warning_msg def test_register_raises_runtime_error_on_failure(mocker: MockFixture, tmp_path): - """If the bridge or Tiled's ``register`` raises, wrap it in ``RuntimeError``.""" h5 = tmp_path / "scan.h5" h5.touch() mocker.patch("orchestration.tiled.from_uri", return_value=MockNode()) + mocker.patch("orchestration.tiled.register", return_value=None) # plain MagicMock, not AsyncMock mocker.patch( "orchestration.tiled.run_coro_as_sync", side_effect=Exception("connection refused"), From 210f9bc10fb4d134dd8db745056dae039c3be566 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 26 May 2026 10:27:20 -0700 Subject: [PATCH 32/33] linting --- orchestration/_tests/test_tiled.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/_tests/test_tiled.py b/orchestration/_tests/test_tiled.py index 58e1e2e8..88ec8d6d 100644 --- a/orchestration/_tests/test_tiled.py +++ b/orchestration/_tests/test_tiled.py @@ -288,7 +288,7 @@ def test_register_missing_entry_after_register_logs_warning(mocker: MockFixture, mocker.patch("orchestration.tiled.from_uri", return_value=client) mocker.patch("orchestration.tiled.register", mocker.AsyncMock(return_value=None)) - + # Patch the specific logger instance returned by get_run_logger # rather than the function itself, to avoid Prefect engine confusion import logging From 48403d01f102f6ea103ae123097d74c57838e284 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 26 May 2026 10:28:13 -0700 Subject: [PATCH 33/33] Adding TILED_URI and TILED_API_KEY to login helper script --- scripts/login_to_globus_and_prefect.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/scripts/login_to_globus_and_prefect.sh b/scripts/login_to_globus_and_prefect.sh index a38b629f..b54b1cca 100755 --- a/scripts/login_to_globus_and_prefect.sh +++ b/scripts/login_to_globus_and_prefect.sh @@ -18,4 +18,6 @@ export GLOBUS_CLI_CLIENT_SECRET="$GLOBUS_CLIENT_SECRET" export GLOBUS_COMPUTE_CLIENT_ID="$GLOBUS_CLIENT_ID" export GLOBUS_COMPUTE_CLIENT_SECRET="$GLOBUS_CLIENT_SECRET" export PREFECT_API_KEY="$PREFECT_API_KEY" -export PREFECT_API_URL="$PREFECT_API_URL" \ No newline at end of file +export PREFECT_API_URL="$PREFECT_API_URL" +export TILED_URI="$TILED_URI" +export TILED_API_KEY="$TILED_API_KEY" \ No newline at end of file