Skip to content

feat(sse): add 30s heartbeat to task stream endpoint#646

Merged
majiayu000 merged 3 commits intomainfrom
feat/633-sse-heartbeat
Apr 10, 2026
Merged

feat(sse): add 30s heartbeat to task stream endpoint#646
majiayu000 merged 3 commits intomainfrom
feat/633-sse-heartbeat

Conversation

@majiayu000
Copy link
Copy Markdown
Owner

Summary

  • Adds a periodic SSE comment (": heartbeat") every 30 s on GET /tasks/{id}/stream
  • Uses tokio::time::interval_at with 30 s initial delay so the first heartbeat is deferred
  • Uses tokio::select! { biased; } to prioritize real task events over the timer tick

Why

Reverse proxies with idle timeouts (nginx defaults to 60 s) can silently drop an SSE connection when the agent produces no output (e.g. waiting for compilation). Clients have no way to distinguish a dropped connection from a live-but-quiet one. SSE comment lines are ignored by browsers/clients but keep the TCP stream active through proxies.

Test plan

  • cargo clippy --workspace --all-targets -- -D warnings passes
  • cargo test --workspace passes
  • Manual: connect to /tasks/{id}/stream via curl -N, observe ": heartbeat" lines every 30 s when agent is idle

Fixes #633

Emit SSE comment lines (`: heartbeat`) every 30 seconds on the
GET /tasks/{id}/stream endpoint so that reverse proxies with idle
timeouts (e.g. nginx default 60 s) don't drop a live SSE connection
while the agent produces no output.

Uses tokio::time::interval_at with a 30 s initial delay so the first
heartbeat is deferred, then tokio::select! with biased polling to
prefer real events over the timer tick.

Fixes #633

Signed-off-by: majiayu000 <1835304752@qq.com>
Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request implements a manual heartbeat mechanism for Server-Sent Events (SSE) using tokio::time::interval and tokio::select! to prevent connection timeouts from reverse proxies. The review feedback suggests replacing this manual implementation with Axum's built-in .keep_alive() functionality, which would simplify the code, handle empty streams more effectively, and avoid potential heartbeat flooding issues.

Comment thread crates/harness-server/src/http.rs Outdated
Comment on lines 1532 to 1578
// Send a heartbeat comment every 30 s so reverse proxies (nginx default
// 60 s idle timeout) don't drop the connection while the agent is silent.
let heartbeat_interval = {
let start = tokio::time::Instant::now() + std::time::Duration::from_secs(30);
tokio::time::interval_at(start, std::time::Duration::from_secs(30))
};

let stream = futures::stream::unfold(
(rx, heartbeat_interval),
|(mut rx, mut interval)| async move {
tokio::select! {
biased;
result = rx.recv() => {
match result {
Ok(item) => {
let data = match serde_json::to_string(&item) {
Ok(s) => s,
Err(e) => {
tracing::warn!("sse: failed to serialize event: {e}");
String::new()
}
};
Some((
Ok::<Event, std::convert::Infallible>(Event::default().data(data)),
(rx, interval),
))
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
let event = Event::default()
.event("lag")
.data(format!("dropped {n} events due to slow consumer"));
Some((Ok(event), (rx, interval)))
}
}
};
Some((
Ok::<Event, std::convert::Infallible>(Event::default().data(data)),
rx,
))
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
let event = Event::default()
.event("lag")
.data(format!("dropped {n} events due to slow consumer"));
Some((Ok(event), rx))
}
_ = interval.tick() => {
Some((
Ok::<Event, std::convert::Infallible>(Event::default().comment("heartbeat")),
(rx, interval),
))
}
}
}
});
},
);

Sse::new(stream).into_response()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The manual implementation of heartbeats using tokio::time::interval and tokio::select! inside futures::stream::unfold adds significant complexity and duplicates the event serialization logic.

Axum provides a built-in keep_alive mechanism for SSE that is much cleaner and more robust. Using it also ensures that heartbeats are sent even when the stream is empty (e.g., the case handled at line 1527), which the current implementation misses. Furthermore, it avoids the "heartbeat flooding" issue that can occur with the default Burst behavior of tokio::time::Interval after the stream has been busy for a long period.

I recommend reverting the unfold logic to its original simple form and using .keep_alive() on the Sse response.

    let stream = futures::stream::unfold(rx, |mut rx| async move {
        match rx.recv().await {
            Ok(item) => {
                let data = match serde_json::to_string(&item) {
                    Ok(s) => s,
                    Err(e) => {
                        tracing::warn!("sse: failed to serialize event: {e}");
                        String::new()
                    }
                };
                Some((
                    Ok::<Event, std::convert::Infallible>(Event::default().data(data)),
                    rx,
                ))
            }
            Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                let event = Event::default()
                    .event("lag")
                    .data(format!("dropped {n} events due to slow consumer"));
                Some((Ok(event), rx))
            }
        }
    });

    Sse::new(stream)
        .keep_alive(
            axum::response::sse::KeepAlive::new()
                .interval(std::time::Duration::from_secs(30))
                .text("heartbeat"),
        )
        .into_response()

Prevents catch-up burst when biased select! prioritizes rx.recv():
accumulated ticks under sustained event traffic would flush as rapid
back-to-back heartbeat frames when traffic paused. Skip discards missed
ticks instead of firing them all at once.

Signed-off-by: majiayu000 <1835304752@qq.com>
@github-actions
Copy link
Copy Markdown

/gemini review

…erval

Replace manual tokio::time::interval + tokio::select! heartbeat logic
with axum's Sse::keep_alive(). This simplifies the stream unfold back
to its original form and avoids potential heartbeat flooding from the
Burst tick behavior. The KeepAlive sends a comment every 30s to prevent
reverse-proxy idle timeouts.

Signed-off-by: majiayu000 <1835304752@qq.com>
@github-actions
Copy link
Copy Markdown

/gemini review

@majiayu000 majiayu000 merged commit 2d7a330 into main Apr 10, 2026
9 checks passed
@majiayu000 majiayu000 deleted the feat/633-sse-heartbeat branch April 10, 2026 13:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

SSE 端点无 keepalive,反向代理环境下可能断开

1 participant