From a1362dc68ca004de9a767f3c87354ac79c13d206 Mon Sep 17 00:00:00 2001 From: kmajdoub Date: Tue, 9 Jun 2026 01:05:03 +0200 Subject: [PATCH] feat(merge_gate): refuse + quarantine a PR whose diff escaped its write-roots (#443) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The merge gate refused PRs for closed source issues (#65) and weak mutation oracles (#381) but never checked whether the worker's diff stayed inside the filesystem sandbox it was leased. The lease scopes Write/Edit at the Claude settings layer, but `Bash(*)` cannot be path-scoped there, so a worker could `bash`-write a file outside its worktree and — on an otherwise-green run — sail through the gate into mergeable state. A Bash escape advancing into durable cognition. This adds a sibling post-hoc gate: - `sandbox.policy.write_root_violations(changed_paths, write_roots)`: a pure, normalized (abs / normpath / trailing-slash-insensitive, mirroring worktree_sweep) helper returning the changed paths outside every write root. Closed-by-default: an empty lease treats any changed path as a violation. - `merge_gate.apply_write_root_escape_gate`: modeled on `apply_mutation_survivor_gate`. On a non-empty violation tuple it disables auto-merge, posts a path-naming comment, quarantines the worktree by reusing `quarantine_if_blocking` + `mark_quarantined` (no new quarantine machinery), emits `merge_refused_write_root_escape` with the offending paths, and flips status merged->open. Empty violations => byte-for-byte pass-through. All gh / quarantine side effects are best-effort (contextlib.suppress) so the event + status flip always fire. - Changed-path collection goes through an injectable seam (`collect_changed_paths` + a `changed_paths` callable) so tests drive it with a fake — no real git. - Wired into `runner/tick.py::_run_merge_gate` via `_apply_write_root_escape_gate`, resolving each outcome's leased CapabilityPolicy + worktree from the tick-scoped saga store (best-effort: a missing store leaves the gate a pass-through). Tests: pure-function matrix (in/out of bounds, normalization, closed-by-default, dedup, property test), gate happy/adversarial/best-effort/skip paths, and both returncode branches of the git seam. Co-Authored-By: Claude Opus 4.7 --- src/forge_loop/runner/merge_gate.py | 138 +++++++++++++++ src/forge_loop/runner/tick.py | 63 ++++++- src/forge_loop/sandbox/__init__.py | 2 + src/forge_loop/sandbox/policy.py | 52 ++++++ tests/test_runner_merge_gate.py | 180 ++++++++++++++++++++ tests/test_sandbox_write_root_violations.py | 92 ++++++++++ 6 files changed, 526 insertions(+), 1 deletion(-) create mode 100644 tests/test_sandbox_write_root_violations.py diff --git a/src/forge_loop/runner/merge_gate.py b/src/forge_loop/runner/merge_gate.py index 54735ef..56166c0 100644 --- a/src/forge_loop/runner/merge_gate.py +++ b/src/forge_loop/runner/merge_gate.py @@ -25,13 +25,18 @@ from __future__ import annotations import contextlib +import os +import subprocess from collections.abc import Callable from dataclasses import dataclass, field +from pathlib import Path from typing import Any, Protocol from forge_loop.config import MutationGateConfig +from forge_loop.sandbox.policy import write_root_violations from forge_loop.state import append_event from forge_loop.worker import WorkerOutcome +from forge_loop.worker_worktree import quarantine_if_blocking class GhMergeGateClient(Protocol): @@ -264,3 +269,136 @@ def apply_mutation_survivor_gate( o.status = "open" refused.append(o.issue) return refused + + +# --------------------------------------------------------------------------- +# Write-root-escape gate (issue #443): refuse merge when a worker's diff touched +# files OUTSIDE the filesystem sandbox it was leased. The lease scopes Write/Edit +# at the Claude *settings* layer, but ``Bash(*)`` cannot be path-scoped there, so +# a worker can ``bash``-write outside its worktree and — on an otherwise-green +# run — sail into mergeable state. This POST-HOC gate inspects the diff's paths +# against the leased ``write_roots`` and refuses + quarantines an escape, rather +# than auto-merging a sandbox break into durable cognition. +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class WorkerLeaseRef: + """The slice of a worker's leased saga the write-root gate needs. + + Resolved per outcome via an injected callable so tests can supply a fake and + production can look the saga up by issue. ``write_roots`` is the leased + :class:`~forge_loop.sandbox.policy.FilesystemScope.write_roots`; ``worktree`` + is where the diff is collected and what gets quarantined on a violation. + """ + + task_id: str | None + worktree: str | None + write_roots: tuple[str, ...] + + +def collect_changed_paths( + worktree: str, + base_branch: str, + *, + run: Callable[..., subprocess.CompletedProcess[str]] = subprocess.run, +) -> list[str]: + """Absolute paths a worktree's diff touched vs ``origin/``. + + The production backing for the gate's injectable ``changed_paths`` seam. The + ``run`` callable is injected so tests drive it with a fake — no real git. A + non-zero git returncode yields an empty list (nothing inspectable ⇒ the gate + is a pass-through, never a false refusal on a transient git failure). + """ + proc = run( + ["git", "diff", "--name-only", f"origin/{base_branch}"], + cwd=worktree, + capture_output=True, + text=True, + timeout=30, + check=False, + ) + if proc.returncode != 0: + return [] + names = [line.strip() for line in proc.stdout.splitlines() if line.strip()] + return [os.path.join(worktree, name) for name in names] + + +def _write_root_refusal_comment(violations: tuple[str, ...]) -> str: + """PR comment naming the escaping path(s) so the refusal is actionable.""" + bullets = "\n".join(f"- {p}" for p in violations) + return ( + "Worker diff escaped its leased filesystem sandbox (write-root escape). " + "Loop refusing auto-merge and quarantining the worktree: a `Bash(*)` write " + "outside the leased `write_roots` must not promote itself into durable " + "cognition. Paths outside the sandbox:\n" + f"{bullets}" + ) + + +def apply_write_root_escape_gate( + outcomes: list[WorkerOutcome], + *, + lease_for: Callable[[WorkerOutcome], WorkerLeaseRef | None], + changed_paths: Callable[[str], list[str]], + gh: GhMergeGateClient, + repo: str | None, + quarantine: Callable[[Path], Path | None] = quarantine_if_blocking, + mark_quarantined: Callable[[str, str], None] | None = None, + events_file: Any | None = None, + emit: Callable[[str, dict[str, Any]], None] | None = None, +) -> list[int]: + """Refuse merge for outcomes whose diff escaped the leased ``write_roots``. + + Returns the list of issue numbers whose merge was refused (empty when every + diff stayed in-bounds). On refusal each PR has auto-merge disabled, an + escape-naming comment posted, the worktree quarantined (reusing + :func:`forge_loop.worker_worktree.quarantine_if_blocking` and, when a + ``mark_quarantined`` sink + ``task_id`` are available, + :meth:`tasks.store.mark_quarantined`), and a ``merge_refused_write_root_escape`` + event emitted carrying the offending paths; a ``merged`` status is flipped to + ``open`` so the attempts ledger reflects the refusal. + + Pass-through when the diff is clean (empty violation tuple) — current merge + behaviour is unchanged. Outcomes with no ``pr_url``, or whose lease/worktree + cannot be resolved, are skipped (nothing inspectable). All GitHub/quarantine + side effects are best-effort (``contextlib.suppress``); the event + status + flip MUST still fire even if a side effect raises. + """ + refused: list[int] = [] + for o in outcomes: + if not o.pr_url: + continue + lease = lease_for(o) + if lease is None or not lease.worktree: + continue + violations = write_root_violations(changed_paths(lease.worktree), lease.write_roots) + if not violations: + continue + + body = _write_root_refusal_comment(violations) + # Best-effort side effects; the event + status flip below MUST still fire. + with contextlib.suppress(Exception): + gh.disable_pr_auto_merge(o.pr_url, repo=repo) + with contextlib.suppress(Exception): + gh.pr_comment(o.pr_url, body, repo=repo) + with contextlib.suppress(Exception): + quarantine(Path(lease.worktree)) + if lease.task_id and mark_quarantined is not None: + with contextlib.suppress(Exception): + mark_quarantined(lease.task_id, "write-root escape (#443)") + + payload = { + "issue": o.issue, + "pr": o.pr_url, + "paths": list(violations), + } + if events_file is not None: + append_event(events_file, "merge_refused_write_root_escape", **payload) + if emit is not None: + emit("merge_refused_write_root_escape", payload) + + if o.status == "merged": + o.status = "open" + refused.append(o.issue) + return refused diff --git a/src/forge_loop/runner/tick.py b/src/forge_loop/runner/tick.py index 6c420cb..7e6d2f9 100644 --- a/src/forge_loop/runner/tick.py +++ b/src/forge_loop/runner/tick.py @@ -1119,6 +1119,12 @@ def _run_merge_gate( emit=bus_emit, ) + # Write-root-escape gate (#443): refuse + quarantine a PR whose diff touched + # files outside the worker's leased ``write_roots`` (a ``Bash(*)`` sandbox + # escape the settings layer cannot path-scope). Resolved against the saga's + # leased CapabilityPolicy; best-effort so saga bookkeeping never breaks merge. + escape_refused = _apply_write_root_escape_gate(cfg, outcomes, gh=_gh, bus_emit=bus_emit) + # Oracle-strength gate (#381). Dormant in production until a result source # is wired (mutation_result left _UNSET); a provided result/None engages it. mutation_refused: list[int] = [] @@ -1133,7 +1139,7 @@ def _run_merge_gate( emit=bus_emit, ) - refused_all = set(refused) | set(mutation_refused) + refused_all = set(refused) | set(mutation_refused) | set(escape_refused) _enable_automerge_for_reviewed_outcomes( cfg, outcomes, @@ -1154,6 +1160,61 @@ def _run_merge_gate( return refused_all +def _apply_write_root_escape_gate( + cfg: Config, + outcomes: list[WorkerOutcome], + *, + gh: Any, + bus_emit: Any, +) -> list[int]: + """Production wiring for the write-root-escape gate (#443). + + Resolves each outcome's leased saga (worktree + ``write_roots`` + task id) + from the tick-scoped saga store and collects the worktree's changed paths via + a real-git seam, then delegates to the gate. Best-effort: a missing/failed + saga store leaves the gate a pass-through rather than breaking merge. + """ + from forge_loop.runner.dispatch import _resolve_task_saga_store, _worker_task_id + from forge_loop.runner.merge_gate import ( + WorkerLeaseRef, + apply_write_root_escape_gate, + collect_changed_paths, + ) + + store = _resolve_task_saga_store(cfg) + if store is None: + return [] + + def _lease_for(outcome: WorkerOutcome) -> WorkerLeaseRef | None: + try: + saga = store.get(_worker_task_id(outcome.issue)) + except Exception: # noqa: BLE001 - saga state is advisory + saga = None + if saga is None: + return None + return WorkerLeaseRef( + task_id=saga.task_id, + worktree=saga.worktree, + write_roots=saga.capability_policy.filesystem.write_roots, + ) + + def _mark_quarantined(task_id: str, reason: str) -> None: + store.mark_quarantined(task_id, reason=reason) + + return list( + apply_write_root_escape_gate( + outcomes, + lease_for=_lease_for, + changed_paths=lambda wt: collect_changed_paths(wt, cfg.base_branch), + gh=gh, + repo=cfg.github_repo, + mark_quarantined=_mark_quarantined, + events_file=cfg.events_file, + emit=bus_emit, + ) + ) + + def _record_attempts( cfg: Config, outcomes: list[WorkerOutcome], diff --git a/src/forge_loop/sandbox/__init__.py b/src/forge_loop/sandbox/__init__.py index 8cc3616..c5d987e 100644 --- a/src/forge_loop/sandbox/__init__.py +++ b/src/forge_loop/sandbox/__init__.py @@ -9,6 +9,7 @@ mcp_allow_patterns, policy_hash, render_capability_policy, + write_root_violations, ) __all__ = [ @@ -20,4 +21,5 @@ "mcp_allow_patterns", "policy_hash", "render_capability_policy", + "write_root_violations", ] diff --git a/src/forge_loop/sandbox/policy.py b/src/forge_loop/sandbox/policy.py index 3f29a22..2bed5cc 100644 --- a/src/forge_loop/sandbox/policy.py +++ b/src/forge_loop/sandbox/policy.py @@ -8,6 +8,8 @@ import hashlib import json +import os +from collections.abc import Iterable from dataclasses import dataclass, field from typing import Any @@ -93,6 +95,56 @@ def from_json_obj(cls, value: dict[str, Any] | None) -> CapabilityPolicy: ) +def _norm_fs_path(path: str) -> str: + """Absolute, normalized, trailing-slash-insensitive form of a filesystem path. + + Mirrors :func:`forge_loop.worktree_sweep._norm` (``normpath`` + strip trailing + separators) but also absolutizes so a relative-vs-absolute pairing for the same + location compares equal — the normalization the write-root gate (#443) needs. + """ + return os.path.normpath(os.path.abspath(path.strip())).rstrip(os.sep) + + +def _path_under_root(path: str, root: str) -> bool: + """True iff ``path`` is ``root`` itself or nested beneath it (normalized).""" + p, r = _norm_fs_path(path), _norm_fs_path(root) + return p == r or p.startswith(r + os.sep) + + +def write_root_violations( + changed_paths: Iterable[str], + write_roots: Iterable[str], +) -> tuple[str, ...]: + """Changed paths that fall OUTSIDE every leased ``write_roots`` entry (#443). + + Pure helper for the write-root-escape merge gate. Given the paths a worker's + diff touched and the filesystem write-roots it was leased, return the + (order-preserving, de-duplicated) tuple of changed paths that escape the + sandbox. An empty tuple means the diff stayed entirely in-bounds. + + Comparison is normalized (absolute, ``normpath``, trailing-slash insensitive) + so a path equal-but-for-a-trailing-slash or relative-vs-absolute to a write + root is NOT a false positive. + + Fail-safe stance — **closed-by-default**: with *no* leased write roots (empty + ``write_roots``) every changed path is a violation, matching the merge gate's + "conservative on uncertainty" convention (an un-leased worker that still + produced a diff is treated as having escaped, not as trusted). + """ + roots = tuple(r for r in write_roots if r and r.strip()) + seen: set[str] = set() + violations: list[str] = [] + for raw in changed_paths: + if not raw or not raw.strip(): + continue + if any(_path_under_root(raw, root) for root in roots): + continue + if raw not in seen: + seen.add(raw) + violations.append(raw) + return tuple(violations) + + def mcp_allow_patterns(policy: CapabilityPolicy) -> list[str]: """SDK/settings ``allowed_tools`` MCP patterns derived from ``policy.mcp``. diff --git a/tests/test_runner_merge_gate.py b/tests/test_runner_merge_gate.py index 44689ad..5e5e299 100644 --- a/tests/test_runner_merge_gate.py +++ b/tests/test_runner_merge_gate.py @@ -20,11 +20,14 @@ from forge_loop.config import CriticConfig, MutationGateConfig from forge_loop.runner.merge_gate import ( MutationCheckResult, + WorkerLeaseRef, _mutation_refusal_comment, _refusal_comment, apply_issue_closed_gate, apply_mutation_survivor_gate, + apply_write_root_escape_gate, check_issue_closed_gate, + collect_changed_paths, ) from forge_loop.worker import WorkerOutcome @@ -571,3 +574,180 @@ def test_run_merge_gate_skips_promotion_for_refused_issue(tmp_path: Path, monkey still_merged = _outcome(47, pr="https://gh/u/r/pull/62", status="merged") _tick._record_merged_memory(cfg, [still_merged], refused_issues=refused) assert promoted_inputs == [] # nothing promoted — refused issue excluded + + +# --------------------------------------------------------------------------- +# Write-root-escape gate (issue #443). +# --------------------------------------------------------------------------- + + +def _lease(worktree: str, *, task_id: str = "task-47-worker") -> WorkerLeaseRef: + """A lease whose only write root is the worktree (the production shape).""" + return WorkerLeaseRef(task_id=task_id, worktree=worktree, write_roots=(worktree,)) + + +def test_write_root_gate_passes_for_in_bounds_diff(tmp_path: Path) -> None: + # Happy path: every changed path lives under the leased worktree → the gate + # is a byte-for-byte pass-through (no refuse, no quarantine, no event). + gh = _FakeGh() + wt = str(tmp_path / "wt") + o = _outcome(47, pr="https://gh/u/r/pull/62", status="merged") + quarantined: list[Path] = [] + marked: list[tuple[str, str]] = [] + events = tmp_path / "events.jsonl" + + refused = apply_write_root_escape_gate( + [o], + lease_for=lambda _o: _lease(wt), + changed_paths=lambda w: [w + "/src/forge_loop/x.py"], + gh=gh, + repo="o/r", + quarantine=lambda p: quarantined.append(p) or p, + mark_quarantined=lambda tid, reason: marked.append((tid, reason)), + events_file=events, + ) + + assert refused == [] + assert o.status == "merged" # unchanged + assert gh.disable_calls == [] + assert quarantined == [] + assert marked == [] + assert not events.exists() + + +def test_write_root_gate_refuses_and_quarantines_on_escape(tmp_path: Path) -> None: + # Adversarial: a path outside the worktree → refuse, quarantine, event, + # status flip merged→open, comment posted. + gh = _FakeGh() + wt = str(tmp_path / "wt") + escape = "/home/u/forge-loop/src/forge_loop/runner/tick.py" + o = _outcome(47, pr="https://gh/u/r/pull/62", status="merged") + quarantined: list[Path] = [] + marked: list[tuple[str, str]] = [] + events = tmp_path / "events.jsonl" + emitted: list[tuple[str, dict]] = [] + + refused = apply_write_root_escape_gate( + [o], + lease_for=lambda _o: _lease(wt), + changed_paths=lambda w: [w + "/ok.py", escape], + gh=gh, + repo="o/r", + quarantine=lambda p: quarantined.append(p) or p, + mark_quarantined=lambda tid, reason: marked.append((tid, reason)), + events_file=events, + emit=lambda kind, payload: emitted.append((kind, payload)), + ) + + assert refused == [47] + assert o.status == "open" # flipped + assert gh.disable_calls == [("https://gh/u/r/pull/62", "o/r")] + assert len(gh.comment_calls) == 1 + assert escape in gh.comment_calls[0]["body"] + # quarantine primitives reused on the worktree. + assert quarantined == [Path(wt)] + assert marked == [("task-47-worker", "write-root escape (#443)")] + # event emitted with the offending path. + assert emitted == [ + ("merge_refused_write_root_escape", {"issue": 47, "pr": o.pr_url, "paths": [escape]}) + ] + rows = [json.loads(line) for line in events.read_text().splitlines() if line.strip()] + assert rows[-1]["kind"] == "merge_refused_write_root_escape" + assert rows[-1]["paths"] == [escape] + + +def test_write_root_gate_side_effects_are_best_effort(tmp_path: Path) -> None: + # A raising pr_comment / disable_pr_auto_merge MUST NOT prevent the event + + # status flip from firing (mirrors apply_mutation_survivor_gate's contract). + gh = _FakeGh(disable_raises=True, comment_raises=True) + wt = str(tmp_path / "wt") + escape = "/etc/passwd" + o = _outcome(47, pr="https://gh/u/r/pull/62", status="merged") + events = tmp_path / "events.jsonl" + + def _raising_quarantine(_p: Path) -> Path | None: + raise RuntimeError("simulated quarantine failure") + + refused = apply_write_root_escape_gate( + [o], + lease_for=lambda _o: _lease(wt), + changed_paths=lambda w: [escape], + gh=gh, + repo="o/r", + quarantine=_raising_quarantine, + mark_quarantined=lambda tid, reason: (_ for _ in ()).throw(RuntimeError("boom")), + events_file=events, + ) + + # Despite EVERY side effect raising, the gate still refused + flipped + emitted. + assert refused == [47] + assert o.status == "open" + rows = [json.loads(line) for line in events.read_text().splitlines() if line.strip()] + assert rows[-1]["kind"] == "merge_refused_write_root_escape" + + +def test_write_root_gate_skips_unresolvable_lease(tmp_path: Path) -> None: + # lease_for → None (no saga / no worktree) → nothing inspectable → skip. + gh = _FakeGh() + o = _outcome(47, pr="https://gh/u/r/pull/62", status="merged") + called: list[str] = [] + + refused = apply_write_root_escape_gate( + [o], + lease_for=lambda _o: None, + changed_paths=lambda w: called.append(w) or ["/etc/passwd"], + gh=gh, + repo="o/r", + ) + + assert refused == [] + assert o.status == "merged" + assert called == [] # never even collected a diff + + +def test_write_root_gate_skips_no_pr_outcome(tmp_path: Path) -> None: + gh = _FakeGh() + wt = str(tmp_path / "wt") + o = _outcome(55, pr=None, status="failed") + refused = apply_write_root_escape_gate( + [o], + lease_for=lambda _o: _lease(wt), + changed_paths=lambda w: ["/etc/passwd"], + gh=gh, + repo="o/r", + ) + assert refused == [] + assert o.status == "failed" + + +# --------------------------------------------------------------------------- +# collect_changed_paths: the git seam. Both returncode branches (T3). +# --------------------------------------------------------------------------- + + +@dataclass +class _FakeCompleted: + returncode: int + stdout: str = "" + + +def test_collect_changed_paths_returncode_zero(tmp_path: Path) -> None: + wt = str(tmp_path / "wt") + + def _run(cmd, **kw): # type: ignore[no-untyped-def] + assert cmd[:3] == ["git", "diff", "--name-only"] + return _FakeCompleted(returncode=0, stdout="a.py\nsrc/b.py\n\n") + + paths = collect_changed_paths(wt, "trunk", run=_run) + assert paths == [f"{wt}/a.py", f"{wt}/src/b.py"] + + +def test_collect_changed_paths_nonzero_returncode_is_empty(tmp_path: Path) -> None: + wt = str(tmp_path / "wt") + + def _run(cmd, **kw): # type: ignore[no-untyped-def] + return _FakeCompleted(returncode=128, stdout="garbage\n") + + # A failed git call yields no paths → the gate is a pass-through, never a + # false refusal on a transient git failure. + assert collect_changed_paths(wt, "trunk", run=_run) == [] diff --git a/tests/test_sandbox_write_root_violations.py b/tests/test_sandbox_write_root_violations.py new file mode 100644 index 0000000..ddaaa3f --- /dev/null +++ b/tests/test_sandbox_write_root_violations.py @@ -0,0 +1,92 @@ +"""Unit tests for the pure write-root-violation helper (issue #443). + +``write_root_violations`` underpins the write-root-escape merge gate: given the +paths a worker's diff touched and the filesystem write-roots it was leased, it +returns the paths that escaped the sandbox. The matrix below mirrors the issue +body: in-bounds → empty, out-of-bounds → that path, normalization (no false +positive on trailing-slash / relative-vs-abs), and the closed-by-default stance +on an empty lease. +""" + +from __future__ import annotations + +from pathlib import Path + +import pytest +from hypothesis import given +from hypothesis import strategies as st + +from forge_loop.sandbox.policy import write_root_violations + + +def test_in_bounds_path_is_no_violation(tmp_path: Path) -> None: + root = str(tmp_path) + inside = str(tmp_path / "src" / "forge_loop" / "x.py") + assert write_root_violations((inside,), (root,)) == () + + +def test_out_of_bounds_path_is_a_violation(tmp_path: Path) -> None: + root = str(tmp_path / "wt") + escape = "/home/u/forge-loop/src/forge_loop/runner/tick.py" + assert write_root_violations((escape,), (root,)) == (escape,) + + +def test_mixes_in_and_out_of_bounds(tmp_path: Path) -> None: + root = str(tmp_path / "wt") + inside = str(tmp_path / "wt" / "a.py") + escape = "/etc/passwd" + assert write_root_violations((inside, escape), (root,)) == (escape,) + + +def test_trailing_slash_insensitive_no_false_positive(tmp_path: Path) -> None: + # write root has a trailing slash; changed path does not — still in-bounds. + root_slash = str(tmp_path) + "/" + inside = str(tmp_path / "a.py") + assert write_root_violations((inside,), (root_slash,)) == () + # And the root itself (with trailing slash) equals the root → in-bounds. + assert write_root_violations((str(tmp_path) + "/",), (str(tmp_path),)) == () + + +def test_relative_vs_abs_no_false_positive(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + # A relative changed path resolving under an absolute write root is in-bounds. + monkeypatch.chdir(tmp_path) + assert write_root_violations(("sub/a.py",), (str(tmp_path),)) == () + + +def test_empty_write_roots_is_closed_by_default(tmp_path: Path) -> None: + # No lease at all → conservative: any changed path is a violation. + changed = str(tmp_path / "a.py") + assert write_root_violations((changed,), ()) == (changed,) + # A whitespace-only / empty root entry is treated as "no root". + assert write_root_violations((changed,), ("", " ")) == (changed,) + + +def test_clean_empty_diff_is_no_violation(tmp_path: Path) -> None: + assert write_root_violations((), (str(tmp_path),)) == () + # Blank changed entries are ignored, not reported as escapes. + assert write_root_violations(("", " "), ()) == () + + +def test_violations_are_deduplicated_order_preserving() -> None: + escape = "/etc/passwd" + other = "/etc/shadow" + assert write_root_violations((escape, other, escape), ("/safe",)) == (escape, other) + + +def test_multiple_write_roots_any_match_is_in_bounds(tmp_path: Path) -> None: + root_a = str(tmp_path / "a") + root_b = str(tmp_path / "b") + inside_b = str(tmp_path / "b" / "deep" / "x.py") + assert write_root_violations((inside_b,), (root_a, root_b)) == () + + +@given( + st.lists(st.text()), + st.lists(st.text()), +) +def test_never_raises_on_arbitrary_text(changed: list[str], roots: list[str]) -> None: + # Property (T5): the pure helper consumes user-shaped strings (paths from a + # diff) and must never raise — only ever return a tuple of strings. + result = write_root_violations(changed, roots) + assert isinstance(result, tuple) + assert all(isinstance(p, str) for p in result)