fix: Run response path plugin hooks asynchronously to reduce streaming latency #2717

k8s-ci-robot merged 3 commits into kubernetes-sigs:main
Conversation
✅ Deploy Preview for gateway-api-inference-extension ready!
Hi @gyliu513. Thanks for your PR. I'm waiting for a kubernetes-sigs member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line.

Tip: We noticed you've done this a few times! Consider joining the org to skip this step and gain additional privileges.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
/cc @kfswain
```go
if len(d.requestControlPlugins.responseReceivedPlugins) == 0 {
	return
}
go d.runResponseHeaderPlugins(ctx, request, response, targetEndpoint)
```
Even if we don't block, don't we want to ensure that chunks A→B→C for a request R are still processed in that order?
If we fire these off as goroutines and chunks arrive quickly, couldn't this result in out-of-order execution?
Ah, good catch, thanks @LukeAVanDrie. You're right. The PredictedLatency plugin's ResponseBody is stateful across chunks (it tracks TTFT on the first chunk and accumulates TPOT across subsequent chunks via predictedLatencyCtx). Firing a goroutine per chunk could result in out-of-order execution and corrupt that state.
How about this approach to keep the order: instead of firing a goroutine per chunk, each request gets a dedicated channel + goroutine queue. The flow would be:
- Intermediate chunks (endOfStream=false): sent to a per-request buffered channel. A single goroutine reads from the channel and runs plugins sequentially — so chunks A→B→C are always processed in that order.
- Final chunk (endOfStream=true): the channel is closed, we wait for the goroutine to drain all queued chunks, then run plugins synchronously on the main goroutine. This ensures DynamicMetadata written by plugins (e.g., requestattributereporter) is available before the Envoy response is generated.
- Response header plugins: remain fire-and-forget async since they are stateless and only called once per request.
This sounds good to me!
Re-reading this: how about we don't fire-and-forget the responseHeader and just leave it as a synchronous call? Intuitively, responseHeader should also be invoked before any responseBody.
Done, I'm now handling the response header synchronously.
|
/ok-to-test
|
I think this looks good, @kfswain do we want to move forward with this?
|
/assign @zetxqx
Bob, can you please take a look? This touches request control and handling.
zetxqx left a comment
Thanks for your patience. One major concern I have is whether we should make responseHeader async. Theoretically, this could allow responseHeader to be invoked after responseBody, which changes the existing behavior a little.
If we keep the responseHeader call synchronous, the primary benefit of this PR would be faster delivery of intermediate response chunks to the client, since we still have to wait for the final chunk to modify the dynamic data.
```go
if len(d.requestControlPlugins.responseReceivedPlugins) == 0 {
	return
}
```
nit: can we also move this early-return check into func (d *Director) HandleResponseHeader, consistent with the style of func (d *Director) HandleResponseBody? We can probably inline runResponseHeaderPluginsAsync.
fix: async response body plugin processing with per-request channel queue for ordered execution
Force-pushed from fe6c314 to 07f201e (Compare)
```go
// HandleResponseHeader runs plugins asynchronously, so wait for completion.
require.Eventually(t, func() bool {
	pr1.mu.Lock()
	defer pr1.mu.Unlock()
	return pr1.lastRespOnResponse != nil
}, time.Second, 10*time.Millisecond, "response header plugin should have been called")

pr1.mu.Lock()
lastResp := pr1.lastRespOnResponse
lastTargetPod := pr1.lastTargetPodOnResponse
pr1.mu.Unlock()

if diff := cmp.Diff("test-req-id-for-response", lastResp.RequestId); diff != "" {
```
nit: since we changed the responseHeader back to sync, we can revert to the previous verification approach.
Thank you, just one nit comment regarding the unit test.
|
/lgtm
cc: @ahg-g
|
/approve
|
[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: ahg-g, gyliu513

The full list of commands accepted by this bot can be found here. The pull request process is described here.

Approvers can indicate their approval by writing /approve in a comment.
fix: Run response path plugin hooks asynchronously to reduce streaming latency (kubernetes-sigs/gateway-api-inference-extension#2717)

* fix: Run response path plugin hooks asynchronously to reduce streaming latency
* fix: async response body plugin processing with per-request channel queue for ordered execution
* sync handle request header
Fixes #1935
Response header and response body plugins were previously executed synchronously, blocking the ext_proc stream on every chunk. This is unnecessary because these plugins perform side-effect work (metrics reporting, cache updates, metadata collection) that does not modify the chunk content being forwarded to the client.
This PR runs response header plugins and intermediate streaming chunk body plugins asynchronously in goroutines so they no longer add latency to the response path. The final chunk (endOfStream=true) remains synchronous because plugins like requestattributereporter write DynamicMetadata at that point, which must be available when generating the Envoy response.
What type of PR is this?
/kind bug

Which issue(s) this PR fixes:
Fixes #1935