Skip to content

Commit 1aa958e

Browse files
committed
feat(metrics): deep instrumentation and custom Prometheus histograms (refs #7)
1 parent 4619040 commit 1aa958e

7 files changed

Lines changed: 382 additions & 113 deletions

File tree

app/external_services/youtube_metadata.py

Lines changed: 87 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -63,62 +63,102 @@ async def _fetch_v3_api(
6363
) -> YouTubeMetadata:
6464
"""Fetch metadata using the official YouTube Data API v3."""
6565

66-
url = (
67-
"https://www.googleapis.com/youtube/v3/videos?part=snippet"
68-
f"&id={youtube_id}&key={api_key}"
69-
)
70-
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
71-
resp = await client.get(url)
72-
if resp.status_code != 200:
73-
raise MetadataFetchError(
74-
f"Data API returned HTTP {resp.status_code}: {resp.text[:200]}"
75-
)
76-
data = resp.json()
77-
items = data.get("items") or []
78-
if not items:
79-
raise MetadataFetchError(
80-
"Video not found or no snippet returned from Data API"
81-
)
66+
# ------------------------------------------------------------------
67+
# Observability – manual span & histogram timing
68+
# ------------------------------------------------------------------
69+
70+
from opentelemetry import trace
71+
import time
72+
from app.metrics import YOUTUBE_FETCH_DURATION_SECONDS
73+
74+
tracer = trace.get_tracer(__name__)
75+
76+
start_time = time.perf_counter()
8277

83-
snippet = items[0].get("snippet") or {}
84-
return YouTubeMetadata(
85-
title=snippet.get("title", ""),
86-
description=snippet.get("description"),
87-
thumbnail_url=snippet.get("thumbnails"), # handled by validator
88-
tags=snippet.get("tags", []),
78+
with tracer.start_as_current_span("youtube.fetch_v3_api") as span:
79+
span.set_attribute("youtube.video_id", youtube_id)
80+
81+
url = (
82+
"https://www.googleapis.com/youtube/v3/videos?part=snippet"
83+
f"&id={youtube_id}&key={api_key}"
8984
)
85+
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
86+
resp = await client.get(url)
87+
if resp.status_code != 200:
88+
raise MetadataFetchError(
89+
f"Data API returned HTTP {resp.status_code}: {resp.text[:200]}"
90+
)
91+
data = resp.json()
92+
items = data.get("items") or []
93+
if not items:
94+
raise MetadataFetchError(
95+
"Video not found or no snippet returned from Data API"
96+
)
97+
98+
snippet = items[0].get("snippet") or {}
99+
result = YouTubeMetadata(
100+
title=snippet.get("title", ""),
101+
description=snippet.get("description"),
102+
thumbnail_url=snippet.get("thumbnails"), # handled by validator
103+
tags=snippet.get("tags", []),
104+
)
105+
106+
# Record duration & size metrics
107+
duration = time.perf_counter() - start_time
108+
YOUTUBE_FETCH_DURATION_SECONDS.labels(method="v3_api").observe(duration)
109+
span.set_attribute("duration_ms", int(duration * 1000))
110+
span.set_attribute("title_length", len(result.title))
111+
112+
return result
90113

91114

92115
async def _fetch_oembed(youtube_id: str, timeout: float) -> YouTubeMetadata:
    """Fetch metadata using YouTube's public oEmbed endpoint.

    Args:
        youtube_id: The YouTube video id, interpolated into a ``youtu.be`` URL.
        timeout: Per-request timeout in seconds passed to ``httpx.AsyncClient``.

    Returns:
        A ``YouTubeMetadata`` built from the oEmbed payload. oEmbed carries no
        description or tags, so those fields are ``None`` / ``[]``.

    Raises:
        MetadataFetchError: On a non-200 response, or when the JSON payload
            lacks a ``title`` field.
    """
    # NOTE(review): these imports are function-local in the original commit;
    # they could move to module scope — kept here to avoid touching the
    # module header from this change.
    import time

    from opentelemetry import trace

    from app.metrics import YOUTUBE_FETCH_DURATION_SECONDS

    tracer = trace.get_tracer(__name__)
    start_time = time.perf_counter()

    with tracer.start_as_current_span("youtube.fetch_oembed") as span:
        span.set_attribute("youtube.video_id", youtube_id)

        url = (
            "https://www.youtube.com/oembed?format=json&url="
            f"https://youtu.be/{youtube_id}"
        )
        # Fix: dropped two leftover `print(f"DEBUG _fetch_oembed: ...")`
        # statements that wrote the request URL and the *entire* response
        # body to stdout on every call.
        async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
            resp = await client.get(url)
            if resp.status_code != 200:
                # NOTE(review): error paths skip the histogram observation
                # below, so the metric records successful-fetch latency only —
                # confirm that is the intended semantics.
                raise MetadataFetchError(
                    f"oEmbed returned HTTP {resp.status_code}: {resp.text[:200]}"
                )
            data = resp.json()
            title = data.get("title")
            if not title:
                raise MetadataFetchError("oEmbed response missing title field")
            # oEmbed may omit thumbnail_url; fall back to the predictable
            # i.ytimg.com thumbnail URL for the video id.
            thumb = (
                data.get("thumbnail_url")
                or f"https://i.ytimg.com/vi/{youtube_id}/hqdefault.jpg"
            )

            result = YouTubeMetadata(
                title=title,
                description=None,  # oEmbed does not provide description
                thumbnail_url=thumb,
                tags=[],
            )

            # Record success latency on both the Prometheus histogram and the
            # active span before returning.
            duration = time.perf_counter() - start_time
            YOUTUBE_FETCH_DURATION_SECONDS.labels(method="oembed").observe(duration)
            span.set_attribute("duration_ms", int(duration * 1000))

            return result
122162

123163

124164
# ---------------------------------------------------------------------------

app/metrics.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from prometheus_client import Histogram

# ---------------------------------------------------------------------------
# Custom Prometheus metrics – exported via /metrics route exposed by
# prometheus_fastapi_instrumentator in app.utils.observability.configure_observability().
# ---------------------------------------------------------------------------

# Latency of Astra DB Data API queries, labelled by operation name.
# NOTE(review): label values are chosen by callers and are not visible here —
# keep the "operation" cardinality low.
ASTRA_DB_QUERY_DURATION_SECONDS = Histogram(
    "astra_db_query_duration_seconds",
    "Latency of Astra DB Data API queries (seconds)",
    ["operation"],
)

# Latency of YouTube metadata fetches, labelled by fetch strategy
# ("v3_api" / "oembed" in this commit's callers).
YOUTUBE_FETCH_DURATION_SECONDS = Histogram(
    "youtube_fetch_duration_seconds",
    "Latency of YouTube metadata fetches (seconds)",
    ["method"],
)

# Unlabelled histogram for semantic vector search latency.
# All histograms here use prometheus_client's default bucket boundaries.
VECTOR_SEARCH_DURATION_SECONDS = Histogram(
    "vector_search_duration_seconds",
    "Latency of semantic vector search operations (seconds)",
)

# Unlabelled histogram shared by both recommendation entry points
# (related-videos and the "For You" stub).
RECOMMENDATION_DURATION_SECONDS = Histogram(
    "recommendation_generation_duration_seconds",
    "Latency of recommendation engine stub (seconds)",
)

app/services/recommendation_service.py

Lines changed: 66 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -24,36 +24,51 @@ async def get_related_videos(
2424
and assign each a random relevance score.
2525
"""
2626

27-
# Ensure the referenced video exists – if it does not, we treat the request
28-
# as valid but return an empty list. The caller is free to 404 at the API
29-
# layer if it wishes to enforce existence – keeping this generic allows the
30-
# service to be reused from different contexts.
31-
target_video = await video_service.get_video_by_id(video_id)
32-
if target_video is None:
33-
return []
34-
35-
latest_summaries, _total = await video_service.list_latest_videos(
36-
page=1, page_size=limit + 5
37-
)
27+
from opentelemetry import trace
28+
import time
29+
from app.metrics import RECOMMENDATION_DURATION_SECONDS
30+
31+
tracer = trace.get_tracer(__name__)
32+
start_time = time.perf_counter()
33+
34+
with tracer.start_as_current_span("recommend.related_videos") as span:
35+
span.set_attribute("video_id", str(video_id))
36+
37+
# Ensure the referenced video exists – if it does not, we treat the request
38+
# as valid but return an empty list. The caller is free to 404 at the API
39+
# layer if it wishes to enforce existence – keeping this generic allows the
40+
# service to be reused from different contexts.
41+
target_video = await video_service.get_video_by_id(video_id)
42+
if target_video is None:
43+
return []
44+
45+
latest_summaries, _total = await video_service.list_latest_videos(
46+
page=1, page_size=limit + 5
47+
)
3848

39-
related_items: List[RecommendationItem] = []
40-
41-
for summary in latest_summaries:
42-
if summary.videoId == video_id:
43-
# Skip the source video itself
44-
continue
45-
if len(related_items) >= limit:
46-
break
47-
related_items.append(
48-
RecommendationItem(
49-
videoId=summary.videoId,
50-
title=summary.title,
51-
thumbnailUrl=summary.thumbnailUrl,
52-
score=round(random.uniform(0.5, 1.0), 2),
49+
related_items: List[RecommendationItem] = []
50+
51+
for summary in latest_summaries:
52+
if summary.videoId == video_id:
53+
# Skip the source video itself
54+
continue
55+
if len(related_items) >= limit:
56+
break
57+
related_items.append(
58+
RecommendationItem(
59+
videoId=summary.videoId,
60+
title=summary.title,
61+
thumbnailUrl=summary.thumbnailUrl,
62+
score=round(random.uniform(0.5, 1.0), 2),
63+
)
5364
)
54-
)
5565

56-
return related_items
66+
duration = time.perf_counter() - start_time
67+
RECOMMENDATION_DURATION_SECONDS.observe(duration)
68+
span.set_attribute("duration_ms", int(duration * 1000))
69+
span.set_attribute("result_count", len(related_items))
70+
71+
return related_items
5772

5873

5974
async def get_personalized_for_you_videos(
@@ -68,15 +83,31 @@ async def get_personalized_for_you_videos(
6883
recommender can be dropped-in later without further API changes.
6984
"""
7085

71-
# For visibility during development/testing.
72-
print(
73-
f"STUB: Generating 'For You' feed for user {current_user.userId} (page={page}, page_size={page_size})"
74-
)
86+
from opentelemetry import trace
87+
import time
88+
from app.metrics import RECOMMENDATION_DURATION_SECONDS
7589

76-
videos, total_items = await video_service.list_latest_videos(
77-
page=page, page_size=page_size
78-
)
79-
return videos, total_items
90+
tracer = trace.get_tracer(__name__)
91+
start_time = time.perf_counter()
92+
93+
with tracer.start_as_current_span("recommend.for_you") as span:
94+
span.set_attribute("user_id", str(current_user.userId))
95+
96+
# For visibility during development/testing.
97+
print(
98+
f"STUB: Generating 'For You' feed for user {current_user.userId} (page={page}, page_size={page_size})"
99+
)
100+
101+
videos, total_items = await video_service.list_latest_videos(
102+
page=page, page_size=page_size
103+
)
104+
105+
duration = time.perf_counter() - start_time
106+
RECOMMENDATION_DURATION_SECONDS.observe(duration)
107+
span.set_attribute("duration_ms", int(duration * 1000))
108+
span.set_attribute("result_count", total_items)
109+
110+
return videos, total_items
80111

81112

82113
# ---------------------------------------------------------------------------

app/services/vector_search_utils.py

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,18 +54,29 @@ async def semantic_search_with_threshold(
5454
for typical thresholds around 0.7-0.9.
5555
"""
5656

57+
from opentelemetry import trace
58+
import time
59+
from app.metrics import VECTOR_SEARCH_DURATION_SECONDS
60+
61+
tracer = trace.get_tracer(__name__)
62+
5763
if page < 1 or page_size < 1:
5864
return [], 0
5965

6066
# Ask Astra for a generous slice so we can trim client-side.
6167
overfetch = page_size * overfetch_factor * page # grow with page number
6268

63-
cursor = db_table.find(
64-
filter={},
65-
sort={vector_column: query},
66-
limit=overfetch,
67-
include_similarity=True, # ⭐
68-
)
69+
start_time = time.perf_counter()
70+
71+
with tracer.start_as_current_span("vector.search") as span:
72+
span.set_attribute("query", query[:64]) # truncate long queries for span
73+
74+
cursor = db_table.find(
75+
filter={},
76+
sort={vector_column: query},
77+
limit=overfetch,
78+
include_similarity=True, # ⭐
79+
)
6980

7081
# Fetch docs.
7182
docs: List[Dict[str, Any]]
@@ -85,13 +96,22 @@ async def semantic_search_with_threshold(
8596
pre_trim = len(docs)
8697
docs = [d for d in docs if d.get("$similarity", 0) >= similarity_threshold]
8798
logger.debug(
88-
"Trimmed by threshold %.2f: %s → %s docs", similarity_threshold, pre_trim, len(docs)
99+
"Trimmed by threshold %.2f: %s → %s docs",
100+
similarity_threshold,
101+
pre_trim,
102+
len(docs),
89103
)
90104

91-
if docs:
92-
logger.debug(
93-
"Top doc similarity after trim: %.3f", docs[0].get("$similarity", -1.0)
94-
)
105+
if docs:
106+
logger.debug(
107+
"Top doc similarity after trim: %.3f", docs[0].get("$similarity", -1.0)
108+
)
109+
110+
# Record metrics
111+
duration = time.perf_counter() - start_time
112+
VECTOR_SEARCH_DURATION_SECONDS.observe(duration)
113+
span.set_attribute("duration_ms", int(duration * 1000))
114+
span.set_attribute("total_results", len(docs))
95115

96116
total = len(docs)
97117

@@ -102,4 +122,4 @@ async def semantic_search_with_threshold(
102122

103123
summaries = [VideoSummary.model_validate(d) for d in page_docs]
104124

105-
return summaries, total
125+
return summaries, total

0 commit comments

Comments
 (0)