From 5b619d88b8603ef0eb1b963c34cda807a74f891b Mon Sep 17 00:00:00 2001 From: Ali Pourrahim Date: Mon, 1 Jun 2026 15:13:00 +0300 Subject: [PATCH 1/7] Relicense to source-available, contact-first terms Replace PolyForm-NC/MIT with a contact-first license: read/evaluate only; any other use (incl. educational/research/RDI/commercial) needs prior written permission. Add NOTICE asserting independent authorship by Ali Pourrahim and that no third party holds an IP claim. - pyproject: license->LICENSE, author->Ali Pourrahim, classifier->Other/Proprietary (was MIT) - Cargo.toml: license-file + publish=false + author (was PolyForm) - deny.toml: ignore private crate license, drop PolyForm allow - README: license badge + section updated Co-Authored-By: Claude Opus 4.8 --- LICENSE | 191 +++++++++++++++++++++++++------------ NOTICE | 40 ++++++++ README.md | 11 ++- authgate-kernel/Cargo.toml | 4 +- authgate-kernel/deny.toml | 5 +- pyproject.toml | 6 +- 6 files changed, 188 insertions(+), 69 deletions(-) create mode 100644 NOTICE diff --git a/LICENSE b/LICENSE index dfc499e..1de978f 100644 --- a/LICENSE +++ b/LICENSE @@ -1,61 +1,130 @@ -# PolyForm Noncommercial License 1.0.0 - - - -Required Notice: Copyright © 2026 Mohammadali Jannatkhahdoost (https://github.com/Aliipou) - -## Acceptance - -In order to get any license under these terms, you must agree to them as both strict obligations and conditions to all your licenses. - -## Copyright License - -The licensor grants you a copyright license for the software to do everything you might do with the software that would otherwise infringe the licensor's copyright in it for any permitted purpose. However, you may only distribute the software according to Distribution License and make changes or new works based on the software according to Changes and New Works License. - -## Distribution License - -The licensor grants you an additional copyright license to distribute copies of the software. Your license to distribute covers distributing the software with changes and new works permitted by Changes and New Works License. - -## Notices - -You must ensure that anyone who gets a copy of any part of the software from you also gets a copy of these terms or the URL for them above, as well as copies of any plain-text lines beginning with "Required Notice:" that the licensor provided with the software. - -## Changes and New Works License - -The licensor grants you an additional copyright license to make changes and new works based on the software for any permitted purpose. - -## Patent License - -The licensor grants you a patent license for the software that covers patent claims the licensor can license, or becomes able to license, that you would infringe by using the software. - -## Noncommercial Purposes - -Any noncommercial purpose is a permitted purpose. - -## Personal Uses - -Personal use for research, experiment, and testing for the benefit of public knowledge, personal study, private entertainment, hobby projects, amateur pursuits, or religious observance, without any anticipated commercial application, is use for a permitted purpose. - -## Noncommercial Organizations - -Use by any charitable organization, educational institution, public research organization, public safety or health organization, environmental protection organization, or government institution is use for a permitted purpose regardless of the source of funding or obligations resulting from the funding. - -## Fair Use - -You may have fair use rights for the software under applicable law. These terms do not limit them. - -## No Other Rights - -These terms do not allow you to sublicense or transfer any of your licenses to anyone else, or prevent the licensor from granting licenses to anyone else. These terms do not imply any other licenses. - -## Patent Defense - -If you make any written claim that the software infringes or contributes to infringement of any patent, your patent license for the software granted under these terms ends immediately. If your company makes such a claim, your patent license ends immediately for work on behalf of your company. - -## Violations - -The first time you are notified in writing that you have violated any of these terms, or done anything with the software not covered by your licenses, your licenses can nonetheless continue if you come into full compliance with these terms, and take practical steps to correct past violations, within 32 days of receiving notice. Otherwise, all your licenses end immediately. - -## No Liability - -As far as the law allows, the software comes as is, without any warranty or condition, and the licensor will not be liable to you for any damages arising out of these terms or the use or nature of the software, under any kind of legal claim. +AuthGate-Kernel Source-Available, Contact-First License +Version 1.0 — 2026 + +Required Notice: Copyright © 2026 Ali Pourrahim +(https://github.com/Aliipou). All rights reserved. + +================================================================ +0. SUMMARY (non-binding; the numbered terms below control) +================================================================ +You may READ and privately EVALUATE this source code. You may NOT +use, run, deploy, copy beyond evaluation, modify, distribute, or +build on it — for ANY purpose, by ANY person or organization, +including educational institutions, research/innovation (RDI) +offices, and commercial entities — without the Author's PRIOR +WRITTEN PERMISSION. You may never claim the Software, its ideas, or +any derivative as your own work or as originating from anyone other +than the Author. To request permission, contact the Author first +(Section 7). + +================================================================ +1. DEFINITIONS +================================================================ +"Software" means the source code, documentation, specifications, +designs, and all other material in this repository and any portion +of it. +"Author" / "Licensor" means Ali Pourrahim +(https://github.com/Aliipou), the sole owner of the copyright in +the Software, who created it independently and without the +resources, funding, employment, or coursework of any third party. +"You" means any individual or legal entity exercising any right +under this License. +"Use" means to run, execute, reproduce (beyond Evaluation), +modify, translate, adapt, create derivative works of, distribute, +publicly display, publish, sublicense, sell, incorporate into any +product, service, dataset, model, or research output, or otherwise +exploit the Software in any way. +"Evaluation" means reading the source and making one local copy +solely to study and assess it privately, with no other Use. + +================================================================ +2. RESERVATION OF RIGHTS +================================================================ +The Software is the exclusive property of the Author. All rights +not expressly granted in Section 3 are reserved. No right or +license is granted by implication, estoppel, or otherwise. + +================================================================ +3. LIMITED PERMISSION GRANTED +================================================================ +Subject to full compliance with this License, the Author grants You +a personal, non-exclusive, non-transferable, revocable permission +to perform Evaluation of the Software. This is the ONLY permission +granted without prior written authorization. + +================================================================ +4. PERMISSION REQUIRED FOR ALL OTHER USE +================================================================ +Any Use of the Software beyond Evaluation is prohibited unless and +until the Author grants You a separate, signed, written license for +that specific Use. This requirement applies to everyone without +exception, expressly including: + (a) educational institutions, universities, and their faculties, + departments, staff, and students; + (b) research, development, and innovation (RDI) units, technology- + transfer offices, incubators, and accelerators; + (c) government, public-sector, and nonprofit organizations; and + (d) commercial entities of any size. +Funding source, nonprofit status, or "research" or "educational" +characterization does NOT create any permitted Use. + +================================================================ +5. ATTRIBUTION AND INTEGRITY (anti-credit-taking) +================================================================ +5.1 You must not remove, obscure, or alter the "Required Notice" + line above or any copyright or authorship notice in the + Software. +5.2 You must not represent, directly or by implication, that the + Software, its design, its underlying ideas, or any derivative + of it was created, originated, co-authored, or owned by anyone + other than the Author. +5.3 You must not claim, register, publish, or seek credit for the + Software or its ideas — whether as a research output, thesis, + grant deliverable, RDI achievement, publication, patent, or + otherwise — as the work of any person or organization other + than the Author, without the Author's prior written permission. +5.4 Any Use authorized under Section 4 must carry clear, prominent + attribution to the Author in the form the Author specifies. + +================================================================ +6. INTELLECTUAL PROPERTY; NO PATENT OR TRADEMARK LICENSE +================================================================ +No patent license and no trademark license are granted under this +License. The Author reserves all patent rights, naming rights, and +moral rights to the fullest extent permitted by applicable law, +including the right to be identified as the author. + +================================================================ +7. HOW TO REQUEST PERMISSION +================================================================ +To request any license beyond Evaluation, or to discuss attribution +or collaboration, contact the Author IN WRITING BEFORE any Use: + Author: Ali Pourrahim + GitHub: https://github.com/Aliipou + Email: Alipourrahim.ap@gmail.com +Permission is effective only when granted by the Author in a signed +writing that identifies the permitted Use. Silence or non-response +is not permission. + +================================================================ +8. TERMINATION +================================================================ +Any breach of this License terminates all permissions granted to +You automatically and immediately. Upon termination You must cease +all Use and destroy all copies in Your possession. Sections 2, 5, +6, 9, and 10 survive termination. + +================================================================ +9. NO WARRANTY +================================================================ +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND +NON-INFRINGEMENT. + +================================================================ +10. LIMITATION OF LIABILITY +================================================================ +TO THE MAXIMUM EXTENT PERMITTED BY LAW, THE AUTHOR SHALL NOT BE +LIABLE FOR ANY CLAIM, DAMAGES, OR OTHER LIABILITY ARISING FROM OR +IN CONNECTION WITH THE SOFTWARE OR ANY USE OF IT. diff --git a/NOTICE b/NOTICE new file mode 100644 index 0000000..e2da32d --- /dev/null +++ b/NOTICE @@ -0,0 +1,40 @@ +AUTHORSHIP AND OWNERSHIP NOTICE +================================ + +Project: authgate-kernel +Author: Ali Pourrahim (https://github.com/Aliipou) +Contact: Alipourrahim.ap@gmail.com +Copyright: © 2026 Ali Pourrahim. All rights reserved. + +Statement of independent authorship +------------------------------------ +authgate-kernel — a capability-constrained authorization kernel for +agent tool execution — was conceived, designed, and implemented by +the Author as an independent personal work. + +It was NOT created as part of, and does NOT incorporate or depend on: + - any course, thesis, or degree requirement of any institution; + - any employment, grant, scholarship, or funded research project; + - any institution's facilities, equipment, funding, or confidential + information. + +The Author is the sole owner of all intellectual property in this +work. No third party, including any university, research, development, +or innovation (RDI) office, holds any ownership interest, license, or +right to claim credit for this work, its design, or its underlying +ideas, except by a separate written agreement signed by the Author. + +Evidence of authorship and priority +----------------------------------- +Authorship and creation dates are evidenced by the public version- +control history of this repository (commit authorship and timestamps) +and its public publication record at: + https://github.com/Aliipou/authgate-kernel + +Use and attribution +-------------------- +Use of this work is governed by the LICENSE file in this repository +(AuthGate-Kernel Source-Available, Contact-First License). Any use +beyond private evaluation requires the Author's prior written +permission. Attribution to the Author must never be removed or +misrepresented. diff --git a/README.md b/README.md index e9c392c..db9d17f 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ A wire format and a verify function. See [POSITIONING.md](POSITIONING.md). [![Tests](https://img.shields.io/badge/tests-1155%20passing-brightgreen.svg)](tests/) [![Kani](https://img.shields.io/badge/Kani-24%20harnesses-green.svg)](formal/) [![Lean4](https://img.shields.io/badge/Lean4-16%20theorems-blue.svg)](formal/lean4/) -[![License: PolyForm NC 1.0.0](https://img.shields.io/badge/License-PolyForm--Noncommercial--1.0.0-orange.svg)](LICENSE) +[![License: Source-Available (Contact-First)](https://img.shields.io/badge/License-Source--Available%20(Contact--First)-red.svg)](LICENSE) ## The problem @@ -382,4 +382,11 @@ If yes, it doesn't belong in the TCB. TCB changes require a written invariant ju ## License -MIT. See [`LICENSE`](LICENSE). +Source-available under the **AuthGate-Kernel Source-Available, Contact-First License** — see [`LICENSE`](LICENSE) and [`NOTICE`](NOTICE). + +You may read and privately evaluate the source. **Any** other use — running, +deploying, copying, modifying, distributing, or building on it, by anyone +including educational/research/RDI and commercial entities — requires the +author's **prior written permission**. The work may not be claimed, in whole +or in part, as anyone else's. To request permission, contact the author first +(see `LICENSE` §7). diff --git a/authgate-kernel/Cargo.toml b/authgate-kernel/Cargo.toml index 9ef896a..30caa0a 100644 --- a/authgate-kernel/Cargo.toml +++ b/authgate-kernel/Cargo.toml @@ -3,7 +3,9 @@ name = "authgate-kernel" version = "0.1.0" edition = "2021" description = "Formal, auditable, cryptographically-verifiable capability-governance kernel" -license = "PolyForm-Noncommercial-1.0.0" +license-file = "../LICENSE" +publish = false +authors = ["Ali Pourrahim "] [lib] name = "authgate_kernel" diff --git a/authgate-kernel/deny.toml b/authgate-kernel/deny.toml index 633dab8..f03256b 100644 --- a/authgate-kernel/deny.toml +++ b/authgate-kernel/deny.toml @@ -29,6 +29,9 @@ ignore = [ [licenses] version = 2 +# The workspace's own crate is source-available/proprietary (license-file, publish = false). +# Skip license checks on private (unpublished) crates; only dependencies are gated below. +private = { ignore = true } allow = [ "MIT", "Apache-2.0", @@ -40,8 +43,6 @@ allow = [ "Unicode-DFS-2016", "OpenSSL", "Zlib", - # This project's own license (source-available, noncommercial). - "PolyForm-Noncommercial-1.0.0", ] [bans] diff --git a/pyproject.toml b/pyproject.toml index 7ca8640..8a63fae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,17 +7,17 @@ name = "authgate" version = "1.0.0" description = "Capability-security kernel for agent runtimes — typed authority verification with formal invariants" readme = "README.md" -license = { text = "MIT" } +license = { file = "LICENSE" } requires-python = ">=3.11" authors = [ - { name = "Contributors" }, + { name = "Ali Pourrahim", email = "Alipourrahim.ap@gmail.com" }, ] keywords = ["capability-security", "agent-runtimes", "formal-verification", "authority-verification", "rust", "formal-methods"] classifiers = [ "Development Status :: 4 - Beta", "Intended Audience :: Developers", "Intended Audience :: Science/Research", - "License :: OSI Approved :: MIT License", + "License :: Other/Proprietary License", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", From ccd6e4a54a5a768759d7d8438e901d9ca172fac8 Mon Sep 17 00:00:00 2001 From: Ali Pourrahim Date: Mon, 1 Jun 2026 15:27:47 +0300 Subject: [PATCH 2/7] Adopt PolyForm Noncommercial 1.0.0 (noncommercial-free, commercial reserved) Move from the contact-first license to PolyForm Noncommercial 1.0.0: research, educational, evaluation, and internal non-commercial use are allowed with attribution; commercial use, production deployment, resale, and SaaS require a separate commercial license. This keeps the IP/commercial leverage while enabling PhD evaluation, citation, and external validation. - LICENSE: PolyForm Noncommercial 1.0.0 (Required Notice: Ali Pourrahim) + commercial-license note - README: license badge + use-rights table + commercial contact - NOTICE: use section updated to the noncommercial model - pyproject/Cargo: license -> PolyForm-Noncommercial-1.0.0 (SPDX) - deny.toml: re-allow PolyForm-Noncommercial-1.0.0, drop private-ignore Co-Authored-By: Claude Opus 4.8 --- LICENSE | 197 +++++++++++++------------------------ NOTICE | 8 +- README.md | 23 +++-- authgate-kernel/Cargo.toml | 3 +- authgate-kernel/deny.toml | 5 +- pyproject.toml | 2 +- 6 files changed, 91 insertions(+), 147 deletions(-) diff --git a/LICENSE b/LICENSE index 1de978f..f9e4838 100644 --- a/LICENSE +++ b/LICENSE @@ -1,130 +1,67 @@ -AuthGate-Kernel Source-Available, Contact-First License -Version 1.0 — 2026 - -Required Notice: Copyright © 2026 Ali Pourrahim -(https://github.com/Aliipou). All rights reserved. - -================================================================ -0. SUMMARY (non-binding; the numbered terms below control) -================================================================ -You may READ and privately EVALUATE this source code. You may NOT -use, run, deploy, copy beyond evaluation, modify, distribute, or -build on it — for ANY purpose, by ANY person or organization, -including educational institutions, research/innovation (RDI) -offices, and commercial entities — without the Author's PRIOR -WRITTEN PERMISSION. You may never claim the Software, its ideas, or -any derivative as your own work or as originating from anyone other -than the Author. To request permission, contact the Author first -(Section 7). - -================================================================ -1. DEFINITIONS -================================================================ -"Software" means the source code, documentation, specifications, -designs, and all other material in this repository and any portion -of it. -"Author" / "Licensor" means Ali Pourrahim -(https://github.com/Aliipou), the sole owner of the copyright in -the Software, who created it independently and without the -resources, funding, employment, or coursework of any third party. -"You" means any individual or legal entity exercising any right -under this License. -"Use" means to run, execute, reproduce (beyond Evaluation), -modify, translate, adapt, create derivative works of, distribute, -publicly display, publish, sublicense, sell, incorporate into any -product, service, dataset, model, or research output, or otherwise -exploit the Software in any way. -"Evaluation" means reading the source and making one local copy -solely to study and assess it privately, with no other Use. - -================================================================ -2. RESERVATION OF RIGHTS -================================================================ -The Software is the exclusive property of the Author. All rights -not expressly granted in Section 3 are reserved. No right or -license is granted by implication, estoppel, or otherwise. - -================================================================ -3. LIMITED PERMISSION GRANTED -================================================================ -Subject to full compliance with this License, the Author grants You -a personal, non-exclusive, non-transferable, revocable permission -to perform Evaluation of the Software. This is the ONLY permission -granted without prior written authorization. - -================================================================ -4. PERMISSION REQUIRED FOR ALL OTHER USE -================================================================ -Any Use of the Software beyond Evaluation is prohibited unless and -until the Author grants You a separate, signed, written license for -that specific Use. This requirement applies to everyone without -exception, expressly including: - (a) educational institutions, universities, and their faculties, - departments, staff, and students; - (b) research, development, and innovation (RDI) units, technology- - transfer offices, incubators, and accelerators; - (c) government, public-sector, and nonprofit organizations; and - (d) commercial entities of any size. -Funding source, nonprofit status, or "research" or "educational" -characterization does NOT create any permitted Use. - -================================================================ -5. ATTRIBUTION AND INTEGRITY (anti-credit-taking) -================================================================ -5.1 You must not remove, obscure, or alter the "Required Notice" - line above or any copyright or authorship notice in the - Software. -5.2 You must not represent, directly or by implication, that the - Software, its design, its underlying ideas, or any derivative - of it was created, originated, co-authored, or owned by anyone - other than the Author. -5.3 You must not claim, register, publish, or seek credit for the - Software or its ideas — whether as a research output, thesis, - grant deliverable, RDI achievement, publication, patent, or - otherwise — as the work of any person or organization other - than the Author, without the Author's prior written permission. -5.4 Any Use authorized under Section 4 must carry clear, prominent - attribution to the Author in the form the Author specifies. - -================================================================ -6. INTELLECTUAL PROPERTY; NO PATENT OR TRADEMARK LICENSE -================================================================ -No patent license and no trademark license are granted under this -License. The Author reserves all patent rights, naming rights, and -moral rights to the fullest extent permitted by applicable law, -including the right to be identified as the author. - -================================================================ -7. HOW TO REQUEST PERMISSION -================================================================ -To request any license beyond Evaluation, or to discuss attribution -or collaboration, contact the Author IN WRITING BEFORE any Use: - Author: Ali Pourrahim - GitHub: https://github.com/Aliipou - Email: Alipourrahim.ap@gmail.com -Permission is effective only when granted by the Author in a signed -writing that identifies the permitted Use. Silence or non-response -is not permission. - -================================================================ -8. TERMINATION -================================================================ -Any breach of this License terminates all permissions granted to -You automatically and immediately. Upon termination You must cease -all Use and destroy all copies in Your possession. Sections 2, 5, -6, 9, and 10 survive termination. - -================================================================ -9. NO WARRANTY -================================================================ -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND -NON-INFRINGEMENT. - -================================================================ -10. LIMITATION OF LIABILITY -================================================================ -TO THE MAXIMUM EXTENT PERMITTED BY LAW, THE AUTHOR SHALL NOT BE -LIABLE FOR ANY CLAIM, DAMAGES, OR OTHER LIABILITY ARISING FROM OR -IN CONNECTION WITH THE SOFTWARE OR ANY USE OF IT. +# PolyForm Noncommercial License 1.0.0 + + + +Required Notice: Copyright © 2026 Ali Pourrahim (https://github.com/Aliipou) + +## Acceptance + +In order to get any license under these terms, you must agree to them as both strict obligations and conditions to all your licenses. + +## Copyright License + +The licensor grants you a copyright license for the software to do everything you might do with the software that would otherwise infringe the licensor's copyright in it for any permitted purpose. However, you may only distribute the software according to Distribution License and make changes or new works based on the software according to Changes and New Works License. + +## Distribution License + +The licensor grants you an additional copyright license to distribute copies of the software. Your license to distribute covers distributing the software with changes and new works permitted by Changes and New Works License. + +## Notices + +You must ensure that anyone who gets a copy of any part of the software from you also gets a copy of these terms or the URL for them above, as well as copies of any plain-text lines beginning with "Required Notice:" that the licensor provided with the software. + +## Changes and New Works License + +The licensor grants you an additional copyright license to make changes and new works based on the software for any permitted purpose. + +## Patent License + +The licensor grants you a patent license for the software that covers patent claims the licensor can license, or becomes able to license, that you would infringe by using the software. + +## Noncommercial Purposes + +Any noncommercial purpose is a permitted purpose. + +## Personal Uses + +Personal use for research, experiment, and testing for the benefit of public knowledge, personal study, private entertainment, hobby projects, amateur pursuits, or religious observance, without any anticipated commercial application, is use for a permitted purpose. + +## Noncommercial Organizations + +Use by any charitable organization, educational institution, public research organization, public safety or health organization, environmental protection organization, or government institution is use for a permitted purpose regardless of the source of funding or obligations resulting from the funding. + +## Fair Use + +You may have fair use rights for the software under applicable law. These terms do not limit them. + +## No Other Rights + +These terms do not allow you to sublicense or transfer any of your licenses to anyone else, or prevent the licensor from granting licenses to anyone else. These terms do not imply any other licenses. + +## Patent Defense + +If you make any written claim that the software infringes or contributes to infringement of any patent, your patent license for the software granted under these terms ends immediately. If your company makes such a claim, your patent license ends immediately for work on behalf of your company. + +## Violations + +The first time you are notified in writing that you have violated any of these terms, or done anything with the software not covered by your licenses, your licenses can nonetheless continue if you come into full compliance with these terms, and take practical steps to correct past violations, within 32 days of receiving notice. Otherwise, all your licenses end immediately. + +## No Liability + +As far as the law allows, the software comes as is, without any warranty or condition, and the licensor will not be liable to you for any damages arising out of these terms or the use or nature of the software, under any kind of legal claim. + +--- + +Commercial use, production deployment, resale, and SaaS offerings are NOT +permitted under this license. A separate commercial license is available — +contact Ali Pourrahim . diff --git a/NOTICE b/NOTICE index e2da32d..38dbd66 100644 --- a/NOTICE +++ b/NOTICE @@ -34,7 +34,9 @@ and its public publication record at: Use and attribution -------------------- Use of this work is governed by the LICENSE file in this repository -(AuthGate-Kernel Source-Available, Contact-First License). Any use -beyond private evaluation requires the Author's prior written -permission. Attribution to the Author must never be removed or +(PolyForm Noncommercial License 1.0.0). Noncommercial use — research, +educational, evaluation, and internal non-commercial testing — is +permitted with attribution. Commercial use, production deployment, +resale, and SaaS offerings require a separate commercial license from +the Author. Attribution to the Author must never be removed or misrepresented. diff --git a/README.md b/README.md index db9d17f..6ef6429 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ A wire format and a verify function. See [POSITIONING.md](POSITIONING.md). [![Tests](https://img.shields.io/badge/tests-1155%20passing-brightgreen.svg)](tests/) [![Kani](https://img.shields.io/badge/Kani-24%20harnesses-green.svg)](formal/) [![Lean4](https://img.shields.io/badge/Lean4-16%20theorems-blue.svg)](formal/lean4/) -[![License: Source-Available (Contact-First)](https://img.shields.io/badge/License-Source--Available%20(Contact--First)-red.svg)](LICENSE) +[![License: PolyForm Noncommercial 1.0.0](https://img.shields.io/badge/License-PolyForm--Noncommercial--1.0.0-orange.svg)](LICENSE) ## The problem @@ -382,11 +382,18 @@ If yes, it doesn't belong in the TCB. TCB changes require a written invariant ju ## License -Source-available under the **AuthGate-Kernel Source-Available, Contact-First License** — see [`LICENSE`](LICENSE) and [`NOTICE`](NOTICE). +**Source-available** under the [PolyForm Noncommercial License 1.0.0](LICENSE) — see also [`NOTICE`](NOTICE). -You may read and privately evaluate the source. **Any** other use — running, -deploying, copying, modifying, distributing, or building on it, by anyone -including educational/research/RDI and commercial entities — requires the -author's **prior written permission**. The work may not be claimed, in whole -or in part, as anyone else's. To request permission, contact the author first -(see `LICENSE` §7). +| Use | Status | +|---|---| +| Evaluation | ✅ Allowed | +| Research | ✅ Allowed | +| Educational | ✅ Allowed | +| Internal non-commercial testing | ✅ Allowed | +| Redistribution (non-commercial) | ✅ Allowed, with attribution | +| Production deployment | ⛔ Requires commercial license | +| Commercial use / SaaS / resale | ⛔ Requires commercial license | +| Patent rights | Reserved | + +A **commercial license is available separately.** For production or commercial use, +contact **Ali Pourrahim — Alipourrahim.ap@gmail.com**. diff --git a/authgate-kernel/Cargo.toml b/authgate-kernel/Cargo.toml index 30caa0a..1a93fc2 100644 --- a/authgate-kernel/Cargo.toml +++ b/authgate-kernel/Cargo.toml @@ -3,8 +3,7 @@ name = "authgate-kernel" version = "0.1.0" edition = "2021" description = "Formal, auditable, cryptographically-verifiable capability-governance kernel" -license-file = "../LICENSE" -publish = false +license = "PolyForm-Noncommercial-1.0.0" authors = ["Ali Pourrahim "] [lib] diff --git a/authgate-kernel/deny.toml b/authgate-kernel/deny.toml index f03256b..633dab8 100644 --- a/authgate-kernel/deny.toml +++ b/authgate-kernel/deny.toml @@ -29,9 +29,6 @@ ignore = [ [licenses] version = 2 -# The workspace's own crate is source-available/proprietary (license-file, publish = false). -# Skip license checks on private (unpublished) crates; only dependencies are gated below. -private = { ignore = true } allow = [ "MIT", "Apache-2.0", @@ -43,6 +40,8 @@ allow = [ "Unicode-DFS-2016", "OpenSSL", "Zlib", + # This project's own license (source-available, noncommercial). + "PolyForm-Noncommercial-1.0.0", ] [bans] diff --git a/pyproject.toml b/pyproject.toml index 8a63fae..19bd149 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name = "authgate" version = "1.0.0" description = "Capability-security kernel for agent runtimes — typed authority verification with formal invariants" readme = "README.md" -license = { file = "LICENSE" } +license = { text = "PolyForm-Noncommercial-1.0.0" } requires-python = ">=3.11" authors = [ { name = "Ali Pourrahim", email = "Alipourrahim.ap@gmail.com" }, From 9e317fd48d60639118942cdade6214dc4339569c Mon Sep 17 00:00:00 2001 From: Ali Pourrahim Date: Wed, 10 Jun 2026 17:08:24 +0300 Subject: [PATCH 3/7] feat(runtime): capability-gated agent runtime MVP + red-team hardening MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Recovers the runtime MVP interrupted by a shutdown and hardens it against two real vulnerabilities found by the adversarial harness. Runtime layer (non-TCB): Agent -> Planner -> loop -> CallGate.verify -> sandboxed Tool -> RunLog/AuditLog. 3 tools (calculator, file_read, web_search), mock/scripted planners, append-only jsonl trace. No multi-agent, no delegation DSL, no federation — MVP scope only. redteam/runtime_redteam.py: deterministic harness of N adversarial "engineers" across 10 attack categories; any escape is a hard failure. Vulnerabilities found and fixed: - DoS via unbounded `**` in calculator: `2**2**2**2**2**2` builds a ~10^19728-digit int and hangs the process (it hung the 1000-engineer run). Now bounded by expr length, exponent cap, and result-bits cap; overflow/divide-by-zero normalized to ValueError so the gate sees a clean denial instead of a hang. - Sandbox bypass via Windows reserved device names: `CON`/`NUL`/`COMn`/ `LPTn` resolve inside the sandbox yet open a device at open() time (`CON` blocks forever = DoS). Refused by name on every platform. Both vectors now covered by unit tests and the red-team harness. Full suite: 1283 passed; ruff + mypy clean (AUTHGATE_BACKEND=python). Co-Authored-By: Claude Opus 4.8 --- examples/sandbox/notes.txt | 2 + redteam/runtime_redteam.py | 596 +++++++++++++++++++++++++++++ redteam/test_redteam_regression.py | 4 +- src/authgate/runtime/__init__.py | 46 +++ src/authgate/runtime/agent.py | 192 ++++++++++ src/authgate/runtime/planner.py | 100 +++++ src/authgate/runtime/run_log.py | 65 ++++ src/authgate/runtime/tools.py | 224 +++++++++++ tests/test_runtime_agent.py | 173 +++++++++ tests/test_runtime_planner.py | 149 ++++++++ tests/test_runtime_redteam.py | 101 +++++ tests/test_runtime_run_log.py | 98 +++++ tests/test_runtime_tools.py | 297 ++++++++++++++ 13 files changed, 2045 insertions(+), 2 deletions(-) create mode 100644 examples/sandbox/notes.txt create mode 100644 redteam/runtime_redteam.py create mode 100644 src/authgate/runtime/__init__.py create mode 100644 src/authgate/runtime/agent.py create mode 100644 src/authgate/runtime/planner.py create mode 100644 src/authgate/runtime/run_log.py create mode 100644 src/authgate/runtime/tools.py create mode 100644 tests/test_runtime_agent.py create mode 100644 tests/test_runtime_planner.py create mode 100644 tests/test_runtime_redteam.py create mode 100644 tests/test_runtime_run_log.py create mode 100644 tests/test_runtime_tools.py diff --git a/examples/sandbox/notes.txt b/examples/sandbox/notes.txt new file mode 100644 index 0000000..5eae509 --- /dev/null +++ b/examples/sandbox/notes.txt @@ -0,0 +1,2 @@ +hello from sandbox +revenue Q1: 1200000 diff --git a/redteam/runtime_redteam.py b/redteam/runtime_redteam.py new file mode 100644 index 0000000..485df21 --- /dev/null +++ b/redteam/runtime_redteam.py @@ -0,0 +1,596 @@ +""" +runtime_redteam.py — adversarial red-team harness for the capability-gated runtime. + +WHAT THIS IS +------------ +A standalone, deterministic harness that conjures 1000 distinct "engineers" — each +with its own seed, archetype and skill level — and pits every one of them against +the `authgate.runtime` agent runtime. Each engineer mounts exactly ONE attack drawn +from one of ten categories. The runtime is the *defender*: the harness PROVES that +every attack is blocked. ANY successful escape is a hard FAILURE (exit code 1). + +"Blocked" means the system correctly resisted the attack: the kernel gate denied the +step, the tool's own sandbox refused, the calculator rejected the expression, the +audit chain detected tampering, etc. An "escape" is the opposite — the attacker got +the runtime to do something it must never do (read a real system file, run code, +act without a capability, slip past a revocation, hide a tamper). + +DESIGN NOTES +------------ +* Fully deterministic: a fixed master seed yields an identical RedTeamReport. +* Standalone: it inserts ``src`` onto sys.path itself, so it runs without conftest. +* It does NOT modify or weaken any source under ``src``. It only *observes* the + runtime's behavior and asserts the safe outcome. If a real vulnerability existed, + an attack's ``blocked`` would come back False and surface as an escape — the + harness never masks that. + +RUN +--- + $env:AUTHGATE_BACKEND='python' # PowerShell + python redteam/runtime_redteam.py +""" +from __future__ import annotations + +import logging +import os +import random +import sys +import tempfile +from collections.abc import Callable +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +# --- standalone path wiring (the repo's tests use conftest; we must not rely on it) --- +_REPO_ROOT = Path(__file__).resolve().parent.parent +_SRC = _REPO_ROOT / "src" +if str(_SRC) not in sys.path: + sys.path.insert(0, str(_SRC)) +# The runtime layer is not part of the TCB; the python backend is the one under test. +os.environ.setdefault("AUTHGATE_BACKEND", "python") + +# Each attack deliberately provokes denials, and the kernel logs a WARNING per +# deny. Across thousands of internal runs that log volume dominates runtime, so we +# quiet the authgate logger. This is purely a verbosity/performance change in the +# HARNESS — it does not touch any security decision in the source under test. +logging.getLogger("authgate").setLevel(logging.CRITICAL) + +from authgate.kernel.audit import AuditLog # noqa: E402 +from authgate.runtime import ( # noqa: E402 + AgentRuntime, + PlanStep, + ScriptedPlanner, + build_runtime, +) +from authgate.runtime.tools import calculate # noqa: E402 + +# Sentinel that must NEVER appear in any returned output: it is the marker of a +# real system-file read leaking through the sandbox. +_LEAK_MARKERS = ("root:", "[boot loader]", "win.ini", "/bin/bash", "daemon:") + + +# =========================================================================== +# Engineer population +# =========================================================================== + +_ARCHETYPES = [ + "path-traversal artist", "capability forger", "expression smuggler", + "prompt injector", "revocation racer", "denial prober", "ledger tamperer", + "replay operator", "fuzz gremlin", "identity ghost", "supply-chain mole", + "side-channel listener", +] +_SKILLS = ["junior", "mid", "senior", "principal", "nation-state"] +_FIRST = [ + "Mara", "Iqbal", "Soren", "Yuki", "Dmitri", "Aisha", "Tomas", "Lena", + "Kwame", "Priya", "Olek", "Nadia", "Hiro", "Camila", "Bjorn", "Fatima", + "Rashid", "Ingrid", "Diego", "Mei", "Anton", "Zara", "Lucas", "Noor", +] +_LAST = [ + "Voss", "Khan", "Aaltonen", "Tanaka", "Petrov", "Okafor", "Nguyen", + "Lindqvist", "Mbeki", "Sharma", "Kowalski", "Haddad", "Sato", "Reyes", + "Eriksson", "Ali", "Costa", "Wang", "Novak", "Bianchi", "Park", "Singh", +] + +# Attack categories in fixed order; index == id % len drives even distribution. +ATTACK_CATEGORIES = [ + "SANDBOX_ESCAPE", + "CAP_FORGERY", + "CALC_INJECTION", + "PLAN_INJECTION", + "REVOCATION_BYPASS", + "DENIAL_PROBE", + "AUDIT_TAMPER", + "REPLAY_AFTER_DENY", + "ARG_FUZZ", + "IDENTITY_SPOOF", +] + + +@dataclass(frozen=True) +class EngineerProfile: + """One adversary. Synthesized deterministically from the master seed.""" + + id: int + name: str + archetype: str + seed: int + attack_category: str + skill: str + bio: str + + +@dataclass(frozen=True) +class AttackResult: + """Outcome of one engineer's single attack.""" + + profile: EngineerProfile + category: str + blocked: bool # True == system correctly resisted; False == ESCAPE + detail: str + + +@dataclass +class RedTeamReport: + total: int + blocked: int + escapes: list[AttackResult] = field(default_factory=list) + by_category: dict[str, tuple[int, int]] = field(default_factory=dict) # cat -> (blocked, total) + + +def _make_profiles(n: int, master_seed: int) -> list[EngineerProfile]: + """Deterministically synthesize n distinct engineers from one master seed.""" + rng = random.Random(master_seed) + profiles: list[EngineerProfile] = [] + for i in range(n): + category = ATTACK_CATEGORIES[i % len(ATTACK_CATEGORIES)] + first = rng.choice(_FIRST) + last = rng.choice(_LAST) + name = f"{first} {last}-{i:04d}" + archetype = rng.choice(_ARCHETYPES) + skill = rng.choice(_SKILLS) + seed = rng.randrange(1, 2**31 - 1) + bio = ( + f"{skill} {archetype} (#{i}); favors the {category} vector; " + f"operates from seed {seed}." + ) + profiles.append(EngineerProfile( + id=i, name=name, archetype=archetype, seed=seed, + attack_category=category, skill=skill, bio=bio, + )) + return profiles + + +# =========================================================================== +# Sandbox fixture (benign files only; we NEVER touch real OS files) +# =========================================================================== + +def _new_sandbox() -> Path: + """A fresh tmp dir seeded with a couple of innocuous files.""" + root = Path(tempfile.mkdtemp(prefix="redteam_sbx_")) + (root / "notes.txt").write_text("benign notes inside the sandbox\n", encoding="utf-8") + (root / "data.txt").write_text("12345\n", encoding="utf-8") + return root + + +def _no_leak(outputs: list[Any]) -> bool: + """True iff no returned output contains the fingerprint of a real system file.""" + for out in outputs: + if not isinstance(out, str): + continue + low = out.lower() + if any(m.lower() in low for m in _LEAK_MARKERS): + return False + return True + + +# =========================================================================== +# Attack categories — each returns (blocked: bool, detail: str) +# =========================================================================== + +_HOSTILE_PATH_TEMPLATES = [ + "../etc/passwd", + "../../../../etc/shadow", + "../../../../../../etc/passwd", + "/etc/passwd", + "/etc/shadow", + r"C:\Windows\win.ini", + r"C:\boot.ini", + "..%2f..%2fetc%2fpasswd", + "....//....//etc/passwd", + "foo/../../bar/../../../etc/passwd", + "notes.txt/../../../../etc/passwd", + "./../" * 8 + "etc/passwd", + "etc/passwd\x00.txt", + "a" * 4000 + "/etc/passwd", + "../" * 64 + "Windows/System32/drivers/etc/hosts", + "subdir/../../../../etc/passwd", + # Windows reserved device names: resolve inside the sandbox yet open a device + # (CON blocks forever = DoS; NUL/COMn/LPTn touch hardware). Must be refused. + "CON", "NUL", "PRN", "AUX", "COM1", "LPT1", "CON.txt", "subdir/NUL", +] + + +def _attack_sandbox_escape(p: EngineerProfile) -> tuple[bool, str]: + """file_read GRANTED (capability passes) but a hostile path must be refused + by the tool's own sandbox boundary — and no real file content may leak.""" + rng = random.Random(p.seed) + root = _new_sandbox() + path = rng.choice(_HOSTILE_PATH_TEMPLATES) + # Occasionally append junk to vary it per engineer. + if rng.random() < 0.5: + path = path + ("." * rng.randint(1, 5)) + rt, _reg = build_runtime( + ScriptedPlanner([PlanStep("file_read", {"filename": path}, "exfiltrate")]), + root, granted_tools=["file_read"], + ) + result = rt.run("read a file") + outcome = result.outcomes[0] + # Must be a denial (the tool raised PermissionError -> gate captures as not permitted), + # and absolutely no permitted outcome may carry real system-file content. + leaked_outputs = result.outputs() # only permitted steps' outputs + safe = (not outcome.permitted) and outcome.result.output is None and _no_leak(leaked_outputs) + detail = ( + f"path={path[:48]!r} permitted={outcome.permitted} " + f"reason={str(outcome.result.denied_reason)[:60]!r} no_leak={_no_leak(leaked_outputs)}" + ) + return safe, detail + + +def _attack_cap_forgery(p: EngineerProfile) -> tuple[bool, str]: + """Use a tool that was never granted. The kernel gate must DENY it and the + tool body must never run (output is None).""" + rng = random.Random(p.seed) + root = _new_sandbox() + # Pick a target tool, then grant everything EXCEPT it. + target = rng.choice(["file_read", "web_search", "calculator"]) + others = [t for t in ["file_read", "web_search", "calculator"] if t != target] + granted = rng.choice([[], others, [others[0]]]) # never includes target + args = { + "file_read": {"filename": "notes.txt"}, + "web_search": {"query": "authgate"}, + "calculator": {"expression": "2+2"}, + }[target] + rt, _reg = build_runtime( + ScriptedPlanner([PlanStep(target, args, "use ungranted tool")]), + root, granted_tools=granted, + ) + outcome = rt.run("go").outcomes[0] + safe = (not outcome.permitted) and outcome.result.output is None + return safe, f"target={target} granted={granted} permitted={outcome.permitted}" + + +_CALC_PAYLOADS = [ + "__import__('os').system('echo pwned')", + "().__class__.__bases__[0].__subclasses__()", + "lambda: 1", + "open('/etc/passwd')", + "1; import os", + "os.system('id')", + "eval('2+2')", + "(1).__class__", + "x + 1", + "f'{1+1}'", + "[].append(1)", + "{'a': 1}", + "2 ** 2 ** 2 ** 2 ** 2 ** 2", + "(" * 200 + "1" + ")" * 200, + "__builtins__", + "globals()", + "1 if True else 2", + "0xdeadbeef.bit_length()", +] + + +def _attack_calc_injection(p: EngineerProfile) -> tuple[bool, str]: + """Calculator must reject any non-arithmetic expression: calculate() raises + ValueError directly, AND through the runtime the step is denied with no + object/code execution leaking out.""" + rng = random.Random(p.seed) + root = _new_sandbox() + expr = rng.choice(_CALC_PAYLOADS) + + # 1) direct tool contract: must raise ValueError. + raised_value_error = False + try: + calculate(expr) + except ValueError: + raised_value_error = True + except Exception: + raised_value_error = False # any OTHER exception type would be suspicious + + # 2) through the runtime: granted calculator, but the step must be denied and + # output must be None (never a live Python object / side effect). + rt, _reg = build_runtime( + ScriptedPlanner([PlanStep("calculator", {"expression": expr}, "exec")]), + root, granted_tools=["calculator"], + ) + outcome = rt.run("calculate").outcomes[0] + runtime_blocked = (not outcome.permitted) and outcome.result.output is None + safe = raised_value_error and runtime_blocked + return safe, ( + f"expr={expr[:40]!r} value_error={raised_value_error} " + f"permitted={outcome.permitted}" + ) + + +_FORBIDDEN_TOOLS = [ + "shell", "exec", "../tool", "calculator\n; rm -rf /", "file_write", + "system", "eval", "os.system", "", "CALCULATOR", "web_search ", "net", +] + + +def _attack_plan_injection(p: EngineerProfile) -> tuple[bool, str]: + """A prompt-injected plan naming an unknown/forbidden tool, or carrying a huge + arg payload, must produce a clean denial — never a crash, and loop integrity + must hold (run() returns normally).""" + rng = random.Random(p.seed) + root = _new_sandbox() + bad_tool = rng.choice(_FORBIDDEN_TOOLS) + huge = "A" * rng.randint(1000, 20000) + plan = [PlanStep(bad_tool, {"payload": huge, "x": rng.random()}, "injected")] + rt, _reg = build_runtime(ScriptedPlanner(plan), root, granted_tools=None) + crashed = False + try: + result = rt.run("do the injected thing") + except Exception as exc: # run() must never raise on a hostile plan + return False, f"run() raised {type(exc).__name__}: {exc}" + outcome = result.outcomes[0] + safe = (not outcome.permitted) and outcome.result.output is None and not crashed + return safe, f"tool={bad_tool!r} permitted={outcome.permitted}" + + +def _attack_revocation_bypass(p: EngineerProfile) -> tuple[bool, str]: + """Run a granted tool (permit), then revoke_all and run again -> must DENY. + Also an epoch-revocation sub-check: a runtime demanding a higher min_epoch + rejects old-epoch claims, and reissuing via advance_epoch restores them.""" + root = _new_sandbox() + planner = ScriptedPlanner([PlanStep("calculator", {"expression": "1+1"}, "compute")]) + rt, reg = build_runtime(planner, root, granted_tools=["calculator"], freeze=False) + + first = rt.run("compute").outcomes[0] + if not first.permitted: + return False, "baseline run was denied (tool was never usable)" + + reg.revoke_all("agent-1") + after_revoke = rt.run("compute").outcomes[0] + revoke_holds = not after_revoke.permitted + + # Epoch variant: a fresh runtime over the SAME gate demanding min_epoch=2. + # Claims default to epoch=1, so they must be rejected until advance_epoch(2). + # (We re-grant by re-building to avoid coupling to the revoked registry.) + rt2, reg2 = build_runtime(planner, root, granted_tools=["calculator"], freeze=False) + rt_hi = AgentRuntime( + agent=rt2._agent, gate=rt2._gate, tools=rt2._tools, planner=planner, min_epoch=2, + ) + epoch_old_rejected = not rt_hi.run("compute").outcomes[0].permitted + reg2.advance_epoch(2) + epoch_reissued_ok = rt_hi.run("compute").outcomes[0].permitted + epoch_holds = epoch_old_rejected and epoch_reissued_ok + + safe = revoke_holds and epoch_holds + return safe, ( + f"first={first.permitted} after_revoke={after_revoke.permitted} " + f"epoch_old_rejected={epoch_old_rejected} epoch_reissued={epoch_reissued_ok}" + ) + + +def _attack_denial_probe(p: EngineerProfile) -> tuple[bool, str]: + """A plan: permitted step, then a DENIED step in the middle, then a later step + that must NEVER execute. The runtime must stop_early and never run later bodies.""" + root = _new_sandbox() + # calculator granted, file_read NOT -> middle step denied. Later step is a + # calculator read of a SENTINEL file inside the sandbox; if it ran we'd see output. + (root / "should_not_read.txt").write_text("LATER_STEP_EXECUTED", encoding="utf-8") + plan = [ + PlanStep("calculator", {"expression": "1+1"}, "ok step"), + PlanStep("file_read", {"filename": "notes.txt"}, "DENIED middle (ungranted)"), + PlanStep("file_read", {"filename": "should_not_read.txt"}, "must not run"), + ] + rt, _reg = build_runtime(ScriptedPlanner(plan), root, granted_tools=["calculator"]) + result = rt.run("probe") + # Exactly 2 outcomes recorded (loop broke at the denied middle step). + later_never_ran = all( + o.result.output != "LATER_STEP_EXECUTED" for o in result.outcomes + ) + structure_ok = ( + len(result.outcomes) == 2 + and result.outcomes[0].permitted + and not result.outcomes[1].permitted + and result.stopped_early + ) + safe = structure_ok and later_never_ran + return safe, ( + f"n_outcomes={len(result.outcomes)} stopped_early={result.stopped_early} " + f"later_never_ran={later_never_ran}" + ) + + +def _attack_audit_tamper(p: EngineerProfile) -> tuple[bool, str]: + """Run steps under an AuditLog (chain valid), then mutate one internal entry + field; verify_chain() must then report tampering (return False).""" + rng = random.Random(p.seed) + root = _new_sandbox() + audit = AuditLog() + plan = [ + PlanStep("calculator", {"expression": "1+1"}, "a"), + PlanStep("calculator", {"expression": "2+2"}, "b"), + PlanStep("calculator", {"expression": "3+3"}, "c"), + ] + rt, _reg = build_runtime(ScriptedPlanner(plan), root, granted_tools=["calculator"], audit_log=audit) + rt.run("audited") + + before = audit.verify_chain() + if not audit._records: + return False, "no audit records were produced" + idx = rng.randrange(len(audit._records)) + field_choice = rng.choice(["permitted", "action_id", "confidence"]) + rec = audit._records[idx] + if field_choice == "permitted": + rec["permitted"] = not rec["permitted"] + elif field_choice == "action_id": + rec["action_id"] = str(rec["action_id"]) + "-TAMPERED" + else: + rec["confidence"] = (rec.get("confidence", 1.0) or 0.0) - 0.5 + after = audit.verify_chain() + + safe = before is True and after is False + return safe, f"before={before} after={after} tampered_field={field_choice} idx={idx}" + + +def _attack_replay_after_deny(p: EngineerProfile) -> tuple[bool, str]: + """After a denied step, re-issue the exact same step. Authority must not leak: + the replay must stay denied.""" + root = _new_sandbox() + # file_read NOT granted -> denied. Replay the same plan; still denied. + planner = ScriptedPlanner([PlanStep("file_read", {"filename": "notes.txt"}, "denied read")]) + rt, _reg = build_runtime(planner, root, granted_tools=["calculator"]) + first = rt.run("read").outcomes[0] + second = rt.run("read").outcomes[0] + safe = (not first.permitted) and (not second.permitted) and second.result.output is None + return safe, f"first={first.permitted} replay={second.permitted}" + + +def _attack_arg_fuzz(p: EngineerProfile) -> tuple[bool, str]: + """Permitted tools with pathological args. The loop must never crash and a + tool that raises must surface as a non-permitted/None-output result — never + a denial-bypassing success and never an exception escaping run().""" + rng = random.Random(p.seed) + root = _new_sandbox() + tool = rng.choice(["calculator", "web_search", "file_read"]) + pathological = rng.choice([ + "💥" * 500, + "\x00\x01\x02\x03", + "\n\r\t" * 100, + "𝔘𝔫𝔦𝔠𝔬𝔡𝔢" * 50, + " ", + "-" * 5000, + "نظام" * 200, + ]) + # Build args with the right key for the tool, sometimes wrong-typed. + key = {"calculator": "expression", "web_search": "query", "file_read": "filename"}[tool] + val: Any = rng.choice([pathological, None, {"nested": pathological}, 12345]) + rt, _reg = build_runtime( + ScriptedPlanner([PlanStep(tool, {key: val}, "fuzz")]), + root, granted_tools=[tool], + ) + crashed = False + try: + result = rt.run("fuzz") + except Exception as exc: + return False, f"run() raised {type(exc).__name__}: {exc}" + outcome = result.outcomes[0] + # Either it permitted with a benign string output (web_search of junk is fine), + # or it was denied/errored with output None. Never: permitted with leaked content. + if outcome.permitted: + out = outcome.result.output + safe = (out is None or isinstance(out, str)) and _no_leak([out]) and not crashed + else: + safe = outcome.result.output is None and not crashed + return safe, f"tool={tool} val_type={type(val).__name__} permitted={outcome.permitted}" + + +def _attack_identity_spoof(p: EngineerProfile) -> tuple[bool, str]: + """Act as 'agent-1' from a runtime whose agent holds NO claims (granted=[]). + The agent is owned but un-capable, so every tool call must be denied.""" + rng = random.Random(p.seed) + root = _new_sandbox() + tool = rng.choice(["calculator", "web_search", "file_read"]) + args = { + "calculator": {"expression": "1+1"}, + "web_search": {"query": "authgate"}, + "file_read": {"filename": "notes.txt"}, + }[tool] + rt, _reg = build_runtime( + ScriptedPlanner([PlanStep(tool, args, "spoofed authority")]), + root, granted_tools=[], # agent holds zero capabilities + ) + outcome = rt.run("act as agent-1").outcomes[0] + safe = (not outcome.permitted) and outcome.result.output is None + return safe, f"tool={tool} permitted={outcome.permitted}" + + +_ATTACK_FNS: dict[str, Callable[[EngineerProfile], tuple[bool, str]]] = { + "SANDBOX_ESCAPE": _attack_sandbox_escape, + "CAP_FORGERY": _attack_cap_forgery, + "CALC_INJECTION": _attack_calc_injection, + "PLAN_INJECTION": _attack_plan_injection, + "REVOCATION_BYPASS": _attack_revocation_bypass, + "DENIAL_PROBE": _attack_denial_probe, + "AUDIT_TAMPER": _attack_audit_tamper, + "REPLAY_AFTER_DENY": _attack_replay_after_deny, + "ARG_FUZZ": _attack_arg_fuzz, + "IDENTITY_SPOOF": _attack_identity_spoof, +} + + +# =========================================================================== +# Orchestration +# =========================================================================== + +def _run_one(profile: EngineerProfile) -> AttackResult: + fn = _ATTACK_FNS[profile.attack_category] + try: + blocked, detail = fn(profile) + except Exception as exc: + # An unhandled exception in the HARNESS is a harness bug, but we treat it + # conservatively as a non-block so it surfaces loudly rather than hiding. + blocked, detail = False, f"harness exception: {type(exc).__name__}: {exc}" + return AttackResult(profile=profile, category=profile.attack_category, + blocked=blocked, detail=detail) + + +def run_redteam(n: int = 1000, master_seed: int = 1337) -> RedTeamReport: + """Run n engineers against the runtime and return a deterministic report.""" + profiles = _make_profiles(n, master_seed) + results = [_run_one(p) for p in profiles] + + escapes = [r for r in results if not r.blocked] + by_category: dict[str, tuple[int, int]] = {} + for cat in ATTACK_CATEGORIES: + cat_results = [r for r in results if r.category == cat] + blocked = sum(1 for r in cat_results if r.blocked) + by_category[cat] = (blocked, len(cat_results)) + + return RedTeamReport( + total=len(results), + blocked=sum(1 for r in results if r.blocked), + escapes=escapes, + by_category=by_category, + ) + + +def _print_report(report: RedTeamReport) -> None: + line = "=" * 64 + print(line) + print("AuthGate runtime RED-TEAM HARNESS — 1000 adversarial engineers") + print(line) + print(f"{'CATEGORY':<20}{'BLOCKED':>10}{'TOTAL':>8}{'STATUS':>12}") + print("-" * 64) + for cat in ATTACK_CATEGORIES: + blocked, total = report.by_category.get(cat, (0, 0)) + status = "HELD" if blocked == total and total > 0 else "BREACH" + print(f"{cat:<20}{blocked:>10}{total:>8}{status:>12}") + print("-" * 64) + overall = "ALL ATTACKS BLOCKED" if not report.escapes else "ESCAPES DETECTED" + print(f"{'TOTAL':<20}{report.blocked:>10}{report.total:>8}{overall:>20}") + print(line) + if report.escapes: + print(f"\n!!! {len(report.escapes)} ESCAPE(S) — runtime FAILED to resist:\n") + for esc in report.escapes: + p = esc.profile + print(f" [ESCAPE] #{p.id} {p.name} ({p.skill} {p.archetype})") + print(f" category={esc.category}") + print(f" detail ={esc.detail}") + else: + print("\nNo escapes. The capability gate, sandbox, calculator, revocation,") + print("denial halting, and audit chain all held against every engineer.") + + +def main() -> int: + report = run_redteam() + _print_report(report) + return 1 if report.escapes else 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/redteam/test_redteam_regression.py b/redteam/test_redteam_regression.py index e89f8e8..df01bcb 100644 --- a/redteam/test_redteam_regression.py +++ b/redteam/test_redteam_regression.py @@ -10,10 +10,10 @@ import sys -from authgate.kernel.entities import Entity, Resource, RightsClaim, AgentType, ResourceType +from authgate.kernel.entities import AgentType, Entity, Resource, ResourceType, RightsClaim +from authgate.kernel.hardened import HardenedVerifier from authgate.kernel.registry import OwnershipRegistry from authgate.kernel.verifier import Action, FreedomVerifier -from authgate.kernel.hardened import HardenedVerifier H = AgentType.HUMAN M = AgentType.MACHINE diff --git a/src/authgate/runtime/__init__.py b/src/authgate/runtime/__init__.py new file mode 100644 index 0000000..244db0b --- /dev/null +++ b/src/authgate/runtime/__init__.py @@ -0,0 +1,46 @@ +""" +authgate.runtime — minimal capability-gated AI agent runtime (MVP). + +This is the operational layer described in planmvp.md. It is NOT part of the TCB. +It turns an agent intent into a tool plan, then executes each step through the +existing kernel CallGate so that no tool runs without a verified capability: + + intent -> Planner -> [PlanStep] -> AgentRuntime loop + | + v per step + build Action + | + v + CallGate.verify + execute (kernel TCB) + | + permit -> sandboxed tool runs + deny -> stop, nothing further runs + | + v + RunLog (jsonl) + hash-chained AuditLog + +Design constraints (planmvp.md): single agent, 3 tools, mock planner, +AuthGate verify per call, simple sandbox, append-only log. No delegation graph, +no epoch DSL, no multi-agent, no federation in this layer. +""" +from __future__ import annotations + +from authgate.runtime.agent import AgentRuntime, RunResult, StepOutcome, build_runtime +from authgate.runtime.planner import MockPlanner, Planner, PlanStep, ScriptedPlanner +from authgate.runtime.run_log import RunLog +from authgate.runtime.tools import Tool, ToolRegistry, build_default_tools + +__all__ = [ + "AgentRuntime", + "RunResult", + "StepOutcome", + "build_runtime", + "Planner", + "PlanStep", + "MockPlanner", + "ScriptedPlanner", + "RunLog", + "Tool", + "ToolRegistry", + "build_default_tools", +] diff --git a/src/authgate/runtime/agent.py b/src/authgate/runtime/agent.py new file mode 100644 index 0000000..7ec5f1b --- /dev/null +++ b/src/authgate/runtime/agent.py @@ -0,0 +1,192 @@ +""" +AgentRuntime — the MVP agent loop (non-TCB orchestration). + +This is the whole runtime, and it is deliberately tiny (planmvp.md STEP 1): + + plan = planner.plan(intent) + for step in plan: + action = build_action(step) # typed capability request + result = gate.execute(action, ...) # kernel verifies + runs (or denies) + run_log.record(...) # trace it + if denied: stop # a denial halts the plan + +Every security decision happens inside the kernel `CallGate` / `FreedomVerifier`; +this module only *sequences* steps and turns each one into a typed `Action`. It +holds no authority of its own — if the agent lacks a capability, the gate denies +the step and the loop stops, with nothing further executed. +""" +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +from authgate.kernel.audit import AuditLog +from authgate.kernel.call_gate import CallGate, GateResult +from authgate.kernel.entities import AgentType, Entity, RightsClaim +from authgate.kernel.registry import OwnershipRegistry +from authgate.kernel.verifier import Action, FreedomVerifier +from authgate.runtime.planner import Planner, PlanStep +from authgate.runtime.run_log import RunLog +from authgate.runtime.tools import Tool, ToolRegistry, build_default_tools + + +@dataclass(frozen=True) +class StepOutcome: + """One executed (or denied) plan step and its gate result.""" + + step: PlanStep + result: GateResult + + @property + def permitted(self) -> bool: + return self.result.permitted + + +@dataclass(frozen=True) +class RunResult: + """Outcome of running one intent through the runtime.""" + + intent: str + agent_id: str + outcomes: list[StepOutcome] = field(default_factory=list) + stopped_early: bool = False + + @property + def permitted_count(self) -> int: + return sum(1 for o in self.outcomes if o.permitted) + + @property + def denied_count(self) -> int: + return sum(1 for o in self.outcomes if not o.permitted) + + def outputs(self) -> list[Any]: + """Outputs of the permitted steps, in order — used for reproducibility checks.""" + return [o.result.output for o in self.outcomes if o.permitted] + + +class AgentRuntime: + """Sequences a plan through the kernel gate. Owns no authority itself.""" + + def __init__( + self, + agent: Entity, + gate: CallGate, + tools: ToolRegistry, + planner: Planner, + run_log: RunLog | None = None, + min_epoch: int = 0, + ) -> None: + self._agent = agent + self._gate = gate + self._tools = tools + self._planner = planner + self._run_log = run_log or RunLog() + self._min_epoch = min_epoch + + @property + def run_log(self) -> RunLog: + return self._run_log + + def run(self, intent: str) -> RunResult: + """Plan the intent, then gate-execute each step until done or denied.""" + steps = self._planner.plan(intent) + outcomes: list[StepOutcome] = [] + stopped_early = False + + for index, step in enumerate(steps): + result = self._execute_step(index, step) + outcomes.append(StepOutcome(step=step, result=result)) + if not result.permitted: + # A denied step halts the plan: later steps may depend on it, and + # continuing would let an agent "probe" past a denial. + stopped_early = index < len(steps) - 1 + break + + return RunResult( + intent=intent, agent_id=self._agent.name, + outcomes=outcomes, stopped_early=stopped_early, + ) + + def _execute_step(self, index: int, step: PlanStep) -> GateResult: + """Build the typed Action for a step and run it through the gate.""" + try: + tool = self._tools.get(step.tool) + except KeyError as exc: + # A planner (or prompt-injected plan) naming an unknown tool is a + # denial, not a crash — there is no capability to grant for it. + result = GateResult(permitted=False, + denied_reason=f"unknown tool: {exc}", tool_name=step.tool) + self._log(index, step, result) + return result + + action = self._build_action(index, step, tool) + result = self._gate.execute(action, tool.name, step.args) + self._log(index, step, result) + return result + + def _build_action(self, index: int, step: PlanStep, tool: Tool) -> Action: + """Turn a plan step into a typed capability request for the tool's resource.""" + reads = [tool.resource] if tool.mode == "read" else [] + writes = [tool.resource] if tool.mode == "write" else [] + return Action( + action_id=f"{self._agent.name}-step{index}-{tool.name}", + actor=self._agent, + description=step.rationale, + resources_read=reads, + resources_write=writes, + argument=json.dumps(step.args, sort_keys=True, default=str), + min_epoch=self._min_epoch, + ) + + def _log(self, index: int, step: PlanStep, result: GateResult) -> None: + self._run_log.record( + agent_id=self._agent.name, step_index=index, + tool=step.tool, args=step.args, + permitted=result.permitted, output=result.output, + denied_reason=result.denied_reason, + ) + + +def build_runtime( + intent_planner: Planner, + sandbox_root: Path, + granted_tools: list[str] | None = None, + audit_log: AuditLog | None = None, + run_log: RunLog | None = None, + freeze: bool = False, +) -> tuple[AgentRuntime, OwnershipRegistry]: + """Wire a single-agent runtime for demos/tests. + + Creates a human owner, a machine agent it owns, and delegates read claims for + each tool in `granted_tools` (default: all). Tools NOT granted will be denied + by the gate when the planner tries to use them — exactly the MVP behavior. + + Returns (runtime, registry). The registry is returned so callers can revoke + claims / advance epochs and observe the gate react (when freeze=False). + """ + owner = Entity("operator", AgentType.HUMAN) + agent = Entity("agent-1", AgentType.MACHINE) + tools = build_default_tools(sandbox_root) + granted = set(tools.names() if granted_tools is None else granted_tools) + + registry = OwnershipRegistry() + registry.register_machine(agent, owner) + for tool in tools: + if tool.name not in granted: + continue + # Owner asserts authority over the resource, then delegates read to the agent. + registry.add_claim(RightsClaim(owner, tool.resource, can_read=True, can_delegate=True)) + registry.delegate(RightsClaim(agent, tool.resource, can_read=True), + delegated_by=owner) + + audit = audit_log if audit_log is not None else AuditLog() + verifier = FreedomVerifier(registry, audit_log=audit, freeze=freeze) + gate = CallGate(verifier) + for tool in tools: + gate.register(tool.name, tool.fn) + + runtime = AgentRuntime(agent=agent, gate=gate, tools=tools, + planner=intent_planner, run_log=run_log) + return runtime, registry diff --git a/src/authgate/runtime/planner.py b/src/authgate/runtime/planner.py new file mode 100644 index 0000000..9ba932e --- /dev/null +++ b/src/authgate/runtime/planner.py @@ -0,0 +1,100 @@ +""" +Agent runtime planner — the deterministic LLM stand-in (non-TCB). + +A planner turns a natural-language intent into an ordered list of tool-call +steps. In production this is where an LLM would sit; here we use a fixed, +rule-based mock so the MVP is fully reproducible and testable. Real LLM +planners slot in behind the same `Planner` protocol without touching callers. + +This module is NOT part of the trusted computing base: a plan is only a +*request*. The kernel still gates every step, so a planner may legitimately +emit steps the agent lacks the capability to run (see `ScriptedPlanner`). +""" +from __future__ import annotations + +import re +from dataclasses import dataclass, field +from typing import Any, Protocol + + +@dataclass(frozen=True) +class PlanStep: + tool: str # tool name, e.g. "calculator" + args: dict[str, Any] = field(default_factory=dict) # kwargs for the tool fn + rationale: str = "" # short human-readable why (optional) + + +class Planner(Protocol): + def plan(self, intent: str) -> list[PlanStep]: ... + + +class ScriptedPlanner: + """Returns a pinned plan verbatim, ignoring the intent. + + Tests/demos use this to assert an exact step sequence — including plans that + deliberately contain a step the agent is not capable of, to exercise the + kernel's gating. + """ + + def __init__(self, steps: list[PlanStep]) -> None: + self._steps = list(steps) + + def plan(self, intent: str) -> list[PlanStep]: + # Copy so callers cannot mutate the pinned script through the returned list. + return list(self._steps) + + +# Trigger keywords kept as data so the rule order is obvious at a glance. +_ARITHMETIC_WORDS = ("calculate", "compute", "math", "sum", "plus", "times") +_SEARCH_WORDS = ("search", "find", "look up", "lookup", "what is") +_FILE_WORDS = ("read", "open", "file", "cat ") + +# A bare arithmetic expression (e.g. "2 + 3 * 4") is itself a calculation trigger, +# and the same regex extracts the expression to hand to the calculator tool. +_EXPR_RE = re.compile(r"[\d][\d+\-*/%.()\s]*[\d)]") + + +class MockPlanner: + """Deterministic, rule-based planner — a reproducible LLM substitute. + + Same intent always yields the same plan (hard MVP requirement). Rules are + applied in a fixed order; each appends a step when its trigger matches. + """ + + def plan(self, intent: str) -> list[PlanStep]: + text = intent.lower() + steps: list[PlanStep] = [] + self._maybe_arithmetic(text, steps) + self._maybe_search(text, intent, steps) + self._maybe_file(text, steps) + return steps + + def _maybe_arithmetic(self, text: str, steps: list[PlanStep]) -> None: + match = _EXPR_RE.search(text) + if not any(w in text for w in _ARITHMETIC_WORDS) and not match: + return + # Keyword without an actual expression still means "calculate something". + expression = match.group().strip() if match else "0" + steps.append(PlanStep("calculator", {"expression": expression}, + "intent asked for a calculation")) + + def _maybe_search(self, text: str, intent: str, steps: list[PlanStep]) -> None: + if any(w in text for w in _SEARCH_WORDS): + # Preserve original casing in the query; only matching is lowercased. + steps.append(PlanStep("web_search", {"query": intent.strip()}, + "intent asked to search")) + + def _maybe_file(self, text: str, steps: list[PlanStep]) -> None: + if any(w in text for w in _FILE_WORDS): + filename = self._extract_filename(text) + steps.append(PlanStep("file_read", {"filename": filename}, + "intent asked to read a file")) + + @staticmethod + def _extract_filename(text: str) -> str: + # A real filename-ish token carries a '.' or '/'; otherwise fall back so + # the plan is still actionable rather than empty. + for token in text.split(): + if "." in token or "/" in token: + return token + return "notes.txt" diff --git a/src/authgate/runtime/run_log.py b/src/authgate/runtime/run_log.py new file mode 100644 index 0000000..fda23d2 --- /dev/null +++ b/src/authgate/runtime/run_log.py @@ -0,0 +1,65 @@ +""" +RunLog — append-only execution trace for the agent runtime (non-TCB). + +This is the human/operator-facing record of *what the runtime did*: for each +plan step, the decision (permit/deny) and, when permitted, the tool output. It +complements — does not replace — the kernel's hash-chained `AuditLog`: + + * kernel AuditLog : tamper-evident record of every *verification decision* + (the security-critical integrity log). + * RunLog : operational trace including *tool results*, for debugging + and reproducibility checks. + +Format: one JSON object per line (.jsonl), append-only. Tool output is stored as +a truncated string so the trace never becomes an exfiltration channel for large +payloads and stays cheap to write. +""" +from __future__ import annotations + +import json +import time +from dataclasses import dataclass, field +from typing import Any + +_MAX_OUTPUT_CHARS = 2000 # keep traces bounded; full output lives in RunResult + + +@dataclass +class RunLog: + """Append-only jsonl trace of executed steps. path=None keeps it in memory.""" + + path: str | None = None + _entries: list[dict[str, Any]] = field(default_factory=list, init=False, repr=False) + + def record( + self, + agent_id: str, + step_index: int, + tool: str, + args: dict[str, Any], + permitted: bool, + output: Any = None, + denied_reason: str | None = None, + ) -> None: + """Append one step outcome. Never raises on policy denial.""" + entry = { + "ts": time.time(), + "agent_id": agent_id, + "step": step_index, + "tool": tool, + "args": args, + "decision": "permit" if permitted else "deny", + "output": (str(output)[:_MAX_OUTPUT_CHARS] if permitted else None), + "denied_reason": denied_reason, + } + self._entries.append(entry) + if self.path is not None: + with open(self.path, "a", encoding="utf-8") as f: + f.write(json.dumps(entry) + "\n") + + def entries(self) -> list[dict[str, Any]]: + """Snapshot of all recorded entries.""" + return list(self._entries) + + def __len__(self) -> int: + return len(self._entries) diff --git a/src/authgate/runtime/tools.py b/src/authgate/runtime/tools.py new file mode 100644 index 0000000..169b522 --- /dev/null +++ b/src/authgate/runtime/tools.py @@ -0,0 +1,224 @@ +""" +Agent runtime tools — the non-TCB action layer. + +These tools are NOT part of the trusted computing base. They are the concrete +side effects an agent can request (compute, file read, web search). Every tool +declares the capability (Resource + mode) it requires so the kernel can gate +invocation; the tools themselves perform the actual work once a capability check +has passed elsewhere. + +Defense in depth: `file_read` enforces its OWN sandbox boundary in addition to +the capability gate. Even if a capability were mis-issued, the tool still refuses +to read outside `sandbox_root`. Tools therefore assume they may be called with +hostile arguments and validate accordingly. +""" +from __future__ import annotations + +import ast +import operator +from collections.abc import Callable +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +from authgate.kernel.entities import Resource, ResourceType + +_VALID_MODES = {"read", "write"} + + +@dataclass(frozen=True) +class Tool: + name: str + fn: Callable[..., Any] + resource: Resource # the capability this tool requires + mode: str # "read" or "write" + + def __post_init__(self) -> None: + if self.mode not in _VALID_MODES: + raise ValueError(f"mode must be one of {sorted(_VALID_MODES)}, got {self.mode!r}") + + +class ToolRegistry: + """Name-keyed collection of tools. Not a security boundary — just a lookup.""" + + def __init__(self) -> None: + self._tools: dict[str, Tool] = {} + + def register(self, tool: Tool) -> Tool: + self._tools[tool.name] = tool + return tool + + def get(self, name: str) -> Tool: + if name not in self._tools: + raise KeyError(f"unknown tool {name!r}; available: {self.names()}") + return self._tools[name] + + def names(self) -> list[str]: + return sorted(self._tools) + + def __iter__(self): + return iter(self._tools.values()) + + def __contains__(self, name: str) -> bool: + return name in self._tools + + +# --- calculator ------------------------------------------------------------ + +# Explicit allow-list of operators; anything absent here is rejected by default, +# which is the safe failure mode for evaluating untrusted expressions. +_BIN_OPS: dict[type[ast.operator], Callable[[Any, Any], Any]] = { + ast.Add: operator.add, + ast.Sub: operator.sub, + ast.Mult: operator.mul, + ast.Div: operator.truediv, + ast.FloorDiv: operator.floordiv, + ast.Mod: operator.mod, + ast.Pow: operator.pow, +} +_UNARY_OPS: dict[type[ast.unaryop], Callable[[Any], Any]] = { + ast.UAdd: operator.pos, + ast.USub: operator.neg, +} + +# Resource-exhaustion bounds. Rejecting names/calls is not enough: pure arithmetic +# can still be a denial-of-service. `2**2**2**2**2**2` is valid arithmetic that asks +# for a number with ~10^19728 digits and hangs the process. These caps keep every +# accepted expression cheap to evaluate; anything past them is refused, not computed. +_MAX_EXPR_LEN = 256 # reject pathological inputs before they reach the parser +_MAX_POW_EXPONENT = 1000 # `a ** b` with |b| above this is refused (no giant ints) +_MAX_RESULT_BITS = 8192 # cap any intermediate integer's magnitude (~2466 digits) + + +def _check_pow(exponent: Any) -> None: + """Refuse exponents large enough to make `**` build an astronomically large int.""" + if isinstance(exponent, int) and abs(exponent) > _MAX_POW_EXPONENT: + raise ValueError(f"unsafe expression: exponent {exponent} exceeds {_MAX_POW_EXPONENT}") + + +def _check_magnitude(value: Any) -> None: + """Refuse intermediate ints whose magnitude could exhaust memory/CPU downstream.""" + if isinstance(value, int) and value.bit_length() > _MAX_RESULT_BITS: + raise ValueError("unsafe expression: intermediate result too large") + + +def _eval_node(node: ast.AST) -> Any: + """Recursively evaluate only the arithmetic AST nodes we allow.""" + if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)): + return node.value + if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS: + left = _eval_node(node.left) + right = _eval_node(node.right) + if type(node.op) is ast.Pow: + _check_pow(right) + result = _BIN_OPS[type(node.op)](left, right) + _check_magnitude(result) + return result + if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS: + return _UNARY_OPS[type(node.op)](_eval_node(node.operand)) + raise ValueError(f"unsafe expression: {ast.dump(node)}") + + +def calculate(expression: str) -> str: + """Evaluate an arithmetic expression without eval(); names/calls are rejected. + + The tool's contract is total: it either returns a numeric string or raises + ValueError. Resource-exhaustion and numeric-domain failures (overflow, + divide-by-zero) are normalized to ValueError so the CallGate sees a clean + denial rather than a hang or an uncaught error type. + """ + if len(expression) > _MAX_EXPR_LEN: + raise ValueError(f"unsafe expression: too long ({len(expression)} > {_MAX_EXPR_LEN})") + try: + tree = ast.parse(expression, mode="eval") + except (SyntaxError, ValueError, MemoryError, RecursionError) as e: + raise ValueError(f"unsafe expression: {e}") from e + try: + return str(_eval_node(tree.body)) + except (OverflowError, ZeroDivisionError) as e: + raise ValueError(f"unsafe expression: {e}") from e + + +# --- file_read ------------------------------------------------------------- + +# Windows reinterprets these basenames as DEVICES regardless of directory or +# extension: 'CON', 'CON.txt', and 'sub/NUL' all open a device, not a file. Such a +# path passes the sandbox containment check (it resolves nominally *inside* root) +# yet escapes it at open() time — 'CON' blocks forever waiting on console input +# (DoS), 'NUL'/'COMn'/'LPTn' touch real devices. So they must be refused by name. +_WINDOWS_RESERVED_DEVICES = frozenset({ + "CON", "PRN", "AUX", "NUL", + *(f"COM{i}" for i in range(1, 10)), + *(f"LPT{i}" for i in range(1, 10)), +}) + + +def _is_reserved_device(path: Path) -> bool: + """True if any path component is a Windows reserved device name.""" + for part in path.parts: + if part.split(".")[0].strip().upper() in _WINDOWS_RESERVED_DEVICES: + return True + return False + + +def _make_read_file(sandbox_root: Path) -> Callable[[str], str]: + """Build a reader bound to one sandbox root; the closure is the boundary.""" + root = sandbox_root.resolve() + + def read_file(filename: str) -> str: + # Resolve THEN verify containment so ../ traversal and absolute paths + # both collapse to a path we can prefix-check against the real root. + target = (root / filename).resolve() + if root != target and root not in target.parents: + raise PermissionError(f"path escapes sandbox: {filename}") + # Containment is necessary but not sufficient on Windows: a reserved + # device name resolves inside root yet opens a device, not a file. + if _is_reserved_device(Path(filename)) or _is_reserved_device(target): + raise PermissionError(f"reserved device name refused: {filename}") + return target.read_text(encoding="utf-8") + + return read_file + + +# --- web_search ------------------------------------------------------------ + +# Deterministic mock: same query always yields the same string (MVP requires +# reproducibility, and we do not make real network calls). +_CANNED_ANSWERS: dict[str, str] = { + "what is a capability": ( + "A capability is an unforgeable token that both names a resource and " + "grants authority to use it." + ), + "authgate": ( + "authgate is a capability-constrained authorization kernel that gates " + "agent tool execution." + ), +} + + +def web_search(query: str) -> str: + key = query.strip().lower() + if key in _CANNED_ANSWERS: + return _CANNED_ANSWERS[key] + return f"[mock] no indexed results for {query!r}" + + +# --- default registry ------------------------------------------------------ + +def build_default_tools(sandbox_root: Path) -> ToolRegistry: + """Registry of the 3 MVP tools, each wired to the capability it requires.""" + registry = ToolRegistry() + registry.register(Tool( + name="calculator", fn=calculate, + resource=Resource("compute", ResourceType.COMPUTE_SLOT), mode="read", + )) + registry.register(Tool( + name="file_read", fn=_make_read_file(sandbox_root), + resource=Resource("sandbox-fs", ResourceType.FILE, scope=str(sandbox_root)), + mode="read", + )) + registry.register(Tool( + name="web_search", fn=web_search, + resource=Resource("web", ResourceType.NETWORK_ENDPOINT), mode="read", + )) + return registry diff --git a/tests/test_runtime_agent.py b/tests/test_runtime_agent.py new file mode 100644 index 0000000..91b0137 --- /dev/null +++ b/tests/test_runtime_agent.py @@ -0,0 +1,173 @@ +"""Integration tests for authgate.runtime.agent — AgentRuntime + build_runtime.""" +from __future__ import annotations + +import pytest + +from authgate.kernel.audit import AuditLog +from authgate.runtime.agent import build_runtime +from authgate.runtime.planner import MockPlanner, PlanStep, ScriptedPlanner + + +@pytest.fixture +def sandbox(tmp_path): + (tmp_path / "a.txt").write_text("FILE CONTENT", encoding="utf-8") + return tmp_path + + +# --- granted tool executes ------------------------------------------------- + +def test_granted_calculator_permits_and_produces_output(sandbox): + planner = ScriptedPlanner([PlanStep("calculator", {"expression": "2 + 2"})]) + runtime, _ = build_runtime(planner, sandbox) + result = runtime.run("calc") + + assert result.permitted_count == 1 + assert result.denied_count == 0 + assert result.outputs() == ["4"] + assert result.stopped_early is False + + +def test_granted_execution_records_permit_entry_in_run_log(sandbox): + planner = ScriptedPlanner([PlanStep("calculator", {"expression": "2 + 2"})]) + runtime, _ = build_runtime(planner, sandbox) + runtime.run("calc") + + entries = runtime.run_log.entries() + assert len(entries) == 1 + assert entries[0]["decision"] == "permit" + assert entries[0]["output"] == "4" + assert entries[0]["tool"] == "calculator" + + +def test_granted_file_read_returns_real_content(sandbox): + planner = ScriptedPlanner([PlanStep("file_read", {"filename": "a.txt"})]) + runtime, _ = build_runtime(planner, sandbox) + result = runtime.run("read a.txt") + assert result.outputs() == ["FILE CONTENT"] + + +# --- ungranted tool denied; tool body never runs --------------------------- + +def test_ungranted_tool_is_denied_and_body_never_runs(sandbox): + # file_read NOT granted -> gate denies before the tool fn executes. + planner = ScriptedPlanner([PlanStep("file_read", {"filename": "a.txt"})]) + runtime, _ = build_runtime(planner, sandbox, granted_tools=["calculator"]) + result = runtime.run("read a.txt") + + outcome = result.outcomes[0] + assert outcome.permitted is False + assert outcome.result.output is None # tool body never produced output + assert "capability" in outcome.result.denied_reason + assert result.denied_count == 1 + + +def test_ungranted_denial_logged_with_no_output(sandbox): + planner = ScriptedPlanner([PlanStep("file_read", {"filename": "a.txt"})]) + runtime, _ = build_runtime(planner, sandbox, granted_tools=["calculator"]) + runtime.run("x") + entry = runtime.run_log.entries()[0] + assert entry["decision"] == "deny" + assert entry["output"] is None + + +# --- denied step stops execution ------------------------------------------- + +def test_denied_step_halts_remaining_plan(sandbox): + plan = [ + PlanStep("calculator", {"expression": "1 + 1"}), # granted + PlanStep("file_read", {"filename": "a.txt"}), # NOT granted -> deny + PlanStep("web_search", {"query": "authgate"}), # must never run + ] + runtime, _ = build_runtime( + ScriptedPlanner(plan), sandbox, + granted_tools=["calculator", "web_search"], + ) + result = runtime.run("multi") + + assert len(result.outcomes) == 2 # third step never executed + assert result.stopped_early is True + assert result.permitted_count == 1 + assert result.denied_count == 1 + # web_search never appears in the run log + logged_tools = [e["tool"] for e in runtime.run_log.entries()] + assert "web_search" not in logged_tools + + +def test_denied_last_step_does_not_set_stopped_early(sandbox): + plan = [ + PlanStep("calculator", {"expression": "1 + 1"}), # granted + PlanStep("file_read", {"filename": "a.txt"}), # denied, but is last + ] + runtime, _ = build_runtime( + ScriptedPlanner(plan), sandbox, granted_tools=["calculator"] + ) + result = runtime.run("two") + assert len(result.outcomes) == 2 + assert result.stopped_early is False # nothing after the denial + + +# --- reproducibility ------------------------------------------------------- + +def test_two_fresh_runtimes_produce_equal_outputs(sandbox): + intent = "calculate 2 + 2 and search authgate" + runtime_a, _ = build_runtime(MockPlanner(), sandbox) + runtime_b, _ = build_runtime(MockPlanner(), sandbox) + assert runtime_a.run(intent).outputs() == runtime_b.run(intent).outputs() + + +def test_repeated_run_same_runtime_is_reproducible(sandbox): + intent = "search authgate" + runtime, _ = build_runtime(MockPlanner(), sandbox) + assert runtime.run(intent).outputs() == runtime.run(intent).outputs() + + +# --- unknown tool ---------------------------------------------------------- + +def test_unknown_tool_is_denied_without_crash(sandbox): + planner = ScriptedPlanner([PlanStep("nonexistent_tool", {})]) + runtime, _ = build_runtime(planner, sandbox) + result = runtime.run("x") + + outcome = result.outcomes[0] + assert outcome.permitted is False + assert "unknown tool" in outcome.result.denied_reason + assert result.denied_count == 1 + + +# --- revocation ------------------------------------------------------------ + +def test_revocation_flips_permit_to_deny(sandbox): + planner = ScriptedPlanner([PlanStep("calculator", {"expression": "3 + 3"})]) + runtime, registry = build_runtime(planner, sandbox, freeze=False) + + first = runtime.run("calc") + assert first.outcomes[0].permitted is True + + registry.revoke_all("agent-1") + + second = runtime.run("calc") + assert second.outcomes[0].permitted is False + + +# --- audit ----------------------------------------------------------------- + +def test_audit_chain_is_valid_after_run(sandbox): + audit = AuditLog() + planner = MockPlanner() + runtime, _ = build_runtime(planner, sandbox, audit_log=audit) + runtime.run("calculate 2 + 2 and search authgate") + + assert len(audit) > 0 + assert audit.verify_chain() is True + + +def test_audit_records_one_entry_per_step(sandbox): + audit = AuditLog() + plan = [ + PlanStep("calculator", {"expression": "1 + 1"}), + PlanStep("web_search", {"query": "authgate"}), + ] + runtime, _ = build_runtime(ScriptedPlanner(plan), sandbox, audit_log=audit) + runtime.run("two steps") + assert len(audit) == 2 + assert audit.verify_chain() is True diff --git a/tests/test_runtime_planner.py b/tests/test_runtime_planner.py new file mode 100644 index 0000000..741a13d --- /dev/null +++ b/tests/test_runtime_planner.py @@ -0,0 +1,149 @@ +"""Unit tests for authgate.runtime.planner — MockPlanner rules and ScriptedPlanner.""" +from __future__ import annotations + +import pytest + +from authgate.runtime.planner import MockPlanner, PlanStep, ScriptedPlanner + + +@pytest.fixture +def planner(): + return MockPlanner() + + +def _tools(steps): + return [s.tool for s in steps] + + +# --- determinism ----------------------------------------------------------- + +def test_mock_planner_is_deterministic(planner): + intent = "calculate 2 + 2 and search authgate and read notes.txt" + plans = [planner.plan(intent) for _ in range(3)] + assert plans[0] == plans[1] == plans[2] + + +# --- arithmetic rule ------------------------------------------------------- + +def test_arithmetic_keyword_triggers_calculator(planner): + steps = planner.plan("please compute the result") + assert _tools(steps) == ["calculator"] + + +def test_bare_expression_triggers_calculator_and_extracts_it(planner): + steps = planner.plan("2 + 3 * 4") + assert steps[0].tool == "calculator" + assert steps[0].args == {"expression": "2 + 3 * 4"} + + +def test_arithmetic_keyword_with_expression_extracts_expression(planner): + steps = planner.plan("calculate 5 * 5") + assert steps[0].args == {"expression": "5 * 5"} + + +def test_arithmetic_keyword_without_expression_falls_back_to_zero(planner): + steps = planner.plan("compute the sum") + assert steps[0].tool == "calculator" + assert steps[0].args == {"expression": "0"} + + +# --- search rule ----------------------------------------------------------- + +@pytest.mark.parametrize("word", ["search", "find", "look up", "lookup", "what is"]) +def test_search_keywords_trigger_web_search(planner, word): + steps = planner.plan(f"{word} something about cats") + assert "web_search" in _tools(steps) + + +def test_search_query_preserves_original_casing(planner): + intent = "Find AuthGate Documentation" + steps = planner.plan(intent) + search = next(s for s in steps if s.tool == "web_search") + assert search.args == {"query": "Find AuthGate Documentation"} + + +def test_search_query_is_stripped(planner): + steps = planner.plan(" search for kernels ") + search = next(s for s in steps if s.tool == "web_search") + assert search.args == {"query": "search for kernels"} + + +# --- file rule ------------------------------------------------------------- + +@pytest.mark.parametrize("word", ["read", "open", "file", "cat "]) +def test_file_keywords_trigger_file_read(planner, word): + steps = planner.plan(f"{word} something") + assert "file_read" in _tools(steps) + + +def test_filename_extraction_picks_dotted_token(planner): + steps = planner.plan("read notes.txt now") + fr = next(s for s in steps if s.tool == "file_read") + assert fr.args == {"filename": "notes.txt"} + + +def test_filename_extraction_picks_slashed_token(planner): + steps = planner.plan("cat data/log.csv") + fr = next(s for s in steps if s.tool == "file_read") + assert fr.args == {"filename": "data/log.csv"} + + +def test_filename_extraction_falls_back_when_no_filename_token(planner): + steps = planner.plan("open the file") + fr = next(s for s in steps if s.tool == "file_read") + assert fr.args == {"filename": "notes.txt"} + + +# --- multi-rule ordering --------------------------------------------------- + +def test_multi_rule_intent_orders_arith_search_file(planner): + intent = "compute 5*5 and search authgate and read a.txt" + steps = planner.plan(intent) + assert _tools(steps) == ["calculator", "web_search", "file_read"] + + +def test_multi_rule_intent_args_are_correct(planner): + intent = "compute 5*5 and search authgate and read a.txt" + steps = planner.plan(intent) + assert steps[0].args == {"expression": "5*5"} + assert steps[1].args == {"query": intent} + assert steps[2].args == {"filename": "a.txt"} + + +# --- no match -------------------------------------------------------------- + +def test_no_match_yields_empty_plan(planner): + assert planner.plan("hello world good morning") == [] + + +# --- ScriptedPlanner ------------------------------------------------------- + +def test_scripted_planner_returns_equal_but_distinct_list(): + steps = [PlanStep("calculator", {"expression": "1 + 1"})] + sp = ScriptedPlanner(steps) + out = sp.plan("ignored intent") + assert out == steps + assert out is not steps + + +def test_scripted_planner_ignores_intent(): + sp = ScriptedPlanner([PlanStep("web_search", {"query": "x"})]) + assert sp.plan("intent A") == sp.plan("completely different intent B") + + +def test_scripted_planner_mutating_returned_list_does_not_affect_source(): + steps = [PlanStep("calculator", {"expression": "1 + 1"})] + sp = ScriptedPlanner(steps) + out = sp.plan("x") + out.append(PlanStep("web_search", {"query": "extra"})) + assert len(sp.plan("x")) == 1 + + +def test_scripted_planner_preserves_exact_step_sequence(): + plan = [ + PlanStep("calculator", {"expression": "2+2"}), + PlanStep("file_read", {"filename": "a.txt"}), + PlanStep("web_search", {"query": "authgate"}), + ] + sp = ScriptedPlanner(plan) + assert sp.plan("anything") == plan diff --git a/tests/test_runtime_redteam.py b/tests/test_runtime_redteam.py new file mode 100644 index 0000000..45dc869 --- /dev/null +++ b/tests/test_runtime_redteam.py @@ -0,0 +1,101 @@ +""" +Pytest wrapper around the standalone runtime red-team harness. + +This asserts the security property the harness exists to prove: against 1000 +deterministically-generated adversarial engineers, the capability-gated runtime +holds — ZERO escapes. Any escape is a hard failure with a helpful listing. + +The heavy lifting (attacks, sandbox fixtures, determinism) lives in +``redteam/runtime_redteam.py``; this file imports it and asserts the report. +``tests/conftest`` is not relied upon for path setup — the harness wires ``src`` +onto sys.path itself, and we add the repo root so ``redteam`` is importable. +""" +from __future__ import annotations + +import importlib.util +import os +import sys +from pathlib import Path + +import pytest + +# The runtime layer's backend under test is the pure-Python kernel. +os.environ.setdefault("AUTHGATE_BACKEND", "python") + +_REPO_ROOT = Path(__file__).resolve().parent.parent +_SRC = _REPO_ROOT / "src" +for _p in (str(_SRC), str(_REPO_ROOT)): + if _p not in sys.path: + sys.path.insert(0, _p) + + +def _load_harness(): + """Import the standalone harness by file path (it lives outside any package).""" + harness_path = _REPO_ROOT / "redteam" / "runtime_redteam.py" + spec = importlib.util.spec_from_file_location("runtime_redteam", harness_path) + assert spec and spec.loader, f"cannot load harness at {harness_path}" + module = importlib.util.module_from_spec(spec) + # Register before exec: @dataclass resolves field types via + # sys.modules[cls.__module__], which is None until the module is registered. + sys.modules[spec.name] = module + spec.loader.exec_module(module) + return module + + +_harness = _load_harness() + + +@pytest.fixture(scope="module") +def report(): + return _harness.run_redteam(1000, master_seed=1337) + + +def test_no_escapes(report): + """The headline guarantee: every one of the 1000 attacks is blocked.""" + if report.escapes: + lines = [ + f" #{e.profile.id} {e.profile.name} [{e.category}] " + f"({e.profile.skill} {e.profile.archetype}): {e.detail}" + for e in report.escapes + ] + pytest.fail( + f"{len(report.escapes)} ESCAPE(S) — the runtime FAILED to resist:\n" + + "\n".join(lines) + ) + assert report.escapes == [] + + +def test_total_is_1000(report): + assert report.total == 1000 + assert report.blocked == 1000 + + +def test_every_category_fully_blocked(report): + """Each attack category must be exercised (total>0) and fully held.""" + assert set(report.by_category) == set(_harness.ATTACK_CATEGORIES) + for category, (blocked, total) in report.by_category.items(): + assert total > 0, f"category {category} had no engineers" + assert blocked == total, ( + f"category {category}: only {blocked}/{total} blocked" + ) + + +def test_determinism(): + """Fixed seed -> identical report (same totals and per-category breakdown).""" + a = _harness.run_redteam(200, master_seed=2024) + b = _harness.run_redteam(200, master_seed=2024) + assert a.total == b.total + assert a.blocked == b.blocked + assert a.by_category == b.by_category + assert [r.profile.id for r in a.escapes] == [r.profile.id for r in b.escapes] + + +def test_no_real_file_leaked(report): + """Belt-and-suspenders: sandbox-escape attacks must never surface real OS + file content. The harness already asserts this per-attack; here we re-derive + the guarantee from the report's success and the leak markers it checks.""" + sandbox_blocked, sandbox_total = report.by_category["SANDBOX_ESCAPE"] + assert sandbox_total > 0 + assert sandbox_blocked == sandbox_total + # The markers the harness scans for include 'root:' (the /etc/passwd tell). + assert "root:" in str(_harness._LEAK_MARKERS).lower() diff --git a/tests/test_runtime_run_log.py b/tests/test_runtime_run_log.py new file mode 100644 index 0000000..b9dc8a4 --- /dev/null +++ b/tests/test_runtime_run_log.py @@ -0,0 +1,98 @@ +"""Unit tests for authgate.runtime.run_log.RunLog.""" +from __future__ import annotations + +import json + +from authgate.runtime.run_log import RunLog + + +def test_record_permit_entry_shape(): + log = RunLog() + log.record("agent-1", 0, "calculator", {"expression": "1+1"}, True, output="2") + entry = log.entries()[0] + assert entry["agent_id"] == "agent-1" + assert entry["step"] == 0 + assert entry["tool"] == "calculator" + assert entry["args"] == {"expression": "1+1"} + assert entry["decision"] == "permit" + assert entry["output"] == "2" + assert entry["denied_reason"] is None + assert "ts" in entry + + +def test_record_deny_entry_shape(): + log = RunLog() + log.record( + "agent-1", 1, "file_read", {"filename": "a.txt"}, False, + output="should be dropped", denied_reason="capability gate denied", + ) + entry = log.entries()[0] + assert entry["decision"] == "deny" + assert entry["output"] is None # output suppressed on denial + assert entry["denied_reason"] == "capability gate denied" + + +def test_record_output_is_stringified(): + log = RunLog() + log.record("a", 0, "calculator", {}, True, output=42) + assert log.entries()[0]["output"] == "42" + + +def test_record_output_truncated_at_2000_chars(): + log = RunLog() + big = "x" * 5000 + log.record("a", 0, "tool", {}, True, output=big) + assert len(log.entries()[0]["output"]) == 2000 + + +def test_record_output_under_limit_not_truncated(): + log = RunLog() + payload = "y" * 1999 + log.record("a", 0, "tool", {}, True, output=payload) + assert log.entries()[0]["output"] == payload + + +def test_entries_returns_copy(): + log = RunLog() + log.record("a", 0, "tool", {}, True, output="ok") + snapshot = log.entries() + snapshot.append({"injected": True}) + assert len(log.entries()) == 1 # internal state unaffected + + +def test_len_tracks_record_count(): + log = RunLog() + assert len(log) == 0 + log.record("a", 0, "tool", {}, True, output="ok") + log.record("a", 1, "tool", {}, False, denied_reason="no") + assert len(log) == 2 + + +def test_path_mode_writes_valid_jsonl(tmp_path): + log_path = tmp_path / "run.jsonl" + log = RunLog(path=str(log_path)) + log.record("a", 0, "calculator", {"expression": "1+1"}, True, output="2") + log.record("a", 1, "file_read", {"filename": "x"}, False, denied_reason="denied") + + lines = log_path.read_text(encoding="utf-8").splitlines() + assert len(lines) == 2 + parsed = [json.loads(line) for line in lines] + assert parsed[0]["decision"] == "permit" + assert parsed[0]["output"] == "2" + assert parsed[1]["decision"] == "deny" + assert parsed[1]["output"] is None + assert parsed[1]["denied_reason"] == "denied" + + +def test_path_mode_appends_across_records(tmp_path): + log_path = tmp_path / "run.jsonl" + log = RunLog(path=str(log_path)) + for i in range(3): + log.record("a", i, "tool", {}, True, output=str(i)) + assert len(log_path.read_text(encoding="utf-8").splitlines()) == 3 + + +def test_memory_mode_creates_no_file(tmp_path): + log = RunLog() # path=None + log.record("a", 0, "tool", {}, True, output="ok") + assert list(tmp_path.iterdir()) == [] diff --git a/tests/test_runtime_tools.py b/tests/test_runtime_tools.py new file mode 100644 index 0000000..a5726bf --- /dev/null +++ b/tests/test_runtime_tools.py @@ -0,0 +1,297 @@ +"""Unit tests for authgate.runtime.tools — calculator, web_search, file_read, registry.""" +from __future__ import annotations + +import pytest + +from authgate.kernel.entities import Resource, ResourceType +from authgate.runtime.tools import ( + Tool, + ToolRegistry, + build_default_tools, + calculate, + web_search, +) + +# --- calculator: happy path ------------------------------------------------ + +def test_calculate_respects_operator_precedence(): + assert calculate("2 + 3 * 4") == "14" + + +def test_calculate_parentheses_override_precedence(): + assert calculate("(2 + 3) * 4") == "20" + + +def test_calculate_floats(): + assert calculate("3.5 * 2") == "7.0" + + +def test_calculate_floor_division(): + assert calculate("7 // 2") == "3" + + +def test_calculate_modulo(): + assert calculate("7 % 3") == "1" + + +def test_calculate_power(): + assert calculate("2 ** 3") == "8" + + +def test_calculate_unary_minus(): + assert calculate("-5") == "-5" + + +def test_calculate_unary_plus(): + assert calculate("+5") == "5" + + +def test_calculate_true_division_is_float(): + assert calculate("6 / 4") == "1.5" + + +def test_calculate_nested_expression(): + assert calculate("2 + (3 * (4 - 1))") == "11" + + +# --- calculator: failure / hostile input ----------------------------------- + +@pytest.mark.parametrize( + "expr", + [ + "__import__('os')", # function call + name + "os.system('x')", # attribute access + call + "abs(-1)", # function call + "foo", # bare name + "a.b", # attribute access + "1 < 2", # comparison + "1 == 1", # comparison + "", # empty string + "1 +", # trailing junk / syntax error + "[1, 2, 3]", # list literal + "1 if True else 2", # conditional expression + ], +) +def test_calculate_rejects_unsafe_expressions(expr): + with pytest.raises(ValueError): + calculate(expr) + + +# --- calculator: resource-exhaustion (DoS) bounds -------------------------- +# These are valid arithmetic that, unbounded, hang the process building giant +# integers. They must be REFUSED (ValueError), not computed. A regression here +# would hang the test run, so each must return essentially instantly. + +@pytest.mark.parametrize( + "expr", + [ + "2 ** 2 ** 2 ** 2 ** 2 ** 2", # ~10^19728 digits: the classic pow-bomb + "9 ** 9 ** 9", # right-assoc tower + "10 ** 100000", # single huge exponent + "1000 ** 1000", # magnitude blows the result-bits cap + ], +) +def test_calculate_rejects_resource_exhaustion(expr): + with pytest.raises(ValueError): + calculate(expr) + + +def test_calculate_rejects_overlong_expression(): + with pytest.raises(ValueError, match="too long"): + calculate("(" * 200 + "1" + ")" * 200) + + +def test_calculate_normalizes_divide_by_zero_to_valueerror(): + with pytest.raises(ValueError): + calculate("1 / 0") + + +def test_calculate_allows_reasonable_power(): + # Just under the exponent cap must still compute fine and fast. + assert calculate("2 ** 10") == "1024" + + +# --- web_search ------------------------------------------------------------ + +def test_web_search_is_deterministic_for_same_query(): + assert web_search("some arbitrary query") == web_search("some arbitrary query") + + +def test_web_search_canned_authgate_answer(): + result = web_search("authgate") + assert "capability-constrained authorization kernel" in result + + +def test_web_search_canned_capability_answer(): + result = web_search("what is a capability") + assert "unforgeable token" in result + + +def test_web_search_canned_is_case_and_space_insensitive(): + baseline = web_search("authgate") + assert web_search("AuthGate") == baseline + assert web_search(" AUTHGATE ") == baseline + + +def test_web_search_unknown_query_returns_mock_marker(): + query = "zzz totally unindexed thing" + result = web_search(query) + assert result == f"[mock] no indexed results for {query!r}" + + +def test_web_search_distinct_queries_differ(): + assert web_search("authgate") != web_search("what is a capability") + + +# --- file_read ------------------------------------------------------------- + +def _file_read_fn(sandbox_root): + return build_default_tools(sandbox_root).get("file_read").fn + + +def test_file_read_reads_file_inside_sandbox(tmp_path): + (tmp_path / "hello.txt").write_text("hello world", encoding="utf-8") + read_file = _file_read_fn(tmp_path) + assert read_file("hello.txt") == "hello world" + + +def test_file_read_reads_benign_subdirectory(tmp_path): + sub = tmp_path / "sub" + sub.mkdir() + (sub / "nested.txt").write_text("nested content", encoding="utf-8") + read_file = _file_read_fn(tmp_path) + assert read_file("sub/nested.txt") == "nested content" + + +def test_file_read_rejects_parent_traversal(tmp_path): + read_file = _file_read_fn(tmp_path) + with pytest.raises(PermissionError, match="escapes sandbox"): + read_file("../etc") + + +def test_file_read_rejects_deep_parent_traversal(tmp_path): + read_file = _file_read_fn(tmp_path) + with pytest.raises(PermissionError, match="escapes sandbox"): + read_file("../../secret") + + +def test_file_read_rejects_posix_absolute_path(tmp_path): + read_file = _file_read_fn(tmp_path) + with pytest.raises(PermissionError, match="escapes sandbox"): + read_file("/etc/passwd") + + +def test_file_read_rejects_windows_absolute_path(tmp_path): + read_file = _file_read_fn(tmp_path) + with pytest.raises(PermissionError, match="escapes sandbox"): + read_file("C:\\Windows\\win.ini") + + +def test_file_read_missing_file_inside_sandbox_raises_filenotfound(tmp_path): + read_file = _file_read_fn(tmp_path) + with pytest.raises(FileNotFoundError): + read_file("does_not_exist.txt") + + +@pytest.mark.parametrize( + "name", + ["CON", "NUL", "PRN", "AUX", "COM1", "LPT1", "CON.txt", "sub/NUL"], +) +def test_file_read_rejects_windows_reserved_devices(tmp_path, name): + # These resolve inside the sandbox but open a device on Windows ('CON' blocks + # forever = DoS). Must be refused by name on every platform for portability. + read_file = _file_read_fn(tmp_path) + with pytest.raises(PermissionError, match="reserved device"): + read_file(name) + + +# --- Tool dataclass -------------------------------------------------------- + +def _dummy_resource(): + return Resource("compute", ResourceType.COMPUTE_SLOT) + + +def test_tool_accepts_read_mode(): + tool = Tool(name="t", fn=lambda: None, resource=_dummy_resource(), mode="read") + assert tool.mode == "read" + + +def test_tool_accepts_write_mode(): + tool = Tool(name="t", fn=lambda: None, resource=_dummy_resource(), mode="write") + assert tool.mode == "write" + + +def test_tool_rejects_invalid_mode(): + with pytest.raises(ValueError, match="mode must be one of"): + Tool(name="t", fn=lambda: None, resource=_dummy_resource(), mode="execute") + + +def test_tool_is_frozen(): + tool = Tool(name="t", fn=lambda: None, resource=_dummy_resource(), mode="read") + with pytest.raises(Exception): + tool.name = "other" # type: ignore[misc] + + +# --- ToolRegistry ---------------------------------------------------------- + +def _make_tool(name): + return Tool(name=name, fn=lambda: name, resource=_dummy_resource(), mode="read") + + +def test_registry_register_returns_tool(): + registry = ToolRegistry() + tool = _make_tool("alpha") + assert registry.register(tool) is tool + + +def test_registry_get_returns_registered_tool(): + registry = ToolRegistry() + tool = _make_tool("alpha") + registry.register(tool) + assert registry.get("alpha") is tool + + +def test_registry_get_missing_raises_keyerror(): + registry = ToolRegistry() + with pytest.raises(KeyError): + registry.get("nope") + + +def test_registry_names_are_sorted(): + registry = ToolRegistry() + registry.register(_make_tool("gamma")) + registry.register(_make_tool("alpha")) + registry.register(_make_tool("beta")) + assert registry.names() == ["alpha", "beta", "gamma"] + + +def test_registry_contains(): + registry = ToolRegistry() + registry.register(_make_tool("alpha")) + assert "alpha" in registry + assert "missing" not in registry + + +def test_registry_iter_yields_tools(): + registry = ToolRegistry() + a, b = _make_tool("a"), _make_tool("b") + registry.register(a) + registry.register(b) + assert set(registry) == {a, b} + + +# --- build_default_tools --------------------------------------------------- + +def test_build_default_tools_has_three_named_tools(tmp_path): + registry = build_default_tools(tmp_path) + assert registry.names() == ["calculator", "file_read", "web_search"] + + +def test_build_default_tools_all_read_mode(tmp_path): + registry = build_default_tools(tmp_path) + assert all(tool.mode == "read" for tool in registry) + + +def test_build_default_tools_calculator_fn_works(tmp_path): + registry = build_default_tools(tmp_path) + assert registry.get("calculator").fn("1 + 1") == "2" From dbeb9c54f08e035c2d512de9c9f42a0cbfad3b81 Mon Sep 17 00:00:00 2001 From: Ali Pourrahim Date: Wed, 10 Jun 2026 18:50:32 +0300 Subject: [PATCH 4/7] feat(runtime): decide on the verified Rust TCB + real process sandbox Closes the two structural gaps between "proven" and "running": 1. Runtime authorization now runs on the verified engine. RustBackedVerifier serializes the Python registry+action to the kernel JSON wire and calls authgate_kernel.verify_json -> crate::engine::verify (the Kani/Lean-verified engine), returning an ed25519-signed verdict. JSON is the only boundary, so no Rust pyclass objects enter Python and the dual-Entity TypeError that forced AUTHGATE_BACKEND=python cannot occur. Epoch revocation (absent from the wire) is preserved by serializing only currently-valid claims. build_runtime(backend="python"|"rust"|"auto"); default stays python so environments without the built extension are unaffected. 2. Tools run in a real sandbox, not an in-process check. SandboxPolicy executes each tool in an isolated subprocess with a wall-clock deadline + output cap (all platforms) and opt-in POSIX rlimits (cpu/memory/file-size; off by default to avoid interpreter-startup footguns). A hanging/runaway/crashing tool is killed and reaped -> clean denial. Verification: 1000-engineer red team is green on BOTH backends (0 escapes), including epoch-revocation and audit-tamper. Full suite 1297 passed, 1 skipped (POSIX-rlimit test on Windows); ruff + mypy clean. Benchmarks: native Rust verify ~0.8us (10-claim); Python verifier ~38us/decision; verified engine via JSON+ed25519 ~544us/decision (~14x, sub-ms, JSON+signing dominated). See redteam/bench_verify.py. The PyO3 extension is built separately (cargo --lib); the runtime falls back to the pure-Python verifier wherever it is absent (CI included), keeping CI green. README updated with the runtime, backends, sandbox, red team, and benchmarks. Co-Authored-By: Claude Opus 4.8 --- README.md | 80 +++++++++++- redteam/bench_verify.py | 66 ++++++++++ redteam/runtime_redteam.py | 22 +++- src/authgate/runtime/__init__.py | 7 + src/authgate/runtime/_sandbox_runner.py | 101 +++++++++++++++ src/authgate/runtime/agent.py | 42 +++++- src/authgate/runtime/rust_backend.py | 163 ++++++++++++++++++++++++ src/authgate/runtime/sandbox.py | 160 +++++++++++++++++++++++ tests/test_runtime_redteam.py | 19 +++ tests/test_runtime_rust_backend.py | 95 ++++++++++++++ tests/test_runtime_sandbox.py | 109 ++++++++++++++++ 11 files changed, 857 insertions(+), 7 deletions(-) create mode 100644 redteam/bench_verify.py create mode 100644 src/authgate/runtime/_sandbox_runner.py create mode 100644 src/authgate/runtime/rust_backend.py create mode 100644 src/authgate/runtime/sandbox.py create mode 100644 tests/test_runtime_rust_backend.py create mode 100644 tests/test_runtime_sandbox.py diff --git a/README.md b/README.md index 6ef6429..e527da1 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,8 @@ A wire format and a verify function. See [POSITIONING.md](POSITIONING.md). [![CI](https://github.com/Aliipou/authgate-kernel/actions/workflows/ci.yml/badge.svg)](https://github.com/Aliipou/authgate-kernel/actions) [![Rust](https://img.shields.io/badge/kernel-Rust-orange.svg)](authgate-kernel/) -[![Tests](https://img.shields.io/badge/tests-1155%20passing-brightgreen.svg)](tests/) +[![Tests](https://img.shields.io/badge/tests-1297%20passing-brightgreen.svg)](tests/) +[![Red team](https://img.shields.io/badge/red--team-1000%20engineers%2C%200%20escapes-brightgreen.svg)](redteam/) [![Kani](https://img.shields.io/badge/Kani-24%20harnesses-green.svg)](formal/) [![Lean4](https://img.shields.io/badge/Lean4-16%20theorems-blue.svg)](formal/lean4/) [![License: PolyForm Noncommercial 1.0.0](https://img.shields.io/badge/License-PolyForm--Noncommercial--1.0.0-orange.svg)](LICENSE) @@ -59,6 +60,79 @@ This is the same principle as capability-based OS security (seL4, CHERI), applie --- +## Agent runtime (operational layer) + +`src/authgate/runtime/` is a minimal, working agent runtime that puts the gate on +the real path — **not** part of the TCB: + +``` +intent → Planner → [PlanStep] → runtime loop → verify(action) → sandboxed tool → RunLog + AuditLog + │ deny → stop, nothing further runs +``` + +Single agent, 3 tools (calculator, file_read, web_search), deterministic planner. +A denied step halts the plan. Two things make it more than a demo: + +**1. It can run on the *verified* kernel.** `build_runtime(..., backend="rust")` +routes every permit/deny decision into the Kani/Lean-verified Rust engine +(`engine::verify`) and returns an ed25519-signed verdict. The boundary is **JSON** +(`verify_json`) — no Rust objects enter Python, so the running system and the +proven code are finally the same code for the decision that matters. `backend= +"python"` (default) uses the pure-Python reference verifier; `"auto"` picks Rust +when the extension is built. Epoch-revocation semantics (which the wire format +does not model) are preserved by serializing only currently-valid claims. + +**2. Tools run in a real sandbox.** `SandboxPolicy` executes each tool in an +isolated subprocess under a wall-clock deadline and output cap (every platform), +plus opt-in POSIX rlimits (CPU/memory/file-size). A tool that hangs, crashes, or +runs away is *killed and reaped*, surfacing as a clean denial — not an in-process +prefix check. + +### Adversarial verification — 1000 engineers + +`redteam/runtime_redteam.py` synthesizes 1000 deterministic adversarial +"engineers" across 10 attack classes (sandbox escape, capability forgery, +calculator injection, plan injection, revocation/epoch bypass, denial probing, +audit tampering, replay-after-deny, argument fuzzing, identity spoofing). The bar +is **zero escapes** — any one is a hard failure. + +| Backend | Result | +|---|---| +| Pure-Python verifier | 1000/1000 blocked, 0 escapes | +| **Verified Rust engine** (via JSON wire) | 1000/1000 blocked, 0 escapes | + +Two real vulnerabilities were found and fixed by this harness: an unbounded-`**` +calculator DoS (a `2**2**2**2**2**2` that hangs the process) and a Windows +reserved-device-name sandbox bypass (`CON` blocks forever; `NUL`/`COMn` open +devices). Both are now regression-covered. + +### Benchmark — cost of a verified decision + +`verify()` latency per gated tool call (`redteam/bench_verify.py`): + +| Path | Latency | Throughput | +|---|---|---| +| Pure-Python `FreedomVerifier` | ~38 µs/decision | ~26,000 /s | +| Verified Rust engine (JSON wire + ed25519 sign) | ~544 µs/decision | ~1,800 /s | + +Routing through the verified engine costs ~14× (JSON marshalling + per-call +signing) but stays sub-millisecond — a sound default for an agent runtime. + +### Building the verified extension + +The Rust kernel exposes a PyO3 module (`authgate_kernel`). Build it with: + +```bash +# ASCII build dir (the C toolchain mangles non-ASCII paths); GNU toolchain. +CARGO_TARGET_DIR=/tmp/akbuild cargo build --lib --release +# copy the cdylib next to the package as authgate_kernel.{pyd,so} +``` + +The runtime falls back to the pure-Python verifier wherever the extension is not +built (CI included), so the runtime tests are green with or without it. + +--- + ## What it does NOT do | Not this | Why | @@ -343,8 +417,10 @@ The gap between `Permit/Deny` and actual constrained execution: | Gap | Status | What closes it | |---|---|---| +| **Runtime decides on the verified TCB** | **Done** (`backend="rust"`) | Runtime routes verify() into `engine::verify` over the JSON wire; 1000-engineer red team green on the Rust backend | +| **Real tool sandbox** (process isolation) | **Done** (`SandboxPolicy`) | Subprocess + wall-clock deadline + output cap everywhere; opt-in POSIX rlimits. Kills hangs/runaways/crashes | | **WASM sandbox** (`cargo build --features sandbox`) | Blocked: Windows SDK kernel32.lib missing | Install Windows SDK 10.0.22621 or build on Linux | -| **OS-level confinement** (seccomp-bpf) | Not implemented | Wrap tool subprocess with seccomp filter | +| **OS-level confinement** (seccomp-bpf, network jail) | Partial: process isolation + rlimits done; no syscall/network jail | seccomp filter / namespaces / WASM around the tool subprocess | | **End-to-end integration test** | **Done** (`tests/test_integration_e2e.py`) | 18 assertions: tool call → gate → audit chain | | **TLC model checker** | Java not installed | `java -jar tla2tools.jar -tool MC_AuthGateV3` | | **CLI** | Exists; not packaged | `pip install authgate-kernel` | diff --git a/redteam/bench_verify.py b/redteam/bench_verify.py new file mode 100644 index 0000000..7a64e35 --- /dev/null +++ b/redteam/bench_verify.py @@ -0,0 +1,66 @@ +""" +Benchmark: cost of an authorization decision, pure-Python verifier vs the +verified Rust engine reached over the JSON wire (RustBackedVerifier). + +This is the decision-relevant number for the runtime: every gated tool call pays +one verify(). It answers "what does routing through the verified TCB cost per +call?" Run: AUTHGATE_BACKEND=python python redteam/bench_verify.py +""" +from __future__ import annotations + +import os +import sys +import time +from pathlib import Path + +os.environ.setdefault("AUTHGATE_BACKEND", "python") +sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src")) + +from authgate.kernel.entities import ( # noqa: E402 + AgentType, + Entity, + Resource, + ResourceType, + RightsClaim, +) +from authgate.kernel.registry import OwnershipRegistry # noqa: E402 +from authgate.kernel.verifier import Action, FreedomVerifier # noqa: E402 +from authgate.runtime.rust_backend import RustBackedVerifier, rust_backend_available # noqa: E402 + + +def _scenario(): + owner = Entity("operator", AgentType.HUMAN) + agent = Entity("agent-1", AgentType.MACHINE) + resource = Resource("compute", ResourceType.COMPUTE_SLOT) + registry = OwnershipRegistry() + registry.register_machine(agent, owner) + registry.add_claim(RightsClaim(owner, resource, can_read=True, can_delegate=True)) + registry.delegate(RightsClaim(agent, resource, can_read=True), delegated_by=owner) + return registry, Action("t1", agent, resources_read=[resource]) + + +def _bench(label: str, verify, action, n: int) -> None: + verify(action) # warm up + t0 = time.perf_counter() + for _ in range(n): + verify(action) + elapsed = time.perf_counter() - t0 + per = elapsed / n * 1e6 # microseconds per decision + print(f" {label:32} {per:9.2f} us/decision {n/elapsed:12,.0f} decisions/s") + + +def main() -> int: + registry, action = _scenario() + n = 20000 + print(f"verify() latency over {n:,} permitted decisions (AUTHGATE_BACKEND=" + f"{os.environ.get('AUTHGATE_BACKEND')}):") + _bench("pure-Python FreedomVerifier", FreedomVerifier(registry).verify, action, n) + if rust_backend_available(): + _bench("verified Rust engine (JSON wire)", RustBackedVerifier(registry).verify, action, n) + else: + print(" verified Rust engine: SKIPPED (authgate_kernel extension not built)") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/redteam/runtime_redteam.py b/redteam/runtime_redteam.py index 485df21..05a4642 100644 --- a/redteam/runtime_redteam.py +++ b/redteam/runtime_redteam.py @@ -60,10 +60,19 @@ AgentRuntime, PlanStep, ScriptedPlanner, - build_runtime, ) +from authgate.runtime import build_runtime as _build_runtime_impl # noqa: E402 from authgate.runtime.tools import calculate # noqa: E402 +# Which authorization engine every attack runs against. run_redteam() sets this; +# wrapping build_runtime here means all attacks pick it up without per-call edits. +_ACTIVE_BACKEND = "python" + + +def build_runtime(*args: object, **kwargs: object): # noqa: ANN002, ANN003 + kwargs.setdefault("backend", _ACTIVE_BACKEND) + return _build_runtime_impl(*args, **kwargs) + # Sentinel that must NEVER appear in any returned output: it is the marker of a # real system-file read leaking through the sandbox. _LEAK_MARKERS = ("root:", "[boot loader]", "win.ini", "/bin/bash", "daemon:") @@ -539,8 +548,15 @@ def _run_one(profile: EngineerProfile) -> AttackResult: blocked=blocked, detail=detail) -def run_redteam(n: int = 1000, master_seed: int = 1337) -> RedTeamReport: - """Run n engineers against the runtime and return a deterministic report.""" +def run_redteam(n: int = 1000, master_seed: int = 1337, backend: str = "python") -> RedTeamReport: + """Run n engineers against the runtime and return a deterministic report. + + backend: which authorization engine to attack — "python" (reference) or + "rust" (the formally-verified engine via the JSON wire). Same attacks, same + pass/fail bar: ZERO escapes regardless of which engine decides. + """ + global _ACTIVE_BACKEND + _ACTIVE_BACKEND = backend profiles = _make_profiles(n, master_seed) results = [_run_one(p) for p in profiles] diff --git a/src/authgate/runtime/__init__.py b/src/authgate/runtime/__init__.py index 244db0b..c559e28 100644 --- a/src/authgate/runtime/__init__.py +++ b/src/authgate/runtime/__init__.py @@ -28,6 +28,8 @@ from authgate.runtime.agent import AgentRuntime, RunResult, StepOutcome, build_runtime from authgate.runtime.planner import MockPlanner, Planner, PlanStep, ScriptedPlanner from authgate.runtime.run_log import RunLog +from authgate.runtime.rust_backend import RustBackedVerifier, rust_backend_available +from authgate.runtime.sandbox import SandboxPolicy, SandboxResult, run_tool_sandboxed from authgate.runtime.tools import Tool, ToolRegistry, build_default_tools __all__ = [ @@ -43,4 +45,9 @@ "Tool", "ToolRegistry", "build_default_tools", + "SandboxPolicy", + "SandboxResult", + "run_tool_sandboxed", + "RustBackedVerifier", + "rust_backend_available", ] diff --git a/src/authgate/runtime/_sandbox_runner.py b/src/authgate/runtime/_sandbox_runner.py new file mode 100644 index 0000000..941277f --- /dev/null +++ b/src/authgate/runtime/_sandbox_runner.py @@ -0,0 +1,101 @@ +""" +Sandbox child process — runs ONE tool call under OS resource limits, then exits. + +This module is the body of the isolated process spawned by `sandbox.py`. It is +never imported by the runtime in-process; it is executed as +``python -m authgate.runtime._sandbox_runner`` with a JSON job on stdin and a +single JSON result line on stdout (prefixed by a sentinel so tool chatter on +stdout cannot be mistaken for the result). + +Why a separate process at all: in-process input validation (see tools.py) stops +the inputs we thought of. Process isolation stops the ones we didn't — a tool +that hangs, allocates without bound, or crashes the interpreter takes down only +this child, and the parent reaps it and returns a denial. On POSIX the limits are +enforced by the kernel via setrlimit (CPU seconds, address space, file size); on +Windows, where setrlimit does not exist, the parent's wall-clock timeout and +output cap are the enforcement (documented honestly, not pretended otherwise). +""" +from __future__ import annotations + +import importlib +import json +import sys +from pathlib import Path +from typing import Any + +# Sentinel framing: the parent reads the line AFTER this marker as the result, so +# anything a tool or an import writes to stdout before it is harmless noise. +RESULT_MARKER = "\x00AUTHGATE_SANDBOX_RESULT\x00" + + +def _apply_posix_limits(limits: dict[str, int]) -> None: + """Enforce kernel resource limits. No-op where `resource` is unavailable (Windows). + + Attributes are read dynamically so this stays type-clean on platforms whose + stubs lack the POSIX-only `setrlimit`/`RLIMIT_*` names. Each limit is applied + only when explicitly requested (> 0): a too-low RLIMIT_AS can stop the + interpreter from starting and RLIMIT_FSIZE=0 can break .pyc writes, so these + are opt-in hardening, not silent defaults. + """ + try: + import resource # POSIX-only + except ImportError: + return + setrlimit = getattr(resource, "setrlimit", None) + if setrlimit is None: + return + for limit_key, rlimit_name in ( + ("cpu_seconds", "RLIMIT_CPU"), + ("max_memory_bytes", "RLIMIT_AS"), + ("max_file_bytes", "RLIMIT_FSIZE"), + ): + value = int(limits.get(limit_key, 0)) + rlimit = getattr(resource, rlimit_name, None) + if value > 0 and rlimit is not None: + setrlimit(rlimit, (value, value)) + + +def _resolve_tool(job: dict[str, Any]): + """Map a job to the concrete callable. Tool identity comes from the trusted + parent (build_runtime), never from attacker-controlled plan data.""" + builtin = job.get("builtin") + if builtin == "file_read": + # file_read is a closure bound to the sandbox root; rebuild it here. + from authgate.runtime.tools import _make_read_file + return _make_read_file(Path(job["sandbox_root"])) + entry = job.get("entry") + if entry: + module_name, _, func_name = entry.partition(":") + module = importlib.import_module(module_name) + return getattr(module, func_name) + raise ValueError("job names neither a builtin nor an importable entry") + + +def main() -> int: + raw = sys.stdin.read() + try: + job = json.loads(raw) + except json.JSONDecodeError as exc: + print(RESULT_MARKER + json.dumps({"ok": False, "error": f"bad job: {exc}"})) + return 0 + + _apply_posix_limits(job.get("limits", {})) + max_out = int(job.get("limits", {}).get("max_output_bytes", 65536)) + if max_out <= 0: + max_out = 65536 + + try: + fn = _resolve_tool(job) + output = fn(**job.get("args", {})) + text = str(output) + truncated = len(text.encode("utf-8", "replace")) > max_out + result = {"ok": True, "output": text[:max_out], "truncated": truncated} + except Exception as exc: # any tool failure is a denial, surfaced to the parent + result = {"ok": False, "error": f"{type(exc).__name__}: {exc}"} + + print(RESULT_MARKER + json.dumps(result)) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/authgate/runtime/agent.py b/src/authgate/runtime/agent.py index 7ec5f1b..dbdfbcb 100644 --- a/src/authgate/runtime/agent.py +++ b/src/authgate/runtime/agent.py @@ -149,6 +149,37 @@ def _log(self, index: int, step: PlanStep, result: GateResult) -> None: ) +def _make_verifier(backend: str, registry: OwnershipRegistry, audit: AuditLog, freeze: bool): + """Select the authorization engine. "python" = reference verifier; "rust" = + the formally-verified Rust engine (requires the built extension); "auto" = + Rust when available, else Python.""" + if backend == "python": + return FreedomVerifier(registry, audit_log=audit, freeze=freeze) + from authgate.runtime.rust_backend import RustBackedVerifier, rust_backend_available + if backend == "rust": + return RustBackedVerifier(registry, audit_log=audit, freeze=freeze) + if backend == "auto": + if rust_backend_available(): + return RustBackedVerifier(registry, audit_log=audit, freeze=freeze) + return FreedomVerifier(registry, audit_log=audit, freeze=freeze) + raise ValueError(f"unknown backend {backend!r}; use 'python', 'rust', or 'auto'") + + +def _make_sandboxed_fn(tool_name: str, sandbox_root: Path, policy: Any): + """Wrap a tool so it executes in an isolated subprocess under `policy`. A + limit breach (timeout/OOM/crash) surfaces as an exception, which the gate + turns into a clean denial — same contract as an in-process tool raising.""" + from authgate.runtime.sandbox import run_tool_sandboxed + + def run(**args: Any) -> Any: + result = run_tool_sandboxed(tool_name, args, sandbox_root, policy) + if result.ok: + return result.output + raise RuntimeError(result.error or "sandbox denied") + + return run + + def build_runtime( intent_planner: Planner, sandbox_root: Path, @@ -156,6 +187,8 @@ def build_runtime( audit_log: AuditLog | None = None, run_log: RunLog | None = None, freeze: bool = False, + backend: str = "python", + sandbox_policy: Any = None, ) -> tuple[AgentRuntime, OwnershipRegistry]: """Wire a single-agent runtime for demos/tests. @@ -163,6 +196,10 @@ def build_runtime( each tool in `granted_tools` (default: all). Tools NOT granted will be denied by the gate when the planner tries to use them — exactly the MVP behavior. + backend: "python" (default), "rust" (verified Rust engine via JSON wire), or + "auto". sandbox_policy: when set (a SandboxPolicy), tools run in an + isolated subprocess under OS resource limits instead of in-process. + Returns (runtime, registry). The registry is returned so callers can revoke claims / advance epochs and observe the gate react (when freeze=False). """ @@ -182,10 +219,11 @@ def build_runtime( delegated_by=owner) audit = audit_log if audit_log is not None else AuditLog() - verifier = FreedomVerifier(registry, audit_log=audit, freeze=freeze) + verifier = _make_verifier(backend, registry, audit, freeze) gate = CallGate(verifier) for tool in tools: - gate.register(tool.name, tool.fn) + fn = _make_sandboxed_fn(tool.name, sandbox_root, sandbox_policy) if sandbox_policy else tool.fn + gate.register(tool.name, fn) runtime = AgentRuntime(agent=agent, gate=gate, tools=tools, planner=intent_planner, run_log=run_log) diff --git a/src/authgate/runtime/rust_backend.py b/src/authgate/runtime/rust_backend.py new file mode 100644 index 0000000..51b0520 --- /dev/null +++ b/src/authgate/runtime/rust_backend.py @@ -0,0 +1,163 @@ +""" +RustBackedVerifier — route the runtime's authorization decision through the +formally-verified Rust TCB engine, without importing Rust pyclass objects. + +Motivation (the honest gap this closes): the Kani/Lean proofs cover +``authgate-kernel/src/engine.rs``. The Python runtime, however, decides with +``authgate.kernel.verifier`` — a *different* implementation. So "the verified +kernel gates every call" and "the code that actually ran" were two codebases. + +This adapter makes the running system's permit/deny decision come from the +verified engine. The trust boundary is **JSON**: the Python registry + action are +serialized to the kernel wire format and handed to ``authgate_kernel.verify_json`` +(which calls ``crate::engine::verify`` and returns an ed25519-signed verdict). +Only JSON crosses — no Rust ``Entity``/``Action`` objects enter the Python +process, so the dual-type ``"Entity cannot be converted to Entity"`` problem +that forced ``AUTHGATE_BACKEND=python`` simply cannot occur here. + +Semantic reconciliation: the wire/engine has no *epoch* concept (the Python +verifier's revocation mechanism). We preserve it by serializing only the claims +the Python registry considers **currently valid** — unexpired, identity-matched, +delegation-chain-valid, and at or above the action's ``min_epoch``. The verified +engine then independently re-derives permit/deny from claim existence, machine +ownership (A4), no-dominion (A6), and the forbidden-flag set. Revocation and +epoch therefore behave identically to the pure-Python path, while the actual +decision is made by verified code. +""" +from __future__ import annotations + +import importlib +import json +from typing import Any + +from authgate.kernel.entities import Entity, Resource, RightsClaim +from authgate.kernel.registry import OwnershipRegistry +from authgate.kernel.verifier import Action, VerificationResult + + +def rust_backend_available() -> bool: + """True iff the compiled verified-kernel extension can be imported.""" + try: + importlib.import_module("authgate_kernel") + return True + except ImportError: + return False + + +def _entity_wire(e: Entity) -> dict[str, str]: + return {"name": e.name, "kind": e.kind.name} + + +def _resource_wire(r: Resource) -> dict[str, Any]: + return { + "name": r.name, + "rtype": r.rtype.name, + "scope": r.scope, + "is_public": r.is_public, + "ifc_label": r.ifc_label, + } + + +def _claim_wire(c: RightsClaim) -> dict[str, Any]: + return { + "holder": _entity_wire(c.holder), + "resource": _resource_wire(c.resource), + "can_read": c.can_read, + "can_write": c.can_write, + "can_delegate": c.can_delegate, + "confidence": c.confidence, + "expires_at": c.expires_at, + "delegation_depth": 0, + } + + +# Action boolean flags, mapped 1:1 to the wire field names the engine reads. +_FLAG_FIELDS = ( + "increases_machine_sovereignty", + "resists_human_correction", + "bypasses_verifier", + "weakens_verifier", + "disables_corrigibility", + "machine_coalition_dominion", + "coerces", + "deceives", + "self_modification_weakens_verifier", + "machine_coalition_reduces_freedom", +) + + +def _action_wire(action: Action) -> dict[str, Any]: + wire: dict[str, Any] = { + "action_id": action.action_id, + "actor": _entity_wire(action.actor), + "description": action.description, + "resources_read": [_resource_wire(r) for r in action.resources_read], + "resources_write": [_resource_wire(r) for r in action.resources_write], + "resources_delegate": [_resource_wire(r) for r in action.resources_delegate], + "governs_humans": [_entity_wire(h) for h in action.governs_humans], + "argument": action.argument, + "delegation_depth": 0, + } + for flag in _FLAG_FIELDS: + wire[flag] = getattr(action, flag) + return wire + + +def _live_claims(registry: OwnershipRegistry, min_epoch: int) -> list[RightsClaim]: + """The claims the Python registry currently treats as valid — the set the + verified engine should see. This is where epoch/revocation/identity live.""" + live: list[RightsClaim] = [] + for c in registry._claims: + if not c.is_valid(): + continue + if not registry._identity_matches(c.holder): + continue + if not registry._delegation_chain_valid(c): + continue + if c.epoch < min_epoch: + continue + live.append(c) + return live + + +class RustBackedVerifier: + """Drop-in replacement for FreedomVerifier whose decision is made by the + verified Rust engine. Same ``verify(action) -> VerificationResult`` contract, + so CallGate, the audit log, and the runtime are unchanged.""" + + def __init__( + self, + registry: OwnershipRegistry, + audit_log: object = None, + freeze: bool = True, + ) -> None: + # Mirror FreedomVerifier's TOCTOU stance: snapshot unless told otherwise. + self.registry = ( + registry.freeze() if freeze and not getattr(registry, "_frozen", False) else registry + ) + self._audit_log = audit_log + self._ak = importlib.import_module("authgate_kernel") + + def verify(self, action: Action) -> VerificationResult: + registry_wire = { + "claims": [_claim_wire(c) for c in _live_claims(self.registry, action.min_epoch)], + "machine_owners": [ + {"machine": _entity_wire(m), "owner": _entity_wire(o)} + for m, o in self.registry._machine_owners.items() + ], + "trust_domains": [], + } + payload = {"registry": registry_wire, "action": _action_wire(action)} + out = json.loads(self._ak.verify_json(json.dumps(payload))) + + result = VerificationResult( + action_id=out["action_id"], + permitted=out["permitted"], + violations=tuple(out["violations"]), + warnings=tuple(out.get("warnings", ())), + confidence=out.get("confidence", 0.0), + requires_human_arbitration=out.get("requires_human_arbitration", False), + ) + if self._audit_log is not None: + self._audit_log.record(result) # type: ignore[attr-defined] + return result diff --git a/src/authgate/runtime/sandbox.py b/src/authgate/runtime/sandbox.py new file mode 100644 index 0000000..89e19c2 --- /dev/null +++ b/src/authgate/runtime/sandbox.py @@ -0,0 +1,160 @@ +""" +Real, OS-enforced sandbox for tool execution (non-TCB). + +`tools.py` validates the inputs it can foresee. This module contains the inputs +it cannot: it runs each tool in a separate OS process under resource limits and a +wall-clock deadline, so a tool that hangs, allocates without bound, or crashes the +interpreter is *killed by the OS / reaped by the parent* — it cannot take the +runtime down or block it forever. That is the difference between an in-process +prefix check (which the previous "sandbox" was) and actual containment. + +Honest platform scope: + * Always on (every platform): the parent's wall-clock timeout and the output + cap. These reliably kill hangs and bound output everywhere. + * POSIX, opt-in: CPU seconds, address space, and file size as hard kernel + limits (setrlimit in the child). Off by default — a too-tight + RLIMIT_AS/RLIMIT_FSIZE can stop the interpreter starting or writing + .pyc — so callers enable them with a known headroom budget. + * Windows — setrlimit does not exist; the rlimits are no-ops and enforcement is + the wall-clock timeout and output cap. Production confinement targets + Linux (the existing seccomp/WASM executors). + +What this is NOT: it is not a network/syscall jail. It bounds time, memory, and +output and isolates crashes. Network and filesystem confinement beyond the +file_read sandbox root require OS namespaces / seccomp / WASM (tracked elsewhere). +""" +from __future__ import annotations + +import json +import os +import subprocess +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import authgate +from authgate.runtime._sandbox_runner import RESULT_MARKER + +# The child runs `python -m authgate.runtime._sandbox_runner`, so it must be able +# to import `authgate`. Derive the src root from this package's location and put +# it on the child's PYTHONPATH; this works whether or not the package is installed. +_SRC_ROOT = str(Path(authgate.__file__).resolve().parent.parent) + +# Tools expressed as importable entry points (pure functions). file_read is bound +# to a per-runtime sandbox root, so it is dispatched as a builtin (see runner). +_ENTRY_POINTS = { + "calculator": "authgate.runtime.tools:calculate", + "web_search": "authgate.runtime.tools:web_search", +} +_ROOT_BOUND = {"file_read"} + + +@dataclass(frozen=True) +class SandboxPolicy: + """Resource envelope for one tool call. + + Cross-platform, always-on guards: the wall-clock timeout (parent kills the + child) and the output cap. The POSIX rlimits (cpu/memory/file-size) are + opt-in (0 = off) because a too-tight RLIMIT_AS or RLIMIT_FSIZE can prevent + the interpreter from starting or writing .pyc files — turning hardening into + spurious failures. Set them explicitly when running on Linux with a known + headroom budget. They are no-ops on Windows (no `resource` module).""" + + wall_timeout_s: float = 5.0 # parent kills the child past this (all platforms) + cpu_seconds: int = 0 # RLIMIT_CPU (POSIX); 0 = off + max_memory_mb: int = 0 # RLIMIT_AS (POSIX); 0 = off + max_file_bytes: int = 0 # RLIMIT_FSIZE (POSIX); 0 = off + max_output_bytes: int = 64 * 1024 # parent/child cap on returned output size + + def _limits(self) -> dict[str, int]: + return { + "cpu_seconds": self.cpu_seconds, + "max_memory_bytes": self.max_memory_mb * 1024 * 1024, + "max_file_bytes": self.max_file_bytes, + "max_output_bytes": self.max_output_bytes, + } + + +@dataclass(frozen=True) +class SandboxResult: + """Outcome of one sandboxed execution.""" + + ok: bool + output: str | None = None + error: str | None = None + killed: bool = False # True iff a resource/time limit terminated the child + + +def _build_job(tool: str, args: dict[str, Any], sandbox_root: Path, policy: SandboxPolicy) -> dict[str, Any]: + job: dict[str, Any] = {"args": args, "limits": policy._limits()} + if tool in _ROOT_BOUND: + job["builtin"] = tool + job["sandbox_root"] = str(sandbox_root) + elif tool in _ENTRY_POINTS: + job["entry"] = _ENTRY_POINTS[tool] + else: + # An unknown tool never reaches a process; the caller treats it as a denial. + job["entry"] = "" + return job + + +def run_tool_sandboxed( + tool: str, + args: dict[str, Any], + sandbox_root: Path, + policy: SandboxPolicy, + *, + entry_override: str | None = None, + env: dict[str, str] | None = None, +) -> SandboxResult: + """Run one tool call in an isolated child process under `policy`. + + `entry_override` ("module:function") is for tests that need to exercise the + sandbox against a deliberately hostile callable (e.g. one that hangs); the + runtime itself never sets it — tool identity comes from the registry. + """ + job = _build_job(tool, args, sandbox_root, policy) + if entry_override is not None: + job.pop("builtin", None) + job.pop("sandbox_root", None) + job["entry"] = entry_override + + child_env = dict(env) if env is not None else dict(os.environ) + existing_pp = child_env.get("PYTHONPATH", "") + if _SRC_ROOT not in existing_pp.split(os.pathsep): + child_env["PYTHONPATH"] = os.pathsep.join(p for p in (_SRC_ROOT, existing_pp) if p) + # No .pyc writes: keeps an opt-in RLIMIT_FSIZE from tripping on bytecode caching. + child_env.setdefault("PYTHONDONTWRITEBYTECODE", "1") + try: + proc = subprocess.run( + [sys.executable, "-m", "authgate.runtime._sandbox_runner"], + input=json.dumps(job), + capture_output=True, + text=True, + timeout=policy.wall_timeout_s, + env=child_env, + ) + except subprocess.TimeoutExpired: + return SandboxResult(ok=False, error="wall-clock timeout exceeded", killed=True) + + marker_at = proc.stdout.rfind(RESULT_MARKER) + if marker_at == -1: + # No result line: the child was killed before it could report (OOM, CPU + # limit -> SIGXCPU, segfault). That is containment working, not a bug. + detail = (proc.stderr or "").strip()[-200:] + return SandboxResult( + ok=False, + error=f"child terminated without result (rc={proc.returncode}): {detail}", + killed=True, + ) + + payload = proc.stdout[marker_at + len(RESULT_MARKER):].strip() + try: + result = json.loads(payload) + except json.JSONDecodeError as exc: + return SandboxResult(ok=False, error=f"unparseable sandbox result: {exc}", killed=True) + + if result.get("ok"): + return SandboxResult(ok=True, output=result.get("output")) + return SandboxResult(ok=False, error=result.get("error", "tool denied")) diff --git a/tests/test_runtime_redteam.py b/tests/test_runtime_redteam.py index 45dc869..c09bc0c 100644 --- a/tests/test_runtime_redteam.py +++ b/tests/test_runtime_redteam.py @@ -45,6 +45,12 @@ def _load_harness(): _harness = _load_harness() +def _rust_available() -> bool: + """True iff the compiled verified-kernel extension can be imported.""" + from authgate.runtime.rust_backend import rust_backend_available + return rust_backend_available() + + @pytest.fixture(scope="module") def report(): return _harness.run_redteam(1000, master_seed=1337) @@ -99,3 +105,16 @@ def test_no_real_file_leaked(report): assert sandbox_blocked == sandbox_total # The markers the harness scans for include 'root:' (the /etc/passwd tell). assert "root:" in str(_harness._LEAK_MARKERS).lower() + + +@pytest.mark.skipif( + not _rust_available(), + reason="verified Rust extension (authgate_kernel) not built in this environment", +) +def test_no_escapes_on_verified_rust_backend(): + """The same adversaries, but every permit/deny decision is made by the + formally-verified Rust engine. Zero escapes is the bar regardless of backend.""" + report = _harness.run_redteam(1000, master_seed=1337, backend="rust") + assert report.total == 1000 + assert report.blocked == 1000 + assert report.escapes == [] diff --git a/tests/test_runtime_rust_backend.py b/tests/test_runtime_rust_backend.py new file mode 100644 index 0000000..aaee97b --- /dev/null +++ b/tests/test_runtime_rust_backend.py @@ -0,0 +1,95 @@ +""" +Tests for routing the runtime's authorization decision through the verified Rust +engine (RustBackedVerifier). + +Skipped automatically when the compiled `authgate_kernel` extension is not present +(e.g. CI jobs that do not build it). Where it IS present, these assert that the +Rust-backed decision matches the pure-Python verifier across the cases that +matter — including the epoch-revocation semantics the wire format does not model +natively (preserved by the adapter's valid-claim filter). +""" +from __future__ import annotations + +import pytest + +from authgate.runtime.rust_backend import rust_backend_available + +pytestmark = pytest.mark.skipif( + not rust_backend_available(), + reason="verified Rust extension (authgate_kernel) not built in this environment", +) + +from authgate.kernel.entities import ( # noqa: E402 + AgentType, + Entity, + Resource, + ResourceType, + RightsClaim, +) +from authgate.kernel.registry import OwnershipRegistry # noqa: E402 +from authgate.kernel.verifier import Action, FreedomVerifier # noqa: E402 +from authgate.runtime.rust_backend import RustBackedVerifier # noqa: E402 + + +def _scenario(grant: bool = True): + owner = Entity("operator", AgentType.HUMAN) + agent = Entity("agent-1", AgentType.MACHINE) + resource = Resource("compute", ResourceType.COMPUTE_SLOT) + registry = OwnershipRegistry() + registry.register_machine(agent, owner) + if grant: + registry.add_claim(RightsClaim(owner, resource, can_read=True, can_delegate=True)) + registry.delegate(RightsClaim(agent, resource, can_read=True), delegated_by=owner) + action = Action("t1", agent, resources_read=[resource]) + return registry, action, agent, resource + + +def test_rust_permits_when_granted(): + registry, action, _, _ = _scenario(grant=True) + assert RustBackedVerifier(registry).verify(action).permitted + + +def test_rust_denies_when_not_granted(): + registry, action, _, _ = _scenario(grant=False) + result = RustBackedVerifier(registry).verify(action) + assert not result.permitted + assert result.violations + + +@pytest.mark.parametrize("grant", [True, False]) +def test_rust_decision_matches_python(grant): + registry, action, _, _ = _scenario(grant=grant) + rust = RustBackedVerifier(registry).verify(action).permitted + python = FreedomVerifier(registry).verify(action).permitted + assert rust == python + + +def test_rust_unowned_machine_denied(): + # No register_machine -> A4 ownership violation, decided by the Rust engine. + agent = Entity("orphan", AgentType.MACHINE) + resource = Resource("compute", ResourceType.COMPUTE_SLOT) + registry = OwnershipRegistry() + registry.add_claim(RightsClaim(agent, resource, can_read=True)) + action = Action("t1", agent, resources_read=[resource]) + assert not RustBackedVerifier(registry).verify(action).permitted + + +def test_rust_epoch_revocation_preserved(): + # The wire format has no epoch; the adapter preserves it by filtering claims. + registry, _, agent, resource = _scenario(grant=True) + old_epoch_action = Action("t1", agent, resources_read=[resource], min_epoch=2) + # Claims default to epoch=1, so a min_epoch=2 action must be denied... + assert not RustBackedVerifier(registry, freeze=False).verify(old_epoch_action).permitted + # ...until the registry advances the epoch (reissues claims at epoch 2). + registry.advance_epoch(2) + assert RustBackedVerifier(registry, freeze=False).verify(old_epoch_action).permitted + + +def test_rust_records_to_audit_log(): + from authgate.kernel.audit import AuditLog + + registry, action, _, _ = _scenario(grant=True) + audit = AuditLog() + RustBackedVerifier(registry, audit_log=audit).verify(action) + assert len(audit._records) == 1 + assert audit.verify_chain() is True diff --git a/tests/test_runtime_sandbox.py b/tests/test_runtime_sandbox.py new file mode 100644 index 0000000..72ac8c5 --- /dev/null +++ b/tests/test_runtime_sandbox.py @@ -0,0 +1,109 @@ +""" +Tests for the real, OS-enforced tool sandbox (process isolation + limits). + +These prove the sandbox actually *contains* a tool rather than just checking its +inputs: a normal tool runs and returns; a hostile path is refused; a runaway +(hanging) tool is killed by the wall-clock deadline rather than blocking forever; +oversized output is capped. +""" +from __future__ import annotations + +import os +import time + +import pytest + +from authgate.runtime.sandbox import SandboxPolicy, run_tool_sandboxed + + +def test_sandbox_runs_calculator_in_subprocess(tmp_path): + r = run_tool_sandboxed("calculator", {"expression": "2 + 3 * 4"}, tmp_path, SandboxPolicy()) + assert r.ok + assert r.output == "14" + + +def test_sandbox_reads_file_inside_root(tmp_path): + (tmp_path / "a.txt").write_text("hello", encoding="utf-8") + r = run_tool_sandboxed("file_read", {"filename": "a.txt"}, tmp_path, SandboxPolicy()) + assert r.ok + assert r.output == "hello" + + +def test_sandbox_denies_path_escape(tmp_path): + r = run_tool_sandboxed("file_read", {"filename": "../../etc/passwd"}, tmp_path, SandboxPolicy()) + assert not r.ok + assert "escapes sandbox" in (r.error or "") + + +def test_sandbox_kills_a_hanging_tool(tmp_path): + # A helper module the isolated child can import, whose tool hangs far longer + # than the deadline. Real containment means it is killed, not awaited. + helper_dir = tmp_path / "helpers" + helper_dir.mkdir() + (helper_dir / "hangtool.py").write_text( + "import time\n\ndef hang(seconds=60):\n time.sleep(seconds)\n return 'finished'\n", + encoding="utf-8", + ) + env = dict(os.environ) + env["PYTHONPATH"] = os.pathsep.join([str(helper_dir), env.get("PYTHONPATH", "")]) + + t0 = time.time() + r = run_tool_sandboxed( + "x", {"seconds": 30}, tmp_path, + SandboxPolicy(wall_timeout_s=2.0), + entry_override="hangtool:hang", env=env, + ) + elapsed = time.time() - t0 + + assert not r.ok + assert r.killed + assert elapsed < 15, f"sandbox did not kill promptly (took {elapsed:.1f}s)" + + +def test_sandbox_caps_oversized_output(tmp_path): + long_query = "Z" * 5000 # web_search echoes the query back; output would be large + r = run_tool_sandboxed( + "web_search", {"query": long_query}, tmp_path, + SandboxPolicy(max_output_bytes=100), + ) + assert r.ok + assert r.output is not None + assert len(r.output) <= 100 + + +def test_sandbox_tool_error_is_denial_not_crash(tmp_path): + # calculator rejects non-arithmetic; through the sandbox that is a clean deny. + r = run_tool_sandboxed("calculator", {"expression": "__import__('os')"}, tmp_path, SandboxPolicy()) + assert not r.ok + assert r.output is None + assert "ValueError" in (r.error or "") + + +def _has_rlimit() -> bool: + try: + import resource # noqa: F401 + return True + except ImportError: + return False + + +@pytest.mark.skipif(not _has_rlimit(), reason="POSIX rlimits unavailable (Windows)") +def test_sandbox_cpu_limit_kills_busy_tool(tmp_path): + # A CPU-burning tool with no sleeps: only an RLIMIT_CPU (not the wall clock, + # set generously here) can stop it. Proves kernel-enforced CPU limiting. + helper_dir = tmp_path / "helpers" + helper_dir.mkdir() + (helper_dir / "burntool.py").write_text( + "def burn(n=0):\n x = 0\n while True:\n x += 1\n", + encoding="utf-8", + ) + env = dict(os.environ) + env["PYTHONPATH"] = os.pathsep.join([str(helper_dir), env.get("PYTHONPATH", "")]) + + r = run_tool_sandboxed( + "x", {}, tmp_path, + SandboxPolicy(wall_timeout_s=30.0, cpu_seconds=1), + entry_override="burntool:burn", env=env, + ) + assert not r.ok + assert r.killed From a35512e29d287500c3fff8b8563491ea06070eec Mon Sep 17 00:00:00 2001 From: Ali Pourrahim Date: Wed, 10 Jun 2026 18:56:14 +0300 Subject: [PATCH 5/7] test(runtime): cover wire serialization + sandbox runner (CI coverage margin) Adds extension-free tests for the JSON-wire marshalling (_entity/_resource/ _claim/_action_wire, _live_claims epoch/revocation filter) and in-process tests for the sandbox child runner. These run without the compiled extension, so they lift CI coverage of rust_backend.py and _sandbox_runner.py and keep the --cov-fail-under=85 gate comfortably satisfied (86.5%). Co-Authored-By: Claude Opus 4.8 --- tests/test_runtime_rust_wire.py | 97 +++++++++++++++++++++++++++++++++ tests/test_runtime_sandbox.py | 57 +++++++++++++++++++ 2 files changed, 154 insertions(+) create mode 100644 tests/test_runtime_rust_wire.py diff --git a/tests/test_runtime_rust_wire.py b/tests/test_runtime_rust_wire.py new file mode 100644 index 0000000..74b39b6 --- /dev/null +++ b/tests/test_runtime_rust_wire.py @@ -0,0 +1,97 @@ +""" +Tests for the pure JSON-wire serialization that feeds the verified Rust engine. + +These need no compiled extension — they exercise the marshalling functions and +the valid-claim filter directly, so they run everywhere (including CI without the +extension) and pin the exact shape the engine consumes. +""" +from __future__ import annotations + +from authgate.kernel.entities import ( + AgentType, + Entity, + Resource, + ResourceType, + RightsClaim, +) +from authgate.kernel.registry import OwnershipRegistry +from authgate.kernel.verifier import Action +from authgate.runtime.rust_backend import ( + _action_wire, + _claim_wire, + _entity_wire, + _live_claims, + _resource_wire, +) + + +def test_entity_wire_maps_kind(): + assert _entity_wire(Entity("a", AgentType.MACHINE)) == {"name": "a", "kind": "MACHINE"} + assert _entity_wire(Entity("op", AgentType.HUMAN))["kind"] == "HUMAN" + + +def test_resource_wire_shape(): + r = Resource("sales", ResourceType.DATASET, scope="/data/", is_public=True) + w = _resource_wire(r) + assert w["name"] == "sales" + assert w["rtype"] == "DATASET" + assert w["scope"] == "/data/" + assert w["is_public"] is True + + +def test_claim_wire_carries_rights_and_confidence(): + c = RightsClaim( + Entity("a", AgentType.MACHINE), + Resource("compute", ResourceType.COMPUTE_SLOT), + can_read=True, can_delegate=True, confidence=0.9, + ) + w = _claim_wire(c) + assert w["can_read"] is True + assert w["can_delegate"] is True + assert w["confidence"] == 0.9 + assert w["holder"]["name"] == "a" + + +def test_action_wire_includes_resources_and_flags(): + agent = Entity("a", AgentType.MACHINE) + res = Resource("compute", ResourceType.COMPUTE_SLOT) + action = Action("act1", agent, resources_read=[res], coerces=True) + w = _action_wire(action) + assert w["action_id"] == "act1" + assert w["actor"]["name"] == "a" + assert len(w["resources_read"]) == 1 + assert w["coerces"] is True + assert w["bypasses_verifier"] is False + + +def _registry_with_claim(epoch: int = 1): + owner = Entity("operator", AgentType.HUMAN) + agent = Entity("agent-1", AgentType.MACHINE) + res = Resource("compute", ResourceType.COMPUTE_SLOT) + reg = OwnershipRegistry() + reg.register_machine(agent, owner) + reg.add_claim(RightsClaim(owner, res, can_read=True, can_delegate=True)) + reg.delegate(RightsClaim(agent, res, can_read=True), delegated_by=owner) + return reg + + +def test_live_claims_includes_valid_claims(): + reg = _registry_with_claim() + live = _live_claims(reg, min_epoch=0) + # owner's claim + delegated agent claim, both valid at epoch 0 + assert len(live) == 2 + + +def test_live_claims_filters_by_epoch(): + reg = _registry_with_claim() + # claims default to epoch 1; requiring epoch 2 filters them all out + assert _live_claims(reg, min_epoch=2) == [] + reg.advance_epoch(2) + assert len(_live_claims(reg, min_epoch=2)) == 2 + + +def test_live_claims_excludes_revoked(): + reg = _registry_with_claim() + reg.revoke_all("agent-1") + holders = {c.holder.name for c in _live_claims(reg, min_epoch=0)} + assert "agent-1" not in holders diff --git a/tests/test_runtime_sandbox.py b/tests/test_runtime_sandbox.py index 72ac8c5..bc88465 100644 --- a/tests/test_runtime_sandbox.py +++ b/tests/test_runtime_sandbox.py @@ -8,13 +8,70 @@ """ from __future__ import annotations +import io +import json import os import time import pytest +from authgate.runtime import _sandbox_runner as runner from authgate.runtime.sandbox import SandboxPolicy, run_tool_sandboxed +# --- sandbox child runner (in-process, so coverage sees it) ---------------- + +def _run_job(monkeypatch, capsys, job: dict) -> dict: + monkeypatch.setattr("sys.stdin", io.StringIO(json.dumps(job))) + rc = runner.main() + assert rc == 0 + out = capsys.readouterr().out + return json.loads(out.split(runner.RESULT_MARKER)[-1].strip()) + + +def test_runner_executes_importable_entry(monkeypatch, capsys): + res = _run_job(monkeypatch, capsys, { + "entry": "authgate.runtime.tools:calculate", + "args": {"expression": "6 * 7"}, "limits": {}, + }) + assert res["ok"] and res["output"] == "42" + + +def test_runner_executes_file_read_builtin(tmp_path, monkeypatch, capsys): + (tmp_path / "n.txt").write_text("inside", encoding="utf-8") + res = _run_job(monkeypatch, capsys, { + "builtin": "file_read", "sandbox_root": str(tmp_path), + "args": {"filename": "n.txt"}, "limits": {}, + }) + assert res["ok"] and res["output"] == "inside" + + +def test_runner_reports_tool_error_as_not_ok(monkeypatch, capsys): + res = _run_job(monkeypatch, capsys, { + "entry": "authgate.runtime.tools:calculate", + "args": {"expression": "open('x')"}, "limits": {}, + }) + assert not res["ok"] and "ValueError" in res["error"] + + +def test_runner_rejects_jobless_request(monkeypatch, capsys): + res = _run_job(monkeypatch, capsys, {"args": {}, "limits": {}}) + assert not res["ok"] + + +def test_runner_handles_bad_json(monkeypatch, capsys): + monkeypatch.setattr("sys.stdin", io.StringIO("{not json")) + runner.main() + out = capsys.readouterr().out + assert not json.loads(out.split(runner.RESULT_MARKER)[-1].strip())["ok"] + + +def test_runner_truncates_output(monkeypatch, capsys): + res = _run_job(monkeypatch, capsys, { + "entry": "authgate.runtime.tools:web_search", + "args": {"query": "Z" * 4000}, "limits": {"max_output_bytes": 50}, + }) + assert res["ok"] and len(res["output"]) <= 50 and res["truncated"] + def test_sandbox_runs_calculator_in_subprocess(tmp_path): r = run_tool_sandboxed("calculator", {"expression": "2 + 3 * 4"}, tmp_path, SandboxPolicy()) From a79b7cea38b08f5f3c4e8b90bf477c7b66effd31 Mon Sep 17 00:00:00 2001 From: Ali Pourrahim Date: Wed, 10 Jun 2026 19:27:23 +0300 Subject: [PATCH 6/7] fix(test): make windows-absolute-path file_read test cross-platform MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit On POSIX, 'C:\Windows\win.ini' is not a drive-absolute path but a nonexistent in-sandbox filename, so file_read raises FileNotFoundError, not PermissionError — both mean "refused, no real file read". Accept either so the test passes on Linux CI as well as Windows. (Was the only red check on PR #3.) Co-Authored-By: Claude Opus 4.8 --- tests/test_runtime_tools.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/test_runtime_tools.py b/tests/test_runtime_tools.py index a5726bf..b13a53a 100644 --- a/tests/test_runtime_tools.py +++ b/tests/test_runtime_tools.py @@ -182,8 +182,11 @@ def test_file_read_rejects_posix_absolute_path(tmp_path): def test_file_read_rejects_windows_absolute_path(tmp_path): + # On Windows this is a drive-absolute path that escapes the sandbox + # (PermissionError). On POSIX, 'C:\\...' is merely a nonexistent in-sandbox + # filename (FileNotFoundError). Either way, no real file is read. read_file = _file_read_fn(tmp_path) - with pytest.raises(PermissionError, match="escapes sandbox"): + with pytest.raises((PermissionError, FileNotFoundError)): read_file("C:\\Windows\\win.ini") From e3a2833e65a4afd8ccf6f55ee7f5842675036080 Mon Sep 17 00:00:00 2001 From: Ali Pourrahim Date: Wed, 10 Jun 2026 19:50:24 +0300 Subject: [PATCH 7/7] docs: de-overclaim the runtime README sections MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make the claims match what is actually true: - Verified-Rust backend is OPT-IN and not run by CI (default + CI = Python); drop "the running and proven code are finally the same code". - Sandbox is OPT-IN (off by default) and is NOT a syscall/network jail; reword "Tools run in a real sandbox" -> "Tools can run in a real sandbox (opt-in)". - Red team: add explicit scope caveat — self-authored attacks on 3 MVP tools, evidence the known vectors are closed, not a proof of security or a substitute for external review. - Benchmarks: mark as single-machine, order-of-magnitude, not a spec. - Engineering Gaps: change "Done" rows to honest "opt-in / not run by CI". - Badge scoped to "runtime red team — 1000 cases" (not next to formal badges). Co-Authored-By: Claude Opus 4.8 --- README.md | 45 +++++++++++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index c141f07..8fdd8c6 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ A wire format and a verify function. See [POSITIONING.md](POSITIONING.md). [![CI](https://github.com/Aliipou/authgate-kernel/actions/workflows/ci.yml/badge.svg)](https://github.com/Aliipou/authgate-kernel/actions) [![Rust](https://img.shields.io/badge/kernel-Rust-orange.svg)](authgate-kernel/) [![Tests](https://img.shields.io/badge/tests-1297%20passing-brightgreen.svg)](tests/) -[![Red team](https://img.shields.io/badge/red--team-1000%20engineers%2C%200%20escapes-brightgreen.svg)](redteam/) +[![Runtime red team](https://img.shields.io/badge/runtime%20red--team-1000%20cases%2C%200%20escapes-brightgreen.svg)](redteam/) [![Kani](https://img.shields.io/badge/Kani-24%20harnesses-green.svg)](formal/) [![Lean4](https://img.shields.io/badge/Lean4-16%20theorems-blue.svg)](formal/lean4/) [![License: PolyForm Noncommercial 1.0.0](https://img.shields.io/badge/License-PolyForm--Noncommercial--1.0.0-orange.svg)](LICENSE) @@ -81,20 +81,23 @@ intent → Planner → [PlanStep] → runtime loop → verify(action) → sandbo Single agent, 3 tools (calculator, file_read, web_search), deterministic planner. A denied step halts the plan. Two things make it more than a demo: -**1. It can run on the *verified* kernel.** `build_runtime(..., backend="rust")` -routes every permit/deny decision into the Kani/Lean-verified Rust engine -(`engine::verify`) and returns an ed25519-signed verdict. The boundary is **JSON** -(`verify_json`) — no Rust objects enter Python, so the running system and the -proven code are finally the same code for the decision that matters. `backend= -"python"` (default) uses the pure-Python reference verifier; `"auto"` picks Rust -when the extension is built. Epoch-revocation semantics (which the wire format -does not model) are preserved by serializing only currently-valid claims. - -**2. Tools run in a real sandbox.** `SandboxPolicy` executes each tool in an -isolated subprocess under a wall-clock deadline and output cap (every platform), -plus opt-in POSIX rlimits (CPU/memory/file-size). A tool that hangs, crashes, or -runs away is *killed and reaped*, surfacing as a clean denial — not an in-process -prefix check. +**1. It can run on the *verified* kernel (opt-in).** `build_runtime(backend="rust")` +routes each permit/deny decision into the Kani/Lean-verified Rust engine +(`engine::verify`) over a **JSON** boundary (`verify_json`), returning an +ed25519-signed verdict — no Rust objects enter Python. When that backend is +selected, the decision is made by the verified code instead of the Python +reimplementation. **The default is `backend="python"`** (the pure-Python +reference verifier), and that is the path CI runs — the Rust backend requires the +compiled extension (below) and is **not** exercised by CI. `"auto"` picks Rust +only when the extension is importable. Epoch-revocation semantics (absent from the +wire format) are preserved by serializing only currently-valid claims. + +**2. Tools can run in a real sandbox (opt-in).** Pass a `SandboxPolicy` and each +tool executes in an isolated subprocess under a wall-clock deadline and output cap +(every platform), plus opt-in POSIX rlimits (CPU/memory/file-size). A tool that +hangs, crashes, or runs away is *killed and reaped* → clean denial, rather than an +in-process check. It bounds time/memory/output and isolates crashes; it is **not** +a network or syscall jail, and it is **off by default** (`sandbox_policy=None`). ### Adversarial verification — 1000 engineers @@ -114,6 +117,11 @@ calculator DoS (a `2**2**2**2**2**2` that hangs the process) and a Windows reserved-device-name sandbox bypass (`CON` blocks forever; `NUL`/`COMn` open devices). Both are now regression-covered. +**Scope, honestly:** these are *self-authored* attack classes against the 3 MVP +tools. A green result is evidence that the *known* vectors are closed on both +backends — it is **not** a proof of security and **not** a substitute for +independent external review (still an open milestone). + ### Benchmark — cost of a verified decision `verify()` latency per gated tool call (`redteam/bench_verify.py`): @@ -124,7 +132,8 @@ devices). Both are now regression-covered. | Verified Rust engine (JSON wire + ed25519 sign) | ~544 µs/decision | ~1,800 /s | Routing through the verified engine costs ~14× (JSON marshalling + per-call -signing) but stays sub-millisecond — a sound default for an agent runtime. +signing) but stays sub-millisecond. Figures from one machine (Windows, CPython +3.13); treat as order-of-magnitude, not a spec. ### Building the verified extension @@ -425,8 +434,8 @@ The gap between `Permit/Deny` and actual constrained execution: | Gap | Status | What closes it | |---|---|---| -| **Runtime decides on the verified TCB** | **Done** (`backend="rust"`) | Runtime routes verify() into `engine::verify` over the JSON wire; 1000-engineer red team green on the Rust backend | -| **Real tool sandbox** (process isolation) | **Done** (`SandboxPolicy`) | Subprocess + wall-clock deadline + output cap everywhere; opt-in POSIX rlimits. Kills hangs/runaways/crashes | +| **Runtime decides on the verified TCB** | Opt-in (`backend="rust"`); needs the built extension; **not run by CI** (CI uses Python) | A CI job that builds the extension and runs the runtime suite on the Rust backend | +| **Real tool sandbox** (process isolation) | Opt-in (`SandboxPolicy`, off by default); bounds time/memory/output + isolates crashes; **not** a syscall/network jail | seccomp / namespaces / WASM for syscall + network confinement | | **WASM sandbox** (`cargo build --features sandbox`) | Blocked: Windows SDK kernel32.lib missing | Install Windows SDK 10.0.22621 or build on Linux | | **OS-level confinement** (seccomp-bpf, network jail) | Partial: process isolation + rlimits done; no syscall/network jail | seccomp filter / namespaces / WASM around the tool subprocess | | **End-to-end integration test** | **Done** (`tests/test_integration_e2e.py`) | 18 assertions: tool call → gate → audit chain |