Skip to content

Commit 77eb9e9

Browse files
committed
Implement ExitStack() to manage multiple context managers and clean them up
1 parent aebe1ca commit 77eb9e9

3 files changed

Lines changed: 24 additions & 8 deletions

File tree

src/workflows/recipe/__init__.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from __future__ import annotations
2-
2+
from contextlib import ExitStack
33
import functools
44
import logging
55
from collections.abc import Callable
@@ -70,6 +70,7 @@ def unwrap_recipe(header, message):
7070
if mangle_for_receiving:
7171
message = mangle_for_receiving(message)
7272
if header.get("workflows-recipe") in {True, "True", "true", 1}:
73+
otel_logs = None
7374
rw = RecipeWrapper(message=message, transport=transport_layer)
7475

7576
# Extract recipe ID from environment and add to current span
@@ -79,6 +80,8 @@ def unwrap_recipe(header, message):
7980
if recipe_id:
8081
span.set_attribute("recipe_id", recipe_id)
8182

83+
84+
8285
# Extract span_id and trace_id for logging
8386
span_context = span.get_span_context()
8487
if span_context and span_context.is_valid:
@@ -92,13 +95,17 @@ def unwrap_recipe(header, message):
9295

9396
if recipe_id:
9497
otel_logs["recipe_id"] = recipe_id
95-
else:
96-
otel_logs = "No OTEL related logs available"
97-
98-
if log_extender and rw.environment and rw.environment.get("ID"):
99-
with log_extender("recipe_ID", rw.environment["ID"]), log_extender("otel_logs", otel_logs):
100-
return callback(rw, header, message.get("payload"))
101-
return callback(rw, header, message.get("payload"))
98+
99+
with ExitStack() as stack:
100+
# Configure the context depending on if service is emitting spans
101+
if otel_logs and log_extender and rw.environment and rw.environment.get("ID"):
102+
stack.enter_context(log_extender('recipe_ID', rw.environment.get("ID")))
103+
stack.enter_context(log_extender('otel_logs', otel_logs))
104+
elif log_extender and rw.environment and rw.environment.get("ID"):
105+
stack.enter_context(log_extender('recipe_ID', rw.environment.get("ID")))
106+
107+
return callback(rw, header, message.get("payload"))
108+
102109

103110
if allow_non_recipe_messages:
104111
return callback(None, header, message)

src/workflows/services/common_service.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,13 @@ def start_transport(self):
197197
otel_config = (
198198
self.config._opentelemetry if self.config and hasattr(self.config, "opentelemetry") else None
199199
)
200+
# debugging
201+
with open("/scratch/logs.txt", 'w+') as file:
202+
if otel_config:
203+
import json
204+
json.dump(otel_config, file, indent=4)
205+
else:
206+
file.write("otel config was not truthy")
200207

201208
if otel_config and "timeout" not in otel_config:
202209
self.log.warning("Missing optional OTEL configuration field `timeout`. Will default to 10 seconds. ")

src/workflows/util/zocalo/configuration.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ def activate(configuration):
3333
OTEL.config["endpoint"] = endpoint
3434
OTEL.config["timeout"] = configuration.get("timeout", 10)
3535

36+
return OTEL.config
37+
3638

3739
class Stomp:
3840
"""A Zocalo configuration plugin to pre-populate StompTransport config defaults"""

0 commit comments

Comments
 (0)