Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 46 additions & 24 deletions src/forge_loop/control/recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,33 @@
# Signature of the worktree-reaping callback: takes one ``int``, returns nothing.
# NOTE(review): presumably the int is the issue number (the recovery sweep calls
# ``reap_worktree(saga.issue)``) — confirm against the parameter annotation.
ReapWorktree = Callable[[int], None]

# Compensation kinds this engine knows how to run during a recovery sweep. This
# keyset is the load-bearing exhaustiveness guard: an exhaustiveness contract
# test asserts it equals ``set(CompensationKind)``, so adding a new enum member
# without registering it here turns that test red (see ``test_recovery``).
#
# A stale saga carrying any kind NOT in this set (e.g. the close-pr /
# delete-branch kinds #272 adds) cannot be honestly driven to COMPENSATED here —
# doing so would assert a side effect was undone when no handler ran. Instead
# such a saga is driven to the terminal QUARANTINED state ("parked for a human,
# could not auto-compensate"): that ends the saga (liveness — it drains from the
# in-flight view and is never swept again) without lying about the side effect
# (integrity). See ``reconcile_stale_sagas``.
# keyset, together with ``_DEFERRED_COMPENSATION_KINDS``, is the load-bearing
# exhaustiveness guard: a contract test asserts their union equals
# ``set(CompensationKind)``, so adding a new enum member without classifying it
# turns that test red (see ``test_recovery``).
# Single-member set today: only worktree removal has a recovery handler.
_HANDLED_COMPENSATION_KINDS: frozenset[CompensationKind] = frozenset(
    (CompensationKind.REMOVE_WORKTREE,)
)

# Kinds that dispatch already ENQUEUES but whose recovery handler is deferred to
# a later issue in the same epic. A stale saga carrying ONLY handled and/or
# deferred kinds is still
# auto-recovered: the handled compensations run and the saga is driven
# COMPENSATED, while the deferred kinds are knowingly skipped (their side effect
# is left exactly as before this kind existed — never falsely claimed undone).
# This is what keeps dead-worker worktrees auto-reaped instead of parked for a
# human while a deferred handler is still in flight. ``DELETE_BRANCH`` (#433,
# epic "Compensate the branch a failed worker abandons") is enqueued at
# dispatch; its reclamation handler lands later in the same epic, at which point
# the kind moves from this set to the handled set and recovery will also delete
# the branch.
#
# A kind that is in NEITHER set is a genuine gap: recovery refuses to
# half-compensate such a saga and drives it to the terminal QUARANTINED state
# ("parked for a human, could not auto-compensate") — liveness without lying
# about the side effect. See ``reconcile_stale_sagas``.
# Single-member set today: branch deletion is enqueued but not yet handled.
_DEFERRED_COMPENSATION_KINDS: frozenset[CompensationKind] = frozenset(
    (CompensationKind.DELETE_BRANCH,)
)


