Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions src/forge_loop/control/recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,15 @@ def reconcile_stale_sagas(
appended to ``RecoveryReport.errors`` so an operator knows a saga was parked.

A *deferred* kind (enqueued at dispatch but whose handler has not landed
yet, e.g. ``DELETE_BRANCH``) does NOT taint the saga: the handled
compensations still run and the saga is driven COMPENSATED, while the
deferred kind is skipped. That keeps dead-worker worktrees auto-reaped
instead of parked for a human while a deferred handler is in flight.
yet) does NOT taint the saga: the handled compensations still run and the
saga is driven COMPENSATED, while the deferred kind is skipped. That keeps
dead-worker worktrees auto-reaped instead of parked for a human while a
deferred handler is in flight.

A saga whose ``delete-branch`` handler raises is driven terminal
QUARANTINED instead of being left RUNNING. That preserves the integrity of
COMPENSATED: recovery never claims a branch was deleted when its handler
failed.
"""
moment = now or datetime.now(UTC)
recovered: list[RecoveredSaga] = []
Expand All @@ -136,6 +141,7 @@ def reconcile_stale_sagas(
)
continue
reaped_worktrees: list[str] = []
failed_compensation = False
for compensation in saga.compensations:
# Run only the handled compensations; deferred kinds (no handler
# yet) are skipped, never falsely claimed undone. Filtering by
Expand All @@ -147,7 +153,21 @@ def reconcile_stale_sagas(
reaped_worktrees.append(compensation.target)
elif compensation.kind == CompensationKind.DELETE_BRANCH:
if reap_branch is not None:
reap_branch(compensation.target)
try:
reap_branch(compensation.target)
except Exception as exc: # noqa: BLE001 - park this saga, continue sweep
reason = (
"recovered: parked for human, compensation "
f"{CompensationKind.DELETE_BRANCH} for "
f"{compensation.target!r} failed: "
f"{type(exc).__name__}: {exc}"
)
saga_store.mark_quarantined(saga.task_id, reason=reason)
errors.append(f"{saga.saga_id}: quarantined, {reason}")
failed_compensation = True
break
if failed_compensation:
continue
saga_store.mark_compensated(
saga.task_id, reason="recovered: dead-worker lease expired at boot"
)
Expand Down
75 changes: 75 additions & 0 deletions tests/test_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,81 @@ def test_reconcile_delete_branch_without_reaper_still_compensates(tmp_path: Path
assert store.get("task-42-worker").state == TaskState.COMPENSATED


def test_reconcile_quarantines_saga_when_branch_deletion_raises(tmp_path: Path) -> None:
    """#434: a DELETE_BRANCH handler that raises leaves the saga terminal QUARANTINED.

    The failure must be reported in ``errors`` (naming the saga, the
    compensation kind, and the handler's message), nothing may be reported as
    recovered, and a repeat sweep must find nothing left to process.
    """
    store = _store(tmp_path)
    orphaned_branch = Compensation(
        kind=CompensationKind.DELETE_BRANCH,
        target="loop/42",
        reason="delete orphaned branch",
    )
    _stale_with_compensations(store, issue=42, compensations=(orphaned_branch,))

    def fail_branch_reap(branch: str) -> None:
        raise RuntimeError(f"protected branch: {branch}")

    def sweep():
        # Same arguments for both passes: a no-op worktree reaper and a
        # branch reaper that always raises.
        return reconcile_stale_sagas(
            store, reap_worktree=lambda _: None, reap_branch=fail_branch_reap
        )

    first = sweep()
    parked = store.get("task-42-worker")

    # Nothing recovered; exactly one error describing the parked saga.
    assert first.recovered == ()
    assert len(first.errors) == 1
    (error,) = first.errors
    for fragment in ("saga-42-worker", "delete-branch", "protected branch"):
        assert fragment in error

    # The saga itself is terminal and records why it was parked.
    assert parked.state == TaskState.QUARANTINED
    assert parked.is_terminal is True
    recorded_reason = parked.terminal_reason or ""
    assert "delete-branch" in recorded_reason
    assert "loop/42" in recorded_reason
    assert store.list_in_flight() == ()

    # A second pass has no work and reports no new errors.
    second = sweep()
    assert second.recovered == ()
    assert second.errors == ()


def test_reconcile_branch_failure_does_not_abort_sibling_saga(tmp_path: Path) -> None:
    """#434: a branch reap that raises for one saga does not stop the sweep.

    Saga 1 carries a DELETE_BRANCH compensation whose handler raises; saga 2
    is an ordinary stale running saga. Saga 1 must end QUARANTINED while
    saga 2 is still recovered and driven COMPENSATED in the same sweep.
    """
    store = _store(tmp_path)
    failing_branch = Compensation(
        kind=CompensationKind.DELETE_BRANCH,
        target="loop/1",
        reason="delete orphaned branch",
    )
    _stale_with_compensations(store, issue=1, compensations=(failing_branch,))
    _stale_running(store, issue=2)
    worktree_calls: list[int] = []

    def fail_branch_reap(branch: str) -> None:
        raise RuntimeError(f"cannot delete {branch}")

    report = reconcile_stale_sagas(
        store, reap_worktree=worktree_calls.append, reap_branch=fail_branch_reap
    )

    # Only the sibling saga is reported as recovered, with its worktree reaped.
    recovered_issues = [entry.issue for entry in report.recovered]
    assert recovered_issues == [2]
    assert report.recovered[0].worktrees_reaped == ("/tmp/wt-loop-2",)
    assert worktree_calls == [2]

    # Final states: the failing saga is parked, the sibling is compensated.
    assert store.get("task-1-worker").state == TaskState.QUARANTINED
    assert store.get("task-2-worker").state == TaskState.COMPENSATED
    assert store.list_in_flight() == ()

    # Exactly one error, attributed to the saga whose branch reap failed.
    assert len(report.errors) == 1
    assert "saga-1-worker" in report.errors[0]


def test_recovery_handler_keyset_is_exhaustive_over_compensation_kinds() -> None:
"""#360 exhaustiveness contract: every CompensationKind is classified.

Expand Down
Loading