diff --git a/src/services/responses_pipeline.rs b/src/services/responses_pipeline.rs index 24d2fe57..143e791d 100644 --- a/src/services/responses_pipeline.rs +++ b/src/services/responses_pipeline.rs @@ -852,6 +852,11 @@ pub async fn collect_streaming_response_to_json(response: Response) -> Res return Response::from_parts(parts, Body::from(bytes)); } }; + // Persist `shell_call_output` after its paired `shell_call`. The shell tool + // emits the output's `done` before the call's, so `output_items` holds them + // result-before-call; replaying that order is an invalid upstream transcript + // (see `order_shell_outputs_after_calls`). + let output_items = crate::services::shell_tool::order_shell_outputs_after_calls(output_items); response_obj["output"] = serde_json::Value::Array(output_items); if let Some(usage) = usage_sum && let Ok(usage_val) = serde_json::to_value(&usage) diff --git a/src/services/server_tools/runner.rs b/src/services/server_tools/runner.rs index dd170458..1322e168 100644 --- a/src/services/server_tools/runner.rs +++ b/src/services/server_tools/runner.rs @@ -662,14 +662,21 @@ fn event_type_of(event: &[u8]) -> Option { struct StreamRewriter { seq: u64, next_output_index: u64, - /// item id → assigned `output_index`, so added/done/delta events for - /// one item share a slot. + /// slot key (the item id, or `(type, id)` for the hosted-shell pair) → + /// assigned `output_index`, so an item's added/done/delta events share + /// one slot. See [`Self::index_for`] for why the shell pair is qualified. item_index: HashMap, /// Output items captured in forward order, for terminal-output /// reconstruction. output_items: Vec, - /// item id → position in `output_items`, to dedupe a re-emitted item. - output_pos: HashMap, + /// `(item type, item id)` → position in `output_items`, to dedupe a + /// re-emitted item. Keyed on type *and* id because a `shell_call` and + /// its `shell_call_output` deliberately share one id (the model's + /// `call_id` doubles as the item id); keying on id alone let the + /// second-emitted item overwrite the first, silently dropping the tool + /// result from the persisted `response.output` and corrupting + /// `previous_response_id` replay. + output_pos: HashMap<(String, String), usize>, /// Stable Hadrian `resp_…` id stamped onto every lifecycle event's /// `response.id`, replacing the provider's per-turn id (e.g. /// OpenRouter's `gen-…`). Matches the persisted/retrievable id so a @@ -983,7 +990,11 @@ impl StreamRewriter { .or_else(|| obj.get("item_id").and_then(|v| v.as_str())) .map(str::to_string); if let Some(id) = id { - let idx = self.index_for(&id); + let item_type = obj + .get("item") + .and_then(|i| i.get("type")) + .and_then(|v| v.as_str()); + let idx = self.index_for(item_type, &id); obj.insert("output_index".into(), serde_json::Value::from(idx)); } } @@ -1002,7 +1013,7 @@ impl StreamRewriter { { resp.insert( "output".into(), - serde_json::Value::Array(self.output_items.clone()), + serde_json::Value::Array(self.ordered_output_items()), ); } @@ -1058,26 +1069,61 @@ impl StreamRewriter { s } - fn index_for(&mut self, id: &str) -> u64 { - if let Some(i) = self.item_index.get(id) { + /// The `output_index` for an item, stable across its lifecycle events. + /// + /// Keyed on the item `id`, which is normally unique — and *must* stay the + /// key for delta events (`output_text.delta`, `function_call_arguments.delta`, + /// …) that reference their item by `item_id` alone (no `type`), so they + /// land on the slot their `output_item.added`/`done` already claimed. + /// + /// The sole exception is the hosted-shell pair: a `shell_call` and its + /// `shell_call_output` deliberately share one id (the model's `call_id` + /// doubles as the item id), so keying on id alone collapses them onto a + /// single `output_index` — a streaming client tracking items by slot then + /// renders only the last-arriving one. Those two types ride exclusively on + /// full `output_item.added`/`done` events (never deltas), so qualifying + /// just them by `(type, id)` splits the pair into two slots safely, and + /// (the call's added arrives first) in call-before-output order to match + /// the terminal reordering. Mirrors the `(type, id)` keying + /// [`Self::record_output_item`] uses for `output_pos`. + fn index_for(&mut self, item_type: Option<&str>, id: &str) -> u64 { + let key = match item_type { + Some(t @ ("shell_call" | "shell_call_output")) => format!("{t}\u{0}{id}"), + _ => id.to_string(), + }; + if let Some(i) = self.item_index.get(&key) { return *i; } let i = self.next_output_index; self.next_output_index += 1; - self.item_index.insert(id.to_string(), i); + self.item_index.insert(key, i); i } fn record_output_item(&mut self, item: serde_json::Value) { if let Some(id) = item.get("id").and_then(|v| v.as_str()).map(str::to_string) { - if let Some(&pos) = self.output_pos.get(&id) { + let kind = item + .get("type") + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_string(); + let key = (kind, id); + if let Some(&pos) = self.output_pos.get(&key) { self.output_items[pos] = item; return; } - self.output_pos.insert(id, self.output_items.len()); + self.output_pos.insert(key, self.output_items.len()); } self.output_items.push(item); } + + /// The captured output items for the persisted `response.output`, with + /// every `shell_call_output` reordered to follow its paired `shell_call`. + /// See [`crate::services::shell_tool::order_shell_outputs_after_calls`] for + /// why this matters for `previous_response_id` replay. + fn ordered_output_items(&self) -> Vec { + crate::services::shell_tool::order_shell_outputs_after_calls(self.output_items.clone()) + } } /// True iff `event` is the SSE `[DONE]` end-of-stream sentinel @@ -1290,6 +1336,50 @@ mod rewriter_tests { assert_eq!(data(&delta)["output_index"], 1); } + #[test] + fn shell_call_and_output_get_distinct_live_output_index() { + let mut r = StreamRewriter::new(None, Vec::new()); + // The shell_call and its shell_call_output share one id (the model's + // call_id), but the live stream must still give them two distinct + // output_index slots — keyed on (type, id) — with the call's the lower + // one so the live order matches the terminal reordering. A following + // message's delta must keep sharing its own (id-keyed) slot. + let added_call = r.rewrite(ev(serde_json::json!({ + "type":"response.output_item.added","output_index":0, + "item":{"type":"shell_call","id":"toolu_1","call_id":"toolu_1"} + }))); + let added_output = r.rewrite(ev(serde_json::json!({ + "type":"response.output_item.added","output_index":0, + "item":{"type":"shell_call_output","id":"toolu_1","call_id":"toolu_1"} + }))); + // Output `done` arrives before the call `done` (the shell tool's wire + // ordering); each must land back on the slot its `added` claimed. + let done_output = r.rewrite(ev(serde_json::json!({ + "type":"response.output_item.done","output_index":0, + "item":{"type":"shell_call_output","id":"toolu_1","call_id":"toolu_1"} + }))); + let done_call = r.rewrite(ev(serde_json::json!({ + "type":"response.output_item.done","output_index":0, + "item":{"type":"shell_call","id":"toolu_1","call_id":"toolu_1"} + }))); + let added_msg = r.rewrite(ev(serde_json::json!({ + "type":"response.output_item.added","output_index":0, + "item":{"type":"message","id":"msg_1"} + }))); + let msg_delta = r.rewrite(ev(serde_json::json!({ + "type":"response.output_text.delta","output_index":0,"item_id":"msg_1","delta":"x" + }))); + // Two distinct slots for the pair, call below its output. + assert_eq!(data(&added_call)["output_index"], 0); + assert_eq!(data(&added_output)["output_index"], 1); + assert_eq!(data(&done_output)["output_index"], 1); + assert_eq!(data(&done_call)["output_index"], 0); + // The message takes the next slot after the pair, and its delta — which + // carries only `item_id`, no `type` — still resolves to that same slot. + assert_eq!(data(&added_msg)["output_index"], 2); + assert_eq!(data(&msg_delta)["output_index"], 2); + } + #[test] fn reconstructs_terminal_output_from_done_items() { let mut r = StreamRewriter::new(None, Vec::new()); @@ -1321,6 +1411,77 @@ mod rewriter_tests { assert_eq!(items[2]["type"], "message"); } + #[test] + fn shell_call_and_output_both_persist_in_call_then_output_order() { + let mut r = StreamRewriter::new(None, Vec::new()); + // A signed reasoning block, then a shell call whose OUTPUT `done` is + // emitted before the CALL `done` (the shell tool's wire-ordering + // invariant), then the final message. The shell_call and its + // shell_call_output share one id (the model's call_id doubles as the + // item id) — the regression dropped one of the pair via that shared id. + r.rewrite(ev(serde_json::json!({ + "type":"response.output_item.done","output_index":0, + "item":{"type":"reasoning","id":"rs_1","summary":[], + "content":[{"type":"reasoning_text","text":"think"}], + "signature":"sig","format":"anthropic-claude-v1","status":"completed"} + }))); + // output BEFORE call, both id == call_id == "toolu_1". + r.rewrite(ev(serde_json::json!({ + "type":"response.output_item.done","output_index":1, + "item":{"type":"shell_call_output","id":"toolu_1","call_id":"toolu_1", + "status":"completed", + "output":[{"stdout":"hi\n","stderr":"", + "outcome":{"type":"exit","exit_code":0}}], + "output_files":[]} + }))); + r.rewrite(ev(serde_json::json!({ + "type":"response.output_item.done","output_index":1, + "item":{"type":"shell_call","id":"toolu_1","call_id":"toolu_1", + "status":"completed","action":{"commands":["echo hi"]}} + }))); + r.rewrite(ev(serde_json::json!({ + "type":"response.output_item.done","output_index":2, + "item":{"type":"message","id":"msg_1","role":"assistant"} + }))); + let terminal = r.rewrite(ev(serde_json::json!({ + "type":"response.completed", + "response":{"id":"resp_1","output":[{"type":"message","id":"msg_1"}]} + }))); + let out = data(&terminal); + let items = out["response"]["output"].as_array().unwrap(); + // Both the call and its output survive the shared id ... + assert_eq!(items.len(), 4, "got {items:#?}"); + assert_eq!(items[0]["type"], "reasoning"); + // ... and the call is persisted BEFORE its output for valid replay. + assert_eq!(items[1]["type"], "shell_call"); + assert_eq!(items[2]["type"], "shell_call_output"); + assert_eq!(items[1]["call_id"], "toolu_1"); + assert_eq!(items[2]["call_id"], "toolu_1"); + assert_eq!(items[3]["type"], "message"); + } + + #[test] + fn persisted_shell_call_output_deserializes_as_output_item() { + // `previous_response_id` replay (services/responses_chain.rs) parses + // the stored `output` array as `Vec`; a shape that + // fails would turn the (previously silently dropped) tool result into a + // hard `CorruptRecord` error. Lock the round-trip for the exact JSON + // `format_shell_call_output_item` emits. + use crate::api_types::responses::ResponsesOutputItem; + let item = serde_json::json!({ + "type":"shell_call_output","id":"toolu_1","call_id":"toolu_1", + "status":"completed", + "output":[{"stdout":"hi\n","stderr":"", + "outcome":{"type":"exit","exit_code":0},"created_by":"gateway"}], + "output_files":[], + "max_output_length":1000, + "created_by":"gateway" + }); + let parsed: ResponsesOutputItem = + serde_json::from_value(item).expect("shell_call_output must parse as an output item"); + assert!(matches!(parsed, ResponsesOutputItem::ShellCallOutput(_))); + } + #[test] fn accumulates_usage_across_suppressed_turns() { let mut r = StreamRewriter::new(None, Vec::new()); diff --git a/src/services/shell_tool.rs b/src/services/shell_tool.rs index c99ac4bc..c1ee825c 100644 --- a/src/services/shell_tool.rs +++ b/src/services/shell_tool.rs @@ -796,6 +796,70 @@ fn rewrite_shell_history_to_function_calls(payload: &mut CreateResponsesPayload) } } +/// Reorder a rebuilt `response.output` array so every `shell_call_output` +/// immediately follows its paired `shell_call` (matched by `call_id`). +/// +/// The shell tool emits the output item's terminal `done` *before* the call's +/// (see `emit_success_done`'s wire-ordering invariant), so any consumer that +/// rebuilds `response.output` from the forwarded `output_item.done` events +/// captures the result *ahead* of its call. Persisting that order breaks +/// `previous_response_id` replay: `responses_chain` → +/// [`rewrite_shell_history_to_function_calls`] maps the array 1:1 to +/// `function_call` / `function_call_output`, and a tool result preceding its +/// call is an invalid upstream transcript — for Anthropic-family models the +/// per-step assistant messages collapse, which relocates the signed `thinking` +/// blocks and trips "`thinking` … blocks cannot be modified". Reordering only +/// the rebuilt array keeps the live wire order intact while persisting a +/// replayable transcript. Items already in call-then-output order (e.g. a +/// passthrough upstream) are left untouched, as are non-shell items. +pub(crate) fn order_shell_outputs_after_calls( + items: Vec, +) -> Vec { + fn type_of(item: &serde_json::Value) -> Option<&str> { + item.get("type").and_then(|v| v.as_str()) + } + fn call_id_of(item: &serde_json::Value) -> Option<&str> { + item.get("call_id").and_then(|v| v.as_str()) + } + + let mut result: Vec = Vec::with_capacity(items.len()); + // shell_call_output items whose paired shell_call hasn't been placed yet + // (the normal case: the wire emits the output first). + let mut pending: Vec = Vec::new(); + for item in items { + if type_of(&item) == Some("shell_call_output") { + // Slot it straight after its call if already placed; otherwise hold + // it until the call arrives. + let placed_at = call_id_of(&item).and_then(|cid| { + result.iter().rposition(|it| { + type_of(it) == Some("shell_call") && call_id_of(it) == Some(cid) + }) + }); + match placed_at { + Some(pos) => result.insert(pos + 1, item), + None => pending.push(item), + } + continue; + } + let is_call = type_of(&item) == Some("shell_call"); + let cid = call_id_of(&item).map(str::to_string); + result.push(item); + // A call whose output was emitted first: attach it now, right after. + if is_call + && let Some(cid) = cid + && let Some(p) = pending + .iter() + .position(|o| call_id_of(o) == Some(cid.as_str())) + { + let out = pending.remove(p); + result.push(out); + } + } + // Any outputs whose call never appeared — keep, don't drop. + result.append(&mut pending); + result +} + /// Reconstruct the model-emitted `function_call` from a stored `shell_call`. /// The arguments mirror the `shell` function-tool schema /// (`{ "action": { … } }`). `id` is left unset: the stored `shell_call` @@ -3130,6 +3194,54 @@ mod tests { ))); } + #[test] + fn order_shell_outputs_handles_output_before_call_and_interleaving() { + let reasoning = serde_json::json!({"type":"reasoning","id":"rs_1"}); + let call = |c: &str| serde_json::json!({"type":"shell_call","id":c,"call_id":c}); + let out = |c: &str| serde_json::json!({"type":"shell_call_output","id":c,"call_id":c}); + let msg = serde_json::json!({"type":"message","id":"msg_1"}); + + // Wire order from the shell tool: each output's `done` precedes its + // call's `done` (output-before-call). + let input = vec![ + reasoning.clone(), + out("a"), + call("a"), + out("b"), + call("b"), + msg.clone(), + ]; + let ordered = super::order_shell_outputs_after_calls(input); + let types: Vec<&str> = ordered + .iter() + .map(|i| i["type"].as_str().unwrap()) + .collect(); + assert_eq!( + types, + vec![ + "reasoning", + "shell_call", + "shell_call_output", + "shell_call", + "shell_call_output", + "message", + ] + ); + // Each output still sits next to the call it pairs with. + assert_eq!(ordered[1]["call_id"], "a"); + assert_eq!(ordered[2]["call_id"], "a"); + assert_eq!(ordered[3]["call_id"], "b"); + assert_eq!(ordered[4]["call_id"], "b"); + + // Already call-then-output (e.g. a passthrough upstream) is preserved. + let already_ordered = vec![call("a"), out("a"), msg.clone()]; + let types: Vec = super::order_shell_outputs_after_calls(already_ordered) + .iter() + .map(|i| i["type"].as_str().unwrap().to_string()) + .collect(); + assert_eq!(types, vec!["shell_call", "shell_call_output", "message"]); + } + #[test] fn shell_hint_describes_client_passthrough() { let hint = ShellToolHint {