
PROGRESSIVE_SSE_STREAMING is not honored by the LiteLlm adapter: function-call argument deltas are buffered until finish_reason #5342

@murhaf-masri

Description

🔴 Required Information

Describe the Bug:

FeatureName.PROGRESSIVE_SSE_STREAMING is default-on in ADK 1.28+ and utils/streaming_utils.py:266-296 correctly emits
function-call parts as partial=True LlmResponses as they arrive, on the Gemini-native path. But models/lite_llm.py has its
own inline streaming loop in _generate_content_async (~line 2321) that bypasses StreamingResponseAggregator entirely and
silently buffers FunctionChunks into a local dict until LiteLLM sends finish_reason="tool_calls". The result: when an LLM is
routed through LiteLlm (any non-Gemini model: Anthropic, OpenAI, Mistral, etc.), zero events are emitted between the last
text chunk and the final aggregated function call. For tools with large argument schemas (e.g. a LongRunningFunctionTool whose
arg is a deeply nested object), that gap is regularly 10-20 seconds of dead air, with no signal that the model is still
working. The TextChunk branch of the same loop yields per token, so the asymmetry is visible side by side.

Steps to Reproduce:

  1. pip install google-adk==1.28.1 litellm==1.83.0
  2. Configure any non-Gemini model via LiteLlm (we hit it on openai/gpt-4o-mini and anthropic/claude-sonnet-4-5 routed
    through a LiteLLM proxy).
  3. Define an LlmAgent whose tool list contains a LongRunningFunctionTool with a non-trivial argument schema (a nested object
    with ~7 fields reproduces it every time).
  4. Wrap the agent in App(resumability_config=ResumabilityConfig(is_resumable=True)) so the long-running pause path is active.
  5. Run the agent via runner.run_async(...) and print the inter-event delta for each yielded event.
  6. Send a message that causes the model to call the long-running tool.

Full minimal reproduction code is in the Minimal Reproduction Code section below.

Expected Behavior:

Function-call argument deltas should stream as partial=True events while the model is generating them, the same way the
Gemini-native path already behaves under PROGRESSIVE_SSE_STREAMING. Concretely, between the last text token and the final
aggregated function call, there should be a sequence of partial events whose event.get_function_calls() is truthy and whose
event.partial is True. The final aggregated event (partial=False) should still fire on finish_reason="tool_calls",
identical to today.

The Gemini-native trace (for comparison) looks like:

+0.00s  partial=True   fcs=False  text='I'
... text deltas ...
+0.05s  partial=True   fcs=False  text=' details.'
+0.21s  partial=True   fcs=True   text=''   ← partial function call as args stream
+0.18s  partial=True   fcs=True   text=''
... ~80 partial function-call events over ~17s ...
+0.20s  partial=False  fcs=True   text=''
+0.01s  done
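
Under the expected behavior, a consumer can gate UI state on exactly these two fields. A minimal sketch with a stand-in event type (hypothetical FakeEvent class, not ADK's real Event, though the real one exposes .partial and .get_function_calls() with the semantics described above):

```python
# Classify the expected stream using a stand-in event type (hypothetical;
# mirrors the partial / get_function_calls() semantics described above).
from dataclasses import dataclass, field


@dataclass
class FakeEvent:
    partial: bool
    function_calls: list = field(default_factory=list)
    text: str = ""

    def get_function_calls(self):
        return self.function_calls


def classify(event):
    if event.partial and event.get_function_calls():
        return "tool-args streaming"   # show a "preparing tool call" spinner
    if event.partial:
        return "text delta"            # append text to the chat bubble
    if event.get_function_calls():
        return "final tool call"       # safe to act on parsed args now
    return "final text"


expected = [
    FakeEvent(partial=True, text="I"),
    FakeEvent(partial=True, function_calls=["ask_user_question"]),
    FakeEvent(partial=False, function_calls=["ask_user_question"]),
]
print([classify(e) for e in expected])
# ['text delta', 'tool-args streaming', 'final tool call']
```

The "tool-args streaming" state is exactly what the LiteLlm path never produces today.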

Observed Behavior:

Through the LiteLlm adapter, no events fire between the last text chunk and the final aggregated function call. Every reproduction
shows 10-20 seconds of total silence on the event stream:

+0.00s  partial=True   fcs=False  text='I'
+0.05s  partial=True   fcs=False  text=' need'
... (text deltas stream normally over ~1.4s) ...
+0.05s  partial=True   fcs=False  text=' details.'
+16.78s partial=False  fcs=True   text=''           ← 16.78s of dead air, no events
+0.01s  done

The function call arrives intact in the final event; the data is fine. The problem is the absence of any partial events during
the model's argument-generation phase. From a user-facing application's perspective, the agent appears frozen.

Environment Details:

  • ADK Library Version (pip show google-adk): 1.28.1
  • Desktop OS: Linux
  • Python Version (python -V): Python 3.14.3

Model Information:

  • Are you using LiteLLM: Yes
  • Which model is being used: reproduced on openai/gpt-4o-mini and anthropic/claude-sonnet-4-5, both routed through a LiteLLM
    proxy

🟡 Optional Information

Regression:

N/A; this has never worked through the LiteLlm adapter. PROGRESSIVE_SSE_STREAMING was introduced and made default-on (see
#3974), but only the Gemini-native code path (utils/streaming_utils.py) was updated. models/lite_llm.py has its own parallel
implementation that was never wired to the feature flag.

Logs:

Real SSE event trace from a production run (timestamps are wall-clock, not deltas), captured at the application's SSE consumer
right outside runner.run_async:

message       {"content": " I"}                          10:22:01.425                                                           
message       {"content": " need"}                       10:22:01.425                                                             
message       {"content": " to"}                         10:22:01.453
message       {"content": " collect"}                    10:22:01.453                                                             
message       {"content": " some"}                       10:22:01.502     
message       {"content": " information"}                10:22:01.502                                                           
message       {"content": " first"}                      10:22:01.549                                                             
message       {"content": "."}                           10:22:01.549                                                           
message       {"content": " Let"}                        10:22:01.598                                                             
message       {"content": " me"}                         10:22:01.598                                                           
message       {"content": " ask"}                        10:22:01.625                                                             
message       {"content": " you"}                        10:22:01.625     
message       {"content": " a"}                          10:22:01.673                                                             
message       {"content": " few"}                        10:22:01.673                                                             
message       {"content": " questions"}                  10:22:01.684                                                           
message       {"content": " to"}                         10:22:01.684                                                             
message       {"content": " get"}                        10:22:01.773                                                           
message       {"content": " the"}                        10:22:01.773                                                             
message       {"content": " necessary"}                  10:22:01.781     
message       {"content": " details"}                    10:22:01.781                                                             
message       {"content": "."}                           10:22:01.837                                                             
question_flow {...full ask_user_question payload...}     10:22:18.623   ← 16.79s gap
done          {"token_count": 5461, "iterations": 1}     10:22:18.623                                                             

The 16.79-second gap between 10:22:01.837 and 10:22:18.623 is the model streaming function-call argument deltas to LiteLLM.
LiteLLM is faithfully forwarding them; ADK's lite_llm.py is buffering them silently.

Screenshots / Video:

N/A

Additional Context:

Root cause

google/adk/models/lite_llm.py _generate_content_async (around line 2321 in 1.28.1) has its own streaming aggregation that
doesn't go through StreamingResponseAggregator.process_response:

# google/adk/models/lite_llm.py:2321
async for part in await self.llm_client.acompletion(**completion_args):
    for chunk, finish_reason in _model_response_to_chunk(part):
        if isinstance(chunk, FunctionChunk):
            index = chunk.index or fallback_index
            if index not in function_calls:
                function_calls[index] = {"name": "", "args": "", "id": None}
            if chunk.name:
                function_calls[index]["name"] += chunk.name
            if chunk.args:
                function_calls[index]["args"] += chunk.args   # ← buffer only
            ...
            # NO yield here: the partial function call is invisible to callers
        elif isinstance(chunk, TextChunk):
            text += chunk.text
            yield _message_to_generate_content_response(  # ← text DOES yield
                ChatCompletionAssistantMessage(role="assistant", content=chunk.text),
                is_partial=True,
                model_version=part.model,
            )
        ...

    if function_calls and (
        finish_reason == "tool_calls"
        or finish_reason == "length"
        or (finish_reason == "stop" and chunk is None)
    ):
        aggregated_llm_response_with_tool_call = _finalize_tool_call_response(...)
        # ← only here, after finish_reason, does the function call become an event

The asymmetry is structural: TextChunk yields immediately (good), FunctionChunk accumulates silently (bad). The
PROGRESSIVE_SSE_STREAMING feature flag exists in the registry but is never checked in this file:

$ grep -n "PROGRESSIVE_SSE_STREAMING\|StreamingResponseAggregator" \                                                              
    google/adk/models/lite_llm.py                                                                                                 
# (no matches)                                                            
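
The structural asymmetry can be reduced to a toy model of the control flow (hypothetical stand-in types, not ADK's real classes): text chunks yield immediately, function chunks only accumulate, and the consumer sees nothing until the stream ends.

```python
# Toy model of the lite_llm.py loop: per-chunk yields for text,
# silent accumulation for function chunks (stand-in types, not ADK's).
from dataclasses import dataclass


@dataclass
class TextChunk:
    text: str


@dataclass
class FunctionChunk:
    name: str = ""
    args: str = ""


def adapter_loop(chunks):
    """Yields an event per text chunk; buffers function chunks until the end."""
    buffered = {"name": "", "args": ""}
    for chunk in chunks:
        if isinstance(chunk, TextChunk):
            yield ("partial_text", chunk.text)        # text streams per chunk
        else:
            buffered["name"] += chunk.name            # silently accumulated
            buffered["args"] += chunk.args
    yield ("final_function_call", buffered)           # only after the stream ends


stream = [TextChunk("Let me "), TextChunk("check."),
          FunctionChunk(name="ask_user_question"),
          FunctionChunk(args='{"question_flow": '),
          FunctionChunk(args='{...}}')]
events = list(adapter_loop(stream))
partial_fc_events = [e for e in events if e[0] == "partial_function_call"]
print(len(events), len(partial_fc_events))  # 3 0
```

Five chunks in, three events out, and zero of them are partial function-call events; in the real adapter the gap between the last text event and the final one is the 10-20s of dead air shown above.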

Why it matters

  1. Long-running tools become indistinguishable from a hung server. We use LongRunningFunctionTool to implement an
    interactive question-form pattern (the model generates a question_flow schema, the framework pauses on the long-running call,
    the frontend renders the form). The form's schema is ~1.5KB of JSON. The model spends 10-20s streaming those arg deltas, during
    which we receive zero events from runner.run_async, and there is no callback we can hook to detect "the LLM is now generating
    tool args." before_tool_callback fires after the args are complete, so it can't bracket the gap. From the frontend's
    perspective, the chat just freezes.

  2. It defeats the documented PROGRESSIVE_SSE_STREAMING contract. The feature is default-on, advertised as ADK's progressive
    streaming behavior, and works for Gemini. Users routing through LiteLlm reasonably expect the same behavior; instead they get
    silent buffering with no opt-out and no warning.

  3. No external workaround is robust. The cleanest user-side workaround is wrapping LiteLlm.llm_client to inspect the
    underlying LiteLLM stream chunks before they reach ADK's adapter, which works, but it reaches into private SDK boundaries and is
    brittle across upgrades. Anything else (FE timers, prompt-engineered "I'm about to call a tool" markers, periodic heartbeats) is a
    guess about what's happening rather than a signal of what is happening.
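
For reference, the rough shape of that client-wrapping workaround, with plain dicts standing in for LiteLLM's stream chunks (the real chunk objects and acompletion signature differ; this only illustrates the interception pattern, not a drop-in fix):

```python
# Sketch of the wrapper workaround: re-yield every chunk unchanged, but fire a
# liveness callback whenever a tool-call argument delta passes by.
import asyncio


async def with_tool_heartbeat(stream, on_tool_delta):
    """Transparent pass-through that signals when tool-arg bytes are flowing."""
    async for chunk in stream:
        delta = chunk.get("delta", {})
        if delta.get("tool_calls"):          # arg bytes are flowing: signal liveness
            on_tool_delta(delta["tool_calls"])
        yield chunk                          # the downstream adapter sees every chunk


async def fake_stream():
    # Stand-in for the raw LiteLLM chunk stream (shapes are illustrative only).
    yield {"delta": {"content": "Let me ask."}}
    yield {"delta": {"tool_calls": [{"function": {"arguments": '{"q":'}}]}}
    yield {"delta": {"tool_calls": [{"function": {"arguments": ' 1}'}}]}}


async def main():
    heartbeats = []
    chunks = [c async for c in with_tool_heartbeat(fake_stream(), heartbeats.append)]
    print(len(chunks), len(heartbeats))  # 3 2


asyncio.run(main())
```

Even this only tells the frontend "something is happening"; it still cannot surface the accumulating arguments as proper partial events, which is why the fix belongs in the adapter.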

Proposed fix

Make the FunctionChunk branch in _generate_content_async honor PROGRESSIVE_SSE_STREAMING by yielding a partial LlmResponse
containing the current accumulated state of the function call, the same way the TextChunk branch already yields per-token text:

from google.adk.features import FeatureName, is_feature_enabled                                                                 
# ...                                                                                                                             
                                        
elif isinstance(chunk, FunctionChunk):                                                                                            
    index = chunk.index or fallback_index                                 
    if index not in function_calls:                                                                                             
        function_calls[index] = {"name": "", "args": "", "id": None}                                                              
    if chunk.name:                                                                                                              
        function_calls[index]["name"] += chunk.name                                                                               
    if chunk.args:                                                        
        function_calls[index]["args"] += chunk.args                                                                               
    function_calls[index]["id"] = (                                       
        chunk.id or function_calls[index]["id"] or str(index)                                                                   
    )                                                                                                                             
                                            
    if is_feature_enabled(FeatureName.PROGRESSIVE_SSE_STREAMING):                                                                 
        # Mirror the TextChunk path: emit a partial LlmResponse so callers                                                      
        # can show progress while the model streams tool-call arguments.                                                          
        # Args may be partial JSON; consumers should only act on the                                                            
        # non-partial aggregated event downstream, the same as Gemini.                                                            
        yield _partial_function_call_response(                            
            function_calls[index],                                                                                                
            model_version=part.model,                                                                                             
        )                                                                                                                         

Plus a small helper:

def _partial_function_call_response(
    fc_state: dict, *, model_version: str | None                                                                                
) -> LlmResponse:                                                                                                                 
    return LlmResponse(                 
        content=types.Content(                                                                                                    
            role="model",                                                 
            parts=[                                                                                                             
                types.Part(                                                                                                       
                    function_call=types.FunctionCall(                                                                           
                        id=fc_state["id"],                                                                                        
                        name=fc_state["name"] or None,                    
                        # args may not yet be valid JSON; pass through as-is.
                        # Consumers that need parsed args should wait for the
                        # final non-partial event.                                                                              
                        args=fc_state["args"] or None,                                                                            
                    )           
                )                                                                                                                 
            ],                                                                                                                    
        ),                                                                                                                        
        partial=True,                                                                                                             
        model_version=model_version,                                                                                              
    )                                                                                                                             

Important properties this preserves:

  • The aggregated final event is unchanged. _finalize_tool_call_response still fires on finish_reason="tool_calls" and
    emits the same partial=False event with parsed args. The new partial events are additional, not replacements.
  • handle_function_calls_async is unaffected. It only fires on the non-partial event (per the existing if model_response_event.partial: return guard at flows/llm_flows/base_llm_flow.py:926-927), so handlers are not double-invoked.
    This matches what xuanyang15 confirmed in "ADK_ENABLE_PROGRESSIVE_SSE_STREAMING causes excessive tool use by Gemini 3" #3974: "the first function call is a partial event, where ADK doesn't call the
    function. Only the second one (non partial) will trigger an actually function call."
  • The pause-and-resume path for LongRunningFunctionTool is unaffected. should_pause_invocation is checked against the
    non-partial final event, same as today.
  • Default-on, but it can be disabled. ADK_DISABLE_PROGRESSIVE_SSE_STREAMING=1 already exists for users who want the old behavior.
    No new env var or API needed.

Happy to put up the PR if the maintainers agree this is the right shape.

Related issues

Minimal Reproduction Code:

import asyncio                                                                                                                    
import time                                                                                                                       
                                                                          
from google.adk.agents import LlmAgent                                                                                            
from google.adk.apps.app import App, ResumabilityConfig                   
from google.adk.models.lite_llm import LiteLlm
from google.adk.runners import Runner                                                                                           
from google.adk.sessions import InMemorySessionService                                                                            
from google.adk.tools import LongRunningFunctionTool
from google.genai import types                                                                                                    
                                                                                                                                  
                                                                                                                                
async def ask_user_question(question_flow: dict) -> dict | None:                                                                  
    """Long-running pause sentinel: returning None tells ADK this is still pending."""                                          
    return None                                                                                                                   
                                                                          
                                                                                                                                  
agent = LlmAgent(                                                                                                               
    name="demo",                                                                                                                  
    model=LiteLlm(model="openai/gpt-4o-mini"),  # or any non-Gemini model                                                         
    instruction=(                                                                                                                 
        "When the user says 'go', call ask_user_question with a question_flow "                                                   
        "object containing 7 questions covering subject, grade level, language, "
        "duration, chapter count, difficulty, and goals. Use a rich nested schema."                                             
    ),                                                                                                                            
    tools=[LongRunningFunctionTool(ask_user_question)],                                                                           
)                                                                                                                                 
                                                                                                                                  
app = App(                                                                                                                        
    name="demo",                                                                                                                  
    root_agent=agent,                                                                                                             
    resumability_config=ResumabilityConfig(is_resumable=True),                                                                    
)                                                                                                                                 
                                            
                                                                                                                                  
async def main():                                                         
    session_service = InMemorySessionService()
    await session_service.create_session(app_name="demo", user_id="u", session_id="s")                                          
    runner = Runner(app=app, session_service=session_service)                                                                     
                                        
    last = time.time()                                                                                                            
    async for event in runner.run_async(                                  
        user_id="u",                                                                                                            
        session_id="s",                                                                                                           
        new_message=types.Content(role="user", parts=[types.Part(text="go")]),
    ):                                                                                                                            
        now = time.time()                                                                                                         
        text = ""               
        if event.content and event.content.parts and event.content.parts[0].text:                                                 
            text = (event.content.parts[0].text or "")[:30]                                                                     
        print(                                                                                                                    
            f"+{now - last:6.2f}s  partial={event.partial!s:5}  "
            f"fcs={bool(event.get_function_calls())!s:5}  text={text!r}"                                                          
        )                                                                                                                       
        last = now                                                                                                                
                                                                                                                                  
                                                                                                                                  
asyncio.run(main())                                                                                                               

How often has this issue occurred?:

  • Always (100%)

Metadata

Labels

models [Component] Issues related to model support