@@ -49,14 +49,14 @@ def create_metrics(self):
4949
5050class PanDDAParameters (pydantic .BaseModel ):
5151 dcid : int = pydantic .Field (gt = 0 )
52- prerun_threshold : int = pydantic .Field (default = 300 )
52+ comparator_threshold : int = pydantic .Field (default = 300 )
5353 automatic : Optional [bool ] = False
5454 comment : Optional [str ] = None
5555 scaling_id : list [int ]
5656 timeout : float = pydantic .Field (default = 60 , alias = "timeout-minutes" )
57- backoff_delay : float = pydantic .Field (default = 30 , alias = "backoff-delay" )
58- backoff_max_try : int = pydantic .Field (default = 10 , alias = "backoff-max-try" )
59- backoff_multiplier : float = pydantic .Field (default = 1.4 , alias = "backoff-multiplier" )
57+ backoff_delay : float = pydantic .Field (default = 45 , alias = "backoff-delay" )
58+ backoff_max_try : int = pydantic .Field (default = 30 , alias = "backoff-max-try" )
59+ backoff_multiplier : float = pydantic .Field (default = 1.0 , alias = "backoff-multiplier" )
6060
6161
6262class PanDDA_PostParameters (pydantic .BaseModel ):
@@ -68,15 +68,6 @@ class PanDDA_PostParameters(pydantic.BaseModel):
6868 timeout : float = pydantic .Field (default = 60 , alias = "timeout-minutes" )
6969
7070
71- class PanDDA_RhofitParameters (pydantic .BaseModel ):
72- dcid : int = pydantic .Field (gt = 0 )
73- datasets : str
74- automatic : Optional [bool ] = False
75- comment : Optional [str ] = None
76- scaling_id : list [int ]
77- processing_directory : str
78-
79-
8071class DLSTriggerXChem (CommonService ):
8172 """A service that creates and runs downstream processing jobs."""
8273
@@ -186,15 +177,15 @@ def trigger_pandda_xchem(
186177 - dcid: the dataCollectionId for the given data collection i.e. "{ispyb_dcid}"
187178 - pdb: the output pdb from dimple i.e. "{ispyb_results_directory}/dimple/final.pdb"
188179 - mtz: the output mtz from dimple i.e. "{ispyb_results_directory}/dimple/final.mtz"
189- - prerun_threshold : the minimum number of comparator datasets needed to begin PanDDA
180+ - comparator_threshold : the minimum number of comparator datasets needed to begin PanDDA
190181 - comment: a comment to be stored in the ProcessingJob.comment field
191182 - timeout-minutes: (optional) the max time (in minutes) allowed to wait for
192183 processing PanDDA jobs
193184 - automatic: boolean value passed to ProcessingJob.automatic field
194185 Example recipe parameters:
195186 { "target": "pandda_xchem",
196187 "dcid": 123456,
197- "prerun_threshold ": 300,
188+ "comparator_threshold ": 300,
198189 "scaling_id": [123456],
199190 "automatic": true,
200191 "comment": "PanDDA2 triggered by dimple",
@@ -220,10 +211,6 @@ def trigger_pandda_xchem(
220211 )
221212 return {"success" : True }
222213
223- self .log .debug (
224- f"proposal code is { proposal .proposalCode } , proposal number { proposal .proposalNumber } "
225- )
226-
227214 # TEMPORARY, OPENBIND TEST VISIT
228215 if proposal .proposalNumber not in {"42888" }:
229216 self .log .debug (
@@ -330,6 +317,24 @@ def trigger_pandda_xchem(
330317 "xia2.multiplex" ,
331318 ] # will consider dimple output from these jobs to take forward
332319
320+ query = (
321+ session .query (AutoProcProgram .processingPrograms )
322+ .join (
323+ AutoProc ,
324+ AutoProcProgram .autoProcProgramId == AutoProc .autoProcProgramId ,
325+ )
326+ .join (
327+ AutoProcScaling ,
328+ AutoProc .autoProcId == AutoProcScaling .autoProcId ,
329+ )
330+ ).filter (AutoProcScaling .autoProcScalingId == scaling_id )
331+
332+ if query .first ()[0 ] == "fast_dp" :
333+ self .log .info (
334+ "Aborting PanDDA2 trigger as upstream processingProgram is fast_dp"
335+ )
336+ return {"success" : True }
337+
333338 # If other dimple/PanDDA2 job is running, quit, dimple set to trigger even if it fails
334339 min_start_time = datetime .now () - timedelta (hours = 12 )
335340
@@ -358,6 +363,28 @@ def trigger_pandda_xchem(
358363 )
359364 return {"success" : True }
360365
366+ # Stop-gap: skip if an automatic PanDDA2 job for this dcid was recorded in the last 20 minutes
367+ min_start_time = datetime .now () - timedelta (minutes = 20 )
368+
369+ query = (
370+ (
371+ session .query (AutoProcProgram , ProcessingJob .dataCollectionId ).join (
372+ ProcessingJob ,
373+ ProcessingJob .processingJobId == AutoProcProgram .processingJobId ,
374+ )
375+ )
376+ .filter (ProcessingJob .dataCollectionId == dcid )
377+ .filter (ProcessingJob .automatic == True ) # noqa: E712
378+ .filter (AutoProcProgram .processingPrograms .in_ (["PanDDA2" ]))
379+ .filter (AutoProcProgram .recordTimeStamp > min_start_time )
380+ )
381+
382+ if triggered_processing_job := query .first ():
383+ self .log .info (
384+ "Aborting PanDDA2 trigger as another PanDDA2 job was recently launched"
385+ )
386+ return {"success" : True }
387+
361388 # Now check if other upstream pipeline is running and if so, checkpoint (it might fail)
362389 query = (
363390 (
@@ -586,9 +613,9 @@ def trigger_pandda_xchem(
586613 compound_dir = dataset_dir / "compound"
587614 self .log .info (f"Creating directory { dataset_dir } " )
588615 pathlib .Path (compound_dir ).mkdir (parents = True , exist_ok = True )
589- dataset_list = [p .parts [- 1 ] for p in model_dir .iterdir () if p .is_dir ()]
616+ dataset_list = sorted ( [p .parts [- 1 ] for p in model_dir .iterdir () if p .is_dir ()])
590617 dataset_count = sum (1 for p in model_dir .iterdir () if p .is_dir ())
591- self .log .info (f"Dataset_count is: { dataset_count } " )
618+ self .log .info (f"Dataset count is: { dataset_count } " )
592619
593620 # Copy the dimple files of the selected dataset
594621 shutil .copy (pdb , str (dataset_dir / "dimple.pdb" ))
@@ -599,34 +626,35 @@ def trigger_pandda_xchem(
599626
600627 # 4. Job launch logic
601628
602- prerun_threshold = parameters .prerun_threshold
629+ comparator_threshold = parameters .comparator_threshold
603630
604- if dataset_count < prerun_threshold :
631+ if dataset_count < comparator_threshold :
605632 self .log .info (
606- f"Dataset dataset_count { dataset_count } < PanDDA2 comparator dataset threshold of { prerun_threshold } , skipping for now..."
633+ f"Dataset dataset_count { dataset_count } < PanDDA2 comparator dataset threshold of { comparator_threshold } , skipping for now..."
607634 )
608635 return {"success" : True }
609- elif dataset_count == prerun_threshold :
610- datasets = dataset_list # list of datasets to process
636+ elif dataset_count == comparator_threshold :
637+ n_datasets = len (dataset_list )
638+ with open (model_dir / ".batch.json" , "w" ) as f :
639+ json .dump (dataset_list , f )
611640 self .log .info (
612- f"Dataset dataset_count { dataset_count } = prerun_threshold of { prerun_threshold } datasets, launching PanDDA2 array job"
641+ f"Dataset dataset_count { dataset_count } = comparator_threshold of { comparator_threshold } datasets, launching PanDDA2 array job"
613642 )
614- elif dataset_count > prerun_threshold :
615- datasets = [ dtag ]
643+ elif dataset_count > comparator_threshold :
644+ n_datasets = 1
616645 self .log .info (f"Launching single PanDDA2 job for dtag { dtag } " )
617646
618647 self .log .debug ("PanDDA2 trigger: Starting" )
619- self .log .debug (f"Launching job for datasets: { datasets } " )
620648
621649 pandda_parameters = {
622650 "dcid" : dcid , #
623651 "processing_directory" : str (processing_dir ),
624652 "model_directory" : str (model_dir ),
625653 "dataset_directory" : str (dataset_dir ),
626- "datasets " : json . dumps ( datasets ) ,
627- "n_datasets" : len ( datasets ) ,
654+ "dtag " : dtag ,
655+ "n_datasets" : n_datasets ,
628656 "scaling_id" : scaling_id ,
629- "prerun_threshold " : prerun_threshold ,
657+ "comparator_threshold " : comparator_threshold ,
630658 "database_path" : str (db_copy ),
631659 }
632660
@@ -699,7 +727,7 @@ def trigger_pandda_xchem_post(
699727 visit_number = visit .split ("-" )[1 ]
700728
701729 # If other PanDDA2 postrun within visit running, quit
702- min_start_time = datetime .now () - timedelta (hours = 6 )
730+ min_start_time = datetime .now () - timedelta (hours = 0.2 )
703731
704732 # from proposal and visit get all dcids
705733 query = (
@@ -775,81 +803,3 @@ def trigger_pandda_xchem_post(
775803 self .log .info (f"PanDDA2_post trigger: Processing job { jobid } triggered" )
776804
777805 return {"success" : True }
778-
779- @pydantic .validate_call (config = {"arbitrary_types_allowed" : True })
780- def trigger_pandda_rhofit (
781- self ,
782- rw : workflows .recipe .RecipeWrapper ,
783- * ,
784- message : Dict ,
785- parameters : PanDDA_RhofitParameters ,
786- session : sqlalchemy .orm .session .Session ,
787- transaction : int ,
788- ** kwargs ,
789- ):
790- """Trigger a PanDDA rhofit job for an XChem fragment screening experiment.
791- Recipe parameters are described below with appropriate ispyb placeholder "{}"
792- values:
793- - target: set this to "pandda_xchem_post"
794- - dcid: the dataCollectionId for the given data collection i.e. "{ispyb_dcid}"
795- - comment: a comment to be stored in the ProcessingJob.comment field
796- - timeout-minutes: (optional) the max time (in minutes) allowed to wait for
797- processing PanDDA jobs
798- - automatic: boolean value passed to ProcessingJob.automatic field
799- Example recipe parameters:
800- { "target": "pandda_rhofit",
801- "dcid": 123456,
802- "datasets": ['dtag1','dtag2']
803- "processing_directory": '/dls/labxchem/data/lb42888/lb42888-1/processing',
804- "scaling_id": [123456],
805- "automatic": true,
806- "comment": "PanDDA2 Rhofit",
807- "timeout-minutes": 60,
808- }
809- """
810-
811- dcid = parameters .dcid
812- scaling_id = parameters .scaling_id [0 ]
813- processing_directory = pathlib .Path (parameters .processing_directory )
814- datasets = json .loads (parameters .datasets )
815-
816- self .log .debug ("PanDDA2 rhofit trigger: Starting" )
817- self .log .info (f"Datasets for rhofit: { datasets } " )
818-
819- self .log .debug (f"Launching job for datasets: { datasets } " )
820- pandda_parameters = {
821- "dcid" : dcid , #
822- "processing_directory" : str (processing_directory ),
823- "datasets" : json .dumps (datasets ),
824- "n_datasets" : len (datasets ),
825- "scaling_id" : scaling_id ,
826- }
827-
828- jp = self .ispyb .mx_processing .get_job_params ()
829- jp ["automatic" ] = parameters .automatic
830- # jp["comments"] = parameters.comment
831- jp ["datacollectionid" ] = dcid
832- jp ["display_name" ] = "PanDDA2_Rhofit"
833- jp ["recipe" ] = "postprocessing-pandda2-rhofit"
834- self .log .info (jp )
835- jobid = self .ispyb .mx_processing .upsert_job (list (jp .values ()))
836- self .log .debug (f"PanDDA2 trigger: generated JobID { jobid } " )
837-
838- for key , value in pandda_parameters .items ():
839- jpp = self .ispyb .mx_processing .get_job_parameter_params ()
840- jpp ["job_id" ] = jobid
841- jpp ["parameter_key" ] = key
842- jpp ["parameter_value" ] = value
843- jppid = self .ispyb .mx_processing .upsert_job_parameter (list (jpp .values ()))
844- self .log .debug (
845- f"PanDDA2 trigger: generated JobParameterID { jppid } with { key } ={ value } "
846- )
847-
848- self .log .debug (f"PanDDA2_Rhofit trigger: Processing job { jobid } created" )
849-
850- message = {"recipes" : [], "parameters" : {"ispyb_process" : jobid }}
851- rw .transport .send ("processing_recipe" , message )
852-
853- self .log .info (f"PanDDA2_Rhofit trigger: Processing job { jobid } triggered" )
854-
855- return {"success" : True }
0 commit comments