Skip to content

Commit 0db1497

Browse files
committed
Extend the Function stage with panic handler support.
Stages can be extended to allow panics to be caught and reported. The panic will still bubble up once the panic handler has resolved.
1 parent 951775d commit 0db1497

2 files changed

Lines changed: 39 additions & 4 deletions

File tree

pipe/function.go

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,19 +32,42 @@ func Function(name string, f StageFunc) Stage {
3232
// goStage is a `Stage` that does its work by running an arbitrary
3333
// `stageFunc` in a goroutine.
3434
type goStage struct {
35-
name string
36-
f StageFunc
37-
done chan struct{}
38-
err error
35+
name string
36+
f StageFunc
37+
done chan struct{}
38+
err error
39+
panicHandler StagePanicHandler
3940
}
4041

4142
func (s *goStage) Name() string {
4243
return s.name
4344
}
4445

46+
func (s *goStage) SetPanicHandler(ph StagePanicHandler) {
47+
s.panicHandler = ph
48+
}
49+
4550
func (s *goStage) Start(ctx context.Context, env Env, stdin io.ReadCloser) (io.ReadCloser, error) {
4651
r, w := io.Pipe()
52+
4753
go func() {
54+
defer func() {
55+
if p := recover(); p != nil {
56+
if s.panicHandler == nil {
57+
// Nothing to do, just panic
58+
panic(p)
59+
}
60+
61+
_ = w.Close()
62+
if stdin != nil {
63+
_ = stdin.Close()
64+
}
65+
close(s.done)
66+
67+
s.err = s.panicHandler(p)
68+
}
69+
}()
70+
4871
s.err = s.f(ctx, env, stdin, w)
4972
if err := w.Close(); err != nil && s.err == nil {
5073
s.err = fmt.Errorf("error closing output pipe for stage %q: %w", s.Name(), err)

pipe/panic.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package pipe
2+
3+
// StagePanicHandlerAware is an interface that Stages can implement to receive
4+
// a panic handler from the pipeline. This is particularly useful for stages
5+
// that execute work in a separate goroutine and need to manage panics occurring
6+
// within that goroutine.
7+
type StagePanicHandlerAware interface {
8+
SetPanicHandler(StagePanicHandler)
9+
}
10+
11+
// StagePanicHandler is a function that handles panics in the pipeline and its stages.
12+
type StagePanicHandler func(p any) error

0 commit comments

Comments
 (0)