Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ dependencies = [
"python-dotenv>=1.2.2",
# Used for token estimation before LLM calls (LCORE-1569 / conversation compaction)
"tiktoken>=0.8.0",
# Used for Pydantic AI
"pydantic-ai>=1.99.0"
]


Expand Down
1 change: 1 addition & 0 deletions src/pydantic_ai_lightspeed/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Pydantic AI integrations/extensions for Lightspeed Core Stack."""
5 changes: 5 additions & 0 deletions src/pydantic_ai_lightspeed/llamastack/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Pydantic AI provider for Llama Stack."""

from pydantic_ai_lightspeed.llamastack._provider import LlamaStackProvider

__all__ = ["LlamaStackProvider"]
123 changes: 123 additions & 0 deletions src/pydantic_ai_lightspeed/llamastack/_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""Llama Stack provider implementation for Pydantic AI."""

from __future__ import annotations as _annotations

from typing import TYPE_CHECKING

import httpx
from openai import AsyncOpenAI
from pydantic_ai import ModelProfile
from pydantic_ai.models import create_async_http_client
from pydantic_ai.profiles.openai import openai_model_profile
from pydantic_ai.providers import Provider

from pydantic_ai_lightspeed.llamastack._transport import LlamaStackLibraryTransport

if TYPE_CHECKING:
from llama_stack.core.library_client import AsyncLlamaStackAsLibraryClient

DEFAULT_BASE_URL = "http://localhost:8321/v1"


class LlamaStackProvider(Provider[AsyncOpenAI]):
"""Provider for Llama Stack — connects to a Llama Stack server's OpenAI-compatible API.

Supports two modes:

1. **Server mode** — connect to a running Llama Stack server via HTTP
2. **Library mode** — run Llama Stack in-process via ``AsyncLlamaStackAsLibraryClient``
"""

@property
def name(self) -> str:
"""The provider name."""
return "llama-stack"

@property
def base_url(self) -> str:
"""The base URL for the provider API."""
return str(self._client.base_url)

@property
def client(self) -> AsyncOpenAI:
"""The OpenAI-compatible client for the provider."""
return self._client

@staticmethod
def model_profile(model_name: str) -> ModelProfile | None:
"""Return the model profile for the named model, if available."""
return openai_model_profile(model_name)

def __init__(
self,
*,
base_url: str | None = None,
api_key: str | None = None,
library_client: AsyncLlamaStackAsLibraryClient | None = None,
http_client: httpx.AsyncClient | None = None,
) -> None:
"""Create a new Llama Stack provider.

Args:
base_url: The base URL for the Llama Stack server (OpenAI-compatible endpoint).
Defaults to ``http://localhost:8321/v1``.
Must be ``None`` when ``library_client`` is provided.
api_key: The API key for authentication. Defaults to ``'not-needed'`` since
local Llama Stack servers typically don't require one.
Must be ``None`` when ``library_client`` is provided.
library_client: An initialized ``AsyncLlamaStackAsLibraryClient`` for library mode.
When provided, requests are dispatched in-process (no server needed).
Mutually exclusive with ``base_url``, ``api_key``, and ``http_client``.
http_client: An existing ``httpx.AsyncClient`` to use for making HTTP requests.
Must be ``None`` when ``library_client`` is provided.
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
"""
if library_client is not None:
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
if base_url is not None:
raise ValueError("Cannot provide both `library_client` and `base_url`")
if api_key is not None:
raise ValueError("Cannot provide both `library_client` and `api_key`")
if http_client is not None:
raise ValueError(
"Cannot provide both `library_client` and `http_client`"
)

self._library_client = library_client
transport = LlamaStackLibraryTransport(library_client)
lib_http_client = httpx.AsyncClient(
transport=transport,

Check warning

Code scanning / Bandit

Call to httpx without timeout Warning

Call to httpx without timeout
Comment on lines +86 to +87
base_url="http://llama-stack-library",
timeout=httpx.Timeout(None),
)
self._client = AsyncOpenAI(
http_client=lib_http_client,
base_url="http://llama-stack-library/v1",
api_key="not-needed",
)
else:
base_url = base_url or DEFAULT_BASE_URL
api_key = api_key or "not-needed"

if http_client is not None:
self._client = AsyncOpenAI(
base_url=base_url, api_key=api_key, http_client=http_client
)
else:
oai_http_client = create_async_http_client()
self._client = AsyncOpenAI(
base_url=base_url, api_key=api_key, http_client=oai_http_client
)

def __repr__(self) -> str:
"""Return a string representation of the provider."""
return f"LlamaStackProvider(name={self.name!r}, base_url={self.base_url!r})"

def _set_http_client(self, http_client: httpx.AsyncClient) -> None:
"""Inject an httpx.AsyncClient into the underlying OpenAI client.

