Skip to content

Commit ccbf489

Browse files
committed
AgentExecutor documentation cleanup.
1 parent 3468180 commit ccbf489

2 files changed

Lines changed: 146 additions & 17 deletions

File tree

src/a2a/server/agent_execution/agent_executor.py

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,43 @@ async def execute(
2323
return once the agent's execution for this request is complete or
2424
yields control (e.g., enters an input-required state).
2525
26-
TODO: Document request lifecycle and AgentExecutor responsibilities:
27-
- Should not close the event_queue.
28-
- Guarantee single execution per request (no concurrent execution).
29-
- Throwing exception will result in TaskState.TASK_STATE_ERROR (CHECK!)
30-
- Once call is completed it should not access context or event_queue
31-
- Before completing the call it SHOULD update task status to terminal or interrupted state.
32-
- Explain AUTH_REQUIRED workflow.
33-
- Explain INPUT_REQUIRED workflow.
34-
- Explain how cancelation work (executor task will be canceled, cancel() is called, order of calls, etc)
35-
- Explain if execute can wait for cancel and if cancel can wait for execute.
36-
- Explain behaviour of streaming / not-immediate when execute() returns in active state.
37-
- Possible workflows:
38-
- Enqueue a SINGLE Message object
39-
- Enqueue TaskStatusUpdateEvent (TASK_STATE_SUBMITTED or TASK_STATE_REJECTED) and continue with TaskStatusUpdateEvent / TaskArtifactUpdateEvent.
26+
Request Lifecycle & AgentExecutor Responsibilities:
27+
- **Concurrency**: The framework guarantees single execution per request;
28+
`execute()` will not be called concurrently for the same request context.
29+
- **Exception Handling**: Unhandled exceptions raised by `execute()` will be
30+
caught by the framework and result in the task transitioning to
31+
`TaskState.TASK_STATE_ERROR`.
32+
- **Post-Completion**: Once `execute()` completes (returns or raises), the
33+
executor must not access the `context` or `event_queue` anymore.
34+
- **Terminal States**: Before completing the call normally, the executor
35+
SHOULD publish a `TaskStatusUpdateEvent` to transition the task to a
36+
terminal state (e.g., `TASK_STATE_COMPLETED`) or an interrupted state
37+
(`TASK_STATE_INPUT_REQUIRED` or `TASK_STATE_AUTH_REQUIRED`).
38+
- **Interrupted Workflows**:
39+
- `TASK_STATE_INPUT_REQUIRED`: The executor publishes a `TaskStatusUpdateEvent` with
40+
`TaskState.TASK_STATE_INPUT_REQUIRED` and returns to yield control.
41+
The request will resume once user input is provided.
42+
- `TASK_STATE_AUTH_REQUIRED`: There are in-bound and out-of-bound auth models.
43+
In both scenarios, the agent publishes a `TaskStatusUpdateEvent` with
44+
`TaskState.TASK_STATE_AUTH_REQUIRED`.
45+
- In-bound: The agent should return from `execute()`. The framework will
46+
call `execute()` again once the user response is received.
47+
- Out-of-bound: The agent should not return from `execute()`. It should wait
48+
for the out-of-band auth provider to complete the authentication and then
49+
continue execution.
50+
51+
- **Cancellation Workflow**: When a cancellation request is received, the
52+
async task running `execute()` is cancelled (raising an `asyncio.CancelledError`),
53+
and `cancel()` is explicitly called by the framework.
54+
55+
Allowed Workflows:
56+
- Immediate response: Enqueue a SINGLE `Message` object.
57+
- Asynchronous/Long-running: Enqueue a `Task` object, perform work, and emit
58+
multiple `TaskStatusUpdateEvent` / `TaskArtifactUpdateEvent` objects over time.
59+
60+
Note that the framework waits with response to the send_message request with
61+
`return_immediately=True` parameter until the first event (Message or Task)
62+
is enqueued by AgentExecutor.
4063
4164
Args:
4265
context: The request context containing the message, task ID, etc.
@@ -53,9 +76,12 @@ async def cancel(
5376
in the context and publish a `TaskStatusUpdateEvent` with state
5477
`TaskState.TASK_STATE_CANCELED` to the `event_queue`.
5578
56-
TODO: Document cancelation workflow.
57-
- What if TaskState.TASK_STATE_CANCELED is not set by cancel() ?
58-
- How it can interact with execute() ?
79+
Cancellation Workflow & Interactions:
80+
- **Framework Interaction**: The framework issues a cancellation to the asyncio Task
81+
running `execute()` (which will raise `asyncio.CancelledError` inside `execute()`),
82+
and awaits this `cancel()` method.
83+
- **State Transition**: The `cancel()` method MUST publish a `TaskStatusUpdateEvent`
84+
with `TaskState.TASK_STATE_CANCELED` to definitively end the task.
5985
6086
Args:
6187
context: The request context containing the task ID to cancel.

tests/integration/test_scenarios.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,22 @@ def agent_card():
113113
)
114114

115115

116+
def get_task_id(event):
117+
if event.HasField('task'):
118+
return event.task.id
119+
if event.HasField('status_update'):
120+
return event.status_update.task_id
121+
assert False, f'Event {event} has no task_id'
122+
123+
124+
def get_task_context_id(event):
125+
if event.HasField('task'):
126+
return event.task.context_id
127+
if event.HasField('status_update'):
128+
return event.status_update.context_id
129+
assert False, f'Event {event} has no context_id'
130+
131+
116132
def get_state(event):
117133
if event.HasField('task'):
118134
return event.task.status.state
@@ -1265,6 +1281,93 @@ async def cancel(
12651281
)
12661282

12671283

1284+
# Scenario: Auth required and in channel unblocking
1285+
@pytest.mark.timeout(2.0)
1286+
@pytest.mark.asyncio
1287+
@pytest.mark.parametrize('use_legacy', [False, True], ids=['v2', 'legacy'])
1288+
@pytest.mark.parametrize(
1289+
'streaming', [False, True], ids=['blocking', 'streaming']
1290+
)
1291+
async def test_scenario_auth_required_in_channel(use_legacy, streaming):
1292+
class AuthAgent(AgentExecutor):
1293+
async def execute(
1294+
self, context: RequestContext, event_queue: EventQueue
1295+
):
1296+
message = context.message
1297+
if message and message.parts and message.parts[0].text == 'start':
1298+
await event_queue.enqueue_event(
1299+
TaskStatusUpdateEvent(
1300+
task_id=context.task_id,
1301+
context_id=context.context_id,
1302+
status=TaskStatus(
1303+
state=TaskState.TASK_STATE_AUTH_REQUIRED
1304+
),
1305+
)
1306+
)
1307+
elif (
1308+
message
1309+
and message.parts
1310+
and message.parts[0].text == 'credentials'
1311+
):
1312+
await event_queue.enqueue_event(
1313+
TaskStatusUpdateEvent(
1314+
task_id=context.task_id,
1315+
context_id=context.context_id,
1316+
status=TaskStatus(state=TaskState.TASK_STATE_COMPLETED),
1317+
)
1318+
)
1319+
else:
1320+
raise ValueError(f'Unexpected message {message}')
1321+
1322+
async def cancel(
1323+
self, context: RequestContext, event_queue: EventQueue
1324+
):
1325+
pass
1326+
1327+
handler = create_handler(AuthAgent(), use_legacy)
1328+
client = await create_client(
1329+
handler, agent_card=agent_card(), streaming=streaming
1330+
)
1331+
1332+
msg1 = Message(
1333+
message_id='msg-start', role=Role.ROLE_USER, parts=[Part(text='start')]
1334+
)
1335+
1336+
it = client.send_message(
1337+
SendMessageRequest(
1338+
message=msg1,
1339+
configuration=SendMessageConfiguration(return_immediately=False),
1340+
)
1341+
)
1342+
1343+
events1 = [event async for event in it]
1344+
assert [get_state(event) for event in events1] == [
1345+
TaskState.TASK_STATE_AUTH_REQUIRED,
1346+
]
1347+
task_id = get_task_id(events1[0])
1348+
context_id = get_task_context_id(events1[0])
1349+
1350+
# Now send another message with credentials
1351+
msg2 = Message(
1352+
task_id=task_id,
1353+
context_id=context_id,
1354+
message_id='msg-creds',
1355+
role=Role.ROLE_USER,
1356+
parts=[Part(text='credentials')],
1357+
)
1358+
1359+
it2 = client.send_message(
1360+
SendMessageRequest(
1361+
message=msg2,
1362+
configuration=SendMessageConfiguration(return_immediately=False),
1363+
)
1364+
)
1365+
1366+
assert [get_state(event) async for event in it2] == [
1367+
TaskState.TASK_STATE_COMPLETED,
1368+
]
1369+
1370+
12681371
# Scenario: Parallel subscribe attach detach
12691372
# Migrated from: test_parallel_subscribe_attach_detach in test_handler_comparison
12701373
@pytest.mark.timeout(5.0)

0 commit comments

Comments
 (0)