Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 68 additions & 3 deletions src/forge_loop/control/doctor.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,22 @@
"forge-loop mutation-check # plant faults on the high-risk module and "
"strengthen the tests that let survivors through"
)
# Advisory remediation for an over-threshold prunable-memory backlog (epic #426):
# the non-load-bearing episodic items that accumulate one-per-issue and drown the
# load-bearing decisions in boot context. The compaction mechanism itself is #428
# (``compact_episodic``); this probe only *reports* the backlog, so the string is
# advisory text rather than a copy-pasteable command that does not exist yet.
COMPACT_REMEDIATION = (
"compact prunable episodic memory # drop the non-load-bearing episodes so "
"the load-bearing decisions survive boot context (see #428 compact_episodic)"
)

# When the prunable-memory backlog (active items where ``is_load_bearing_memory``
# is ``False``) strictly exceeds this count, ``memory_integrity`` flips to WARN so
# compaction can be gated on a visible signal. Named once (no bare literal at the
# call site) per the same convention as PASS/FAIL/WARN and the ``*_REMEDIATION``
# constants.
PRUNABLE_MEMORY_WARN_THRESHOLD = 50

# The single high-risk control-plane module the scoped mutation-check targets by
# default (epic #378): the event-log hash-chain integrity module. Kept as a
Expand Down Expand Up @@ -99,7 +115,9 @@ def collect_control_plane_doctor(
return {
"projection_lag": _projection_lag_check(status),
"stale_leases": _stale_leases_check(status),
"memory_integrity": _memory_integrity_check(status, memory_path),
"memory_integrity": _memory_integrity_check(
status, memory_path, is_load_bearing=_resolve_load_bearing_predicate()
),
"replay_determinism": _replay_determinism_check(status, event_log_path),
}

Expand Down Expand Up @@ -159,7 +177,33 @@ def _stale_leases_check(status: dict[str, Any]) -> dict[str, Any]:
)


def _memory_integrity_check(status: dict[str, Any], memory_path: Path) -> dict[str, Any]:
def _resolve_load_bearing_predicate() -> Callable[[Any], bool] | None:
"""Resolve the ``is_load_bearing_memory`` predicate added by sibling #427.

Issue #429 *consumes* the pure predicate from #427 (same epic #426) and must
NOT re-implement the classification (out of scope; manifesto Q7). Until #427
merges the symbol is absent, so this looks it up dynamically and returns
``None`` when unavailable. The probe then degrades — it reports the prunable
backlog as unmeasured and stays ``PASS`` rather than crashing ``doctor`` —
exactly as ``mutation_survivors_check`` degrades to ``warn`` without its #379
checker (declared-degrade per Q11). When #427 lands the symbol resolves and
the WARN gate lights up with no further change here.
"""

import forge_loop.memory as memory_module

predicate = getattr(memory_module, "is_load_bearing_memory", None)
if not callable(predicate):
return None
return cast("Callable[[Any], bool]", predicate)


def _memory_integrity_check(
status: dict[str, Any],
memory_path: Path,
*,
is_load_bearing: Callable[[Any], bool] | None = None,
) -> dict[str, Any]:
memory = status["memory"]
if not memory["available"]:
# Distinguish "absent" (fresh repo → warn, not applicable) from
Expand Down Expand Up @@ -195,10 +239,31 @@ def _memory_integrity_check(status: dict[str, Any], memory_path: Path) -> dict[s
f"inspect or restore {memory_path}",
)

