Skip to content

Commit 47f3be0

Browse files
authored
feat(projection): pilot checkpointing behavior inside Projection.Apply (#54)
* feat(subscription): add CheckpointerMock * feat(projection): add projection.Checkpoint and projection.DoNotCheckpoint * feat: add Logger for projection.Runner
1 parent 6975b95 commit 47f3be0

7 files changed

Lines changed: 369 additions & 199 deletions

File tree

eventstore/store.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,17 @@ type Event struct {
4848
SequenceNumber int64
4949
}
5050

51+
// MapEvents maps a list of persisted eventstore.Event to only their internal Event data.
52+
func MapEvents(events []Event) []eventually.Event {
53+
res := make([]eventually.Event, 0, len(events))
54+
55+
for _, event := range events {
56+
res = append(res, event.Event)
57+
}
58+
59+
return res
60+
}
61+
5162
// EventStream is a stream of persisted Events.
5263
type EventStream chan<- Event
5364

logger/test_logger.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package logger
2+
3+
import "testing"
4+
5+
var _ Logger = Test{}
6+
7+
// Test is a logger.Logger implementation using testing.T instance.
8+
type Test struct{ t *testing.T }
9+
10+
// NewTest returns a new logger using the provided testing.T instance.
11+
func NewTest(t *testing.T) Test {
12+
return Test{t: t}
13+
}
14+
15+
// Debug uses t.Logf to print a debug message.
16+
func (t Test) Debug(msg string, fields ...Field) {
17+
t.t.Logf("[debug] %s {args: %+v}\n", msg, fields)
18+
}
19+
20+
// Info uses t.Logf to print an info message.
21+
func (t Test) Info(msg string, fields ...Field) {
22+
t.t.Logf("[info] %s {args: %+v}\n", msg, fields)
23+
}
24+
25+
// Error uses t.Logf to print an error message.
26+
func (t Test) Error(msg string, fields ...Field) {
27+
t.t.Logf("[error] %s {args: %+v}\n", msg, fields)
28+
}

projection/projection.go

Lines changed: 0 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,9 @@ package projection
22

33
import (
44
"context"
5-
"fmt"
6-
7-
"golang.org/x/sync/errgroup"
85

96
"github.com/get-eventually/go-eventually/eventstore"
107
"github.com/get-eventually/go-eventually/query"
11-
"github.com/get-eventually/go-eventually/subscription"
128
)
139

1410
// Applier is a segregated Projection interface that focuses on applying
@@ -44,69 +40,3 @@ type Projection interface {
4440
query.Handler
4541
Applier
4642
}
47-
48-
// DefaultRunnerBufferSize is the default size for the buffered channels
49-
// opened by a Runner instance, if not specified.
50-
const DefaultRunnerBufferSize = 32
51-
52-
// Runner is an infrastructural component that orchestrates the state update
53-
// of an Applier or a Projection using the provided Subscription,
54-
// to subscribe to incoming events from the Event Store.
55-
type Runner struct {
56-
Applier Applier
57-
Subscription subscription.Subscription
58-
BufferSize int
59-
}
60-
61-
// Run starts listening to Events from the provided Subscription
62-
// and sinking them to the Applier instance to trigger state change.
63-
//
64-
// Run is a blocking call, that will exit when either the Applier returns an error,
65-
// or the Subscription stops.
66-
//
67-
// Run uses buffered channels to coordinate events communication between components,
68-
// using the value specified in BufferSize, if any, or DefaultRunnerBufferSize otherwise.
69-
//
70-
// To stop the Runner, cancel the provided context.
71-
// If the error returned upon exit is context.Canceled, that usually represent
72-
// a case of normal operation, so it could be treated as a non-error.
73-
func (r Runner) Run(ctx context.Context) error {
74-
if r.BufferSize == 0 {
75-
r.BufferSize = DefaultRunnerBufferSize
76-
}
77-
78-
eventStream := make(chan eventstore.Event, r.BufferSize)
79-
toCheckpoint := make(chan eventstore.Event, r.BufferSize)
80-
81-
group, ctx := errgroup.WithContext(ctx)
82-
83-
group.Go(func() error {
84-
if err := r.Subscription.Start(ctx, eventStream); err != nil {
85-
return fmt.Errorf("projection.Runner: subscription exited with error: %w", err)
86-
}
87-
88-
return nil
89-
})
90-
91-
group.Go(func() error {
92-
defer close(toCheckpoint)
93-
94-
for event := range eventStream {
95-
if err := r.Applier.Apply(ctx, event); err != nil {
96-
return fmt.Errorf("projection.Runner: failed to apply event to projection: %w", err)
97-
}
98-
99-
toCheckpoint <- event
100-
}
101-
102-
return nil
103-
})
104-
105-
for event := range toCheckpoint {
106-
if err := r.Subscription.Checkpoint(ctx, event); err != nil {
107-
return fmt.Errorf("projection.Runner: failed to checkpoint event: %w", err)
108-
}
109-
}
110-
111-
return group.Wait()
112-
}

projection/projection_test.go

Lines changed: 0 additions & 129 deletions
This file was deleted.

projection/runner.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package projection
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"golang.org/x/sync/errgroup"
8+
9+
"github.com/get-eventually/go-eventually/eventstore"
10+
"github.com/get-eventually/go-eventually/logger"
11+
"github.com/get-eventually/go-eventually/subscription"
12+
)
13+
14+
// DefaultRunnerBufferSize is the default size for the buffered channels
15+
// opened by a Runner instance, if not specified.
16+
const DefaultRunnerBufferSize = 32
17+
18+
type shouldCheckpointCtxKey struct{}
19+
20+
func contextWithPreferredCheckpointStrategy(ctx context.Context, v *bool) context.Context {
21+
return context.WithValue(ctx, shouldCheckpointCtxKey{}, v)
22+
}
23+
24+
// Checkpoint should be used in a projection.Apply method to signal
25+
// to a projection.Runner that the event processed should not checkpointed.
26+
//
27+
// NOTE: this is currently the default behavior of projection.Runner, so using
28+
// this method is not usually necessary.
29+
func Checkpoint(ctx context.Context) {
30+
if v, ok := ctx.Value(shouldCheckpointCtxKey{}).(*bool); ok {
31+
*v = true
32+
}
33+
}
34+
35+
// DoNotCheckpoint should be used in a projection.Apply method to signal
36+
// to a projection.Runner that the event processed should not be checkpointed.
37+
func DoNotCheckpoint(ctx context.Context) {
38+
if v, ok := ctx.Value(shouldCheckpointCtxKey{}).(*bool); ok {
39+
*v = false
40+
}
41+
}
42+
43+
// Runner is an infrastructural component that orchestrates the state update
44+
// of an Applier or a Projection using the provided Subscription,
45+
// to subscribe to incoming events from the Event Store.
46+
type Runner struct {
47+
Applier Applier
48+
Subscription subscription.Subscription
49+
BufferSize int
50+
Logger logger.Logger
51+
}
52+
53+
// Run starts listening to Events from the provided Subscription
54+
// and sinking them to the Applier instance to trigger state change.
55+
//
56+
// Run is a blocking call, that will exit when either the Applier returns an error,
57+
// or the Subscription stops.
58+
//
59+
// Run uses buffered channels to coordinate events communication between components,
60+
// using the value specified in BufferSize, if any, or DefaultRunnerBufferSize otherwise.
61+
//
62+
// To stop the Runner, cancel the provided context.
63+
// If the error returned upon exit is context.Canceled, that usually represent
64+
// a case of normal operation, so it could be treated as a non-error.
65+
func (r Runner) Run(ctx context.Context) error {
66+
if r.BufferSize == 0 {
67+
r.BufferSize = DefaultRunnerBufferSize
68+
}
69+
70+
eventStream := make(chan eventstore.Event, r.BufferSize)
71+
toCheckpoint := make(chan eventstore.Event, r.BufferSize)
72+
73+
group, ctx := errgroup.WithContext(ctx)
74+
75+
group.Go(func() error {
76+
logger.Info(r.Logger, "Subscription started for projection runner")
77+
78+
if err := r.Subscription.Start(ctx, eventStream); err != nil {
79+
return fmt.Errorf("projection.Runner: subscription exited with error: %w", err)
80+
}
81+
82+
return nil
83+
})
84+
85+
group.Go(func() error {
86+
defer close(toCheckpoint)
87+
88+
for event := range eventStream {
89+
// Default behavior for chechkpointing is to checkpoint every event processed.
90+
//
91+
// Users could use projection.DoNotCheckpoint(ctx) to specify whether they want the message
92+
// not to be checkpointed.
93+
checkpoint := true
94+
ctx := contextWithPreferredCheckpointStrategy(ctx, &checkpoint) //nolint:govet // Shadowing is fine.
95+
96+
if err := r.Applier.Apply(ctx, event); err != nil {
97+
return fmt.Errorf("projection.Runner: failed to apply event to projection: %w", err)
98+
}
99+
100+
if checkpoint {
101+
toCheckpoint <- event
102+
}
103+
104+
logger.Info(r.Logger, "Skip checkpointing of event processed",
105+
logger.With("sequenceNumber", event.SequenceNumber),
106+
)
107+
}
108+
109+
return nil
110+
})
111+
112+
for event := range toCheckpoint {
113+
logger.Info(r.Logger, "Checkpointing processed event",
114+
logger.With("sequenceNumber", event.SequenceNumber),
115+
)
116+
117+
if err := r.Subscription.Checkpoint(ctx, event); err != nil {
118+
return fmt.Errorf("projection.Runner: failed to checkpoint event: %w", err)
119+
}
120+
}
121+
122+
return group.Wait()
123+
}

0 commit comments

Comments
 (0)