Skip to content

Commit d3f88cb

Browse files
authored
fix: resolve event stream race conditions in EventConsumer and SSE tr… (#782)
…ansport Fixed multiple race conditions causing intermittent test failures in streaming and subscription scenarios. These fixes reduced the failure rate from ~33% to 0% across 1,500+ test iterations in CI. Removed executor.execute() wrapper in REST/JSONRPC routes that delayed subscription by 100-600ms, causing events to be lost when EventConsumer started emitting before subscriber was ready. Changed onCancelTask to use consumeAndBreakOnInterrupt() instead of consumeAll(). Removed unused ResultAggregator.consumeAll() method since cancel was its only caller. Moved EventConsumer polling loop to executor thread to prevent blocking caller, ensuring subscription happens immediately without delay. Fixed race in onSubscribeToTask where initial task snapshot was enqueued but EventConsumer polling hadn't started yet. Added insertingProcessor() utility to AsyncUtils that prepends initial items synchronously to reactive streams using mutiny-zero ZeroPublisher, ensuring subscriber receives initial task snapshot immediately on subscription. Updated transport layer unit tests (JSONRPCHandlerTest, GrpcHandlerTest) to expect initial task snapshot per A2A Protocol Specification 3.1.6. Fixed insertingProcessor() to respect reactive streams semantics by sending inserted items in the source's onSubscribe() callback after subscription is established, rather than immediately in the ZeroPublisher creation lambda. Fixed two test race conditions where tests checked for received events immediately after subscription was established (server-side metric), without waiting for consumer callbacks to actually process events: - testSubscribeToTaskWithInterruptedStateKeepsStreamOpen: Added initialTaskLatch to wait for initial TaskEvent reception - testNonBlockingWithMultipleMessages: Added streamConsumerReadyLatch to wait for streaming consumer to start receiving events Enhanced awaitingFinalEvent tracking with timeout guards (max 3s wait) to prevent infinite waiting if final event never arrives due to distribution delays in replicated scenarios. Increased sleep delay from 50ms to 150ms to account for CI environment latency and ensure buffered events flush before stream ends. Improved pollTimeoutsWhileAwaitingFinal reset logic to only reset when not awaiting final event. Calculated timeout constant from base timeout value for better maintainability. Tests fixed: - testNonBlockingWithMultipleMessages - testCancelTaskSuccess - testSubscribeToTaskWithInterruptedStateKeepsStreamOpen
1 parent c12888d commit d3f88cb

12 files changed

Lines changed: 422 additions & 257 deletions

File tree

reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -299,15 +299,15 @@ public void invokeJSONRPCHandler(@Body String body, RoutingContext rc) {
299299
.putHeader(CONTENT_TYPE, APPLICATION_JSON)
300300
.end(serializeResponse(error));
301301
} else if (streaming) {
302-
final Multi<? extends A2AResponse<?>> finalStreamingResponse = streamingResponse;
303-
executor.execute(() -> {
304-
// Convert Multi<A2AResponse> to Multi<String> with SSE formatting
305-
AtomicLong eventIdCounter = new AtomicLong(0);
306-
Multi<String> sseEvents = finalStreamingResponse
307-
.map(response -> SseFormatter.formatResponseAsSSE(response, eventIdCounter.getAndIncrement()));
308-
// Write SSE-formatted strings to HTTP response
309-
MultiSseSupport.writeSseStrings(sseEvents, rc, context);
310-
});
302+
// Convert Multi<A2AResponse> to Multi<String> with SSE formatting
303+
// CRITICAL: Subscribe synchronously to avoid race condition where EventConsumer
304+
// starts emitting events before MultiSseSupport subscribes. The executor.execute()
305+
// wrapper caused 100-600ms delays before subscription, causing events to be lost.
306+
AtomicLong eventIdCounter = new AtomicLong(0);
307+
Multi<String> sseEvents = streamingResponse
308+
.map(response -> SseFormatter.formatResponseAsSSE(response, eventIdCounter.getAndIncrement()));
309+
// Write SSE-formatted strings to HTTP response
310+
MultiSseSupport.writeSseStrings(sseEvents, rc, context);
311311

312312
} else {
313313
rc.response()
@@ -783,7 +783,17 @@ public void onNext(String sseEvent) {
783783
if (headers.get(CONTENT_TYPE) == null) {
784784
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
785785
}
786+
// Additional SSE headers to prevent buffering
787+
headers.set("Cache-Control", "no-cache");
788+
headers.set("X-Accel-Buffering", "no"); // Disable nginx buffering
786789
response.setChunked(true);
790+
791+
// CRITICAL: Disable write queue max size to prevent buffering
792+
// Vert.x buffers writes by default - we need immediate flushing for SSE
793+
response.setWriteQueueMaxSize(1);
794+
795+
// Send initial SSE comment to kickstart the stream
796+
response.write(": SSE stream started\n\n");
787797
}
788798

789799
// Write SSE-formatted string to response

reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -221,15 +221,15 @@ public void sendMessageStreaming(@Body String body, RoutingContext rc) {
221221
if (error != null) {
222222
sendResponse(rc, error);
223223
} else if (streamingResponse != null) {
224-
final HTTPRestStreamingResponse finalStreamingResponse = streamingResponse;
225-
executor.execute(() -> {
226-
// Convert Flow.Publisher<String> (JSON) to Multi<String> (SSE-formatted)
227-
AtomicLong eventIdCounter = new AtomicLong(0);
228-
Multi<String> sseEvents = Multi.createFrom().publisher(finalStreamingResponse.getPublisher())
229-
.map(json -> SseFormatter.formatJsonAsSSE(json, eventIdCounter.getAndIncrement()));
230-
// Write SSE-formatted strings to HTTP response
231-
MultiSseSupport.writeSseStrings(sseEvents, rc, context);
232-
});
224+
// Convert Flow.Publisher<String> (JSON) to Multi<String> (SSE-formatted)
225+
// CRITICAL: Subscribe synchronously to avoid race condition where EventConsumer
226+
// starts emitting events before MultiSseSupport subscribes. The executor.execute()
227+
// wrapper caused 100-600ms delays before subscription, causing events to be lost.
228+
AtomicLong eventIdCounter = new AtomicLong(0);
229+
Multi<String> sseEvents = Multi.createFrom().publisher(streamingResponse.getPublisher())
230+
.map(json -> SseFormatter.formatJsonAsSSE(json, eventIdCounter.getAndIncrement()));
231+
// Write SSE-formatted strings to HTTP response
232+
MultiSseSupport.writeSseStrings(sseEvents, rc, context);
233233
}
234234
}
235235
}
@@ -431,15 +431,15 @@ public void subscribeToTask(RoutingContext rc) {
431431
if (error != null) {
432432
sendResponse(rc, error);
433433
} else if (streamingResponse != null) {
434-
final HTTPRestStreamingResponse finalStreamingResponse = streamingResponse;
435-
executor.execute(() -> {
436-
// Convert Flow.Publisher<String> (JSON) to Multi<String> (SSE-formatted)
437-
AtomicLong eventIdCounter = new AtomicLong(0);
438-
Multi<String> sseEvents = Multi.createFrom().publisher(finalStreamingResponse.getPublisher())
439-
.map(json -> SseFormatter.formatJsonAsSSE(json, eventIdCounter.getAndIncrement()));
440-
// Write SSE-formatted strings to HTTP response
441-
MultiSseSupport.writeSseStrings(sseEvents, rc, context);
442-
});
434+
// Convert Flow.Publisher<String> (JSON) to Multi<String> (SSE-formatted)
435+
// CRITICAL: Subscribe synchronously to avoid race condition where EventConsumer
436+
// starts emitting events before MultiSseSupport subscribes. The executor.execute()
437+
// wrapper caused 100-600ms delays before subscription, causing events to be lost.
438+
AtomicLong eventIdCounter = new AtomicLong(0);
439+
Multi<String> sseEvents = Multi.createFrom().publisher(streamingResponse.getPublisher())
440+
.map(json -> SseFormatter.formatJsonAsSSE(json, eventIdCounter.getAndIncrement()));
441+
// Write SSE-formatted strings to HTTP response
442+
MultiSseSupport.writeSseStrings(sseEvents, rc, context);
443443
}
444444
}
445445
}

0 commit comments

Comments
 (0)