From 767a6ed6103360a36f828d8b6a9a82641a3693a7 Mon Sep 17 00:00:00 2001 From: ScriptSmith Date: Sun, 31 May 2026 21:51:49 +1000 Subject: [PATCH 1/2] Re-order shell call events --- src/services/shell_tool.rs | 114 +++++++++++++++++++++++++++---------- 1 file changed, 84 insertions(+), 30 deletions(-) diff --git a/src/services/shell_tool.rs b/src/services/shell_tool.rs index 7a216bd..bd0abfa 100644 --- a/src/services/shell_tool.rs +++ b/src/services/shell_tool.rs @@ -1116,6 +1116,10 @@ fn format_shell_call_output_item( /// before producing real output (boot failure, passthrough misconfig, /// exec error). Both items carry `status: "incomplete"`; the output /// item's stderr surfaces the error message so the model can react. +/// +/// The output item's `done` is sent before the call's `done` for the +/// same reason as the success path: the call resolving must reliably +/// mean its paired output is already on the wire. #[allow(clippy::too_many_arguments)] async fn emit_failure_done( event_tx: &mpsc::Sender, @@ -1128,34 +1132,34 @@ async fn emit_failure_done( stderr: &str, ) { let _ = event_tx - .send(format_shell_call_item( + .send(format_shell_call_output_item( ItemLifecycle::Done, id, id, 0, - commands, - timeout_ms, + -1, + "", + stderr, + &[], + true, max_output_length, - env, - working_directory, - "incomplete", - None, - Some("model"), + Some("gateway"), )) .await; let _ = event_tx - .send(format_shell_call_output_item( + .send(format_shell_call_item( ItemLifecycle::Done, id, id, 0, - -1, - "", - stderr, - &[], - true, + commands, + timeout_ms, max_output_length, - Some("gateway"), + env, + working_directory, + "incomplete", + None, + Some("model"), )) .await; } @@ -2259,35 +2263,44 @@ impl ServerExecutedTool for ShellExecutor { "type": "container_reference", "container_id": container_id_for_items, }); + // Ordering invariant: the `shell_call_output` item's terminal + // `done` is sent BEFORE the `shell_call`'s. Hadrian overloads + // `shell_call.status` with the execution outcome (`completed` + // vs `incomplete`), so consumers reasonably read the call's + // `done(completed)` as "execution finished — output available". + // Emitting the output first makes that a reliable signal: by + // the time the call resolves, its paired output (same + // `call_id`) is already on the wire. (Events share one ordered + // channel, so this is a strict wire ordering, not a hint.) let _ = event_tx - .send(format_shell_call_item( + .send(format_shell_call_output_item( ItemLifecycle::Done, &id_for_task, &id_for_task, 0, - &commands_for_task, - action_timeout_ms_for_task, + exit_for_report, + &stdout_render, + &stderr_render, + &new_files, + killed, model_max_output_length, - action_env_for_task.as_ref(), - working_directory_for_task.as_deref(), - shell_call_status, - Some(&environment_val), - Some("model"), + Some("gateway"), )) .await; let _ = event_tx - .send(format_shell_call_output_item( + .send(format_shell_call_item( ItemLifecycle::Done, &id_for_task, &id_for_task, 0, - exit_for_report, - &stdout_render, - &stderr_render, - &new_files, - killed, + &commands_for_task, + action_timeout_ms_for_task, model_max_output_length, - Some("gateway"), + action_env_for_task.as_ref(), + working_directory_for_task.as_deref(), + shell_call_status, + Some(&environment_val), + Some("model"), )) .await; info!( @@ -3331,4 +3344,45 @@ mod tests { assert_eq!(files[0]["file_id"], "cfile_abc"); assert_eq!(files[0]["container_id"], "cntr_test"); } + + /// Parse an SSE `data:` frame into the carried `output_item.` + /// event JSON. + fn parse_sse_event(bytes: &Bytes) -> Value { + let s = std::str::from_utf8(bytes).unwrap(); + let json_str = s.trim().strip_prefix("data: ").unwrap(); + serde_json::from_str(json_str).unwrap() + } + + /// The terminal `done` events must arrive output-then-call so that a + /// consumer treating `shell_call → done` as "fully resolved" never + /// races the paired `shell_call_output`. Covers the failure path + /// directly; the success path emits the same pair in the same order. + #[tokio::test] + async fn failure_done_emits_output_before_call() { + let (tx, mut rx) = mpsc::channel::(8); + emit_failure_done( + &tx, + "call_1", + &["echo hi".to_string()], + None, + None, + None, + None, + "boot failed", + ) + .await; + drop(tx); + + let first = parse_sse_event(&rx.recv().await.unwrap()); + let second = parse_sse_event(&rx.recv().await.unwrap()); + assert!(rx.recv().await.is_none(), "exactly two events expected"); + + assert_eq!(first["type"], "response.output_item.done"); + assert_eq!(first["item"]["type"], "shell_call_output"); + assert_eq!(first["item"]["call_id"], "call_1"); + + assert_eq!(second["type"], "response.output_item.done"); + assert_eq!(second["item"]["type"], "shell_call"); + assert_eq!(second["item"]["call_id"], "call_1"); + } } From 03e4668cd23b855ee14e64a1545a3ba13edb0e7d Mon Sep 17 00:00:00 2001 From: ScriptSmith Date: Sun, 31 May 2026 22:04:22 +1000 Subject: [PATCH 2/2] Review fixes --- src/services/shell_tool.rs | 167 +++++++++++++++++++++++++++---------- 1 file changed, 123 insertions(+), 44 deletions(-) diff --git a/src/services/shell_tool.rs b/src/services/shell_tool.rs index bd0abfa..1c656d2 100644 --- a/src/services/shell_tool.rs +++ b/src/services/shell_tool.rs @@ -1164,6 +1164,71 @@ async fn emit_failure_done( .await; } +/// Emit the spec-canonical `output_item.done` events for a shell call +/// that produced real output (normal exit, non-zero exit, or timeout). +/// +/// Ordering invariant: the `shell_call_output` item's terminal `done` is +/// sent BEFORE the `shell_call`'s. Hadrian overloads `shell_call.status` +/// with the execution outcome (`completed` vs `incomplete`), so consumers +/// reasonably read the call's `done(completed)` as "execution finished — +/// output available". Emitting the output first makes that a reliable +/// signal: by the time the call resolves, its paired output (same +/// `call_id`) is already on the wire. (Events share one ordered channel, +/// so this is a strict wire ordering, not a hint.) +/// +/// `killed` drives both terminal statuses (`incomplete` on a timeout / +/// no-exit, `completed` otherwise) so the call and its output can never +/// disagree. +#[allow(clippy::too_many_arguments)] +async fn emit_success_done( + event_tx: &mpsc::Sender, + id: &str, + commands: &[String], + timeout_ms: Option, + max_output_length: Option, + env: Option<&HashMap>, + working_directory: Option<&str>, + exit_code: i32, + stdout: &str, + stderr: &str, + files: &[ContainerFileRef], + killed: bool, + environment: Option<&serde_json::Value>, +) { + let _ = event_tx + .send(format_shell_call_output_item( + ItemLifecycle::Done, + id, + id, + 0, + exit_code, + stdout, + stderr, + files, + killed, + max_output_length, + Some("gateway"), + )) + .await; + let shell_call_status = if killed { "incomplete" } else { "completed" }; + let _ = event_tx + .send(format_shell_call_item( + ItemLifecycle::Done, + id, + id, + 0, + commands, + timeout_ms, + max_output_length, + env, + working_directory, + shell_call_status, + environment, + Some("model"), + )) + .await; +} + // ───────────────────────────────────────────────────────────────────────────── // Per-call env / working_directory threading // ───────────────────────────────────────────────────────────────────────────── @@ -2255,54 +2320,28 @@ impl ServerExecutedTool for ShellExecutor { std::mem::replace(&mut stdout_buf, BoundedHeadTail::new(max_chars)).into_trimmed(); let stderr_render = std::mem::replace(&mut stderr_buf, BoundedHeadTail::new(max_chars)).into_trimmed(); - // Spec status: `incomplete` when we killed/timed out, - // `completed` otherwise (a non-zero exit code still counts - // as `completed` per spec). - let shell_call_status = if killed { "incomplete" } else { "completed" }; let environment_val = serde_json::json!({ "type": "container_reference", "container_id": container_id_for_items, }); - // Ordering invariant: the `shell_call_output` item's terminal - // `done` is sent BEFORE the `shell_call`'s. Hadrian overloads - // `shell_call.status` with the execution outcome (`completed` - // vs `incomplete`), so consumers reasonably read the call's - // `done(completed)` as "execution finished — output available". - // Emitting the output first makes that a reliable signal: by - // the time the call resolves, its paired output (same - // `call_id`) is already on the wire. (Events share one ordered - // channel, so this is a strict wire ordering, not a hint.) - let _ = event_tx - .send(format_shell_call_output_item( - ItemLifecycle::Done, - &id_for_task, - &id_for_task, - 0, - exit_for_report, - &stdout_render, - &stderr_render, - &new_files, - killed, - model_max_output_length, - Some("gateway"), - )) - .await; - let _ = event_tx - .send(format_shell_call_item( - ItemLifecycle::Done, - &id_for_task, - &id_for_task, - 0, - &commands_for_task, - action_timeout_ms_for_task, - model_max_output_length, - action_env_for_task.as_ref(), - working_directory_for_task.as_deref(), - shell_call_status, - Some(&environment_val), - Some("model"), - )) - .await; + // Terminal `done` pair, output-before-call — see + // `emit_success_done` for the ordering invariant. + emit_success_done( + &event_tx, + &id_for_task, + &commands_for_task, + action_timeout_ms_for_task, + model_max_output_length, + action_env_for_task.as_ref(), + working_directory_for_task.as_deref(), + exit_for_report, + &stdout_render, + &stderr_render, + &new_files, + killed, + Some(&environment_val), + ) + .await; info!( stage = "shell_completed", call_id = %id_for_task, @@ -3385,4 +3424,44 @@ mod tests { assert_eq!(second["item"]["type"], "shell_call"); assert_eq!(second["item"]["call_id"], "call_1"); } + + /// The production success path (every normal shell execution) must + /// emit the output item's `done` before the call's `done`, with both + /// carrying `completed` for a clean exit. Guards against a future + /// re-inversion of the `send` order inside `ShellExecutor`. + #[tokio::test] + async fn success_done_emits_output_before_call() { + let (tx, mut rx) = mpsc::channel::(8); + emit_success_done( + &tx, + "call_1", + &["echo hi".to_string()], + None, + None, + None, + None, + 0, + "hi\n", + "", + &[], + false, + None, + ) + .await; + drop(tx); + + let first = parse_sse_event(&rx.recv().await.unwrap()); + let second = parse_sse_event(&rx.recv().await.unwrap()); + assert!(rx.recv().await.is_none(), "exactly two events expected"); + + assert_eq!(first["type"], "response.output_item.done"); + assert_eq!(first["item"]["type"], "shell_call_output"); + assert_eq!(first["item"]["status"], "completed"); + assert_eq!(first["item"]["output"][0]["stdout"], "hi\n"); + + assert_eq!(second["type"], "response.output_item.done"); + assert_eq!(second["item"]["type"], "shell_call"); + assert_eq!(second["item"]["status"], "completed"); + assert_eq!(second["item"]["call_id"], "call_1"); + } }