Replaces the internal HTTP transport by assigning directly to the
protected ``self._client._client`` attribute of the AsyncOpenAI instance.

Args:
http_client: The async HTTP client to use for subsequent requests.
"""
self._client._client = http_client # pyright: ignore[reportPrivateUsage] # pylint: disable=protected-access
197 changes: 197 additions & 0 deletions src/pydantic_ai_lightspeed/llamastack/_transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
"""httpx transport that routes OpenAI-compatible requests through a Llama Stack library client."""

from __future__ import annotations as _annotations

import json
from collections.abc import AsyncGenerator, AsyncIterator
from typing import Any

import httpx
from llama_stack.core.library_client import (
AsyncLlamaStackAsLibraryClient,
convert_pydantic_to_json_value,
)
from llama_stack.core.request_headers import (
PROVIDER_DATA_VAR,
request_provider_data_context,
)
from llama_stack.core.server.routes import find_matching_route
from llama_stack.core.utils.context import preserve_contexts_async_generator


class _AsyncByteStream(httpx.AsyncByteStream):
"""Wraps an async byte generator as an httpx AsyncByteStream."""

def __init__(self, gen: AsyncGenerator[bytes, None]) -> None:
"""Store an async generator that yields raw bytes for streaming.

Args:
gen: An async generator producing byte chunks to stream.
"""
self._gen = gen

async def __aiter__(self) -> AsyncIterator[bytes]:
"""Yield bytes chunks from the wrapped generator.

