|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +from pathlib import Path |
| 4 | + |
| 5 | +import workflows.recipe |
| 6 | +import yaml |
| 7 | +from pydantic import BaseModel, Field, ValidationError |
| 8 | +from workflows.services.common_service import CommonService |
| 9 | + |
| 10 | +from dlstbx.util import ChainMapWithReplacement |
| 11 | + |
| 12 | + |
def scale_parameter(
    value: float,
    scale_factor: float,
    limits: tuple[float | None, float | None] | None = None,
) -> tuple[float, float]:
    """
    Scale *value* by *scale_factor*, optionally clamping the result to *limits*.

    Args:
        value: The parameter value to scale.
        scale_factor: Multiplier applied to ``value``.
        limits: Optional ``(lower, upper)`` bounds for the scaled value.
            Either bound may be ``None`` to leave that side unbounded.

    Returns:
        ``(scaled_value, corrective_scale_factor)``, where the corrective
        factor is the ratio of the unclamped to the clamped value — i.e. the
        residual scaling to apply to a complementary parameter so the overall
        scaling effect is preserved despite clamping.

    Raises:
        ValueError: If the scaled (and clamped) value is zero, which would
            make the corrective factor undefined.
    """

    def apply_limit(
        parameter: float, limits: tuple[float | None, float | None]
    ) -> float:
        # Clamp to whichever bounds are given; None means unbounded on that side.
        lower_limit, upper_limit = limits
        if lower_limit is not None:
            parameter = max(lower_limit, parameter)
        if upper_limit is not None:
            parameter = min(upper_limit, parameter)
        return parameter

    ref_value = value * scale_factor
    if limits is not None:
        scaled_value = apply_limit(ref_value, limits)
    else:
        scaled_value = ref_value
    if scaled_value == 0:
        raise ValueError("Scaled value cannot be zero")
    # Scale factor to apply to opposite parameter to achieve the desired scaling effect, accounting for limits
    corrective_scale_factor = ref_value / scaled_value
    return scaled_value, corrective_scale_factor
| 34 | + |
| 35 | + |
def get_resolution_scale(resolution: float) -> float:
    """
    Return the dose scaling factor for a given resolution.

    The quadratic comes from a polynomial fit to empirical user and test data
    modelling dose limits vs resolution. Loosely based on: Atakisi, H.,
    Conger, L., Moreau, D. W., & Thorne, R. E. (2019). Resolution and dose
    dependence of radiation damage in biomolecular systems. IUCrJ, 6(Pt 6),
    1040–1053. https://doi.org/10.1107/S2052252519008777
    """
    quadratic_term = resolution**2
    linear_term = 0.4 * resolution
    return quadratic_term - linear_term + 0.5
| 44 | + |
| 45 | + |
def get_wavelength_scale(wavelength: float, default_wavelength: float) -> float:
    """
    Return the dose scaling factor for a given wavelength.

    Dose is proportional to energy * X-ray absorption. The dominant absorption
    at typical wavelengths is the photoelectric effect, proportional to
    wavelength^3, while energy is inversely proportional to wavelength — so
    the overall dose scales approximately with wavelength^2.
    """
    wavelength_ratio = default_wavelength / wavelength
    return wavelength_ratio**2
| 54 | + |
| 55 | + |
class AgamemnonParameters(BaseModel):
    """
    A single validated step of an Agamemnon data-collection recipe.

    Field constraints (``gt=0``) reject non-positive values at parse time;
    a violating recipe step raises ``pydantic.ValidationError``.
    """

    chi: float
    comment: str
    exposure_time: float = Field(gt=0)  # per-image exposure time in seconds
    dose: float = Field(gt=0)
    kappa: float
    number_of_images: int = Field(gt=0)
    omega_increment: float = Field(gt=0)
    omega_overlap: float
    omega_start: float
    phi_increment: float
    phi_overlap: float
    phi_start: float
    scan_axis: str  # rotation axis name, e.g. "omega" or "phi"; used to look up f"{scan_axis}_start"
    transmission: float = Field(gt=0)
    two_theta: float
    wavelength: float = Field(gt=0)
| 73 | + |
| 74 | + |
def parse_agamemnon_recipe(recipe_path: Path) -> list[AgamemnonParameters]:
    """Load an Agamemnon recipe YAML file and validate each step it contains."""
    raw_steps = yaml.safe_load(recipe_path.read_text())
    return [AgamemnonParameters(**raw_step) for raw_step in raw_steps]
