Reviewed revision of the semantic-kind classification patch.
 * Keeps the `> 0` guard on gen_ai.usage.reasoning_tokens: a span that only
   carries the key with a count of 0 must not be classified as "reasoning".
 * Lists RPC in the determine_semantic_kind docstring priority order so the
   docstring matches the checks the function actually performs.
NOTE(review): indentation and blank lines were reconstructed from a flattened
copy of this patch, and the post-image blob id on the clickhouse_exporter.py
`index` line predates the hand edit. Confirm both against the working tree
before applying.

diff --git a/alphatrion/tracing/clickhouse_exporter.py b/alphatrion/tracing/clickhouse_exporter.py
index 9a84362..557eb9c 100644
--- a/alphatrion/tracing/clickhouse_exporter.py
+++ b/alphatrion/tracing/clickhouse_exporter.py
@@ -10,7 +10,10 @@
 from alphatrion.storage.tracestore import TraceStore
 from alphatrion.tracing.span_processor import (
     SEMANTIC_KIND_DB,
+    SEMANTIC_KIND_HTTP,
+    SEMANTIC_KIND_MESSAGING,
     SEMANTIC_KIND_REASONING,
+    SEMANTIC_KIND_RPC,
     SEMANTIC_KIND_UNKNOWN,
 )
 
@@ -248,30 +251,68 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
 def determine_semantic_kind(attributes: dict[str, str]) -> str:
     """Determine the semantic kind of a span.
 
+    Priority order:
+    1. Extended thinking/reasoning (LLM with reasoning tokens)
+    2. Traceloop decorators (workflow, task, tool, agent)
+    3. LLM operations (chat, completion, embeddings)
+    4. Database operations
+    5. HTTP operations
+    6. Message queue operations
+    7. RPC operations
+    8. Unknown fallback
+
     Args:
         attributes: Span attributes
 
     Returns:
-        Semantic kind string
+        Semantic kind string (workflow, task, tool, agent, chat, completion,
+        embeddings, reasoning, db, http, messaging, rpc, unknown)
     """
+    if not attributes:
+        return SEMANTIC_KIND_UNKNOWN
 
+    # Priority 1: Extended thinking/reasoning
+    # Check for LLM operations with reasoning tokens (o1, Claude extended thinking)
+    # Presence alone is not enough: a count of 0 means no reasoning occurred
     if (
         "gen_ai.usage.reasoning_tokens" in attributes
         and int(attributes["gen_ai.usage.reasoning_tokens"]) > 0
     ):
         return SEMANTIC_KIND_REASONING
 
-    if "llm.request.type" in attributes:
-        return attributes["llm.request.type"]
-
-    # Check for database operations
+    # Priority 2: Traceloop decorators (@workflow, @task, @tool)
+    # These are explicitly decorated by developers and should take precedence
+    if "traceloop.span.kind" in attributes:
+        traceloop_kind = attributes["traceloop.span.kind"]
+        # Valid values: workflow, task, tool, agent
+        if traceloop_kind in ("workflow", "task", "tool", "agent"):
+            return traceloop_kind
+
+    # Priority 3: LLM operations (auto-instrumented by Traceloop)
+    # Check for GenAI operations from OpenTelemetry semantic conventions
+    if "gen_ai.operation.name" in attributes:
+        operation = attributes["gen_ai.operation.name"]
+        # Common values: chat, completion, embeddings
+        return operation
+
+    # Priority 4: Database operations
+    # Auto-instrumented by OpenTelemetry (psycopg2, SQLAlchemy, etc.)
     if "db.system" in attributes or "db.statement" in attributes:
         return SEMANTIC_KIND_DB
 
-    # One of workflow, task, agent, tool
-    if "traceloop.span.kind" in attributes:
-        traceloop_kind = attributes["traceloop.span.kind"]
-        return traceloop_kind
+    # Priority 5: HTTP operations
+    # Auto-instrumented by OpenTelemetry (requests, httpx, urllib3, etc.)
+    if "http.method" in attributes or "http.request.method" in attributes:
+        return SEMANTIC_KIND_HTTP
+
+    # Priority 6: Messaging/Queue operations
+    # Auto-instrumented by OpenTelemetry (RabbitMQ, Kafka, SQS, etc.)
+    if "messaging.system" in attributes:
+        return SEMANTIC_KIND_MESSAGING
+
+    # Priority 7: RPC operations
+    if "rpc.system" in attributes:
+        return SEMANTIC_KIND_RPC
 
