Skip to content

Commit dde2189

Browse files
committed
Revert "Store s3echo.upload messages in DLQ"
This reverts commit 3c503cd.
1 parent 3c503cd commit dde2189

1 file changed

Lines changed: 29 additions & 27 deletions

File tree

src/dlstbx/services/s3echo_uploader.py

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
from __future__ import annotations
22

33
from pathlib import Path
4+
from pprint import pformat
45

56
import workflows.recipe
67
from minio.error import S3Error
78
from workflows.services.common_service import CommonService
89

910
from dlstbx.util.iris import (
1011
get_minio_client,
12+
get_presigned_urls,
1113
remove_objects_from_s3,
1214
retrieve_results_from_s3,
1315
)
@@ -33,13 +35,13 @@ def initializing(self):
3335
"""
3436
self.log.info(f"{S3EchoUploader._service_name} starting")
3537

36-
workflows.recipe.wrap_subscribe(
37-
self._transport,
38-
"s3echo.upload",
39-
self.on_upload,
40-
acknowledgement=True,
41-
log_extender=self.extend_log,
42-
)
38+
# workflows.recipe.wrap_subscribe(
39+
# self._transport,
40+
# "s3echo.upload",
41+
# self.on_upload,
42+
# acknowledgement=True,
43+
# log_extender=self.extend_log,
44+
# )
4345

4446
workflows.recipe.wrap_subscribe(
4547
self._transport,
@@ -55,26 +57,26 @@ def on_upload(self, rw, header, message):
5557
"""
5658
# Conditionally acknowledge receipt of the message
5759
txn = rw.transport.transaction_begin(subscription_id=header["subscription"])
58-
rw.transport.nack(header, transaction=txn)
59-
60-
# params = rw.recipe_step["parameters"]
61-
# minio_client = get_minio_client(S3EchoUploader._s3echo_credentials)
62-
# try:
63-
# s3_urls = get_presigned_urls(
64-
# minio_client,
65-
# params["bucket"],
66-
# params["rpid"],
67-
# rw.environment["s3echo_upload"].values(),
68-
# self.log,
69-
# )
70-
# except S3Error:
71-
# self.log.exception(
72-
# f"Error uploading following files to S3 bucket {params['bucket']}:\n{pformat(rw.environment['s3_upload'])}"
73-
# )
74-
# rw.send_to("failure", message, transaction=txn)
75-
# else:
76-
# rw.environment["s3_urls"] = s3_urls
77-
# rw.send_to("success", message, transaction=txn)
60+
rw.transport.ack(header, transaction=txn)
61+
62+
params = rw.recipe_step["parameters"]
63+
minio_client = get_minio_client(S3EchoUploader._s3echo_credentials)
64+
try:
65+
s3_urls = get_presigned_urls(
66+
minio_client,
67+
params["bucket"],
68+
params["rpid"],
69+
rw.environment["s3echo_upload"].values(),
70+
self.log,
71+
)
72+
except S3Error:
73+
self.log.exception(
74+
f"Error uploading following files to S3 bucket {params['bucket']}:\n{pformat(rw.environment['s3_upload'])}"
75+
)
76+
rw.send_to("failure", message, transaction=txn)
77+
else:
78+
rw.environment["s3_urls"] = s3_urls
79+
rw.send_to("success", message, transaction=txn)
7880

7981
# Commit transaction
8082
rw.transport.transaction_commit(txn)

0 commit comments

Comments
 (0)