Skip to content

Commit 57a6624

Browse files
authored
fix(samples): emit Task(TASK_STATE_SUBMITTED) as first streaming event (#970)
# Description Updates the sample agent and CLI to correctly follow the A2A streaming event contract, where the first event in a stream must be a `Task` or a `Message` object in `TASK_STATE_SUBMITTED` state. # Changes **hello_world_agent.py** `SampleAgentExecutor.execute()` now enqueues a `Task(TASK_STATE_SUBMITTED)` object as its very first event, before any TaskUpdater calls. The initial user message is included in the Task's history field, since the consumer sets message_to_save = None upon receiving a Task event (expecting the task to carry the message itself). **cli.py** Updates `_handle_stream` to match the new event contract: The first event is now expected to be a `Message` or a `Task` (not an (event, task) tuple), and its id is used to initialize `current_task_id`. **README.md** Adds a `README.md` for the samples. # Tested ``` uv run samples/cli.py Connecting to http://127.0.0.1:41241 (preferred transport: Any) ✓ Agent Card Found: Name: Sample Agent Picked Transport: JsonRpcTransport Connected! Send a message or type /quit to exit. You: hi Task [state=TASK_STATE_SUBMITTED] TaskStatusUpdate [state=TASK_STATE_WORKING]: Processing your question... TaskArtifactUpdate [name=response]: Hello World! Nice to meet you! TaskStatusUpdate [state=TASK_STATE_COMPLETED]: --- Task Finished --- You: /quit ``` Related issue #965 🦕
1 parent 6b56511 commit 57a6624

3 files changed

Lines changed: 110 additions & 31 deletions

File tree

samples/README.md

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# A2A Python SDK — Samples
2+
3+
This directory contains runnable examples demonstrating how to build and interact with an A2A-compliant agent using the Python SDK.
4+
5+
## Contents
6+
7+
| File | Role | Description |
8+
|---|---|---|
9+
| `hello_world_agent.py` | **Server** | A2A agent server |
10+
| `cli.py` | **Client** | Interactive terminal client |
11+
12+
The samples are designed to work together out of the box: the agent listens on `http://127.0.0.1:41241`, which is the default URL used by the client.
13+
---
14+
15+
## `hello_world_agent.py` — Agent Server
16+
17+
Implements an A2A agent that responds to simple greeting messages (e.g., "hello", "how are you", "bye") with text replies, simulating a 1-second processing delay.
18+
19+
Demonstrates:
20+
- Subclassing `AgentExecutor` and implementing `execute()` / `cancel()`
21+
- Publishing streaming status updates and artifacts via `TaskUpdater`
22+
- Exposing all three transports in both protocol versions (v1.0 and v0.3 compat) simultaneously:
23+
- **JSON-RPC** (v1.0 and v0.3) at `http://127.0.0.1:41241/a2a/jsonrpc`
24+
- **HTTP+JSON (REST)** (v1.0 and v0.3) at `http://127.0.0.1:41241/a2a/rest`
25+
- **gRPC v1.0** on port `50051`
26+
- **gRPC v0.3 (compat)** on port `50052`
27+
- Serving the agent card at `http://127.0.0.1:41241/.well-known/agent-card.json`
28+
29+
**Run:**
30+
31+
```bash
32+
uv run python samples/hello_world_agent.py
33+
```
34+
35+
---
36+
37+
## `cli.py` — Client
38+
39+
An interactive terminal client with full visibility into the streaming event flow. Each `TaskStatusUpdate` and `TaskArtifactUpdate` event is printed as it arrives.
40+
41+
Features:
42+
- Transport selection via `--transport` flag (`JSONRPC`, `HTTP+JSON`, `GRPC`)
43+
- Session management (`context_id` persisted across messages, `task_id` per task)
44+
- Graceful error handling for HTTP and gRPC failures
45+
46+
**Run:**
47+
48+
```bash
49+
# Connect to the local hello_world_agent (default):
50+
uv run python samples/cli.py
51+
52+
# Connect to a different URL, using gRPC:
53+
uv run python samples/cli.py --url http://192.168.1.10:41241 --transport GRPC
54+
```
55+
56+
Then type a message like `hello` and press Enter.
57+
58+
Type `/quit` or `/exit` to stop, or press `Ctrl+C`.

samples/cli.py

Lines changed: 40 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -13,42 +13,51 @@
1313
from a2a.types import Message, Part, Role, SendMessageRequest, TaskState
1414

1515

16-
async def _handle_stream(
16+
async def _handle_stream( # noqa: PLR0912
1717
stream: Any, current_task_id: str | None
1818
) -> str | None:
19-
async for event, task in stream:
20-
if not task:
21-
continue
19+
async for event in stream:
20+
if event.HasField('message'):
21+
print('Message:', end=' ')
22+
for part in event.message.parts:
23+
if part.text:
24+
print(part.text, end=' ')
25+
print()
26+
return None
27+
2228
if not current_task_id:
23-
current_task_id = task.id
24-
25-
if event:
26-
if event.HasField('status_update'):
27-
state_name = TaskState.Name(event.status_update.status.state)
28-
print(f'TaskStatusUpdate [state={state_name}]:', end=' ')
29-
if event.status_update.status.HasField('message'):
30-
for part in event.status_update.status.message.parts:
31-
if part.text:
32-
print(part.text, end=' ')
33-
print()
34-
35-
if (
36-
event.status_update.status.state
37-
== TaskState.TASK_STATE_COMPLETED
38-
):
39-
current_task_id = None
40-
print('--- Task Completed ---')
41-
42-
elif event.HasField('artifact_update'):
43-
print(
44-
f'TaskArtifactUpdate [name={event.artifact_update.artifact.name}]:',
45-
end=' ',
46-
)
47-
for part in event.artifact_update.artifact.parts:
29+
if event.HasField('task'):
30+
current_task_id = event.task.id
31+
print('--- Task Started ---')
32+
print(f'Task [state={TaskState.Name(event.task.status.state)}]')
33+
else:
34+
raise ValueError(f'Unexpected first event: {event}')
35+
36+
if event.HasField('status_update'):
37+
state_name = TaskState.Name(event.status_update.status.state)
38+
print(f'TaskStatusUpdate [state={state_name}]:', end=' ')
39+
if event.status_update.status.HasField('message'):
40+
for part in event.status_update.status.message.parts:
4841
if part.text:
4942
print(part.text, end=' ')
50-
print()
51-
43+
print()
44+
if state_name in (
45+
'TASK_STATE_COMPLETED',
46+
'TASK_STATE_FAILED',
47+
'TASK_STATE_CANCELED',
48+
'TASK_STATE_REJECTED',
49+
):
50+
current_task_id = None
51+
print('--- Task Finished ---')
52+
elif event.HasField('artifact_update'):
53+
print(
54+
f'TaskArtifactUpdate [name={event.artifact_update.artifact.name}]:',
55+
end=' ',
56+
)
57+
for part in event.artifact_update.artifact.parts:
58+
if part.text:
59+
print(part.text, end=' ')
60+
print()
5261
return current_task_id
5362

5463

samples/hello_world_agent.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
AgentProvider,
2828
AgentSkill,
2929
Part,
30+
Task,
31+
TaskState,
32+
TaskStatus,
3033
a2a_pb2_grpc,
3134
)
3235

@@ -75,6 +78,15 @@ async def execute(
7578
context_id,
7679
)
7780

81+
await event_queue.enqueue_event(
82+
Task(
83+
id=task_id,
84+
context_id=context_id,
85+
status=TaskStatus(state=TaskState.TASK_STATE_SUBMITTED),
86+
history=[user_message],
87+
)
88+
)
89+
7890
updater = TaskUpdater(
7991
event_queue=event_queue,
8092
task_id=task_id,

0 commit comments

Comments
 (0)