| 79 | + |
| 80 | + |
def parse_config_file(config_file: Path) -> dict:
    """
    Parse a GDA-style ``key = value`` properties file into a dict.

    Behaviour:
    - ``#`` starts a comment; blank lines and lines without ``=`` are skipped.
    - ``include = ../relative/path`` recursively merges another file, with its
      keys prefixed by the included file's stem (only relative includes
      starting with ``..`` are followed).
    - Values of the form ``${other.key}`` are resolved against keys already in
      the config (single level); unresolvable references are left as-is.

    Args:
        config_file: Path of the properties file to read.

    Returns:
        Mapping of parameter name to (string) value.
    """
    config: dict = {}

    # errors="ignore" tolerates stray non-UTF-8 bytes in beamline config files.
    with open(config_file, errors="ignore") as f:
        for record in f:
            if "#" in record:
                record = record.split("#")[0]
            record = record.strip()
            if not record:
                continue
            if "=" not in record:
                continue

            # Split on the first "=" only, so values may themselves contain "=".
            key, value = record.split("=", 1)
            key = key.strip()
            value = value.strip()

            if key == "include":
                if value.startswith(".."):
                    include = config_file.parent / value
                    # Namespace included keys by the included file's stem.
                    name = Path(value).name.split(".")[0]
                    included = parse_config_file(include)
                    for k in included:
                        config[f"{name}.{k}"] = included[k]
                continue

            config[key] = value
    # Resolve references to other variables
    for key, val in config.items():
        if isinstance(val, str) and val[:2] == "${" and val[-1] == "}":
            try:
                config[key] = config[val[2:-1]]
            except KeyError:
                continue
    return config
| 115 | + |
| 116 | + |
def get_beamline_param(
    config: dict, param_names: tuple[str, ...], default: float
) -> float:
    """
    Look up a beamline parameter in *config*, trying each name in
    *param_names* in order and returning the first match as a float, or
    *default* if none of the names are present.
    """
    return next(
        (float(config[name]) for name in param_names if name in config),
        default,
    )
