Skip to content
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
722bf3e
Add basic tracing middleware and global control
davidigandan Jan 13, 2026
52cb04d
Instrument on subscribe and add dcid to span attributes
davidigandan Jan 26, 2026
cc9ee12
Add spanid and traceid metadata to greylog
davidigandan Jan 26, 2026
f7cc658
Add recipe_id to spans
davidigandan Jan 26, 2026
8b2a2f1
Add dev and prod dependencies
davidigandan Jan 26, 2026
0686e28
Remove dcid extract from message and inject to span logic. Will be ad…
davidigandan Jan 26, 2026
2d9e21c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 26, 2026
3a5283a
Use plugin configurations to configure connection to OTELCollector
davidigandan Jan 26, 2026
4b999f1
Remove vestigial dcid handling and unnecessary debug statements
davidigandan Jan 26, 2026
4b86715
remove unhelpful docstring
davidigandan Jan 26, 2026
9c13d07
Merge branch 'dev' of https://github.com/DiamondLightSource/python-wo…
davidigandan Jan 26, 2026
3e0b902
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 26, 2026
d446e80
imported OTEL config class to common_service
davidigandan Jan 26, 2026
16b0e10
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 26, 2026
7ad857f
add marshmallow dependency
davidigandan Jan 27, 2026
468f940
Merge branch 'dev' of https://github.com/DiamondLightSource/python-wo…
davidigandan Jan 27, 2026
7aae664
add zocalo dependency
davidigandan Jan 27, 2026
902a7df
Fix possibly unbound error
davidigandan Jan 27, 2026
9e5adb7
Moved plugin functionality to python-workflows
davidigandan Feb 2, 2026
1d7457e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 2, 2026
ff3679c
Fixed typos, vestigial code and improper use of log_extender
davidigandan Feb 6, 2026
e6be925
Remove vestigial try block and fix runtime issue where None[] or None…
davidigandan Feb 6, 2026
aebe1ca
Remove vestigial try block and fix runtime issue where None[] or None…
davidigandan Feb 6, 2026
77eb9e9
Implement ExitStack() to manage multiple context managers and clean t…
davidigandan Feb 6, 2026
5f94077
Fix broken tracing functionality
davidigandan Feb 17, 2026
786bd00
Fix
davidigandan Feb 17, 2026
33fbce2
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 17, 2026
ed6bc93
Fix rw.environment.get('ID') bug
davidigandan Feb 17, 2026
2369ba1
Ensure environment and environment.id exists
davidigandan Feb 17, 2026
554baa2
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 17, 2026
7221ec4
Remove the need for enironment variable in mock
davidigandan Feb 17, 2026
b7e7999
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 17, 2026
2fd00fa
Fix ruff error
davidigandan Feb 17, 2026
e4f69ed
removed redundant libs from requirements_dev.txt
davidigandan Feb 24, 2026
a6fdbb1
Use compatible release pinning
davidigandan Feb 24, 2026
229214f
Remove the option to manually configure full endpoint
davidigandan Feb 24, 2026
30a67d2
remove debugging
davidigandan Feb 24, 2026
6897e13
remove debugging
davidigandan Feb 24, 2026
5aae712
moved otel_logs closer to relevant code block
davidigandan Feb 24, 2026
0d4d825
simplified logic for log context and corrected self.config.openteleme…
davidigandan Feb 24, 2026
106fcd8
abstracted away span attribute setting logic
davidigandan Feb 24, 2026
d437b47
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 24, 2026
85eba77
remove unnecessary pinning of dependencies and correct formatting in …
davidigandan Mar 3, 2026
4004a7d
remove all dependency pinning
davidigandan Mar 3, 2026
4202c21
Give span_id and trace_id their own log_extender contexts, so that th…
davidigandan Mar 11, 2026
d92a080
Remove the possibility of KeyError exception being thrown if rw.envir…
davidigandan Mar 11, 2026
8dceb2c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 11, 2026
f8062b0
Fix commit pinning
davidigandan Mar 11, 2026
52fc2d5
Remove all version pinning
davidigandan Mar 11, 2026
621044d
Remove unnecessary condition on if block. There is no need to guard a…
davidigandan Mar 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ classifiers = [
]
license = { text = "BSD-3-Clause" }
requires-python = ">=3.10"
dependencies = ["bidict", "pika", "setuptools", "stomp-py>=7"]
dependencies = ["bidict", "pika", "setuptools", "stomp-py>=7", "opentelemetry-api==1.20.0", "opentelemetry-sdk==1.20.0", "opentelemetry-exporter-otlp-proto-http==1.20.0" ]
Comment thread
davidigandan marked this conversation as resolved.
Outdated

[project.urls]
Download = "https://github.com/DiamondLightSource/python-workflows/releases"
Expand Down Expand Up @@ -53,6 +53,7 @@ OfflineTransport = "workflows.transport.offline_transport:OfflineTransport"
pika = "workflows.util.zocalo.configuration:Pika"
stomp = "workflows.util.zocalo.configuration:Stomp"
transport = "workflows.util.zocalo.configuration:DefaultTransport"
opentelemetry = "workflows.util.zocalo.configuration:OTEL"

