Skip to content

Commit ca30df6

Browse files
pmcfadinclaude
andcommitted
fix: address code review comments for user activity module (closes #13)
- Remove unnecessary camelCase aliases from UserActivity (DB cols are snake_case); keep validation_alias on UserActivityResponse for accepting snake_case input - Add ACTIVITY_TYPES = Literal["view","comment","rate"] and apply it to record_user_activity parameter and UserActivity.activity_type field - Add __all__ export list to user_activity.py consistent with comment.py - Serialize activity_timestamp as isoformat string before DB insert - Replace serial 30-day loop with asyncio.gather() for concurrent partition queries - Add MAX_ACTIVITY_ROWS = 1000 hard cap to prevent unbounded memory usage - Add per-partition try/except in _fetch_day_rows so one bad partition does not abort the entire read - Remove dead/overwritten mock setup code from test_list_user_activity_returns_paginated_results - Add test_list_user_activity_fetches_table (read-path get_table coverage) - Add test_list_user_activity_error_in_partition_is_skipped (error isolation) - Add test_record_user_activity_timestamp_is_isoformat_string Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent fd7e75c commit ca30df6

3 files changed

Lines changed: 564 additions & 0 deletions

File tree

app/models/user_activity.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
"""Pydantic models for user activity tracking."""
2+
3+
from __future__ import annotations
4+
5+
from datetime import datetime, timezone
6+
from typing import Literal, Optional
7+
from uuid import UUID
8+
9+
from pydantic import BaseModel, Field, ConfigDict
10+
11+
from app.models.common import UserID
12+
13+
ACTIVITY_TYPES = Literal["view", "comment", "rate"]
14+
15+
16+
class UserActivity(BaseModel):
17+
"""Internal representation of a user activity row.
18+
19+
Field names match DB column names (snake_case) exactly — no aliases needed.
20+
"""
21+
22+
model_config = ConfigDict(populate_by_name=True, from_attributes=True)
23+
24+
userid: UserID
25+
day: str
26+
activity_type: ACTIVITY_TYPES
27+
activity_id: UUID
28+
activity_timestamp: datetime
29+
30+
31+
class UserActivityResponse(BaseModel):
32+
"""API response representation for a single user activity item."""
33+
34+
model_config = ConfigDict(populate_by_name=True, from_attributes=True)
35+
36+
userId: UserID = Field(..., validation_alias="userid")
37+
activityType: str = Field(..., validation_alias="activity_type")
38+
activityId: UUID = Field(..., validation_alias="activity_id")
39+
activityTimestamp: datetime = Field(..., validation_alias="activity_timestamp")
40+
41+
42+
__all__ = [
43+
"ACTIVITY_TYPES",
44+
"UserActivity",
45+
"UserActivityResponse",
46+
]
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
"""Service layer for tracking user activity (views, comments, ratings)."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
import logging
7+
from datetime import datetime, timezone, timedelta
8+
from typing import Optional, List, Tuple
9+
from uuid import UUID, uuid1
10+
11+
from app.db.astra_client import get_table, AstraDBCollection
12+
from app.models.user_activity import UserActivity, ACTIVITY_TYPES
13+
14+
USER_ACTIVITY_TABLE_NAME = "user_activity"
15+
16+
ANONYMOUS_USER_ID = UUID("00000000-0000-0000-0000-000000000000")
17+
18+
# Hard cap on total rows scanned across all 30 partitions to prevent OOM.
19+
MAX_ACTIVITY_ROWS = 1000
20+
21+
logger = logging.getLogger(__name__)
22+
23+
24+
async def record_user_activity(
25+
userid: UUID,
26+
activity_type: ACTIVITY_TYPES,
27+
activity_id: Optional[UUID] = None,
28+
db_table: Optional[AstraDBCollection] = None,
29+
) -> None:
30+
"""Insert a single user activity row.
31+
32+
Parameters
33+
----------
34+
userid:
35+
The user who performed the action (use ANONYMOUS_USER_ID for unauthenticated).
36+
activity_type:
37+
One of 'view', 'comment', 'rate'.
38+
activity_id:
39+
Optional time-based UUID linking back to the activity. Auto-generated if not provided.
40+
db_table:
41+
Optional pre-fetched table reference (for testing).
42+
"""
43+
44+
if db_table is None:
45+
db_table = await get_table(USER_ACTIVITY_TABLE_NAME)
46+
47+
if activity_id is None:
48+
activity_id = uuid1()
49+
50+
now_utc = datetime.now(timezone.utc)
51+
day_partition = now_utc.strftime("%Y-%m-%d")
52+
53+
await db_table.insert_one(
54+
{
55+
"userid": str(userid),
56+
"day": day_partition,
57+
"activity_type": activity_type,
58+
"activity_id": str(activity_id),
59+
"activity_timestamp": now_utc.isoformat(),
60+
}
61+
)
62+
63+
64+
async def _fetch_day_rows(
65+
db_table: AstraDBCollection,
66+
userid: UUID,
67+
day_key: str,
68+
activity_type: Optional[str],
69+
limit: int,
70+
) -> List[dict]:
71+
"""Fetch activity rows for a single day partition.
72+
73+
Returns an empty list on error so one bad partition does not abort the
74+
entire read.
75+
"""
76+
try:
77+
query_filter: dict = {"userid": str(userid), "day": day_key}
78+
if activity_type:
79+
query_filter["activity_type"] = activity_type
80+
81+
cursor = db_table.find(filter=query_filter, limit=limit)
82+
83+
if hasattr(cursor, "to_list"):
84+
return await cursor.to_list()
85+
return cursor # type: ignore[return-value]
86+
except Exception:
87+
logger.warning(
88+
"Failed to fetch user activity for day=%s userid=%s; skipping partition.",
89+
day_key,
90+
userid,
91+
exc_info=True,
92+
)
93+
return []
94+
95+
96+
async def list_user_activity(
97+
userid: UUID,
98+
page: int,
99+
page_size: int,
100+
activity_type: Optional[str] = None,
101+
db_table: Optional[AstraDBCollection] = None,
102+
) -> Tuple[List[UserActivity], int]:
103+
"""Query user activity across the last 30 days of partitions.
104+
105+
Queries all 30 day-partitions concurrently via asyncio.gather and applies a
106+
hard cap of MAX_ACTIVITY_ROWS total rows to prevent unbounded memory usage.
107+
108+
Returns
109+
-------
110+
Tuple[List[UserActivity], int]
111+
A page of activity items and the total count.
112+
"""
113+
114+
if db_table is None:
115+
db_table = await get_table(USER_ACTIVITY_TABLE_NAME)
116+
117+
today = datetime.now(timezone.utc).date()
118+
start_date = today - timedelta(days=29)
119+
120+
partition_keys: List[str] = [
121+
(start_date + timedelta(days=delta)).strftime("%Y-%m-%d")
122+
for delta in range(30)
123+
]
124+
125+
# Run all 30 partition queries concurrently; pass per-day limit to avoid
126+
# pulling more rows than we could ever display.
127+
per_day_limit = MAX_ACTIVITY_ROWS
128+
129+
results: List[List[dict]] = await asyncio.gather(
130+
*[
131+
_fetch_day_rows(db_table, userid, day_key, activity_type, per_day_limit)
132+
for day_key in partition_keys
133+
]
134+
)
135+
136+
all_rows: List[dict] = []
137+
for day_rows in results:
138+
all_rows.extend(day_rows)
139+
if len(all_rows) >= MAX_ACTIVITY_ROWS:
140+
all_rows = all_rows[:MAX_ACTIVITY_ROWS]
141+
break
142+
143+
# Sort by activity_timestamp descending (newest first)
144+
all_rows.sort(
145+
key=lambda r: r.get("activity_timestamp", ""),
146+
reverse=True,
147+
)
148+
149+
total = len(all_rows)
150+
151+
# Paginate
152+
skip = (page - 1) * page_size
153+
page_rows = all_rows[skip : skip + page_size]
154+
155+
activities = [UserActivity.model_validate(r) for r in page_rows]
156+
return activities, total

0 commit comments

Comments
 (0)