Skip to content

Commit 8f31f87

Browse files
kabirclaude
andauthored
fix: return UnsupportedOperationError when subscribing to terminal tasks (#784)
Per A2A spec, attempting to subscribe to a task in a terminal state (completed, failed, canceled, rejected) must return UnsupportedOperationError. The issue had two parts: 1. DefaultRequestHandler checked terminal state AFTER queue operations began, causing errors to be delivered via SSE stream instead of immediately. 2. For streaming endpoints (subscribeToTask), immediate errors must be wrapped in SSE format so the client's SSE parser can process them and deliver to the error handler callback. Previously they were sent as plain JSON/HTTP responses which the SSE parser couldn't handle. Changes: - Move terminal state validation BEFORE queue operations in DefaultRequestHandler - Wrap all immediate errors in SSE format for JSON-RPC routing layer - Wrap all immediate errors in SSE format for REST transport - Add integration test to verify fix across all three transports Applied to HTTP+JSON, JSON-RPC, and gRPC transports. Fixes #767 --------- Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent d3f88cb commit 8f31f87

5 files changed

Lines changed: 95 additions & 45 deletions

File tree

reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -439,15 +439,21 @@ private A2AResponse<?> processNonStreamingRequest(NonStreamingJSONRPCRequest<?>
439439
*/
440440
private Multi<? extends A2AResponse<?>> processStreamingRequest(
441441
A2ARequest<?> request, ServerCallContext context) {
442-
Flow.Publisher<? extends A2AResponse<?>> publisher;
443-
if (request instanceof SendStreamingMessageRequest req) {
444-
publisher = jsonRpcHandler.onMessageSendStream(req, context);
445-
} else if (request instanceof SubscribeToTaskRequest req) {
446-
publisher = jsonRpcHandler.onSubscribeToTask(req, context);
447-
} else {
448-
return Multi.createFrom().item(generateErrorResponse(request, new UnsupportedOperationError()));
442+
try {
443+
Flow.Publisher<? extends A2AResponse<?>> publisher;
444+
if (request instanceof SendStreamingMessageRequest req) {
445+
publisher = jsonRpcHandler.onMessageSendStream(req, context);
446+
} else if (request instanceof SubscribeToTaskRequest req) {
447+
publisher = jsonRpcHandler.onSubscribeToTask(req, context);
448+
} else {
449+
return Multi.createFrom().item(generateErrorResponse(request, new UnsupportedOperationError()));
450+
}
451+
return Multi.createFrom().publisher(publisher);
452+
} catch (A2AError error) {
453+
// For streaming endpoints, wrap immediate errors (like TaskNotFoundError,
454+
// UnsupportedOperationError) in error response and send as first SSE event
455+
return Multi.createFrom().item(generateErrorResponse(request, error));
449456
}
450-
return Multi.createFrom().publisher(publisher);
451457
}
452458

453459
/**

server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -832,16 +832,23 @@ public Flow.Publisher<StreamingEventKind> onSubscribeToTask(TaskIdParams params,
832832
throw new TaskNotFoundError();
833833
}
834834

835+
// Per A2A spec: subscription to tasks in terminal state (completed, failed, canceled,
836+
// rejected) MUST return UnsupportedOperationError.
837+
// Check BEFORE any queue operations to ensure immediate error response.
838+
if (task.status().state().isFinal()) {
839+
throw new UnsupportedOperationError(
840+
null,
841+
String.format("Cannot subscribe to task %s - task is in terminal state: %s",
842+
task.id(), task.status().state()),
843+
null);
844+
}
845+
835846
TaskManager taskManager = new TaskManager(task.id(), task.contextId(), taskStore, null);
836847
ResultAggregator resultAggregator = new ResultAggregator(taskManager, null, executor, eventConsumerExecutor);
837848
EventQueue queue = queueManager.tap(task.id());
838849
LOGGER.debug("onSubscribeToTask - tapped queue: {}", queue != null ? System.identityHashCode(queue) : "null");
839850

840851
if (queue == null) {
841-
// If task is in final state, queue legitimately doesn't exist anymore
842-
if (task.status().state().isFinal()) {
843-
throw new TaskNotFoundError();
844-
}
845852
// For non-final tasks, recreate the queue so client can receive future events
846853
// (Note: historical events from before queue closed are not available)
847854
LOGGER.debug("Queue not found for active task {}, creating new queue for future events", task.id());

tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1130,6 +1130,66 @@ public void testSubscribeNoExistingTaskError() throws Exception {
11301130
}
11311131
}
11321132

1133+
@Test
1134+
public void testSubscribeToTerminalTaskError() throws Exception {
1135+
// Create a task in terminal state (COMPLETED)
1136+
Task completedTask = Task.builder()
1137+
.id("terminal-task-test")
1138+
.contextId("session-xyz")
1139+
.status(new TaskStatus(TaskState.TASK_STATE_COMPLETED))
1140+
.build();
1141+
saveTaskInTaskStore(completedTask);
1142+
1143+
try {
1144+
CountDownLatch errorLatch = new CountDownLatch(1);
1145+
AtomicReference<Throwable> errorRef = new AtomicReference<>();
1146+
1147+
// Create error handler to capture the UnsupportedOperationError
1148+
Consumer<Throwable> errorHandler = error -> {
1149+
if (error == null) {
1150+
// Stream completed successfully - ignore, we're waiting for an error
1151+
return;
1152+
}
1153+
if (!isStreamClosedError(error)) {
1154+
errorRef.set(error);
1155+
}
1156+
errorLatch.countDown();
1157+
};
1158+
1159+
getClient().subscribeToTask(new TaskIdParams(completedTask.id()), List.of(), errorHandler);
1160+
1161+
// Wait for error to be captured
1162+
boolean errorReceived = errorLatch.await(10, TimeUnit.SECONDS);
1163+
1164+
if (errorReceived) {
1165+
// Error came via error handler
1166+
Throwable error = errorRef.get();
1167+
assertNotNull(error, "Should receive an error when subscribing to terminal task");
1168+
1169+
// Per spec STREAM-SUB-003: should return UnsupportedOperationError for terminal tasks
1170+
if (error instanceof A2AClientException) {
1171+
assertInstanceOf(UnsupportedOperationError.class, ((A2AClientException) error).getCause(),
1172+
"Error should be UnsupportedOperationError for terminal task");
1173+
} else {
1174+
// Check if it's directly an UnsupportedOperationError or walk the cause chain
1175+
Throwable cause = error;
1176+
boolean foundUnsupportedOperation = false;
1177+
while (cause != null && !foundUnsupportedOperation) {
1178+
if (cause instanceof UnsupportedOperationError) {
1179+
foundUnsupportedOperation = true;
1180+
}
1181+
cause = cause.getCause();
1182+
}
1183+
assertTrue(foundUnsupportedOperation, "Expected UnsupportedOperationError in error chain");
1184+
}
1185+
} else {
1186+
fail("Expected UnsupportedOperationError when subscribing to terminal task");
1187+
}
1188+
} finally {
1189+
deleteTaskInTaskStore(completedTask.id());
1190+
}
1191+
}
1192+
11331193
/**
11341194
* Regression test for race condition where MainQueue closed when first ChildQueue closed,
11351195
* preventing resubscription. With reference counting, MainQueue stays alive while any

transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,11 @@ public Flow.Publisher<SendStreamingMessageResponse> onSubscribeToTask(
388388
// We can't use the convertingProcessor convenience method since that propagates any errors as an error handled
389389
// via Subscriber.onError() rather than as part of the SendStreamingResponse payload
390390
return convertToSendStreamingMessageResponse(request.getId(), publisher);
391+
} catch (TaskNotFoundError | UnsupportedOperationError e) {
392+
// Re-throw initial validation errors for routing layer to wrap in SSE format
393+
throw e;
391394
} catch (A2AError e) {
395+
// Other A2AError types - wrap inline as part of the stream
392396
return ZeroPublisher.fromItems(new SendStreamingMessageResponse(request.getId(), e));
393397
} catch (Throwable throwable) {
394398
return ZeroPublisher.fromItems(new SendStreamingMessageResponse(request.getId(), new InternalError(throwable.getMessage())));

transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -985,41 +985,14 @@ public void testOnSubscribeNoExistingTaskError() {
985985
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
986986

987987
SubscribeToTaskRequest request = new SubscribeToTaskRequest("1", new TaskIdParams(MINIMAL_TASK.id()));
988-
Flow.Publisher<SendStreamingMessageResponse> response = handler.onSubscribeToTask(request, callContext);
989-
990-
List<SendStreamingMessageResponse> results = new ArrayList<>();
991-
AtomicReference<Throwable> error = new AtomicReference<>();
992-
993-
response.subscribe(new Flow.Subscriber<>() {
994-
private Flow.Subscription subscription;
995-
996-
@Override
997-
public void onSubscribe(Flow.Subscription subscription) {
998-
this.subscription = subscription;
999-
subscription.request(1);
1000-
}
1001-
1002-
@Override
1003-
public void onNext(SendStreamingMessageResponse item) {
1004-
results.add(item);
1005-
subscription.request(1);
1006-
}
1007-
1008-
@Override
1009-
public void onError(Throwable throwable) {
1010-
error.set(throwable);
1011-
subscription.cancel();
1012-
}
1013988

1014-
@Override
1015-
public void onComplete() {
1016-
subscription.cancel();
1017-
}
1018-
});
989+
// Per spec: TaskNotFoundError should be thrown immediately for non-existent tasks
990+
// The routing layer will catch this and convert to JSON-RPC error response
991+
TaskNotFoundError thrown = Assertions.assertThrows(
992+
TaskNotFoundError.class,
993+
() -> handler.onSubscribeToTask(request, callContext));
1019994

1020-
assertEquals(1, results.size());
1021-
assertNull(results.get(0).getResult());
1022-
assertInstanceOf(TaskNotFoundError.class, results.get(0).getError());
995+
Assertions.assertNotNull(thrown);
1023996
}
1024997

1025998
@Test

0 commit comments

Comments
 (0)