Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 50 additions & 14 deletions alphatrion/tracing/clickhouse_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
from alphatrion.storage.tracestore import TraceStore
from alphatrion.tracing.span_processor import (
SEMANTIC_KIND_DB,
SEMANTIC_KIND_HTTP,
SEMANTIC_KIND_MESSAGING,
SEMANTIC_KIND_REASONING,
SEMANTIC_KIND_RPC,
SEMANTIC_KIND_UNKNOWN,
)

Expand Down Expand Up @@ -248,30 +251,63 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
def determine_semantic_kind(attributes: dict[str, str]) -> str:
"""Determine the semantic kind of a span.

Priority order:
1. Extended thinking/reasoning (LLM with reasoning tokens)
2. Traceloop decorators (workflow, task, tool, agent)
3. LLM operations (chat, completion, embeddings)
4. Database operations
5. HTTP operations
6. Message queue operations
7. Unknown fallback

Args:
attributes: Span attributes

Returns:
Semantic kind string
Semantic kind string (workflow, task, tool, agent, chat, completion,
embeddings, reasoning, db, http, messaging, rpc, unknown)
"""
if not attributes:
return SEMANTIC_KIND_UNKNOWN

if (
"gen_ai.usage.reasoning_tokens" in attributes
and int(attributes["gen_ai.usage.reasoning_tokens"]) > 0
):
# Priority 1: Extended thinking/reasoning
# Check for LLM operations with reasoning tokens (o1, Claude extended thinking)
if "gen_ai.usage.reasoning_tokens" in attributes:
return SEMANTIC_KIND_REASONING

if "llm.request.type" in attributes:
return attributes["llm.request.type"]

# Check for database operations
# Priority 2: Traceloop decorators (@workflow, @task, @tool)
# These are explicitly decorated by developers and should take precedence
if "traceloop.span.kind" in attributes:
traceloop_kind = attributes["traceloop.span.kind"]
# Valid values: workflow, task, tool, agent
if traceloop_kind in ("workflow", "task", "tool", "agent"):
return traceloop_kind

# Priority 3: LLM operations (auto-instrumented by Traceloop)
# Check for GenAI operations from OpenTelemetry semantic conventions
if "gen_ai.operation.name" in attributes:
operation = attributes["gen_ai.operation.name"]
# Common values: chat, completion, embeddings
return operation

# Priority 4: Database operations
# Auto-instrumented by OpenTelemetry (psycopg2, SQLAlchemy, etc.)
if "db.system" in attributes or "db.statement" in attributes:
return SEMANTIC_KIND_DB

# One of workflow, task, agent, tool
if "traceloop.span.kind" in attributes:
traceloop_kind = attributes["traceloop.span.kind"]
return traceloop_kind
# Priority 5: HTTP operations
# Auto-instrumented by OpenTelemetry (requests, httpx, urllib3, etc.)
if "http.method" in attributes or "http.request.method" in attributes:
return SEMANTIC_KIND_HTTP

# Priority 6: Messaging/Queue operations
# Auto-instrumented by OpenTelemetry (RabbitMQ, Kafka, SQS, etc.)
if "messaging.system" in attributes:
return SEMANTIC_KIND_MESSAGING

# Priority 7: RPC operations
if "rpc.system" in attributes:
return SEMANTIC_KIND_RPC

# Default to unknown
# Default: unknown
return SEMANTIC_KIND_UNKNOWN
17 changes: 16 additions & 1 deletion alphatrion/tracing/span_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,25 @@
logger = logging.getLogger(__name__)

# Semantic kind enums:
# Core application spans (decorated with @workflow, @task, @tool)
SEMANTIC_KIND_WORKFLOW = "workflow"
SEMANTIC_KIND_TASK = "task"
SEMANTIC_KIND_TOOL = "tool"
SEMANTIC_KIND_REASONING = "reasoning"
SEMANTIC_KIND_AGENT = "agent"

# LLM operations
SEMANTIC_KIND_CHAT = "chat"
SEMANTIC_KIND_COMPLETION = "completion"
SEMANTIC_KIND_EMBEDDINGS = "embeddings"
SEMANTIC_KIND_REASONING = "reasoning"

# Infrastructure operations
SEMANTIC_KIND_DB = "db"
SEMANTIC_KIND_HTTP = "http"
SEMANTIC_KIND_MESSAGING = "messaging"
SEMANTIC_KIND_RPC = "rpc"

# Fallback
SEMANTIC_KIND_UNKNOWN = "unknown"


Expand Down
40 changes: 0 additions & 40 deletions tests/integration/test_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,46 +125,6 @@ async def token_workflow():
# Query spans with token data (use tracestore database name)
database = tracestore.database

# Debug: Check if any spans exist for this experiment
debug_query = f"""
SELECT COUNT(*) as count
FROM {database}.otel_spans
WHERE ExperimentId = '{experiment_id}'
"""
total_spans = tracestore.client.query(debug_query).result_rows[0][0]
print(f"DEBUG: Total spans for experiment {experiment_id}: {total_spans}")

# Debug: Check what span names we have
debug_query_names = f"""
SELECT SpanName, COUNT(*) as count
FROM {database}.otel_spans
WHERE ExperimentId = '{experiment_id}'
GROUP BY SpanName
ORDER BY count DESC
"""
span_names = tracestore.client.query(debug_query_names).result_rows
print(f"DEBUG: Span names: {span_names}")

# Debug: Check what attributes exist in spans
debug_query_attrs = f"""
SELECT SpanName, mapKeys(SpanAttributes) as attr_keys
FROM {database}.otel_spans
WHERE ExperimentId = '{experiment_id}'
LIMIT 5
"""
span_attrs = tracestore.client.query(debug_query_attrs).result_rows
print(f"DEBUG: Sample span attributes: {span_attrs}")

# Debug: Check spans with gen_ai.usage attributes
debug_query2 = f"""
SELECT COUNT(*) as count
FROM {database}.otel_spans
WHERE ExperimentId = '{experiment_id}'
AND mapContains(SpanAttributes, 'gen_ai.usage.input_tokens')
"""
llm_spans = tracestore.client.query(debug_query2).result_rows[0][0]
print(f"DEBUG: Spans with gen_ai.usage.input_tokens: {llm_spans}")

query = f"""
SELECT
SpanId as span_id,
Expand Down
Loading