Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: CI

on:
push:
branches: [dev]
branches: [main, dev]
pull_request:
branches: [main, dev]

Expand Down
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,20 @@ Format: [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) — versioning

---

## [1.0.3] — 2026-05-15

### Added

- `DRIFT` insight type: detects progressive latency degradation using ordinary least squares over the last 30 days — emitted when slope ≥ 5ms/day over 7+ data points, with a 30-day projection
- `DRIFT` filter chip added to the dashboard Insights view
- 60 unit tests covering `aggregator`, `database`, `insights` and `middleware`

### Fixed

- CI badge was pointing to `main` with no workflow run — CI now triggers on both `main` and `dev`

---

## [1.0.0] — 2026-05-15

### Fixed
Expand Down
5 changes: 3 additions & 2 deletions apiforgepy/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1250,8 +1250,9 @@

const types = [
{id:'ALL',label:'All'},{id:'PERF',label:'Performance'},
{id:'ANOMALY',label:'Anomaly'},{id:'DEAD',label:'Dead'},
{id:'UNTRACKED',label:'Untracked'},{id:'OK',label:'OK'},
{id:'DRIFT',label:'Drift'},{id:'ANOMALY',label:'Anomaly'},
{id:'DEAD',label:'Dead'},{id:'UNTRACKED',label:'Untracked'},
{id:'OK',label:'OK'},
];
const filtered = INSIGHTS.filter(i =>
(typeFilter === 'ALL' || i.type === typeFilter) &&
Expand Down
15 changes: 15 additions & 0 deletions apiforgepy/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,21 @@ def get_releases(self) -> list[dict]:
""").fetchall()
return [dict(r) for r in rows]

def get_drift_data(self) -> list[dict]:
"""Returns one row per (route, method, day) over the last 30 days for drift detection."""
since_30d = _now_sec() - 30 * 86_400
rows = self._conn.execute("""
SELECT
route, method,
CAST(bucket_ts / 86400 AS INTEGER) as day_bucket,
AVG(lat_p90) as p90
FROM api_metrics
WHERE bucket_ts >= ? AND lat_p90 IS NOT NULL
GROUP BY route, method, day_bucket
ORDER BY route, method, day_bucket
""", (since_30d,)).fetchall()
return [dict(r) for r in rows]

def get_global_time_series(self, hours: int = 24) -> list[dict]:
since = _now_sec() - hours * 3600
rows = self._conn.execute("""
Expand Down
70 changes: 67 additions & 3 deletions apiforgepy/insights.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import math
import time

DEAD_ENDPOINT_DAYS = 21
REGRESSION_THRESHOLD = 0.20
ANOMALY_Z_THRESHOLD = 2.5
DEAD_ENDPOINT_DAYS = 21
REGRESSION_THRESHOLD = 0.20
ANOMALY_Z_THRESHOLD = 2.5
DRIFT_SLOPE_THRESHOLD = 5.0 # ms/day above which progressive drift is reported
DRIFT_MIN_DAYS = 7 # minimum number of daily data points required


def get_insights(db) -> list[dict]:
Expand All @@ -13,6 +15,7 @@ def get_insights(db) -> list[dict]:
_detect_dead_endpoints,
_detect_release_regressions,
_detect_untracked_routes,
_detect_drift,
):
try:
insights.extend(fn(db))
Expand Down Expand Up @@ -160,6 +163,67 @@ def _detect_release_regressions(db) -> list[dict]:
return insights


def _detect_drift(db) -> list[dict]:
rows = db.get_drift_data()
if not rows:
return []

# Group daily P90 samples by endpoint
by_endpoint: dict[str, dict] = {}
for row in rows:
key = f"{row['method']}|{row['route']}"
if key not in by_endpoint:
by_endpoint[key] = {"method": row["method"], "route": row["route"], "points": []}
by_endpoint[key]["points"].append({"x": row["day_bucket"], "y": row["p90"]})

insights = []
for ep in by_endpoint.values():
points = ep["points"]
if len(points) < DRIFT_MIN_DAYS:
continue

# Ordinary least squares on (day_index, p90) pairs
x0 = points[0]["x"]
xs = [p["x"] - x0 for p in points]
ys = [p["y"] for p in points]
n = len(xs)
sum_x = sum(xs)
sum_y = sum(ys)
sum_xy = sum(xs[i] * ys[i] for i in range(n))
sum_x2 = sum(x * x for x in xs)
denom = n * sum_x2 - sum_x ** 2
if denom == 0:
continue

slope = (n * sum_xy - sum_x * sum_y) / denom
if slope < DRIFT_SLOPE_THRESHOLD:
continue

observed_days = xs[-1]
projection_30 = round(slope * 30)
method, route = ep["method"], ep["route"]
day_str = "day" if observed_days == 1 else "days"

