Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ Format: [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) — versioning

---

## [Unreleased]

### Added

- `bytes_avg` field: average response body size (bytes) per route per bucket, sourced from the `Content-Length` response header — stored in SQLite and exposed via `/api/routes`
- 4 unit tests covering `bytes_avg` aggregation and storage

---

## [1.0.3] — 2026-05-15

### Added
Expand Down
46 changes: 26 additions & 20 deletions apiforgepy/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,20 @@ def record(self, event: dict):
with self._lock:
if key not in self._buffer:
self._buffer[key] = {
"method": event["method"],
"route": event["route"],
"env": event["env"],
"release": event.get("release"),
"durations": [],
"status_2xx": 0,
"status_4xx": 0,
"status_5xx": 0,
"method": event["method"],
"route": event["route"],
"env": event["env"],
"release": event.get("release"),
"durations": [],
"response_sizes": [],
"status_2xx": 0,
"status_4xx": 0,
"status_5xx": 0,
}
bucket = self._buffer[key]
bucket["durations"].append(event["duration_ms"])
if event.get("response_size") is not None:
bucket["response_sizes"].append(event["response_size"])
s = event["status"]
if 200 <= s < 300:
bucket["status_2xx"] += 1
Expand Down Expand Up @@ -73,21 +76,24 @@ def _flush(self):
for bucket in snapshot.values():
sorted_d = sorted(bucket["durations"])
n = len(sorted_d)
sizes = bucket["response_sizes"]
bytes_avg = sum(sizes) / len(sizes) if sizes else None
rows.append({
"bucket_ts": bucket_ts,
"route": bucket["route"],
"method": bucket["method"],
"env": bucket["env"],
"bucket_ts": bucket_ts,
"route": bucket["route"],
"method": bucket["method"],
"env": bucket["env"],
"release_tag": bucket["release"],
"status_2xx": bucket["status_2xx"],
"status_4xx": bucket["status_4xx"],
"status_5xx": bucket["status_5xx"],
"status_2xx": bucket["status_2xx"],
"status_4xx": bucket["status_4xx"],
"status_5xx": bucket["status_5xx"],
"total_calls": n,
"lat_p50": _percentile(sorted_d, 0.50),
"lat_p90": _percentile(sorted_d, 0.90),
"lat_p99": _percentile(sorted_d, 0.99),
"lat_min": sorted_d[0] if sorted_d else 0,
"lat_max": sorted_d[-1] if sorted_d else 0,
"lat_p50": _percentile(sorted_d, 0.50),
"lat_p90": _percentile(sorted_d, 0.90),
"lat_p99": _percentile(sorted_d, 0.99),
"lat_min": sorted_d[0] if sorted_d else 0,
"lat_max": sorted_d[-1] if sorted_d else 0,
"bytes_avg": bytes_avg,
})

self._transport.write(rows)
15 changes: 11 additions & 4 deletions apiforgepy/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,20 @@ def _init(self):
lat_p90 REAL,
lat_p99 REAL,
lat_min REAL,
lat_max REAL
lat_max REAL,
bytes_avg REAL
);

CREATE INDEX IF NOT EXISTS idx_route_ts ON api_metrics (route, method, bucket_ts);
CREATE INDEX IF NOT EXISTS idx_bucket_ts ON api_metrics (bucket_ts);
CREATE INDEX IF NOT EXISTS idx_release ON api_metrics (release_tag)
WHERE release_tag IS NOT NULL;
""")
# Migration for databases created before bytes_avg was introduced
try:
c.execute("ALTER TABLE api_metrics ADD COLUMN bytes_avg REAL")
except Exception:
pass # column already exists
c.commit()

def insert_batch(self, rows: list[dict]):
Expand All @@ -60,11 +66,11 @@ def insert_batch(self, rows: list[dict]):
INSERT INTO api_metrics
(bucket_ts, route, method, env, release_tag,
status_2xx, status_4xx, status_5xx, total_calls,
lat_p50, lat_p90, lat_p99, lat_min, lat_max)
lat_p50, lat_p90, lat_p99, lat_min, lat_max, bytes_avg)
VALUES (
:bucket_ts, :route, :method, :env, :release_tag,
:status_2xx, :status_4xx, :status_5xx, :total_calls,
:lat_p50, :lat_p90, :lat_p99, :lat_min, :lat_max
:lat_p50, :lat_p90, :lat_p99, :lat_min, :lat_max, :bytes_avg
)
""", rows)
self._conn.commit()
Expand Down Expand Up @@ -138,7 +144,8 @@ def get_routes(self, hours: int = 24) -> list[dict]:
AVG(lat_p50) as p50,
AVG(lat_p90) as p90,
AVG(lat_p99) as p99,
MAX(lat_max) as lat_max
MAX(lat_max) as lat_max,
AVG(bytes_avg) as bytes_avg
FROM api_metrics
WHERE bucket_ts >= ?
GROUP BY route, method
Expand Down
15 changes: 15 additions & 0 deletions tests/test_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,18 @@ def test_stop_flushes_buffer(self):
agg.record(base_event())
agg.stop()
assert len(t.calls) == 1, "stop() must flush remaining events"

def test_bytes_avg_is_mean_of_non_null_sizes(self):
t = make_transport()
agg = Aggregator(t, flush_interval_ms=999_999_000)
agg.record(base_event(response_size=100))
agg.record(base_event(response_size=300))
agg._flush()
assert t.calls[0][0]["bytes_avg"] == pytest.approx(200.0)

def test_bytes_avg_is_none_when_no_sizes(self):
t = make_transport()
agg = Aggregator(t, flush_interval_ms=999_999_000)
agg.record(base_event(response_size=None))
agg._flush()
assert t.calls[0][0]["bytes_avg"] is None
17 changes: 17 additions & 0 deletions tests/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def insert_row(db, **overrides):
lat_p99=99.0,
lat_min=10.0,
lat_max=150.0,
bytes_avg=None,
)
db.insert_batch([{**defaults, **overrides}])

Expand Down Expand Up @@ -174,3 +175,19 @@ def test_returns_bucketed_data(self):
rows = db.get_global_time_series(24)
assert len(rows) >= 1
db.close()


class TestBytesAvg:
def test_stored_and_returned_in_get_routes(self):
db = make_db()
insert_row(db, route="/size", bytes_avg=512.0)
routes = db.get_routes(24)
assert routes[0]["bytes_avg"] == pytest.approx(512.0)
db.close()

def test_null_bytes_avg_when_not_provided(self):
db = make_db()
insert_row(db, route="/nosize", bytes_avg=None)
routes = db.get_routes(24)
assert routes[0]["bytes_avg"] is None
db.close()
Loading