Commit 355d845

fix(inference): prevent silent truncation of large streaming responses (NVIDIA#834)
* fix(inference): prevent silent truncation of large streaming responses

  The L7 inference proxy silently dropped tool_calls from large streaming
  responses due to an aggressive 30s per-chunk idle timeout and a reqwest
  total-request timeout that capped the entire body stream. Reasoning models
  that pause during "thinking" phases triggered these timeouts, producing
  valid-looking but truncated HTTP responses with no client-visible error.

  - Extract prepare_backend_request() helper and create a streaming variant
    that omits the total request timeout; body stream liveness is now
    enforced solely by the per-chunk idle timeout
  - Add 30s connect_timeout to the reqwest Client builder
  - Increase CHUNK_IDLE_TIMEOUT from 30s to 120s for reasoning models
  - Inject SSE error events (proxy_stream_error) before the HTTP chunked
    terminator on all truncation paths so clients can detect data loss
  - Wrap the streaming relay in BufWriter to reduce per-chunk TLS flush
    overhead
  - Bump OCSF severity for streaming truncation from Low to Medium

  Closes NVIDIA#829

* fix(inference): drop BufWriter to preserve SSE incremental delivery

  The BufWriter introduced in the previous commit buffered SSE frames until
  the 16KB capacity filled or the stream ended, defeating incremental token
  delivery and potentially reintroducing client-visible timeouts on healthy
  streams. Revert to per-chunk write_all+flush but keep the single
  format_chunk() call that coalesces framing into one write.

  Also fix the streaming integration test: add a 3s mock delay that exceeds
  the 1s route timeout so the test actually validates that the streaming
  path omits the total request timeout. Previously the mock responded
  immediately, passing regardless of timeout behavior.
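In sketch form, the fixed relay loop bounds each `next_chunk()` await individually and reports any truncation in-band before the chunked terminator. This is an illustrative reconstruction, not the repository's code: `CHUNK_IDLE_TIMEOUT`, `MAX_STREAMING_BODY`, and the error strings are named in this commit, while `relay_stream`, `next_chunk`, and the two `format_*` helper bodies are stand-ins for the actual implementations in `proxy.rs` and `l7/inference.rs`.

```rust
use std::io;
use std::time::Duration;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::time::timeout;

const CHUNK_IDLE_TIMEOUT: Duration = Duration::from_secs(120); // raised from 30s
const MAX_STREAMING_BODY: usize = 32 * 1024 * 1024; // 32 MiB cap

/// Stand-in for the sandbox's SSE error helper: one event in the
/// documented `proxy_stream_error` shape.
fn format_sse_error(reason: &str) -> Vec<u8> {
    format!("data: {{\"error\":{{\"message\":\"{reason}\",\"type\":\"proxy_stream_error\"}}}}\n\n")
        .into_bytes()
}

/// Stand-in for the sandbox's chunked-encoding helper: hex length, CRLF,
/// payload, CRLF (RFC 9112 chunked transfer coding).
fn format_chunk(data: &[u8]) -> Vec<u8> {
    let mut frame = format!("{:x}\r\n", data.len()).into_bytes();
    frame.extend_from_slice(data);
    frame.extend_from_slice(b"\r\n");
    frame
}

/// Relay loop sketch: liveness is enforced per chunk, never as a total
/// deadline, and every truncation path emits an in-band SSE error first.
async fn relay_stream<W, F, Fut>(mut next_chunk: F, out: &mut W) -> io::Result<()>
where
    W: AsyncWrite + Unpin,
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = io::Result<Option<Vec<u8>>>>,
{
    let mut relayed = 0usize;
    let truncation = loop {
        match timeout(CHUNK_IDLE_TIMEOUT, next_chunk()).await {
            Ok(Ok(Some(chunk))) => {
                relayed += chunk.len();
                if relayed > MAX_STREAMING_BODY {
                    break Some("response truncated: exceeded maximum streaming body size");
                }
                // One format_chunk() call coalesces the framing into a single
                // write; flushing per chunk preserves incremental delivery.
                out.write_all(&format_chunk(&chunk)).await?;
                out.flush().await?;
            }
            Ok(Ok(None)) => break None, // upstream finished cleanly
            Ok(Err(_)) => break Some("response truncated: upstream read error"),
            Err(_) => break Some("response truncated: chunk idle timeout exceeded"),
        }
    };
    if let Some(reason) = truncation {
        // Make the data loss visible to the client instead of silent.
        out.write_all(&format_chunk(&format_sse_error(reason))).await?;
    }
    out.write_all(b"0\r\n\r\n").await?; // HTTP chunked terminator
    out.flush().await
}
```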
1 parent 60035c6 commit 355d845

6 files changed

Lines changed: 308 additions & 32 deletions


architecture/inference-routing.md

Lines changed: 38 additions & 8 deletions
@@ -28,10 +28,13 @@ sequenceDiagram
 Backend->>Router: Response headers + body stream
 Router->>Proxy: StreamingProxyResponse (headers first)
 Proxy->>Agent: HTTP/1.1 headers (chunked TE)
-loop Each body chunk
+loop Each body chunk (120s idle timeout per chunk)
 Router->>Proxy: chunk via next_chunk()
 Proxy->>Agent: Chunked-encoded frame
 end
+alt Stream truncated (idle timeout, byte limit, upstream error)
+Proxy->>Agent: SSE error event (proxy_stream_error)
+end
 Proxy->>Agent: Chunk terminator (0\r\n\r\n)
 ```

@@ -102,7 +105,7 @@ Key messages:
 Files:

 - `crates/openshell-sandbox/src/proxy.rs` -- proxy interception, inference context, request routing
-- `crates/openshell-sandbox/src/l7/inference.rs` -- pattern detection, HTTP parsing, response formatting
+- `crates/openshell-sandbox/src/l7/inference.rs` -- pattern detection, HTTP parsing, response formatting, SSE error generation (`format_sse_error()`)
 - `crates/openshell-sandbox/src/lib.rs` -- inference context initialization, route refresh
 - `crates/openshell-sandbox/src/grpc_client.rs` -- `fetch_inference_bundle()`

@@ -156,7 +159,7 @@ If no pattern matches, the proxy returns `403 Forbidden` with `{"error": "connec
 Files:

 - `crates/openshell-router/src/lib.rs` -- `Router`, `proxy_with_candidates()`, `proxy_with_candidates_streaming()`
-- `crates/openshell-router/src/backend.rs` -- `proxy_to_backend()`, `proxy_to_backend_streaming()`, URL construction
+- `crates/openshell-router/src/backend.rs` -- `prepare_backend_request()`, `send_backend_request()`, `send_backend_request_streaming()`, `proxy_to_backend()`, `proxy_to_backend_streaming()`, URL construction
 - `crates/openshell-router/src/config.rs` -- `RouteConfig`, `ResolvedRoute`, YAML loading

 ### Route selection
@@ -165,7 +168,7 @@ Files:

 ### Request rewriting

-`proxy_to_backend()` rewrites outgoing requests:
+`prepare_backend_request()` (shared by both buffered and streaming paths) rewrites outgoing requests:

 1. **Auth injection**: Uses the route's `AuthHeader` -- either `Authorization: Bearer <key>` or a custom header (e.g. `x-api-key: <key>` for Anthropic).
 2. **Header stripping**: Removes `authorization`, `x-api-key`, `host`, and any header names that will be set from route defaults.
@@ -198,20 +201,47 @@ The sandbox proxy (`route_inference_request()` in `proxy.rs`) uses the streaming

 1. Calls `proxy_with_candidates_streaming()` to get headers immediately.
 2. Formats and sends the HTTP/1.1 response header with `Transfer-Encoding: chunked` via `format_http_response_header()`.
-3. Loops on `body.next_chunk()`, wrapping each fragment in HTTP chunked encoding via `format_chunk()`.
-4. Sends the chunk terminator (`0\r\n\r\n`) via `format_chunk_terminator()`.
+3. Loops on `body.next_chunk()` with a per-chunk idle timeout (`CHUNK_IDLE_TIMEOUT`, 120 seconds), wrapping each fragment in HTTP chunked encoding via `format_chunk()`. The 120-second timeout accommodates reasoning models (e.g. nemotron-3-super, o1, o3) that pause 60+ seconds between thinking and output phases.
+4. Writes each frame with a per-chunk `write_all` + `flush`; a single `format_chunk()` call coalesces the framing into one write, preserving incremental token delivery while limiting per-chunk TLS overhead.
+5. Enforces a total streaming body cap (`MAX_STREAMING_BODY`, 32 MiB).
+6. On truncation (idle timeout, byte limit, or upstream read error), injects an SSE error event before the chunk terminator so clients can detect the truncation rather than silently losing data.
+7. Sends the chunk terminator (`0\r\n\r\n`) via `format_chunk_terminator()`.

 This eliminates full-body buffering for streaming responses (SSE). Time-to-first-byte is determined by the backend's first chunk latency rather than the full generation time.

+#### Truncation signaling
+
+When the proxy truncates a streaming response, it injects an SSE error event via `format_sse_error()` (in `crates/openshell-sandbox/src/l7/inference.rs`) before sending the HTTP chunked terminator:
+
+```
+data: {"error":{"message":"<reason>","type":"proxy_stream_error"}}
+```
+
+Three truncation paths exist:
+
+| Cause | SSE error message | OCSF severity |
+|-------|-------------------|---------------|
+| Per-chunk idle timeout (120s) | `response truncated: chunk idle timeout exceeded` | Medium |
+| Upstream read error | `response truncated: upstream read error` | Medium |
+| Streaming body exceeds 32 MiB | `response truncated: exceeded maximum streaming body size` | *(warn log only)* |
+
+The `reason` field in the SSE event is sanitized — it never contains internal URLs, hostnames, or credentials. Full details are captured server-side in the OCSF log.
+
 ### Mock routes

 File: `crates/openshell-router/src/mock.rs`

 Routes with `mock://` scheme endpoints return canned responses without making HTTP requests. Mock responses are protocol-aware (OpenAI chat completion, OpenAI completion, Anthropic messages, or generic JSON). Mock routes include an `x-openshell-mock: true` response header.

-### Per-request timeout
+### Timeout model
+
+The router uses a layered timeout strategy with separate handling for buffered and streaming responses.
+
+**Client connect timeout**: The `reqwest::Client` is built with a 30-second `connect_timeout` (in `crates/openshell-router/src/lib.rs`, `Router::new()`). This bounds TCP connection establishment and applies to all outgoing requests regardless of response mode.
+
+**Buffered responses** (`proxy_to_backend()` via `send_backend_request()`): Apply the route's `timeout` as a total request timeout covering the entire lifecycle (connect + headers + body). When `timeout_secs` is `0` in the proto message, the default of 60 seconds is used (defined as `DEFAULT_ROUTE_TIMEOUT` in `config.rs`). Timeouts and connection failures map to `RouterError::UpstreamUnavailable`.

-Each `ResolvedRoute` carries a `timeout` field (`Duration`). The `reqwest::Client` has no global timeout; instead, each outgoing request applies `.timeout(route.timeout)` on the request builder. When `timeout_secs` is `0` in the proto message, the default of 60 seconds is used (defined as `DEFAULT_ROUTE_TIMEOUT` in `config.rs`). Timeouts and connection failures map to `RouterError::UpstreamUnavailable`.
+**Streaming responses** (`proxy_to_backend_streaming()` via `send_backend_request_streaming()`): Do **not** apply a total request timeout. The total duration of a streaming response is unbounded — liveness is enforced by the sandbox proxy's per-chunk idle timeout (`CHUNK_IDLE_TIMEOUT`, 120 seconds in `proxy.rs`) instead. This separation exists because streaming inference responses (especially from reasoning models) can legitimately take minutes to complete while still sending data. The `prepare_backend_request()` helper in `backend.rs` builds the request identically for both paths; the caller decides whether to chain `.timeout()` before sending.

 Timeout changes propagate dynamically to running sandboxes. The bundle revision hash includes `timeout_secs`, so when the timeout is updated via `openshell inference update --timeout`, the refresh loop detects the revision change and updates the route cache within one polling interval (5 seconds by default).
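Because truncation is now signaled in-band, a client can distinguish a stream the proxy cut short from one that completed normally by watching for the `proxy_stream_error` event type in the table above. A minimal client-side sketch; the function name is hypothetical and the substring check is deliberately simpler than a real SSE parser:

```rust
/// Scan a received SSE body for the proxy's truncation event.
/// Returns the sanitized reason if the proxy truncated the stream.
fn check_stream_for_truncation(sse_body: &str) -> Result<(), String> {
    for line in sse_body.lines() {
        let Some(payload) = line.strip_prefix("data: ") else {
            continue;
        };
        if payload == "[DONE]" {
            return Ok(()); // normal completion marker reached first
        }
        // The proxy injects at most one error event, just before the
        // chunked terminator.
        if payload.contains("\"type\":\"proxy_stream_error\"") {
            return Err(payload.to_string());
        }
    }
    Ok(())
}
```

The `reason` carried in the event is already sanitized by the proxy, so a client may surface it directly; the full cause is only available in the server-side OCSF log.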

crates/openshell-router/src/backend.rs

Lines changed: 61 additions & 19 deletions
@@ -83,18 +83,19 @@ impl StreamingProxyResponse {
     }
 }

-/// Build and send an HTTP request to the backend configured in `route`.
+/// Build an HTTP request to the backend configured in `route`.
 ///
-/// Returns the [`reqwest::Response`] with status, headers, and an un-consumed
-/// body stream. Shared by both the buffered and streaming public APIs.
-async fn send_backend_request(
+/// Returns the prepared [`reqwest::RequestBuilder`] with auth, headers, model
+/// rewrite, and body applied. The caller decides whether to apply a total
+/// request timeout before sending.
+fn prepare_backend_request(
     client: &reqwest::Client,
     route: &ResolvedRoute,
     method: &str,
     path: &str,
-    headers: Vec<(String, String)>,
+    headers: &[(String, String)],
     body: bytes::Bytes,
-) -> Result<reqwest::Response, RouterError> {
+) -> Result<(reqwest::RequestBuilder, String), RouterError> {
     let url = build_backend_url(&route.endpoint, path);

     let reqwest_method: reqwest::Method = method
@@ -118,7 +119,7 @@ async fn send_backend_request(
     let strip_headers: [&str; 3] = ["authorization", "x-api-key", "host"];

     // Forward non-sensitive headers.
-    for (name, value) in &headers {
+    for (name, value) in headers {
         let name_lc = name.to_ascii_lowercase();
         if strip_headers.contains(&name_lc.as_str()) {
             continue;
@@ -149,17 +150,57 @@ async fn send_backend_request(
         }
         Err(_) => body,
     };
-    builder = builder.body(body).timeout(route.timeout);
-
-    builder.send().await.map_err(|e| {
-        if e.is_timeout() {
-            RouterError::UpstreamUnavailable(format!("request to {url} timed out"))
-        } else if e.is_connect() {
-            RouterError::UpstreamUnavailable(format!("failed to connect to {url}: {e}"))
-        } else {
-            RouterError::Internal(format!("HTTP request failed: {e}"))
-        }
-    })
+    builder = builder.body(body);
+
+    Ok((builder, url))
+}
+
+/// Map a send failure to a `RouterError`; shared by both buffered and streaming paths.
+fn map_send_error(e: reqwest::Error, url: &str) -> RouterError {
+    if e.is_timeout() {
+        RouterError::UpstreamUnavailable(format!("request to {url} timed out"))
+    } else if e.is_connect() {
+        RouterError::UpstreamUnavailable(format!("failed to connect to {url}: {e}"))
+    } else {
+        RouterError::Internal(format!("HTTP request failed: {e}"))
+    }
+}
+
+/// Build and send an HTTP request to the backend with a total request timeout.
+///
+/// The timeout covers the entire request lifecycle (connect + headers + body).
+/// Suitable for non-streaming responses where the body is buffered completely.
+async fn send_backend_request(
+    client: &reqwest::Client,
+    route: &ResolvedRoute,
+    method: &str,
+    path: &str,
+    headers: Vec<(String, String)>,
+    body: bytes::Bytes,
+) -> Result<reqwest::Response, RouterError> {
+    let (builder, url) = prepare_backend_request(client, route, method, path, &headers, body)?;
+    builder
+        .timeout(route.timeout)
+        .send()
+        .await
+        .map_err(|e| map_send_error(e, &url))
+}
+
+/// Build and send an HTTP request without a total request timeout.
+///
+/// For streaming responses, the total duration is unbounded — liveness is
+/// enforced by the caller's per-chunk idle timeout instead. Connection
+/// establishment is still bounded by the client-level `connect_timeout`.
+async fn send_backend_request_streaming(
+    client: &reqwest::Client,
+    route: &ResolvedRoute,
+    method: &str,
+    path: &str,
+    headers: Vec<(String, String)>,
+    body: bytes::Bytes,
+) -> Result<reqwest::Response, RouterError> {
+    let (builder, url) = prepare_backend_request(client, route, method, path, &headers, body)?;
+    builder.send().await.map_err(|e| map_send_error(e, &url))
 }

 fn validation_probe(route: &ResolvedRoute) -> Result<ValidationProbe, ValidationFailure> {
@@ -408,7 +449,8 @@ pub async fn proxy_to_backend_streaming(
     headers: Vec<(String, String)>,
     body: bytes::Bytes,
 ) -> Result<StreamingProxyResponse, RouterError> {
-    let response = send_backend_request(client, route, method, path, headers, body).await?;
+    let response =
+        send_backend_request_streaming(client, route, method, path, headers, body).await?;
     let (status, resp_headers) = extract_response_metadata(&response);

     Ok(StreamingProxyResponse {
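The two senders above differ only in whether `.timeout()` is chained before `.send()`. Collapsed into one hedged sketch of the shared pattern (`send_with_policy` and its `streaming` flag are illustrative; the committed code deliberately keeps two separate functions):

```rust
// Hypothetical condensation of send_backend_request{,_streaming} above.
async fn send_with_policy(
    client: &reqwest::Client,
    route: &ResolvedRoute,
    method: &str,
    path: &str,
    headers: &[(String, String)],
    body: bytes::Bytes,
    streaming: bool,
) -> Result<reqwest::Response, RouterError> {
    let (builder, url) = prepare_backend_request(client, route, method, path, headers, body)?;
    let builder = if streaming {
        builder // no total deadline: a streaming body may take minutes
    } else {
        builder.timeout(route.timeout) // buffered: bound the whole lifecycle
    };
    builder.send().await.map_err(|e| map_send_error(e, &url))
}
```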

crates/openshell-router/src/lib.rs

Lines changed: 2 additions & 0 deletions
@@ -10,6 +10,7 @@ pub use backend::{
     ValidationFailureKind, verify_backend_endpoint,
 };
 use config::{ResolvedRoute, RouterConfig};
+use std::time::Duration;
 use tracing::info;

 #[derive(Debug, thiserror::Error)]
@@ -37,6 +38,7 @@ pub struct Router {
 impl Router {
     pub fn new() -> Result<Self, RouterError> {
         let client = reqwest::Client::builder()
+            .connect_timeout(Duration::from_secs(30))
             .build()
             .map_err(|e| RouterError::Internal(format!("failed to build HTTP client: {e}")))?;
         Ok(Self {
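For context, the client-level `connect_timeout` added here composes independently with the per-request `.timeout()` used by the buffered path. A standalone sketch of the two reqwest knobs (URLs are placeholders):

```rust
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Bounds connection establishment for every request this client makes;
    // it does not bound response headers or body.
    let client = reqwest::Client::builder()
        .connect_timeout(Duration::from_secs(30))
        .build()?;

    // Buffered-style call: one total deadline over connect + headers + body.
    let _buffered = client
        .get("http://127.0.0.1:8080/health")
        .timeout(Duration::from_secs(60))
        .send()
        .await?;

    // Streaming-style call: no .timeout(), so only connect_timeout applies
    // and the body may be consumed for as long as the server keeps sending.
    let _streaming = client
        .get("http://127.0.0.1:8080/v1/chat/completions")
        .send()
        .await?;

    Ok(())
}
```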

crates/openshell-router/tests/backend_integration.rs

Lines changed: 131 additions & 0 deletions
@@ -468,3 +468,134 @@ fn config_resolves_routes_with_protocol() {
     let routes = config.resolve_routes().unwrap();
     assert_eq!(routes[0].protocols, vec!["openai_chat_completions"]);
 }
+
+/// Streaming proxy must not apply a total request timeout to the body stream.
+///
+/// The backend delays its response longer than the route timeout. With the old
+/// code this would fail (reqwest's total `.timeout()` fires), but the streaming
+/// path now omits that timeout — only the client-level `connect_timeout` and
+/// the sandbox idle timeout govern liveness.
+#[tokio::test]
+async fn streaming_proxy_completes_despite_exceeding_route_timeout() {
+    use std::time::Duration;
+
+    let mock_server = MockServer::start().await;
+
+    let sse_body = concat!(
+        "data: {\"choices\":[{\"delta\":{\"content\":\"hello\"}}]}\n\n",
+        "data: {\"choices\":[{\"delta\":{\"content\":\" world\"}}]}\n\n",
+        "data: [DONE]\n\n",
+    );
+
+    // Delay the response 3s — longer than the 1s route timeout.
+    Mock::given(method("POST"))
+        .and(path("/v1/chat/completions"))
+        .and(bearer_token("test-api-key"))
+        .respond_with(
+            ResponseTemplate::new(200)
+                .append_header("content-type", "text/event-stream")
+                .set_body_string(sse_body)
+                .set_delay(Duration::from_secs(3)),
+        )
+        .mount(&mock_server)
+        .await;
+
+    let router = Router::new().unwrap();
+    let candidates = vec![ResolvedRoute {
+        name: "inference.local".to_string(),
+        endpoint: mock_server.uri(),
+        model: "test-model".to_string(),
+        api_key: "test-api-key".to_string(),
+        protocols: vec!["openai_chat_completions".to_string()],
+        auth: AuthHeader::Bearer,
+        default_headers: Vec::new(),
+        // Route timeout shorter than the backend delay — streaming must
+        // NOT be constrained by this.
+        timeout: Duration::from_secs(1),
+    }];
+
+    let body = serde_json::to_vec(&serde_json::json!({
+        "model": "test-model",
+        "messages": [{"role": "user", "content": "hi"}],
+        "stream": true
+    }))
+    .unwrap();
+
+    // The streaming path should succeed despite the 3s delay exceeding
+    // the 1s route timeout.
+    let mut resp = router
+        .proxy_with_candidates_streaming(
+            "openai_chat_completions",
+            "POST",
+            "/v1/chat/completions",
+            vec![("content-type".to_string(), "application/json".to_string())],
+            bytes::Bytes::from(body),
+            &candidates,
+        )
+        .await
+        .expect("streaming proxy should not be killed by route timeout");
+
+    assert_eq!(resp.status, 200);
+
+    // Drain all chunks to verify the full body is received.
+    let mut total_bytes = 0;
+    while let Ok(Some(chunk)) = resp.next_chunk().await {
+        total_bytes += chunk.len();
+    }
+    assert!(total_bytes > 0, "should have received body chunks");
+}
+
+/// Non-streaming (buffered) proxy must still enforce the route timeout.
+#[tokio::test]
+async fn buffered_proxy_enforces_route_timeout() {
+    use std::time::Duration;
+
+    let mock_server = MockServer::start().await;
+
+    Mock::given(method("POST"))
+        .and(path("/v1/chat/completions"))
+        .respond_with(
+            ResponseTemplate::new(200)
+                .set_body_string("{}")
+                // Delay longer than the route timeout.
+                .set_delay(Duration::from_secs(5)),
+        )
+        .mount(&mock_server)
+        .await;
+
+    let router = Router::new().unwrap();
+    let candidates = vec![ResolvedRoute {
+        name: "inference.local".to_string(),
+        endpoint: mock_server.uri(),
+        model: "test-model".to_string(),
+        api_key: "test-api-key".to_string(),
+        protocols: vec!["openai_chat_completions".to_string()],
+        auth: AuthHeader::Bearer,
+        default_headers: Vec::new(),
+        timeout: Duration::from_secs(1),
+    }];
+
+    let body = serde_json::to_vec(&serde_json::json!({
+        "model": "test-model",
+        "messages": [{"role": "user", "content": "hi"}]
+    }))
+    .unwrap();
+
+    let result = router
+        .proxy_with_candidates(
+            "openai_chat_completions",
+            "POST",
+            "/v1/chat/completions",
+            vec![("content-type".to_string(), "application/json".to_string())],
+            bytes::Bytes::from(body),
+            &candidates,
+        )
+        .await;
+
+    assert!(result.is_err(), "buffered proxy should timeout");
+    let err = result.unwrap_err().to_string();
+    assert!(
+        err.contains("timed out"),
+        "error should mention timeout, got: {err}"
+    );
+}