detail = (
breakdown = (
f"decisions/active={len(active)}, rejected_paths={len(rejected)}, "
f"episodes={len(episodic)}, skills/procedural={len(procedural)}"
)

# The prunable backlog: active items the #427 predicate classifies as NOT
# load-bearing (the episodic items that accumulate one-per-issue until they
# drown the load-bearing decisions in boot context — epic #426). Computed
# over the already-fetched ``active`` list, so no extra DB round-trip.
if is_load_bearing is None:
# #427 predicate not yet available — surface the field as unmeasured and
# stay PASS (declared degrade); we never raise a WARN we cannot back.
return _check(PASS, f"{breakdown}, prunable=unmeasured", None)

prunable = sum(1 for item in active if not is_load_bearing(item))
detail = f"{breakdown}, prunable={prunable}"
if prunable > PRUNABLE_MEMORY_WARN_THRESHOLD:
return _check(
WARN,
(
f"{detail}; prunable backlog {prunable} exceeds threshold "
f"{PRUNABLE_MEMORY_WARN_THRESHOLD} — compaction overdue"
),
COMPACT_REMEDIATION,
)
return _check(PASS, detail, None)


Expand Down
201 changes: 201 additions & 0 deletions tests/test_doctor_control_plane.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
from forge_loop import cli
from forge_loop._testing.mutation_checker import FakeMutationChecker
from forge_loop.control.doctor import (
COMPACT_REMEDIATION,
DEFAULT_MUTATION_MODULE,
FAIL,
PASS,
PRUNABLE_MEMORY_WARN_THRESHOLD,
WARN,
_memory_integrity_check,
_replay_determinism_check,
collect_control_plane_doctor,
mutation_survivors_check,
Expand Down Expand Up @@ -137,6 +140,65 @@ def _seed_memory(forge_dir: Path) -> None:
)


def _episodic_is_prunable(item: MemoryItem) -> bool:
"""Stand-in for #427's ``is_load_bearing_memory``: episodes are NOT load-bearing.

#427 is not merged when this test was authored (issue #429 consumes that
predicate, it does not define it), so the suite injects this fake to exercise
the classification branch. The fake encodes the epic-#426 intent: episodic
items are the prunable backlog; decisions/rejected-paths/skills are load-bearing.
"""
return item.kind is not MemoryKind.EPISODIC


def _seed_memory_backlog(forge_dir: Path, *, prunable: int, load_bearing: int = 2) -> Path:
"""Seed a store with load-bearing decisions + ``prunable`` episodic items.

Returns the ``memory.db`` path. A rejected-path + ``load_bearing`` semantic
decisions are load-bearing under :func:`_episodic_is_prunable`; the
``prunable`` episodes are the backlog the probe must surface.
"""
forge_dir.mkdir(parents=True, exist_ok=True)
path = forge_dir / "memory.db"
store = SqliteMemoryStore(path)
# The store commits (and fsyncs) once per ``put``; on slow CI disks seeding
# an above-threshold backlog row-by-row costs ~1s/row. These are throwaway
# test dbs, so drop the durability fsync to keep seeding fast.
store._connection.execute("PRAGMA synchronous=OFF")
prov = MemoryProvenance(source_event=None, authored_by="test", source_task_ref="task:#429")
store.put(
MemoryItem(
memory_id="rej",
kind=MemoryKind.SEMANTIC,
title="rejected path",
body="b",
tags=(REJECTED_PATH_TAG,),
provenance=prov,
)
)
for i in range(load_bearing):
store.put(
MemoryItem(
memory_id=f"dec-{i}",
kind=MemoryKind.SEMANTIC,
title=f"decision {i}",
body="b",
provenance=prov,
)
)
for i in range(prunable):
store.put(
MemoryItem(
memory_id=f"ep-{i}",
kind=MemoryKind.EPISODIC,
title=f"episode {i}",
body="b",
provenance=prov,
)
)
return path


def _seed_sessions(
state_dir: Path,
*,
Expand Down Expand Up @@ -274,6 +336,91 @@ def test_reports_counts_on_populated_store(self, tmp_path: Path) -> None:
assert "rejected_paths=1" in mem["detail"]
assert "episodes=1" in mem["detail"]

def test_prunable_backlog_reported_below_threshold_passes(self, tmp_path: Path) -> None:
# Sub-threshold backlog: the new prunable field appears in the detail but
# the probe stays PASS with no remediation.
memory_path = _seed_memory_backlog(tmp_path / ".forge", prunable=3)
status = {"memory": {"available": True}}
result = _memory_integrity_check(
status, memory_path, is_load_bearing=_episodic_is_prunable
)
assert result["status"] == PASS
assert result["remediation"] is None
assert "prunable=3" in result["detail"]

def test_prunable_backlog_above_threshold_warns(self, tmp_path: Path) -> None:
# Above-threshold backlog → WARN naming the count + compaction, remediation
# is the compaction constant.
count = PRUNABLE_MEMORY_WARN_THRESHOLD + 5
memory_path = _seed_memory_backlog(tmp_path / ".forge", prunable=count)
status = {"memory": {"available": True}}
result = _memory_integrity_check(
status, memory_path, is_load_bearing=_episodic_is_prunable
)
assert result["status"] == WARN
assert f"prunable={count}" in result["detail"]
assert "compact" in result["detail"].lower()
assert result["remediation"] == COMPACT_REMEDIATION
assert result["remediation"] is not None

def test_prunable_threshold_boundary_uses_strict_greater_than(self, tmp_path: Path) -> None:
# Exactly-at-threshold stays PASS (proves ``>`` not ``>=``); threshold+1 WARNs.
status = {"memory": {"available": True}}

at = _seed_memory_backlog(
tmp_path / "at" / ".forge", prunable=PRUNABLE_MEMORY_WARN_THRESHOLD
)
at_result = _memory_integrity_check(status, at, is_load_bearing=_episodic_is_prunable)
assert at_result["status"] == PASS
assert at_result["remediation"] is None

over = _seed_memory_backlog(
tmp_path / "over" / ".forge", prunable=PRUNABLE_MEMORY_WARN_THRESHOLD + 1
)
over_result = _memory_integrity_check(status, over, is_load_bearing=_episodic_is_prunable)
assert over_result["status"] == WARN

def test_warn_detail_preserves_existing_breakdown_fields(self, tmp_path: Path) -> None:
# The new field is additive: the WARN detail still carries every existing
# field unrenamed and unreordered.
memory_path = _seed_memory_backlog(
tmp_path / ".forge", prunable=PRUNABLE_MEMORY_WARN_THRESHOLD + 2
)
status = {"memory": {"available": True}}
detail = _memory_integrity_check(
status, memory_path, is_load_bearing=_episodic_is_prunable
)["detail"]
assert "decisions/active=" in detail
assert "rejected_paths=" in detail
assert "episodes=" in detail
assert "skills/procedural=" in detail

def test_unavailable_predicate_degrades_to_pass(self, tmp_path: Path) -> None:
# #427 predicate not wired (the real path until it merges): report the
# backlog as unmeasured and stay PASS — never a WARN we cannot back.
memory_path = _seed_memory_backlog(
tmp_path / ".forge", prunable=PRUNABLE_MEMORY_WARN_THRESHOLD + 10
)
status = {"memory": {"available": True}}
result = _memory_integrity_check(status, memory_path, is_load_bearing=None)
assert result["status"] == PASS
assert result["remediation"] is None
assert "prunable=unmeasured" in result["detail"]

def test_prunable_logic_does_not_mask_corrupt_store_fail(self, tmp_path: Path) -> None:
# Even with a predicate wired, a present-but-unopenable store must FAIL —
# the new prunable logic runs only on the clean-open path.
forge_dir = tmp_path / ".forge"
forge_dir.mkdir(parents=True, exist_ok=True)
memory_path = forge_dir / "memory.db"
memory_path.write_bytes(b"this is not a sqlite database at all")
status = {"memory": {"available": False, "error": "file is not a database"}}
result = _memory_integrity_check(
status, memory_path, is_load_bearing=_episodic_is_prunable
)
assert result["status"] == FAIL
assert result["remediation"] is not None

def test_fails_cleanly_on_corrupt_db(self, tmp_path: Path) -> None:
# Seed the event log + sessions but make memory.db a present-but-corrupt
# (non-SQLite) file. Crucially we do NOT seed a real memory store first
Expand Down Expand Up @@ -508,6 +655,32 @@ def test_hard_reset_simulation_exits_one_with_failing_checks(
assert blob["ok"] is False
assert rc == 1

def test_json_memory_integrity_warns_on_above_threshold_prunable_backlog(
self, monkeypatch: Any, tmp_path: Path, capsys: Any
) -> None:
# End-to-end over the real CLI path: with the #427 predicate resolvable
# (injected here as it is not yet merged), an above-threshold prunable
# backlog drives ``memory_integrity`` to WARN with a non-null remediation.
# A WARN does not flip the control-plane exit, so rc stays 0.
live_lease = datetime.now(UTC) + timedelta(hours=1)
repo = _seeded_repo(tmp_path, cursor_sequence=2, lease_expires_at=live_lease)
_seed_memory_backlog(repo / ".forge", prunable=PRUNABLE_MEMORY_WARN_THRESHOLD + 1)
cfg = _cfg(repo)

import forge_loop.memory as memory_module

monkeypatch.setattr(
memory_module, "is_load_bearing_memory", _episodic_is_prunable, raising=False
)

rc, blob = _run_doctor_json(monkeypatch, cfg, capsys)

mem = blob["control_plane"]["memory_integrity"]
assert mem["status"] == WARN
assert mem["remediation"] is not None
assert "prunable=" in mem["detail"]
assert rc == 0

def test_missing_forge_dir_runs_other_checks_and_warns_control_plane(
self, monkeypatch: Any, tmp_path: Path, capsys: Any
) -> None:
Expand Down Expand Up @@ -572,6 +745,34 @@ def test_human_table_has_control_plane_section_with_remediation(
assert "forge-loop recover" in out
assert rc == 1

def test_human_table_renders_prunable_count_and_compaction_remediation(
self, monkeypatch: Any, tmp_path: Path, capsys: Any
) -> None:
# The enriched ``memory_integrity`` detail (prunable count) and its
# compaction remediation must render in the human table when the backlog
# is over threshold and the #427 predicate is resolvable.
live_lease = datetime.now(UTC) + timedelta(hours=1)
repo = _seeded_repo(tmp_path, cursor_sequence=2, lease_expires_at=live_lease)
_seed_memory_backlog(repo / ".forge", prunable=PRUNABLE_MEMORY_WARN_THRESHOLD + 1)
cfg = _cfg(repo)

import forge_loop.memory as memory_module

monkeypatch.setattr(
memory_module, "is_load_bearing_memory", _episodic_is_prunable, raising=False
)
monkeypatch.setattr(cli, "load", lambda: cfg)
monkeypatch.setenv("COLUMNS", "240")

rc = cli._cmd_doctor(SimpleNamespace(json=False))
out = capsys.readouterr().out

assert "prunable=" in out
assert "compact prunable episodic memory" in out
# memory_integrity WARN does not flip the exit; the seeded repo is
# otherwise healthy → rc 0.
assert rc == 0


@pytest.mark.parametrize("want_json", [True, False])
def test_doctor_never_crashes_when_config_load_fails(
Expand Down
Loading