Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 26 additions & 27 deletions src/forge_loop/control/recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,30 @@
# Reap a worker's worktree, keyed by issue number (best-effort, idempotent).
ReapWorktree = Callable[[int], None]

# Reap a worker's abandoned branch, keyed by branch name (best-effort,
# idempotent). The branch name is the ``DELETE_BRANCH`` compensation target.
ReapBranch = Callable[[str], None]

# Compensation kinds this engine knows how to run during a recovery sweep. This
# keyset, together with ``_DEFERRED_COMPENSATION_KINDS``, is the load-bearing
# exhaustiveness guard: a contract test asserts their union equals
# ``set(CompensationKind)``, so adding a new enum member without classifying it
# turns that test red (see ``test_recovery``).
_HANDLED_COMPENSATION_KINDS: frozenset[CompensationKind] = frozenset(
{CompensationKind.REMOVE_WORKTREE}
{CompensationKind.REMOVE_WORKTREE, CompensationKind.DELETE_BRANCH}
)

# Kinds dispatch already ENQUEUES but whose recovery handler is a deferred epic
# issue. A stale saga carrying ONLY handled and/or deferred kinds is still
# Kinds dispatch already enqueues but whose recovery handler is a deferred epic
# issue. A stale saga carrying only handled and/or deferred kinds is still
# auto-recovered: the handled compensations run and the saga is driven
# COMPENSATED, while the deferred kinds are knowingly skipped (their side effect
# is left exactly as before this kind existed — never falsely claimed undone).
# This is what keeps dead-worker worktrees auto-reaped instead of parked for a
# human while a deferred handler is still in flight. ``DELETE_BRANCH`` (#433,
# epic "Compensate the branch a failed worker abandons") is enqueued at
# dispatch; its reclamation handler lands later in the same epic and will move
# here → handled, at which point recovery will also delete the branch.
# COMPENSATED, while the deferred kinds are knowingly skipped. No kinds are
# deferred today; keep the set so the classification invariant remains explicit.
#
# A kind that is in NEITHER set is a genuine gap: recovery refuses to
# half-compensate such a saga and drives it to the terminal QUARANTINED state
# ("parked for a human, could not auto-compensate") — liveness without lying
# about the side effect. See ``reconcile_stale_sagas``.
_DEFERRED_COMPENSATION_KINDS: frozenset[CompensationKind] = frozenset(
{CompensationKind.DELETE_BRANCH}
)
_DEFERRED_COMPENSATION_KINDS: frozenset[CompensationKind] = frozenset()


@dataclass(frozen=True)
Expand Down Expand Up @@ -91,14 +88,15 @@ def reconcile_stale_sagas(
*,
now: datetime | None = None,
reap_worktree: ReapWorktree | None = None,
reap_branch: ReapBranch | None = None,
) -> RecoveryReport:
"""Compensate and close every stale (expired-lease) saga.

For each stale saga: run its ``remove-worktree`` compensations via
``reap_worktree`` (idempotent, best-effort), then mark it COMPENSATED.
A failure on one saga is captured and the sweep continues to the next —
recovery must make as much progress as it can, not abort on the first
snag.
For each stale saga: discharge its compensations by kind. A
``remove-worktree`` compensation runs ``reap_worktree(issue)`` and a
``delete-branch`` compensation runs ``reap_branch(target)``. A failure on
one saga is captured and the sweep continues to the next — recovery must
make as much progress as it can, not abort on the first snag.

A saga carrying a compensation kind this engine neither handles nor has
explicitly deferred is NOT marked COMPENSATED: driving it COMPENSATED would
Expand Down Expand Up @@ -134,21 +132,22 @@ def reconcile_stale_sagas(
),
)
errors.append(
f"{saga.saga_id}: quarantined, "
f"unhandled compensation kind(s): {kinds}"
f"{saga.saga_id}: quarantined, unhandled compensation kind(s): {kinds}"
)
continue
reaped: list[str] = []
reaped_worktrees: list[str] = []
for compensation in saga.compensations:
# Run only the handled compensations; deferred kinds (no handler
# yet) are skipped, never falsely claimed undone. Filtering by
# kind also stops a non-worktree entry from double-firing the
# reaper or recording its target as a reaped worktree.
if compensation.kind != CompensationKind.REMOVE_WORKTREE:
continue
if reap_worktree is not None and saga.issue is not None:
reap_worktree(saga.issue)
reaped.append(compensation.target)
if compensation.kind == CompensationKind.REMOVE_WORKTREE:
if reap_worktree is not None and saga.issue is not None:
reap_worktree(saga.issue)
reaped_worktrees.append(compensation.target)
elif compensation.kind == CompensationKind.DELETE_BRANCH:
if reap_branch is not None:
reap_branch(compensation.target)
saga_store.mark_compensated(
saga.task_id, reason="recovered: dead-worker lease expired at boot"
)
Expand All @@ -157,7 +156,7 @@ def reconcile_stale_sagas(
task_id=saga.task_id,
saga_id=saga.saga_id,
issue=saga.issue,
worktrees_reaped=tuple(reaped),
worktrees_reaped=tuple(reaped_worktrees),
)
)
except Exception as exc: # noqa: BLE001 - one bad saga must not abort the sweep
Expand Down
11 changes: 3 additions & 8 deletions src/forge_loop/tasks/saga.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,9 @@ class CompensationKind(StrEnum):
"""

