feat(sse): add 30s heartbeat to task stream endpoint#646
Conversation
Emit SSE comment lines (`: heartbeat`) every 30 seconds on the
GET /tasks/{id}/stream endpoint so that reverse proxies with idle
timeouts (e.g. nginx default 60 s) don't drop a live SSE connection
while the agent produces no output.
Uses tokio::time::interval_at with a 30 s initial delay so the first
heartbeat is deferred, then tokio::select! with biased polling to
prefer real events over the timer tick.
Fixes #633
Signed-off-by: majiayu000 <1835304752@qq.com>
There was a problem hiding this comment.
Code Review
This pull request implements a manual heartbeat mechanism for Server-Sent Events (SSE) using tokio::time::interval and tokio::select! to prevent connection timeouts from reverse proxies. The review feedback suggests replacing this manual implementation with Axum's built-in .keep_alive() functionality, which would simplify the code, handle empty streams more effectively, and avoid potential heartbeat flooding issues.
| // Send a heartbeat comment every 30 s so reverse proxies (nginx default | ||
| // 60 s idle timeout) don't drop the connection while the agent is silent. | ||
| let heartbeat_interval = { | ||
| let start = tokio::time::Instant::now() + std::time::Duration::from_secs(30); | ||
| tokio::time::interval_at(start, std::time::Duration::from_secs(30)) | ||
| }; | ||
|
|
||
| let stream = futures::stream::unfold( | ||
| (rx, heartbeat_interval), | ||
| |(mut rx, mut interval)| async move { | ||
| tokio::select! { | ||
| biased; | ||
| result = rx.recv() => { | ||
| match result { | ||
| Ok(item) => { | ||
| let data = match serde_json::to_string(&item) { | ||
| Ok(s) => s, | ||
| Err(e) => { | ||
| tracing::warn!("sse: failed to serialize event: {e}"); | ||
| String::new() | ||
| } | ||
| }; | ||
| Some(( | ||
| Ok::<Event, std::convert::Infallible>(Event::default().data(data)), | ||
| (rx, interval), | ||
| )) | ||
| } | ||
| Err(tokio::sync::broadcast::error::RecvError::Closed) => None, | ||
| Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { | ||
| let event = Event::default() | ||
| .event("lag") | ||
| .data(format!("dropped {n} events due to slow consumer")); | ||
| Some((Ok(event), (rx, interval))) | ||
| } | ||
| } | ||
| }; | ||
| Some(( | ||
| Ok::<Event, std::convert::Infallible>(Event::default().data(data)), | ||
| rx, | ||
| )) | ||
| } | ||
| Err(tokio::sync::broadcast::error::RecvError::Closed) => None, | ||
| Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { | ||
| let event = Event::default() | ||
| .event("lag") | ||
| .data(format!("dropped {n} events due to slow consumer")); | ||
| Some((Ok(event), rx)) | ||
| } | ||
| _ = interval.tick() => { | ||
| Some(( | ||
| Ok::<Event, std::convert::Infallible>(Event::default().comment("heartbeat")), | ||
| (rx, interval), | ||
| )) | ||
| } | ||
| } | ||
| } | ||
| }); | ||
| }, | ||
| ); | ||
|
|
||
| Sse::new(stream).into_response() |
There was a problem hiding this comment.
The manual implementation of heartbeats using tokio::time::interval and tokio::select! inside futures::stream::unfold adds significant complexity and duplicates the event serialization logic.
Axum provides a built-in keep_alive mechanism for SSE that is much cleaner and more robust. Using it also ensures that heartbeats are sent even when the stream is empty (e.g., the case handled at line 1527), which the current implementation misses. Furthermore, it avoids the "heartbeat flooding" issue that can occur with the default Burst behavior of tokio::time::Interval after the stream has been busy for a long period.
I recommend reverting the unfold logic to its original simple form and using .keep_alive() on the Sse response.
let stream = futures::stream::unfold(rx, |mut rx| async move {
match rx.recv().await {
Ok(item) => {
let data = match serde_json::to_string(&item) {
Ok(s) => s,
Err(e) => {
tracing::warn!("sse: failed to serialize event: {e}");
String::new()
}
};
Some((
Ok::<Event, std::convert::Infallible>(Event::default().data(data)),
rx,
))
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
let event = Event::default()
.event("lag")
.data(format!("dropped {n} events due to slow consumer"));
Some((Ok(event), rx))
}
}
});
Sse::new(stream)
.keep_alive(
axum::response::sse::KeepAlive::new()
.interval(std::time::Duration::from_secs(30))
.text("heartbeat"),
)
.into_response()Prevents catch-up burst when biased select! prioritizes rx.recv(): accumulated ticks under sustained event traffic would flush as rapid back-to-back heartbeat frames when traffic paused. Skip discards missed ticks instead of firing them all at once. Signed-off-by: majiayu000 <1835304752@qq.com>
|
/gemini review |
…erval Replace manual tokio::time::interval + tokio::select! heartbeat logic with axum's Sse::keep_alive(). This simplifies the stream unfold back to its original form and avoids potential heartbeat flooding from the Burst tick behavior. The KeepAlive sends a comment every 30s to prevent reverse-proxy idle timeouts. Signed-off-by: majiayu000 <1835304752@qq.com>
|
/gemini review |
Summary
": heartbeat") every 30 s onGET /tasks/{id}/streamtokio::time::interval_atwith 30 s initial delay so the first heartbeat is deferredtokio::select! { biased; }to prioritize real task events over the timer tickWhy
Reverse proxies with idle timeouts (nginx defaults to 60 s) can silently drop an SSE connection when the agent produces no output (e.g. waiting for compilation). Clients have no way to distinguish a dropped connection from a live-but-quiet one. SSE comment lines are ignored by browsers/clients but keep the TCP stream active through proxies.
Test plan
cargo clippy --workspace --all-targets -- -D warningspassescargo test --workspacepasses/tasks/{id}/streamviacurl -N, observe": heartbeat"lines every 30 s when agent is idleFixes #633