From cff0901015ba29638dae9d43d8536fdd91109be0 Mon Sep 17 00:00:00 2001 From: kmajdoub Date: Mon, 8 Jun 2026 15:47:11 +0200 Subject: [PATCH] feat(doctor): surface prunable-memory backlog in memory_integrity probe (#429) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `memory_integrity` doctor probe reported the memory store's decisions/rejected/episodes/skills breakdown and returned PASS whenever the store opened cleanly — with no visibility into the *prunable* backlog: the active, non-load-bearing episodic items that accumulate one-per-issue (epic #426) until they drown the load-bearing decisions in boot context. A loop that has run 300 issues showed `episodes=600` and a green light, giving a maestro resuming after context loss no signal that compaction was overdue. This enriches the existing probe (no new probe / CHECK_NAMES / CLI command): - `PRUNABLE_MEMORY_WARN_THRESHOLD` and `COMPACT_REMEDIATION` are declared once alongside the existing named `*_REMEDIATION` / verdict constants (no bare literal at the call site). - The detail line additionally reports `prunable=` (active items where `is_load_bearing_memory(item)` is False), computed over the already-fetched `active` list (no extra DB round-trip). Existing fields are unchanged. - Prunable strictly > threshold -> WARN naming the count + compaction, with `COMPACT_REMEDIATION`; <= threshold -> unchanged PASS / remediation None. - The absent (WARN), corrupt (FAIL) and unreadable (FAIL) branches are untouched; the prunable logic runs only on the clean-open path. Read-only. Dependency on sibling #427: `is_load_bearing_memory` is NOT yet merged. Per the ticket's "do not re-implement the classification" constraint (also manifesto Q7), the probe consumes the predicate via injection and resolves it dynamically in production (`_resolve_load_bearing_predicate`). Until #427 lands the symbol is absent, so the probe degrades — reporting `prunable=unmeasured` and staying PASS — exactly as `mutation_survivors_check` degrades without its #379 checker (declared-degrade per Q11). When #427 merges the WARN gate lights up with no further change here. Tests inject a fake predicate to exercise the full PASS/WARN/boundary matrix now, and monkeypatch the module symbol to drive the real CLI path end-to-end. Tests (tests/test_doctor_control_plane.py): below-threshold PASS, above- threshold WARN, strict `>` boundary (at-threshold PASS, +1 WARN), existing- fields-preserved, unavailable-predicate degrade, corrupt-store FAIL not masked, plus `doctor --json` WARN integration and human-table rendering of the prunable count + remediation. Co-Authored-By: Claude Opus 4.7 --- src/forge_loop/control/doctor.py | 71 +++++++++- tests/test_doctor_control_plane.py | 201 +++++++++++++++++++++++++++++ 2 files changed, 269 insertions(+), 3 deletions(-) diff --git a/src/forge_loop/control/doctor.py b/src/forge_loop/control/doctor.py index f335372..8e47db4 100644 --- a/src/forge_loop/control/doctor.py +++ b/src/forge_loop/control/doctor.py @@ -56,6 +56,22 @@ "forge-loop mutation-check # plant faults on the high-risk module and " "strengthen the tests that let survivors through" ) +# Advisory remediation for an over-threshold prunable-memory backlog (epic #426): +# the non-load-bearing episodic items that accumulate one-per-issue and drown the +# load-bearing decisions in boot context. The compaction mechanism itself is #428 +# (``compact_episodic``); this probe only *reports* the backlog, so the string is +# advisory text rather than a copy-pasteable command that does not exist yet. +COMPACT_REMEDIATION = ( + "compact prunable episodic memory # drop the non-load-bearing episodes so " + "the load-bearing decisions survive boot context (see #428 compact_episodic)" +) + +# When the prunable-memory backlog (active items where ``is_load_bearing_memory`` +# is ``False``) strictly exceeds this count, ``memory_integrity`` flips to WARN so +# compaction can be gated on a visible signal. Named once (no bare literal at the +# call site) per the same convention as PASS/FAIL/WARN and the ``*_REMEDIATION`` +# constants. +PRUNABLE_MEMORY_WARN_THRESHOLD = 50 # The single high-risk control-plane module the scoped mutation-check targets by # default (epic #378): the event-log hash-chain integrity module. Kept as a @@ -99,7 +115,9 @@ def collect_control_plane_doctor( return { "projection_lag": _projection_lag_check(status), "stale_leases": _stale_leases_check(status), - "memory_integrity": _memory_integrity_check(status, memory_path), + "memory_integrity": _memory_integrity_check( + status, memory_path, is_load_bearing=_resolve_load_bearing_predicate() + ), "replay_determinism": _replay_determinism_check(status, event_log_path), } @@ -159,7 +177,33 @@ def _stale_leases_check(status: dict[str, Any]) -> dict[str, Any]: ) -def _memory_integrity_check(status: dict[str, Any], memory_path: Path) -> dict[str, Any]: +def _resolve_load_bearing_predicate() -> Callable[[Any], bool] | None: + """Resolve the ``is_load_bearing_memory`` predicate added by sibling #427. + + Issue #429 *consumes* the pure predicate from #427 (same epic #426) and must + NOT re-implement the classification (out of scope; manifesto Q7). Until #427 + merges the symbol is absent, so this looks it up dynamically and returns + ``None`` when unavailable. The probe then degrades — it reports the prunable + backlog as unmeasured and stays ``PASS`` rather than crashing ``doctor`` — + exactly as ``mutation_survivors_check`` degrades to ``warn`` without its #379 + checker (declared-degrade per Q11). When #427 lands the symbol resolves and + the WARN gate lights up with no further change here. + """ + + import forge_loop.memory as memory_module + + predicate = getattr(memory_module, "is_load_bearing_memory", None) + if not callable(predicate): + return None + return cast("Callable[[Any], bool]", predicate) + + +def _memory_integrity_check( + status: dict[str, Any], + memory_path: Path, + *, + is_load_bearing: Callable[[Any], bool] | None = None, +) -> dict[str, Any]: memory = status["memory"] if not memory["available"]: # Distinguish "absent" (fresh repo → warn, not applicable) from @@ -195,10 +239,31 @@ def _memory_integrity_check(status: dict[str, Any], memory_path: Path) -> dict[s f"inspect or restore {memory_path}", ) - detail = ( + breakdown = ( f"decisions/active={len(active)}, rejected_paths={len(rejected)}, " f"episodes={len(episodic)}, skills/procedural={len(procedural)}" ) + + # The prunable backlog: active items the #427 predicate classifies as NOT + # load-bearing (the episodic items that accumulate one-per-issue until they + # drown the load-bearing decisions in boot context — epic #426). Computed + # over the already-fetched ``active`` list, so no extra DB round-trip. + if is_load_bearing is None: + # #427 predicate not yet available — surface the field as unmeasured and + # stay PASS (declared degrade); we never raise a WARN we cannot back. + return _check(PASS, f"{breakdown}, prunable=unmeasured", None) + + prunable = sum(1 for item in active if not is_load_bearing(item)) + detail = f"{breakdown}, prunable={prunable}" + if prunable > PRUNABLE_MEMORY_WARN_THRESHOLD: + return _check( + WARN, + ( + f"{detail}; prunable backlog {prunable} exceeds threshold " + f"{PRUNABLE_MEMORY_WARN_THRESHOLD} — compaction overdue" + ), + COMPACT_REMEDIATION, + ) return _check(PASS, detail, None) diff --git a/tests/test_doctor_control_plane.py b/tests/test_doctor_control_plane.py index de5eb58..8886a72 100644 --- a/tests/test_doctor_control_plane.py +++ b/tests/test_doctor_control_plane.py @@ -24,10 +24,13 @@ from forge_loop import cli from forge_loop._testing.mutation_checker import FakeMutationChecker from forge_loop.control.doctor import ( + COMPACT_REMEDIATION, DEFAULT_MUTATION_MODULE, FAIL, PASS, + PRUNABLE_MEMORY_WARN_THRESHOLD, WARN, + _memory_integrity_check, _replay_determinism_check, collect_control_plane_doctor, mutation_survivors_check, @@ -137,6 +140,65 @@ def _seed_memory(forge_dir: Path) -> None: ) +def _episodic_is_prunable(item: MemoryItem) -> bool: + """Stand-in for #427's ``is_load_bearing_memory``: episodes are NOT load-bearing. + + #427 is not merged when this test was authored (issue #429 consumes that + predicate, it does not define it), so the suite injects this fake to exercise + the classification branch. The fake encodes the epic-#426 intent: episodic + items are the prunable backlog; decisions/rejected-paths/skills are load-bearing. + """ + return item.kind is not MemoryKind.EPISODIC + + +def _seed_memory_backlog(forge_dir: Path, *, prunable: int, load_bearing: int = 2) -> Path: + """Seed a store with load-bearing decisions + ``prunable`` episodic items. + + Returns the ``memory.db`` path. A rejected-path + ``load_bearing`` semantic + decisions are load-bearing under :func:`_episodic_is_prunable`; the + ``prunable`` episodes are the backlog the probe must surface. + """ + forge_dir.mkdir(parents=True, exist_ok=True) + path = forge_dir / "memory.db" + store = SqliteMemoryStore(path) + # The store commits (and fsyncs) once per ``put``; on slow CI disks seeding + # an above-threshold backlog row-by-row costs ~1s/row. These are throwaway + # test dbs, so drop the durability fsync to keep seeding fast. + store._connection.execute("PRAGMA synchronous=OFF") + prov = MemoryProvenance(source_event=None, authored_by="test", source_task_ref="task:#429") + store.put( + MemoryItem( + memory_id="rej", + kind=MemoryKind.SEMANTIC, + title="rejected path", + body="b", + tags=(REJECTED_PATH_TAG,), + provenance=prov, + ) + ) + for i in range(load_bearing): + store.put( + MemoryItem( + memory_id=f"dec-{i}", + kind=MemoryKind.SEMANTIC, + title=f"decision {i}", + body="b", + provenance=prov, + ) + ) + for i in range(prunable): + store.put( + MemoryItem( + memory_id=f"ep-{i}", + kind=MemoryKind.EPISODIC, + title=f"episode {i}", + body="b", + provenance=prov, + ) + ) + return path + + def _seed_sessions( state_dir: Path, *, @@ -274,6 +336,91 @@ def test_reports_counts_on_populated_store(self, tmp_path: Path) -> None: assert "rejected_paths=1" in mem["detail"] assert "episodes=1" in mem["detail"] + def test_prunable_backlog_reported_below_threshold_passes(self, tmp_path: Path) -> None: + # Sub-threshold backlog: the new prunable field appears in the detail but + # the probe stays PASS with no remediation. + memory_path = _seed_memory_backlog(tmp_path / ".forge", prunable=3) + status = {"memory": {"available": True}} + result = _memory_integrity_check( + status, memory_path, is_load_bearing=_episodic_is_prunable + ) + assert result["status"] == PASS + assert result["remediation"] is None + assert "prunable=3" in result["detail"] + + def test_prunable_backlog_above_threshold_warns(self, tmp_path: Path) -> None: + # Above-threshold backlog → WARN naming the count + compaction, remediation + # is the compaction constant. + count = PRUNABLE_MEMORY_WARN_THRESHOLD + 5 + memory_path = _seed_memory_backlog(tmp_path / ".forge", prunable=count) + status = {"memory": {"available": True}} + result = _memory_integrity_check( + status, memory_path, is_load_bearing=_episodic_is_prunable + ) + assert result["status"] == WARN + assert f"prunable={count}" in result["detail"] + assert "compact" in result["detail"].lower() + assert result["remediation"] == COMPACT_REMEDIATION + assert result["remediation"] is not None + + def test_prunable_threshold_boundary_uses_strict_greater_than(self, tmp_path: Path) -> None: + # Exactly-at-threshold stays PASS (proves ``>`` not ``>=``); threshold+1 WARNs. + status = {"memory": {"available": True}} + + at = _seed_memory_backlog( + tmp_path / "at" / ".forge", prunable=PRUNABLE_MEMORY_WARN_THRESHOLD + ) + at_result = _memory_integrity_check(status, at, is_load_bearing=_episodic_is_prunable) + assert at_result["status"] == PASS + assert at_result["remediation"] is None + + over = _seed_memory_backlog( + tmp_path / "over" / ".forge", prunable=PRUNABLE_MEMORY_WARN_THRESHOLD + 1 + ) + over_result = _memory_integrity_check(status, over, is_load_bearing=_episodic_is_prunable) + assert over_result["status"] == WARN + + def test_warn_detail_preserves_existing_breakdown_fields(self, tmp_path: Path) -> None: + # The new field is additive: the WARN detail still carries every existing + # field unrenamed and unreordered. + memory_path = _seed_memory_backlog( + tmp_path / ".forge", prunable=PRUNABLE_MEMORY_WARN_THRESHOLD + 2 + ) + status = {"memory": {"available": True}} + detail = _memory_integrity_check( + status, memory_path, is_load_bearing=_episodic_is_prunable + )["detail"] + assert "decisions/active=" in detail + assert "rejected_paths=" in detail + assert "episodes=" in detail + assert "skills/procedural=" in detail + + def test_unavailable_predicate_degrades_to_pass(self, tmp_path: Path) -> None: + # #427 predicate not wired (the real path until it merges): report the + # backlog as unmeasured and stay PASS — never a WARN we cannot back. + memory_path = _seed_memory_backlog( + tmp_path / ".forge", prunable=PRUNABLE_MEMORY_WARN_THRESHOLD + 10 + ) + status = {"memory": {"available": True}} + result = _memory_integrity_check(status, memory_path, is_load_bearing=None) + assert result["status"] == PASS + assert result["remediation"] is None + assert "prunable=unmeasured" in result["detail"] + + def test_prunable_logic_does_not_mask_corrupt_store_fail(self, tmp_path: Path) -> None: + # Even with a predicate wired, a present-but-unopenable store must FAIL — + # the new prunable logic runs only on the clean-open path. + forge_dir = tmp_path / ".forge" + forge_dir.mkdir(parents=True, exist_ok=True) + memory_path = forge_dir / "memory.db" + memory_path.write_bytes(b"this is not a sqlite database at all") + status = {"memory": {"available": False, "error": "file is not a database"}} + result = _memory_integrity_check( + status, memory_path, is_load_bearing=_episodic_is_prunable + ) + assert result["status"] == FAIL + assert result["remediation"] is not None + def test_fails_cleanly_on_corrupt_db(self, tmp_path: Path) -> None: # Seed the event log + sessions but make memory.db a present-but-corrupt # (non-SQLite) file. Crucially we do NOT seed a real memory store first @@ -508,6 +655,32 @@ def test_hard_reset_simulation_exits_one_with_failing_checks( assert blob["ok"] is False assert rc == 1 + def test_json_memory_integrity_warns_on_above_threshold_prunable_backlog( + self, monkeypatch: Any, tmp_path: Path, capsys: Any + ) -> None: + # End-to-end over the real CLI path: with the #427 predicate resolvable + # (injected here as it is not yet merged), an above-threshold prunable + # backlog drives ``memory_integrity`` to WARN with a non-null remediation. + # A WARN does not flip the control-plane exit, so rc stays 0. + live_lease = datetime.now(UTC) + timedelta(hours=1) + repo = _seeded_repo(tmp_path, cursor_sequence=2, lease_expires_at=live_lease) + _seed_memory_backlog(repo / ".forge", prunable=PRUNABLE_MEMORY_WARN_THRESHOLD + 1) + cfg = _cfg(repo) + + import forge_loop.memory as memory_module + + monkeypatch.setattr( + memory_module, "is_load_bearing_memory", _episodic_is_prunable, raising=False + ) + + rc, blob = _run_doctor_json(monkeypatch, cfg, capsys) + + mem = blob["control_plane"]["memory_integrity"] + assert mem["status"] == WARN + assert mem["remediation"] is not None + assert "prunable=" in mem["detail"] + assert rc == 0 + def test_missing_forge_dir_runs_other_checks_and_warns_control_plane( self, monkeypatch: Any, tmp_path: Path, capsys: Any ) -> None: @@ -572,6 +745,34 @@ def test_human_table_has_control_plane_section_with_remediation( assert "forge-loop recover" in out assert rc == 1 + def test_human_table_renders_prunable_count_and_compaction_remediation( + self, monkeypatch: Any, tmp_path: Path, capsys: Any + ) -> None: + # The enriched ``memory_integrity`` detail (prunable count) and its + # compaction remediation must render in the human table when the backlog + # is over threshold and the #427 predicate is resolvable. + live_lease = datetime.now(UTC) + timedelta(hours=1) + repo = _seeded_repo(tmp_path, cursor_sequence=2, lease_expires_at=live_lease) + _seed_memory_backlog(repo / ".forge", prunable=PRUNABLE_MEMORY_WARN_THRESHOLD + 1) + cfg = _cfg(repo) + + import forge_loop.memory as memory_module + + monkeypatch.setattr( + memory_module, "is_load_bearing_memory", _episodic_is_prunable, raising=False + ) + monkeypatch.setattr(cli, "load", lambda: cfg) + monkeypatch.setenv("COLUMNS", "240") + + rc = cli._cmd_doctor(SimpleNamespace(json=False)) + out = capsys.readouterr().out + + assert "prunable=" in out + assert "compact prunable episodic memory" in out + # memory_integrity WARN does not flip the exit; the seeded repo is + # otherwise healthy → rc 0. + assert rc == 0 + @pytest.mark.parametrize("want_json", [True, False]) def test_doctor_never_crashes_when_config_load_fails(