Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions src/forge_loop/runner/merge_gate.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,18 @@
from __future__ import annotations

import contextlib
import os
import subprocess
from collections.abc import Callable
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Protocol

from forge_loop.config import MutationGateConfig
from forge_loop.sandbox.policy import write_root_violations
from forge_loop.state import append_event
from forge_loop.worker import WorkerOutcome
from forge_loop.worker_worktree import quarantine_if_blocking


class GhMergeGateClient(Protocol):
Expand Down Expand Up @@ -264,3 +269,136 @@ def apply_mutation_survivor_gate(
o.status = "open"
refused.append(o.issue)
return refused


# ---------------------------------------------------------------------------
# Write-root-escape gate (issue #443): refuse merge when a worker's diff touched
# files OUTSIDE the filesystem sandbox it was leased. The lease scopes Write/Edit
# at the Claude *settings* layer, but ``Bash(*)`` cannot be path-scoped there, so
# a worker can ``bash``-write outside its worktree and — on an otherwise-green
# run — sail into mergeable state. This POST-HOC gate inspects the diff's paths
# against the leased ``write_roots`` and refuses + quarantines an escape, rather
# than auto-merging a sandbox break into durable cognition.
# ---------------------------------------------------------------------------


@dataclass(frozen=True)
class WorkerLeaseRef:
"""The slice of a worker's leased saga the write-root gate needs.

Resolved per outcome via an injected callable so tests can supply a fake and
production can look the saga up by issue. ``write_roots`` is the leased
:class:`~forge_loop.sandbox.policy.FilesystemScope.write_roots`; ``worktree``
is where the diff is collected and what gets quarantined on a violation.
"""

task_id: str | None
worktree: str | None
write_roots: tuple[str, ...]


def collect_changed_paths(
worktree: str,
base_branch: str,
*,
run: Callable[..., subprocess.CompletedProcess[str]] = subprocess.run,
) -> list[str]:

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[sev1/correctness] collect_changed_paths runs git diff --name-only origin/<base> with cwd=worktree and then os.path.join(worktree, name). A linked worktree's git diff only reports files under that worktree, so every collected path is always under write_roots (production lease = the worktree itself). The escape in the issue's concrete example writes to a DIFFERENT directory (/home/u/forge-loop/..., not the worktree), which this command cannot see — so in production the gate never fires on a real Bash escape. The pure helper and gate logic are correct, but the seam feeding them can't observe the threat, making the gate a no-op against its customer story. The tests pass only because they inject out-of-bounds paths the real collector would never produce. Rework the collection seam to surface writes outside the worktree.

"""Absolute paths a worktree's diff touched vs ``origin/<base_branch>``.

The production backing for the gate's injectable ``changed_paths`` seam. The
``run`` callable is injected so tests drive it with a fake — no real git. A
non-zero git returncode yields an empty list (nothing inspectable ⇒ the gate
is a pass-through, never a false refusal on a transient git failure).
"""
proc = run(
["git", "diff", "--name-only", f"origin/{base_branch}"],
cwd=worktree,
capture_output=True,
text=True,
timeout=30,
check=False,
)
if proc.returncode != 0:
return []
names = [line.strip() for line in proc.stdout.splitlines() if line.strip()]
return [os.path.join(worktree, name) for name in names]


def _write_root_refusal_comment(violations: tuple[str, ...]) -> str:
"""PR comment naming the escaping path(s) so the refusal is actionable."""
bullets = "\n".join(f"- {p}" for p in violations)
return (
"Worker diff escaped its leased filesystem sandbox (write-root escape). "
"Loop refusing auto-merge and quarantining the worktree: a `Bash(*)` write "
"outside the leased `write_roots` must not promote itself into durable "
"cognition. Paths outside the sandbox:\n"
f"{bullets}"
)


def apply_write_root_escape_gate(
outcomes: list[WorkerOutcome],
*,
lease_for: Callable[[WorkerOutcome], WorkerLeaseRef | None],
changed_paths: Callable[[str], list[str]],
gh: GhMergeGateClient,
repo: str | None,
quarantine: Callable[[Path], Path | None] = quarantine_if_blocking,
mark_quarantined: Callable[[str, str], None] | None = None,
events_file: Any | None = None,
emit: Callable[[str, dict[str, Any]], None] | None = None,
) -> list[int]:
"""Refuse merge for outcomes whose diff escaped the leased ``write_roots``.

Returns the list of issue numbers whose merge was refused (empty when every
diff stayed in-bounds). On refusal each PR has auto-merge disabled, an
escape-naming comment posted, the worktree quarantined (reusing
:func:`forge_loop.worker_worktree.quarantine_if_blocking` and, when a
``mark_quarantined`` sink + ``task_id`` are available,
:meth:`tasks.store.mark_quarantined`), and a ``merge_refused_write_root_escape``
event emitted carrying the offending paths; a ``merged`` status is flipped to
``open`` so the attempts ledger reflects the refusal.

Pass-through when the diff is clean (empty violation tuple) — current merge
behaviour is unchanged. Outcomes with no ``pr_url``, or whose lease/worktree
cannot be resolved, are skipped (nothing inspectable). All GitHub/quarantine
side effects are best-effort (``contextlib.suppress``); the event + status
flip MUST still fire even if a side effect raises.
"""
refused: list[int] = []
for o in outcomes:
if not o.pr_url:
continue
lease = lease_for(o)
if lease is None or not lease.worktree:
continue
violations = write_root_violations(changed_paths(lease.worktree), lease.write_roots)
if not violations:
continue

body = _write_root_refusal_comment(violations)
# Best-effort side effects; the event + status flip below MUST still fire.
with contextlib.suppress(Exception):
gh.disable_pr_auto_merge(o.pr_url, repo=repo)
with contextlib.suppress(Exception):
gh.pr_comment(o.pr_url, body, repo=repo)
with contextlib.suppress(Exception):
quarantine(Path(lease.worktree))
if lease.task_id and mark_quarantined is not None:
with contextlib.suppress(Exception):
mark_quarantined(lease.task_id, "write-root escape (#443)")

payload = {
"issue": o.issue,
"pr": o.pr_url,
"paths": list(violations),
}
if events_file is not None:
append_event(events_file, "merge_refused_write_root_escape", **payload)
if emit is not None:
emit("merge_refused_write_root_escape", payload)

if o.status == "merged":
o.status = "open"
refused.append(o.issue)
return refused
63 changes: 62 additions & 1 deletion src/forge_loop/runner/tick.py
Original file line number Diff line number Diff line change
Expand Up @@ -1119,6 +1119,12 @@ def _run_merge_gate(
emit=bus_emit,
)

# Write-root-escape gate (#443): refuse + quarantine a PR whose diff touched
# files outside the worker's leased ``write_roots`` (a ``Bash(*)`` sandbox
# escape the settings layer cannot path-scope). Resolved against the saga's
# leased CapabilityPolicy; best-effort so saga bookkeeping never breaks merge.
escape_refused = _apply_write_root_escape_gate(cfg, outcomes, gh=_gh, bus_emit=bus_emit)

# Oracle-strength gate (#381). Dormant in production until a result source
# is wired (mutation_result left _UNSET); a provided result/None engages it.
mutation_refused: list[int] = []
Expand All @@ -1133,7 +1139,7 @@ def _run_merge_gate(
emit=bus_emit,
)

refused_all = set(refused) | set(mutation_refused)
refused_all = set(refused) | set(mutation_refused) | set(escape_refused)
_enable_automerge_for_reviewed_outcomes(
cfg,
outcomes,
Expand All @@ -1154,6 +1160,61 @@ def _run_merge_gate(
return refused_all


def _apply_write_root_escape_gate(
cfg: Config,
outcomes: list[WorkerOutcome],
*,
gh: Any,
bus_emit: Any,
) -> list[int]:
"""Production wiring for the write-root-escape gate (#443).

Resolves each outcome's leased saga (worktree + ``write_roots`` + task id)
from the tick-scoped saga store and collects the worktree's changed paths via
a real-git seam, then delegates to the gate. Best-effort: a missing/failed
saga store leaves the gate a pass-through rather than breaking merge.
"""
from forge_loop.runner.dispatch import _resolve_task_saga_store, _worker_task_id
from forge_loop.runner.merge_gate import (
WorkerLeaseRef,
apply_write_root_escape_gate,
collect_changed_paths,
)

store = _resolve_task_saga_store(cfg)
if store is None:
return []

def _lease_for(outcome: WorkerOutcome) -> WorkerLeaseRef | None:
try:
saga = store.get(_worker_task_id(outcome.issue))
except Exception: # noqa: BLE001 - saga state is advisory
saga = None
if saga is None:
return None
return WorkerLeaseRef(
task_id=saga.task_id,
worktree=saga.worktree,
write_roots=saga.capability_policy.filesystem.write_roots,
)

def _mark_quarantined(task_id: str, reason: str) -> None:
store.mark_quarantined(task_id, reason=reason)

return list(
apply_write_root_escape_gate(
outcomes,
lease_for=_lease_for,
changed_paths=lambda wt: collect_changed_paths(wt, cfg.base_branch),
gh=gh,
repo=cfg.github_repo,
mark_quarantined=_mark_quarantined,
events_file=cfg.events_file,
emit=bus_emit,
)
)


def _record_attempts(
cfg: Config,
outcomes: list[WorkerOutcome],
Expand Down
2 changes: 2 additions & 0 deletions src/forge_loop/sandbox/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
mcp_allow_patterns,
policy_hash,
render_capability_policy,
write_root_violations,
)

__all__ = [
Expand All @@ -20,4 +21,5 @@
"mcp_allow_patterns",
"policy_hash",
"render_capability_policy",
"write_root_violations",
]
52 changes: 52 additions & 0 deletions src/forge_loop/sandbox/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import hashlib
import json
import os
from collections.abc import Iterable
from dataclasses import dataclass, field
from typing import Any

Expand Down Expand Up @@ -93,6 +95,56 @@ def from_json_obj(cls, value: dict[str, Any] | None) -> CapabilityPolicy:
)


def _norm_fs_path(path: str) -> str:
"""Absolute, normalized, trailing-slash-insensitive form of a filesystem path.

Mirrors :func:`forge_loop.worktree_sweep._norm` (``normpath`` + strip trailing
separators) but also absolutizes so a relative-vs-absolute pairing for the same
location compares equal — the normalization the write-root gate (#443) needs.
"""
return os.path.normpath(os.path.abspath(path.strip())).rstrip(os.sep)


def _path_under_root(path: str, root: str) -> bool:
"""True iff ``path`` is ``root`` itself or nested beneath it (normalized)."""
p, r = _norm_fs_path(path), _norm_fs_path(root)
return p == r or p.startswith(r + os.sep)


def write_root_violations(
changed_paths: Iterable[str],
write_roots: Iterable[str],
) -> tuple[str, ...]:
"""Changed paths that fall OUTSIDE every leased ``write_roots`` entry (#443).

Pure helper for the write-root-escape merge gate. Given the paths a worker's
diff touched and the filesystem write-roots it was leased, return the
(order-preserving, de-duplicated) tuple of changed paths that escape the
sandbox. An empty tuple means the diff stayed entirely in-bounds.

Comparison is normalized (absolute, ``normpath``, trailing-slash insensitive)
so a path equal-but-for-a-trailing-slash or relative-vs-absolute to a write
root is NOT a false positive.

Fail-safe stance — **closed-by-default**: with *no* leased write roots (empty
``write_roots``) every changed path is a violation, matching the merge gate's
"conservative on uncertainty" convention (an un-leased worker that still
produced a diff is treated as having escaped, not as trusted).
"""
roots = tuple(r for r in write_roots if r and r.strip())
seen: set[str] = set()
violations: list[str] = []
for raw in changed_paths:
if not raw or not raw.strip():
continue
if any(_path_under_root(raw, root) for root in roots):
continue
if raw not in seen:
seen.add(raw)
violations.append(raw)
return tuple(violations)


def mcp_allow_patterns(policy: CapabilityPolicy) -> list[str]:
"""SDK/settings ``allowed_tools`` MCP patterns derived from ``policy.mcp``.

Expand Down
Loading
Loading