REMOVE_WORKTREE = "remove-worktree"
# #433 (epic "Compensate the branch a failed worker abandons"): the dispatch
# path plants a ``loop/<n>`` branch for every worker and registers this
# compensation at saga-creation time so a failed worker can never leak a
# branch the control plane doesn't know to delete. The recovery *handler*
# for this kind is a sibling epic issue; until it lands, recovery classifies
# DELETE_BRANCH as deferred — it still reaps the worktree and drives the saga
# COMPENSATED, skipping (never falsely claiming) the branch deletion (see
# ``control/recovery.py``).
# Reclaim the orphan ``loop/<n>`` branch a dead worker left behind (#432).
# Recovery routes this kind to ``reap_branch`` and keeps branch targets out
# of the worktree-reaped summary.
DELETE_BRANCH = "delete-branch"


Expand Down
81 changes: 78 additions & 3 deletions tests/test_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,7 @@ def test_reconcile_mixed_sweep_one_compensated_one_quarantined(tmp_path: Path) -
_stale_with_compensations(
store,
issue=8,
compensations=(
Compensation(kind="close-pr", target="pr/8", reason="close pr"),
),
compensations=(Compensation(kind="close-pr", target="pr/8", reason="close pr"),),
)
reaped: list[int] = []

Expand Down Expand Up @@ -336,6 +334,83 @@ def test_reconcile_quarantines_raw_string_kind_not_in_enum(tmp_path: Path) -> No
assert any("some-future-kind-2027" in e for e in report.errors)


def test_reconcile_delete_branch_reaps_branch_and_compensates(tmp_path: Path) -> None:
"""#432: a DELETE_BRANCH compensation deletes the abandoned branch."""
store = _store(tmp_path)
_stale_with_compensations(
store,
issue=42,
compensations=(
Compensation(
kind=CompensationKind.DELETE_BRANCH,
target="loop/42",
reason="delete orphaned branch left by dead worker",
),
),
)
branches: list[str] = []

report = reconcile_stale_sagas(store, reap_worktree=lambda _: None, reap_branch=branches.append)

assert branches == ["loop/42"]
assert report.recovered_count == 1
assert store.get("task-42-worker").state == TaskState.COMPENSATED
assert store.list_in_flight() == ()


def test_reconcile_routes_worktree_and_branch_to_their_own_reaper(tmp_path: Path) -> None:
"""#432: each compensation kind routes to its own callback."""
store = _store(tmp_path)
_stale_with_compensations(
store,
issue=42,
compensations=(
Compensation(
kind=CompensationKind.REMOVE_WORKTREE,
target="/tmp/wt-loop-42",
reason="cleanup worktree",
),
Compensation(
kind=CompensationKind.DELETE_BRANCH,
target="loop/42",
reason="delete orphaned branch",
),
),
)
worktrees: list[int] = []
branches: list[str] = []

report = reconcile_stale_sagas(
store, reap_worktree=worktrees.append, reap_branch=branches.append
)

assert worktrees == [42]
assert branches == ["loop/42"]
assert report.recovered_count == 1
assert store.get("task-42-worker").state == TaskState.COMPENSATED


def test_reconcile_delete_branch_without_reaper_still_compensates(tmp_path: Path) -> None:
"""#432: reap_branch is optional, matching reap_worktree."""
store = _store(tmp_path)
_stale_with_compensations(
store,
issue=42,
compensations=(
Compensation(
kind=CompensationKind.DELETE_BRANCH,
target="loop/42",
reason="delete orphaned branch",
),
),
)

report = reconcile_stale_sagas(store, reap_worktree=lambda _: None)

assert report.recovered_count == 1
assert store.get("task-42-worker").state == TaskState.COMPENSATED


def test_recovery_handler_keyset_is_exhaustive_over_compensation_kinds() -> None:
"""#360 exhaustiveness contract: every CompensationKind is classified.

Expand Down
Loading