Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion src/google/adk/cli/fast_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,15 @@ def _has_parent_reference(path: str) -> bool:
# Block any upload that contains an `args` key anywhere in the document.
_BLOCKED_YAML_KEYS = frozenset({"args"})

# Fields that accept fully-qualified Python names and feed into
# importlib.import_module() at agent load time. Dotted values in
# these fields cause module-level code execution from any installed
# package. Builder uploads must use simple (non-dotted) names that
# resolve to ADK built-in types only.
_IMPORT_REFERENCE_KEYS = frozenset({"name", "agent_class", "code"})

def _check_yaml_for_blocked_keys(content: bytes, filename: str) -> None:
"""Raise if the YAML document contains any blocked keys."""
"""Raise if the YAML document contains blocked keys or code refs."""
import yaml

try:
Expand All @@ -325,6 +332,17 @@ def _walk(node: Any) -> None:
f"The '{key}' field is not allowed in builder uploads "
"because it can execute arbitrary code."
)
if (
key in _IMPORT_REFERENCE_KEYS
and isinstance(value, str)
and "." in value
):
raise ValueError(
f"Fully qualified Python reference in '{key}':"
f" {value!r} in {filename!r} is not allowed in"
" builder uploads. Only simple (non-dotted) names"
" that resolve to ADK built-in types are permitted."
)
_walk(value)
elif isinstance(node, list):
for item in node:
Expand Down
87 changes: 87 additions & 0 deletions tests/unittests/cli/test_fast_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2113,6 +2113,93 @@ def test_builder_save_rejects_nested_args_key(builder_test_client, tmp_path):
assert "args" in response.json()["detail"]


def test_builder_save_rejects_dotted_tool_name(builder_test_client, tmp_path):
"""Uploading YAML with a dotted tool name is rejected (import prevention)."""
yaml_with_dotted_tool = b"""\
name: my_agent
tools:
- name: os.system
"""
response = builder_test_client.post(
"/builder/save?tmp=true",
files=[(
"files",
("app/root_agent.yaml", yaml_with_dotted_tool, "application/x-yaml"),
)],
)
assert response.status_code == 400
assert "os.system" in response.json()["detail"]
assert not (tmp_path / "app" / "tmp" / "app" / "root_agent.yaml").exists()


def test_builder_save_rejects_dotted_agent_class(builder_test_client, tmp_path):
"""Uploading YAML with a dotted agent_class is rejected."""
yaml_with_dotted_class = b"""\
agent_class: evil.module.MyAgent
name: my_agent
"""
response = builder_test_client.post(
"/builder/save?tmp=true",
files=[(
"files",
("app/root_agent.yaml", yaml_with_dotted_class, "application/x-yaml"),
)],
)
assert response.status_code == 400
assert "evil.module.MyAgent" in response.json()["detail"]


def test_builder_save_rejects_dotted_code_ref(builder_test_client, tmp_path):
"""Uploading YAML with a dotted code reference is rejected."""
yaml_with_dotted_code = b"""\
name: my_agent
sub_agents:
- code: evil.module.my_agent
"""
response = builder_test_client.post(
"/builder/save?tmp=true",
files=[(
"files",
("app/root_agent.yaml", yaml_with_dotted_code, "application/x-yaml"),
)],
)
assert response.status_code == 400
assert "evil.module.my_agent" in response.json()["detail"]


def test_builder_save_allows_simple_tool_name(builder_test_client, tmp_path):
"""Uploading YAML with a simple (non-dotted) tool name is allowed."""
yaml_with_simple_tool = b"""\
name: my_agent
tools:
- name: google_search
"""
response = builder_test_client.post(
"/builder/save?tmp=true",
files=[(
"files",
("app/root_agent.yaml", yaml_with_simple_tool, "application/x-yaml"),
)],
)
assert response.status_code == 200


def test_builder_save_allows_simple_agent_class(builder_test_client, tmp_path):
"""Uploading YAML with a simple agent_class (e.g. LlmAgent) is allowed."""
yaml_with_simple_class = b"""\
agent_class: LlmAgent
name: my_agent
"""
response = builder_test_client.post(
"/builder/save?tmp=true",
files=[(
"files",
("app/root_agent.yaml", yaml_with_simple_class, "application/x-yaml"),
)],
)
assert response.status_code == 200


def test_builder_get_rejects_non_yaml_file_paths(builder_test_client, tmp_path):
"""GET /builder/app/{app_name}?file_path=... rejects non-YAML extensions."""
app_root = tmp_path / "app"
Expand Down