| 127 | + |
| 128 | + |
class DLSStrategy(CommonService):
    """Service for creating data collection strategies.

    Consumes messages from the "strategy" queue, scales the beamline's
    Agamemnon recipe parameters (exposure time, transmission) within the
    beamline's hardware limits to deliver a dose appropriate for the
    requested wavelength and resolution, and forwards the resulting
    screening-strategy records to the downstream ISPyB service.
    """

    # Human readable service name
    _service_name = "Strategy"

    # Logger name
    _logger_name = "dlstbx.services.strategy"

    def initializing(self):
        """Subscribe to channel."""
        self.log.info("Strategy service starting")
        workflows.recipe.wrap_subscribe(
            self._transport,
            "strategy",
            self.generate_strategy,
            acknowledgement=True,
            log_extender=self.extend_log,
        )

    def generate_strategy(
        self, rw: workflows.recipe.RecipeWrapper, header: dict, message: dict
    ):
        """Generate a strategy from the results of an upstream pipeline.

        Reads beamline, wavelength and resolution from the message
        parameters, loads beamline configuration and Agamemnon recipes from
        disk, rescales each recipe step, and sends the resulting ISPyB
        insert commands onwards within the message transaction.

        Raises:
            FileNotFoundError: If the beamline configuration file or a
                recipe file is missing.
            pydantic.ValidationError: If a recipe step fails validation.
        """
        self.log.info("Received strategy request, generating strategy")

        parameters = ChainMapWithReplacement(
            message.get("parameters", {}) if isinstance(message, dict) else {},
            rw.recipe_step["parameters"].get("ispyb_parameters", {}),
            substitutions=rw.environment,
        )
        self.log.info(f"Received parameters for strategy generation:\n{parameters}")
        # Conditionally acknowledge receipt of the message
        txn = self._transport.transaction_begin(subscription_id=header["subscription"])
        self._transport.ack(header, transaction=txn)

        def scalar(value):
            # Parameters may arrive as single-element lists; use the first entry.
            return value[0] if isinstance(value, list) else value

        beamline = scalar(parameters["beamline"])
        wavelength = float(scalar(parameters["wavelength"]))
        resolution_estimate = float(scalar(parameters["resolution"]))

        # Target slightly beyond the estimated resolution, bounded below.
        resolution_offset = 0.5
        min_resolution = 0.9
        resolution = max(resolution_estimate - resolution_offset, min_resolution)

        beamline_config_file = Path(
            f"/dls_sw/{beamline}/software/daq_configuration/domain/domain.properties"
        )
        if not beamline_config_file.is_file():
            self.log.error(
                f"Beamline configuration file {beamline_config_file} not found, terminating strategy generation"
            )
            raise FileNotFoundError(
                f"Beamline configuration file {beamline_config_file} not found"
            )
        beamline_config = parse_config_file(beamline_config_file)

        # Hardware (lower, upper) limits within which scaled parameters must stay.
        transmission_limits = (
            get_beamline_param(beamline_config, ("gda.mx.udc.minTransmission",), 0.0),
            get_beamline_param(beamline_config, ("gda.mx.udc.maxTransmission",), 1.0),
        )
        exposure_time_limits = (
            get_beamline_param(
                beamline_config,
                ("gda.mx.udc.minExposureTime", "gda.exptTableModel.minExposureTime"),
                0.002,
            ),
            get_beamline_param(
                beamline_config,
                ("gda.mx.udc.maxExposureTime", "gda.exptTableModel.maxExposureTime"),
                float("inf"),
            ),
        )

        # Recipe file name -> human-readable alias used in the ISPyB records.
        recipes = {"OSC.yaml": "OSC", "Ligand binding.yaml": "Ligand"}
        ispyb_command_list = []

        for recipe, recipe_alias in recipes.items():
            recipe_path = Path(f"/dls_sw/{beamline}/etc/agamemnon-recipes/{recipe}")
            if not recipe_path.is_file():
                self.log.error(
                    f"Recipe file {recipe_path} not found, terminating strategy generation"
                )
                raise FileNotFoundError(f"Recipe file {recipe_path} not found")
            try:
                recipe_steps = parse_agamemnon_recipe(recipe_path)
            except ValidationError as e:
                self.log.error(f"Invalid recipe step in {recipe_path}: {e}")
                # Re-raise the original exception: pydantic's ValidationError
                # cannot be constructed from a plain message string.
                raise

            # Step 1: Create screeningOutput record for recipe, linked to the screeningId
            # Keep the screeningOutputId
            d = {
                "program": "udc-strategy",
                "strategysuccess": 1,
                "ispyb_command": "insert_screening_output",
                "screening_id": "$ispyb_screening_id",
                "store_result": "ispyb_screening_output_id",
            }
            ispyb_command_list.append(d)

            # Step 2: Store screeningStrategy results, linked to the screeningOutputId
            # Keep the screeningStrategyId
            d = {
                "program": f"udc-strategy: {recipe_alias}",
                "ispyb_command": "insert_screening_strategy",
                "screening_output_id": "$ispyb_screening_output_id",
                "store_result": "ispyb_screening_strategy_id",
            }
            ispyb_command_list.append(d)

            for n_step, recipe_step in enumerate(recipe_steps, start=1):
                # Combined dose scale factor for this step relative to the
                # recipe's default wavelength and the requested resolution.
                scale = 1.0
                default_wavelength = recipe_step.wavelength
                scale *= get_wavelength_scale(wavelength, default_wavelength)
                scale *= get_resolution_scale(resolution)

                dose, _ = scale_parameter(recipe_step.dose, scale)

                rotation_axis = recipe_step.scan_axis
                rotation_start = getattr(recipe_step, f"{rotation_axis}_start")
                rotation_increment = getattr(recipe_step, f"{rotation_axis}_increment")
                transmission = recipe_step.transmission
                exposure_time = recipe_step.exposure_time

                # Apply the scale preferentially to transmission when scaling
                # up and to exposure time when scaling down, carrying any
                # clamped remainder over to the other parameter.
                # Runs twice to ensure that limits are applied correctly to both parameters, as they are interdependent - is this necessary?
                for _ in range(2):
                    if scale > 1.0:
                        transmission, scale = scale_parameter(
                            transmission, scale, limits=transmission_limits
                        )
                        exposure_time, scale = scale_parameter(
                            exposure_time, scale, limits=exposure_time_limits
                        )
                    else:
                        exposure_time, scale = scale_parameter(
                            exposure_time, scale, limits=exposure_time_limits
                        )
                        transmission, scale = scale_parameter(
                            transmission, scale, limits=transmission_limits
                        )
                self.log.debug(
                    f"Exposure time scaled to {exposure_time:.3f} s, transmission scaled to {transmission:.3f}, scale factor now {scale:.3f}"
                )

                # Step 3: Store screeningStrategyWedge results, linked to the screeningStrategyId
                # Keep the screeningStrategyWedgeId
                d = {
                    "wedgenumber": n_step,
                    "resolution": resolution,
                    "phi": recipe_step.phi_start,
                    "chi": recipe_step.chi,
                    "kappa": recipe_step.kappa,
                    "wavelength": wavelength,
                    "dosetotal": dose,
                    "comments": recipe_alias,
                    "ispyb_command": "insert_screening_strategy_wedge",
                    "screening_strategy_id": "$ispyb_screening_strategy_id",
                    "store_result": f"ispyb_screening_strategy_wedge_id_{n_step}",
                }
                ispyb_command_list.append(d)

                # Step 4: Store second screeningStrategySubWedge results, linked to the screeningStrategyWedgeId
                # Keep the screeningStrategyWedgeId
                d = {
                    "subwedgenumber": 1,
                    "rotationaxis": recipe_step.scan_axis,
                    "axisstart": rotation_start,
                    "axisend": rotation_start
                    + rotation_increment * recipe_step.number_of_images,
                    "exposuretime": exposure_time,
                    "transmission": transmission,
                    "oscillationrange": rotation_increment,
                    "numberOfImages": recipe_step.number_of_images,
                    "resolution": resolution,
                    "ispyb_command": "insert_screening_strategy_sub_wedge",
                    "screening_strategy_wedge_id": f"$ispyb_screening_strategy_wedge_id_{n_step}",
                    "store_result": f"ispyb_screening_strategy_sub_wedge_id_{n_step}",
                }
                ispyb_command_list.append(d)

        # Send results onwards
        rw.set_default_channel("ispyb")
        rw.send_to("ispyb", {"ispyb_command_list": ispyb_command_list}, transaction=txn)
        self.log.info(f"Sent {len(ispyb_command_list)} commands to ISPyB")
        self.log.debug(f"Commands sent to ISPyB:\n{ispyb_command_list}")

        # Commit transaction
        self._transport.transaction_commit(txn)
        self.log.info("Strategy generation complete")
0 commit comments