[project.scripts]
"workflows.validate_recipe" = "workflows.recipe.validate:main"
Expand Down
4 changes: 4 additions & 0 deletions requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ pytest-mock==3.14.0
pytest-timeout==2.3.1
stomp-py==8.1.2
websocket-client==1.8.0
opentelemetry-api==1.20.0
opentelemetry-sdk==1.20.0
opentelemetry-exporter-otlp-proto-http==1.20.0
marshmallow
Comment thread
davidigandan marked this conversation as resolved.
Outdated
48 changes: 46 additions & 2 deletions src/workflows/recipe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
import functools
import logging
from collections.abc import Callable
from contextlib import ExitStack
from typing import Any

from opentelemetry import trace

from workflows.recipe.recipe import Recipe
from workflows.recipe.validate import validate_recipe
from workflows.recipe.wrapper import RecipeWrapper
Expand Down Expand Up @@ -68,11 +71,52 @@ def unwrap_recipe(header, message):
if mangle_for_receiving:
message = mangle_for_receiving(message)
if header.get("workflows-recipe") in {True, "True", "true", 1}:
otel_logs = None
Comment thread
davidigandan marked this conversation as resolved.
Outdated
rw = RecipeWrapper(message=message, transport=transport_layer)
if log_extender and rw.environment and rw.environment.get("ID"):
with log_extender("recipe_ID", rw.environment["ID"]):

if hasattr(rw, "environment") and rw.environment.get("ID"):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original check here is more correct, you want to check:

  • log_extender is truthy, otherwise we cannot append the otel stuff to it
  • rw.environment is not empty (RecipeWrapper guarantees that this field is present, so hasattr(rw, "environment") is always true)
  • rw.environment.get("ID") - none of this makes sense if there is no recipe ID, we are not part of a normal workflow.

tl;dr the original if statement was what you still want

Copy link
Copy Markdown
Contributor Author

@davidigandan davidigandan Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i've reduced this to if log_extender and rw.environment.get("ID"):.
i think this may be better. I'll see how the tests go and resolve this conversation if they pass

# Extract recipe ID from environment and add to current span
span = trace.get_current_span()
recipe_id = rw.environment.get("ID")
Comment thread
davidigandan marked this conversation as resolved.
Outdated

if recipe_id:
Comment thread
davidigandan marked this conversation as resolved.
Outdated
span.set_attribute("recipe_id", recipe_id)

# Extract span_id and trace_id for logging
span_context = span.get_span_context()
if span_context and span_context.is_valid:
span_id = span_context.span_id
trace_id = span_context.trace_id

otel_logs = {
"span_id": span_id,
"trace_id": trace_id,
}

if recipe_id:
Comment thread
davidigandan marked this conversation as resolved.
Outdated
otel_logs["recipe_id"] = recipe_id

with ExitStack() as stack:
# Configure the context depending on if service is emitting spans
if (
otel_logs
and log_extender
and rw.environment
and rw.environment.get("ID")
):
stack.enter_context(
log_extender("recipe_ID", rw.environment.get("ID"))
)
stack.enter_context(log_extender("otel_logs", otel_logs))
Comment thread
davidigandan marked this conversation as resolved.
Outdated
elif log_extender and rw.environment and rw.environment.get("ID"):
stack.enter_context(
log_extender("recipe_ID", rw.environment.get("ID"))
)
Comment thread
davidigandan marked this conversation as resolved.

return callback(rw, header, message.get("payload"))

return callback(rw, header, message.get("payload"))

if allow_non_recipe_messages:
return callback(None, header, message)
# self.log.warning('Discarding non-recipe message:\n' + \
Expand Down
41 changes: 41 additions & 0 deletions src/workflows/services/common_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@
import time
from typing import Any

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

import workflows
import workflows.logging
from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware


class Status(enum.Enum):
Expand Down Expand Up @@ -185,6 +192,40 @@ def start_transport(self):
self.transport.subscription_callback_set_intercept(
self._transport_interceptor
)

# Configure OTELTracing if configuration is available
otel_config = (
self.config._opentelemetry
if self.config and hasattr(self.config, "opentelemetry")
Comment thread
davidigandan marked this conversation as resolved.
Outdated
else None
)
if otel_config:
# Configure OTELTracing
resource = Resource.create(
{
SERVICE_NAME: self._service_name,
}
)

self.log.debug("Configuring OTELTracing")
provider = TracerProvider(resource=resource)
trace.set_tracer_provider(provider)

# Configure BatchProcessor and OTLPSpanExporter using config values
otlp_exporter = OTLPSpanExporter(
endpoint=otel_config["endpoint"],
timeout=otel_config.get("timeout", 10),
)
span_processor = BatchSpanProcessor(otlp_exporter)
provider.add_span_processor(span_processor)

# Add OTELTracingMiddleware to the transport layer
tracer = trace.get_tracer(__name__)
otel_middleware = OTELTracingMiddleware(
tracer, service_name=self._service_name
)
self._transport.add_middleware(otel_middleware)

metrics = self._environment.get("metrics")
if metrics:
import prometheus_client
Expand Down
5 changes: 5 additions & 0 deletions src/workflows/transport/middleware/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ def wrapped_callback(header, message):


def wrap(f: Callable):
# debugging
if f.__name__ == "send":
print("we are wrapping send now")

Comment thread
davidigandan marked this conversation as resolved.
Outdated
@functools.wraps(f)
def wrapper(self, *args, **kwargs):
return functools.reduce(
Expand All @@ -243,4 +247,5 @@ def wrapper(self, *args, **kwargs):
lambda *args, **kwargs: f(self, *args, **kwargs),
)(*args, **kwargs)

print(wrapper.__wrapped__)
Comment thread
davidigandan marked this conversation as resolved.
Outdated
return wrapper
Loading
Loading