-    # Default to unknown
+    # Default: unknown
     return SEMANTIC_KIND_UNKNOWN
diff --git a/alphatrion/tracing/span_processor.py b/alphatrion/tracing/span_processor.py
index 48e5966..bdaa888 100644
--- a/alphatrion/tracing/span_processor.py
+++ b/alphatrion/tracing/span_processor.py
@@ -11,10 +11,25 @@
 logger = logging.getLogger(__name__)
 
 # Semantic kind enums:
+# Core application spans (decorated with @workflow, @task, @tool)
+SEMANTIC_KIND_WORKFLOW = "workflow"
+SEMANTIC_KIND_TASK = "task"
 SEMANTIC_KIND_TOOL = "tool"
-SEMANTIC_KIND_REASONING = "reasoning"
+SEMANTIC_KIND_AGENT = "agent"
+
+# LLM operations
 SEMANTIC_KIND_CHAT = "chat"
+SEMANTIC_KIND_COMPLETION = "completion"
+SEMANTIC_KIND_EMBEDDINGS = "embeddings"
+SEMANTIC_KIND_REASONING = "reasoning"
+
+# Infrastructure operations
 SEMANTIC_KIND_DB = "db"
+SEMANTIC_KIND_HTTP = "http"
+SEMANTIC_KIND_MESSAGING = "messaging"
+SEMANTIC_KIND_RPC = "rpc"
+
+# Fallback
 SEMANTIC_KIND_UNKNOWN = "unknown"
 
 
diff --git a/tests/integration/test_tracking.py b/tests/integration/test_tracking.py
index 56b8697..538d962 100644
--- a/tests/integration/test_tracking.py
+++ b/tests/integration/test_tracking.py
@@ -125,46 +125,6 @@ async def token_workflow():
     # Query spans with token data (use tracestore database name)
     database = tracestore.database
 
-    # Debug: Check if any spans exist for this experiment
-    debug_query = f"""
-        SELECT COUNT(*) as count
-        FROM {database}.otel_spans
-        WHERE ExperimentId = '{experiment_id}'
-    """
-    total_spans = tracestore.client.query(debug_query).result_rows[0][0]
-    print(f"DEBUG: Total spans for experiment {experiment_id}: {total_spans}")
-
-    # Debug: Check what span names we have
-    debug_query_names = f"""
-        SELECT SpanName, COUNT(*) as count
-        FROM {database}.otel_spans
-        WHERE ExperimentId = '{experiment_id}'
-        GROUP BY SpanName
-        ORDER BY count DESC
-    """
-    span_names = tracestore.client.query(debug_query_names).result_rows
-    print(f"DEBUG: Span names: {span_names}")
-
-    # Debug: Check what attributes exist in spans
-    debug_query_attrs = f"""
-        SELECT SpanName, mapKeys(SpanAttributes) as attr_keys
-        FROM {database}.otel_spans
-        WHERE ExperimentId = '{experiment_id}'
-        LIMIT 5
-    """
-    span_attrs = tracestore.client.query(debug_query_attrs).result_rows
-    print(f"DEBUG: Sample span attributes: {span_attrs}")
-
-    # Debug: Check spans with gen_ai.usage attributes
-    debug_query2 = f"""
-        SELECT COUNT(*) as count
-        FROM {database}.otel_spans
-        WHERE ExperimentId = '{experiment_id}'
-        AND mapContains(SpanAttributes, 'gen_ai.usage.input_tokens')
-    """
-    llm_spans = tracestore.client.query(debug_query2).result_rows[0][0]
-    print(f"DEBUG: Spans with gen_ai.usage.input_tokens: {llm_spans}")
-
     query = f"""
         SELECT
             SpanId as span_id,