Skip to content

Commit 041bbe1

Browse files
committed
AgentExecutor documentation cleanup.
1 parent 3468180 commit 041bbe1

2 files changed

Lines changed: 129 additions & 17 deletions

File tree

src/a2a/server/agent_execution/agent_executor.py

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,42 @@ async def execute(
2323
return once the agent's execution for this request is complete or
2424
yields control (e.g., enters an input-required state).
2525
26-
TODO: Document request lifecycle and AgentExecutor responsibilities:
27-
- Should not close the event_queue.
28-
- Guarantee single execution per request (no concurrent execution).
29-
- Throwing exception will result in TaskState.TASK_STATE_ERROR (CHECK!)
30-
- Once call is completed it should not access context or event_queue
31-
- Before completing the call it SHOULD update task status to terminal or interrupted state.
32-
- Explain AUTH_REQUIRED workflow.
33-
- Explain INPUT_REQUIRED workflow.
34-
- Explain how cancelation work (executor task will be canceled, cancel() is called, order of calls, etc)
35-
- Explain if execute can wait for cancel and if cancel can wait for execute.
36-
- Explain behaviour of streaming / not-immediate when execute() returns in active state.
37-
- Possible workflows:
38-
- Enqueue a SINGLE Message object
39-
- Enqueue TaskStatusUpdateEvent (TASK_STATE_SUBMITTED or TASK_STATE_REJECTED) and continue with TaskStatusUpdateEvent / TaskArtifactUpdateEvent.
26+
Request Lifecycle & AgentExecutor Responsibilities:
27+
- **Concurrency**: The framework guarantees single execution per request;
28+
`execute()` will not be called concurrently for the same request context.
29+
- **Exception Handling**: Unhandled exceptions raised by `execute()` will be
30+
caught by the framework and result in the task transitioning to
31+
`TaskState.TASK_STATE_ERROR`.
32+
- **Post-Completion**: Once `execute()` completes (returns or raises), the
33+
executor must not access the `context` or `event_queue` anymore.
34+
- **Terminal States**: Before completing the call normally, the executor
35+
SHOULD publish a `TaskStatusUpdateEvent` to transition the task to a
36+
terminal state (e.g., `TASK_STATE_COMPLETED`) or an interrupted state
37+
(`TASK_STATE_INPUT_REQUIRED` or `TASK_STATE_AUTH_REQUIRED`).
38+
- **Interrupted Workflows**:
39+
- `TASK_STATE_INPUT_REQUIRED`: The executor publishes a `TaskStatusUpdateEvent` with
40+
`TaskState.TASK_STATE_INPUT_REQUIRED` and returns to yield control.
41+
The request will resume once user input is provided.
42+
- `TASK_STATE_AUTH_REQUIRED`: There are in-bound and out-of-bound auth models.
43+
In both scenarios, the agent publishes a `TaskStatusUpdateEvent` with
44+
`TaskState.TASK_STATE_AUTH_REQUIRED`.
45+
- In-bound: The agent should return from `execute()`. The framework will
46+
call `execute()` again once the user response is received.
47+
- Out-of-bound: The agent should not return from `execute()`. The framework
48+
will allow the agent to wait for user response on a separate channel.
49+
50+
- **Cancellation Workflow**: When a cancellation request is received, the
51+
async task running `execute()` is cancelled (raising an `asyncio.CancelledError`),
52+
and `cancel()` is explicitly called by the framework. `cancel()`.
53+
54+
Allowed Workflows:
55+
- Immediate response: Enqueue a SINGLE `Message` object.
56+
- Asynchronous/Long-running: Enqueue a `Task` object, perform work, and emit
57+
multiple `TaskStatusUpdateEvent` / `TaskArtifactUpdateEvent` objects over time.
58+
59+
Note that the framework waits with response to the request with
60+
`return_immediately=True` parameter until the first event
61+
(Message or Task) is enqueued by AgentExecutor.
4062
4163
Args:
4264
context: The request context containing the message, task ID, etc.
@@ -53,9 +75,12 @@ async def cancel(
5375
in the context and publish a `TaskStatusUpdateEvent` with state
5476
`TaskState.TASK_STATE_CANCELED` to the `event_queue`.
5577
56-
TODO: Document cancelation workflow.
57-
- What if TaskState.TASK_STATE_CANCELED is not set by cancel() ?
58-
- How it can interact with execute() ?
78+
Cancellation Workflow & Interactions:
79+
- **Framework Interaction**: The framework issues a cancellation to the asyncio Task
80+
running `execute()` (which will raise `asyncio.CancelledError` inside `execute()`),
81+
and awaits this `cancel()` method.
82+
- **State Transition**: The `cancel()` method MUST publish a `TaskStatusUpdateEvent`
83+
with `TaskState.TASK_STATE_CANCELED` to definitively end the task.
5984
6085
Args:
6186
context: The request context containing the task ID to cancel.

tests/integration/test_scenarios.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1265,6 +1265,93 @@ async def cancel(
12651265
)
12661266

12671267

1268+
# Scenario: Auth required and in channel unblocking
1269+
@pytest.mark.timeout(2.0)
1270+
@pytest.mark.asyncio
1271+
@pytest.mark.parametrize('use_legacy', [False, True], ids=['v2', 'legacy'])
1272+
@pytest.mark.parametrize(
1273+
'streaming', [False, True], ids=['blocking', 'streaming']
1274+
)
1275+
async def test_scenario_auth_required_in_channel(use_legacy, streaming):
1276+
class AuthAgent(AgentExecutor):
1277+
async def execute(
1278+
self, context: RequestContext, event_queue: EventQueue
1279+
):
1280+
message = context.message
1281+
if message and message.parts and message.parts[0].text == 'start':
1282+
await event_queue.enqueue_event(
1283+
TaskStatusUpdateEvent(
1284+
task_id=context.task_id,
1285+
context_id=context.context_id,
1286+
status=TaskStatus(
1287+
state=TaskState.TASK_STATE_AUTH_REQUIRED
1288+
),
1289+
)
1290+
)
1291+
elif (
1292+
message
1293+
and message.parts
1294+
and message.parts[0].text == 'credentials'
1295+
):
1296+
await event_queue.enqueue_event(
1297+
TaskStatusUpdateEvent(
1298+
task_id=context.task_id,
1299+
context_id=context.context_id,
1300+
status=TaskStatus(state=TaskState.TASK_STATE_COMPLETED),
1301+
)
1302+
)
1303+
else:
1304+
raise ValueError(f'Unexpected message {message}')
1305+
1306+
async def cancel(
1307+
self, context: RequestContext, event_queue: EventQueue
1308+
):
1309+
pass
1310+
1311+
handler = create_handler(AuthAgent(), use_legacy)
1312+
client = await create_client(
1313+
handler, agent_card=agent_card(), streaming=streaming
1314+
)
1315+
1316+
msg1 = Message(
1317+
message_id='msg-start', role=Role.ROLE_USER, parts=[Part(text='start')]
1318+
)
1319+
1320+
it = client.send_message(
1321+
SendMessageRequest(
1322+
message=msg1,
1323+
configuration=SendMessageConfiguration(return_immediately=False),
1324+
)
1325+
)
1326+
1327+
events1 = [event async for event in it]
1328+
assert [get_state(event) for event in events1] == [
1329+
TaskState.TASK_STATE_AUTH_REQUIRED,
1330+
]
1331+
task_id = events1[0].status_update.task_id
1332+
context_id = events1[0].status_update.context_id
1333+
1334+
# Now send another message with credentials
1335+
msg2 = Message(
1336+
task_id=task_id,
1337+
context_id=context_id,
1338+
message_id='msg-creds',
1339+
role=Role.ROLE_USER,
1340+
parts=[Part(text='credentials')],
1341+
)
1342+
1343+
it2 = client.send_message(
1344+
SendMessageRequest(
1345+
message=msg2,
1346+
configuration=SendMessageConfiguration(return_immediately=False),
1347+
)
1348+
)
1349+
1350+
assert [get_state(event) async for event in it2] == [
1351+
TaskState.TASK_STATE_COMPLETED,
1352+
]
1353+
1354+
12681355
# Scenario: Parallel subscribe attach detach
12691356
# Migrated from: test_parallel_subscribe_attach_detach in test_handler_comparison
12701357
@pytest.mark.timeout(5.0)

0 commit comments

Comments
 (0)