Skip to content

Commit 15a8666

Browse files
committed
Initial commit implementing single data upload per data collection
1 parent 660a612 commit 15a8666

9 files changed

Lines changed: 591 additions & 139 deletions

File tree

setup.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,9 @@
125125
"DLSXRayCentering = dlstbx.services.xray_centering:DLSXRayCentering",
126126
"CloudStats = dlstbx.services.cloudstats:CloudStats",
127127
"CloudWatcher = dlstbx.services.cloudwatcher:CloudWatcher",
128+
"S3EchoCollector = dlstbx.services.s3echo_collector:S3EchoCollector",
128129
"S3EchoUploader = dlstbx.services.s3echo_uploader:S3EchoUploader",
130+
"S3EchoWatcher = dlstbx.services.s3echo_watcher:S3EchoWatcher",
129131
"SSXPlotter = dlstbx.services.ssx_plotter:SSXPlotter",
130132
# "LoadProducer = dlstbx.services.load_producer:LoadProducer", # tentatively disabled
131133
# "LoadReceiver = dlstbx.services.load_receiver:LoadReceiver", # tentatively disabled

src/dlstbx/ispybtbx/__init__.py

Lines changed: 41 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -911,6 +911,7 @@ def ispyb_filter(
911911
)
912912
parameters["ispyb_space_group"] = ""
913913
parameters["ispyb_related_sweeps"] = []
914+
parameters["ispyb_related_images"] = []
914915
parameters["ispyb_reference_geometry"] = None
915916
if visit_directory:
916917
parameters["ispyb_pdb"] = i.get_linked_pdb_files_for_dcid(
@@ -957,16 +958,54 @@ def ispyb_filter(
957958
# if a sample is linked to the dc, then get dcids on the same sample
958959
sample_id = parameters["ispyb_dc_info"].get("BLSAMPLEID")
959960
related_dcids = i.get_sample_dcids(sample_id, session)
960-
elif dcid := parameters.get("ispyb_dcid"):
961+
elif dc_id:
961962
# else get dcids collected into the same image directory
962-
related_dcids = i.get_related_dcids_same_directory(dcid, session)
963+
related_dcids = i.get_related_dcids_same_directory(dc_id, session)
963964
if related_dcids:
964965
parameters["ispyb_related_dcids"].append(related_dcids)
965966
logger.debug(f"ispyb_related_dcids: {parameters['ispyb_related_dcids']}")
966967
parameters["ispyb_dcg_dcids"] = i.get_dcg_dcids(
967968
dc_info.get("dataCollectionId"), dc_info.get("dataCollectionGroupId"), session
968969
)
969970

971+
# for the moment we do not want multi-xia2 for /dls/mx i.e. VMXi
972+
# beware: if other projects start using this directory structure we will
973+
# need to be smarter here...
974+
975+
# Handle related DCID properties via DataCollectionGroup, if there is one
976+
if dcg_id:
977+
stmt = select(
978+
models.DataCollection.dataCollectionId,
979+
models.DataCollection.startImageNumber,
980+
models.DataCollection.numberOfImages,
981+
models.DataCollection.overlap,
982+
models.DataCollection.axisRange,
983+
models.DataCollection.fileTemplate,
984+
models.DataCollection.imageDirectory,
985+
).where(models.DataCollection.dataCollectionGroupId == dcg_id)
986+
987+
related_images = []
988+
989+
for dc in session.execute(stmt).mappings():
990+
start, end = i.dc_info_to_start_end(dc)
991+
parameters["ispyb_related_sweeps"].append((dc.dataCollectionId, start, end))
992+
parameters["ispyb_related_images"].append(
993+
(dc.dataCollectionId, i.dc_info_to_filename(dc))
994+
)
995+
996+
# We don't get related images for /dls/mx collections
997+
if (
998+
not parameters["ispyb_image_directory"].startswith("/dls/mx")
999+
and dc.dataCollectionId != dc_id
1000+
and i.dc_info_is_rotation_scan(dc)
1001+
):
1002+
related_images.append(
1003+
"%s:%d:%d" % (i.dc_info_to_filename(dc), start, end)
1004+
)
1005+
1006+
if not parameters.get("ispyb_images"):
1007+
parameters["ispyb_images"] = ",".join(related_images)
1008+
9701009
pin_info = i.get_pin_info_from_sample_id(
9711010
parameters["ispyb_dc_info"].get("BLSAMPLEID"), session
9721011
)
@@ -1005,41 +1044,6 @@ def ispyb_filter(
10051044
parameters["ispyb_ssx_events"] = events or None
10061045
parameters["ispyb_ssx_shots_per_image"] = shots_per_image or None
10071046

1008-
# for the moment we do not want multi-xia2 for /dls/mx i.e. VMXi
1009-
# beware if other projects start using this directory structure will
1010-
# need to be smarter here...
1011-
1012-
# Handle related DCID properties via DataCollectionGroup, if there is one
1013-
if dcg_id:
1014-
stmt = select(
1015-
models.DataCollection.dataCollectionId,
1016-
models.DataCollection.startImageNumber,
1017-
models.DataCollection.numberOfImages,
1018-
models.DataCollection.overlap,
1019-
models.DataCollection.axisRange,
1020-
models.DataCollection.fileTemplate,
1021-
models.DataCollection.imageDirectory,
1022-
).where(models.DataCollection.dataCollectionGroupId == dcg_id)
1023-
1024-
related_images = []
1025-
1026-
for dc in session.execute(stmt).mappings():
1027-
start, end = i.dc_info_to_start_end(dc)
1028-
parameters["ispyb_related_sweeps"].append((dc.dataCollectionId, start, end))
1029-
1030-
# We don't get related images for /dls/mx collections
1031-
if (
1032-
not parameters["ispyb_image_directory"].startswith("/dls/mx")
1033-
and dc.dataCollectionId != dc_id
1034-
and i.dc_info_is_rotation_scan(dc)
1035-
):
1036-
related_images.append(
1037-
"%s:%d:%d" % (i.dc_info_to_filename(dc), start, end)
1038-
)
1039-
1040-
if not parameters.get("ispyb_images"):
1041-
parameters["ispyb_images"] = ",".join(related_images)
1042-
10431047
return message, parameters
10441048

10451049

src/dlstbx/services/mimas.py

Lines changed: 44 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,29 @@ def get_cloudbursting_spec(
231231
# Create cloud specification entry for each element in zocalo.mimas.cloud
232232
# Add specification to the list if the science cluster is oversubscribed
233233
# and cluster statistics are up-to-date
234+
is_iris_live = (
235+
self.cluster_stats["iris"]["last_cluster_update"] > timeout_threshold
236+
)
237+
is_s3echo_live = (
238+
self.cluster_stats["s3echo"]["last_cluster_update"] > timeout_threshold
239+
)
240+
is_s3echo_quota = self.cluster_stats["s3echo"]["total"] < s3echo_quota
241+
cloudbursting = is_iris_live and is_s3echo_live and is_s3echo_quota
242+
self.log.debug(
243+
pformat(
244+
{
245+
"is_iris_live": is_iris_live,
246+
"is_s3echo_live": is_s3echo_live,
247+
"is_s3echo_quota": is_s3echo_quota,
248+
}
249+
)
250+
)
251+
if not cloudbursting:
252+
return cloud_spec_list
253+
254+
# Create cloud specification entry for each element in zocalo.mimas.cloud
255+
# Add specification to the list if the science cluster is oversubscribed
256+
# and cluster statistics are up-to-date
234257
for group in self.config.storage.get("zocalo.mimas.cloud", []):
235258
if not group.get("cloudbursting", True):
236259
continue
@@ -240,30 +263,28 @@ def get_cloudbursting_spec(
240263
beamlines=set(group.get("beamlines", []))
241264
)
242265
group_max_jobs_waiting = group.get("max_jobs_waiting", max_jobs_waiting)
243-
if (
244-
(
245-
(
246-
self.cluster_stats["slurm"]["jobs_waiting"]
247-
> group_max_jobs_waiting["slurm"]
248-
)
249-
or (
250-
self.cluster_stats["slurm"]["last_cluster_update"]
251-
< timeout_threshold
252-
)
253-
)
254-
and (
255-
self.cluster_stats["iris"]["jobs_waiting"]
256-
< group_max_jobs_waiting["iris"]
257-
)
258-
and (
259-
self.cluster_stats["iris"]["last_cluster_update"]
260-
> timeout_threshold
261-
)
262-
and (
263-
self.cluster_stats["s3echo"]["last_cluster_update"]
264-
> timeout_threshold
266+
is_slurm_max_jobs = (
267+
self.cluster_stats["slurm"]["jobs_waiting"]
268+
> group_max_jobs_waiting["slurm"]
269+
)
270+
is_slurm_timeout = (
271+
self.cluster_stats["slurm"]["last_cluster_update"]
272+
< timeout_threshold
273+
)
274+
is_iris_max_jobs = (
275+
self.cluster_stats["iris"]["jobs_waiting"]
276+
< group_max_jobs_waiting["iris"]
277+
)
278+
self.log.debug(
279+
pformat(
280+
{
281+
"is_slurm_max_jobs": is_slurm_max_jobs,
282+
"is_slurm_timeout": is_slurm_timeout,
283+
"is_iris_max_jobs": is_iris_max_jobs,
284+
}
265285
)
266-
):
286+
)
287+
if (is_slurm_max_jobs or is_slurm_timeout) and is_iris_max_jobs:
267288
cloud_spec_list.append(
268289
{
269290
"cloud_spec": cloud_spec,
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
from __future__ import annotations
2+
3+
import minio
4+
import workflows.recipe
5+
from workflows.services.common_service import CommonService
6+
7+
from dlstbx.util import iris
8+
from dlstbx.util.iris import get_minio_client, update_dcid_info_file
9+
10+
11+
class S3EchoCollector(CommonService):
    """
    A service that keeps status of uploads to S3 Echo object store and does garbage collection of unreferenced data.
    """

    # Human readable service name
    _service_name = "S3EchoCollector"

    # Logger name
    _logger_name = "dlstbx.services.s3echocollector"

    # STFC S3 Echo credentials
    _s3echo_credentials = "/dls_sw/apps/zocalo/secrets/credentials-echo-mx.cfg"

    def initializing(self):
        """
        Register callback functions to upload and download data from S3 Echo object store.
        """
        self.log.info(f"{S3EchoCollector._service_name} starting")

        self.minio_client: minio.Minio = get_minio_client(
            S3EchoCollector._s3echo_credentials
        )

        self._message_delay = 5

        workflows.recipe.wrap_subscribe(
            self._transport,
            "s3echo.start",
            self.on_start,
            acknowledgement=True,
            log_extender=self.extend_log,
        )

        workflows.recipe.wrap_subscribe(
            self._transport,
            "s3echo.end",
            self.on_end,
            acknowledgement=True,
            log_extender=self.extend_log,
        )

    def on_start(self, rw, header, message):
        """
        Process request for uploading images to S3 Echo object store.

        The per-dcid reference record is maintained via update_dcid_info_file;
        the "upload" message is only sent when that record did not previously
        exist (empty response_info), so each data collection is uploaded once
        even when several processing jobs reference it.
        """
        # Conditionally acknowledge receipt of the message
        txn = rw.transport.transaction_begin(subscription_id=header["subscription"])
        rw.transport.ack(header, transaction=txn)

        params = rw.recipe_step["parameters"]
        # NOTE(review): a fresh client is created per message even though
        # initializing() stored self.minio_client — presumably to avoid stale
        # connections; confirm before consolidating on the cached client.
        minio_client = get_minio_client(S3EchoCollector._s3echo_credentials)

        bucket_name = params["bucket"]
        if not minio_client.bucket_exists(bucket_name):
            minio_client.make_bucket(bucket_name)
        rpid = int(params["rpid"])

        # Accumulates object name -> (dcid, local path) for the downstream
        # watcher, passed along in the recipe environment.
        s3echo_upload_files = {}
        if images := params.get("images"):
            dcid = int(params["dcid"])
            response_info = update_dcid_info_file(
                minio_client, bucket_name, dcid, 0, rpid, self.log
            )
            try:
                image_files = iris.get_image_files(images, self.log)
                s3echo_upload_files.update(
                    {name: (dcid, pth) for name, pth in image_files.items()}
                )
                # Bug fix: the upload request must stay inside the try block
                # (as in the related_images branch below) — otherwise a failure
                # in get_image_files leaves image_files unbound and this branch
                # raises NameError instead of being logged as a best-effort
                # failure.
                if not response_info:
                    self.log.debug("Sending message to upload endpoint")
                    rw.send_to(
                        "upload",
                        {"s3echo_upload": {dcid: image_files}},
                        transaction=txn,
                    )
            except Exception:
                self.log.exception("Error uploading image files to S3 Echo")
            rw.environment.update({"s3echo_upload": s3echo_upload_files})
            self.log.debug("Sending message to watch endpoint")
            rw.send_to("watch", message, transaction=txn)
        elif params.get("related_images"):
            for dcid, image_master_file in params.get("related_images"):
                response_info = update_dcid_info_file(
                    minio_client, bucket_name, dcid, 0, rpid, self.log
                )
                try:
                    image_files = iris.get_related_images_files_from_h5(
                        image_master_file, self.log
                    )
                    s3echo_upload_files.update(
                        {name: (dcid, pth) for name, pth in image_files.items()}
                    )
                    if not response_info:
                        self.log.debug("Sending message to upload endpoint")
                        rw.send_to(
                            "upload",
                            {"s3echo_upload": {dcid: image_files}},
                            transaction=txn,
                        )
                except Exception:
                    self.log.exception("Error uploading image files to S3 Echo")
            rw.environment.update({"s3echo_upload": s3echo_upload_files})
            self.log.debug("Sending message to watch endpoint")
            rw.send_to("watch", message, transaction=txn)
        rw.transport.transaction_commit(txn)

    def on_end(self, rw, header, message):
        """
        Remove reference to image data in S3 Echo object store after end of processing.

        When this job (rpid) is the only remaining reference to a dcid (or the
        record's status is -1), every object named "<dcid>_*" is deleted from
        the bucket; otherwise only this job's reference is removed (-rpid).
        """
        # Conditionally acknowledge receipt of the message
        txn = rw.transport.transaction_begin(subscription_id=header["subscription"])
        rw.transport.ack(header, transaction=txn)

        params = rw.recipe_step["parameters"]
        minio_client = get_minio_client(S3EchoCollector._s3echo_credentials)
        bucket_name = params["bucket"]
        rpid = int(params["rpid"])

        # Fall back to the message's own dcid when no related images are given.
        for dcid, _ in params.get("related_images", [(int(params["dcid"]), None)]):
            response_info = update_dcid_info_file(
                minio_client, bucket_name, dcid, None, None, self.log
            )
            if not response_info:
                self.log.warning(f"No {dcid}_info data read from the object store")
            elif response_info["status"] == -1 or (
                response_info["status"] == 1 and response_info["pid"] == [rpid]
            ):
                # No other processing job references this dcid any more:
                # garbage-collect all of its objects.
                dc_objects = {
                    obj.object_name
                    for obj in minio_client.list_objects(bucket_name)
                    if obj.object_name is not None
                }
                for obj_name in dc_objects:
                    if obj_name.startswith(f"{dcid}_"):
                        minio_client.remove_object(bucket_name, obj_name)
            else:
                # Still referenced by other jobs: drop only this reference.
                update_dcid_info_file(
                    minio_client, bucket_name, dcid, None, -rpid, self.log
                )

        rw.transport.transaction_commit(txn)

0 commit comments

Comments
 (0)