Returns:
An async iterator of bytes fulfilling the httpx.AsyncByteStream contract.
"""
async for chunk in self._gen:
yield chunk
Comment on lines +22 to +40
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

In httpx 0.27.0, what is the expected lifecycle for custom AsyncByteStream implementations? Specifically, should closing an httpx.Response propagate an aclose() call to the stream, and is implementing aclose() recommended when wrapping an async generator?

💡 Result:

In httpx 0.27.0, the expected lifecycle for custom AsyncByteStream implementations is that consuming the response body to completion automatically closes the stream, and explicit Response.aclose is what closes the response and releases the connection. When a custom stream needs cleanup (e.g., releasing resources), it should implement aclose (and ensure it’s safe to call). 1) Should closing an httpx.Response propagate an aclose call to the stream? Yes. - Response.aclose is implemented to “Close the response and release the connection” and it explicitly calls await self.stream.aclose for AsyncByteStream instances [1]. - The AsyncByteStream base class’s aread method ensures that, when the stream is read, it calls await self.aclose in a finally block [2]. - The Response streaming methods are documented as being closed automatically when used inside the client’s.stream context manager; alternatively, in manual mode you must call Response.aclose eventually, otherwise connections may be left open [3]. So: Response close/aclose drives stream cleanup via AsyncByteStream.aclose. 2) Is implementing aclose recommended when wrapping an async generator? Yes, if your wrapper is a custom AsyncByteStream that owns resources or needs generator cleanup. The AsyncByteStream interface includes an aclose method specifically for cleanup, and the framework relies on it to be awaited at the end of reading [2]. Additionally, httpx’s own AsyncIteratorByteStream wrapper records whether the provided iterable is an async generator and enforces “StreamConsumed” semantics for async generators after they’ve been iterated once [4]. That wrapper’s code also treats aread/iteration completion as a point at which aclose is expected to happen (via the base-class aread finally calling aclose) [2][4]. Practical guidance for custom wrappers around async generators: - Implement AsyncByteStream.aclose and ensure it properly closes the underlying generator (e.g., by calling await agen.aclose in your wrapper’s aclose, or using a pattern equivalent to that), so that early Response.aclose or early loop exit triggers generator cleanup via stream.aclose [1]. - If your wrapper doesn’t implement aclose, you won’t get cleanup when Response.aclose calls into the stream, beyond whatever implicit behavior happens to occur during iteration [2][1]. References (from httpx code/docs): AsyncByteStream.aclose contract and aread finally behavior [2]; Response.aclose calling await self.stream.aclose [1]; Response streaming context/manual-mode cleanup expectations [3]; AsyncIteratorByteStream’s handling of async generators [4].

Citations:


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Locate the target file and print the relevant sections around _AsyncByteStream and header logic.
FILE="src/pydantic_ai_lightspeed/llamastack/_transport.py"

echo "== File exists =="
ls -l "$FILE"

echo
echo "== _AsyncByteStream definition =="
rg -n "_AsyncByteStream" "$FILE"
rg -n "class _AsyncByteStream|def __aiter__" "$FILE"

echo
# Print a reasonable window around the class definition
LINE=$(rg -n "class _AsyncByteStream" "$FILE" | head -n1 | cut -d: -f1)
START=$((LINE-20))
END=$((LINE+80))
sed -n "${START},${END}p" "$FILE" | nl -ba

echo
echo "== Search for provider-data header injection =="
rg -n "provider-data|Provider-Data|provider_data|ProviderData" "$FILE" || true

echo
echo "== Any header casing / duplicate handling nearby =="
# Print around the first match if found
FIRST=$(rg -n "provider-data|Provider-Data|provider_data|ProviderData" "$FILE" | head -n1 | cut -d: -f1 || true)
if [[ -n "${FIRST:-}" ]]; then
  START=$((FIRST-30))
  END=$((FIRST+80))
  sed -n "${START},${END}p" "$FILE" | nl -ba
fi

echo
echo "== Any existing aclose implementation in this file =="
rg -n "def aclose|aclose\(" "$FILE" || true

Repository: lightspeed-core/lightspeed-stack

Length of output: 505


🏁 Script executed:

#!/bin/bash
set -euo pipefail
FILE="src/pydantic_ai_lightspeed/llamastack/_transport.py"

echo "== _AsyncByteStream definition =="
LINE=$(rg -n "class _AsyncByteStream" "$FILE" | head -n1 | cut -d: -f1)
START=$((LINE-15))
END=$((LINE+90))
sed -n "${START},${END}p" "$FILE" | cat -n -w3 -s ':'

echo
echo "== Search for provider-data header injection =="
rg -n "provider-data|Provider-Data|provider_data|ProviderData" "$FILE" || true

echo
echo "== Any existing aclose implementation in this file =="
rg -n "def aclose|aclose\(" "$FILE" || true

Repository: lightspeed-core/lightspeed-stack

Length of output: 194


🏁 Script executed:

#!/bin/bash
set -euo pipefail
FILE="src/pydantic_ai_lightspeed/llamastack/_transport.py"

echo "== _AsyncByteStream definition (context) =="
LINE=$(rg -n "class _AsyncByteStream" "$FILE" | head -n1 | cut -d: -f1)
START=$((LINE-15))
END=$((LINE+90))
sed -n "${START},${END}p" "$FILE" | cat -n

echo
echo "== provider-data header injection hits =="
rg -n "provider-data|Provider-Data|provider_data|ProviderData" "$FILE" || true

echo
echo "== Any existing aclose implementation / calls =="
rg -n "def aclose|aclose\(" "$FILE" || true

Repository: lightspeed-core/lightspeed-stack

Length of output: 4951


🏁 Script executed:

#!/bin/bash
set -euo pipefail
FILE="src/pydantic_ai_lightspeed/llamastack/_transport.py"
sed -n '1,40p' "$FILE" | cat -n

Repository: lightspeed-core/lightspeed-stack

Length of output: 1652


Add aclose() propagation for _AsyncByteStream and fix Provider-Data header casing normalization

  • _AsyncByteStream only implements __aiter__; implement aclose() to forward cleanup to the wrapped async generator so httpx.Response.aclose() (especially on early-abandoned streaming) triggers generator aclose().

    Proposed fix
     class _AsyncByteStream(httpx.AsyncByteStream):
         """Wraps an async byte generator as an httpx AsyncByteStream."""
    @@
         async def __aiter__(self) -> AsyncIterator[bytes]:
             """Yield bytes chunks from the wrapped generator.
    @@
             async for chunk in self._gen:
                 yield chunk
    +
    +    async def aclose(self) -> None:
    +        """Close the wrapped generator when the response is closed early."""
    +        aclose = getattr(self._gen, "aclose", None)
    +        if aclose is not None:
    +            await aclose()
  • LlamaStackLibraryTransport.handle_async_request checks for an existing X-LlamaStack-Provider-Data using only two literal casings (X-LlamaStack-Provider-Data and x-llamastack-provider-data); treat header names case-insensitively to avoid injecting a duplicate header when the caller uses different casing.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/pydantic_ai_lightspeed/llamastack/_transport.py` around lines 22 - 40,
The _AsyncByteStream currently yields from the wrapped async generator but
doesn't forward closure; implement an async aclose(self) on class
_AsyncByteStream that calls and awaits self._gen.aclose() if the wrapped
generator has an aclose attribute to ensure httpx.Response.aclose() triggers
generator cleanup. In LlamaStackLibraryTransport.handle_async_request change the
provider-data header existence check to be case-insensitive (e.g., inspect
request.headers keys lowercased or use any(k.lower() ==
"x-llamastack-provider-data" for k in request.headers)) before injecting
"X-LlamaStack-Provider-Data" so you don't add a duplicate header when callers
use different casing.



class LlamaStackLibraryTransport(httpx.AsyncBaseTransport):
"""Custom httpx transport that dispatches requests through a Llama Stack library client.

Instead of making real HTTP calls, this transport routes requests directly
to the Llama Stack's in-process route handlers via the library client's
route matching and body conversion logic.
"""

def __init__(self, client: AsyncLlamaStackAsLibraryClient) -> None:
"""Initialize the transport with a Llama Stack library client.

Args:
client: An initialized ``AsyncLlamaStackAsLibraryClient`` whose route
handlers will receive dispatched requests.
"""
self._client = client

async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
"""Dispatch an httpx request to the in-process Llama Stack route handlers.

Args:
request: The outgoing httpx request to route.

Returns:
An httpx response built from the matched route handler result.

Raises:
RuntimeError: If the library client has not been initialized.
"""
if self._client.route_impls is None:
raise RuntimeError(
"Llama Stack library client not initialized. Call initialize() first."
)

method = request.method
path = request.url.raw_path.decode("utf-8")

body = json.loads(request.content) if request.content else {}

headers: dict[str, str] = {
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
k.decode("utf-8") if isinstance(k, bytes) else k: (
v.decode("utf-8") if isinstance(v, bytes) else v
)
for k, v in request.headers.raw
}

if self._client.provider_data:
keys = ["X-LlamaStack-Provider-Data", "x-llamastack-provider-data"]
if all(key not in headers for key in keys):
headers["X-LlamaStack-Provider-Data"] = json.dumps(
self._client.provider_data
)
Comment on lines +89 to +94
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Normalize header names before deciding whether to inject provider data.

HTTP header names are case-insensitive. This check only recognizes two spellings, so a request that already carries X-LLAMASTACK-PROVIDER-DATA or another casing variant will get a second header injected and may use the wrong provider data.

Proposed fix
         if self._client.provider_data:
-            keys = ["X-LlamaStack-Provider-Data", "x-llamastack-provider-data"]
-            if all(key not in headers for key in keys):
+            normalized_headers = {key.lower() for key in headers}
+            if "x-llamastack-provider-data" not in normalized_headers:
                 headers["X-LlamaStack-Provider-Data"] = json.dumps(
                     self._client.provider_data
                 )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/pydantic_ai_lightspeed/llamastack/_transport.py` around lines 89 - 94,
