Skip to content

Commit 50b7aa7

Browse files
committed
Updating job number handlers
1 parent f19137e commit 50b7aa7

2 files changed

Lines changed: 132 additions & 74 deletions

File tree

src/murfey/server/feedback.py

Lines changed: 59 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,46 @@
4545
)
4646
from murfey.util.processing_params import default_spa_parameters, motion_corrected_mrc
4747
from murfey.util.tomo import midpoint
48+
from gemmi import cif
49+
from pipeliner.star_keys import GENERAL_BLOCK, JOB_COUNTER
4850

4951
logger = logging.getLogger("murfey.server.feedback")
5052

5153

54+
def _current_pipeline_job_counter(visit_name: str) -> int:
55+
"""Return the next jobNNN Pipeliner will allocate for visit_name.
56+
57+
Reads the JOB_COUNTER value from default_pipeline.star so that
58+
SPA feedback decisions are anchored to Pipeliner's actual state instead
59+
of an independent integer counter that drifts.
60+
61+
Falls back to 7 (previous default) if the file is
62+
missing — this preserves the previous behaviour for non Doppio runs
63+
"""
64+
pipeline_file = Path(visit_name) / "default_pipeline.star"
65+
if not pipeline_file.is_file():
66+
return 7
67+
try:
68+
dp = cif.read_file(str(pipeline_file))
69+
block = dp.find_block(GENERAL_BLOCK)
70+
if block is None:
71+
return 7
72+
return int(block.find_value(JOB_COUNTER))
73+
except Exception:
74+
logger.warning(
75+
"Failed to read JOB_COUNTER from %s — falling back to legacy job number",
76+
pipeline_file,
77+
exc_info=True,
78+
)
79+
return 7
80+
81+
82+
def _visit_name_for_session(session_id: int, _db) -> str:
    """Look up the visit (project directory) name for a Murfey session id."""
    lookup = select(db.Session).where(db.Session.id == session_id)
    matching_session = _db.exec(lookup).one()
    return matching_session.visit
