-
Notifications
You must be signed in to change notification settings - Fork 8
Added OTEL tracing #196
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added OTEL tracing #196
Changes from 33 commits
722bf3e
52cb04d
cc9ee12
f7cc658
8b2a2f1
0686e28
2d9e21c
3a5283a
4b999f1
4b86715
9c13d07
3e0b902
d446e80
16b0e10
7ad857f
468f940
7aae664
902a7df
9e5adb7
1d7457e
ff3679c
e6be925
aebe1ca
77eb9e9
5f94077
786bd00
33fbce2
ed6bc93
2369ba1
554baa2
7221ec4
b7e7999
2fd00fa
e4f69ed
a6fdbb1
229214f
30a67d2
6897e13
5aae712
0d4d825
106fcd8
d437b47
85eba77
4004a7d
4202c21
d92a080
8dceb2c
f8062b0
52fc2d5
621044d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,8 +3,11 @@ | |
| import functools | ||
| import logging | ||
| from collections.abc import Callable | ||
| from contextlib import ExitStack | ||
| from typing import Any | ||
|
|
||
| from opentelemetry import trace | ||
|
|
||
| from workflows.recipe.recipe import Recipe | ||
| from workflows.recipe.validate import validate_recipe | ||
| from workflows.recipe.wrapper import RecipeWrapper | ||
|
|
@@ -68,11 +71,52 @@ def unwrap_recipe(header, message): | |
| if mangle_for_receiving: | ||
| message = mangle_for_receiving(message) | ||
| if header.get("workflows-recipe") in {True, "True", "true", 1}: | ||
| otel_logs = None | ||
|
davidigandan marked this conversation as resolved.
Outdated
|
||
| rw = RecipeWrapper(message=message, transport=transport_layer) | ||
| if log_extender and rw.environment and rw.environment.get("ID"): | ||
| with log_extender("recipe_ID", rw.environment["ID"]): | ||
|
|
||
| if hasattr(rw, "environment") and rw.environment.get("ID"): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The original check here is more correct, you want to check:
tl;dr the original if statement was what you still want
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i've reduced this to |
||
| # Extract recipe ID from environment and add to current span | ||
| span = trace.get_current_span() | ||
| recipe_id = rw.environment.get("ID") | ||
|
davidigandan marked this conversation as resolved.
Outdated
|
||
|
|
||
| if recipe_id: | ||
|
davidigandan marked this conversation as resolved.
Outdated
|
||
| span.set_attribute("recipe_id", recipe_id) | ||
|
|
||
| # Extract span_id and trace_id for logging | ||
| span_context = span.get_span_context() | ||
| if span_context and span_context.is_valid: | ||
| span_id = span_context.span_id | ||
| trace_id = span_context.trace_id | ||
|
|
||
| otel_logs = { | ||
| "span_id": span_id, | ||
| "trace_id": trace_id, | ||
| } | ||
|
|
||
| if recipe_id: | ||
|
davidigandan marked this conversation as resolved.
Outdated
|
||
| otel_logs["recipe_id"] = recipe_id | ||
|
|
||
| with ExitStack() as stack: | ||
| # Configure the context depending on if service is emitting spans | ||
| if ( | ||
| otel_logs | ||
| and log_extender | ||
| and rw.environment | ||
| and rw.environment.get("ID") | ||
| ): | ||
| stack.enter_context( | ||
| log_extender("recipe_ID", rw.environment.get("ID")) | ||
| ) | ||
| stack.enter_context(log_extender("otel_logs", otel_logs)) | ||
|
davidigandan marked this conversation as resolved.
Outdated
|
||
| elif log_extender and rw.environment and rw.environment.get("ID"): | ||
| stack.enter_context( | ||
| log_extender("recipe_ID", rw.environment.get("ID")) | ||
| ) | ||
|
davidigandan marked this conversation as resolved.
|
||
|
|
||
| return callback(rw, header, message.get("payload")) | ||
|
|
||
| return callback(rw, header, message.get("payload")) | ||
|
|
||
| if allow_non_recipe_messages: | ||
| return callback(None, header, message) | ||
| # self.log.warning('Discarding non-recipe message:\n' + \ | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.