Skip to content

Commit 87b9fd6

Browse files
committed
chore: merge main into 1.0-dev
2 parents 1c4838f + 697ab8e commit 87b9fd6

12 files changed

Lines changed: 430 additions & 361 deletions

File tree

.github/workflows/python-publish.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ jobs:
2626
run: uv build
2727

2828
- name: Upload distributions
29-
uses: actions/upload-artifact@v6
29+
uses: actions/upload-artifact@v7
3030
with:
3131
name: release-dists
3232
path: dist/
@@ -40,7 +40,7 @@ jobs:
4040

4141
steps:
4242
- name: Retrieve release distributions
43-
uses: actions/download-artifact@v7
43+
uses: actions/download-artifact@v8
4444
with:
4545
name: release-dists
4646
path: dist/

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,18 @@
11
# Changelog
22

3+
## [0.3.25](https://github.com/a2aproject/a2a-python/compare/v0.3.24...v0.3.25) (2026-03-10)
4+
5+
6+
### Features
7+
8+
* Implement a vertex based task store ([#752](https://github.com/a2aproject/a2a-python/issues/752)) ([fa14dbf](https://github.com/a2aproject/a2a-python/commit/fa14dbf46b603f288a1f1c474401483bf53950e4))
9+
10+
11+
### Bug Fixes
12+
13+
* return background task from consume_and_break_on_interrupt to prevent GC ([#775](https://github.com/a2aproject/a2a-python/issues/775)) ([a236d4d](https://github.com/a2aproject/a2a-python/commit/a236d4df8dceb2db1e1170e0b57599f3837ebd71))
14+
* use default_factory for mutable field defaults in ServerCallContext ([#744](https://github.com/a2aproject/a2a-python/issues/744)) ([22b25d6](https://github.com/a2aproject/a2a-python/commit/22b25d653e57e2d1453bbc282052e51dbd904ac6))
15+
316
## [0.3.24](https://github.com/a2aproject/a2a-python/compare/v0.3.23...v0.3.24) (2026-02-20)
417

518

src/a2a/contrib/__init__.py

Whitespace-only changes.

src/a2a/server/context.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ class ServerCallContext(BaseModel):
1919

2020
model_config = ConfigDict(arbitrary_types_allowed=True)
2121

22-
state: State = Field(default={})
23-
user: User = Field(default=UnauthenticatedUser())
22+
state: State = Field(default_factory=dict)
23+
user: User = Field(default_factory=UnauthenticatedUser)
2424
tenant: str = Field(default='')
2525
requested_extensions: set[str] = Field(default_factory=set)
2626
activated_extensions: set[str] = Field(default_factory=set)

src/a2a/server/request_handlers/default_request_handler.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,12 +351,17 @@ async def push_notification_callback(event: Event) -> None:
351351
(
352352
result,
353353
interrupted_or_non_blocking,
354+
bg_consume_task,
354355
) = await result_aggregator.consume_and_break_on_interrupt(
355356
consumer,
356357
blocking=blocking,
357358
event_callback=push_notification_callback,
358359
)
359360

361+
if bg_consume_task is not None:
362+
bg_consume_task.set_name(f'continue_consuming:{task_id}')
363+
self._track_background_task(bg_consume_task)
364+
360365
except Exception:
361366
logger.exception('Agent execution failed')
362367
producer_task.cancel()

src/a2a/server/tasks/result_aggregator.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ async def consume_and_break_on_interrupt(
9999
consumer: EventConsumer,
100100
blocking: bool = True,
101101
event_callback: Callable[[Event], Awaitable[None]] | None = None,
102-
) -> tuple[Task | Message | None, bool]:
102+
) -> tuple[Task | Message | None, bool, asyncio.Task | None]:
103103
"""Processes the event stream until completion or an interruptible state is encountered.
104104
105105
If `blocking` is False, it returns after the first event that creates a Task or Message.
@@ -119,16 +119,23 @@ async def consume_and_break_on_interrupt(
119119
A tuple containing:
120120
- The current aggregated result (`Task` or `Message`) at the point of completion or interruption.
121121
- A boolean indicating whether the consumption was interrupted (`True`) or completed naturally (`False`).
122+
- The background ``asyncio.Task`` that continues consuming events
123+
after an interruption, or ``None`` when no background work was
124+
spawned. **Callers must hold a strong reference** to this task
125+
(e.g. in a ``set``) to prevent the garbage collector from
126+
collecting it before it finishes — the event loop only keeps
127+
weak references to tasks.
122128
123129
Raises:
124130
BaseException: If the `EventConsumer` raises an exception during consumption.
125131
"""
126132
event_stream = consumer.consume_all()
127133
interrupted = False
134+
bg_task: asyncio.Task | None = None
128135
async for event in event_stream:
129136
if isinstance(event, Message):
130137
self._message = event
131-
return event, False
138+
return event, False, None
132139
await self.task_manager.process(event)
133140

134141
if event_callback:
@@ -161,13 +168,13 @@ async def consume_and_break_on_interrupt(
161168

162169
if should_interrupt:
163170
# Continue consuming the rest of the events in the background.
164-
# TODO: We should track all outstanding tasks to ensure they eventually complete.
165-
asyncio.create_task( # noqa: RUF006
171+
# The caller is responsible for tracking this task to prevent GC.
172+
bg_task = asyncio.create_task(
166173
self._continue_consuming(event_stream, event_callback)
167174
)
168175
interrupted = True
169176
break
170-
return await self.task_manager.get_task(), interrupted
177+
return await self.task_manager.get_task(), interrupted, bg_task
171178

172179
async def _continue_consuming(
173180
self,

tck/sut_agent.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
DefaultRequestHandler,
1616
)
1717
from a2a.server.tasks.inmemory_task_store import InMemoryTaskStore
18+
from a2a.server.tasks.task_store import TaskStore
1819
from a2a.types import (
1920
AgentCapabilities,
2021
AgentCard,
@@ -128,8 +129,8 @@ async def execute(
128129
await event_queue.enqueue_event(final_update)
129130

130131

131-
def main() -> None:
132-
"""Main entrypoint."""
132+
def serve(task_store: TaskStore) -> None:
133+
"""Sets up the A2A service and starts the HTTP server."""
133134
http_port = int(os.environ.get('HTTP_PORT', '41241'))
134135

135136
agent_card = AgentCard(
@@ -168,7 +169,7 @@ def main() -> None:
168169

169170
request_handler = DefaultRequestHandler(
170171
agent_executor=SUTAgentExecutor(),
171-
task_store=InMemoryTaskStore(),
172+
task_store=task_store,
172173
)
173174

174175
server = A2AStarletteApplication(
@@ -182,5 +183,10 @@ def main() -> None:
182183
uvicorn.run(app, host='127.0.0.1', port=http_port, log_level='info')
183184

184185

186+
def main() -> None:
187+
"""Main entrypoint."""
188+
serve(InMemoryTaskStore())
189+
190+
185191
if __name__ == '__main__':
186192
main()

tests/contrib/__init__.py

Whitespace-only changes.

tests/server/request_handlers/test_default_request_handler.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,7 @@ async def test_on_message_send_with_push_notification():
541541
mock_result_aggregator_instance.consume_and_break_on_interrupt.return_value = (
542542
final_task_result,
543543
False,
544+
None,
544545
)
545546

546547
# Mock the current_result async property to return the final task result
@@ -643,6 +644,7 @@ async def test_on_message_send_with_push_notification_in_non_blocking_request():
643644
mock_result_aggregator_instance.consume_and_break_on_interrupt.return_value = (
644645
initial_task,
645646
True, # interrupted = True for non-blocking
647+
MagicMock(spec=asyncio.Task), # background task
646648
)
647649

648650
# Mock the current_result async property to return the final task
@@ -666,7 +668,11 @@ async def mock_consume_and_break_on_interrupt(
666668
event_callback_received = event_callback
667669
if event_callback_received:
668670
await event_callback_received(final_task)
669-
return initial_task, True # interrupted = True for non-blocking
671+
return (
672+
initial_task,
673+
True,
674+
MagicMock(spec=asyncio.Task),
675+
) # interrupted = True for non-blocking
670676

671677
mock_result_aggregator_instance.consume_and_break_on_interrupt = (
672678
mock_consume_and_break_on_interrupt
@@ -758,6 +764,7 @@ async def test_on_message_send_with_push_notification_no_existing_Task():
758764
mock_result_aggregator_instance.consume_and_break_on_interrupt.return_value = (
759765
final_task_result,
760766
False,
767+
None,
761768
)
762769

763770
# Mock the current_result async property to return the final task result
@@ -815,6 +822,7 @@ async def test_on_message_send_no_result_from_aggregator():
815822
mock_result_aggregator_instance.consume_and_break_on_interrupt.return_value = (
816823
None,
817824
False,
825+
None,
818826
)
819827

820828
with (
@@ -864,6 +872,7 @@ async def test_on_message_send_task_id_mismatch():
864872
mock_result_aggregator_instance.consume_and_break_on_interrupt.return_value = (
865873
mismatched_task,
866874
False,
875+
None,
867876
)
868877

869878
with (
@@ -1069,6 +1078,7 @@ async def test_on_message_send_interrupted_flow():
10691078
mock_result_aggregator_instance.consume_and_break_on_interrupt.return_value = (
10701079
interrupt_task_result,
10711080
True,
1081+
MagicMock(spec=asyncio.Task), # background task
10721082
) # Interrupted = True
10731083

10741084
# Collect coroutines passed to create_task so we can close them

tests/server/request_handlers/test_jsonrpc_handler.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ async def test_on_message_new_message_success(
323323

324324
with patch(
325325
'a2a.server.tasks.result_aggregator.ResultAggregator.consume_and_break_on_interrupt',
326-
return_value=(mock_task, False),
326+
return_value=(mock_task, False, None),
327327
):
328328
request = SendMessageRequest(
329329
message=create_message(
@@ -352,7 +352,7 @@ async def test_on_message_new_message_with_existing_task_success(
352352

353353
with patch(
354354
'a2a.server.tasks.result_aggregator.ResultAggregator.consume_and_break_on_interrupt',
355-
return_value=(mock_task, False),
355+
return_value=(mock_task, False, None),
356356
):
357357
request = SendMessageRequest(
358358
message=create_message(
@@ -1021,7 +1021,7 @@ async def test_on_message_send_task_id_mismatch(self) -> None:
10211021
# Task returned has task_id='task_123' but request_context will have generated UUID
10221022
with patch(
10231023
'a2a.server.tasks.result_aggregator.ResultAggregator.consume_and_break_on_interrupt',
1024-
return_value=(mock_task, False),
1024+
return_value=(mock_task, False, None),
10251025
):
10261026
request = SendMessageRequest(
10271027
message=create_message(), # No task_id, so UUID is generated

0 commit comments

Comments
 (0)