diff --git a/src/services/responses_pipeline.rs b/src/services/responses_pipeline.rs
index 24d2fe57..143e791d 100644
--- a/src/services/responses_pipeline.rs
+++ b/src/services/responses_pipeline.rs
@@ -852,6 +852,11 @@ pub async fn collect_streaming_response_to_json(response: Response
) -> Res
return Response::from_parts(parts, Body::from(bytes));
}
};
+ // Persist `shell_call_output` after its paired `shell_call`. The shell tool
+ // emits the output's `done` before the call's, so `output_items` holds them
+ // result-before-call; replaying that order is an invalid upstream transcript
+ // (see `order_shell_outputs_after_calls`).
+ let output_items = crate::services::shell_tool::order_shell_outputs_after_calls(output_items);
response_obj["output"] = serde_json::Value::Array(output_items);
if let Some(usage) = usage_sum
&& let Ok(usage_val) = serde_json::to_value(&usage)
diff --git a/src/services/server_tools/runner.rs b/src/services/server_tools/runner.rs
index dd170458..1322e168 100644
--- a/src/services/server_tools/runner.rs
+++ b/src/services/server_tools/runner.rs
@@ -662,14 +662,21 @@ fn event_type_of(event: &[u8]) -> Option {
struct StreamRewriter {
seq: u64,
next_output_index: u64,
- /// item id → assigned `output_index`, so added/done/delta events for
- /// one item share a slot.
+ /// slot key (the item id, or `(type, id)` for the hosted-shell pair) →
+ /// assigned `output_index`, so an item's added/done/delta events share
+ /// one slot. See [`Self::index_for`] for why the shell pair is qualified.
item_index: HashMap<String, u64>,
/// Output items captured in forward order, for terminal-output
/// reconstruction.
output_items: Vec<serde_json::Value>,
- /// item id → position in `output_items`, to dedupe a re-emitted item.
- output_pos: HashMap<String, usize>,
+ /// `(item type, item id)` → position in `output_items`, to dedupe a
+ /// re-emitted item. Keyed on type *and* id because a `shell_call` and
+ /// its `shell_call_output` deliberately share one id (the model's
+ /// `call_id` doubles as the item id); keying on id alone let the
+ /// second-emitted item overwrite the first, silently dropping the tool
+ /// result from the persisted `response.output` and corrupting
+ /// `previous_response_id` replay.
+ output_pos: HashMap<(String, String), usize>,
/// Stable Hadrian `resp_…` id stamped onto every lifecycle event's
/// `response.id`, replacing the provider's per-turn id (e.g.
/// OpenRouter's `gen-…`). Matches the persisted/retrievable id so a
@@ -983,7 +990,11 @@ impl StreamRewriter {
.or_else(|| obj.get("item_id").and_then(|v| v.as_str()))
.map(str::to_string);
if let Some(id) = id {
- let idx = self.index_for(&id);
+ let item_type = obj
+ .get("item")
+ .and_then(|i| i.get("type"))
+ .and_then(|v| v.as_str());
+ let idx = self.index_for(item_type, &id);
obj.insert("output_index".into(), serde_json::Value::from(idx));
}
}
@@ -1002,7 +1013,7 @@ impl StreamRewriter {
{
resp.insert(
"output".into(),
- serde_json::Value::Array(self.output_items.clone()),
+ serde_json::Value::Array(self.ordered_output_items()),
);
}
@@ -1058,26 +1069,61 @@ impl StreamRewriter {
s
}
- fn index_for(&mut self, id: &str) -> u64 {
- if let Some(i) = self.item_index.get(id) {
+ /// The `output_index` for an item, stable across its lifecycle events.
+ ///
+ /// Keyed on the item `id`, which is normally unique — and *must* stay the
+ /// key for delta events (`output_text.delta`, `function_call_arguments.delta`,
+ /// …) that reference their item by `item_id` alone (no `type`), so they
+ /// land on the slot their `output_item.added`/`done` already claimed.
+ ///
+ /// The sole exception is the hosted-shell pair: a `shell_call` and its
+ /// `shell_call_output` deliberately share one id (the model's `call_id`
+ /// doubles as the item id), so keying on id alone collapses them onto a
+ /// single `output_index` — a streaming client tracking items by slot then
+ /// renders only the last-arriving one. Those two types ride exclusively on
+ /// full `output_item.added`/`done` events (never deltas), so qualifying
+ /// just them by `(type, id)` splits the pair into two slots safely, and
+ /// (the call's added arrives first) in call-before-output order to match
+ /// the terminal reordering. Mirrors the `(type, id)` keying
+ /// [`Self::record_output_item`] uses for `output_pos`.
+ fn index_for(&mut self, item_type: Option<&str>, id: &str) -> u64 {
+ let key = match item_type {
+ Some(t @ ("shell_call" | "shell_call_output")) => format!("{t}\u{0}{id}"),
+ _ => id.to_string(),
+ };
+ if let Some(i) = self.item_index.get(&key) {
return *i;
}
let i = self.next_output_index;
self.next_output_index += 1;
- self.item_index.insert(id.to_string(), i);
+ self.item_index.insert(key, i);
i
}
fn record_output_item(&mut self, item: serde_json::Value) {
if let Some(id) = item.get("id").and_then(|v| v.as_str()).map(str::to_string) {
- if let Some(&pos) = self.output_pos.get(&id) {
+ let kind = item
+ .get("type")
+ .and_then(|v| v.as_str())
+ .unwrap_or_default()
+ .to_string();
+ let key = (kind, id);
+ if let Some(&pos) = self.output_pos.get(&key) {
self.output_items[pos] = item;
return;
}
- self.output_pos.insert(id, self.output_items.len());
+ self.output_pos.insert(key, self.output_items.len());
}
self.output_items.push(item);
}
+
+ /// The captured output items for the persisted `response.output`, with
+ /// every `shell_call_output` reordered to follow its paired `shell_call`.
+ /// See [`crate::services::shell_tool::order_shell_outputs_after_calls`] for
+ /// why this matters for `previous_response_id` replay.
+ fn ordered_output_items(&self) -> Vec<serde_json::Value> {
+ crate::services::shell_tool::order_shell_outputs_after_calls(self.output_items.clone())
+ }
}
/// True iff `event` is the SSE `[DONE]` end-of-stream sentinel
@@ -1290,6 +1336,50 @@ mod rewriter_tests {
assert_eq!(data(&delta)["output_index"], 1);
}
+ #[test]
+ fn shell_call_and_output_get_distinct_live_output_index() {
+ let mut r = StreamRewriter::new(None, Vec::new());
+ // The shell_call and its shell_call_output share one id (the model's
+ // call_id), but the live stream must still give them two distinct
+ // output_index slots — keyed on (type, id) — with the call's the lower
+ // one so the live order matches the terminal reordering. A following
+ // message's delta must keep sharing its own (id-keyed) slot.
+ let added_call = r.rewrite(ev(serde_json::json!({
+ "type":"response.output_item.added","output_index":0,
+ "item":{"type":"shell_call","id":"toolu_1","call_id":"toolu_1"}
+ })));
+ let added_output = r.rewrite(ev(serde_json::json!({
+ "type":"response.output_item.added","output_index":0,
+ "item":{"type":"shell_call_output","id":"toolu_1","call_id":"toolu_1"}
+ })));
+ // Output `done` arrives before the call `done` (the shell tool's wire
+ // ordering); each must land back on the slot its `added` claimed.
+ let done_output = r.rewrite(ev(serde_json::json!({
+ "type":"response.output_item.done","output_index":0,
+ "item":{"type":"shell_call_output","id":"toolu_1","call_id":"toolu_1"}
+ })));
+ let done_call = r.rewrite(ev(serde_json::json!({
+ "type":"response.output_item.done","output_index":0,
+ "item":{"type":"shell_call","id":"toolu_1","call_id":"toolu_1"}
+ })));
+ let added_msg = r.rewrite(ev(serde_json::json!({
+ "type":"response.output_item.added","output_index":0,
+ "item":{"type":"message","id":"msg_1"}
+ })));
+ let msg_delta = r.rewrite(ev(serde_json::json!({
+ "type":"response.output_text.delta","output_index":0,"item_id":"msg_1","delta":"x"
+ })));
+ // Two distinct slots for the pair, call below its output.
+ assert_eq!(data(&added_call)["output_index"], 0);
+ assert_eq!(data(&added_output)["output_index"], 1);
+ assert_eq!(data(&done_output)["output_index"], 1);
+ assert_eq!(data(&done_call)["output_index"], 0);
+ // The message takes the next slot after the pair, and its delta — which
+ // carries only `item_id`, no `type` — still resolves to that same slot.
+ assert_eq!(data(&added_msg)["output_index"], 2);
+ assert_eq!(data(&msg_delta)["output_index"], 2);
+ }
+
#[test]
fn reconstructs_terminal_output_from_done_items() {
let mut r = StreamRewriter::new(None, Vec::new());
@@ -1321,6 +1411,77 @@ mod rewriter_tests {
assert_eq!(items[2]["type"], "message");
}
+ #[test]
+ fn shell_call_and_output_both_persist_in_call_then_output_order() {
+ let mut r = StreamRewriter::new(None, Vec::new());
+ // A signed reasoning block, then a shell call whose OUTPUT `done` is
+ // emitted before the CALL `done` (the shell tool's wire-ordering
+ // invariant), then the final message. The shell_call and its
+ // shell_call_output share one id (the model's call_id doubles as the
+ // item id) — the regression dropped one of the pair via that shared id.
+ r.rewrite(ev(serde_json::json!({
+ "type":"response.output_item.done","output_index":0,
+ "item":{"type":"reasoning","id":"rs_1","summary":[],
+ "content":[{"type":"reasoning_text","text":"think"}],
+ "signature":"sig","format":"anthropic-claude-v1","status":"completed"}
+ })));
+ // output BEFORE call, both id == call_id == "toolu_1".
+ r.rewrite(ev(serde_json::json!({
+ "type":"response.output_item.done","output_index":1,
+ "item":{"type":"shell_call_output","id":"toolu_1","call_id":"toolu_1",
+ "status":"completed",
+ "output":[{"stdout":"hi\n","stderr":"",
+ "outcome":{"type":"exit","exit_code":0}}],
+ "output_files":[]}
+ })));
+ r.rewrite(ev(serde_json::json!({
+ "type":"response.output_item.done","output_index":1,
+ "item":{"type":"shell_call","id":"toolu_1","call_id":"toolu_1",
+ "status":"completed","action":{"commands":["echo hi"]}}
+ })));
+ r.rewrite(ev(serde_json::json!({
+ "type":"response.output_item.done","output_index":2,
+ "item":{"type":"message","id":"msg_1","role":"assistant"}
+ })));
+ let terminal = r.rewrite(ev(serde_json::json!({
+ "type":"response.completed",
+ "response":{"id":"resp_1","output":[{"type":"message","id":"msg_1"}]}
+ })));
+ let out = data(&terminal);
+ let items = out["response"]["output"].as_array().unwrap();
+ // Both the call and its output survive the shared id ...
+ assert_eq!(items.len(), 4, "got {items:#?}");
+ assert_eq!(items[0]["type"], "reasoning");
+ // ... and the call is persisted BEFORE its output for valid replay.
+ assert_eq!(items[1]["type"], "shell_call");
+ assert_eq!(items[2]["type"], "shell_call_output");
+ assert_eq!(items[1]["call_id"], "toolu_1");
+ assert_eq!(items[2]["call_id"], "toolu_1");
+ assert_eq!(items[3]["type"], "message");
+ }
+
+ #[test]
+ fn persisted_shell_call_output_deserializes_as_output_item() {
+ // `previous_response_id` replay (services/responses_chain.rs) parses
+ // the stored `output` array as `Vec<ResponsesOutputItem>`; a shape that
+ // fails would turn the (previously silently dropped) tool result into a
+ // hard `CorruptRecord` error. Lock the round-trip for the exact JSON
+ // `format_shell_call_output_item` emits.
+ use crate::api_types::responses::ResponsesOutputItem;
+ let item = serde_json::json!({
+ "type":"shell_call_output","id":"toolu_1","call_id":"toolu_1",
+ "status":"completed",
+ "output":[{"stdout":"hi\n","stderr":"",
+ "outcome":{"type":"exit","exit_code":0},"created_by":"gateway"}],
+ "output_files":[],
+ "max_output_length":1000,
+ "created_by":"gateway"
+ });
+ let parsed: ResponsesOutputItem =
+ serde_json::from_value(item).expect("shell_call_output must parse as an output item");
+ assert!(matches!(parsed, ResponsesOutputItem::ShellCallOutput(_)));
+ }
+
#[test]
fn accumulates_usage_across_suppressed_turns() {
let mut r = StreamRewriter::new(None, Vec::new());
diff --git a/src/services/shell_tool.rs b/src/services/shell_tool.rs
index c99ac4bc..c1ee825c 100644
--- a/src/services/shell_tool.rs
+++ b/src/services/shell_tool.rs
@@ -796,6 +796,70 @@ fn rewrite_shell_history_to_function_calls(payload: &mut CreateResponsesPayload)
}
}
+/// Reorder a rebuilt `response.output` array so every `shell_call_output`
+/// immediately follows its paired `shell_call` (matched by `call_id`).
+///
+/// The shell tool emits the output item's terminal `done` *before* the call's
+/// (see `emit_success_done`'s wire-ordering invariant), so any consumer that
+/// rebuilds `response.output` from the forwarded `output_item.done` events
+/// captures the result *ahead* of its call. Persisting that order breaks
+/// `previous_response_id` replay: `responses_chain` →
+/// [`rewrite_shell_history_to_function_calls`] maps the array 1:1 to
+/// `function_call` / `function_call_output`, and a tool result preceding its
+/// call is an invalid upstream transcript — for Anthropic-family models the
+/// per-step assistant messages collapse, which relocates the signed `thinking`
+/// blocks and trips "`thinking` … blocks cannot be modified". Reordering only
+/// the rebuilt array keeps the live wire order intact while persisting a
+/// replayable transcript. Items already in call-then-output order (e.g. a
+/// passthrough upstream) are left untouched, as are non-shell items.
+pub(crate) fn order_shell_outputs_after_calls(
+ items: Vec<serde_json::Value>,
+) -> Vec<serde_json::Value> {
+ fn type_of(item: &serde_json::Value) -> Option<&str> {
+ item.get("type").and_then(|v| v.as_str())
+ }
+ fn call_id_of(item: &serde_json::Value) -> Option<&str> {
+ item.get("call_id").and_then(|v| v.as_str())
+ }
+
+ let mut result: Vec<serde_json::Value> = Vec::with_capacity(items.len());
+ // shell_call_output items whose paired shell_call hasn't been placed yet
+ // (the normal case: the wire emits the output first).
+ let mut pending: Vec<serde_json::Value> = Vec::new();
+ for item in items {
+ if type_of(&item) == Some("shell_call_output") {
+ // Slot it straight after its call if already placed; otherwise hold
+ // it until the call arrives.
+ let placed_at = call_id_of(&item).and_then(|cid| {
+ result.iter().rposition(|it| {
+ type_of(it) == Some("shell_call") && call_id_of(it) == Some(cid)
+ })
+ });
+ match placed_at {
+ Some(pos) => result.insert(pos + 1, item),
+ None => pending.push(item),
+ }
+ continue;
+ }
+ let is_call = type_of(&item) == Some("shell_call");
+ let cid = call_id_of(&item).map(str::to_string);
+ result.push(item);
+ // A call whose output was emitted first: attach it now, right after.
+ if is_call
+ && let Some(cid) = cid
+ && let Some(p) = pending
+ .iter()
+ .position(|o| call_id_of(o) == Some(cid.as_str()))
+ {
+ let out = pending.remove(p);
+ result.push(out);
+ }
+ }
+ // Outputs whose call never appeared are kept (not dropped), appended last.
+ result.append(&mut pending);
+ result
+}
+
/// Reconstruct the model-emitted `function_call` from a stored `shell_call`.
/// The arguments mirror the `shell` function-tool schema
/// (`{ "action": { … } }`). `id` is left unset: the stored `shell_call`
@@ -3130,6 +3194,54 @@ mod tests {
)));
}
+ #[test]
+ fn order_shell_outputs_handles_output_before_call_and_interleaving() {
+ let reasoning = serde_json::json!({"type":"reasoning","id":"rs_1"});
+ let call = |c: &str| serde_json::json!({"type":"shell_call","id":c,"call_id":c});
+ let out = |c: &str| serde_json::json!({"type":"shell_call_output","id":c,"call_id":c});
+ let msg = serde_json::json!({"type":"message","id":"msg_1"});
+
+ // Wire order from the shell tool: each output's `done` precedes its
+ // call's `done` (output-before-call).
+ let input = vec![
+ reasoning.clone(),
+ out("a"),
+ call("a"),
+ out("b"),
+ call("b"),
+ msg.clone(),
+ ];
+ let ordered = super::order_shell_outputs_after_calls(input);
+ let types: Vec<&str> = ordered
+ .iter()
+ .map(|i| i["type"].as_str().unwrap())
+ .collect();
+ assert_eq!(
+ types,
+ vec![
+ "reasoning",
+ "shell_call",
+ "shell_call_output",
+ "shell_call",
+ "shell_call_output",
+ "message",
+ ]
+ );
+ // Each output still sits next to the call it pairs with.
+ assert_eq!(ordered[1]["call_id"], "a");
+ assert_eq!(ordered[2]["call_id"], "a");
+ assert_eq!(ordered[3]["call_id"], "b");
+ assert_eq!(ordered[4]["call_id"], "b");
+
+ // Already call-then-output (e.g. a passthrough upstream) is preserved.
+ let already_ordered = vec![call("a"), out("a"), msg.clone()];
+ let types: Vec<String> = super::order_shell_outputs_after_calls(already_ordered)
+ .iter()
+ .map(|i| i["type"].as_str().unwrap().to_string())
+ .collect();
+ assert_eq!(types, vec!["shell_call", "shell_call_output", "message"]);
+ }
+
#[test]
fn shell_hint_describes_client_passthrough() {
let hint = ShellToolHint {