Skip to content

Commit 660f089

Browse files
committed
fix broken chain of responsibility in otel tracing middleware
1 parent 3ec07eb commit 660f089

File tree

1 file changed

+66
-0
lines changed

1 file changed

+66
-0
lines changed

src/workflows/transport/middleware/otel_tracing.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,72 @@ def wrapped_callback(header, message):
107107

108108
return call_next(channel_hint, wrapped_callback, **kwargs)
109109

110+
def raw_send(self, call_next: Callable, destination: str, message, **kwargs):
111+
# Get current span context (may be None if this is the root span)
112+
current_span = trace.get_current_span()
113+
parent_context = (
114+
trace.set_span_in_context(current_span) if current_span else None
115+
)
116+
117+
with self.tracer.start_as_current_span(
118+
"transport.raw_send",
119+
context=parent_context,
120+
) as span:
121+
self._set_span_attributes(span, destination=destination)
122+
123+
# Inject the current trace context into the message headers
124+
headers = kwargs.get("headers", {})
125+
if headers is None:
126+
headers = {}
127+
inject(headers) # This modifies headers in-place
128+
kwargs["headers"] = headers
129+
130+
return call_next(destination, message, **kwargs)
131+
132+
def broadcast(self, call_next: Callable, destination: str, message, **kwargs):
133+
# Get current span context (may be None if this is the root span)
134+
current_span = trace.get_current_span()
135+
parent_context = (
136+
trace.set_span_in_context(current_span) if current_span else None
137+
)
138+
139+
with self.tracer.start_as_current_span(
140+
"transport.broadcast",
141+
context=parent_context,
142+
) as span:
143+
self._set_span_attributes(span, destination=destination)
144+
145+
# Inject the current trace context into the message headers
146+
headers = kwargs.get("headers", {})
147+
if headers is None:
148+
headers = {}
149+
inject(headers) # This modifies headers in-place
150+
kwargs["headers"] = headers
151+
152+
return call_next(destination, message, **kwargs)
153+
154+
def raw_broadcast(self, call_next: Callable, destination: str, message, **kwargs):
155+
# Get current span context (may be None if this is the root span)
156+
current_span = trace.get_current_span()
157+
parent_context = (
158+
trace.set_span_in_context(current_span) if current_span else None
159+
)
160+
161+
with self.tracer.start_as_current_span(
162+
"transport.raw_broadcast",
163+
context=parent_context,
164+
) as span:
165+
self._set_span_attributes(span, destination=destination)
166+
167+
# Inject the current trace context into the message headers
168+
headers = kwargs.get("headers", {})
169+
if headers is None:
170+
headers = {}
171+
inject(headers) # This modifies headers in-place
172+
kwargs["headers"] = headers
173+
174+
return call_next(destination, message, **kwargs)
175+
110176
def unsubscribe(
111177
self,
112178
call_next: Callable,

0 commit comments

Comments
 (0)