diff --git a/src/forge_loop/control/recovery.py b/src/forge_loop/control/recovery.py index e71d8d2..2b3f451 100644 --- a/src/forge_loop/control/recovery.py +++ b/src/forge_loop/control/recovery.py @@ -22,21 +22,33 @@ ReapWorktree = Callable[[int], None] # Compensation kinds this engine knows how to run during a recovery sweep. This -# keyset is the load-bearing exhaustiveness guard: an exhaustiveness contract -# test asserts it equals ``set(CompensationKind)``, so adding a new enum member -# without registering it here turns that test red (see ``test_recovery``). -# -# A stale saga carrying any kind NOT in this set (e.g. the close-pr / -# delete-branch kinds #272 adds) cannot be honestly driven to COMPENSATED here — -# doing so would assert a side effect was undone when no handler ran. Instead -# such a saga is driven to the terminal QUARANTINED state ("parked for a human, -# could not auto-compensate"): that ends the saga (liveness — it drains from the -# in-flight view and is never swept again) without lying about the side effect -# (integrity). See ``reconcile_stale_sagas``. +# keyset, together with ``_DEFERRED_COMPENSATION_KINDS``, is the load-bearing +# exhaustiveness guard: a contract test asserts their union equals +# ``set(CompensationKind)``, so adding a new enum member without classifying it +# turns that test red (see ``test_recovery``). _HANDLED_COMPENSATION_KINDS: frozenset[CompensationKind] = frozenset( {CompensationKind.REMOVE_WORKTREE} ) +# Kinds dispatch already ENQUEUES but whose recovery handler is a deferred epic +# issue. A stale saga carrying ONLY handled and/or deferred kinds is still +# auto-recovered: the handled compensations run and the saga is driven +# COMPENSATED, while the deferred kinds are knowingly skipped (their side effect +# is left exactly as before this kind existed — never falsely claimed undone). 
+# This is what keeps dead-worker worktrees auto-reaped instead of parked for a +# human while a deferred handler is still in flight. ``DELETE_BRANCH`` (#433, +# epic "Compensate the branch a failed worker abandons") is enqueued at +# dispatch; once its reclamation handler lands (same epic) the kind will move +# from deferred to handled, at which point recovery will also delete the branch. +# +# A kind that is in NEITHER set is a genuine gap: recovery refuses to +# half-compensate such a saga and drives it to the terminal QUARANTINED state +# ("parked for a human, could not auto-compensate") — liveness without lying +# about the side effect. See ``reconcile_stale_sagas``. +_DEFERRED_COMPENSATION_KINDS: frozenset[CompensationKind] = frozenset( + {CompensationKind.DELETE_BRANCH} +) + @dataclass(frozen=True) class RecoveredSaga: @@ -88,25 +100,30 @@ def reconcile_stale_sagas( recovery must make as much progress as it can, not abort on the first snag. - A saga carrying a compensation kind this engine has no handler for is NOT - marked COMPENSATED: driving it COMPENSATED would assert a side effect was - undone when it never ran (a wrong-but-green integrity hole). Instead the - saga is driven to the terminal QUARANTINED state ("parked for a human") so - it stops being immortal — it drains from the in-flight view and is never - re-swept — without claiming the side effect was undone. No handled - compensation is run for such a saga (we refuse to half-compensate it), and - an entry naming its id + the unhandled kind(s) is appended to - ``RecoveryReport.errors`` so an operator knows a saga was parked and why. + A saga carrying a compensation kind this engine neither handles nor has + explicitly deferred is NOT marked COMPENSATED: driving it COMPENSATED would + assert a side effect was undone when it never ran (a wrong-but-green + integrity hole). 
Instead the saga is driven to the terminal QUARANTINED + state ("parked for a human") so it stops being immortal — it drains from the + in-flight view and is never re-swept — without claiming the side effect was + undone. No handled compensation is run for such a saga (we refuse to + half-compensate it), and an entry naming its id + the unhandled kind(s) is + appended to ``RecoveryReport.errors`` so an operator knows a saga was parked. + + A *deferred* kind (enqueued at dispatch but whose handler has not landed + yet, e.g. ``DELETE_BRANCH``) does NOT taint the saga: the handled + compensations still run and the saga is driven COMPENSATED, while the + deferred kind is skipped. That keeps dead-worker worktrees auto-reaped + instead of parked for a human while a deferred handler is in flight. """ moment = now or datetime.now(UTC) recovered: list[RecoveredSaga] = [] errors: list[str] = [] + classified = _HANDLED_COMPENSATION_KINDS | _DEFERRED_COMPENSATION_KINDS for saga in saga_store.list_stale(now=moment): try: - unhandled = sorted( - {c.kind for c in saga.compensations if c.kind not in _HANDLED_COMPENSATION_KINDS} - ) + unhandled = sorted({c.kind for c in saga.compensations if c.kind not in classified}) if unhandled: kinds = ", ".join(unhandled) saga_store.mark_quarantined( @@ -123,7 +140,12 @@ def reconcile_stale_sagas( continue reaped: list[str] = [] for compensation in saga.compensations: - # Every kind here is handled (unhandled kinds short-circuit above). + # Run only the handled compensations; deferred kinds (no handler + # yet) are skipped, never falsely claimed undone. Filtering by + # kind also stops a non-worktree entry from double-firing the + # reaper or recording its target as a reaped worktree. 
+ if compensation.kind != CompensationKind.REMOVE_WORKTREE: + continue if reap_worktree is not None and saga.issue is not None: reap_worktree(saga.issue) reaped.append(compensation.target) diff --git a/src/forge_loop/runner/dispatch.py b/src/forge_loop/runner/dispatch.py index 8f50c7d..0e083f3 100644 --- a/src/forge_loop/runner/dispatch.py +++ b/src/forge_loop/runner/dispatch.py @@ -272,13 +272,7 @@ def record_worker_task_policy( issue=issue, branch=branch, worktree=worktree_path, - compensations=( - Compensation( - kind=CompensationKind.REMOVE_WORKTREE, - target=worktree_path, - reason="cleanup worker worktree after task terminal state", - ), - ), + compensations=_worker_compensations(worktree_path=worktree_path, branch=branch), capability_policy=capability_policy, ) ) @@ -315,17 +309,40 @@ def _seed_worker_saga( issue=n, branch=branch, worktree=worktree_path, - compensations=( - Compensation( - kind=CompensationKind.REMOVE_WORKTREE, - target=worktree_path, - reason="cleanup after worker task terminal state", - ), - ), + compensations=_worker_compensations(worktree_path=worktree_path, branch=branch), capability_policy=capability_policy, ) +def _worker_compensations(*, worktree_path: str, branch: str) -> tuple[Compensation, ...]: + """Build the compensation tuple registered when a worker saga is created. + + Two cleanups are registered at dispatch, in the order they must run on + recovery: drop the worktree first, then delete the branch it planted. + + * ``REMOVE_WORKTREE`` reaps the ``/tmp`` worktree the worker ran in. + * ``DELETE_BRANCH`` (#433, epic "Compensate the branch a failed worker + abandons") reclaims the exact ``loop/`` branch this worker planted — + ``target`` is the canonical branch name derived for the issue, so a + failed worker can never leak a branch the control plane doesn't know to + delete. Both seed paths (the shared-store ``create`` and the + ``task_store is None`` fallback) build the tuple here so they can never + drift apart. 
+ """ + return ( + Compensation( + kind=CompensationKind.REMOVE_WORKTREE, + target=worktree_path, + reason="cleanup worker worktree after task terminal state", + ), + Compensation( + kind=CompensationKind.DELETE_BRANCH, + target=branch, + reason="reclaim worker loop branch after task terminal state", + ), + ) + + def _worker_task_id(issue_number: int) -> str: return f"task-{issue_number}-worker" diff --git a/src/forge_loop/tasks/saga.py b/src/forge_loop/tasks/saga.py index c4dfcc0..2985ad6 100644 --- a/src/forge_loop/tasks/saga.py +++ b/src/forge_loop/tasks/saga.py @@ -34,6 +34,15 @@ class CompensationKind(StrEnum): """ REMOVE_WORKTREE = "remove-worktree" + # #433 (epic "Compensate the branch a failed worker abandons"): the dispatch + # path plants a ``loop/`` branch for every worker and registers this + # compensation at saga-creation time so a failed worker can never leak a + # branch the control plane doesn't know to delete. The recovery *handler* + # for this kind is a sibling epic issue; until it lands, recovery classifies + # DELETE_BRANCH as deferred — it still reaps the worktree and drives the saga + # COMPENSATED, skipping (never falsely claiming) the branch deletion (see + # ``control/recovery.py``). + DELETE_BRANCH = "delete-branch" @dataclass(frozen=True) diff --git a/tests/test_dispatch_branch_compensation.py b/tests/test_dispatch_branch_compensation.py new file mode 100644 index 0000000..f1c2a8a --- /dev/null +++ b/tests/test_dispatch_branch_compensation.py @@ -0,0 +1,150 @@ +"""Dispatch enqueues a DELETE_BRANCH compensation for the branch it plants (#433). + +Epic: "Compensate the branch a failed worker abandons". At dispatch, when a +saga is created for a worker that plants a ``loop/`` branch, the saga must +carry a ``DELETE_BRANCH`` compensation whose target is the *exact* branch name +planted for that worker — alongside the pre-existing ``REMOVE_WORKTREE`` one — +so a failed worker can never leak a branch the control plane doesn't know to +delete. 
+ +Primary falsifiable acceptance (``test_dispatch_one_worker_*``): a saga created +by the dispatch path for issue #n carries a DELETE_BRANCH compensation whose +target equals the exact branch name planted for that worker. +""" + +from __future__ import annotations + +from typing import Any + +from forge_loop.runner import dispatch as dispatch_mod +from forge_loop.sandbox import CapabilityPolicy +from forge_loop.tasks import Compensation, CompensationKind, SqliteTaskSagaStore +from forge_loop.worker import WorkerOutcome + +# Reuse the Config/issue/meta scaffolding from the dispatch FSM tests. +from tests.test_persistent_dispatch import _issue, _make_cfg, _meta + + +def _saga_store(cfg: Any) -> SqliteTaskSagaStore: + return SqliteTaskSagaStore(dispatch_mod.canonical_task_saga_path(cfg.repo)) + + +def _delete_branch(comps: tuple[Compensation, ...]) -> list[Compensation]: + return [c for c in comps if c.kind == CompensationKind.DELETE_BRANCH] + + +def _remove_worktree(comps: tuple[Compensation, ...]) -> list[Compensation]: + return [c for c in comps if c.kind == CompensationKind.REMOVE_WORKTREE] + + +# --------------------------------------------------------------------------- # +# Unit: the shared compensation builder # +# --------------------------------------------------------------------------- # + + +def test_worker_compensations_carry_both_kinds_with_correct_targets() -> None: + comps = dispatch_mod._worker_compensations( + worktree_path="/tmp/wt-loop-7", branch="loop/7-do-the-thing" + ) + + # Exactly one of each kind — no duplicate, no drift. + assert len(_remove_worktree(comps)) == 1 + assert len(_delete_branch(comps)) == 1 + + rm = _remove_worktree(comps)[0] + db = _delete_branch(comps)[0] + assert rm.target == "/tmp/wt-loop-7" + # The DELETE_BRANCH target is the BRANCH, never the worktree path — the + # exact falsifiable bug this feature guards against. 
+ assert db.target == "loop/7-do-the-thing" + assert db.target != rm.target + # Worktree is reaped before the branch it planted is deleted. + assert comps.index(rm) < comps.index(db) + + +def test_worker_compensations_handles_empty_strings() -> None: + """Adversarial: empty branch/worktree must not crash or merge the entries.""" + comps = dispatch_mod._worker_compensations(worktree_path="", branch="") + + assert len(comps) == 2 + assert _delete_branch(comps)[0].target == "" + assert _remove_worktree(comps)[0].target == "" + + +# --------------------------------------------------------------------------- # +# Unit: both seed paths register the compensation # +# --------------------------------------------------------------------------- # + + +def test_seed_worker_saga_enqueues_delete_branch(tmp_path: Any) -> None: + store = SqliteTaskSagaStore(":memory:") + dispatch_mod._seed_worker_saga( + store, + repo=tmp_path / "repo", + issue=_issue(7), + branch="loop/7-do-the-thing", + worktree_path="/tmp/wt-loop-7", + capability_policy=CapabilityPolicy(), + ) + + saga = store.get("task-7-worker") + assert saga is not None + db = _delete_branch(saga.compensations) + assert len(db) == 1 + assert db[0].target == "loop/7-do-the-thing" + assert len(_remove_worktree(saga.compensations)) == 1 + + +def test_record_worker_task_policy_fallback_enqueues_delete_branch(tmp_path: Any) -> None: + """The ``task_store is None`` fallback path must also register DELETE_BRANCH.""" + repo = tmp_path / "repo" + repo.mkdir(parents=True, exist_ok=True) + saga = dispatch_mod.record_worker_task_policy( + repo=repo, + task_id="task-7-worker", + saga_id="saga-7-worker", + issue=7, + branch="loop/7-do-the-thing", + worktree_path="/tmp/wt-loop-7", + capability_policy=CapabilityPolicy(), + ) + + db = _delete_branch(saga.compensations) + assert len(db) == 1 + assert db[0].target == "loop/7-do-the-thing" + + +# --------------------------------------------------------------------------- # +# Integration: the 
real dispatch path (primary acceptance) # +# --------------------------------------------------------------------------- # + + +def test_dispatch_one_worker_saga_carries_delete_branch_for_exact_branch( + monkeypatch: Any, tmp_path: Any +) -> None: + cfg = _make_cfg(tmp_path) + issue = _issue(7) + expected_branch = dispatch_mod._branch_for_issue(issue) + # Falsifiable anchor: the slug really is the planted branch name. + assert expected_branch == "loop/7-do-the-thing" + + def ok(*args: Any, **kwargs: Any) -> WorkerOutcome: + return WorkerOutcome( + issue=7, + title="t", + pr_url="https://x/pull/1", + status="open", + duration_s=1.0, + stdout_tail="", + ) + + monkeypatch.setattr(dispatch_mod, "run_worker", ok) + dispatch_mod._dispatch_one_worker( + cfg, issue, _meta(), tick=1, bus_emit=lambda *a, **k: None, store=None + ) + + saga = _saga_store(cfg).get("task-7-worker") + assert saga is not None + db = _delete_branch(saga.compensations) + assert len(db) == 1 + assert db[0].target == expected_branch diff --git a/tests/test_recovery.py b/tests/test_recovery.py index 21a40d0..89c2f6b 100644 --- a/tests/test_recovery.py +++ b/tests/test_recovery.py @@ -158,7 +158,7 @@ def test_reconcile_quarantines_saga_with_unhandled_compensation(tmp_path: Path) def test_reconcile_quarantines_saga_with_mixed_handled_and_unhandled_kinds( tmp_path: Path, ) -> None: - """#360: any unhandled kind taints the saga → QUARANTINED, no reap claimed.""" + """#360: any truly-unhandled kind taints the saga → QUARANTINED, no reap claimed.""" store = _store(tmp_path) _stale_with_compensations( store, @@ -170,9 +170,9 @@ def test_reconcile_quarantines_saga_with_mixed_handled_and_unhandled_kinds( reason="cleanup worktree", ), Compensation( - kind="delete-branch", # unhandled — taints the whole saga - target="loop/43", - reason="delete orphaned branch", + kind="close-pr", # neither handled nor deferred — taints the whole saga + target="https://github.com/o/r/pull/43", + reason="close orphaned PR", ), ), 
) @@ -184,12 +184,55 @@ def test_reconcile_quarantines_saga_with_mixed_handled_and_unhandled_kinds( saga = store.get("task-43-worker") assert saga.state == TaskState.QUARANTINED assert saga.is_terminal is True - assert any("delete-branch" in e for e in report.errors) + assert any("close-pr" in e for e in report.errors) # We do not run the handled remove-worktree when another kind is unhandled: # the saga is quarantined whole, no side effect claimed. assert reaped == [] +def test_reconcile_reaps_worktree_for_dispatch_saga_with_deferred_branch( + tmp_path: Path, +) -> None: + """#433 regression guard: the production dispatch tuple still reaps its worktree. + + Dispatch now seeds every worker saga with BOTH REMOVE_WORKTREE and the + deferred DELETE_BRANCH. The deferred branch kind must NOT taint the saga + into quarantine: recovery still reaps the worktree (handled) and drives the + saga COMPENSATED, skipping the not-yet-handled branch deletion. The reaper + fires exactly once and only the worktree path — never the branch name — is + recorded as reaped. + """ + store = _store(tmp_path) + _stale_with_compensations( + store, + issue=50, + compensations=( + Compensation( + kind=CompensationKind.REMOVE_WORKTREE, + target="/tmp/wt-loop-50", + reason="cleanup worktree", + ), + Compensation( + kind=CompensationKind.DELETE_BRANCH, + target="loop/50", + reason="reclaim worker loop branch", + ), + ), + ) + reaped: list[int] = [] + + report = reconcile_stale_sagas(store, reap_worktree=reaped.append) + + # Worktree IS reaped despite the deferred DELETE_BRANCH — exactly once. + assert reaped == [50] + saga = store.get("task-50-worker") + assert saga.state == TaskState.COMPENSATED + assert report.recovered_count == 1 + # Only the worktree target is recorded as reaped, never the branch name. 
+ assert report.recovered[0].worktrees_reaped == ("/tmp/wt-loop-50",) + assert store.list_in_flight() == () + + def test_reconcile_quarantine_reason_lists_every_unhandled_kind(tmp_path: Path) -> None: """#360: multiple unhandled kinds → the terminal reason names them all.""" store = _store(tmp_path) @@ -198,7 +241,7 @@ def test_reconcile_quarantine_reason_lists_every_unhandled_kind(tmp_path: Path) issue=44, compensations=( Compensation(kind="close-pr", target="pr/44", reason="close pr"), - Compensation(kind="delete-branch", target="loop/44", reason="del branch"), + Compensation(kind="revert-commit", target="abc123", reason="revert commit"), ), ) reaped: list[int] = [] @@ -209,7 +252,7 @@ def test_reconcile_quarantine_reason_lists_every_unhandled_kind(tmp_path: Path) assert saga.state == TaskState.QUARANTINED reason = saga.terminal_reason or "" assert "close-pr" in reason - assert "delete-branch" in reason + assert "revert-commit" in reason assert reaped == [] # no reap_worktree side effect claimed assert len(report.errors) == 1 @@ -294,15 +337,21 @@ def test_reconcile_quarantines_raw_string_kind_not_in_enum(tmp_path: Path) -> No def test_recovery_handler_keyset_is_exhaustive_over_compensation_kinds() -> None: - """#360 exhaustiveness contract: every CompensationKind has a recovery entry. + """#360 exhaustiveness contract: every CompensationKind is classified. + Every member must be either HANDLED (recovery runs its undo) or explicitly + DEFERRED (enqueued but recovery skips it until its handler lands). Goes RED the moment a member is added to ``CompensationKind`` without - registering it in ``_HANDLED_COMPENSATION_KINDS`` — the gap is then caught - at test time, not in production recovery. + registering it in either set — the gap is caught at test time, not in + production recovery. The two sets are disjoint: a kind is never both. 
""" - from forge_loop.control.recovery import _HANDLED_COMPENSATION_KINDS + from forge_loop.control.recovery import ( + _DEFERRED_COMPENSATION_KINDS, + _HANDLED_COMPENSATION_KINDS, + ) - assert set(CompensationKind) == _HANDLED_COMPENSATION_KINDS + assert not (_HANDLED_COMPENSATION_KINDS & _DEFERRED_COMPENSATION_KINDS) + assert set(CompensationKind) == _HANDLED_COMPENSATION_KINDS | _DEFERRED_COMPENSATION_KINDS def test_runner_boot_recovery_reconciles_and_emits_event(tmp_path: Path) -> None: