Skip to content

Commit 3c503cd

Browse files
committed
Store s3echo.upload messages in DLQ
1 parent 3e9aee3 commit 3c503cd

1 file changed

Lines changed: 27 additions & 29 deletions

File tree

src/dlstbx/services/s3echo_uploader.py

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
from __future__ import annotations
22

33
from pathlib import Path
4-
from pprint import pformat
54

65
import workflows.recipe
76
from minio.error import S3Error
87
from workflows.services.common_service import CommonService
98

109
from dlstbx.util.iris import (
1110
get_minio_client,
12-
get_presigned_urls,
1311
remove_objects_from_s3,
1412
retrieve_results_from_s3,
1513
)
@@ -35,13 +33,13 @@ def initializing(self):
3533
"""
3634
self.log.info(f"{S3EchoUploader._service_name} starting")
3735

38-
# workflows.recipe.wrap_subscribe(
39-
# self._transport,
40-
# "s3echo.upload",
41-
# self.on_upload,
42-
# acknowledgement=True,
43-
# log_extender=self.extend_log,
44-
# )
36+
workflows.recipe.wrap_subscribe(
37+
self._transport,
38+
"s3echo.upload",
39+
self.on_upload,
40+
acknowledgement=True,
41+
log_extender=self.extend_log,
42+
)
4543

4644
workflows.recipe.wrap_subscribe(
4745
self._transport,
@@ -57,26 +55,26 @@ def on_upload(self, rw, header, message):
5755
"""
5856
# Conditionally acknowledge receipt of the message
5957
txn = rw.transport.transaction_begin(subscription_id=header["subscription"])
60-
rw.transport.ack(header, transaction=txn)
61-
62-
params = rw.recipe_step["parameters"]
63-
minio_client = get_minio_client(S3EchoUploader._s3echo_credentials)
64-
try:
65-
s3_urls = get_presigned_urls(
66-
minio_client,
67-
params["bucket"],
68-
params["rpid"],
69-
rw.environment["s3echo_upload"].values(),
70-
self.log,
71-
)
72-
except S3Error:
73-
self.log.exception(
74-
f"Error uploading following files to S3 bucket {params['bucket']}:\n{pformat(rw.environment['s3_upload'])}"
75-
)
76-
rw.send_to("failure", message, transaction=txn)
77-
else:
78-
rw.environment["s3_urls"] = s3_urls
79-
rw.send_to("success", message, transaction=txn)
58+
rw.transport.nack(header, transaction=txn)
59+
60+
# params = rw.recipe_step["parameters"]
61+
# minio_client = get_minio_client(S3EchoUploader._s3echo_credentials)
62+
# try:
63+
# s3_urls = get_presigned_urls(
64+
# minio_client,
65+
# params["bucket"],
66+
# params["rpid"],
67+
# rw.environment["s3echo_upload"].values(),
68+
# self.log,
69+
# )
70+
# except S3Error:
71+
# self.log.exception(
72+
# f"Error uploading following files to S3 bucket {params['bucket']}:\n{pformat(rw.environment['s3_upload'])}"
73+
# )
74+
# rw.send_to("failure", message, transaction=txn)
75+
# else:
76+
# rw.environment["s3_urls"] = s3_urls
77+
# rw.send_to("success", message, transaction=txn)
8078

8179
# Commit transaction
8280
rw.transport.transaction_commit(txn)

0 commit comments

Comments
 (0)