1+ package io .a2a .tck .server ;
2+
3+ import jakarta .annotation .PreDestroy ;
4+ import jakarta .enterprise .context .ApplicationScoped ;
5+ import jakarta .enterprise .inject .Produces ;
6+
7+ import io .a2a .server .agentexecution .AgentExecutor ;
8+ import io .a2a .server .agentexecution .RequestContext ;
9+ import io .a2a .server .events .EventQueue ;
10+ import io .a2a .server .tasks .TaskUpdater ;
11+ import io .a2a .spec .JSONRPCError ;
12+ import io .a2a .spec .Task ;
13+ import io .a2a .spec .TaskNotCancelableError ;
14+ import io .a2a .spec .TaskState ;
15+ import io .a2a .spec .TaskStatus ;
16+ import io .a2a .spec .TaskStatusUpdateEvent ;
17+
18+ @ ApplicationScoped
19+ public class AgentExecutorProducer {
20+
21+ @ Produces
22+ public AgentExecutor agentExecutor () {
23+ return new FireAndForgetAgentExecutor ();
24+ }
25+
26+ private static class FireAndForgetAgentExecutor implements AgentExecutor {
27+ @ Override
28+ public void execute (RequestContext context , EventQueue eventQueue ) throws JSONRPCError {
29+ Task task = context .getTask ();
30+
31+ if (task == null ) {
32+ task = new Task .Builder ()
33+ .id (context .getTaskId ())
34+ .contextId (context .getContextId ())
35+ .status (new TaskStatus (TaskState .SUBMITTED ))
36+ .history (context .getMessage ())
37+ .build ();
38+ eventQueue .enqueueEvent (task );
39+ }
40+
41+ // Sleep to allow task state persistence before TCK resubscribe test
42+ if (context .getMessage ().getMessageId ().startsWith ("test-resubscribe-message-id" )) {
43+ int timeoutMs = Integer .parseInt (System .getenv ().getOrDefault ("RESUBSCRIBE_TIMEOUT_MS" , "3000" ));
44+ System .out .println ("====> task id starts with test-resubscribe-message-id, sleeping for " + timeoutMs + " ms" );
45+ try {
46+ Thread .sleep (timeoutMs );
47+ } catch (InterruptedException e ) {
48+ Thread .currentThread ().interrupt ();
49+ }
50+ }
51+ TaskUpdater updater = new TaskUpdater (context , eventQueue );
52+
53+ // Immediately set to WORKING state
54+ updater .startWork ();
55+ System .out .println ("====> task set to WORKING, starting background execution" );
56+
57+ // Method returns immediately - task continues in background
58+ System .out .println ("====> execute() method returning immediately, task running in background" );
59+ }
60+
61+ @ Override
62+ public void cancel (RequestContext context , EventQueue eventQueue ) throws JSONRPCError {
63+ System .out .println ("====> task cancel request received" );
64+ Task task = context .getTask ();
65+
66+ if (task .getStatus ().state () == TaskState .CANCELED ) {
67+ System .out .println ("====> task already canceled" );
68+ throw new TaskNotCancelableError ();
69+ }
70+
71+ if (task .getStatus ().state () == TaskState .COMPLETED ) {
72+ System .out .println ("====> task already completed" );
73+ throw new TaskNotCancelableError ();
74+ }
75+
76+ TaskUpdater updater = new TaskUpdater (context , eventQueue );
77+ updater .cancel ();
78+ eventQueue .enqueueEvent (new TaskStatusUpdateEvent .Builder ()
79+ .taskId (task .getId ())
80+ .contextId (task .getContextId ())
81+ .status (new TaskStatus (TaskState .CANCELED ))
82+ .isFinal (true )
83+ .build ());
84+
85+ System .out .println ("====> task canceled" );
86+ }
87+
88+ /**
89+ * Cleanup method for proper resource management
90+ */
91+ @ PreDestroy
92+ public void cleanup () {
93+ System .out .println ("====> shutting down task executor" );
94+ }
95+ }
96+ }
0 commit comments