Skip to content

Commit cc27228

Browse files
authored
fix: Fixing last TCK issues (#795)
Fixes #791 🦕 --------- Signed-off-by: Emmanuel Hugonnet <ehugonne@redhat.com>
1 parent e3e436a commit cc27228

12 files changed

Lines changed: 434 additions & 127 deletions

File tree

extras/http-client-vertx/src/main/java/org/a2aproject/sdk/client/http/VertxA2AHttpClient.java

Lines changed: 208 additions & 65 deletions
Large diffs are not rendered by default.

extras/http-client-vertx/src/test/java/org/a2aproject/sdk/client/http/VertxA2AHttpClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public void testVertxParameterConstructor() {
2525

2626
@Test
2727
public void testVertxParameterConstructorNullThrows() {
28-
assertThrows(NullPointerException.class, () -> {
28+
assertThrows(IllegalArgumentException.class, () -> {
2929
new VertxA2AHttpClient(null);
3030
});
3131
}

extras/opentelemetry/server/src/main/java/org/a2aproject/sdk/extras/opentelemetry/OpenTelemetryRequestHandlerDecorator.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import jakarta.enterprise.inject.Any;
4444
import jakarta.inject.Inject;
4545
import java.util.concurrent.Flow;
46+
import org.jspecify.annotations.Nullable;
4647
import org.slf4j.Logger;
4748
import org.slf4j.LoggerFactory;
4849

@@ -455,7 +456,12 @@ public void onDeleteTaskPushNotificationConfig(DeleteTaskPushNotificationConfigP
455456
span.end();
456457
}
457458
}
458-
459+
460+
@Override
461+
public void validateRequestedTask(@Nullable String requestedTaskId) throws A2AError {
462+
delegate.validateRequestedTask(requestedTaskId);
463+
}
464+
459465
private boolean extractRequest() {
460466
return Boolean.getBoolean(EXTRACT_REQUEST_SYS_PROPERTY);
461467
}

http-client/src/main/java/org/a2aproject/sdk/client/http/JdkA2AHttpClient.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,8 @@ protected CompletableFuture<Void> asyncRequest(
142142
Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
143143
private Flow.@Nullable Subscription subscription;
144144
private volatile boolean errorRaised = false;
145+
private boolean isSseStream = false;
146+
private boolean firstMeaningfulLineSeen = false;
145147

146148
@Override
147149
public void onSubscribe(Flow.Subscription subscription) {
@@ -151,10 +153,23 @@ public void onSubscribe(Flow.Subscription subscription) {
151153

152154
@Override
153155
public void onNext(String item) {
154-
// SSE messages sometimes start with "data:". Strip that off
155-
if (item != null && item.startsWith("data:")) {
156-
item = item.substring(5).trim();
157-
if (!item.isEmpty()) {
156+
if (item != null && !item.isEmpty()) {
157+
if (!firstMeaningfulLineSeen) {
158+
firstMeaningfulLineSeen = true;
159+
isSseStream = item.startsWith("data:") || item.startsWith(":")
160+
|| item.startsWith("event:") || item.startsWith("id:")
161+
|| item.startsWith("retry:");
162+
}
163+
if (isSseStream) {
164+
if (item.startsWith("data:")) {
165+
String data = item.substring(5).trim();
166+
if (!data.isEmpty()) {
167+
messageConsumer.accept(data);
168+
}
169+
}
170+
// Other SSE control lines (event:, id:, retry:, :) are ignored
171+
} else {
172+
// Plain error body: deliver so SSEEventListener can parse the typed error
158173
messageConsumer.accept(item);
159174
}
160175
}
@@ -300,7 +315,6 @@ private HttpRequest.Builder createRequestBuilder(boolean SSE) throws IOException
300315
@Override
301316
public A2AHttpResponse post() throws IOException, InterruptedException {
302317
HttpRequest request = createRequestBuilder(false)
303-
.POST(HttpRequest.BodyPublishers.ofString(body, StandardCharsets.UTF_8))
304318
.build();
305319
HttpResponse<String> response =
306320
httpClient.send(request, BodyHandlers.ofString(StandardCharsets.UTF_8));

reference/jsonrpc/src/main/java/org/a2aproject/sdk/server/apps/quarkus/A2AServerRoutes.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,12 @@ private A2AResponse<?> processNonStreamingRequest(NonStreamingJSONRPCRequest<?>
438438
* @return a Multi stream of JSON-RPC responses
439439
*/
440440
private Multi<? extends A2AResponse<?>> processStreamingRequest(
441-
A2ARequest<?> request, ServerCallContext context) {
441+
A2ARequest<?> request, ServerCallContext context) throws A2AError {
442+
if (request instanceof SendStreamingMessageRequest req) {
443+
jsonRpcHandler.validateRequestedTask(req.getParams().message().taskId());
444+
} else if (request instanceof SubscribeToTaskRequest req) {
445+
jsonRpcHandler.validateRequestedTask(req.getParams().id());
446+
}
442447
try {
443448
Flow.Publisher<? extends A2AResponse<?>> publisher;
444449
if (request instanceof SendStreamingMessageRequest req) {
@@ -450,8 +455,6 @@ private Multi<? extends A2AResponse<?>> processStreamingRequest(
450455
}
451456
return Multi.createFrom().publisher(publisher);
452457
} catch (A2AError error) {
453-
// For streaming endpoints, wrap immediate errors (like TaskNotFoundError,
454-
// UnsupportedOperationError) in error response and send as first SSE event
455458
return Multi.createFrom().item(generateErrorResponse(request, error));
456459
}
457460
}
@@ -601,7 +604,7 @@ private String extractTenant(RoutingContext rc) {
601604
* "error": {
602605
* "code": -32602,
603606
* "message": "Invalid params",
604-
* "details": { ... }
607+
* "data": [ ... ]
605608
* }
606609
* }
607610
* }</pre>

server-common/src/main/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ public ListTasksResult onListTasks(ListTasksParams params, ServerCallContext con
339339
Instant now = Instant.now();
340340
if (params.statusTimestampAfter().isAfter(now)) {
341341
Map<String, Object> errorData = new HashMap<>();
342-
errorData.put("parameter", "lastUpdatedAfter");
342+
errorData.put("parameter", "statusTimestampAfter");
343343
errorData.put("reason", "Timestamp cannot be in the future");
344344
throw new InvalidParamsError(null, "Invalid params", errorData);
345345
}
@@ -1061,6 +1061,23 @@ private MessageSendSetup initMessageSend(MessageSendParams params, ServerCallCon
10611061
return new MessageSendSetup(taskManager, task, requestContext);
10621062
}
10631063

1064+
@Override
1065+
public void validateRequestedTask(@Nullable String requestedTaskId) throws A2AError {
1066+
if (requestedTaskId == null) {
1067+
return;
1068+
}
1069+
Task task = taskStore.get(requestedTaskId);
1070+
if (task == null) {
1071+
throw new TaskNotFoundError();
1072+
}
1073+
1074+
if (task.status().state().isFinal()) {
1075+
throw new UnsupportedOperationError(null, String.format(
1076+
"Cannot send message to task %s: task is in terminal state %s and cannot accept further messages",
1077+
task.id(), task.status().state()), null);
1078+
}
1079+
}
1080+
10641081
private @Nullable Task validateRequestedTask(MessageSendParams params) throws A2AError {
10651082
String requestedTaskId = params.message().taskId();
10661083
if (requestedTaskId == null) {
@@ -1162,24 +1179,5 @@ private void logThreadStats(String label) {
11621179
THREAD_STATS_LOGGER.debug("=== END THREAD STATS ===");
11631180
}
11641181

1165-
/**
1166-
* Check if an event represents a final task state.
1167-
*
1168-
* @param eventKind the event to check
1169-
* @return true if the event represents a final state (COMPLETED, FAILED, CANCELED, REJECTED, UNKNOWN)
1170-
*/
1171-
private boolean isFinalEvent(EventKind eventKind) {
1172-
if (!(eventKind instanceof Event event)) {
1173-
return false;
1174-
}
1175-
if (event instanceof Task task) {
1176-
return task.status() != null && task.status().state() != null
1177-
&& task.status().state().isFinal();
1178-
} else if (event instanceof TaskStatusUpdateEvent statusUpdate) {
1179-
return statusUpdate.isFinal();
1180-
}
1181-
return false;
1182-
}
1183-
11841182
private record MessageSendSetup(TaskManager taskManager, @Nullable Task task, RequestContext requestContext) {}
11851183
}

server-common/src/main/java/org/a2aproject/sdk/server/requesthandlers/RequestHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.a2aproject.sdk.spec.TaskIdParams;
1919
import org.a2aproject.sdk.spec.TaskPushNotificationConfig;
2020
import org.a2aproject.sdk.spec.TaskQueryParams;
21+
import org.jspecify.annotations.Nullable;
2122

2223
public interface RequestHandler {
2324
Task onGetTask(
@@ -59,4 +60,6 @@ ListTaskPushNotificationConfigsResult onListTaskPushNotificationConfigs(
5960
void onDeleteTaskPushNotificationConfig(
6061
DeleteTaskPushNotificationConfigParams params,
6162
ServerCallContext context) throws A2AError;
63+
64+
void validateRequestedTask(@Nullable String requestedTaskId) throws A2AError;
6265
}

spec-grpc/src/main/java/org/a2aproject/sdk/grpc/utils/JSONRPCUtils.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package org.a2aproject.sdk.grpc.utils;
22

33
import org.a2aproject.sdk.spec.A2AErrorCodes;
4+
45
import static org.a2aproject.sdk.spec.A2AMethods.CANCEL_TASK_METHOD;
56
import static org.a2aproject.sdk.spec.A2AMethods.GET_EXTENDED_AGENT_CARD_METHOD;
67
import static org.a2aproject.sdk.spec.A2AMethods.SEND_STREAMING_MESSAGE_METHOD;
78

89
import java.io.IOException;
910
import java.io.StringWriter;
11+
import java.util.List;
1012
import java.util.Map;
1113
import java.util.UUID;
1214
import java.util.logging.Level;
@@ -66,6 +68,7 @@
6668
import org.a2aproject.sdk.spec.TaskNotFoundError;
6769
import org.a2aproject.sdk.spec.UnsupportedOperationError;
6870
import org.a2aproject.sdk.spec.VersionNotSupportedError;
71+
import org.a2aproject.sdk.util.ErrorDetail;
6972
import org.a2aproject.sdk.util.Utils;
7073
import org.jspecify.annotations.Nullable;
7174

@@ -284,11 +287,12 @@ public static StreamResponse parseResponseEvent(String body) throws JsonMappingE
284287
JsonElement jelement = JsonParser.parseString(body);
285288
JsonObject jsonRpc = jelement.getAsJsonObject();
286289
String version = getAndValidateJsonrpc(jsonRpc);
287-
Object id = getAndValidateId(jsonRpc);
288-
JsonElement paramsNode = jsonRpc.get("result");
290+
// Check for error before validating id: per JSON-RPC spec, error responses may have null id
289291
if (jsonRpc.has("error")) {
290292
throw processError(jsonRpc.getAsJsonObject("error"));
291293
}
294+
Object id = getAndValidateId(jsonRpc);
295+
JsonElement paramsNode = jsonRpc.get("result");
292296
StreamResponse.Builder builder = StreamResponse.newBuilder();
293297
parseRequestBody(paramsNode, builder, id);
294298
return builder.build();
@@ -392,8 +396,16 @@ private static A2AError processError(JsonObject error) {
392396
String message = error.has("message") ? error.get("message").getAsString() : null;
393397
Integer code = error.has("code") ? error.get("code").getAsInt() : null;
394398
Map<String, Object> details = null;
395-
if (error.has("data") && error.get("data").isJsonObject()) {
396-
details =GSON.fromJson(error.get("data"), Map.class);
399+
if (error.has("data")) {
400+
JsonElement data = error.get("data");
401+
if (data.isJsonObject()) {
402+
details = GSON.fromJson(data, Map.class);
403+
} else if (data.isJsonArray() && !data.getAsJsonArray().isEmpty()) {
404+
JsonElement first = data.getAsJsonArray().get(0);
405+
if (first.isJsonObject()) {
406+
details = GSON.fromJson(first.getAsJsonObject(), Map.class);
407+
}
408+
}
397409
}
398410
if (code != null) {
399411
A2AErrorCodes errorCode = A2AErrorCodes.fromCode(code);
@@ -605,10 +617,10 @@ public static String toJsonRPCErrorResponse(Object requestId, A2AError error) {
605617
output.beginObject();
606618
output.name("code").value(error.getCode());
607619
output.name("message").value(error.getMessage());
608-
if (!error.getDetails().isEmpty()) {
609-
output.name("data");
610-
GSON.toJson(error.getDetails(), Map.class, output);
611-
}
620+
A2AErrorCodes a2aErrorCode = A2AErrorCodes.fromCode(error.getCode());
621+
String reason = a2aErrorCode != null ? a2aErrorCode.name() : A2AErrorCodes.INTERNAL.name();
622+
output.name("data");
623+
GSON.toJson(List.of(ErrorDetail.of(reason, error.getDetails())), List.class, output);
612624
output.endObject();
613625
output.endObject();
614626
return result.toString();

0 commit comments

Comments
 (0)