feat(flowcontrol): Add ImmediateResponse abort mechanism for evicting in-flight requests#2737
Conversation
|
Hi @RishabhSaini. Thanks for your PR. I'm waiting for a kubernetes-sigs member to verify that this patch is reasonable to test. If it is, they should reply with Tip We noticed you've done this a few times! Consider joining the org to skip this step and gain Once the patch is verified, the new status will be reflected by the I understand the commands that are listed here. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
✅ Deploy Preview for gateway-api-inference-extension ready!
To edit notification comments on pull requests, go to your Netlify project configuration. |
nirrozenbaum
left a comment
There was a problem hiding this comment.
I recommend being consistent with names.
eviction is consistent with evictor. aborter is yet another naming option but is not consistent with the existing code.
|
/ok-to-test |
|
/assign @LukeAVanDrie |
|
/test all |
| abortCh chan struct{}, | ||
| ) (*extProcPb.ProcessingRequest, error, error) { | ||
| recvCh := make(chan recvResult, 1) | ||
| go func() { |
There was a problem hiding this comment.
This spawns a new goroutine to call srv.Recv() every time it is invoked.
If the abortCh triggers first, this method returns errEvicted, but the spawned goroutine remains blocked on srv.Recv() until the stream is eventually closed or data arrives. While this leak is technically bounded by the lifetime of the gRPC stream (which closes shortly after we send the ImmediateResponse and return nil), it is still a sub-optimal pattern to spawn goroutines in a loop for every message received.
Consider starting a single reader goroutine for the lifetime of the stream that reads from srv.Recv() and pumps messages into a channel. You can then use Go's property where selecting on a nil channel blocks forever to dynamically enable listening to the abortCh only after it is populated (post-scheduling).
For example:
// At the top of Process()
recvCh := make(chan recvResult, 1)
go func() {
for {
req, err := srv.Recv()
recvCh <- recvResult{req: req, err: err}
if err != nil { return }
}
}()
var abortCh chan struct{} // Initially nil
for {
select {
case result := <-recvCh:
// handle message...
// When scheduled: abortCh = s.abortLookup.Get(abortRequestID)
case <-abortCh: // Blocks forever while nil
// handle eviction
return nil
}
}There was a problem hiding this comment.
agreed, attempted to fix this using claude
There was a problem hiding this comment.
This looks good, thanks! Added one suggestion, then this PR lgtm
200b369 to
936939e
Compare
d2f7fb2 to
e975f58
Compare
| type Plugin struct { | ||
| queue *EvictionQueue | ||
| aborter Aborter | ||
| var _ requestcontrol.PreRequest = &RequestEvictor{} |
There was a problem hiding this comment.
This is a misuse of the pluggable framework. This logic represents the runtime that executes the eviction logic and it should not be modeled as a plugin. We need to put this guidance in a framework's dev guide: Plugins represent entities that a user can enable/disable via the config api, and they only reside under framework/epp/plugins.
In this case, there should be an evictor subsystem instance that gets passed to request control, for example PreRequest becomes evictor.TrackRequests.
We can do this clean up as a followup.
Integrate ext_proc ImmediateResponse as the abort mechanism for
evicting dispatched requests from vLLM. When eviction triggers,
closing the request's abort channel causes the Process() loop to
send ImmediateResponse(503), making Envoy reset the upstream
connection. vLLM detects the disconnect and frees KV blocks.
- AbortRegistry bridges eviction plugin and ext_proc Process() loop
- ImmediateResponseAborter closes abort channels via sync.Once
- recvOrAbort wraps srv.Recv() to select on abort signals
- Tests cover aborter, registry, recvOrAbort, plugin integration,
and concurrent eviction+completion races
Signed-off-by: RishabhSaini <rishabhsaini01@gmail.com>
for naming consistency Signed-off-by: RishabhSaini <rishabhsaini01@gmail.com>
Signed-off-by: RishabhSaini <rishabhsaini01@gmail.com>
e975f58 to
70e0ae0
Compare
70e0ae0 to
6c8c0f1
Compare
…ion, evictor cleanup - Replace per-message recvOrAbort goroutine with single reader goroutine for the stream lifetime, using nil channel select pattern - Remove errEvicted sentinel; eviction is a state transition (RequestEvicted) handled by updateStateAndSendIfNeeded, not an error - Move ImmediateResponse send from inline code into the state machine - Add EvictorWithCleanup interface and cleanupRequest helper to prevent ImmediateResponseEvictor.closeOnce map from growing unbounded Signed-off-by: RishabhSaini <rishabhsaini01@gmail.com>
6c8c0f1 to
7e6a8ce
Compare
|
/approve /lgtm |
Signed-off-by: RishabhSaini <rishabhsaini01@gmail.com>
|
/lgtm /approve |
|
/approve |
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: ahg-g, LukeAVanDrie, RishabhSaini The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
… in-flight requests (kubernetes-sigs/gateway-api-inference-extension#2737) * Add ImmediateResponse abort mechanism for evicting in-flight requests Integrate ext_proc ImmediateResponse as the abort mechanism for evicting dispatched requests from vLLM. When eviction triggers, closing the request's abort channel causes the Process() loop to send ImmediateResponse(503), making Envoy reset the upstream connection. vLLM detects the disconnect and frees KV blocks. - AbortRegistry bridges eviction plugin and ext_proc Process() loop - ImmediateResponseAborter closes abort channels via sync.Once - recvOrAbort wraps srv.Recv() to select on abort signals - Tests cover aborter, registry, recvOrAbort, plugin integration, and concurrent eviction+completion races Signed-off-by: RishabhSaini <rishabhsaini01@gmail.com> * Rename Aborter to Evictor, Plugin to RequestEvictor for naming consistency Signed-off-by: RishabhSaini <rishabhsaini01@gmail.com> * Use 429 (TooManyRequests) instead of 503 for eviction ImmediateResponse Signed-off-by: RishabhSaini <rishabhsaini01@gmail.com> * Refactor Process() loop: single reader goroutine, state machine eviction, evictor cleanup - Replace per-message recvOrAbort goroutine with single reader goroutine for the stream lifetime, using nil channel select pattern - Remove errEvicted sentinel; eviction is a state transition (RequestEvicted) handled by updateStateAndSendIfNeeded, not an error - Move ImmediateResponse send from inline code into the state machine - Add EvictorWithCleanup interface and cleanupRequest helper to prevent ImmediateResponseEvictor.closeOnce map from growing unbounded Signed-off-by: RishabhSaini <rishabhsaini01@gmail.com> * fix: fix data read write race to ctx by capturing it Signed-off-by: RishabhSaini <rishabhsaini01@gmail.com> --------- Signed-off-by: RishabhSaini <rishabhsaini01@gmail.com>
resolves "How to evict?" part of #2632
Integrate ext_proc ImmediateResponse as the abort mechanism for evicting dispatched requests from vLLM. When eviction triggers, closing the request's abort channel causes the Process() loop to send ImmediateResponse(503), making Envoy reset the upstream connection. vLLM detects the disconnect and frees KV blocks.