Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/services/responses_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,11 @@ pub async fn collect_streaming_response_to_json(response: Response<Body>) -> Res
return Response::from_parts(parts, Body::from(bytes));
}
};
// Persist `shell_call_output` after its paired `shell_call`. The shell tool
// emits the output's `done` before the call's, so `output_items` holds them
// result-before-call; replaying that order is an invalid upstream transcript
// (see `order_shell_outputs_after_calls`).
let output_items = crate::services::shell_tool::order_shell_outputs_after_calls(output_items);
response_obj["output"] = serde_json::Value::Array(output_items);
if let Some(usage) = usage_sum
&& let Ok(usage_val) = serde_json::to_value(&usage)
Expand Down
183 changes: 172 additions & 11 deletions src/services/server_tools/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,14 +662,21 @@ fn event_type_of(event: &[u8]) -> Option<String> {
struct StreamRewriter {
seq: u64,
next_output_index: u64,
/// item id → assigned `output_index`, so added/done/delta events for
/// one item share a slot.
/// slot key (the item id, or `(type, id)` for the hosted-shell pair) →
/// assigned `output_index`, so an item's added/done/delta events share
/// one slot. See [`Self::index_for`] for why the shell pair is qualified.
item_index: HashMap<String, u64>,
/// Output items captured in forward order, for terminal-output
/// reconstruction.
output_items: Vec<serde_json::Value>,
/// item id → position in `output_items`, to dedupe a re-emitted item.
output_pos: HashMap<String, usize>,
/// `(item type, item id)` → position in `output_items`, to dedupe a
/// re-emitted item. Keyed on type *and* id because a `shell_call` and
/// its `shell_call_output` deliberately share one id (the model's
/// `call_id` doubles as the item id); keying on id alone let the
/// second-emitted item overwrite the first, silently dropping the tool
/// result from the persisted `response.output` and corrupting
/// `previous_response_id` replay.
output_pos: HashMap<(String, String), usize>,
/// Stable Hadrian `resp_…` id stamped onto every lifecycle event's
/// `response.id`, replacing the provider's per-turn id (e.g.
/// OpenRouter's `gen-…`). Matches the persisted/retrievable id so a
Expand Down Expand Up @@ -983,7 +990,11 @@ impl StreamRewriter {
.or_else(|| obj.get("item_id").and_then(|v| v.as_str()))
.map(str::to_string);
if let Some(id) = id {
let idx = self.index_for(&id);
let item_type = obj
.get("item")
.and_then(|i| i.get("type"))
.and_then(|v| v.as_str());
let idx = self.index_for(item_type, &id);
obj.insert("output_index".into(), serde_json::Value::from(idx));
}
}
Expand All @@ -1002,7 +1013,7 @@ impl StreamRewriter {
{
resp.insert(
"output".into(),
serde_json::Value::Array(self.output_items.clone()),
serde_json::Value::Array(self.ordered_output_items()),
);
}

Expand Down Expand Up @@ -1058,26 +1069,61 @@ impl StreamRewriter {
s
}

fn index_for(&mut self, id: &str) -> u64 {
if let Some(i) = self.item_index.get(id) {
/// The `output_index` for an item, stable across its lifecycle events.
///
/// Keyed on the item `id`, which is normally unique — and *must* stay the
/// key for delta events (`output_text.delta`, `function_call_arguments.delta`,
/// …) that reference their item by `item_id` alone (no `type`), so they
/// land on the slot their `output_item.added`/`done` already claimed.
///
/// The sole exception is the hosted-shell pair: a `shell_call` and its
/// `shell_call_output` deliberately share one id (the model's `call_id`
/// doubles as the item id), so keying on id alone collapses them onto a
/// single `output_index` — a streaming client tracking items by slot then
/// renders only the last-arriving one. Those two types ride exclusively on
/// full `output_item.added`/`done` events (never deltas), so qualifying
/// just them by `(type, id)` splits the pair into two slots safely, and
/// (the call's added arrives first) in call-before-output order to match
/// the terminal reordering. Mirrors the `(type, id)` keying
/// [`Self::record_output_item`] uses for `output_pos`.
fn index_for(&mut self, item_type: Option<&str>, id: &str) -> u64 {
let key = match item_type {
Some(t @ ("shell_call" | "shell_call_output")) => format!("{t}\u{0}{id}"),
_ => id.to_string(),
};
if let Some(i) = self.item_index.get(&key) {
return *i;
}
let i = self.next_output_index;
self.next_output_index += 1;
self.item_index.insert(id.to_string(), i);
self.item_index.insert(key, i);
i
}

fn record_output_item(&mut self, item: serde_json::Value) {
if let Some(id) = item.get("id").and_then(|v| v.as_str()).map(str::to_string) {
if let Some(&pos) = self.output_pos.get(&id) {
let kind = item
.get("type")
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
let key = (kind, id);
if let Some(&pos) = self.output_pos.get(&key) {
self.output_items[pos] = item;
return;
}
self.output_pos.insert(id, self.output_items.len());
self.output_pos.insert(key, self.output_items.len());
}
self.output_items.push(item);
}

/// The captured output items for the persisted `response.output`, with
/// every `shell_call_output` reordered to follow its paired `shell_call`.
/// See [`crate::services::shell_tool::order_shell_outputs_after_calls`] for
/// why this matters for `previous_response_id` replay.
fn ordered_output_items(&self) -> Vec<serde_json::Value> {
crate::services::shell_tool::order_shell_outputs_after_calls(self.output_items.clone())
}
}

/// True iff `event` is the SSE `[DONE]` end-of-stream sentinel
Expand Down Expand Up @@ -1290,6 +1336,50 @@ mod rewriter_tests {
assert_eq!(data(&delta)["output_index"], 1);
}

#[test]
fn shell_call_and_output_get_distinct_live_output_index() {
let mut r = StreamRewriter::new(None, Vec::new());
// The shell_call and its shell_call_output share one id (the model's
// call_id), but the live stream must still give them two distinct
// output_index slots — keyed on (type, id) — with the call's the lower
// one so the live order matches the terminal reordering. A following
// message's delta must keep sharing its own (id-keyed) slot.
let added_call = r.rewrite(ev(serde_json::json!({
"type":"response.output_item.added","output_index":0,
"item":{"type":"shell_call","id":"toolu_1","call_id":"toolu_1"}
})));
let added_output = r.rewrite(ev(serde_json::json!({
"type":"response.output_item.added","output_index":0,
"item":{"type":"shell_call_output","id":"toolu_1","call_id":"toolu_1"}
})));
// Output `done` arrives before the call `done` (the shell tool's wire
// ordering); each must land back on the slot its `added` claimed.
let done_output = r.rewrite(ev(serde_json::json!({
"type":"response.output_item.done","output_index":0,
"item":{"type":"shell_call_output","id":"toolu_1","call_id":"toolu_1"}
})));
let done_call = r.rewrite(ev(serde_json::json!({
"type":"response.output_item.done","output_index":0,
"item":{"type":"shell_call","id":"toolu_1","call_id":"toolu_1"}
})));
let added_msg = r.rewrite(ev(serde_json::json!({
"type":"response.output_item.added","output_index":0,
"item":{"type":"message","id":"msg_1"}
})));
let msg_delta = r.rewrite(ev(serde_json::json!({
"type":"response.output_text.delta","output_index":0,"item_id":"msg_1","delta":"x"
})));
// Two distinct slots for the pair, call below its output.
assert_eq!(data(&added_call)["output_index"], 0);
assert_eq!(data(&added_output)["output_index"], 1);
assert_eq!(data(&done_output)["output_index"], 1);
assert_eq!(data(&done_call)["output_index"], 0);
// The message takes the next slot after the pair, and its delta — which
// carries only `item_id`, no `type` — still resolves to that same slot.
assert_eq!(data(&added_msg)["output_index"], 2);
assert_eq!(data(&msg_delta)["output_index"], 2);
}

#[test]
fn reconstructs_terminal_output_from_done_items() {
let mut r = StreamRewriter::new(None, Vec::new());
Expand Down Expand Up @@ -1321,6 +1411,77 @@ mod rewriter_tests {
assert_eq!(items[2]["type"], "message");
}

#[test]
fn shell_call_and_output_both_persist_in_call_then_output_order() {
let mut r = StreamRewriter::new(None, Vec::new());
// A signed reasoning block, then a shell call whose OUTPUT `done` is
// emitted before the CALL `done` (the shell tool's wire-ordering
// invariant), then the final message. The shell_call and its
// shell_call_output share one id (the model's call_id doubles as the
// item id) — the regression dropped one of the pair via that shared id.
r.rewrite(ev(serde_json::json!({
"type":"response.output_item.done","output_index":0,
"item":{"type":"reasoning","id":"rs_1","summary":[],
"content":[{"type":"reasoning_text","text":"think"}],
"signature":"sig","format":"anthropic-claude-v1","status":"completed"}
})));
// output BEFORE call, both id == call_id == "toolu_1".
r.rewrite(ev(serde_json::json!({
"type":"response.output_item.done","output_index":1,
"item":{"type":"shell_call_output","id":"toolu_1","call_id":"toolu_1",
"status":"completed",
"output":[{"stdout":"hi\n","stderr":"",
"outcome":{"type":"exit","exit_code":0}}],
"output_files":[]}
})));
r.rewrite(ev(serde_json::json!({
"type":"response.output_item.done","output_index":1,
"item":{"type":"shell_call","id":"toolu_1","call_id":"toolu_1",
"status":"completed","action":{"commands":["echo hi"]}}
})));
r.rewrite(ev(serde_json::json!({
"type":"response.output_item.done","output_index":2,
"item":{"type":"message","id":"msg_1","role":"assistant"}
})));
let terminal = r.rewrite(ev(serde_json::json!({
"type":"response.completed",
"response":{"id":"resp_1","output":[{"type":"message","id":"msg_1"}]}
})));
let out = data(&terminal);
let items = out["response"]["output"].as_array().unwrap();
// Both the call and its output survive the shared id ...
assert_eq!(items.len(), 4, "got {items:#?}");
assert_eq!(items[0]["type"], "reasoning");
// ... and the call is persisted BEFORE its output for valid replay.
assert_eq!(items[1]["type"], "shell_call");
assert_eq!(items[2]["type"], "shell_call_output");
assert_eq!(items[1]["call_id"], "toolu_1");
assert_eq!(items[2]["call_id"], "toolu_1");
assert_eq!(items[3]["type"], "message");
}

#[test]
fn persisted_shell_call_output_deserializes_as_output_item() {
// `previous_response_id` replay (services/responses_chain.rs) parses
// the stored `output` array as `Vec<ResponsesOutputItem>`; a shape that
// fails would turn the (previously silently dropped) tool result into a
// hard `CorruptRecord` error. Lock the round-trip for the exact JSON
// `format_shell_call_output_item` emits.
use crate::api_types::responses::ResponsesOutputItem;
let item = serde_json::json!({
"type":"shell_call_output","id":"toolu_1","call_id":"toolu_1",
"status":"completed",
"output":[{"stdout":"hi\n","stderr":"",
"outcome":{"type":"exit","exit_code":0},"created_by":"gateway"}],
"output_files":[],
"max_output_length":1000,
"created_by":"gateway"
});
let parsed: ResponsesOutputItem =
serde_json::from_value(item).expect("shell_call_output must parse as an output item");
assert!(matches!(parsed, ResponsesOutputItem::ShellCallOutput(_)));
}

#[test]
fn accumulates_usage_across_suppressed_turns() {
let mut r = StreamRewriter::new(None, Vec::new());
Expand Down
112 changes: 112 additions & 0 deletions src/services/shell_tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,70 @@ fn rewrite_shell_history_to_function_calls(payload: &mut CreateResponsesPayload)
}
}

/// Reorder a rebuilt `response.output` array so every `shell_call_output`
/// immediately follows its paired `shell_call` (matched by `call_id`).
///
/// The shell tool emits the output item's terminal `done` *before* the call's
/// (see `emit_success_done`'s wire-ordering invariant), so any consumer that
/// rebuilds `response.output` from the forwarded `output_item.done` events
/// captures the result *ahead* of its call. Persisting that order breaks
/// `previous_response_id` replay: `responses_chain` →
/// [`rewrite_shell_history_to_function_calls`] maps the array 1:1 to
/// `function_call` / `function_call_output`, and a tool result preceding its
/// call is an invalid upstream transcript — for Anthropic-family models the
/// per-step assistant messages collapse, which relocates the signed `thinking`
/// blocks and trips "`thinking` … blocks cannot be modified". Reordering only
/// the rebuilt array keeps the live wire order intact while persisting a
/// replayable transcript. Items already in call-then-output order (e.g. a
/// passthrough upstream) are left untouched, as are non-shell items.
pub(crate) fn order_shell_outputs_after_calls(
items: Vec<serde_json::Value>,
) -> Vec<serde_json::Value> {
fn type_of(item: &serde_json::Value) -> Option<&str> {
item.get("type").and_then(|v| v.as_str())
}
fn call_id_of(item: &serde_json::Value) -> Option<&str> {
item.get("call_id").and_then(|v| v.as_str())
}

let mut result: Vec<serde_json::Value> = Vec::with_capacity(items.len());
// shell_call_output items whose paired shell_call hasn't been placed yet
// (the normal case: the wire emits the output first).
let mut pending: Vec<serde_json::Value> = Vec::new();
for item in items {
if type_of(&item) == Some("shell_call_output") {
// Slot it straight after its call if already placed; otherwise hold
// it until the call arrives.
let placed_at = call_id_of(&item).and_then(|cid| {
result.iter().rposition(|it| {
type_of(it) == Some("shell_call") && call_id_of(it) == Some(cid)
})
});
match placed_at {
Some(pos) => result.insert(pos + 1, item),
None => pending.push(item),
}
continue;
}
let is_call = type_of(&item) == Some("shell_call");
let cid = call_id_of(&item).map(str::to_string);
result.push(item);
// A call whose output was emitted first: attach it now, right after.
if is_call
&& let Some(cid) = cid
&& let Some(p) = pending
.iter()
.position(|o| call_id_of(o) == Some(cid.as_str()))
{
let out = pending.remove(p);
result.push(out);
}
}
// Any outputs whose call never appeared — keep, don't drop.
result.append(&mut pending);
result
}

/// Reconstruct the model-emitted `function_call` from a stored `shell_call`.
/// The arguments mirror the `shell` function-tool schema
/// (`{ "action": { … } }`). `id` is left unset: the stored `shell_call`
Expand Down Expand Up @@ -3130,6 +3194,54 @@ mod tests {
)));
}

#[test]
fn order_shell_outputs_handles_output_before_call_and_interleaving() {
let reasoning = serde_json::json!({"type":"reasoning","id":"rs_1"});
let call = |c: &str| serde_json::json!({"type":"shell_call","id":c,"call_id":c});
let out = |c: &str| serde_json::json!({"type":"shell_call_output","id":c,"call_id":c});
let msg = serde_json::json!({"type":"message","id":"msg_1"});

// Wire order from the shell tool: each output's `done` precedes its
// call's `done` (output-before-call).
let input = vec![
reasoning.clone(),
out("a"),
call("a"),
out("b"),
call("b"),
msg.clone(),
];
let ordered = super::order_shell_outputs_after_calls(input);
let types: Vec<&str> = ordered
.iter()
.map(|i| i["type"].as_str().unwrap())
.collect();
assert_eq!(
types,
vec![
"reasoning",
"shell_call",
"shell_call_output",
"shell_call",
"shell_call_output",
"message",
]
);
// Each output still sits next to the call it pairs with.
assert_eq!(ordered[1]["call_id"], "a");
assert_eq!(ordered[2]["call_id"], "a");
assert_eq!(ordered[3]["call_id"], "b");
assert_eq!(ordered[4]["call_id"], "b");

// Already call-then-output (e.g. a passthrough upstream) is preserved.
let already_ordered = vec![call("a"), out("a"), msg.clone()];
let types: Vec<String> = super::order_shell_outputs_after_calls(already_ordered)
.iter()
.map(|i| i["type"].as_str().unwrap().to_string())
.collect();
assert_eq!(types, vec!["shell_call", "shell_call_output", "message"]);
}

#[test]
fn shell_hint_describes_client_passthrough() {
let hint = ShellToolHint {
Expand Down
Loading