86+
87+
5288
try:
5389
_url = url(get_security_config())
5490
engine = create_engine(_url)
@@ -373,9 +409,8 @@ def _release_2d_hold(message: dict, _db):
373409
"recipes": ["em-spa-class2d"],
374410
}
375411
if first_class2d.complete:
376-
feedback_params.next_job += (
377-
4 if default_spa_parameters.do_icebreaker_jobs else 3
378-
)
412+
visit_name = _visit_name_for_session(message["session_id"], _db)
413+
feedback_params.next_job = _current_pipeline_job_counter(visit_name)
379414
feedback_params.rerun_class2d = False
380415
_db.add(feedback_params)
381416
if first_class2d.complete:
@@ -585,7 +620,9 @@ def _register_incomplete_2d_batch(message: dict, _db):
585620
_db.commit()
586621
_db.close()
587622
return
588-
feedback_params.next_job = 10 if default_spa_parameters.do_icebreaker_jobs else 7
623+
# Get next_job from the actual Pipeliner counter
624+
visit_name = _visit_name_for_session(message["session_id"], _db)
625+
feedback_params.next_job = _current_pipeline_job_counter(visit_name)
589626
feedback_params.hold_class2d = True
590627
relion_options = dict(relion_params)
591628
other_options = dict(feedback_params)
@@ -735,15 +772,8 @@ def _register_complete_2d_batch(message: dict, _db):
735772
murfey_ids, class2d_message["particles_file"], _app_id(pj_id, _db), _db
736773
)
737774
elif not feedback_params.class_selection_score:
738-
# For the first batch, start a container and set the database to wait
739-
job_number_after_first_batch = (
740-
10 if default_spa_parameters.do_icebreaker_jobs else 7
741-
)
742-
if (
743-
feedback_params.next_job is not None
744-
and feedback_params.next_job < job_number_after_first_batch
745-
):
746-
feedback_params.next_job = job_number_after_first_batch
775+
visit_name = _visit_name_for_session(message["session_id"], _db)
776+
feedback_params.next_job = _current_pipeline_job_counter(visit_name)
747777
if not feedback_params.star_combination_job:
748778
feedback_params.star_combination_job = feedback_params.next_job + (
749779
3 if default_spa_parameters.do_icebreaker_jobs else 2
@@ -815,14 +845,14 @@ def _register_complete_2d_batch(message: dict, _db):
815845
"processing_recipe", zocalo_message, new_connection=True
816846
)
817847
feedback_params.hold_class2d = True
818-
feedback_params.next_job += (
819-
4 if default_spa_parameters.do_icebreaker_jobs else 3
820-
)
848+
# next_job is re-anchored from Pipeliner on the next entry to this
849+
# function — no manual increment needed.
821850
_db.add(feedback_params)
822851
_db.commit()
823852
_db.close()
824853
else:
825-
# Send all other messages on to a container
854+
visit_name = _visit_name_for_session(message["session_id"], _db)
855+
feedback_params.next_job = _current_pipeline_job_counter(visit_name)
826856
if _db.exec(
827857
select(func.count(db.Class2DParameters.particles_file))
828858
.where(db.Class2DParameters.pj_id == pj_id)
@@ -889,9 +919,6 @@ def _register_complete_2d_batch(message: dict, _db):
889919
murfey.server._transport_object.send(
890920
"processing_recipe", zocalo_message, new_connection=True
891921
)
892-
feedback_params.next_job += (
893-
3 if default_spa_parameters.do_icebreaker_jobs else 2
894-
)
895922
_db.add(feedback_params)
896923
_db.commit()
897924
_db.close()
@@ -936,10 +963,9 @@ def _flush_class2d(
936963
.where(db.Class2DParameters.pj_id == pj_id)
937964
.where(db.Class2DParameters.complete)
938965
).all()
939-
if not feedback_params.next_job:
940-
feedback_params.next_job = (
941-
10 if default_spa_parameters.do_icebreaker_jobs else 7
942-
)
966+
# Check pipeliner counter
967+
visit_name = _visit_name_for_session(session_id, _db)
968+
feedback_params.next_job = _current_pipeline_job_counter(visit_name)
943969
if not feedback_params.star_combination_job:
944970
feedback_params.star_combination_job = feedback_params.next_job + (
945971
3 if default_spa_parameters.do_icebreaker_jobs else 2
@@ -1196,6 +1222,10 @@ def _register_3d_batch(message: dict, _db):
11961222
.visit
11971223
)
11981224

1225+
# Check Pipeliner's job counter
1226+
feedback_params.next_job = _current_pipeline_job_counter(visit_name)
1227+
other_options["next_job"] = feedback_params.next_job
1228+
11991229
provided_initial_model = _find_initial_model(visit_name, machine_config)
12001230
if provided_initial_model and not feedback_params.initial_model:
12011231
rescaled_initial_model_path = (
@@ -1218,7 +1248,6 @@ def _register_3d_batch(message: dict, _db):
12181248
class3d_dir = (
12191249
f"{class3d_message['class3d_dir']}{(feedback_params.next_job + 1):03}"
12201250
)
1221-
feedback_params.next_job += 1
12221251
_db.add(feedback_params)
12231252
_db.commit()
12241253

@@ -1253,7 +1282,6 @@ def _register_3d_batch(message: dict, _db):
12531282
_db.close()
12541283
elif not feedback_params.initial_model:
12551284
# For the first batch, start a container and set the database to wait
1256-
next_job = feedback_params.next_job
12571285
class3d_dir = (
12581286
f"{class3d_message['class3d_dir']}{(feedback_params.next_job + 1):03}"
12591287
)
@@ -1273,8 +1301,6 @@ def _register_3d_batch(message: dict, _db):
12731301
)
12741302

12751303
feedback_params.hold_class3d = True
1276-
next_job += 2
1277-
feedback_params.next_job = next_job
12781304
zocalo_message: dict = {
12791305
"parameters": {
12801306
"particles_file": class3d_message["particles_file"],
@@ -1534,6 +1560,11 @@ def _register_refinement(message: dict, _db):
15341560
db.ClassificationFeedbackParameters.pj_id == pj_id_params
15351561
)
15361562
).one()
1563+
1564+
# Re-anchor next_job to Pipeliner's actual counter so the predicted
1565+
# Refine3D / MaskCreate / PostProcess slots line up with reality.
1566+
visit_name = _visit_name_for_session(message["session_id"], _db)
1567+
feedback_params.next_job = _current_pipeline_job_counter(visit_name)
15371568
other_options = dict(feedback_params)
15381569

15391570
if feedback_params.hold_refine:
@@ -1564,7 +1595,6 @@ def _register_refinement(message: dict, _db):
15641595
.where(db.RefineParameters.tag == "symmetry")
15651596
).one()
15661597
except SQLAlchemyError:
1567-
next_job = feedback_params.next_job
15681598
refine_dir = f"{message['refine_dir']}{(feedback_params.next_job + 2):03}"
15691599
refined_grp_uuid = _murfey_id(message["program_id"], _db)[0]
15701600
refined_class_uuid = _murfey_id(message["program_id"], _db)[0]
@@ -1605,14 +1635,6 @@ def _register_refinement(message: dict, _db):
16051635
_db=_db,
16061636
)
16071637

