Skip to content

Commit b0be54c

Browse files
Generalise multiplex triggering behaviour to enable multiplex clustering on additional beamlines (#336)
New 'trigger_every_collection' parameter to decide if multiplex is triggered after every collection or not. Clustering is now able to be output for either triggering pathway. Downstream recipes triggered by multiplex now merged into one recipe.
1 parent e18819e commit b0be54c

1 file changed

Lines changed: 62 additions & 44 deletions

File tree

src/dlstbx/services/trigger.py

Lines changed: 62 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,8 @@ class MultiplexParameters(pydantic.BaseModel):
222222
diffraction_plan_info: Optional[DiffractionPlanInfo] = None
223223
recipe: Optional[str] = None
224224
use_clustering: Optional[List[str]] = None
225+
beamline: str
226+
trigger_every_collection: bool
225227

226228

227229
class Xia2SsxReduceParameters(pydantic.BaseModel):
@@ -1736,17 +1738,22 @@ def trigger_multiplex(
17361738
will be created, and the resulting list of processingJobIds will be sent to
17371739
the `processing_recipe` queue.
17381740
1739-
If clustering algorithm is enabled, skip triggering multiplex if new related dcid
1740-
values are added into all defined sample groups to run multiplex only once when
1741-
all samples in one of the groups have been collected. When running multiplex
1742-
only include results from datasets collected prior to the current one in all
1743-
sample groups.
1741+
If clustering algorithm is enabled, further commandline options are added.
1742+
1743+
There are two ways to trigger multiplex controlled by trigger_every_collection.
1744+
Multiplex will either be triggered after every dials collection (useful for
1745+
beamlines that use multiplex for mid-experiment feedback), or if there are
1746+
no subsequent datasets still processing (vmxi). The latter equates to
1747+
"once per sample group" if the experiment is rapidly collecting with no
1748+
significant delays.
17441749
17451750
Recipe parameters:
17461751
- target: set this to "multiplex"
1752+
- beamline: the beamline as a string
17471753
- dcid: the dataCollectionId for the given data collection
17481754
- comment: a comment to be stored in the ProcessingJob.comment field
17491755
- automatic: boolean value passed to ProcessingJob.automatic field
1756+
- trigger_every_collection: decide triggering behaviour of multiplex
17501757
- ispyb_parameters: a dictionary of ispyb_reprocessing_parameters set in the
17511758
parent xia2-dials processing job
17521759
- related_dcids: a list of groups of related data collection ids. Each item in
@@ -1788,6 +1795,7 @@ def trigger_multiplex(
17881795
dcid = parameters.dcid
17891796
program_id = parameters.program_id
17901797
parameters.recipe = "postprocessing-xia2-multiplex"
1798+
output_clusters = False
17911799

17921800
# Take related dcids from recipe in preference or checkpointed message
17931801
if isinstance(related_dcid_group := message.get("related_dcid_group"), list):
@@ -1803,53 +1811,63 @@ def trigger_multiplex(
18031811
return {"success": True}
18041812

18051813
self.log.debug(f"related_dcids for dcid={dcid}: {related_dcids}")
1814+
1815+
# Turn on multiplex clustering
1816+
1817+
if (
1818+
parameters.use_clustering
1819+
and parameters.beamline in parameters.use_clustering
1820+
):
1821+
output_clusters = True
1822+
1823+
# For beamlines where multiplex is triggered alongside xia2-dials need extra checks
18061824
# Check if we have any new data collections added to any sample group
18071825
# to decide if we need to proceed with triggering multiplex.
18081826
# Run multiplex only once, after data for all samples in the group has been collected.
1809-
if parameters.use_clustering and parameters.program_id:
1827+
1828+
if parameters.trigger_every_collection:
1829+
self.log.info("Triggering xia2.multiplex after every data collection.")
1830+
1831+
else:
1832+
self.log.info("Checking for subsequent dcids that are still processing.")
1833+
18101834
# Get current list of data collections for all samples in the sample groups
18111835
_, ispyb_info = dlstbx.ispybtbx.ispyb_filter(
18121836
{}, {"ispyb_dcid": dcid}, session
18131837
)
18141838
ispyb_related_dcids = ispyb_info.get("ispyb_related_dcids", [])
1815-
beamline = ispyb_info.get("ispyb_beamline", "")
1816-
visit = ispyb_info.get("ispyb_visit", "")
1817-
if beamline in parameters.use_clustering or any(
1818-
el in visit for el in parameters.use_clustering
1819-
):
1820-
parameters.recipe = "postprocessing-xia2-multiplex-clustering"
1821-
# If we have a sample group that doesn't have any new data collections,
1822-
# proceed with triggering multiplex for all sample groups
1823-
if all(max(el.get("dcids", [])) > dcid for el in ispyb_related_dcids):
1824-
added_dcids = []
1825-
for el in ispyb_related_dcids:
1826-
added_dcids.extend([d for d in el.get("dcids", []) if d > dcid])
1827-
# Check if there are xia2 dials jobs that were triggered on any new
1828-
# data collections after current multiplex job was triggered
1829-
min_start_time = datetime.now() - timedelta(hours=12)
1830-
query = (
1831-
(
1832-
session.query(
1833-
AutoProcProgram, ProcessingJob.dataCollectionId
1834-
).join(
1835-
ProcessingJob,
1836-
ProcessingJob.processingJobId
1837-
== AutoProcProgram.processingJobId,
1838-
)
1839+
# If we have a sample group that doesn't have any new data collections,
1840+
# proceed with triggering multiplex for all sample groups
1841+
if all(max(el.get("dcids", [])) > dcid for el in ispyb_related_dcids):
1842+
added_dcids = []
1843+
for el in ispyb_related_dcids:
1844+
added_dcids.extend([d for d in el.get("dcids", []) if d > dcid])
1845+
# Check if there are xia2 dials jobs that were triggered on any new
1846+
# data collections after current multiplex job was triggered
1847+
min_start_time = datetime.now() - timedelta(hours=12)
1848+
query = (
1849+
(
1850+
session.query(
1851+
AutoProcProgram, ProcessingJob.dataCollectionId
1852+
).join(
1853+
ProcessingJob,
1854+
ProcessingJob.processingJobId
1855+
== AutoProcProgram.processingJobId,
18391856
)
1840-
.filter(ProcessingJob.dataCollectionId.in_(added_dcids))
1841-
.filter(ProcessingJob.automatic == True) # noqa E712
1842-
.filter(AutoProcProgram.processingPrograms == "xia2 dials")
1843-
.filter(AutoProcProgram.autoProcProgramId > program_id) # noqa E711
1844-
.filter(AutoProcProgram.recordTimeStamp > min_start_time) # noqa E711
18451857
)
1846-
# Abort triggering multiplex if we have xia2 dials running on any subsequent
1847-
# data collection in all sample groups
1848-
if triggered_processing_job := query.first():
1849-
self.log.info(
1850-
f"Aborting multiplex trigger for dcid {dcid} as processing job has been started for dcid {triggered_processing_job.dataCollectionId}"
1851-
)
1852-
return {"success": True}
1858+
.filter(ProcessingJob.dataCollectionId.in_(added_dcids))
1859+
.filter(ProcessingJob.automatic == True) # noqa E712
1860+
.filter(AutoProcProgram.processingPrograms == "xia2 dials")
1861+
.filter(AutoProcProgram.autoProcProgramId > program_id) # noqa E711
1862+
.filter(AutoProcProgram.recordTimeStamp > min_start_time) # noqa E711
1863+
)
1864+
# Abort triggering multiplex if we have xia2 dials running on any subsequent
1865+
# data collection in all sample groups
1866+
if triggered_processing_job := query.first():
1867+
self.log.info(
1868+
f"Aborting multiplex trigger for dcid {dcid} as processing job has been started for dcid {triggered_processing_job.dataCollectionId}"
1869+
)
1870+
return {"success": True}
18531871

18541872
# Calculate message delay for exponential backoff in case a processing
18551873
# program for a related data collection is still running, in which case
@@ -2065,7 +2083,7 @@ def trigger_multiplex(
20652083
self.log.debug(set_dcids)
20662084
self.log.debug(multiplex_job_dcids)
20672085
# Check if upstream dials job has succeeded when we run multiplex per data collection
2068-
if ("clustering" not in parameters.recipe) and (dcid not in set_dcids):
2086+
if (not output_clusters) and (dcid not in set_dcids):
20692087
self.log.info(
20702088
f"Skipping xia2.multiplex trigger: upstream dials job failed for dcid={dcid} group={group}"
20712089
)
@@ -2136,7 +2154,7 @@ def trigger_multiplex(
21362154
("absorption_level", "high"),
21372155
]
21382156
)
2139-
if "clustering" in parameters.recipe:
2157+
if output_clusters:
21402158
job_parameters.extend(
21412159
[
21422160
("clustering.method", "coordinate"),

0 commit comments

Comments
 (0)