insights.append({
"type": "DRIFT",
"severity": "warning",
"route": route,
"method": method,
"message": (
f"`{method} {route}` has been progressively degrading for "
f"{observed_days} {day_str}: +{slope:.1f}ms/day. "
f"30-day projection: +{projection_30}ms."
),
"data": {
"slope_ms_per_day": slope,
"observed_days": observed_days,
"projection_30d_ms": projection_30,
},
})

return insights


def _detect_untracked_routes(db) -> list[dict]:
return [
{
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "apiforgepy"
version = "1.0.2"
version = "1.0.3"

description = "API observability & intelligence for FastAPI/Starlette — local-first, privacy-first"
readme = "README.md"
Expand Down
121 changes: 121 additions & 0 deletions tests/test_aggregator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import pytest
from apiforgepy.aggregator import Aggregator


def make_transport():
class Spy:
def __init__(self):
self.calls = []
def write(self, rows):
self.calls.append(rows)
return Spy()


def base_event(**overrides):
defaults = dict(
route="/test", method="GET", status=200,
duration_ms=100.0, timestamp="2026-01-01T00:00:00Z",
env="test", release=None, service="svc", response_size=None,
)
return {**defaults, **overrides}


class TestRecord:
def test_accumulates_durations(self):
t = make_transport()
agg = Aggregator(t, flush_interval_ms=999_999_000)
agg.record(base_event(duration_ms=10))
agg.record(base_event(duration_ms=20))
key = next(iter(agg._buffer))
assert len(agg._buffer[key]["durations"]) == 2

def test_increments_2xx_counter(self):
t = make_transport()
agg = Aggregator(t, flush_interval_ms=999_999_000)
agg.record(base_event(status=200))
agg.record(base_event(status=204))
key = next(iter(agg._buffer))
assert agg._buffer[key]["status_2xx"] == 2

def test_increments_4xx_counter(self):
t = make_transport()
agg = Aggregator(t, flush_interval_ms=999_999_000)
agg.record(base_event(status=404))
agg.record(base_event(status=429))
key = next(iter(agg._buffer))
assert agg._buffer[key]["status_4xx"] == 2
assert agg._buffer[key]["status_2xx"] == 0

def test_increments_5xx_counter(self):
t = make_transport()
agg = Aggregator(t, flush_interval_ms=999_999_000)
agg.record(base_event(status=500))
key = next(iter(agg._buffer))
assert agg._buffer[key]["status_5xx"] == 1

def test_separate_buckets_per_route(self):
t = make_transport()
agg = Aggregator(t, flush_interval_ms=999_999_000)
agg.record(base_event(route="/a"))
agg.record(base_event(route="/b"))
assert len(agg._buffer) == 2

def test_release_separates_bucket_key(self):
t = make_transport()
agg = Aggregator(t, flush_interval_ms=999_999_000)
agg.record(base_event(release="v1"))
agg.record(base_event(release="v2"))
assert len(agg._buffer) == 2


class TestFlush:
def test_sends_rows_and_clears_buffer(self):
t = make_transport()
agg = Aggregator(t, flush_interval_ms=999_999_000)
agg.record(base_event())
agg._flush()
assert len(t.calls) == 1
assert len(t.calls[0]) == 1
assert len(agg._buffer) == 0

def test_noop_when_buffer_empty(self):
t = make_transport()
agg = Aggregator(t, flush_interval_ms=999_999_000)
agg._flush()
assert len(t.calls) == 0

def test_computes_percentiles(self):
t = make_transport()
agg = Aggregator(t, flush_interval_ms=999_999_000)
for i in range(1, 11):
agg.record(base_event(duration_ms=float(i * 10)))
agg._flush()
row = t.calls[0][0]
assert 50 <= row["lat_p50"] <= 60
assert 90 <= row["lat_p90"] <= 100
assert row["lat_p99"] >= 90

def test_correct_lat_min_max(self):
t = make_transport()
agg = Aggregator(t, flush_interval_ms=999_999_000)
agg.record(base_event(duration_ms=5.0))
agg.record(base_event(duration_ms=95.0))
agg._flush()
row = t.calls[0][0]
assert row["lat_min"] == pytest.approx(5.0)
assert row["lat_max"] == pytest.approx(95.0)

def test_total_calls_matches_records(self):
t = make_transport()
agg = Aggregator(t, flush_interval_ms=999_999_000)
for _ in range(7):
agg.record(base_event())
agg._flush()
assert t.calls[0][0]["total_calls"] == 7

def test_stop_flushes_buffer(self):
t = make_transport()
agg = Aggregator(t, flush_interval_ms=999_999_000)
agg.record(base_event())
agg.stop()
assert len(t.calls) == 1, "stop() must flush remaining events"
Loading
Loading