1608-
if relion_options["symmetry"] == "C1":
1609-
# Extra Refine, Mask, PostProcess beyond for determined symmetry
1610-
next_job += 8
1611-
else:
1612-
# Select and Extract particles, then Refine, Mask, PostProcess
1613-
next_job += 5
1614-
feedback_params.next_job = next_job
1615-
16161638
zocalo_message: dict = {
16171639
"parameters": {
16181640
"refine_job_dir": refine_params.refine_dir,

src/murfey/util/processing_params.py

Lines changed: 73 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import logging
2+
import os
13
from datetime import datetime
24
from functools import lru_cache
35
from pathlib import Path
@@ -6,55 +8,89 @@
68
from pydantic import BaseModel
79
from werkzeug.utils import secure_filename
810

9-
from gemmi import cif
10-
from pipeliner.star_keys import GENERAL_BLOCK, JOB_COUNTER
11+
from pipeliner.project_graph import ProjectGraph
1112

1213
from murfey.util.config import MachineConfig, get_machine_config
1314

14-
import os
15+
logger = logging.getLogger("murfey.util.processing_params")
1516

16-
def get_current_job_number(visit_name: str, machine_config: MachineConfig) -> int:
17-
if os.path.exists(visit_name):
18-
default_pipeline_path = os.path.join(visit_name, "default_pipeline.star")
19-
"""elif machine_config.processed_directory_name:
20-
core = Path(visit_name).parts[0]
21-
extra_path = machine_config.processed_extra_directory
22-
sub
23-
default_pipeline_path = (core
24-
/ machine_config.processed_directory_name
25-
/ sub_dataset
26-
/ extra_path)"""
27-
if os.path.exists(default_pipeline_path):
28-
dp = cif.read_file(default_pipeline_path)
29-
dp_job_counter = dp.find_block(GENERAL_BLOCK).find_value(JOB_COUNTER)
30-
current_counter = int(dp_job_counter)
31-
return current_counter
32-
33-
return 2 # Default to job002 if the file doesn't exist or the value is not found
3417

35-
def motion_corrected_mrc(
36-
input_movie: Path, visit_name: str, machine_config: MachineConfig
37-
):
38-
movie = os.path.basename(input_movie)
18+
# Positional default MotionCorr job directory, returned when the live
# Pipeliner alias lookup fails (missing default_pipeline.star or unknown
# alias) so callers keep the pre-lookup behaviour.
_DEFAULT_MOTIONCORR_FALLBACK = "job002"
19+
20+
21+
@lru_cache(maxsize=16)
22+
def _job_dir_for_alias_cached(
23+
visit_name: str, alias: str, mtime_ns: int
24+
) -> Optional[str]:
25+
"""Read default_pipeline.star and return the jobNNN for the given alias.
3926
40-
""" if not os.path.exists(visit_name):
41-
parts = [secure_filename(p) for p in input_movie.parts]
42-
visit_idx = parts.index(visit_name)
43-
core = Path("/") / Path(*parts[: visit_idx + 1])
44-
ppath = Path("/") / Path(*parts)
45-
if machine_config.process_multiple_datasets:
46-
sub_dataset = ppath.relative_to(core).parts[0]
47-
else:
48-
sub_dataset = ""
49-
extra_path = machine_config.processed_extra_directory
27+
Returns None on any failure (missing file, graph read error, alias
28+
not found). The mtime_ns argument is a cache key — when Pipeliner rewrites
29+
default_pipeline.star its mtime changes and the next call falls through
30+
to a fresh read.
5031
"""
32+
project_dir = Path(visit_name)
33+
pipeline_file = project_dir / "default_pipeline.star"
34+
if not pipeline_file.is_file():
35+
return None
36+
try:
37+
with ProjectGraph(pipeline_dir=project_dir, read_only=True) as graph:
38+
for proc in graph.process_list:
39+
proc_alias = getattr(proc, "alias", None)
40+
if proc_alias and proc_alias.rstrip("/").endswith(alias):
41+
# proc.name is e.g. "MotionCorr/job003/"
42+
return Path(proc.name).name
43+
except Exception:
44+
logger.error(
45+
"ProjectGraph read failed while looking up alias %r in %s",
46+
alias,
47+
pipeline_file,
48+
exc_info=True,
49+
)
50+
return None
51+
return None
52+
5153

52-
#job_number = get_current_job_number(visit_name, machine_config)
54+
def _job_dir_for_alias(visit_name: str, alias: str) -> str:
    """Resolve the Pipeliner jobNNN directory name for *alias*.

    *visit_name* is a path to the project directory. When the pipeline file
    is absent or the alias cannot be found, falls back to the positional
    default job002 and emits a warning so drift from the live pipeline is
    visible in the logs instead of silent.
    """
    project_dir = Path(visit_name).resolve()
    star_path = project_dir / "default_pipeline.star"
    try:
        stamp = star_path.stat().st_mtime_ns
    except FileNotFoundError:
        logger.warning(
            "default_pipeline.star missing at %s — falling back to %s for alias %r",
            star_path,
            _DEFAULT_MOTIONCORR_FALLBACK,
            alias,
        )
        return _DEFAULT_MOTIONCORR_FALLBACK
    resolved = _job_dir_for_alias_cached(str(project_dir), alias, stamp)
    if resolved is not None:
        return resolved
    logger.warning(
        "Alias %r not found in %s — falling back to %s",
        alias,
        star_path,
        _DEFAULT_MOTIONCORR_FALLBACK,
    )
    return _DEFAULT_MOTIONCORR_FALLBACK
83+
84+
85+
def motion_corrected_mrc(
86+
input_movie: Path, visit_name: str, machine_config: MachineConfig
87+
):
88+
movie = os.path.basename(input_movie)
89+
job_dir = _job_dir_for_alias(visit_name, "Live_motioncorr")
5490
mrc_out = (
5591
Path(visit_name)
5692
/ "MotionCorr"
57-
/ f"job002"
93+
/ job_dir
5894
/ "Movies"
5995
/ str(movie + "_motion_corrected.mrc")
6096
)

0 commit comments

Comments
 (0)