The header presence check treats names case-sensitively and only checks two
spellings, so normalize header names before deciding to inject provider data: in
the block that references self._client.provider_data and the headers dict,
compute a lowercase-key view (e.g., {k.lower(): v for k, v in headers.items()}
or use any(k.lower() == "x-llamastack-provider-data" for k in headers)) and
check for "x-llamastack-provider-data" in that normalized view; only if absent,
add the provider data using the canonical header "X-LlamaStack-Provider-Data" to
headers. Ensure you reference headers and self._client.provider_data when
applying the change.


with request_provider_data_context(headers):
is_stream = body.get("stream", False)

if is_stream:
return await self._handle_streaming(request, method, path, body)
return await self._handle_non_streaming(request, method, path, body)

async def _handle_non_streaming(
self,
request: httpx.Request,
method: str,
path: str,
body: dict[str, Any],
) -> httpx.Response:
"""Dispatch a non-streaming request to the matched route handler.

Args:
request: The original httpx request (attached to the response).
method: The HTTP method (e.g. ``"POST"``).
path: The decoded URL path used for route matching.
body: The parsed JSON request body.

Returns:
An httpx.Response containing the JSON-serialized handler result.

Raises:
RuntimeError: If route_impls is not initialized.
"""
if self._client.route_impls is None:
raise RuntimeError("route_impls is not initialized")