@dataclass(frozen=True)
class RecoveredSaga:
Expand Down Expand Up @@ -88,25 +100,30 @@ def reconcile_stale_sagas(
recovery must make as much progress as it can, not abort on the first
snag.

A saga carrying a compensation kind this engine has no handler for is NOT
marked COMPENSATED: driving it COMPENSATED would assert a side effect was
undone when it never ran (a wrong-but-green integrity hole). Instead the
saga is driven to the terminal QUARANTINED state ("parked for a human") so
it stops being immortal — it drains from the in-flight view and is never
re-swept — without claiming the side effect was undone. No handled
compensation is run for such a saga (we refuse to half-compensate it), and
an entry naming its id + the unhandled kind(s) is appended to
``RecoveryReport.errors`` so an operator knows a saga was parked and why.
A saga carrying a compensation kind this engine neither handles nor has
explicitly deferred is NOT marked COMPENSATED: driving it COMPENSATED would
assert a side effect was undone when it never ran (a wrong-but-green
integrity hole). Instead the saga is driven to the terminal QUARANTINED
state ("parked for a human") so it stops being immortal — it drains from the
in-flight view and is never re-swept — without claiming the side effect was
undone. No handled compensation is run for such a saga (we refuse to
half-compensate it), and an entry naming its id + the unhandled kind(s) is
appended to ``RecoveryReport.errors`` so an operator knows a saga was parked.

A *deferred* kind (enqueued at dispatch but whose handler has not landed
yet, e.g. ``DELETE_BRANCH``) does NOT taint the saga: the handled
compensations still run and the saga is driven COMPENSATED, while the
deferred kind is skipped. That keeps dead-worker worktrees auto-reaped
instead of parked for a human while a deferred handler is in flight.
"""
moment = now or datetime.now(UTC)
recovered: list[RecoveredSaga] = []
errors: list[str] = []
classified = _HANDLED_COMPENSATION_KINDS | _DEFERRED_COMPENSATION_KINDS

for saga in saga_store.list_stale(now=moment):
try:
unhandled = sorted(
{c.kind for c in saga.compensations if c.kind not in _HANDLED_COMPENSATION_KINDS}
)
unhandled = sorted({c.kind for c in saga.compensations if c.kind not in classified})
if unhandled:
kinds = ", ".join(unhandled)
saga_store.mark_quarantined(
Expand All @@ -123,7 +140,12 @@ def reconcile_stale_sagas(
continue
reaped: list[str] = []
for compensation in saga.compensations:
# Every kind here is handled (unhandled kinds short-circuit above).
# Run only the handled compensations; deferred kinds (no handler
# yet) are skipped, never falsely claimed undone. Filtering by
# kind also stops a non-worktree entry from double-firing the
# reaper or recording its target as a reaped worktree.
if compensation.kind != CompensationKind.REMOVE_WORKTREE:
continue
if reap_worktree is not None and saga.issue is not None:
reap_worktree(saga.issue)
reaped.append(compensation.target)
Expand Down
45 changes: 31 additions & 14 deletions src/forge_loop/runner/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,13 +272,7 @@ def record_worker_task_policy(
issue=issue,
branch=branch,
worktree=worktree_path,
compensations=(
Compensation(
kind=CompensationKind.REMOVE_WORKTREE,
target=worktree_path,
reason="cleanup worker worktree after task terminal state",
),
),
compensations=_worker_compensations(worktree_path=worktree_path, branch=branch),
capability_policy=capability_policy,
)
)
Expand Down Expand Up @@ -315,17 +309,40 @@ def _seed_worker_saga(
issue=n,
branch=branch,
worktree=worktree_path,
compensations=(
Compensation(
kind=CompensationKind.REMOVE_WORKTREE,
target=worktree_path,
reason="cleanup after worker task terminal state",
),
),
compensations=_worker_compensations(worktree_path=worktree_path, branch=branch),
capability_policy=capability_policy,
)


def _worker_compensations(*, worktree_path: str, branch: str) -> tuple[Compensation, ...]:
    """Build the compensation tuple registered when a worker saga is created.

    Two cleanups are registered at dispatch, in the order they must run on
    recovery: drop the worktree first, then delete the branch it planted.

    * ``REMOVE_WORKTREE`` reaps the ``/tmp`` worktree the worker ran in.
    * ``DELETE_BRANCH`` (#433, epic "Compensate the branch a failed worker
      abandons") reclaims the exact ``loop/<n>`` branch this worker planted —
      ``target`` is the canonical branch name derived for the issue, so a
      failed worker can never leak a branch the control plane doesn't know to
      delete. Both seed paths (the shared-store ``create`` and the
      ``task_store is None`` fallback) build the tuple here so they can never
      drift apart.

    Args:
        worktree_path: Path of the worker's worktree; becomes the target of the
            ``REMOVE_WORKTREE`` entry.
        branch: Name of the branch planted for the worker; becomes the target
            of the ``DELETE_BRANCH`` entry.

    Returns:
        A two-element tuple: the ``REMOVE_WORKTREE`` compensation followed by
        the ``DELETE_BRANCH`` compensation.
    """
    return (
        Compensation(
            kind=CompensationKind.REMOVE_WORKTREE,
            target=worktree_path,
            reason="cleanup worker worktree after task terminal state",
        ),
        Compensation(
            kind=CompensationKind.DELETE_BRANCH,
            target=branch,
            reason="reclaim worker loop branch after task terminal state",
        ),
    )


def _worker_task_id(issue_number: int) -> str:
return f"task-{issue_number}-worker"

Expand Down
9 changes: 9 additions & 0 deletions src/forge_loop/tasks/saga.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ class CompensationKind(StrEnum):
"""

REMOVE_WORKTREE = "remove-worktree"
# #433 (epic "Compensate the branch a failed worker abandons"): the dispatch
# path plants a ``loop/<n>`` branch for every worker and registers this
# compensation at saga-creation time so a failed worker can never leak a
# branch the control plane doesn't know to delete. The recovery *handler*
# for this kind is a sibling epic issue; until it lands, recovery classifies
# DELETE_BRANCH as deferred — it still reaps the worktree and drives the saga
# COMPENSATED, skipping (never falsely claiming) the branch deletion (see
# ``control/recovery.py``).
DELETE_BRANCH = "delete-branch"


@dataclass(frozen=True)
Expand Down
150 changes: 150 additions & 0 deletions tests/test_dispatch_branch_compensation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
"""Dispatch enqueues a DELETE_BRANCH compensation for the branch it plants (#433).

Epic: "Compensate the branch a failed worker abandons". At dispatch, when a
saga is created for a worker that plants a ``loop/<n>`` branch, the saga must
carry a ``DELETE_BRANCH`` compensation whose target is the *exact* branch name
planted for that worker — alongside the pre-existing ``REMOVE_WORKTREE`` one —
so a failed worker can never leak a branch the control plane doesn't know to
delete.

Primary falsifiable acceptance (``test_dispatch_one_worker_*``): a saga created
by the dispatch path for issue #n carries a DELETE_BRANCH compensation whose
target equals the exact branch name planted for that worker.
"""

from __future__ import annotations

from typing import Any

from forge_loop.runner import dispatch as dispatch_mod
from forge_loop.sandbox import CapabilityPolicy
from forge_loop.tasks import Compensation, CompensationKind, SqliteTaskSagaStore
from forge_loop.worker import WorkerOutcome

# Reuse the Config/issue/meta scaffolding from the dispatch FSM tests.
from tests.test_persistent_dispatch import _issue, _make_cfg, _meta


def _saga_store(cfg: Any) -> SqliteTaskSagaStore:
    """Open the saga store at the canonical task-saga path for ``cfg.repo``."""
    return SqliteTaskSagaStore(dispatch_mod.canonical_task_saga_path(cfg.repo))


def _delete_branch(comps: tuple[Compensation, ...]) -> list[Compensation]:
    """Return the entries of ``comps`` whose kind is ``DELETE_BRANCH``, in order."""
    return [c for c in comps if c.kind == CompensationKind.DELETE_BRANCH]


def _remove_worktree(comps: tuple[Compensation, ...]) -> list[Compensation]:
    """Return the entries of ``comps`` whose kind is ``REMOVE_WORKTREE``, in order."""
    return [c for c in comps if c.kind == CompensationKind.REMOVE_WORKTREE]


# --------------------------------------------------------------------------- #
# Unit: the shared compensation builder #
# --------------------------------------------------------------------------- #


def test_worker_compensations_carry_both_kinds_with_correct_targets() -> None:
    """The shared builder emits one entry per kind, each with its own target."""
    comps = dispatch_mod._worker_compensations(
        worktree_path="/tmp/wt-loop-7", branch="loop/7-do-the-thing"
    )

    # Exactly one of each kind — no duplicate, no drift.
    assert len(_remove_worktree(comps)) == 1
    assert len(_delete_branch(comps)) == 1

    rm = _remove_worktree(comps)[0]
    db = _delete_branch(comps)[0]
    assert rm.target == "/tmp/wt-loop-7"
    # The DELETE_BRANCH target is the BRANCH, never the worktree path — the
    # exact falsifiable bug this feature guards against.
    assert db.target == "loop/7-do-the-thing"
    assert db.target != rm.target
    # Worktree is reaped before the branch it planted is deleted.
    assert comps.index(rm) < comps.index(db)


def test_worker_compensations_handles_empty_strings() -> None:
    """Adversarial: empty branch/worktree must not crash or merge the entries."""
    comps = dispatch_mod._worker_compensations(worktree_path="", branch="")

    # Still two distinct entries, each carrying the (empty) target it was given.
    assert len(comps) == 2
    assert _delete_branch(comps)[0].target == ""
    assert _remove_worktree(comps)[0].target == ""


# --------------------------------------------------------------------------- #
# Unit: both seed paths register the compensation #
# --------------------------------------------------------------------------- #


def test_seed_worker_saga_enqueues_delete_branch(tmp_path: Any) -> None:
    """Seeding a worker saga into a store registers both compensation kinds."""
    store = SqliteTaskSagaStore(":memory:")
    dispatch_mod._seed_worker_saga(
        store,
        repo=tmp_path / "repo",
        issue=_issue(7),
        branch="loop/7-do-the-thing",
        worktree_path="/tmp/wt-loop-7",
        capability_policy=CapabilityPolicy(),
    )

    # Read the saga back from the store rather than trusting a return value.
    saga = store.get("task-7-worker")
    assert saga is not None
    db = _delete_branch(saga.compensations)
    assert len(db) == 1
    assert db[0].target == "loop/7-do-the-thing"
    assert len(_remove_worktree(saga.compensations)) == 1


def test_record_worker_task_policy_fallback_enqueues_delete_branch(tmp_path: Any) -> None:
    """The ``task_store is None`` fallback path must also register DELETE_BRANCH."""
    repo = tmp_path / "repo"
    repo.mkdir(parents=True, exist_ok=True)
    # No task store is passed, so this exercises the fallback seed path.
    saga = dispatch_mod.record_worker_task_policy(
        repo=repo,
        task_id="task-7-worker",
        saga_id="saga-7-worker",
        issue=7,
        branch="loop/7-do-the-thing",
        worktree_path="/tmp/wt-loop-7",
        capability_policy=CapabilityPolicy(),
    )

    db = _delete_branch(saga.compensations)
    assert len(db) == 1
    assert db[0].target == "loop/7-do-the-thing"


# --------------------------------------------------------------------------- #
# Integration: the real dispatch path (primary acceptance) #
# --------------------------------------------------------------------------- #


def test_dispatch_one_worker_saga_carries_delete_branch_for_exact_branch(
    monkeypatch: Any, tmp_path: Any
) -> None:
    """Primary acceptance: the real dispatch path records DELETE_BRANCH for the planted branch."""
    cfg = _make_cfg(tmp_path)
    issue = _issue(7)
    expected_branch = dispatch_mod._branch_for_issue(issue)
    # Falsifiable anchor: the slug really is the planted branch name.
    assert expected_branch == "loop/7-do-the-thing"

    def ok(*args: Any, **kwargs: Any) -> WorkerOutcome:
        # Stub worker: report a canned successful outcome without doing work.
        return WorkerOutcome(
            issue=7,
            title="t",
            pr_url="https://x/pull/1",
            status="open",
            duration_s=1.0,
            stdout_tail="",
        )

    monkeypatch.setattr(dispatch_mod, "run_worker", ok)
    dispatch_mod._dispatch_one_worker(
        cfg, issue, _meta(), tick=1, bus_emit=lambda *a, **k: None, store=None
    )

    # The saga is read back from the canonical on-disk store for this repo.
    saga = _saga_store(cfg).get("task-7-worker")
    assert saga is not None
    db = _delete_branch(saga.compensations)
    assert len(db) == 1
    assert db[0].target == expected_branch
Loading
Loading