Skip to content

Commit 70e0ae0

Browse files
committed
Refactor Process() loop: single reader goroutine, state machine eviction, evictor cleanup
- Replace per-message recvOrAbort goroutine with single reader goroutine for the stream lifetime, using nil channel select pattern - Remove errEvicted sentinel; eviction is a state transition (RequestEvicted) handled by updateStateAndSendIfNeeded, not an error - Move ImmediateResponse send from inline code into the state machine - Add EvictorWithCleanup interface and cleanupRequest helper to prevent ImmediateResponseEvictor.closeOnce map from growing unbounded Signed-off-by: RishabhSaini <rishabhsaini01@gmail.com>
1 parent dc45109 commit 70e0ae0

9 files changed

Lines changed: 321 additions & 265 deletions

File tree

pkg/epp/flowcontrol/eviction/abort_registry.go renamed to pkg/epp/flowcontrol/eviction/eviction_registry.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,45 +18,45 @@ package eviction
1818

1919
import "sync"
2020

21-
// AbortRegistry is a shared registry that maps request IDs to abort channels.
21+
// EvictionRegistry is a shared registry that maps request IDs to eviction channels.
2222
// It bridges the eviction plugin (which decides what to evict) and the ext_proc Process()
2323
// goroutine (which owns the stream needed to send ImmediateResponse).
2424
//
2525
// Lifecycle:
26-
// - PreRequest: plugin creates an abort channel and registers it via Register().
26+
// - PreRequest: plugin creates an eviction channel and registers it via Register().
2727
// - Process(): after HandleRequest returns, looks up the channel via Get() and selects on it.
28-
// - EvictN: aborter closes the channel via the EvictionItem.AbortCh reference.
28+
// - EvictN: evictor closes the channel via the EvictionItem.EvictCh reference.
2929
// - Process() defer: removes the channel via Deregister().
3030
//
3131
// All methods are goroutine-safe.
32-
type AbortRegistry struct {
32+
type EvictionRegistry struct {
3333
mu sync.RWMutex
34-
channels map[string]chan struct{} // requestID → abort channel
34+
channels map[string]chan struct{} // requestID → eviction channel
3535
}
3636

37-
// NewAbortRegistry creates a new AbortRegistry.
38-
func NewAbortRegistry() *AbortRegistry {
39-
return &AbortRegistry{
37+
// NewEvictionRegistry creates a new EvictionRegistry.
38+
func NewEvictionRegistry() *EvictionRegistry {
39+
return &EvictionRegistry{
4040
channels: make(map[string]chan struct{}),
4141
}
4242
}
4343

44-
// Register stores an abort channel for the given request ID.
45-
func (r *AbortRegistry) Register(requestID string, ch chan struct{}) {
44+
// Register stores an eviction channel for the given request ID.
45+
func (r *EvictionRegistry) Register(requestID string, ch chan struct{}) {
4646
r.mu.Lock()
4747
defer r.mu.Unlock()
4848
r.channels[requestID] = ch
4949
}
5050

51-
// Get returns the abort channel for the given request ID, or nil if not found.
52-
func (r *AbortRegistry) Get(requestID string) chan struct{} {
51+
// Get returns the eviction channel for the given request ID, or nil if not found.
52+
func (r *EvictionRegistry) Get(requestID string) chan struct{} {
5353
r.mu.RLock()
5454
defer r.mu.RUnlock()
5555
return r.channels[requestID]
5656
}
5757

58-
// Deregister removes the abort channel for the given request ID.
59-
func (r *AbortRegistry) Deregister(requestID string) {
58+
// Deregister removes the eviction channel for the given request ID.
59+
func (r *EvictionRegistry) Deregister(requestID string) {
6060
r.mu.Lock()
6161
defer r.mu.Unlock()
6262
delete(r.channels, requestID)

pkg/epp/flowcontrol/eviction/abort_registry_test.go renamed to pkg/epp/flowcontrol/eviction/eviction_registry_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ import (
2424
"github.com/stretchr/testify/assert"
2525
)
2626

27-
func TestAbortRegistry_RegisterAndGet(t *testing.T) {
27+
func TestEvictionRegistry_RegisterAndGet(t *testing.T) {
2828
t.Parallel()
29-
r := NewAbortRegistry()
29+
r := NewEvictionRegistry()
3030

3131
ch := make(chan struct{})
3232
r.Register("req-1", ch)
@@ -37,9 +37,9 @@ func TestAbortRegistry_RegisterAndGet(t *testing.T) {
3737
assert.Nil(t, r.Get("non-existent"), "Get for unregistered ID should return nil")
3838
}
3939

40-
func TestAbortRegistry_Deregister(t *testing.T) {
40+
func TestEvictionRegistry_Deregister(t *testing.T) {
4141
t.Parallel()
42-
r := NewAbortRegistry()
42+
r := NewEvictionRegistry()
4343

4444
ch := make(chan struct{})
4545
r.Register("req-1", ch)
@@ -51,9 +51,9 @@ func TestAbortRegistry_Deregister(t *testing.T) {
5151
r.Deregister("non-existent")
5252
}
5353

54-
func TestAbortRegistry_Concurrency(t *testing.T) {
54+
func TestEvictionRegistry_Concurrency(t *testing.T) {
5555
t.Parallel()
56-
r := NewAbortRegistry()
56+
r := NewEvictionRegistry()
5757

5858
const goroutines = 10
5959
const opsPerGoroutine = 100
Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func (e *NoOpEvictor) Evict(ctx context.Context, item *flowcontrol.EvictionItem)
4343
return nil
4444
}
4545

46-
// ImmediateResponseEvictor evicts requests by closing the EvictionItem's AbortCh.
46+
// ImmediateResponseEvictor evicts requests by closing the EvictionItem's EvictCh.
4747
// The ext_proc Process() goroutine selects on this channel and sends an ImmediateResponse
4848
// to Envoy when it is closed, causing Envoy to reset the upstream connection to the model server.
4949
type ImmediateResponseEvictor struct {
@@ -57,13 +57,13 @@ func NewImmediateResponseEvictor() *ImmediateResponseEvictor {
5757
}
5858

5959
func (e *ImmediateResponseEvictor) Evict(ctx context.Context, item *flowcontrol.EvictionItem) error {
60-
if item.AbortCh == nil {
61-
return fmt.Errorf("eviction item %s has no abort channel", item.RequestID)
60+
if item.EvictCh == nil {
61+
return fmt.Errorf("eviction item %s has no eviction channel", item.RequestID)
6262
}
6363

6464
once, _ := e.closeOnce.LoadOrStore(item.RequestID, &sync.Once{})
6565
once.(*sync.Once).Do(func() {
66-
close(item.AbortCh)
66+
close(item.EvictCh)
6767
})
6868

6969
log.FromContext(ctx).Info("Eviction signal sent",
@@ -72,3 +72,9 @@ func (e *ImmediateResponseEvictor) Evict(ctx context.Context, item *flowcontrol.
7272
"targetURL", item.TargetURL)
7373
return nil
7474
}
75+
76+
// Cleanup removes the sync.Once entry for a request ID to prevent unbounded map growth.
77+
// Called when a request completes or is untracked.
78+
func (e *ImmediateResponseEvictor) Cleanup(requestID string) {
79+
e.closeOnce.Delete(requestID)
80+
}

pkg/epp/flowcontrol/eviction/aborter_test.go renamed to pkg/epp/flowcontrol/eviction/evictor_test.go

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,30 +30,30 @@ func TestImmediateResponseEvictor_ClosesChannel(t *testing.T) {
3030
t.Parallel()
3131
evictor := NewImmediateResponseEvictor()
3232

33-
abortCh := make(chan struct{})
33+
evictCh := make(chan struct{})
3434
item := &flowcontrol.EvictionItem{
3535
RequestID: "req-1",
36-
AbortCh: abortCh,
36+
EvictCh: evictCh,
3737
}
3838

3939
err := evictor.Evict(context.Background(), item)
4040
require.NoError(t, err)
4141

4242
select {
43-
case <-abortCh:
43+
case <-evictCh:
4444
default:
45-
t.Fatal("abort channel should be closed after Evict()")
45+
t.Fatal("eviction channel should be closed after Evict()")
4646
}
4747
}
4848

4949
func TestImmediateResponseEvictor_DoubleEvictSafe(t *testing.T) {
5050
t.Parallel()
5151
evictor := NewImmediateResponseEvictor()
5252

53-
abortCh := make(chan struct{})
53+
evictCh := make(chan struct{})
5454
item := &flowcontrol.EvictionItem{
5555
RequestID: "req-1",
56-
AbortCh: abortCh,
56+
EvictCh: evictCh,
5757
}
5858

5959
err := evictor.Evict(context.Background(), item)
@@ -70,13 +70,48 @@ func TestImmediateResponseEvictor_NilChannel(t *testing.T) {
7070

7171
item := &flowcontrol.EvictionItem{
7272
RequestID: "req-1",
73-
AbortCh: nil,
73+
EvictCh: nil,
7474
}
7575

7676
err := evictor.Evict(context.Background(), item)
7777
assert.Error(t, err, "Evict with nil channel should return error")
7878
}
7979

80+
func TestImmediateResponseEvictor_Cleanup(t *testing.T) {
81+
t.Parallel()
82+
evictor := NewImmediateResponseEvictor()
83+
84+
evictCh := make(chan struct{})
85+
item := &flowcontrol.EvictionItem{
86+
RequestID: "req-1",
87+
EvictCh: evictCh,
88+
}
89+
90+
_ = evictor.Evict(context.Background(), item)
91+
92+
// Cleanup should remove the sync.Once entry.
93+
evictor.Cleanup("req-1")
94+
95+
// After cleanup, a new Evict on the same requestID with a new channel should work
96+
// (the old sync.Once is gone, so a new one will be created).
97+
evictCh2 := make(chan struct{})
98+
item2 := &flowcontrol.EvictionItem{
99+
RequestID: "req-1",
100+
EvictCh: evictCh2,
101+
}
102+
err := evictor.Evict(context.Background(), item2)
103+
require.NoError(t, err)
104+
105+
select {
106+
case <-evictCh2:
107+
default:
108+
t.Fatal("new channel should be closed after Evict post-Cleanup")
109+
}
110+
111+
// Cleanup non-existent should not panic.
112+
evictor.Cleanup("non-existent")
113+
}
114+
80115
func TestNoOpEvictor(t *testing.T) {
81116
t.Parallel()
82117
evictor := &NoOpEvictor{}

pkg/epp/flowcontrol/eviction/plugin.go

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ var _ requestcontrol.ResponseBody = &RequestEvictor{}
3737

3838
// RequestEvictor tracks in-flight requests via RequestControl hooks and provides eviction capability.
3939
type RequestEvictor struct {
40-
queue *EvictionQueue
41-
evictor Evictor
42-
abortRegistry *AbortRegistry
40+
queue *EvictionQueue
41+
evictor Evictor
42+
evictionRegistry *EvictionRegistry
4343
}
4444

4545
// NewRequestEvictor creates a RequestEvictor with the given policies and evictor.
@@ -49,16 +49,16 @@ func NewRequestEvictor(
4949
evictor Evictor,
5050
) *RequestEvictor {
5151
return &RequestEvictor{
52-
queue: NewEvictionQueue(ordering, filter),
53-
evictor: evictor,
54-
abortRegistry: NewAbortRegistry(),
52+
queue: NewEvictionQueue(ordering, filter),
53+
evictor: evictor,
54+
evictionRegistry: NewEvictionRegistry(),
5555
}
5656
}
5757

58-
// AbortRegistry returns the shared abort registry.
59-
// The ext_proc Process() goroutine uses this to look up abort channels for dispatched requests.
60-
func (p *RequestEvictor) AbortRegistry() *AbortRegistry {
61-
return p.abortRegistry
58+
// EvictionRegistry returns the shared eviction registry.
59+
// The ext_proc Process() goroutine uses this to look up eviction channels for dispatched requests.
60+
func (p *RequestEvictor) EvictionRegistry() *EvictionRegistry {
61+
return p.evictionRegistry
6262
}
6363

6464
func (p *RequestEvictor) TypedName() plugin.TypedName {
@@ -88,7 +88,7 @@ func (p *RequestEvictor) PreRequest(
8888
return
8989
}
9090

91-
abortCh := make(chan struct{})
91+
evictCh := make(chan struct{})
9292

9393
item := &flowcontrol.EvictionItem{
9494
RequestID: requestID,
@@ -97,19 +97,18 @@ func (p *RequestEvictor) PreRequest(
9797
TargetURL: "http://" + net.JoinHostPort(metadata.GetIPAddress(), metadata.GetPort()),
9898
Request: request,
9999
TargetEndpoint: metadata,
100-
AbortCh: abortCh,
100+
EvictCh: evictCh,
101101
}
102102

103103
p.queue.Track(item)
104-
p.abortRegistry.Register(requestID, abortCh)
104+
p.evictionRegistry.Register(requestID, evictCh)
105105

106106
// Bind untrack to the request context's lifetime as a safety net.
107107
// If the client disconnects and ResponseBody(EndOfStream) never fires,
108108
// ctx.Done() ensures the request is still cleaned up. Untrack is idempotent.
109109
go func() {
110110
<-ctx.Done()
111-
p.queue.Untrack(requestID)
112-
p.abortRegistry.Deregister(requestID)
111+
p.cleanupRequest(requestID)
113112
}()
114113

115114
log.FromContext(ctx).V(logutil.DEBUG).Info("Tracked in-flight request",
@@ -138,8 +137,7 @@ func (p *RequestEvictor) ResponseBody(
138137
return
139138
}
140139

141-
p.queue.Untrack(requestID)
142-
p.abortRegistry.Deregister(requestID)
140+
p.cleanupRequest(requestID)
143141

144142
log.FromContext(ctx).V(logutil.DEBUG).Info("Untracked completed request",
145143
"requestID", requestID,
@@ -148,12 +146,12 @@ func (p *RequestEvictor) ResponseBody(
148146
}
149147

150148
// EvictN attempts to evict up to n requests from the eviction queue.
151-
// Each request is only removed from tracking after a successful abort. If the abort fails,
149+
// Each request is only removed from tracking after a successful eviction. If the eviction fails,
152150
// the request remains in the queue for a future eviction attempt.
153-
// Returns the request IDs that were successfully aborted.
151+
// Returns the request IDs that were successfully evicted.
154152
func (p *RequestEvictor) EvictN(ctx context.Context, n int) ([]string, error) {
155153
logger := log.FromContext(ctx)
156-
aborted := make([]string, 0, n)
154+
evicted := make([]string, 0, n)
157155

158156
for range n {
159157
items := p.queue.PopN(1)
@@ -167,16 +165,33 @@ func (p *RequestEvictor) EvictN(ctx context.Context, n int) ([]string, error) {
167165
p.queue.Track(item)
168166
continue
169167
}
170-
aborted = append(aborted, item.RequestID)
168+
evicted = append(evicted, item.RequestID)
171169
}
172170

173-
if len(aborted) > 0 {
174-
logger.Info("Eviction complete", "requested", n, "aborted", len(aborted))
171+
if len(evicted) > 0 {
172+
logger.Info("Eviction complete", "requested", n, "evicted", len(evicted))
175173
}
176-
return aborted, nil
174+
return evicted, nil
177175
}
178176

179177
// Stats returns the current in-flight and evictable request counts.
180178
func (p *RequestEvictor) Stats() (inFlight int, evictable int) {
181179
return p.queue.InFlightLen(), p.queue.EvictableLen()
182180
}
181+
182+
// cleanupRequest removes a request from all tracking structures.
183+
// If the evictor supports cleanup (e.g., ImmediateResponseEvictor), it also
184+
// cleans up evictor-internal state to prevent unbounded map growth.
185+
func (p *RequestEvictor) cleanupRequest(requestID string) {
186+
p.queue.Untrack(requestID)
187+
p.evictionRegistry.Deregister(requestID)
188+
if c, ok := p.evictor.(EvictorWithCleanup); ok {
189+
c.Cleanup(requestID)
190+
}
191+
}
192+
193+
// EvictorWithCleanup is an optional interface for evictors that maintain per-request state
194+
// that needs to be cleaned up when a request completes or is untracked.
195+
type EvictorWithCleanup interface {
196+
Cleanup(requestID string)
197+
}

0 commit comments

Comments
 (0)