matched_func, path_params, _, _ = find_matching_route(
method, path, self._client.route_impls
)
merged_body = {**body, **path_params}
merged_body = self._client._convert_body( # pylint: disable=protected-access
matched_func, merged_body
)

result = await matched_func(**merged_body)

json_content = json.dumps(convert_pydantic_to_json_value(result))
status_code = httpx.codes.OK

if method.upper() == "DELETE" and result is None:
status_code = httpx.codes.NO_CONTENT
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
json_content = ""

return httpx.Response(
status_code=status_code,
content=json_content.encode("utf-8"),
headers={"Content-Type": "application/json"},
request=request,
)

async def _handle_streaming(
self,
request: httpx.Request,
method: str,
path: str,
body: dict[str, Any],
) -> httpx.Response:
"""Dispatch a streaming request and return an SSE event-stream response.

Args:
request: The original httpx request (attached to the response).
method: The HTTP method (e.g. ``"POST"``).
path: The decoded URL path used for route matching.
body: The parsed JSON request body (must contain ``stream: True``).

Returns:
An httpx.Response with a streaming body of SSE-formatted chunks.

Raises:
RuntimeError: If route_impls is not initialized.
"""
if self._client.route_impls is None:
raise RuntimeError("route_impls is not initialized")

func, path_params, _, _ = find_matching_route(
method, path, self._client.route_impls
)
merged_body = {**body, **path_params}
merged_body = self._client._convert_body( # pylint: disable=protected-access
func, merged_body
)

result = await func(**merged_body)

async def gen() -> AsyncGenerator[bytes, None]:
async for chunk in result:
data = json.dumps(convert_pydantic_to_json_value(chunk))
yield f"data: {data}\n\n".encode("utf-8")

wrapped_gen = preserve_contexts_async_generator(gen(), [PROVIDER_DATA_VAR])

return httpx.Response(
status_code=httpx.codes.OK,
stream=_AsyncByteStream(wrapped_gen),
headers={"Content-Type": "text/event-stream"},
request=request,
)
Loading
Loading