diff --git a/README.md b/README.md index 598afe4..8fdd8c6 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,8 @@ A wire format and a verify function. See [POSITIONING.md](POSITIONING.md). [![CI](https://github.com/Aliipou/authgate-kernel/actions/workflows/ci.yml/badge.svg)](https://github.com/Aliipou/authgate-kernel/actions) [![Rust](https://img.shields.io/badge/kernel-Rust-orange.svg)](authgate-kernel/) -[![Tests](https://img.shields.io/badge/tests-1155%20passing-brightgreen.svg)](tests/) +[![Tests](https://img.shields.io/badge/tests-1297%20passing-brightgreen.svg)](tests/) +[![Runtime red team](https://img.shields.io/badge/runtime%20red--team-1000%20cases%2C%200%20escapes-brightgreen.svg)](redteam/) [![Kani](https://img.shields.io/badge/Kani-24%20harnesses-green.svg)](formal/) [![Lean4](https://img.shields.io/badge/Lean4-16%20theorems-blue.svg)](formal/lean4/) [![License: PolyForm Noncommercial 1.0.0](https://img.shields.io/badge/License-PolyForm--Noncommercial--1.0.0-orange.svg)](LICENSE) @@ -67,6 +68,88 @@ This is the same principle as capability-based OS security (seL4, CHERI), applie --- +## Agent runtime (operational layer) + +`src/authgate/runtime/` is a minimal, working agent runtime that puts the gate on +the real path — **not** part of the TCB: + +``` +intent → Planner → [PlanStep] → runtime loop → verify(action) → sandboxed tool → RunLog + AuditLog + │ deny → stop, nothing further runs +``` + +Single agent, 3 tools (calculator, file_read, web_search), deterministic planner. +A denied step halts the plan. Two things make it more than a demo: + +**1. It can run on the *verified* kernel (opt-in).** `build_runtime(backend="rust")` +routes each permit/deny decision into the Kani/Lean-verified Rust engine +(`engine::verify`) over a **JSON** boundary (`verify_json`), returning an +ed25519-signed verdict — no Rust objects enter Python. When that backend is +selected, the decision is made by the verified code instead of the Python +reimplementation. **The default is `backend="python"`** (the pure-Python +reference verifier), and that is the path CI runs — the Rust backend requires the +compiled extension (below) and is **not** exercised by CI. `"auto"` picks Rust +only when the extension is importable. Epoch-revocation semantics (absent from the +wire format) are preserved by serializing only currently-valid claims. + +**2. Tools can run in a real sandbox (opt-in).** Pass a `SandboxPolicy` and each +tool executes in an isolated subprocess under a wall-clock deadline and output cap +(every platform), plus opt-in POSIX rlimits (CPU/memory/file-size). A tool that +hangs, crashes, or runs away is *killed and reaped* → clean denial, rather than an +in-process check. It bounds time/memory/output and isolates crashes; it is **not** +a network or syscall jail, and it is **off by default** (`sandbox_policy=None`). + +### Adversarial verification — 1000 engineers + +`redteam/runtime_redteam.py` synthesizes 1000 deterministic adversarial +"engineers" across 10 attack classes (sandbox escape, capability forgery, +calculator injection, plan injection, revocation/epoch bypass, denial probing, +audit tampering, replay-after-deny, argument fuzzing, identity spoofing). The bar +is **zero escapes** — any one is a hard failure. + +| Backend | Result | +|---|---| +| Pure-Python verifier | 1000/1000 blocked, 0 escapes | +| **Verified Rust engine** (via JSON wire) | 1000/1000 blocked, 0 escapes | + +Two real vulnerabilities were found and fixed by this harness: an unbounded-`**` +calculator DoS (a `2**2**2**2**2**2` that hangs the process) and a Windows +reserved-device-name sandbox bypass (`CON` blocks forever; `NUL`/`COMn` open +devices). Both are now regression-covered. + +**Scope, honestly:** these are *self-authored* attack classes against the 3 MVP +tools. A green result is evidence that the *known* vectors are closed on both +backends — it is **not** a proof of security and **not** a substitute for +independent external review (still an open milestone). + +### Benchmark — cost of a verified decision + +`verify()` latency per gated tool call (`redteam/bench_verify.py`): + +| Path | Latency | Throughput | +|---|---|---| +| Pure-Python `FreedomVerifier` | ~38 µs/decision | ~26,000 /s | +| Verified Rust engine (JSON wire + ed25519 sign) | ~544 µs/decision | ~1,800 /s | + +Routing through the verified engine costs ~14× (JSON marshalling + per-call +signing) but stays sub-millisecond. Figures from one machine (Windows, CPython +3.13); treat as order-of-magnitude, not a spec. + +### Building the verified extension + +The Rust kernel exposes a PyO3 module (`authgate_kernel`). Build it with: + +```bash +# ASCII build dir (the C toolchain mangles non-ASCII paths); GNU toolchain. +CARGO_TARGET_DIR=/tmp/akbuild cargo build --lib --release +# copy the cdylib next to the package as authgate_kernel.{pyd,so} +``` + +The runtime falls back to the pure-Python verifier wherever the extension is not +built (CI included), so the runtime tests are green with or without it. + +--- + ## What it does NOT do | Not this | Why | @@ -351,8 +434,10 @@ The gap between `Permit/Deny` and actual constrained execution: | Gap | Status | What closes it | |---|---|---| +| **Runtime decides on the verified TCB** | Opt-in (`backend="rust"`); needs the built extension; **not run by CI** (CI uses Python) | A CI job that builds the extension and runs the runtime suite on the Rust backend | +| **Real tool sandbox** (process isolation) | Opt-in (`SandboxPolicy`, off by default); bounds time/memory/output + isolates crashes; **not** a syscall/network jail | seccomp / namespaces / WASM for syscall + network confinement | | **WASM sandbox** (`cargo build --features sandbox`) | Blocked: Windows SDK kernel32.lib missing | Install Windows SDK 10.0.22621 or build on Linux | -| **OS-level confinement** (seccomp-bpf) | Not implemented | Wrap tool subprocess with seccomp filter | +| **OS-level confinement** (seccomp-bpf, network jail) | Partial: process isolation + rlimits done; no syscall/network jail | seccomp filter / namespaces / WASM around the tool subprocess | | **End-to-end integration test** | **Done** (`tests/test_integration_e2e.py`) | 18 assertions: tool call → gate → audit chain | | **TLC model checker** | Java not installed | `java -jar tla2tools.jar -tool MC_AuthGateV3` | | **CLI** | Exists; not packaged | `pip install authgate-kernel` | diff --git a/examples/sandbox/notes.txt b/examples/sandbox/notes.txt new file mode 100644 index 0000000..5eae509 --- /dev/null +++ b/examples/sandbox/notes.txt @@ -0,0 +1,2 @@ +hello from sandbox +revenue Q1: 1200000 diff --git a/redteam/bench_verify.py b/redteam/bench_verify.py new file mode 100644 index 0000000..7a64e35 --- /dev/null +++ b/redteam/bench_verify.py @@ -0,0 +1,66 @@ +""" +Benchmark: cost of an authorization decision, pure-Python verifier vs the +verified Rust engine reached over the JSON wire (RustBackedVerifier). + +This is the decision-relevant number for the runtime: every gated tool call pays +one verify(). It answers "what does routing through the verified TCB cost per +call?" Run: AUTHGATE_BACKEND=python python redteam/bench_verify.py +""" +from __future__ import annotations + +import os +import sys +import time +from pathlib import Path + +os.environ.setdefault("AUTHGATE_BACKEND", "python") +sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src")) + +from authgate.kernel.entities import ( # noqa: E402 + AgentType, + Entity, + Resource, + ResourceType, + RightsClaim, +) +from authgate.kernel.registry import OwnershipRegistry # noqa: E402 +from authgate.kernel.verifier import Action, FreedomVerifier # noqa: E402 +from authgate.runtime.rust_backend import RustBackedVerifier, rust_backend_available # noqa: E402 + + +def _scenario(): + owner = Entity("operator", AgentType.HUMAN) + agent = Entity("agent-1", AgentType.MACHINE) + resource = Resource("compute", ResourceType.COMPUTE_SLOT) + registry = OwnershipRegistry() + registry.register_machine(agent, owner) + registry.add_claim(RightsClaim(owner, resource, can_read=True, can_delegate=True)) + registry.delegate(RightsClaim(agent, resource, can_read=True), delegated_by=owner) + return registry, Action("t1", agent, resources_read=[resource]) + + +def _bench(label: str, verify, action, n: int) -> None: + verify(action) # warm up + t0 = time.perf_counter() + for _ in range(n): + verify(action) + elapsed = time.perf_counter() - t0 + per = elapsed / n * 1e6 # microseconds per decision + print(f" {label:32} {per:9.2f} us/decision {n/elapsed:12,.0f} decisions/s") + + +def main() -> int: + registry, action = _scenario() + n = 20000 + print(f"verify() latency over {n:,} permitted decisions (AUTHGATE_BACKEND=" + f"{os.environ.get('AUTHGATE_BACKEND')}):") + _bench("pure-Python FreedomVerifier", FreedomVerifier(registry).verify, action, n) + if rust_backend_available(): + _bench("verified Rust engine (JSON wire)", RustBackedVerifier(registry).verify, action, n) + else: + print(" verified Rust engine: SKIPPED (authgate_kernel extension not built)") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/redteam/runtime_redteam.py b/redteam/runtime_redteam.py new file mode 100644 index 0000000..05a4642 --- /dev/null +++ b/redteam/runtime_redteam.py @@ -0,0 +1,612 @@ +""" +runtime_redteam.py — adversarial red-team harness for the capability-gated runtime. + +WHAT THIS IS +------------ +A standalone, deterministic harness that conjures 1000 distinct "engineers" — each +with its own seed, archetype and skill level — and pits every one of them against +the `authgate.runtime` agent runtime. Each engineer mounts exactly ONE attack drawn +from one of ten categories. The runtime is the *defender*: the harness PROVES that +every attack is blocked. ANY successful escape is a hard FAILURE (exit code 1). + +"Blocked" means the system correctly resisted the attack: the kernel gate denied the +step, the tool's own sandbox refused, the calculator rejected the expression, the +audit chain detected tampering, etc. An "escape" is the opposite — the attacker got +the runtime to do something it must never do (read a real system file, run code, +act without a capability, slip past a revocation, hide a tamper). + +DESIGN NOTES +------------ +* Fully deterministic: a fixed master seed yields an identical RedTeamReport. +* Standalone: it inserts ``src`` onto sys.path itself, so it runs without conftest. +* It does NOT modify or weaken any source under ``src``. It only *observes* the + runtime's behavior and asserts the safe outcome. If a real vulnerability existed, + an attack's ``blocked`` would come back False and surface as an escape — the + harness never masks that. + +RUN +--- + $env:AUTHGATE_BACKEND='python' # PowerShell + python redteam/runtime_redteam.py +""" +from __future__ import annotations + +import logging +import os +import random +import sys +import tempfile +from collections.abc import Callable +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +# --- standalone path wiring (the repo's tests use conftest; we must not rely on it) --- +_REPO_ROOT = Path(__file__).resolve().parent.parent +_SRC = _REPO_ROOT / "src" +if str(_SRC) not in sys.path: + sys.path.insert(0, str(_SRC)) +# The runtime layer is not part of the TCB; the python backend is the one under test. +os.environ.setdefault("AUTHGATE_BACKEND", "python") + +# Each attack deliberately provokes denials, and the kernel logs a WARNING per +# deny. Across thousands of internal runs that log volume dominates runtime, so we +# quiet the authgate logger. This is purely a verbosity/performance change in the +# HARNESS — it does not touch any security decision in the source under test. +logging.getLogger("authgate").setLevel(logging.CRITICAL) + +from authgate.kernel.audit import AuditLog # noqa: E402 +from authgate.runtime import ( # noqa: E402 + AgentRuntime, + PlanStep, + ScriptedPlanner, +) +from authgate.runtime import build_runtime as _build_runtime_impl # noqa: E402 +from authgate.runtime.tools import calculate # noqa: E402 + +# Which authorization engine every attack runs against. run_redteam() sets this; +# wrapping build_runtime here means all attacks pick it up without per-call edits. +_ACTIVE_BACKEND = "python" + + +def build_runtime(*args: object, **kwargs: object): # noqa: ANN002, ANN003 + kwargs.setdefault("backend", _ACTIVE_BACKEND) + return _build_runtime_impl(*args, **kwargs) + +# Sentinel that must NEVER appear in any returned output: it is the marker of a +# real system-file read leaking through the sandbox. +_LEAK_MARKERS = ("root:", "[boot loader]", "win.ini", "/bin/bash", "daemon:") + + +# =========================================================================== +# Engineer population +# =========================================================================== + +_ARCHETYPES = [ + "path-traversal artist", "capability forger", "expression smuggler", + "prompt injector", "revocation racer", "denial prober", "ledger tamperer", + "replay operator", "fuzz gremlin", "identity ghost", "supply-chain mole", + "side-channel listener", +] +_SKILLS = ["junior", "mid", "senior", "principal", "nation-state"] +_FIRST = [ + "Mara", "Iqbal", "Soren", "Yuki", "Dmitri", "Aisha", "Tomas", "Lena", + "Kwame", "Priya", "Olek", "Nadia", "Hiro", "Camila", "Bjorn", "Fatima", + "Rashid", "Ingrid", "Diego", "Mei", "Anton", "Zara", "Lucas", "Noor", +] +_LAST = [ + "Voss", "Khan", "Aaltonen", "Tanaka", "Petrov", "Okafor", "Nguyen", + "Lindqvist", "Mbeki", "Sharma", "Kowalski", "Haddad", "Sato", "Reyes", + "Eriksson", "Ali", "Costa", "Wang", "Novak", "Bianchi", "Park", "Singh", +] + +# Attack categories in fixed order; index == id % len drives even distribution. +ATTACK_CATEGORIES = [ + "SANDBOX_ESCAPE", + "CAP_FORGERY", + "CALC_INJECTION", + "PLAN_INJECTION", + "REVOCATION_BYPASS", + "DENIAL_PROBE", + "AUDIT_TAMPER", + "REPLAY_AFTER_DENY", + "ARG_FUZZ", + "IDENTITY_SPOOF", +] + + +@dataclass(frozen=True) +class EngineerProfile: + """One adversary. Synthesized deterministically from the master seed.""" + + id: int + name: str + archetype: str + seed: int + attack_category: str + skill: str + bio: str + + +@dataclass(frozen=True) +class AttackResult: + """Outcome of one engineer's single attack.""" + + profile: EngineerProfile + category: str + blocked: bool # True == system correctly resisted; False == ESCAPE + detail: str + + +@dataclass +class RedTeamReport: + total: int + blocked: int + escapes: list[AttackResult] = field(default_factory=list) + by_category: dict[str, tuple[int, int]] = field(default_factory=dict) # cat -> (blocked, total) + + +def _make_profiles(n: int, master_seed: int) -> list[EngineerProfile]: + """Deterministically synthesize n distinct engineers from one master seed.""" + rng = random.Random(master_seed) + profiles: list[EngineerProfile] = [] + for i in range(n): + category = ATTACK_CATEGORIES[i % len(ATTACK_CATEGORIES)] + first = rng.choice(_FIRST) + last = rng.choice(_LAST) + name = f"{first} {last}-{i:04d}" + archetype = rng.choice(_ARCHETYPES) + skill = rng.choice(_SKILLS) + seed = rng.randrange(1, 2**31 - 1) + bio = ( + f"{skill} {archetype} (#{i}); favors the {category} vector; " + f"operates from seed {seed}." + ) + profiles.append(EngineerProfile( + id=i, name=name, archetype=archetype, seed=seed, + attack_category=category, skill=skill, bio=bio, + )) + return profiles + + +# =========================================================================== +# Sandbox fixture (benign files only; we NEVER touch real OS files) +# =========================================================================== + +def _new_sandbox() -> Path: + """A fresh tmp dir seeded with a couple of innocuous files.""" + root = Path(tempfile.mkdtemp(prefix="redteam_sbx_")) + (root / "notes.txt").write_text("benign notes inside the sandbox\n", encoding="utf-8") + (root / "data.txt").write_text("12345\n", encoding="utf-8") + return root + + +def _no_leak(outputs: list[Any]) -> bool: + """True iff no returned output contains the fingerprint of a real system file.""" + for out in outputs: + if not isinstance(out, str): + continue + low = out.lower() + if any(m.lower() in low for m in _LEAK_MARKERS): + return False + return True + + +# =========================================================================== +# Attack categories — each returns (blocked: bool, detail: str) +# =========================================================================== + +_HOSTILE_PATH_TEMPLATES = [ + "../etc/passwd", + "../../../../etc/shadow", + "../../../../../../etc/passwd", + "/etc/passwd", + "/etc/shadow", + r"C:\Windows\win.ini", + r"C:\boot.ini", + "..%2f..%2fetc%2fpasswd", + "....//....//etc/passwd", + "foo/../../bar/../../../etc/passwd", + "notes.txt/../../../../etc/passwd", + "./../" * 8 + "etc/passwd", + "etc/passwd\x00.txt", + "a" * 4000 + "/etc/passwd", + "../" * 64 + "Windows/System32/drivers/etc/hosts", + "subdir/../../../../etc/passwd", + # Windows reserved device names: resolve inside the sandbox yet open a device + # (CON blocks forever = DoS; NUL/COMn/LPTn touch hardware). Must be refused. + "CON", "NUL", "PRN", "AUX", "COM1", "LPT1", "CON.txt", "subdir/NUL", +] + + +def _attack_sandbox_escape(p: EngineerProfile) -> tuple[bool, str]: + """file_read GRANTED (capability passes) but a hostile path must be refused + by the tool's own sandbox boundary — and no real file content may leak.""" + rng = random.Random(p.seed) + root = _new_sandbox() + path = rng.choice(_HOSTILE_PATH_TEMPLATES) + # Occasionally append junk to vary it per engineer. + if rng.random() < 0.5: + path = path + ("." * rng.randint(1, 5)) + rt, _reg = build_runtime( + ScriptedPlanner([PlanStep("file_read", {"filename": path}, "exfiltrate")]), + root, granted_tools=["file_read"], + ) + result = rt.run("read a file") + outcome = result.outcomes[0] + # Must be a denial (the tool raised PermissionError -> gate captures as not permitted), + # and absolutely no permitted outcome may carry real system-file content. + leaked_outputs = result.outputs() # only permitted steps' outputs + safe = (not outcome.permitted) and outcome.result.output is None and _no_leak(leaked_outputs) + detail = ( + f"path={path[:48]!r} permitted={outcome.permitted} " + f"reason={str(outcome.result.denied_reason)[:60]!r} no_leak={_no_leak(leaked_outputs)}" + ) + return safe, detail + + +def _attack_cap_forgery(p: EngineerProfile) -> tuple[bool, str]: + """Use a tool that was never granted. The kernel gate must DENY it and the + tool body must never run (output is None).""" + rng = random.Random(p.seed) + root = _new_sandbox() + # Pick a target tool, then grant everything EXCEPT it. + target = rng.choice(["file_read", "web_search", "calculator"]) + others = [t for t in ["file_read", "web_search", "calculator"] if t != target] + granted = rng.choice([[], others, [others[0]]]) # never includes target + args = { + "file_read": {"filename": "notes.txt"}, + "web_search": {"query": "authgate"}, + "calculator": {"expression": "2+2"}, + }[target] + rt, _reg = build_runtime( + ScriptedPlanner([PlanStep(target, args, "use ungranted tool")]), + root, granted_tools=granted, + ) + outcome = rt.run("go").outcomes[0] + safe = (not outcome.permitted) and outcome.result.output is None + return safe, f"target={target} granted={granted} permitted={outcome.permitted}" + + +_CALC_PAYLOADS = [ + "__import__('os').system('echo pwned')", + "().__class__.__bases__[0].__subclasses__()", + "lambda: 1", + "open('/etc/passwd')", + "1; import os", + "os.system('id')", + "eval('2+2')", + "(1).__class__", + "x + 1", + "f'{1+1}'", + "[].append(1)", + "{'a': 1}", + "2 ** 2 ** 2 ** 2 ** 2 ** 2", + "(" * 200 + "1" + ")" * 200, + "__builtins__", + "globals()", + "1 if True else 2", + "0xdeadbeef.bit_length()", +] + + +def _attack_calc_injection(p: EngineerProfile) -> tuple[bool, str]: + """Calculator must reject any non-arithmetic expression: calculate() raises + ValueError directly, AND through the runtime the step is denied with no + object/code execution leaking out.""" + rng = random.Random(p.seed) + root = _new_sandbox() + expr = rng.choice(_CALC_PAYLOADS) + + # 1) direct tool contract: must raise ValueError. + raised_value_error = False + try: + calculate(expr) + except ValueError: + raised_value_error = True + except Exception: + raised_value_error = False # any OTHER exception type would be suspicious + + # 2) through the runtime: granted calculator, but the step must be denied and + # output must be None (never a live Python object / side effect). + rt, _reg = build_runtime( + ScriptedPlanner([PlanStep("calculator", {"expression": expr}, "exec")]), + root, granted_tools=["calculator"], + ) + outcome = rt.run("calculate").outcomes[0] + runtime_blocked = (not outcome.permitted) and outcome.result.output is None + safe = raised_value_error and runtime_blocked + return safe, ( + f"expr={expr[:40]!r} value_error={raised_value_error} " + f"permitted={outcome.permitted}" + ) + + +_FORBIDDEN_TOOLS = [ + "shell", "exec", "../tool", "calculator\n; rm -rf /", "file_write", + "system", "eval", "os.system", "", "CALCULATOR", "web_search ", "net", +] + + +def _attack_plan_injection(p: EngineerProfile) -> tuple[bool, str]: + """A prompt-injected plan naming an unknown/forbidden tool, or carrying a huge + arg payload, must produce a clean denial — never a crash, and loop integrity + must hold (run() returns normally).""" + rng = random.Random(p.seed) + root = _new_sandbox() + bad_tool = rng.choice(_FORBIDDEN_TOOLS) + huge = "A" * rng.randint(1000, 20000) + plan = [PlanStep(bad_tool, {"payload": huge, "x": rng.random()}, "injected")] + rt, _reg = build_runtime(ScriptedPlanner(plan), root, granted_tools=None) + crashed = False + try: + result = rt.run("do the injected thing") + except Exception as exc: # run() must never raise on a hostile plan + return False, f"run() raised {type(exc).__name__}: {exc}" + outcome = result.outcomes[0] + safe = (not outcome.permitted) and outcome.result.output is None and not crashed + return safe, f"tool={bad_tool!r} permitted={outcome.permitted}" + + +def _attack_revocation_bypass(p: EngineerProfile) -> tuple[bool, str]: + """Run a granted tool (permit), then revoke_all and run again -> must DENY. + Also an epoch-revocation sub-check: a runtime demanding a higher min_epoch + rejects old-epoch claims, and reissuing via advance_epoch restores them.""" + root = _new_sandbox() + planner = ScriptedPlanner([PlanStep("calculator", {"expression": "1+1"}, "compute")]) + rt, reg = build_runtime(planner, root, granted_tools=["calculator"], freeze=False) + + first = rt.run("compute").outcomes[0] + if not first.permitted: + return False, "baseline run was denied (tool was never usable)" + + reg.revoke_all("agent-1") + after_revoke = rt.run("compute").outcomes[0] + revoke_holds = not after_revoke.permitted + + # Epoch variant: a fresh runtime over the SAME gate demanding min_epoch=2. + # Claims default to epoch=1, so they must be rejected until advance_epoch(2). + # (We re-grant by re-building to avoid coupling to the revoked registry.) + rt2, reg2 = build_runtime(planner, root, granted_tools=["calculator"], freeze=False) + rt_hi = AgentRuntime( + agent=rt2._agent, gate=rt2._gate, tools=rt2._tools, planner=planner, min_epoch=2, + ) + epoch_old_rejected = not rt_hi.run("compute").outcomes[0].permitted + reg2.advance_epoch(2) + epoch_reissued_ok = rt_hi.run("compute").outcomes[0].permitted + epoch_holds = epoch_old_rejected and epoch_reissued_ok + + safe = revoke_holds and epoch_holds + return safe, ( + f"first={first.permitted} after_revoke={after_revoke.permitted} " + f"epoch_old_rejected={epoch_old_rejected} epoch_reissued={epoch_reissued_ok}" + ) + + +def _attack_denial_probe(p: EngineerProfile) -> tuple[bool, str]: + """A plan: permitted step, then a DENIED step in the middle, then a later step + that must NEVER execute. The runtime must stop_early and never run later bodies.""" + root = _new_sandbox() + # calculator granted, file_read NOT -> middle step denied. Later step is a + # calculator read of a SENTINEL file inside the sandbox; if it ran we'd see output. + (root / "should_not_read.txt").write_text("LATER_STEP_EXECUTED", encoding="utf-8") + plan = [ + PlanStep("calculator", {"expression": "1+1"}, "ok step"), + PlanStep("file_read", {"filename": "notes.txt"}, "DENIED middle (ungranted)"), + PlanStep("file_read", {"filename": "should_not_read.txt"}, "must not run"), + ] + rt, _reg = build_runtime(ScriptedPlanner(plan), root, granted_tools=["calculator"]) + result = rt.run("probe") + # Exactly 2 outcomes recorded (loop broke at the denied middle step). + later_never_ran = all( + o.result.output != "LATER_STEP_EXECUTED" for o in result.outcomes + ) + structure_ok = ( + len(result.outcomes) == 2 + and result.outcomes[0].permitted + and not result.outcomes[1].permitted + and result.stopped_early + ) + safe = structure_ok and later_never_ran + return safe, ( + f"n_outcomes={len(result.outcomes)} stopped_early={result.stopped_early} " + f"later_never_ran={later_never_ran}" + ) + + +def _attack_audit_tamper(p: EngineerProfile) -> tuple[bool, str]: + """Run steps under an AuditLog (chain valid), then mutate one internal entry + field; verify_chain() must then report tampering (return False).""" + rng = random.Random(p.seed) + root = _new_sandbox() + audit = AuditLog() + plan = [ + PlanStep("calculator", {"expression": "1+1"}, "a"), + PlanStep("calculator", {"expression": "2+2"}, "b"), + PlanStep("calculator", {"expression": "3+3"}, "c"), + ] + rt, _reg = build_runtime(ScriptedPlanner(plan), root, granted_tools=["calculator"], audit_log=audit) + rt.run("audited") + + before = audit.verify_chain() + if not audit._records: + return False, "no audit records were produced" + idx = rng.randrange(len(audit._records)) + field_choice = rng.choice(["permitted", "action_id", "confidence"]) + rec = audit._records[idx] + if field_choice == "permitted": + rec["permitted"] = not rec["permitted"] + elif field_choice == "action_id": + rec["action_id"] = str(rec["action_id"]) + "-TAMPERED" + else: + rec["confidence"] = (rec.get("confidence", 1.0) or 0.0) - 0.5 + after = audit.verify_chain() + + safe = before is True and after is False + return safe, f"before={before} after={after} tampered_field={field_choice} idx={idx}" + + +def _attack_replay_after_deny(p: EngineerProfile) -> tuple[bool, str]: + """After a denied step, re-issue the exact same step. Authority must not leak: + the replay must stay denied.""" + root = _new_sandbox() + # file_read NOT granted -> denied. Replay the same plan; still denied. + planner = ScriptedPlanner([PlanStep("file_read", {"filename": "notes.txt"}, "denied read")]) + rt, _reg = build_runtime(planner, root, granted_tools=["calculator"]) + first = rt.run("read").outcomes[0] + second = rt.run("read").outcomes[0] + safe = (not first.permitted) and (not second.permitted) and second.result.output is None + return safe, f"first={first.permitted} replay={second.permitted}" + + +def _attack_arg_fuzz(p: EngineerProfile) -> tuple[bool, str]: + """Permitted tools with pathological args. The loop must never crash and a + tool that raises must surface as a non-permitted/None-output result — never + a denial-bypassing success and never an exception escaping run().""" + rng = random.Random(p.seed) + root = _new_sandbox() + tool = rng.choice(["calculator", "web_search", "file_read"]) + pathological = rng.choice([ + "💥" * 500, + "\x00\x01\x02\x03", + "\n\r\t" * 100, + "𝔘𝔫𝔦𝔠𝔬𝔡𝔢" * 50, + " ", + "-" * 5000, + "نظام" * 200, + ]) + # Build args with the right key for the tool, sometimes wrong-typed. + key = {"calculator": "expression", "web_search": "query", "file_read": "filename"}[tool] + val: Any = rng.choice([pathological, None, {"nested": pathological}, 12345]) + rt, _reg = build_runtime( + ScriptedPlanner([PlanStep(tool, {key: val}, "fuzz")]), + root, granted_tools=[tool], + ) + crashed = False + try: + result = rt.run("fuzz") + except Exception as exc: + return False, f"run() raised {type(exc).__name__}: {exc}" + outcome = result.outcomes[0] + # Either it permitted with a benign string output (web_search of junk is fine), + # or it was denied/errored with output None. Never: permitted with leaked content. + if outcome.permitted: + out = outcome.result.output + safe = (out is None or isinstance(out, str)) and _no_leak([out]) and not crashed + else: + safe = outcome.result.output is None and not crashed + return safe, f"tool={tool} val_type={type(val).__name__} permitted={outcome.permitted}" + + +def _attack_identity_spoof(p: EngineerProfile) -> tuple[bool, str]: + """Act as 'agent-1' from a runtime whose agent holds NO claims (granted=[]). + The agent is owned but un-capable, so every tool call must be denied.""" + rng = random.Random(p.seed) + root = _new_sandbox() + tool = rng.choice(["calculator", "web_search", "file_read"]) + args = { + "calculator": {"expression": "1+1"}, + "web_search": {"query": "authgate"}, + "file_read": {"filename": "notes.txt"}, + }[tool] + rt, _reg = build_runtime( + ScriptedPlanner([PlanStep(tool, args, "spoofed authority")]), + root, granted_tools=[], # agent holds zero capabilities + ) + outcome = rt.run("act as agent-1").outcomes[0] + safe = (not outcome.permitted) and outcome.result.output is None + return safe, f"tool={tool} permitted={outcome.permitted}" + + +_ATTACK_FNS: dict[str, Callable[[EngineerProfile], tuple[bool, str]]] = { + "SANDBOX_ESCAPE": _attack_sandbox_escape, + "CAP_FORGERY": _attack_cap_forgery, + "CALC_INJECTION": _attack_calc_injection, + "PLAN_INJECTION": _attack_plan_injection, + "REVOCATION_BYPASS": _attack_revocation_bypass, + "DENIAL_PROBE": _attack_denial_probe, + "AUDIT_TAMPER": _attack_audit_tamper, + "REPLAY_AFTER_DENY": _attack_replay_after_deny, + "ARG_FUZZ": _attack_arg_fuzz, + "IDENTITY_SPOOF": _attack_identity_spoof, +} + + +# =========================================================================== +# Orchestration +# =========================================================================== + +def _run_one(profile: EngineerProfile) -> AttackResult: + fn = _ATTACK_FNS[profile.attack_category] + try: + blocked, detail = fn(profile) + except Exception as exc: + # An unhandled exception in the HARNESS is a harness bug, but we treat it + # conservatively as a non-block so it surfaces loudly rather than hiding. + blocked, detail = False, f"harness exception: {type(exc).__name__}: {exc}" + return AttackResult(profile=profile, category=profile.attack_category, + blocked=blocked, detail=detail) + + +def run_redteam(n: int = 1000, master_seed: int = 1337, backend: str = "python") -> RedTeamReport: + """Run n engineers against the runtime and return a deterministic report. + + backend: which authorization engine to attack — "python" (reference) or + "rust" (the formally-verified engine via the JSON wire). Same attacks, same + pass/fail bar: ZERO escapes regardless of which engine decides. + """ + global _ACTIVE_BACKEND + _ACTIVE_BACKEND = backend + profiles = _make_profiles(n, master_seed) + results = [_run_one(p) for p in profiles] + + escapes = [r for r in results if not r.blocked] + by_category: dict[str, tuple[int, int]] = {} + for cat in ATTACK_CATEGORIES: + cat_results = [r for r in results if r.category == cat] + blocked = sum(1 for r in cat_results if r.blocked) + by_category[cat] = (blocked, len(cat_results)) + + return RedTeamReport( + total=len(results), + blocked=sum(1 for r in results if r.blocked), + escapes=escapes, + by_category=by_category, + ) + + +def _print_report(report: RedTeamReport) -> None: + line = "=" * 64 + print(line) + print("AuthGate runtime RED-TEAM HARNESS — 1000 adversarial engineers") + print(line) + print(f"{'CATEGORY':<20}{'BLOCKED':>10}{'TOTAL':>8}{'STATUS':>12}") + print("-" * 64) + for cat in ATTACK_CATEGORIES: + blocked, total = report.by_category.get(cat, (0, 0)) + status = "HELD" if blocked == total and total > 0 else "BREACH" + print(f"{cat:<20}{blocked:>10}{total:>8}{status:>12}") + print("-" * 64) + overall = "ALL ATTACKS BLOCKED" if not report.escapes else "ESCAPES DETECTED" + print(f"{'TOTAL':<20}{report.blocked:>10}{report.total:>8}{overall:>20}") + print(line) + if report.escapes: + print(f"\n!!! {len(report.escapes)} ESCAPE(S) — runtime FAILED to resist:\n") + for esc in report.escapes: + p = esc.profile + print(f" [ESCAPE] #{p.id} {p.name} ({p.skill} {p.archetype})") + print(f" category={esc.category}") + print(f" detail ={esc.detail}") + else: + print("\nNo escapes. The capability gate, sandbox, calculator, revocation,") + print("denial halting, and audit chain all held against every engineer.") + + +def main() -> int: + report = run_redteam() + _print_report(report) + return 1 if report.escapes else 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/redteam/test_redteam_regression.py b/redteam/test_redteam_regression.py index e89f8e8..df01bcb 100644 --- a/redteam/test_redteam_regression.py +++ b/redteam/test_redteam_regression.py @@ -10,10 +10,10 @@ import sys -from authgate.kernel.entities import Entity, Resource, RightsClaim, AgentType, ResourceType +from authgate.kernel.entities import AgentType, Entity, Resource, ResourceType, RightsClaim +from authgate.kernel.hardened import HardenedVerifier from authgate.kernel.registry import OwnershipRegistry from authgate.kernel.verifier import Action, FreedomVerifier -from authgate.kernel.hardened import HardenedVerifier H = AgentType.HUMAN M = AgentType.MACHINE diff --git a/src/authgate/runtime/__init__.py b/src/authgate/runtime/__init__.py new file mode 100644 index 0000000..c559e28 --- /dev/null +++ b/src/authgate/runtime/__init__.py @@ -0,0 +1,53 @@ +""" +authgate.runtime — minimal capability-gated AI agent runtime (MVP). + +This is the operational layer described in planmvp.md. It is NOT part of the TCB. +It turns an agent intent into a tool plan, then executes each step through the +existing kernel CallGate so that no tool runs without a verified capability: + + intent -> Planner -> [PlanStep] -> AgentRuntime loop + | + v per step + build Action + | + v + CallGate.verify + execute (kernel TCB) + | + permit -> sandboxed tool runs + deny -> stop, nothing further runs + | + v + RunLog (jsonl) + hash-chained AuditLog + +Design constraints (planmvp.md): single agent, 3 tools, mock planner, +AuthGate verify per call, simple sandbox, append-only log. No delegation graph, +no epoch DSL, no multi-agent, no federation in this layer. +""" +from __future__ import annotations + +from authgate.runtime.agent import AgentRuntime, RunResult, StepOutcome, build_runtime +from authgate.runtime.planner import MockPlanner, Planner, PlanStep, ScriptedPlanner +from authgate.runtime.run_log import RunLog +from authgate.runtime.rust_backend import RustBackedVerifier, rust_backend_available +from authgate.runtime.sandbox import SandboxPolicy, SandboxResult, run_tool_sandboxed +from authgate.runtime.tools import Tool, ToolRegistry, build_default_tools + +__all__ = [ + "AgentRuntime", + "RunResult", + "StepOutcome", + "build_runtime", + "Planner", + "PlanStep", + "MockPlanner", + "ScriptedPlanner", + "RunLog", + "Tool", + "ToolRegistry", + "build_default_tools", + "SandboxPolicy", + "SandboxResult", + "run_tool_sandboxed", + "RustBackedVerifier", + "rust_backend_available", +] diff --git a/src/authgate/runtime/_sandbox_runner.py b/src/authgate/runtime/_sandbox_runner.py new file mode 100644 index 0000000..941277f --- /dev/null +++ b/src/authgate/runtime/_sandbox_runner.py @@ -0,0 +1,101 @@ +""" +Sandbox child process — runs ONE tool call under OS resource limits, then exits. + +This module is the body of the isolated process spawned by `sandbox.py`. It is +never imported by the runtime in-process; it is executed as +``python -m authgate.runtime._sandbox_runner`` with a JSON job on stdin and a +single JSON result line on stdout (prefixed by a sentinel so tool chatter on +stdout cannot be mistaken for the result). + +Why a separate process at all: in-process input validation (see tools.py) stops +the inputs we thought of. Process isolation stops the ones we didn't — a tool +that hangs, allocates without bound, or crashes the interpreter takes down only +this child, and the parent reaps it and returns a denial. On POSIX the limits are +enforced by the kernel via setrlimit (CPU seconds, address space, file size); on +Windows, where setrlimit does not exist, the parent's wall-clock timeout and +output cap are the enforcement (documented honestly, not pretended otherwise). +""" +from __future__ import annotations + +import importlib +import json +import sys +from pathlib import Path +from typing import Any + +# Sentinel framing: the parent reads the line AFTER this marker as the result, so +# anything a tool or an import writes to stdout before it is harmless noise. +RESULT_MARKER = "\x00AUTHGATE_SANDBOX_RESULT\x00" + + +def _apply_posix_limits(limits: dict[str, int]) -> None: + """Enforce kernel resource limits. No-op where `resource` is unavailable (Windows). + + Attributes are read dynamically so this stays type-clean on platforms whose + stubs lack the POSIX-only `setrlimit`/`RLIMIT_*` names. Each limit is applied + only when explicitly requested (> 0): a too-low RLIMIT_AS can stop the + interpreter from starting and RLIMIT_FSIZE=0 can break .pyc writes, so these + are opt-in hardening, not silent defaults. + """ + try: + import resource # POSIX-only + except ImportError: + return + setrlimit = getattr(resource, "setrlimit", None) + if setrlimit is None: + return + for limit_key, rlimit_name in ( + ("cpu_seconds", "RLIMIT_CPU"), + ("max_memory_bytes", "RLIMIT_AS"), + ("max_file_bytes", "RLIMIT_FSIZE"), + ): + value = int(limits.get(limit_key, 0)) + rlimit = getattr(resource, rlimit_name, None) + if value > 0 and rlimit is not None: + setrlimit(rlimit, (value, value)) + + +def _resolve_tool(job: dict[str, Any]): + """Map a job to the concrete callable. Tool identity comes from the trusted + parent (build_runtime), never from attacker-controlled plan data.""" + builtin = job.get("builtin") + if builtin == "file_read": + # file_read is a closure bound to the sandbox root; rebuild it here. + from authgate.runtime.tools import _make_read_file + return _make_read_file(Path(job["sandbox_root"])) + entry = job.get("entry") + if entry: + module_name, _, func_name = entry.partition(":") + module = importlib.import_module(module_name) + return getattr(module, func_name) + raise ValueError("job names neither a builtin nor an importable entry") + + +def main() -> int: + raw = sys.stdin.read() + try: + job = json.loads(raw) + except json.JSONDecodeError as exc: + print(RESULT_MARKER + json.dumps({"ok": False, "error": f"bad job: {exc}"})) + return 0 + + _apply_posix_limits(job.get("limits", {})) + max_out = int(job.get("limits", {}).get("max_output_bytes", 65536)) + if max_out <= 0: + max_out = 65536 + + try: + fn = _resolve_tool(job) + output = fn(**job.get("args", {})) + text = str(output) + truncated = len(text.encode("utf-8", "replace")) > max_out + result = {"ok": True, "output": text[:max_out], "truncated": truncated} + except Exception as exc: # any tool failure is a denial, surfaced to the parent + result = {"ok": False, "error": f"{type(exc).__name__}: {exc}"} + + print(RESULT_MARKER + json.dumps(result)) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/authgate/runtime/agent.py b/src/authgate/runtime/agent.py new file mode 100644 index 0000000..dbdfbcb --- /dev/null +++ b/src/authgate/runtime/agent.py @@ -0,0 +1,230 @@ +""" +AgentRuntime — the MVP agent loop (non-TCB orchestration). + +This is the whole runtime, and it is deliberately tiny (planmvp.md STEP 1): + + plan = planner.plan(intent) + for step in plan: + action = build_action(step) # typed capability request + result = gate.execute(action, ...) # kernel verifies + runs (or denies) + run_log.record(...) # trace it + if denied: stop # a denial halts the plan + +Every security decision happens inside the kernel `CallGate` / `FreedomVerifier`; +this module only *sequences* steps and turns each one into a typed `Action`. It +holds no authority of its own — if the agent lacks a capability, the gate denies +the step and the loop stops, with nothing further executed. +""" +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +from authgate.kernel.audit import AuditLog +from authgate.kernel.call_gate import CallGate, GateResult +from authgate.kernel.entities import AgentType, Entity, RightsClaim +from authgate.kernel.registry import OwnershipRegistry +from authgate.kernel.verifier import Action, FreedomVerifier +from authgate.runtime.planner import Planner, PlanStep +from authgate.runtime.run_log import RunLog +from authgate.runtime.tools import Tool, ToolRegistry, build_default_tools + + +@dataclass(frozen=True) +class StepOutcome: + """One executed (or denied) plan step and its gate result.""" + + step: PlanStep + result: GateResult + + @property + def permitted(self) -> bool: + return self.result.permitted + + +@dataclass(frozen=True) +class RunResult: + """Outcome of running one intent through the runtime.""" + + intent: str + agent_id: str + outcomes: list[StepOutcome] = field(default_factory=list) + stopped_early: bool = False + + @property + def permitted_count(self) -> int: + return sum(1 for o in self.outcomes if o.permitted) + + @property + def denied_count(self) -> int: + return sum(1 for o in self.outcomes if not o.permitted) + + def outputs(self) -> list[Any]: + """Outputs of the permitted steps, in order — used for reproducibility checks.""" + return [o.result.output for o in self.outcomes if o.permitted] + + +class AgentRuntime: + """Sequences a plan through the kernel gate. Owns no authority itself.""" + + def __init__( + self, + agent: Entity, + gate: CallGate, + tools: ToolRegistry, + planner: Planner, + run_log: RunLog | None = None, + min_epoch: int = 0, + ) -> None: + self._agent = agent + self._gate = gate + self._tools = tools + self._planner = planner + self._run_log = run_log or RunLog() + self._min_epoch = min_epoch + + @property + def run_log(self) -> RunLog: + return self._run_log + + def run(self, intent: str) -> RunResult: + """Plan the intent, then gate-execute each step until done or denied.""" + steps = self._planner.plan(intent) + outcomes: list[StepOutcome] = [] + stopped_early = False + + for index, step in enumerate(steps): + result = self._execute_step(index, step) + outcomes.append(StepOutcome(step=step, result=result)) + if not result.permitted: + # A denied step halts the plan: later steps may depend on it, and + # continuing would let an agent "probe" past a denial. + stopped_early = index < len(steps) - 1 + break + + return RunResult( + intent=intent, agent_id=self._agent.name, + outcomes=outcomes, stopped_early=stopped_early, + ) + + def _execute_step(self, index: int, step: PlanStep) -> GateResult: + """Build the typed Action for a step and run it through the gate.""" + try: + tool = self._tools.get(step.tool) + except KeyError as exc: + # A planner (or prompt-injected plan) naming an unknown tool is a + # denial, not a crash — there is no capability to grant for it. + result = GateResult(permitted=False, + denied_reason=f"unknown tool: {exc}", tool_name=step.tool) + self._log(index, step, result) + return result + + action = self._build_action(index, step, tool) + result = self._gate.execute(action, tool.name, step.args) + self._log(index, step, result) + return result + + def _build_action(self, index: int, step: PlanStep, tool: Tool) -> Action: + """Turn a plan step into a typed capability request for the tool's resource.""" + reads = [tool.resource] if tool.mode == "read" else [] + writes = [tool.resource] if tool.mode == "write" else [] + return Action( + action_id=f"{self._agent.name}-step{index}-{tool.name}", + actor=self._agent, + description=step.rationale, + resources_read=reads, + resources_write=writes, + argument=json.dumps(step.args, sort_keys=True, default=str), + min_epoch=self._min_epoch, + ) + + def _log(self, index: int, step: PlanStep, result: GateResult) -> None: + self._run_log.record( + agent_id=self._agent.name, step_index=index, + tool=step.tool, args=step.args, + permitted=result.permitted, output=result.output, + denied_reason=result.denied_reason, + ) + + +def _make_verifier(backend: str, registry: OwnershipRegistry, audit: AuditLog, freeze: bool): + """Select the authorization engine. "python" = reference verifier; "rust" = + the formally-verified Rust engine (requires the built extension); "auto" = + Rust when available, else Python.""" + if backend == "python": + return FreedomVerifier(registry, audit_log=audit, freeze=freeze) + from authgate.runtime.rust_backend import RustBackedVerifier, rust_backend_available + if backend == "rust": + return RustBackedVerifier(registry, audit_log=audit, freeze=freeze) + if backend == "auto": + if rust_backend_available(): + return RustBackedVerifier(registry, audit_log=audit, freeze=freeze) + return FreedomVerifier(registry, audit_log=audit, freeze=freeze) + raise ValueError(f"unknown backend {backend!r}; use 'python', 'rust', or 'auto'") + + +def _make_sandboxed_fn(tool_name: str, sandbox_root: Path, policy: Any): + """Wrap a tool so it executes in an isolated subprocess under `policy`. A + limit breach (timeout/OOM/crash) surfaces as an exception, which the gate + turns into a clean denial — same contract as an in-process tool raising.""" + from authgate.runtime.sandbox import run_tool_sandboxed + + def run(**args: Any) -> Any: + result = run_tool_sandboxed(tool_name, args, sandbox_root, policy) + if result.ok: + return result.output + raise RuntimeError(result.error or "sandbox denied") + + return run + + +def build_runtime( + intent_planner: Planner, + sandbox_root: Path, + granted_tools: list[str] | None = None, + audit_log: AuditLog | None = None, + run_log: RunLog | None = None, + freeze: bool = False, + backend: str = "python", + sandbox_policy: Any = None, +) -> tuple[AgentRuntime, OwnershipRegistry]: + """Wire a single-agent runtime for demos/tests. + + Creates a human owner, a machine agent it owns, and delegates read claims for + each tool in `granted_tools` (default: all). Tools NOT granted will be denied + by the gate when the planner tries to use them — exactly the MVP behavior. + + backend: "python" (default), "rust" (verified Rust engine via JSON wire), or + "auto". sandbox_policy: when set (a SandboxPolicy), tools run in an + isolated subprocess under OS resource limits instead of in-process. + + Returns (runtime, registry). The registry is returned so callers can revoke + claims / advance epochs and observe the gate react (when freeze=False). + """ + owner = Entity("operator", AgentType.HUMAN) + agent = Entity("agent-1", AgentType.MACHINE) + tools = build_default_tools(sandbox_root) + granted = set(tools.names() if granted_tools is None else granted_tools) + + registry = OwnershipRegistry() + registry.register_machine(agent, owner) + for tool in tools: + if tool.name not in granted: + continue + # Owner asserts authority over the resource, then delegates read to the agent. + registry.add_claim(RightsClaim(owner, tool.resource, can_read=True, can_delegate=True)) + registry.delegate(RightsClaim(agent, tool.resource, can_read=True), + delegated_by=owner) + + audit = audit_log if audit_log is not None else AuditLog() + verifier = _make_verifier(backend, registry, audit, freeze) + gate = CallGate(verifier) + for tool in tools: + fn = _make_sandboxed_fn(tool.name, sandbox_root, sandbox_policy) if sandbox_policy else tool.fn + gate.register(tool.name, fn) + + runtime = AgentRuntime(agent=agent, gate=gate, tools=tools, + planner=intent_planner, run_log=run_log) + return runtime, registry diff --git a/src/authgate/runtime/planner.py b/src/authgate/runtime/planner.py new file mode 100644 index 0000000..9ba932e --- /dev/null +++ b/src/authgate/runtime/planner.py @@ -0,0 +1,100 @@ +""" +Agent runtime planner — the deterministic LLM stand-in (non-TCB). + +A planner turns a natural-language intent into an ordered list of tool-call +steps. In production this is where an LLM would sit; here we use a fixed, +rule-based mock so the MVP is fully reproducible and testable. Real LLM +planners slot in behind the same `Planner` protocol without touching callers. + +This module is NOT part of the trusted computing base: a plan is only a +*request*. The kernel still gates every step, so a planner may legitimately +emit steps the agent lacks the capability to run (see `ScriptedPlanner`). +""" +from __future__ import annotations + +import re +from dataclasses import dataclass, field +from typing import Any, Protocol + + +@dataclass(frozen=True) +class PlanStep: + tool: str # tool name, e.g. "calculator" + args: dict[str, Any] = field(default_factory=dict) # kwargs for the tool fn + rationale: str = "" # short human-readable why (optional) + + +class Planner(Protocol): + def plan(self, intent: str) -> list[PlanStep]: ... + + +class ScriptedPlanner: + """Returns a pinned plan verbatim, ignoring the intent. + + Tests/demos use this to assert an exact step sequence — including plans that + deliberately contain a step the agent is not capable of, to exercise the + kernel's gating. + """ + + def __init__(self, steps: list[PlanStep]) -> None: + self._steps = list(steps) + + def plan(self, intent: str) -> list[PlanStep]: + # Copy so callers cannot mutate the pinned script through the returned list. + return list(self._steps) + + +# Trigger keywords kept as data so the rule order is obvious at a glance. +_ARITHMETIC_WORDS = ("calculate", "compute", "math", "sum", "plus", "times") +_SEARCH_WORDS = ("search", "find", "look up", "lookup", "what is") +_FILE_WORDS = ("read", "open", "file", "cat ") + +# A bare arithmetic expression (e.g. "2 + 3 * 4") is itself a calculation trigger, +# and the same regex extracts the expression to hand to the calculator tool. +_EXPR_RE = re.compile(r"[\d][\d+\-*/%.()\s]*[\d)]") + + +class MockPlanner: + """Deterministic, rule-based planner — a reproducible LLM substitute. + + Same intent always yields the same plan (hard MVP requirement). Rules are + applied in a fixed order; each appends a step when its trigger matches. + """ + + def plan(self, intent: str) -> list[PlanStep]: + text = intent.lower() + steps: list[PlanStep] = [] + self._maybe_arithmetic(text, steps) + self._maybe_search(text, intent, steps) + self._maybe_file(text, steps) + return steps + + def _maybe_arithmetic(self, text: str, steps: list[PlanStep]) -> None: + match = _EXPR_RE.search(text) + if not any(w in text for w in _ARITHMETIC_WORDS) and not match: + return + # Keyword without an actual expression still means "calculate something". + expression = match.group().strip() if match else "0" + steps.append(PlanStep("calculator", {"expression": expression}, + "intent asked for a calculation")) + + def _maybe_search(self, text: str, intent: str, steps: list[PlanStep]) -> None: + if any(w in text for w in _SEARCH_WORDS): + # Preserve original casing in the query; only matching is lowercased. + steps.append(PlanStep("web_search", {"query": intent.strip()}, + "intent asked to search")) + + def _maybe_file(self, text: str, steps: list[PlanStep]) -> None: + if any(w in text for w in _FILE_WORDS): + filename = self._extract_filename(text) + steps.append(PlanStep("file_read", {"filename": filename}, + "intent asked to read a file")) + + @staticmethod + def _extract_filename(text: str) -> str: + # A real filename-ish token carries a '.' or '/'; otherwise fall back so + # the plan is still actionable rather than empty. + for token in text.split(): + if "." in token or "/" in token: + return token + return "notes.txt" diff --git a/src/authgate/runtime/run_log.py b/src/authgate/runtime/run_log.py new file mode 100644 index 0000000..fda23d2 --- /dev/null +++ b/src/authgate/runtime/run_log.py @@ -0,0 +1,65 @@ +""" +RunLog — append-only execution trace for the agent runtime (non-TCB). + +This is the human/operator-facing record of *what the runtime did*: for each +plan step, the decision (permit/deny) and, when permitted, the tool output. It +complements — does not replace — the kernel's hash-chained `AuditLog`: + + * kernel AuditLog : tamper-evident record of every *verification decision* + (the security-critical integrity log). + * RunLog : operational trace including *tool results*, for debugging + and reproducibility checks. + +Format: one JSON object per line (.jsonl), append-only. Tool output is stored as +a truncated string so the trace never becomes an exfiltration channel for large +payloads and stays cheap to write. +""" +from __future__ import annotations + +import json +import time +from dataclasses import dataclass, field +from typing import Any + +_MAX_OUTPUT_CHARS = 2000 # keep traces bounded; full output lives in RunResult + + +@dataclass +class RunLog: + """Append-only jsonl trace of executed steps. path=None keeps it in memory.""" + + path: str | None = None + _entries: list[dict[str, Any]] = field(default_factory=list, init=False, repr=False) + + def record( + self, + agent_id: str, + step_index: int, + tool: str, + args: dict[str, Any], + permitted: bool, + output: Any = None, + denied_reason: str | None = None, + ) -> None: + """Append one step outcome. Never raises on policy denial.""" + entry = { + "ts": time.time(), + "agent_id": agent_id, + "step": step_index, + "tool": tool, + "args": args, + "decision": "permit" if permitted else "deny", + "output": (str(output)[:_MAX_OUTPUT_CHARS] if permitted else None), + "denied_reason": denied_reason, + } + self._entries.append(entry) + if self.path is not None: + with open(self.path, "a", encoding="utf-8") as f: + f.write(json.dumps(entry) + "\n") + + def entries(self) -> list[dict[str, Any]]: + """Snapshot of all recorded entries.""" + return list(self._entries) + + def __len__(self) -> int: + return len(self._entries) diff --git a/src/authgate/runtime/rust_backend.py b/src/authgate/runtime/rust_backend.py new file mode 100644 index 0000000..51b0520 --- /dev/null +++ b/src/authgate/runtime/rust_backend.py @@ -0,0 +1,163 @@ +""" +RustBackedVerifier — route the runtime's authorization decision through the +formally-verified Rust TCB engine, without importing Rust pyclass objects. + +Motivation (the honest gap this closes): the Kani/Lean proofs cover +``authgate-kernel/src/engine.rs``. The Python runtime, however, decides with +``authgate.kernel.verifier`` — a *different* implementation. So "the verified +kernel gates every call" and "the code that actually ran" were two codebases. + +This adapter makes the running system's permit/deny decision come from the +verified engine. The trust boundary is **JSON**: the Python registry + action are +serialized to the kernel wire format and handed to ``authgate_kernel.verify_json`` +(which calls ``crate::engine::verify`` and returns an ed25519-signed verdict). +Only JSON crosses — no Rust ``Entity``/``Action`` objects enter the Python +process, so the dual-type ``"Entity cannot be converted to Entity"`` problem +that forced ``AUTHGATE_BACKEND=python`` simply cannot occur here. + +Semantic reconciliation: the wire/engine has no *epoch* concept (the Python +verifier's revocation mechanism). We preserve it by serializing only the claims +the Python registry considers **currently valid** — unexpired, identity-matched, +delegation-chain-valid, and at or above the action's ``min_epoch``. The verified +engine then independently re-derives permit/deny from claim existence, machine +ownership (A4), no-dominion (A6), and the forbidden-flag set. Revocation and +epoch therefore behave identically to the pure-Python path, while the actual +decision is made by verified code. +""" +from __future__ import annotations + +import importlib +import json +from typing import Any + +from authgate.kernel.entities import Entity, Resource, RightsClaim +from authgate.kernel.registry import OwnershipRegistry +from authgate.kernel.verifier import Action, VerificationResult + + +def rust_backend_available() -> bool: + """True iff the compiled verified-kernel extension can be imported.""" + try: + importlib.import_module("authgate_kernel") + return True + except ImportError: + return False + + +def _entity_wire(e: Entity) -> dict[str, str]: + return {"name": e.name, "kind": e.kind.name} + + +def _resource_wire(r: Resource) -> dict[str, Any]: + return { + "name": r.name, + "rtype": r.rtype.name, + "scope": r.scope, + "is_public": r.is_public, + "ifc_label": r.ifc_label, + } + + +def _claim_wire(c: RightsClaim) -> dict[str, Any]: + return { + "holder": _entity_wire(c.holder), + "resource": _resource_wire(c.resource), + "can_read": c.can_read, + "can_write": c.can_write, + "can_delegate": c.can_delegate, + "confidence": c.confidence, + "expires_at": c.expires_at, + "delegation_depth": 0, + } + + +# Action boolean flags, mapped 1:1 to the wire field names the engine reads. +_FLAG_FIELDS = ( + "increases_machine_sovereignty", + "resists_human_correction", + "bypasses_verifier", + "weakens_verifier", + "disables_corrigibility", + "machine_coalition_dominion", + "coerces", + "deceives", + "self_modification_weakens_verifier", + "machine_coalition_reduces_freedom", +) + + +def _action_wire(action: Action) -> dict[str, Any]: + wire: dict[str, Any] = { + "action_id": action.action_id, + "actor": _entity_wire(action.actor), + "description": action.description, + "resources_read": [_resource_wire(r) for r in action.resources_read], + "resources_write": [_resource_wire(r) for r in action.resources_write], + "resources_delegate": [_resource_wire(r) for r in action.resources_delegate], + "governs_humans": [_entity_wire(h) for h in action.governs_humans], + "argument": action.argument, + "delegation_depth": 0, + } + for flag in _FLAG_FIELDS: + wire[flag] = getattr(action, flag) + return wire + + +def _live_claims(registry: OwnershipRegistry, min_epoch: int) -> list[RightsClaim]: + """The claims the Python registry currently treats as valid — the set the + verified engine should see. This is where epoch/revocation/identity live.""" + live: list[RightsClaim] = [] + for c in registry._claims: + if not c.is_valid(): + continue + if not registry._identity_matches(c.holder): + continue + if not registry._delegation_chain_valid(c): + continue + if c.epoch < min_epoch: + continue + live.append(c) + return live + + +class RustBackedVerifier: + """Drop-in replacement for FreedomVerifier whose decision is made by the + verified Rust engine. Same ``verify(action) -> VerificationResult`` contract, + so CallGate, the audit log, and the runtime are unchanged.""" + + def __init__( + self, + registry: OwnershipRegistry, + audit_log: object = None, + freeze: bool = True, + ) -> None: + # Mirror FreedomVerifier's TOCTOU stance: snapshot unless told otherwise. + self.registry = ( + registry.freeze() if freeze and not getattr(registry, "_frozen", False) else registry + ) + self._audit_log = audit_log + self._ak = importlib.import_module("authgate_kernel") + + def verify(self, action: Action) -> VerificationResult: + registry_wire = { + "claims": [_claim_wire(c) for c in _live_claims(self.registry, action.min_epoch)], + "machine_owners": [ + {"machine": _entity_wire(m), "owner": _entity_wire(o)} + for m, o in self.registry._machine_owners.items() + ], + "trust_domains": [], + } + payload = {"registry": registry_wire, "action": _action_wire(action)} + out = json.loads(self._ak.verify_json(json.dumps(payload))) + + result = VerificationResult( + action_id=out["action_id"], + permitted=out["permitted"], + violations=tuple(out["violations"]), + warnings=tuple(out.get("warnings", ())), + confidence=out.get("confidence", 0.0), + requires_human_arbitration=out.get("requires_human_arbitration", False), + ) + if self._audit_log is not None: + self._audit_log.record(result) # type: ignore[attr-defined] + return result diff --git a/src/authgate/runtime/sandbox.py b/src/authgate/runtime/sandbox.py new file mode 100644 index 0000000..89e19c2 --- /dev/null +++ b/src/authgate/runtime/sandbox.py @@ -0,0 +1,160 @@ +""" +Real, OS-enforced sandbox for tool execution (non-TCB). + +`tools.py` validates the inputs it can foresee. This module contains the inputs +it cannot: it runs each tool in a separate OS process under resource limits and a +wall-clock deadline, so a tool that hangs, allocates without bound, or crashes the +interpreter is *killed by the OS / reaped by the parent* — it cannot take the +runtime down or block it forever. That is the difference between an in-process +prefix check (which the previous "sandbox" was) and actual containment. + +Honest platform scope: + * Always on (every platform): the parent's wall-clock timeout and the output + cap. These reliably kill hangs and bound output everywhere. + * POSIX, opt-in: CPU seconds, address space, and file size as hard kernel + limits (setrlimit in the child). Off by default — a too-tight + RLIMIT_AS/RLIMIT_FSIZE can stop the interpreter starting or writing + .pyc — so callers enable them with a known headroom budget. + * Windows — setrlimit does not exist; the rlimits are no-ops and enforcement is + the wall-clock timeout and output cap. Production confinement targets + Linux (the existing seccomp/WASM executors). + +What this is NOT: it is not a network/syscall jail. It bounds time, memory, and +output and isolates crashes. Network and filesystem confinement beyond the +file_read sandbox root require OS namespaces / seccomp / WASM (tracked elsewhere). +""" +from __future__ import annotations + +import json +import os +import subprocess +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import authgate +from authgate.runtime._sandbox_runner import RESULT_MARKER + +# The child runs `python -m authgate.runtime._sandbox_runner`, so it must be able +# to import `authgate`. Derive the src root from this package's location and put +# it on the child's PYTHONPATH; this works whether or not the package is installed. +_SRC_ROOT = str(Path(authgate.__file__).resolve().parent.parent) + +# Tools expressed as importable entry points (pure functions). file_read is bound +# to a per-runtime sandbox root, so it is dispatched as a builtin (see runner). +_ENTRY_POINTS = { + "calculator": "authgate.runtime.tools:calculate", + "web_search": "authgate.runtime.tools:web_search", +} +_ROOT_BOUND = {"file_read"} + + +@dataclass(frozen=True) +class SandboxPolicy: + """Resource envelope for one tool call. + + Cross-platform, always-on guards: the wall-clock timeout (parent kills the + child) and the output cap. The POSIX rlimits (cpu/memory/file-size) are + opt-in (0 = off) because a too-tight RLIMIT_AS or RLIMIT_FSIZE can prevent + the interpreter from starting or writing .pyc files — turning hardening into + spurious failures. Set them explicitly when running on Linux with a known + headroom budget. They are no-ops on Windows (no `resource` module).""" + + wall_timeout_s: float = 5.0 # parent kills the child past this (all platforms) + cpu_seconds: int = 0 # RLIMIT_CPU (POSIX); 0 = off + max_memory_mb: int = 0 # RLIMIT_AS (POSIX); 0 = off + max_file_bytes: int = 0 # RLIMIT_FSIZE (POSIX); 0 = off + max_output_bytes: int = 64 * 1024 # parent/child cap on returned output size + + def _limits(self) -> dict[str, int]: + return { + "cpu_seconds": self.cpu_seconds, + "max_memory_bytes": self.max_memory_mb * 1024 * 1024, + "max_file_bytes": self.max_file_bytes, + "max_output_bytes": self.max_output_bytes, + } + + +@dataclass(frozen=True) +class SandboxResult: + """Outcome of one sandboxed execution.""" + + ok: bool + output: str | None = None + error: str | None = None + killed: bool = False # True iff a resource/time limit terminated the child + + +def _build_job(tool: str, args: dict[str, Any], sandbox_root: Path, policy: SandboxPolicy) -> dict[str, Any]: + job: dict[str, Any] = {"args": args, "limits": policy._limits()} + if tool in _ROOT_BOUND: + job["builtin"] = tool + job["sandbox_root"] = str(sandbox_root) + elif tool in _ENTRY_POINTS: + job["entry"] = _ENTRY_POINTS[tool] + else: + # An unknown tool never reaches a process; the caller treats it as a denial. + job["entry"] = "" + return job + + +def run_tool_sandboxed( + tool: str, + args: dict[str, Any], + sandbox_root: Path, + policy: SandboxPolicy, + *, + entry_override: str | None = None, + env: dict[str, str] | None = None, +) -> SandboxResult: + """Run one tool call in an isolated child process under `policy`. + + `entry_override` ("module:function") is for tests that need to exercise the + sandbox against a deliberately hostile callable (e.g. one that hangs); the + runtime itself never sets it — tool identity comes from the registry. + """ + job = _build_job(tool, args, sandbox_root, policy) + if entry_override is not None: + job.pop("builtin", None) + job.pop("sandbox_root", None) + job["entry"] = entry_override + + child_env = dict(env) if env is not None else dict(os.environ) + existing_pp = child_env.get("PYTHONPATH", "") + if _SRC_ROOT not in existing_pp.split(os.pathsep): + child_env["PYTHONPATH"] = os.pathsep.join(p for p in (_SRC_ROOT, existing_pp) if p) + # No .pyc writes: keeps an opt-in RLIMIT_FSIZE from tripping on bytecode caching. + child_env.setdefault("PYTHONDONTWRITEBYTECODE", "1") + try: + proc = subprocess.run( + [sys.executable, "-m", "authgate.runtime._sandbox_runner"], + input=json.dumps(job), + capture_output=True, + text=True, + timeout=policy.wall_timeout_s, + env=child_env, + ) + except subprocess.TimeoutExpired: + return SandboxResult(ok=False, error="wall-clock timeout exceeded", killed=True) + + marker_at = proc.stdout.rfind(RESULT_MARKER) + if marker_at == -1: + # No result line: the child was killed before it could report (OOM, CPU + # limit -> SIGXCPU, segfault). That is containment working, not a bug. + detail = (proc.stderr or "").strip()[-200:] + return SandboxResult( + ok=False, + error=f"child terminated without result (rc={proc.returncode}): {detail}", + killed=True, + ) + + payload = proc.stdout[marker_at + len(RESULT_MARKER):].strip() + try: + result = json.loads(payload) + except json.JSONDecodeError as exc: + return SandboxResult(ok=False, error=f"unparseable sandbox result: {exc}", killed=True) + + if result.get("ok"): + return SandboxResult(ok=True, output=result.get("output")) + return SandboxResult(ok=False, error=result.get("error", "tool denied")) diff --git a/src/authgate/runtime/tools.py b/src/authgate/runtime/tools.py new file mode 100644 index 0000000..169b522 --- /dev/null +++ b/src/authgate/runtime/tools.py @@ -0,0 +1,224 @@ +""" +Agent runtime tools — the non-TCB action layer. + +These tools are NOT part of the trusted computing base. They are the concrete +side effects an agent can request (compute, file read, web search). Every tool +declares the capability (Resource + mode) it requires so the kernel can gate +invocation; the tools themselves perform the actual work once a capability check +has passed elsewhere. + +Defense in depth: `file_read` enforces its OWN sandbox boundary in addition to +the capability gate. Even if a capability were mis-issued, the tool still refuses +to read outside `sandbox_root`. Tools therefore assume they may be called with +hostile arguments and validate accordingly. +""" +from __future__ import annotations + +import ast +import operator +from collections.abc import Callable +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +from authgate.kernel.entities import Resource, ResourceType + +_VALID_MODES = {"read", "write"} + + +@dataclass(frozen=True) +class Tool: + name: str + fn: Callable[..., Any] + resource: Resource # the capability this tool requires + mode: str # "read" or "write" + + def __post_init__(self) -> None: + if self.mode not in _VALID_MODES: + raise ValueError(f"mode must be one of {sorted(_VALID_MODES)}, got {self.mode!r}") + + +class ToolRegistry: + """Name-keyed collection of tools. Not a security boundary — just a lookup.""" + + def __init__(self) -> None: + self._tools: dict[str, Tool] = {} + + def register(self, tool: Tool) -> Tool: + self._tools[tool.name] = tool + return tool + + def get(self, name: str) -> Tool: + if name not in self._tools: + raise KeyError(f"unknown tool {name!r}; available: {self.names()}") + return self._tools[name] + + def names(self) -> list[str]: + return sorted(self._tools) + + def __iter__(self): + return iter(self._tools.values()) + + def __contains__(self, name: str) -> bool: + return name in self._tools + + +# --- calculator ------------------------------------------------------------ + +# Explicit allow-list of operators; anything absent here is rejected by default, +# which is the safe failure mode for evaluating untrusted expressions. +_BIN_OPS: dict[type[ast.operator], Callable[[Any, Any], Any]] = { + ast.Add: operator.add, + ast.Sub: operator.sub, + ast.Mult: operator.mul, + ast.Div: operator.truediv, + ast.FloorDiv: operator.floordiv, + ast.Mod: operator.mod, + ast.Pow: operator.pow, +} +_UNARY_OPS: dict[type[ast.unaryop], Callable[[Any], Any]] = { + ast.UAdd: operator.pos, + ast.USub: operator.neg, +} + +# Resource-exhaustion bounds. Rejecting names/calls is not enough: pure arithmetic +# can still be a denial-of-service. `2**2**2**2**2**2` is valid arithmetic that asks +# for a number with ~10^19728 digits and hangs the process. These caps keep every +# accepted expression cheap to evaluate; anything past them is refused, not computed. +_MAX_EXPR_LEN = 256 # reject pathological inputs before they reach the parser +_MAX_POW_EXPONENT = 1000 # `a ** b` with |b| above this is refused (no giant ints) +_MAX_RESULT_BITS = 8192 # cap any intermediate integer's magnitude (~2466 digits) + + +def _check_pow(exponent: Any) -> None: + """Refuse exponents large enough to make `**` build an astronomically large int.""" + if isinstance(exponent, int) and abs(exponent) > _MAX_POW_EXPONENT: + raise ValueError(f"unsafe expression: exponent {exponent} exceeds {_MAX_POW_EXPONENT}") + + +def _check_magnitude(value: Any) -> None: + """Refuse intermediate ints whose magnitude could exhaust memory/CPU downstream.""" + if isinstance(value, int) and value.bit_length() > _MAX_RESULT_BITS: + raise ValueError("unsafe expression: intermediate result too large") + + +def _eval_node(node: ast.AST) -> Any: + """Recursively evaluate only the arithmetic AST nodes we allow.""" + if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)): + return node.value + if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS: + left = _eval_node(node.left) + right = _eval_node(node.right) + if type(node.op) is ast.Pow: + _check_pow(right) + result = _BIN_OPS[type(node.op)](left, right) + _check_magnitude(result) + return result + if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS: + return _UNARY_OPS[type(node.op)](_eval_node(node.operand)) + raise ValueError(f"unsafe expression: {ast.dump(node)}") + + +def calculate(expression: str) -> str: + """Evaluate an arithmetic expression without eval(); names/calls are rejected. + + The tool's contract is total: it either returns a numeric string or raises + ValueError. Resource-exhaustion and numeric-domain failures (overflow, + divide-by-zero) are normalized to ValueError so the CallGate sees a clean + denial rather than a hang or an uncaught error type. + """ + if len(expression) > _MAX_EXPR_LEN: + raise ValueError(f"unsafe expression: too long ({len(expression)} > {_MAX_EXPR_LEN})") + try: + tree = ast.parse(expression, mode="eval") + except (SyntaxError, ValueError, MemoryError, RecursionError) as e: + raise ValueError(f"unsafe expression: {e}") from e + try: + return str(_eval_node(tree.body)) + except (OverflowError, ZeroDivisionError) as e: + raise ValueError(f"unsafe expression: {e}") from e + + +# --- file_read ------------------------------------------------------------- + +# Windows reinterprets these basenames as DEVICES regardless of directory or +# extension: 'CON', 'CON.txt', and 'sub/NUL' all open a device, not a file. Such a +# path passes the sandbox containment check (it resolves nominally *inside* root) +# yet escapes it at open() time — 'CON' blocks forever waiting on console input +# (DoS), 'NUL'/'COMn'/'LPTn' touch real devices. So they must be refused by name. +_WINDOWS_RESERVED_DEVICES = frozenset({ + "CON", "PRN", "AUX", "NUL", + *(f"COM{i}" for i in range(1, 10)), + *(f"LPT{i}" for i in range(1, 10)), +}) + + +def _is_reserved_device(path: Path) -> bool: + """True if any path component is a Windows reserved device name.""" + for part in path.parts: + if part.split(".")[0].strip().upper() in _WINDOWS_RESERVED_DEVICES: + return True + return False + + +def _make_read_file(sandbox_root: Path) -> Callable[[str], str]: + """Build a reader bound to one sandbox root; the closure is the boundary.""" + root = sandbox_root.resolve() + + def read_file(filename: str) -> str: + # Resolve THEN verify containment so ../ traversal and absolute paths + # both collapse to a path we can prefix-check against the real root. + target = (root / filename).resolve() + if root != target and root not in target.parents: + raise PermissionError(f"path escapes sandbox: {filename}") + # Containment is necessary but not sufficient on Windows: a reserved + # device name resolves inside root yet opens a device, not a file. + if _is_reserved_device(Path(filename)) or _is_reserved_device(target): + raise PermissionError(f"reserved device name refused: {filename}") + return target.read_text(encoding="utf-8") + + return read_file + + +# --- web_search ------------------------------------------------------------ + +# Deterministic mock: same query always yields the same string (MVP requires +# reproducibility, and we do not make real network calls). +_CANNED_ANSWERS: dict[str, str] = { + "what is a capability": ( + "A capability is an unforgeable token that both names a resource and " + "grants authority to use it." + ), + "authgate": ( + "authgate is a capability-constrained authorization kernel that gates " + "agent tool execution." + ), +} + + +def web_search(query: str) -> str: + key = query.strip().lower() + if key in _CANNED_ANSWERS: + return _CANNED_ANSWERS[key] + return f"[mock] no indexed results for {query!r}" + + +# --- default registry ------------------------------------------------------ + +def build_default_tools(sandbox_root: Path) -> ToolRegistry: + """Registry of the 3 MVP tools, each wired to the capability it requires.""" + registry = ToolRegistry() + registry.register(Tool( + name="calculator", fn=calculate, + resource=Resource("compute", ResourceType.COMPUTE_SLOT), mode="read", + )) + registry.register(Tool( + name="file_read", fn=_make_read_file(sandbox_root), + resource=Resource("sandbox-fs", ResourceType.FILE, scope=str(sandbox_root)), + mode="read", + )) + registry.register(Tool( + name="web_search", fn=web_search, + resource=Resource("web", ResourceType.NETWORK_ENDPOINT), mode="read", + )) + return registry diff --git a/tests/test_runtime_agent.py b/tests/test_runtime_agent.py new file mode 100644 index 0000000..91b0137 --- /dev/null +++ b/tests/test_runtime_agent.py @@ -0,0 +1,173 @@ +"""Integration tests for authgate.runtime.agent — AgentRuntime + build_runtime.""" +from __future__ import annotations + +import pytest + +from authgate.kernel.audit import AuditLog +from authgate.runtime.agent import build_runtime +from authgate.runtime.planner import MockPlanner, PlanStep, ScriptedPlanner + + +@pytest.fixture +def sandbox(tmp_path): + (tmp_path / "a.txt").write_text("FILE CONTENT", encoding="utf-8") + return tmp_path + + +# --- granted tool executes ------------------------------------------------- + +def test_granted_calculator_permits_and_produces_output(sandbox): + planner = ScriptedPlanner([PlanStep("calculator", {"expression": "2 + 2"})]) + runtime, _ = build_runtime(planner, sandbox) + result = runtime.run("calc") + + assert result.permitted_count == 1 + assert result.denied_count == 0 + assert result.outputs() == ["4"] + assert result.stopped_early is False + + +def test_granted_execution_records_permit_entry_in_run_log(sandbox): + planner = ScriptedPlanner([PlanStep("calculator", {"expression": "2 + 2"})]) + runtime, _ = build_runtime(planner, sandbox) + runtime.run("calc") + + entries = runtime.run_log.entries() + assert len(entries) == 1 + assert entries[0]["decision"] == "permit" + assert entries[0]["output"] == "4" + assert entries[0]["tool"] == "calculator" + + +def test_granted_file_read_returns_real_content(sandbox): + planner = ScriptedPlanner([PlanStep("file_read", {"filename": "a.txt"})]) + runtime, _ = build_runtime(planner, sandbox) + result = runtime.run("read a.txt") + assert result.outputs() == ["FILE CONTENT"] + + +# --- ungranted tool denied; tool body never runs --------------------------- + +def test_ungranted_tool_is_denied_and_body_never_runs(sandbox): + # file_read NOT granted -> gate denies before the tool fn executes. + planner = ScriptedPlanner([PlanStep("file_read", {"filename": "a.txt"})]) + runtime, _ = build_runtime(planner, sandbox, granted_tools=["calculator"]) + result = runtime.run("read a.txt") + + outcome = result.outcomes[0] + assert outcome.permitted is False + assert outcome.result.output is None # tool body never produced output + assert "capability" in outcome.result.denied_reason + assert result.denied_count == 1 + + +def test_ungranted_denial_logged_with_no_output(sandbox): + planner = ScriptedPlanner([PlanStep("file_read", {"filename": "a.txt"})]) + runtime, _ = build_runtime(planner, sandbox, granted_tools=["calculator"]) + runtime.run("x") + entry = runtime.run_log.entries()[0] + assert entry["decision"] == "deny" + assert entry["output"] is None + + +# --- denied step stops execution ------------------------------------------- + +def test_denied_step_halts_remaining_plan(sandbox): + plan = [ + PlanStep("calculator", {"expression": "1 + 1"}), # granted + PlanStep("file_read", {"filename": "a.txt"}), # NOT granted -> deny + PlanStep("web_search", {"query": "authgate"}), # must never run + ] + runtime, _ = build_runtime( + ScriptedPlanner(plan), sandbox, + granted_tools=["calculator", "web_search"], + ) + result = runtime.run("multi") + + assert len(result.outcomes) == 2 # third step never executed + assert result.stopped_early is True + assert result.permitted_count == 1 + assert result.denied_count == 1 + # web_search never appears in the run log + logged_tools = [e["tool"] for e in runtime.run_log.entries()] + assert "web_search" not in logged_tools + + +def test_denied_last_step_does_not_set_stopped_early(sandbox): + plan = [ + PlanStep("calculator", {"expression": "1 + 1"}), # granted + PlanStep("file_read", {"filename": "a.txt"}), # denied, but is last + ] + runtime, _ = build_runtime( + ScriptedPlanner(plan), sandbox, granted_tools=["calculator"] + ) + result = runtime.run("two") + assert len(result.outcomes) == 2 + assert result.stopped_early is False # nothing after the denial + + +# --- reproducibility ------------------------------------------------------- + +def test_two_fresh_runtimes_produce_equal_outputs(sandbox): + intent = "calculate 2 + 2 and search authgate" + runtime_a, _ = build_runtime(MockPlanner(), sandbox) + runtime_b, _ = build_runtime(MockPlanner(), sandbox) + assert runtime_a.run(intent).outputs() == runtime_b.run(intent).outputs() + + +def test_repeated_run_same_runtime_is_reproducible(sandbox): + intent = "search authgate" + runtime, _ = build_runtime(MockPlanner(), sandbox) + assert runtime.run(intent).outputs() == runtime.run(intent).outputs() + + +# --- unknown tool ---------------------------------------------------------- + +def test_unknown_tool_is_denied_without_crash(sandbox): + planner = ScriptedPlanner([PlanStep("nonexistent_tool", {})]) + runtime, _ = build_runtime(planner, sandbox) + result = runtime.run("x") + + outcome = result.outcomes[0] + assert outcome.permitted is False + assert "unknown tool" in outcome.result.denied_reason + assert result.denied_count == 1 + + +# --- revocation ------------------------------------------------------------ + +def test_revocation_flips_permit_to_deny(sandbox): + planner = ScriptedPlanner([PlanStep("calculator", {"expression": "3 + 3"})]) + runtime, registry = build_runtime(planner, sandbox, freeze=False) + + first = runtime.run("calc") + assert first.outcomes[0].permitted is True + + registry.revoke_all("agent-1") + + second = runtime.run("calc") + assert second.outcomes[0].permitted is False + + +# --- audit ----------------------------------------------------------------- + +def test_audit_chain_is_valid_after_run(sandbox): + audit = AuditLog() + planner = MockPlanner() + runtime, _ = build_runtime(planner, sandbox, audit_log=audit) + runtime.run("calculate 2 + 2 and search authgate") + + assert len(audit) > 0 + assert audit.verify_chain() is True + + +def test_audit_records_one_entry_per_step(sandbox): + audit = AuditLog() + plan = [ + PlanStep("calculator", {"expression": "1 + 1"}), + PlanStep("web_search", {"query": "authgate"}), + ] + runtime, _ = build_runtime(ScriptedPlanner(plan), sandbox, audit_log=audit) + runtime.run("two steps") + assert len(audit) == 2 + assert audit.verify_chain() is True diff --git a/tests/test_runtime_planner.py b/tests/test_runtime_planner.py new file mode 100644 index 0000000..741a13d --- /dev/null +++ b/tests/test_runtime_planner.py @@ -0,0 +1,149 @@ +"""Unit tests for authgate.runtime.planner — MockPlanner rules and ScriptedPlanner.""" +from __future__ import annotations + +import pytest + +from authgate.runtime.planner import MockPlanner, PlanStep, ScriptedPlanner + + +@pytest.fixture +def planner(): + return MockPlanner() + + +def _tools(steps): + return [s.tool for s in steps] + + +# --- determinism ----------------------------------------------------------- + +def test_mock_planner_is_deterministic(planner): + intent = "calculate 2 + 2 and search authgate and read notes.txt" + plans = [planner.plan(intent) for _ in range(3)] + assert plans[0] == plans[1] == plans[2] + + +# --- arithmetic rule ------------------------------------------------------- + +def test_arithmetic_keyword_triggers_calculator(planner): + steps = planner.plan("please compute the result") + assert _tools(steps) == ["calculator"] + + +def test_bare_expression_triggers_calculator_and_extracts_it(planner): + steps = planner.plan("2 + 3 * 4") + assert steps[0].tool == "calculator" + assert steps[0].args == {"expression": "2 + 3 * 4"} + + +def test_arithmetic_keyword_with_expression_extracts_expression(planner): + steps = planner.plan("calculate 5 * 5") + assert steps[0].args == {"expression": "5 * 5"} + + +def test_arithmetic_keyword_without_expression_falls_back_to_zero(planner): + steps = planner.plan("compute the sum") + assert steps[0].tool == "calculator" + assert steps[0].args == {"expression": "0"} + + +# --- search rule ----------------------------------------------------------- + +@pytest.mark.parametrize("word", ["search", "find", "look up", "lookup", "what is"]) +def test_search_keywords_trigger_web_search(planner, word): + steps = planner.plan(f"{word} something about cats") + assert "web_search" in _tools(steps) + + +def test_search_query_preserves_original_casing(planner): + intent = "Find AuthGate Documentation" + steps = planner.plan(intent) + search = next(s for s in steps if s.tool == "web_search") + assert search.args == {"query": "Find AuthGate Documentation"} + + +def test_search_query_is_stripped(planner): + steps = planner.plan(" search for kernels ") + search = next(s for s in steps if s.tool == "web_search") + assert search.args == {"query": "search for kernels"} + + +# --- file rule ------------------------------------------------------------- + +@pytest.mark.parametrize("word", ["read", "open", "file", "cat "]) +def test_file_keywords_trigger_file_read(planner, word): + steps = planner.plan(f"{word} something") + assert "file_read" in _tools(steps) + + +def test_filename_extraction_picks_dotted_token(planner): + steps = planner.plan("read notes.txt now") + fr = next(s for s in steps if s.tool == "file_read") + assert fr.args == {"filename": "notes.txt"} + + +def test_filename_extraction_picks_slashed_token(planner): + steps = planner.plan("cat data/log.csv") + fr = next(s for s in steps if s.tool == "file_read") + assert fr.args == {"filename": "data/log.csv"} + + +def test_filename_extraction_falls_back_when_no_filename_token(planner): + steps = planner.plan("open the file") + fr = next(s for s in steps if s.tool == "file_read") + assert fr.args == {"filename": "notes.txt"} + + +# --- multi-rule ordering --------------------------------------------------- + +def test_multi_rule_intent_orders_arith_search_file(planner): + intent = "compute 5*5 and search authgate and read a.txt" + steps = planner.plan(intent) + assert _tools(steps) == ["calculator", "web_search", "file_read"] + + +def test_multi_rule_intent_args_are_correct(planner): + intent = "compute 5*5 and search authgate and read a.txt" + steps = planner.plan(intent) + assert steps[0].args == {"expression": "5*5"} + assert steps[1].args == {"query": intent} + assert steps[2].args == {"filename": "a.txt"} + + +# --- no match -------------------------------------------------------------- + +def test_no_match_yields_empty_plan(planner): + assert planner.plan("hello world good morning") == [] + + +# --- ScriptedPlanner ------------------------------------------------------- + +def test_scripted_planner_returns_equal_but_distinct_list(): + steps = [PlanStep("calculator", {"expression": "1 + 1"})] + sp = ScriptedPlanner(steps) + out = sp.plan("ignored intent") + assert out == steps + assert out is not steps + + +def test_scripted_planner_ignores_intent(): + sp = ScriptedPlanner([PlanStep("web_search", {"query": "x"})]) + assert sp.plan("intent A") == sp.plan("completely different intent B") + + +def test_scripted_planner_mutating_returned_list_does_not_affect_source(): + steps = [PlanStep("calculator", {"expression": "1 + 1"})] + sp = ScriptedPlanner(steps) + out = sp.plan("x") + out.append(PlanStep("web_search", {"query": "extra"})) + assert len(sp.plan("x")) == 1 + + +def test_scripted_planner_preserves_exact_step_sequence(): + plan = [ + PlanStep("calculator", {"expression": "2+2"}), + PlanStep("file_read", {"filename": "a.txt"}), + PlanStep("web_search", {"query": "authgate"}), + ] + sp = ScriptedPlanner(plan) + assert sp.plan("anything") == plan diff --git a/tests/test_runtime_redteam.py b/tests/test_runtime_redteam.py new file mode 100644 index 0000000..c09bc0c --- /dev/null +++ b/tests/test_runtime_redteam.py @@ -0,0 +1,120 @@ +""" +Pytest wrapper around the standalone runtime red-team harness. + +This asserts the security property the harness exists to prove: against 1000 +deterministically-generated adversarial engineers, the capability-gated runtime +holds — ZERO escapes. Any escape is a hard failure with a helpful listing. + +The heavy lifting (attacks, sandbox fixtures, determinism) lives in +``redteam/runtime_redteam.py``; this file imports it and asserts the report. +``tests/conftest`` is not relied upon for path setup — the harness wires ``src`` +onto sys.path itself, and we add the repo root so ``redteam`` is importable. +""" +from __future__ import annotations + +import importlib.util +import os +import sys +from pathlib import Path + +import pytest + +# The runtime layer's backend under test is the pure-Python kernel. +os.environ.setdefault("AUTHGATE_BACKEND", "python") + +_REPO_ROOT = Path(__file__).resolve().parent.parent +_SRC = _REPO_ROOT / "src" +for _p in (str(_SRC), str(_REPO_ROOT)): + if _p not in sys.path: + sys.path.insert(0, _p) + + +def _load_harness(): + """Import the standalone harness by file path (it lives outside any package).""" + harness_path = _REPO_ROOT / "redteam" / "runtime_redteam.py" + spec = importlib.util.spec_from_file_location("runtime_redteam", harness_path) + assert spec and spec.loader, f"cannot load harness at {harness_path}" + module = importlib.util.module_from_spec(spec) + # Register before exec: @dataclass resolves field types via + # sys.modules[cls.__module__], which is None until the module is registered. + sys.modules[spec.name] = module + spec.loader.exec_module(module) + return module + + +_harness = _load_harness() + + +def _rust_available() -> bool: + """True iff the compiled verified-kernel extension can be imported.""" + from authgate.runtime.rust_backend import rust_backend_available + return rust_backend_available() + + +@pytest.fixture(scope="module") +def report(): + return _harness.run_redteam(1000, master_seed=1337) + + +def test_no_escapes(report): + """The headline guarantee: every one of the 1000 attacks is blocked.""" + if report.escapes: + lines = [ + f" #{e.profile.id} {e.profile.name} [{e.category}] " + f"({e.profile.skill} {e.profile.archetype}): {e.detail}" + for e in report.escapes + ] + pytest.fail( + f"{len(report.escapes)} ESCAPE(S) — the runtime FAILED to resist:\n" + + "\n".join(lines) + ) + assert report.escapes == [] + + +def test_total_is_1000(report): + assert report.total == 1000 + assert report.blocked == 1000 + + +def test_every_category_fully_blocked(report): + """Each attack category must be exercised (total>0) and fully held.""" + assert set(report.by_category) == set(_harness.ATTACK_CATEGORIES) + for category, (blocked, total) in report.by_category.items(): + assert total > 0, f"category {category} had no engineers" + assert blocked == total, ( + f"category {category}: only {blocked}/{total} blocked" + ) + + +def test_determinism(): + """Fixed seed -> identical report (same totals and per-category breakdown).""" + a = _harness.run_redteam(200, master_seed=2024) + b = _harness.run_redteam(200, master_seed=2024) + assert a.total == b.total + assert a.blocked == b.blocked + assert a.by_category == b.by_category + assert [r.profile.id for r in a.escapes] == [r.profile.id for r in b.escapes] + + +def test_no_real_file_leaked(report): + """Belt-and-suspenders: sandbox-escape attacks must never surface real OS + file content. The harness already asserts this per-attack; here we re-derive + the guarantee from the report's success and the leak markers it checks.""" + sandbox_blocked, sandbox_total = report.by_category["SANDBOX_ESCAPE"] + assert sandbox_total > 0 + assert sandbox_blocked == sandbox_total + # The markers the harness scans for include 'root:' (the /etc/passwd tell). + assert "root:" in str(_harness._LEAK_MARKERS).lower() + + +@pytest.mark.skipif( + not _rust_available(), + reason="verified Rust extension (authgate_kernel) not built in this environment", +) +def test_no_escapes_on_verified_rust_backend(): + """The same adversaries, but every permit/deny decision is made by the + formally-verified Rust engine. Zero escapes is the bar regardless of backend.""" + report = _harness.run_redteam(1000, master_seed=1337, backend="rust") + assert report.total == 1000 + assert report.blocked == 1000 + assert report.escapes == [] diff --git a/tests/test_runtime_run_log.py b/tests/test_runtime_run_log.py new file mode 100644 index 0000000..b9dc8a4 --- /dev/null +++ b/tests/test_runtime_run_log.py @@ -0,0 +1,98 @@ +"""Unit tests for authgate.runtime.run_log.RunLog.""" +from __future__ import annotations + +import json + +from authgate.runtime.run_log import RunLog + + +def test_record_permit_entry_shape(): + log = RunLog() + log.record("agent-1", 0, "calculator", {"expression": "1+1"}, True, output="2") + entry = log.entries()[0] + assert entry["agent_id"] == "agent-1" + assert entry["step"] == 0 + assert entry["tool"] == "calculator" + assert entry["args"] == {"expression": "1+1"} + assert entry["decision"] == "permit" + assert entry["output"] == "2" + assert entry["denied_reason"] is None + assert "ts" in entry + + +def test_record_deny_entry_shape(): + log = RunLog() + log.record( + "agent-1", 1, "file_read", {"filename": "a.txt"}, False, + output="should be dropped", denied_reason="capability gate denied", + ) + entry = log.entries()[0] + assert entry["decision"] == "deny" + assert entry["output"] is None # output suppressed on denial + assert entry["denied_reason"] == "capability gate denied" + + +def test_record_output_is_stringified(): + log = RunLog() + log.record("a", 0, "calculator", {}, True, output=42) + assert log.entries()[0]["output"] == "42" + + +def test_record_output_truncated_at_2000_chars(): + log = RunLog() + big = "x" * 5000 + log.record("a", 0, "tool", {}, True, output=big) + assert len(log.entries()[0]["output"]) == 2000 + + +def test_record_output_under_limit_not_truncated(): + log = RunLog() + payload = "y" * 1999 + log.record("a", 0, "tool", {}, True, output=payload) + assert log.entries()[0]["output"] == payload + + +def test_entries_returns_copy(): + log = RunLog() + log.record("a", 0, "tool", {}, True, output="ok") + snapshot = log.entries() + snapshot.append({"injected": True}) + assert len(log.entries()) == 1 # internal state unaffected + + +def test_len_tracks_record_count(): + log = RunLog() + assert len(log) == 0 + log.record("a", 0, "tool", {}, True, output="ok") + log.record("a", 1, "tool", {}, False, denied_reason="no") + assert len(log) == 2 + + +def test_path_mode_writes_valid_jsonl(tmp_path): + log_path = tmp_path / "run.jsonl" + log = RunLog(path=str(log_path)) + log.record("a", 0, "calculator", {"expression": "1+1"}, True, output="2") + log.record("a", 1, "file_read", {"filename": "x"}, False, denied_reason="denied") + + lines = log_path.read_text(encoding="utf-8").splitlines() + assert len(lines) == 2 + parsed = [json.loads(line) for line in lines] + assert parsed[0]["decision"] == "permit" + assert parsed[0]["output"] == "2" + assert parsed[1]["decision"] == "deny" + assert parsed[1]["output"] is None + assert parsed[1]["denied_reason"] == "denied" + + +def test_path_mode_appends_across_records(tmp_path): + log_path = tmp_path / "run.jsonl" + log = RunLog(path=str(log_path)) + for i in range(3): + log.record("a", i, "tool", {}, True, output=str(i)) + assert len(log_path.read_text(encoding="utf-8").splitlines()) == 3 + + +def test_memory_mode_creates_no_file(tmp_path): + log = RunLog() # path=None + log.record("a", 0, "tool", {}, True, output="ok") + assert list(tmp_path.iterdir()) == [] diff --git a/tests/test_runtime_rust_backend.py b/tests/test_runtime_rust_backend.py new file mode 100644 index 0000000..aaee97b --- /dev/null +++ b/tests/test_runtime_rust_backend.py @@ -0,0 +1,95 @@ +""" +Tests for routing the runtime's authorization decision through the verified Rust +engine (RustBackedVerifier). + +Skipped automatically when the compiled `authgate_kernel` extension is not present +(e.g. CI jobs that do not build it). Where it IS present, these assert that the +Rust-backed decision matches the pure-Python verifier across the cases that +matter — including the epoch-revocation semantics the wire format does not model +natively (preserved by the adapter's valid-claim filter). +""" +from __future__ import annotations + +import pytest + +from authgate.runtime.rust_backend import rust_backend_available + +pytestmark = pytest.mark.skipif( + not rust_backend_available(), + reason="verified Rust extension (authgate_kernel) not built in this environment", +) + +from authgate.kernel.entities import ( # noqa: E402 + AgentType, + Entity, + Resource, + ResourceType, + RightsClaim, +) +from authgate.kernel.registry import OwnershipRegistry # noqa: E402 +from authgate.kernel.verifier import Action, FreedomVerifier # noqa: E402 +from authgate.runtime.rust_backend import RustBackedVerifier # noqa: E402 + + +def _scenario(grant: bool = True): + owner = Entity("operator", AgentType.HUMAN) + agent = Entity("agent-1", AgentType.MACHINE) + resource = Resource("compute", ResourceType.COMPUTE_SLOT) + registry = OwnershipRegistry() + registry.register_machine(agent, owner) + if grant: + registry.add_claim(RightsClaim(owner, resource, can_read=True, can_delegate=True)) + registry.delegate(RightsClaim(agent, resource, can_read=True), delegated_by=owner) + action = Action("t1", agent, resources_read=[resource]) + return registry, action, agent, resource + + +def test_rust_permits_when_granted(): + registry, action, _, _ = _scenario(grant=True) + assert RustBackedVerifier(registry).verify(action).permitted + + +def test_rust_denies_when_not_granted(): + registry, action, _, _ = _scenario(grant=False) + result = RustBackedVerifier(registry).verify(action) + assert not result.permitted + assert result.violations + + +@pytest.mark.parametrize("grant", [True, False]) +def test_rust_decision_matches_python(grant): + registry, action, _, _ = _scenario(grant=grant) + rust = RustBackedVerifier(registry).verify(action).permitted + python = FreedomVerifier(registry).verify(action).permitted + assert rust == python + + +def test_rust_unowned_machine_denied(): + # No register_machine -> A4 ownership violation, decided by the Rust engine. + agent = Entity("orphan", AgentType.MACHINE) + resource = Resource("compute", ResourceType.COMPUTE_SLOT) + registry = OwnershipRegistry() + registry.add_claim(RightsClaim(agent, resource, can_read=True)) + action = Action("t1", agent, resources_read=[resource]) + assert not RustBackedVerifier(registry).verify(action).permitted + + +def test_rust_epoch_revocation_preserved(): + # The wire format has no epoch; the adapter preserves it by filtering claims. + registry, _, agent, resource = _scenario(grant=True) + old_epoch_action = Action("t1", agent, resources_read=[resource], min_epoch=2) + # Claims default to epoch=1, so a min_epoch=2 action must be denied... + assert not RustBackedVerifier(registry, freeze=False).verify(old_epoch_action).permitted + # ...until the registry advances the epoch (reissues claims at epoch 2). + registry.advance_epoch(2) + assert RustBackedVerifier(registry, freeze=False).verify(old_epoch_action).permitted + + +def test_rust_records_to_audit_log(): + from authgate.kernel.audit import AuditLog + + registry, action, _, _ = _scenario(grant=True) + audit = AuditLog() + RustBackedVerifier(registry, audit_log=audit).verify(action) + assert len(audit._records) == 1 + assert audit.verify_chain() is True diff --git a/tests/test_runtime_rust_wire.py b/tests/test_runtime_rust_wire.py new file mode 100644 index 0000000..74b39b6 --- /dev/null +++ b/tests/test_runtime_rust_wire.py @@ -0,0 +1,97 @@ +""" +Tests for the pure JSON-wire serialization that feeds the verified Rust engine. + +These need no compiled extension — they exercise the marshalling functions and +the valid-claim filter directly, so they run everywhere (including CI without the +extension) and pin the exact shape the engine consumes. +""" +from __future__ import annotations + +from authgate.kernel.entities import ( + AgentType, + Entity, + Resource, + ResourceType, + RightsClaim, +) +from authgate.kernel.registry import OwnershipRegistry +from authgate.kernel.verifier import Action +from authgate.runtime.rust_backend import ( + _action_wire, + _claim_wire, + _entity_wire, + _live_claims, + _resource_wire, +) + + +def test_entity_wire_maps_kind(): + assert _entity_wire(Entity("a", AgentType.MACHINE)) == {"name": "a", "kind": "MACHINE"} + assert _entity_wire(Entity("op", AgentType.HUMAN))["kind"] == "HUMAN" + + +def test_resource_wire_shape(): + r = Resource("sales", ResourceType.DATASET, scope="/data/", is_public=True) + w = _resource_wire(r) + assert w["name"] == "sales" + assert w["rtype"] == "DATASET" + assert w["scope"] == "/data/" + assert w["is_public"] is True + + +def test_claim_wire_carries_rights_and_confidence(): + c = RightsClaim( + Entity("a", AgentType.MACHINE), + Resource("compute", ResourceType.COMPUTE_SLOT), + can_read=True, can_delegate=True, confidence=0.9, + ) + w = _claim_wire(c) + assert w["can_read"] is True + assert w["can_delegate"] is True + assert w["confidence"] == 0.9 + assert w["holder"]["name"] == "a" + + +def test_action_wire_includes_resources_and_flags(): + agent = Entity("a", AgentType.MACHINE) + res = Resource("compute", ResourceType.COMPUTE_SLOT) + action = Action("act1", agent, resources_read=[res], coerces=True) + w = _action_wire(action) + assert w["action_id"] == "act1" + assert w["actor"]["name"] == "a" + assert len(w["resources_read"]) == 1 + assert w["coerces"] is True + assert w["bypasses_verifier"] is False + + +def _registry_with_claim(epoch: int = 1): + owner = Entity("operator", AgentType.HUMAN) + agent = Entity("agent-1", AgentType.MACHINE) + res = Resource("compute", ResourceType.COMPUTE_SLOT) + reg = OwnershipRegistry() + reg.register_machine(agent, owner) + reg.add_claim(RightsClaim(owner, res, can_read=True, can_delegate=True)) + reg.delegate(RightsClaim(agent, res, can_read=True), delegated_by=owner) + return reg + + +def test_live_claims_includes_valid_claims(): + reg = _registry_with_claim() + live = _live_claims(reg, min_epoch=0) + # owner's claim + delegated agent claim, both valid at epoch 0 + assert len(live) == 2 + + +def test_live_claims_filters_by_epoch(): + reg = _registry_with_claim() + # claims default to epoch 1; requiring epoch 2 filters them all out + assert _live_claims(reg, min_epoch=2) == [] + reg.advance_epoch(2) + assert len(_live_claims(reg, min_epoch=2)) == 2 + + +def test_live_claims_excludes_revoked(): + reg = _registry_with_claim() + reg.revoke_all("agent-1") + holders = {c.holder.name for c in _live_claims(reg, min_epoch=0)} + assert "agent-1" not in holders diff --git a/tests/test_runtime_sandbox.py b/tests/test_runtime_sandbox.py new file mode 100644 index 0000000..bc88465 --- /dev/null +++ b/tests/test_runtime_sandbox.py @@ -0,0 +1,166 @@ +""" +Tests for the real, OS-enforced tool sandbox (process isolation + limits). + +These prove the sandbox actually *contains* a tool rather than just checking its +inputs: a normal tool runs and returns; a hostile path is refused; a runaway +(hanging) tool is killed by the wall-clock deadline rather than blocking forever; +oversized output is capped. +""" +from __future__ import annotations + +import io +import json +import os +import time + +import pytest + +from authgate.runtime import _sandbox_runner as runner +from authgate.runtime.sandbox import SandboxPolicy, run_tool_sandboxed + +# --- sandbox child runner (in-process, so coverage sees it) ---------------- + +def _run_job(monkeypatch, capsys, job: dict) -> dict: + monkeypatch.setattr("sys.stdin", io.StringIO(json.dumps(job))) + rc = runner.main() + assert rc == 0 + out = capsys.readouterr().out + return json.loads(out.split(runner.RESULT_MARKER)[-1].strip()) + + +def test_runner_executes_importable_entry(monkeypatch, capsys): + res = _run_job(monkeypatch, capsys, { + "entry": "authgate.runtime.tools:calculate", + "args": {"expression": "6 * 7"}, "limits": {}, + }) + assert res["ok"] and res["output"] == "42" + + +def test_runner_executes_file_read_builtin(tmp_path, monkeypatch, capsys): + (tmp_path / "n.txt").write_text("inside", encoding="utf-8") + res = _run_job(monkeypatch, capsys, { + "builtin": "file_read", "sandbox_root": str(tmp_path), + "args": {"filename": "n.txt"}, "limits": {}, + }) + assert res["ok"] and res["output"] == "inside" + + +def test_runner_reports_tool_error_as_not_ok(monkeypatch, capsys): + res = _run_job(monkeypatch, capsys, { + "entry": "authgate.runtime.tools:calculate", + "args": {"expression": "open('x')"}, "limits": {}, + }) + assert not res["ok"] and "ValueError" in res["error"] + + +def test_runner_rejects_jobless_request(monkeypatch, capsys): + res = _run_job(monkeypatch, capsys, {"args": {}, "limits": {}}) + assert not res["ok"] + + +def test_runner_handles_bad_json(monkeypatch, capsys): + monkeypatch.setattr("sys.stdin", io.StringIO("{not json")) + runner.main() + out = capsys.readouterr().out + assert not json.loads(out.split(runner.RESULT_MARKER)[-1].strip())["ok"] + + +def test_runner_truncates_output(monkeypatch, capsys): + res = _run_job(monkeypatch, capsys, { + "entry": "authgate.runtime.tools:web_search", + "args": {"query": "Z" * 4000}, "limits": {"max_output_bytes": 50}, + }) + assert res["ok"] and len(res["output"]) <= 50 and res["truncated"] + + +def test_sandbox_runs_calculator_in_subprocess(tmp_path): + r = run_tool_sandboxed("calculator", {"expression": "2 + 3 * 4"}, tmp_path, SandboxPolicy()) + assert r.ok + assert r.output == "14" + + +def test_sandbox_reads_file_inside_root(tmp_path): + (tmp_path / "a.txt").write_text("hello", encoding="utf-8") + r = run_tool_sandboxed("file_read", {"filename": "a.txt"}, tmp_path, SandboxPolicy()) + assert r.ok + assert r.output == "hello" + + +def test_sandbox_denies_path_escape(tmp_path): + r = run_tool_sandboxed("file_read", {"filename": "../../etc/passwd"}, tmp_path, SandboxPolicy()) + assert not r.ok + assert "escapes sandbox" in (r.error or "") + + +def test_sandbox_kills_a_hanging_tool(tmp_path): + # A helper module the isolated child can import, whose tool hangs far longer + # than the deadline. Real containment means it is killed, not awaited. + helper_dir = tmp_path / "helpers" + helper_dir.mkdir() + (helper_dir / "hangtool.py").write_text( + "import time\n\ndef hang(seconds=60):\n time.sleep(seconds)\n return 'finished'\n", + encoding="utf-8", + ) + env = dict(os.environ) + env["PYTHONPATH"] = os.pathsep.join([str(helper_dir), env.get("PYTHONPATH", "")]) + + t0 = time.time() + r = run_tool_sandboxed( + "x", {"seconds": 30}, tmp_path, + SandboxPolicy(wall_timeout_s=2.0), + entry_override="hangtool:hang", env=env, + ) + elapsed = time.time() - t0 + + assert not r.ok + assert r.killed + assert elapsed < 15, f"sandbox did not kill promptly (took {elapsed:.1f}s)" + + +def test_sandbox_caps_oversized_output(tmp_path): + long_query = "Z" * 5000 # web_search echoes the query back; output would be large + r = run_tool_sandboxed( + "web_search", {"query": long_query}, tmp_path, + SandboxPolicy(max_output_bytes=100), + ) + assert r.ok + assert r.output is not None + assert len(r.output) <= 100 + + +def test_sandbox_tool_error_is_denial_not_crash(tmp_path): + # calculator rejects non-arithmetic; through the sandbox that is a clean deny. + r = run_tool_sandboxed("calculator", {"expression": "__import__('os')"}, tmp_path, SandboxPolicy()) + assert not r.ok + assert r.output is None + assert "ValueError" in (r.error or "") + + +def _has_rlimit() -> bool: + try: + import resource # noqa: F401 + return True + except ImportError: + return False + + +@pytest.mark.skipif(not _has_rlimit(), reason="POSIX rlimits unavailable (Windows)") +def test_sandbox_cpu_limit_kills_busy_tool(tmp_path): + # A CPU-burning tool with no sleeps: only an RLIMIT_CPU (not the wall clock, + # set generously here) can stop it. Proves kernel-enforced CPU limiting. + helper_dir = tmp_path / "helpers" + helper_dir.mkdir() + (helper_dir / "burntool.py").write_text( + "def burn(n=0):\n x = 0\n while True:\n x += 1\n", + encoding="utf-8", + ) + env = dict(os.environ) + env["PYTHONPATH"] = os.pathsep.join([str(helper_dir), env.get("PYTHONPATH", "")]) + + r = run_tool_sandboxed( + "x", {}, tmp_path, + SandboxPolicy(wall_timeout_s=30.0, cpu_seconds=1), + entry_override="burntool:burn", env=env, + ) + assert not r.ok + assert r.killed diff --git a/tests/test_runtime_tools.py b/tests/test_runtime_tools.py new file mode 100644 index 0000000..b13a53a --- /dev/null +++ b/tests/test_runtime_tools.py @@ -0,0 +1,300 @@ +"""Unit tests for authgate.runtime.tools — calculator, web_search, file_read, registry.""" +from __future__ import annotations + +import pytest + +from authgate.kernel.entities import Resource, ResourceType +from authgate.runtime.tools import ( + Tool, + ToolRegistry, + build_default_tools, + calculate, + web_search, +) + +# --- calculator: happy path ------------------------------------------------ + +def test_calculate_respects_operator_precedence(): + assert calculate("2 + 3 * 4") == "14" + + +def test_calculate_parentheses_override_precedence(): + assert calculate("(2 + 3) * 4") == "20" + + +def test_calculate_floats(): + assert calculate("3.5 * 2") == "7.0" + + +def test_calculate_floor_division(): + assert calculate("7 // 2") == "3" + + +def test_calculate_modulo(): + assert calculate("7 % 3") == "1" + + +def test_calculate_power(): + assert calculate("2 ** 3") == "8" + + +def test_calculate_unary_minus(): + assert calculate("-5") == "-5" + + +def test_calculate_unary_plus(): + assert calculate("+5") == "5" + + +def test_calculate_true_division_is_float(): + assert calculate("6 / 4") == "1.5" + + +def test_calculate_nested_expression(): + assert calculate("2 + (3 * (4 - 1))") == "11" + + +# --- calculator: failure / hostile input ----------------------------------- + +@pytest.mark.parametrize( + "expr", + [ + "__import__('os')", # function call + name + "os.system('x')", # attribute access + call + "abs(-1)", # function call + "foo", # bare name + "a.b", # attribute access + "1 < 2", # comparison + "1 == 1", # comparison + "", # empty string + "1 +", # trailing junk / syntax error + "[1, 2, 3]", # list literal + "1 if True else 2", # conditional expression + ], +) +def test_calculate_rejects_unsafe_expressions(expr): + with pytest.raises(ValueError): + calculate(expr) + + +# --- calculator: resource-exhaustion (DoS) bounds -------------------------- +# These are valid arithmetic that, unbounded, hang the process building giant +# integers. They must be REFUSED (ValueError), not computed. A regression here +# would hang the test run, so each must return essentially instantly. + +@pytest.mark.parametrize( + "expr", + [ + "2 ** 2 ** 2 ** 2 ** 2 ** 2", # ~10^19728 digits: the classic pow-bomb + "9 ** 9 ** 9", # right-assoc tower + "10 ** 100000", # single huge exponent + "1000 ** 1000", # magnitude blows the result-bits cap + ], +) +def test_calculate_rejects_resource_exhaustion(expr): + with pytest.raises(ValueError): + calculate(expr) + + +def test_calculate_rejects_overlong_expression(): + with pytest.raises(ValueError, match="too long"): + calculate("(" * 200 + "1" + ")" * 200) + + +def test_calculate_normalizes_divide_by_zero_to_valueerror(): + with pytest.raises(ValueError): + calculate("1 / 0") + + +def test_calculate_allows_reasonable_power(): + # Just under the exponent cap must still compute fine and fast. + assert calculate("2 ** 10") == "1024" + + +# --- web_search ------------------------------------------------------------ + +def test_web_search_is_deterministic_for_same_query(): + assert web_search("some arbitrary query") == web_search("some arbitrary query") + + +def test_web_search_canned_authgate_answer(): + result = web_search("authgate") + assert "capability-constrained authorization kernel" in result + + +def test_web_search_canned_capability_answer(): + result = web_search("what is a capability") + assert "unforgeable token" in result + + +def test_web_search_canned_is_case_and_space_insensitive(): + baseline = web_search("authgate") + assert web_search("AuthGate") == baseline + assert web_search(" AUTHGATE ") == baseline + + +def test_web_search_unknown_query_returns_mock_marker(): + query = "zzz totally unindexed thing" + result = web_search(query) + assert result == f"[mock] no indexed results for {query!r}" + + +def test_web_search_distinct_queries_differ(): + assert web_search("authgate") != web_search("what is a capability") + + +# --- file_read ------------------------------------------------------------- + +def _file_read_fn(sandbox_root): + return build_default_tools(sandbox_root).get("file_read").fn + + +def test_file_read_reads_file_inside_sandbox(tmp_path): + (tmp_path / "hello.txt").write_text("hello world", encoding="utf-8") + read_file = _file_read_fn(tmp_path) + assert read_file("hello.txt") == "hello world" + + +def test_file_read_reads_benign_subdirectory(tmp_path): + sub = tmp_path / "sub" + sub.mkdir() + (sub / "nested.txt").write_text("nested content", encoding="utf-8") + read_file = _file_read_fn(tmp_path) + assert read_file("sub/nested.txt") == "nested content" + + +def test_file_read_rejects_parent_traversal(tmp_path): + read_file = _file_read_fn(tmp_path) + with pytest.raises(PermissionError, match="escapes sandbox"): + read_file("../etc") + + +def test_file_read_rejects_deep_parent_traversal(tmp_path): + read_file = _file_read_fn(tmp_path) + with pytest.raises(PermissionError, match="escapes sandbox"): + read_file("../../secret") + + +def test_file_read_rejects_posix_absolute_path(tmp_path): + read_file = _file_read_fn(tmp_path) + with pytest.raises(PermissionError, match="escapes sandbox"): + read_file("/etc/passwd") + + +def test_file_read_rejects_windows_absolute_path(tmp_path): + # On Windows this is a drive-absolute path that escapes the sandbox + # (PermissionError). On POSIX, 'C:\\...' is merely a nonexistent in-sandbox + # filename (FileNotFoundError). Either way, no real file is read. + read_file = _file_read_fn(tmp_path) + with pytest.raises((PermissionError, FileNotFoundError)): + read_file("C:\\Windows\\win.ini") + + +def test_file_read_missing_file_inside_sandbox_raises_filenotfound(tmp_path): + read_file = _file_read_fn(tmp_path) + with pytest.raises(FileNotFoundError): + read_file("does_not_exist.txt") + + +@pytest.mark.parametrize( + "name", + ["CON", "NUL", "PRN", "AUX", "COM1", "LPT1", "CON.txt", "sub/NUL"], +) +def test_file_read_rejects_windows_reserved_devices(tmp_path, name): + # These resolve inside the sandbox but open a device on Windows ('CON' blocks + # forever = DoS). Must be refused by name on every platform for portability. + read_file = _file_read_fn(tmp_path) + with pytest.raises(PermissionError, match="reserved device"): + read_file(name) + + +# --- Tool dataclass -------------------------------------------------------- + +def _dummy_resource(): + return Resource("compute", ResourceType.COMPUTE_SLOT) + + +def test_tool_accepts_read_mode(): + tool = Tool(name="t", fn=lambda: None, resource=_dummy_resource(), mode="read") + assert tool.mode == "read" + + +def test_tool_accepts_write_mode(): + tool = Tool(name="t", fn=lambda: None, resource=_dummy_resource(), mode="write") + assert tool.mode == "write" + + +def test_tool_rejects_invalid_mode(): + with pytest.raises(ValueError, match="mode must be one of"): + Tool(name="t", fn=lambda: None, resource=_dummy_resource(), mode="execute") + + +def test_tool_is_frozen(): + tool = Tool(name="t", fn=lambda: None, resource=_dummy_resource(), mode="read") + with pytest.raises(Exception): + tool.name = "other" # type: ignore[misc] + + +# --- ToolRegistry ---------------------------------------------------------- + +def _make_tool(name): + return Tool(name=name, fn=lambda: name, resource=_dummy_resource(), mode="read") + + +def test_registry_register_returns_tool(): + registry = ToolRegistry() + tool = _make_tool("alpha") + assert registry.register(tool) is tool + + +def test_registry_get_returns_registered_tool(): + registry = ToolRegistry() + tool = _make_tool("alpha") + registry.register(tool) + assert registry.get("alpha") is tool + + +def test_registry_get_missing_raises_keyerror(): + registry = ToolRegistry() + with pytest.raises(KeyError): + registry.get("nope") + + +def test_registry_names_are_sorted(): + registry = ToolRegistry() + registry.register(_make_tool("gamma")) + registry.register(_make_tool("alpha")) + registry.register(_make_tool("beta")) + assert registry.names() == ["alpha", "beta", "gamma"] + + +def test_registry_contains(): + registry = ToolRegistry() + registry.register(_make_tool("alpha")) + assert "alpha" in registry + assert "missing" not in registry + + +def test_registry_iter_yields_tools(): + registry = ToolRegistry() + a, b = _make_tool("a"), _make_tool("b") + registry.register(a) + registry.register(b) + assert set(registry) == {a, b} + + +# --- build_default_tools --------------------------------------------------- + +def test_build_default_tools_has_three_named_tools(tmp_path): + registry = build_default_tools(tmp_path) + assert registry.names() == ["calculator", "file_read", "web_search"] + + +def test_build_default_tools_all_read_mode(tmp_path): + registry = build_default_tools(tmp_path) + assert all(tool.mode == "read" for tool in registry) + + +def test_build_default_tools_calculator_fn_works(tmp_path): + registry = build_default_tools(tmp_path) + assert registry.get("calculator").fn("1 + 1") == "2"