Skip to content

Commit 786e5fd

Browse files
authored
fix: return TaskNotFoundError when SendMessage references non-existent taskId (#788)
Per A2A spec section 3.4.2, when a client includes a taskId in a Message, it MUST reference an existing task. The SDK was creating a new task with that id instead. Added a guard in DefaultRequestHandler.initMessageSend() that throws TaskNotFoundError when the provided taskId does not reference an existing task. This fixes the issue on all three transports (JSON-RPC, gRPC, HTTP+JSON) since they all use the same DefaultRequestHandler code path. Also fixes a related contextId bug: when a follow-up message had a taskId but no contextId, RequestContext.Builder.build() was generating a fresh UUID, so the agent executor saw the wrong contextId. The rebuild now seeds contextId from the stored task. Existing tests that relied on the old taskId behavior have been updated. Fixes #766
1 parent 98d2bdb commit 786e5fd

10 files changed

Lines changed: 438 additions & 258 deletions

File tree

extras/push-notification-config-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,14 @@
1111
import java.util.Queue;
1212
import java.util.concurrent.CountDownLatch;
1313
import java.util.concurrent.TimeUnit;
14+
import java.util.concurrent.atomic.AtomicReference;
1415

1516
import jakarta.inject.Inject;
1617
import jakarta.transaction.Transactional;
1718

1819
import org.a2aproject.sdk.client.Client;
20+
import org.a2aproject.sdk.client.TaskEvent;
21+
import org.a2aproject.sdk.client.TaskUpdateEvent;
1922
import org.a2aproject.sdk.client.config.ClientConfig;
2023
import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransport;
2124
import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransportConfigBuilder;
@@ -98,23 +101,33 @@ public void testDirectNotificationTrigger() {
98101

99102
@Test
100103
public void testJpaDatabasePushNotificationConfigStoreIntegration() throws Exception {
101-
final String taskId = "push-notify-test-" + System.currentTimeMillis();
102-
final String contextId = "test-context";
103-
104-
// Step 1: Create the task
104+
// Step 1: Create the task (no client-provided taskId — server generates it)
105105
Message createMessage = Message.builder()
106106
.role(Message.Role.ROLE_USER)
107107
.parts(List.of(new TextPart("create"))) // Send the "create" command
108-
.taskId(taskId)
109108
.messageId("test-msg-1")
110-
.contextId(contextId)
111109
.build();
112110

113-
// Use a latch to wait for the first operation to complete
111+
// Use a latch to wait for the first operation to complete and capture server-generated ids
114112
CountDownLatch createLatch = new CountDownLatch(1);
115-
client.sendMessage(createMessage, List.of((event, card) -> createLatch.countDown()), (e) -> createLatch.countDown());
113+
AtomicReference<Task> createdTaskRef = new AtomicReference<>();
114+
client.sendMessage(createMessage, List.of((event, card) -> {
115+
if (event instanceof TaskEvent taskEvent) {
116+
createdTaskRef.set(taskEvent.getTask());
117+
createLatch.countDown();
118+
return;
119+
}
120+
if (event instanceof TaskUpdateEvent taskUpdateEvent) {
121+
createdTaskRef.set(taskUpdateEvent.getTask());
122+
createLatch.countDown();
123+
}
124+
}), (e) -> createLatch.countDown());
116125
assertTrue(createLatch.await(10, TimeUnit.SECONDS), "Timeout waiting for task creation");
117126

127+
assertNotNull(createdTaskRef.get(), "Task should have been created");
128+
final String taskId = createdTaskRef.get().id();
129+
final String contextId = createdTaskRef.get().contextId();
130+
118131
// Step 2: Set the push notification configuration
119132
TaskPushNotificationConfig taskPushConfig = TaskPushNotificationConfig.builder()
120133
.id("test-config-1")

extras/queue-manager-replicated/tests-multi-instance/tests/src/test/java/org/a2aproject/sdk/extras/queuemanager/replicated/tests/multiinstance/MultiInstanceReplicationTest.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
import java.time.Duration;
1010
import java.util.Collections;
1111
import java.util.List;
12+
import java.util.concurrent.CountDownLatch;
1213
import java.util.concurrent.CopyOnWriteArrayList;
14+
import java.util.concurrent.TimeUnit;
1315
import java.util.concurrent.atomic.AtomicBoolean;
1416
import java.util.concurrent.atomic.AtomicInteger;
1517
import java.util.concurrent.atomic.AtomicReference;
@@ -19,6 +21,7 @@
1921
import org.a2aproject.sdk.A2A;
2022
import org.a2aproject.sdk.client.Client;
2123
import org.a2aproject.sdk.client.ClientEvent;
24+
import org.a2aproject.sdk.client.TaskEvent;
2225
import org.a2aproject.sdk.client.config.ClientConfig;
2326
import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransport;
2427
import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransportConfig;
@@ -254,15 +257,13 @@ public void testInfrastructureStartup() {
254257
*/
255258
@Test
256259
public void testMultiInstanceEventReplication() throws Exception {
257-
final String taskId = "replication-test-task-" + System.currentTimeMillis();
258-
final String contextId = "replication-test-context";
259-
260260
Throwable testFailure = null;
261+
final String[] taskIdHolder = {null};
262+
final String[] contextIdHolder = {null};
263+
261264
try {
262-
// Step 1: Send initial message NON-streaming to create task
265+
// Step 1: Send initial message NON-streaming to create task (no client-provided taskId)
263266
Message initialMessage = Message.builder(A2A.toUserMessage("Initial test message"))
264-
.taskId(taskId)
265-
.contextId(contextId)
266267
.build();
267268

268269
// Use NON-streaming client to create the task
@@ -272,13 +273,22 @@ public void testMultiInstanceEventReplication() throws Exception {
272273
.withTransport(JSONRPCTransport.class, new JSONRPCTransportConfig())
273274
.build();
274275

275-
Task createdTask = null;
276276
try {
277-
nonStreamingClient.sendMessage(initialMessage, null);
277+
CountDownLatch createLatch = new CountDownLatch(1);
278+
AtomicReference<Task> taskRef = new AtomicReference<>();
279+
nonStreamingClient.sendMessage(initialMessage, List.of((ClientEvent event, AgentCard card) -> {
280+
if (event instanceof TaskEvent te) {
281+
taskRef.set(te.getTask());
282+
}
283+
createLatch.countDown();
284+
}), (Throwable err) -> createLatch.countDown());
285+
assertTrue(createLatch.await(15, TimeUnit.SECONDS), "Task creation timed out");
278286

279-
// Retrieve the task to verify it was created
280-
createdTask = nonStreamingClient.getTask(new TaskQueryParams(taskId), null);
287+
Task createdTask = taskRef.get();
281288
assertNotNull(createdTask, "Task should be created");
289+
taskIdHolder[0] = createdTask.id();
290+
contextIdHolder[0] = createdTask.contextId();
291+
assertNotNull(taskIdHolder[0], "Server-generated task ID should not be null");
282292

283293
// Task should be in a non-final state (SUBMITTED or WORKING are both valid)
284294
TaskState state = createdTask.status().state();
@@ -297,6 +307,9 @@ public void testMultiInstanceEventReplication() throws Exception {
297307
throw e;
298308
}
299309

310+
final String taskId = taskIdHolder[0];
311+
final String contextId = contextIdHolder[0];
312+
300313
// Step 2: Subscribe from both app1 and app2 with proper latches
301314

302315
// We need to wait for at least 3 new events after resubscription:

extras/queue-manager-replicated/tests-single-instance/src/test/java/org/a2aproject/sdk/extras/queuemanager/replicated/tests/KafkaReplicationIntegrationTest.java

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -130,19 +130,14 @@ public void tearDown() throws Exception {
130130

131131
@Test
132132
public void testA2AMessageReplicatedToKafka() throws Exception {
133-
String taskId = "kafka-replication-test-" + System.currentTimeMillis();
134-
String contextId = "test-context-" + System.currentTimeMillis();
135-
136133
// Clear any previous events
137134
testConsumer.clear();
138135

139-
// Send A2A message that should trigger events and replication
136+
// Send A2A message that should trigger events and replication (no client-provided taskId)
140137
Message message = Message.builder()
141138
.role(Message.Role.ROLE_USER)
142139
.parts(List.of(new TextPart("create")))
143-
.taskId(taskId)
144140
.messageId("test-msg-" + System.currentTimeMillis())
145-
.contextId(contextId)
146141
.build();
147142

148143
CountDownLatch a2aLatch = new CountDownLatch(1);
@@ -166,7 +161,9 @@ public void testA2AMessageReplicatedToKafka() throws Exception {
166161

167162
Task task = createdTask.get();
168163
assertNotNull(task, "Task should be created");
169-
assertEquals(taskId, task.id());
164+
String taskId = task.id();
165+
String contextId = task.contextId();
166+
assertNotNull(taskId, "Server-generated task ID should not be null");
170167
assertEquals(TaskState.TASK_STATE_SUBMITTED, task.status().state());
171168

172169
// Wait for the event to be replicated to Kafka
@@ -193,19 +190,14 @@ public void testA2AMessageReplicatedToKafka() throws Exception {
193190

194191
@Test
195192
public void testKafkaEventReceivedByA2AServer() throws Exception {
196-
String taskId = "kafka-to-a2a-test-" + System.currentTimeMillis();
197-
String contextId = "test-context-" + System.currentTimeMillis();
198-
199193
// Clear any previous events
200194
testConsumer.clear();
201195

202-
// First create a task in the A2A system using non-streaming client
196+
// First create a task in the A2A system using non-streaming client (no client-provided taskId)
203197
Message createMessage = Message.builder()
204198
.role(Message.Role.ROLE_USER)
205199
.parts(List.of(new TextPart("create")))
206-
.taskId(taskId)
207200
.messageId("create-msg-" + System.currentTimeMillis())
208-
.contextId(contextId)
209201
.build();
210202

211203
CountDownLatch createLatch = new CountDownLatch(1);
@@ -223,6 +215,9 @@ public void testKafkaEventReceivedByA2AServer() throws Exception {
223215
assertTrue(createLatch.await(15, TimeUnit.SECONDS), "Task creation timed out");
224216
Task initialTask = createdTask.get();
225217
assertNotNull(initialTask, "Task should be created");
218+
String taskId = initialTask.id();
219+
String contextId = initialTask.contextId();
220+
assertNotNull(taskId, "Server-generated task ID should not be null");
226221
assertEquals(TaskState.TASK_STATE_SUBMITTED, initialTask.status().state(), "Initial task should be in SUBMITTED state");
227222

228223
// Add a small delay to ensure the task is fully processed before resubscription
@@ -312,20 +307,15 @@ public void testKafkaEventReceivedByA2AServer() throws Exception {
312307

313308
@Test
314309
public void testQueueClosedEventTerminatesRemoteSubscribers() throws Exception {
315-
String taskId = "queue-closed-test-" + System.currentTimeMillis();
316-
String contextId = "test-context-" + System.currentTimeMillis();
317-
318310
// Clear any previous events
319311
testConsumer.clear();
320312

321-
// Use polling (non-blocking) client with "working" command
313+
// Use polling (non-blocking) client with "working" command (no client-provided taskId)
322314
// This creates task in WORKING state (non-final) and keeps queue alive
323315
Message workingMessage = Message.builder()
324316
.role(Message.Role.ROLE_USER)
325317
.parts(List.of(new TextPart("working")))
326-
.taskId(taskId)
327318
.messageId("working-msg-" + System.currentTimeMillis())
328-
.contextId(contextId)
329319
.build();
330320

331321
CountDownLatch workingLatch = new CountDownLatch(1);
@@ -348,7 +338,7 @@ public void testQueueClosedEventTerminatesRemoteSubscribers() throws Exception {
348338
assertTrue(workingLatch.await(15, TimeUnit.SECONDS), "Task creation timed out");
349339
String createdTaskId = taskIdRef.get();
350340
assertNotNull(createdTaskId, "Task should be created");
351-
assertEquals(taskId, createdTaskId);
341+
String taskId = createdTaskId;
352342

353343
// Set up streaming resubscription to listen for the QueueClosedEvent
354344
CountDownLatch streamCompletedLatch = new CountDownLatch(1);
@@ -408,19 +398,14 @@ public void testQueueClosedEventTerminatesRemoteSubscribers() throws Exception {
408398

409399
@Test
410400
public void testPoisonPillGenerationOnTaskFinalization() throws Exception {
411-
String taskId = "poison-pill-gen-test-" + System.currentTimeMillis();
412-
String contextId = "test-context-" + System.currentTimeMillis();
413-
414401
// Clear any previous events
415402
testConsumer.clear();
416403

417-
// Create a task that will be completed (finalized)
404+
// Create a task that will be completed (finalized) - no client-provided taskId
418405
Message completeMessage = Message.builder()
419406
.role(Message.Role.ROLE_USER)
420407
.parts(List.of(new TextPart("complete")))
421-
.taskId(taskId)
422408
.messageId("complete-msg-" + System.currentTimeMillis())
423-
.contextId(contextId)
424409
.build();
425410

426411
CountDownLatch completeLatch = new CountDownLatch(1);
@@ -439,6 +424,8 @@ public void testPoisonPillGenerationOnTaskFinalization() throws Exception {
439424
assertTrue(completeLatch.await(15, TimeUnit.SECONDS), "Task creation timed out");
440425
Task createdTask = finalTask.get();
441426
assertNotNull(createdTask, "Task should be created");
427+
String taskId = createdTask.id();
428+
assertNotNull(taskId, "Server-generated task ID should not be null");
442429

443430
// The task should complete very quickly since it's a simple operation
444431
// Wait a moment to ensure all events have been enqueued

extras/task-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/taskstore/database/jpa/JpaDatabaseTaskStoreIntegrationTest.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import static org.junit.jupiter.api.Assertions.assertEquals;
44
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
55
import static org.junit.jupiter.api.Assertions.assertNotNull;
6-
import static org.junit.jupiter.api.Assertions.assertNull;
76
import static org.junit.jupiter.api.Assertions.assertTrue;
87

98
import java.util.Collections;
@@ -69,17 +68,11 @@ public void testIsJpaDatabaseTaskStore() {
6968

7069
@Test
7170
public void testJpaDatabaseTaskStore() throws Exception {
72-
final String taskId = "test-task-1";
73-
final String contextId = "contextId";
74-
75-
// Send a message creating the Task
76-
assertNull(taskStore.get(taskId));
71+
// Send a message creating the Task (no client-provided taskId — server generates it)
7772
Message userMessage = Message.builder()
7873
.role(Message.Role.ROLE_USER)
7974
.parts(Collections.singletonList(new TextPart("create")))
80-
.taskId(taskId)
8175
.messageId("test-msg-1")
82-
.contextId(contextId)
8376
.build();
8477

8578
CountDownLatch latch = new CountDownLatch(1);
@@ -102,6 +95,9 @@ public void testJpaDatabaseTaskStore() throws Exception {
10295
assertEquals(0, createdTask.artifacts().size());
10396
assertEquals(TaskState.TASK_STATE_SUBMITTED, createdTask.status().state());
10497

98+
final String taskId = createdTask.id();
99+
final String contextId = createdTask.contextId();
100+
105101
// Send a message updating the Task
106102
userMessage = Message.builder()
107103
.role(Message.Role.ROLE_USER)

0 commit comments

Comments
 (0)