Skip to content

Commit 2b20cfb

Browse files
committed
state: add new state package as a foundation for v2
1 parent e508a12 commit 2b20cfb

12 files changed

Lines changed: 4937 additions & 11 deletions

File tree

cachekeys/keys.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,12 @@ func GVRMetaNamespaceKeyFunc(gvr schema.GroupVersionResource, obj interface{}) (
3434
func SplitGVRMetaNamespaceKey(key string) (gvr *schema.GroupVersionResource, namespace, name string, err error) {
3535
before, after, ok := strings.Cut(key, "::")
3636
if !ok {
37-
err = fmt.Errorf("error parsing key: %s", key)
38-
return
37+
return nil, "", "", fmt.Errorf("error parsing key: %s", key)
3938
}
4039
gvr, _ = schema.ParseResourceArg(before)
4140
if gvr == nil {
42-
err = fmt.Errorf("error parsing gvr from key: %s", before)
43-
return
41+
return nil, "", "", fmt.Errorf("error parsing gvr from key: %s", before)
4442
}
4543
namespace, name, err = cache.SplitMetaNamespaceKey(after)
46-
return
44+
return gvr, namespace, name, err
4745
}

component/component.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func (c *Component[K]) List(ctx context.Context, indexValue fmt.Stringer) (out [
5858
ownedObjects, err := c.indexer.ByIndex(c.indexName, indexValue.String())
5959
if err != nil {
6060
utilruntime.HandleError(err)
61-
return
61+
return out
6262
}
6363
for _, d := range ownedObjects {
6464
ls := d.GetLabels()

go.mod

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
module github.com/authzed/controller-idioms
22

3-
go 1.24.0
4-
5-
toolchain go1.24.4
3+
go 1.25.0
64

75
require (
86
github.com/cespare/xxhash/v2 v2.3.0
@@ -89,6 +87,7 @@ require (
8987
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
9088
github.com/MakeNowJust/heredoc v1.0.0 // indirect
9189
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
90+
github.com/authzed/ctxkey v0.0.0-20260210154927-ca132876f62c // indirect
9291
github.com/chai2010/gettext-go v1.0.2 // indirect
9392
github.com/exponent-io/jsonpath v0.0.0-20210407135951-1de76d718b3f // indirect
9493
github.com/fxamacker/cbor/v2 v2.8.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8
1010
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
1111
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
1212
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
13+
github.com/authzed/ctxkey v0.0.0-20260210154927-ca132876f62c h1:kcjpI9wcj5yFYWJ7bEeiY6hqH2I6Es3lVz6PB7RKblQ=
14+
github.com/authzed/ctxkey v0.0.0-20260210154927-ca132876f62c/go.mod h1:ve6DXRv9l2HcM2lxSUmZKWXl7Iq/00xYixBuHOtST+4=
1315
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
1416
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
1517
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=

manager/controller_test.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"net/http"
7+
"sync"
78
"testing"
89
"time"
910

@@ -32,7 +33,7 @@ func ExampleNewOwnedResourceController() {
3233
CtxQueue := queue.NewQueueOperationsCtx()
3334
registry := typed.NewRegistry()
3435
broadcaster := record.NewBroadcaster()
35-
eventSink := &typedcorev1.EventSinkImpl{Interface: fake.NewSimpleClientset().CoreV1().Events("")}
36+
eventSink := &typedcorev1.EventSinkImpl{Interface: fake.NewClientset().CoreV1().Events("")}
3637

3738
// the controller processes objects on the queue, but doesn't set up any
3839
// informers by default.
@@ -120,11 +121,12 @@ func TestControllerEventsBroadcast(t *testing.T) {
120121
recorder.Event(&v1.ObjectReference{Namespace: "test", Name: "a"}, v1.EventTypeNormal, "test", "test")
121122

122123
require.Eventually(t, func() bool {
123-
return len(eventSink.Events) > 0
124+
return eventSink.Len() > 0
124125
}, 5*time.Second, 1*time.Millisecond)
125126
}
126127

127128
type fakeEventSink struct {
129+
mu sync.Mutex
128130
Events map[types.UID]*v1.Event
129131
}
130132

@@ -134,18 +136,30 @@ func newFakeEventSink() *fakeEventSink {
134136
}
135137
}
136138

139+
func (f *fakeEventSink) Len() int {
140+
f.mu.Lock()
141+
defer f.mu.Unlock()
142+
return len(f.Events)
143+
}
144+
137145
func (f *fakeEventSink) Create(event *v1.Event) (*v1.Event, error) {
146+
f.mu.Lock()
138147
f.Events[event.UID] = event
148+
f.mu.Unlock()
139149
return event, nil
140150
}
141151

142152
func (f *fakeEventSink) Update(event *v1.Event) (*v1.Event, error) {
153+
f.mu.Lock()
143154
f.Events[event.UID] = event
155+
f.mu.Unlock()
144156
return event, nil
145157
}
146158

147159
func (f *fakeEventSink) Patch(oldEvent *v1.Event, _ []byte) (*v1.Event, error) {
160+
f.mu.Lock()
148161
f.Events[oldEvent.UID] = oldEvent
162+
f.mu.Unlock()
149163
return oldEvent, nil
150164
}
151165

queue/handlers.go

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
// Package queue provides helpers for working with client-go's `workqueues` and control flow handlers.
2+
//
3+
// This file provides handlers that integrate with the state package to provide clean
4+
// control flow patterns for controllers. Instead of manually calling queue operations
5+
// and returning, controllers can now use:
6+
//
7+
// return queue.Done()
8+
// return queue.Requeue()
9+
// return queue.RequeueAfter(5 * time.Second)
10+
// return queue.RequeueErr(err)
11+
//
12+
// These handlers automatically call the appropriate queue operations and terminate
13+
// the handler pipeline by returning nil.
14+
//
15+
// # Error Propagation Pattern
16+
//
17+
// Errors in this system are communicated via:
18+
//
19+
// 1. **Context Cancellation**: ctx.Err() is checked by Continue() and parallel execution.
20+
// When context is cancelled, the pipeline stops (unless WithErrorHandler is set).
21+
//
22+
// 2. **Explicit Error Handlers**: queue.RequeueErr(err) and queue.RequeueAPIErr(err)
23+
// take an explicit error and pass it to the queue for retry logic.
24+
//
25+
// 3. **Context Values**: Steps can store errors in context using typedctx.Box or
26+
// context.WithValue, then check them in subsequent steps or OnError handlers.
27+
//
28+
// 4. **Decision-based Error Handling**: Use Decision() to branch based on whether
29+
// an error occurred, or handle errors inline:
30+
//
31+
// riskyOperation := state.NewStepFunc(func(ctx context.Context, next state.Step) state.Step {
32+
// result, err := doSomethingRisky()
33+
// if err != nil {
34+
// return queue.RequeueErr(err).Step().Run(ctx) // Handle error inline
35+
// }
36+
// // Store result and continue
37+
// ctx = context.WithValue(ctx, "result", result)
38+
// return state.Continue(ctx, next)
39+
// })
40+
//
41+
// Or with Decision for conditional logic:
42+
//
43+
// Sequence(
44+
// validateInput,
45+
// Decision(
46+
// func(ctx context.Context) bool { return isValid(ctx) },
47+
// continueProcessing,
48+
// queue.RequeueErr(fmt.Errorf("validation failed")),
49+
// ),
50+
// )
51+
package queue
52+
53+
import (
54+
"context"
55+
"time"
56+
57+
"github.com/authzed/controller-idioms/state"
58+
)
59+
60+
// Done creates a handler that marks the current queue key as finished and terminates the pipeline.
61+
// This is equivalent to calling queue.NewQueueOperationsCtx().Done(ctx) and returning.
62+
//
63+
// Usage:
64+
//
65+
// pipeline := state.Sequence(
66+
// validateInput,
67+
// processResource,
68+
// queue.Done(), // Stop here - processing complete
69+
// )
70+
func Done() state.NewStep {
71+
return state.NewTerminalStepFunc(func(ctx context.Context) {
72+
NewQueueOperationsCtx().Done(ctx)
73+
})
74+
}
75+
76+
// Requeue creates a handler that requeues the current key immediately and terminates the pipeline.
77+
// This is equivalent to calling queue.NewQueueOperationsCtx().Requeue(ctx) and returning.
78+
//
79+
// Usage:
80+
//
81+
// pipeline := state.Decision(
82+
// resourceReady,
83+
// continueProcessing,
84+
// queue.Requeue(), // Not ready - try again immediately
85+
// )
86+
func Requeue() state.NewStep {
87+
return state.NewTerminalStepFunc(func(ctx context.Context) {
88+
NewQueueOperationsCtx().Requeue(ctx)
89+
})
90+
}
91+
92+
// RequeueAfter creates a handler that requeues the current key after the specified duration
93+
// and terminates the pipeline.
94+
// This is equivalent to calling queue.NewQueueOperationsCtx().RequeueAfter(ctx, duration) and returning.
95+
//
96+
// Usage:
97+
//
98+
// pipeline := state.Decision(
99+
// resourceReady,
100+
// continueProcessing,
101+
// queue.RequeueAfter(30 * time.Second), // Not ready - try again in 30s
102+
// )
103+
func RequeueAfter(duration time.Duration) state.NewStep {
104+
return state.NewTerminalStepFunc(func(ctx context.Context) {
105+
NewQueueOperationsCtx().RequeueAfter(ctx, duration)
106+
})
107+
}
108+
109+
// RequeueErr creates a handler that records an error and requeues the current key immediately,
110+
// then terminates the pipeline.
111+
// This is equivalent to calling queue.NewQueueOperationsCtx().RequeueErr(ctx, err) and returning.
112+
//
113+
// Usage - return directly from within a step when error occurs:
114+
//
115+
// validateInput := state.NewStepFunc(func(ctx context.Context, next state.Step) state.Step {
116+
// obj := getObjectFromContext(ctx)
117+
// if err := validate(obj); err != nil {
118+
// return queue.RequeueErr(fmt.Errorf("validation failed: %w", err)).Step().Run(ctx)
119+
// }
120+
// return state.Continue(ctx, next)
121+
// })
122+
//
123+
// Or use in Decision branches when the error is known statically:
124+
//
125+
// state.Decision(
126+
// inputValid,
127+
// continueProcessing,
128+
// queue.RequeueErr(fmt.Errorf("validation failed")), // Error known at composition time
129+
// )
130+
func RequeueErr(err error) state.NewStep {
131+
return state.NewTerminalStepFunc(func(ctx context.Context) {
132+
NewQueueOperationsCtx().RequeueErr(ctx, err)
133+
})
134+
}
135+
136+
// RequeueAPIErr creates a handler that handles API errors with appropriate retry logic
137+
// and terminates the pipeline.
138+
// This checks if the error contains retry information from the API server and requeues
139+
// accordingly, equivalent to calling queue.NewQueueOperationsCtx().RequeueAPIErr(ctx, err).
140+
//
141+
// Usage - return directly from within a step when error occurs:
142+
//
143+
// callKubernetesAPI := state.NewStepFunc(func(ctx context.Context, next state.Step) state.Step {
144+
// result, err := clientset.AppsV1().Deployments(ns).Get(ctx, name, metav1.GetOptions{})
145+
// if err != nil {
146+
// return queue.RequeueAPIErr(err).Step().Run(ctx) // execute inline, terminating the pipeline
147+
// }
148+
// // Store result in context and continue
149+
// ctx = context.WithValue(ctx, "deployment", result)
150+
// return state.Continue(ctx, next)
151+
// })
152+
//
153+
// Note: .Step() is equivalent to calling the NewStep with nil: queue.RequeueAPIErr(err)(nil)
154+
// but is more readable and explicit about the conversion.
155+
//
156+
// This pattern keeps errors local to where they occur, avoiding the need to store
157+
// errors in context or check them in Decision branches.
158+
func RequeueAPIErr(err error) state.NewStep {
159+
return state.NewTerminalStepFunc(func(ctx context.Context) {
160+
NewQueueOperationsCtx().RequeueAPIErr(ctx, err)
161+
})
162+
}
163+
164+
// OnError creates a handler that executes different queue operations based on whether
165+
// an error occurred in the context.
166+
//
167+
// Usage:
168+
//
169+
// pipeline := state.Sequence(
170+
// riskyOperation,
171+
// queue.OnError(
172+
// queue.RequeueErr(fmt.Errorf("operation failed")), // If error
173+
// queue.Done(), // If success
174+
// ),
175+
// )
176+
func OnError(errorHandler, successHandler state.NewStep) state.NewStep {
177+
return func(next state.Step) state.Step {
178+
errStep := errorHandler(next)
179+
okStep := successHandler(next)
180+
return state.StepFunc(func(ctx context.Context) state.Step {
181+
if ctx.Err() != nil {
182+
if errStep != nil {
183+
return errStep.Run(ctx)
184+
}
185+
return nil
186+
}
187+
if okStep != nil {
188+
return okStep.Run(ctx)
189+
}
190+
return nil
191+
})
192+
}
193+
}

0 commit comments

Comments
 (0)