Skip to content

Commit a038785

Browse files
authored
refactor(eventstore): change Streamer interface to a single method, introduce Event Stream targets type (#52)
* refactor: simplify eventstore.Streamer interface to a single method * fix: all broken instances of eventstore.Streamer * test: add testcases for stream by types
1 parent ee6d733 commit a038785

24 files changed

Lines changed: 259 additions & 288 deletions

aggregate/repository.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/get-eventually/go-eventually/aggregate/snapshot"
1111
"github.com/get-eventually/go-eventually/eventstore"
12+
"github.com/get-eventually/go-eventually/eventstore/stream"
1213
)
1314

1415
// ErrRootNotFound is returned by the Repository when no Events for the
@@ -99,7 +100,7 @@ func (r *Repository) Add(ctx context.Context, root Root) error {
99100
return nil
100101
}
101102

102-
streamID := eventstore.StreamID{
103+
streamID := stream.ID{
103104
Type: r.aggregateType.Name(),
104105
Name: root.AggregateID().String(),
105106
}
@@ -144,7 +145,7 @@ func (r Repository) Get(ctx context.Context, id ID) (Root, error) {
144145
ctx, cancel := context.WithCancel(ctx)
145146
defer cancel()
146147

147-
streamID := eventstore.StreamID{
148+
streamID := stream.ID{
148149
Type: r.aggregateType.Name(),
149150
Name: id.String(),
150151
}
@@ -154,19 +155,19 @@ func (r Repository) Get(ctx context.Context, id ID) (Root, error) {
154155
return nil, fmt.Errorf("aggregate.Repository: failed to fetch initial aggregate root state: %w", err)
155156
}
156157

157-
stream := make(chan eventstore.Event, 1)
158+
eventStream := make(chan eventstore.Event, 1)
158159
streamSelect := eventstore.Select{From: root.Version()}
159160

160161
group, ctx := errgroup.WithContext(ctx)
161162
group.Go(func() error {
162-
if err := r.eventStore.Stream(ctx, stream, streamID, streamSelect); err != nil {
163+
if err := r.eventStore.Stream(ctx, eventStream, stream.ByID(streamID), streamSelect); err != nil {
163164
return fmt.Errorf("aggregate.Repository: failed while reading event from stream: %w", err)
164165
}
165166

166167
return nil
167168
})
168169

169-
for event := range stream {
170+
for event := range eventStream {
170171
isEmpty = false
171172

172173
if err := root.Apply(event.Event); err != nil {
@@ -187,7 +188,7 @@ func (r Repository) Get(ctx context.Context, id ID) (Root, error) {
187188
return root, nil
188189
}
189190

190-
func (r Repository) fetchAggregateRootState(ctx context.Context, id eventstore.StreamID) (Root, bool, error) {
191+
func (r Repository) fetchAggregateRootState(ctx context.Context, id stream.ID) (Root, bool, error) {
191192
state := r.aggregateType.instance()
192193
isEmpty := true
193194

command/error_recorder.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/get-eventually/go-eventually"
88
"github.com/get-eventually/go-eventually/eventstore"
9+
"github.com/get-eventually/go-eventually/eventstore/stream"
910
)
1011

1112
// FailedType is the default stream type used by ErrorRecorder
@@ -58,13 +59,13 @@ func (er ErrorRecorder) streamType() string {
5859
return FailedType
5960
}
6061

61-
func (er ErrorRecorder) buildStreamID(cmd eventually.Command) eventstore.StreamID {
62+
func (er ErrorRecorder) buildStreamID(cmd eventually.Command) stream.ID {
6263
streamName := cmd.Payload.Name()
6364
if er.StreamNameMapper != nil {
6465
streamName = er.StreamNameMapper(cmd)
6566
}
6667

67-
return eventstore.StreamID{
68+
return stream.ID{
6869
Type: er.streamType(),
6970
Name: streamName,
7071
}

command/error_recorder_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/get-eventually/go-eventually/command"
1212
"github.com/get-eventually/go-eventually/eventstore"
1313
"github.com/get-eventually/go-eventually/eventstore/inmemory"
14+
"github.com/get-eventually/go-eventually/eventstore/stream"
1415
)
1516

1617
type mockCommand struct {
@@ -84,7 +85,7 @@ func TestErrorRecorder(t *testing.T) {
8485
assert.Equal(t, []eventstore.Event{
8586
{
8687
Version: 1,
87-
Stream: eventstore.StreamID{
88+
Stream: stream.ID{
8889
Type: command.FailedType,
8990
Name: expectedCommand.Payload.Name(),
9091
},
@@ -127,7 +128,7 @@ func TestErrorRecorder(t *testing.T) {
127128
assert.Equal(t, []eventstore.Event{
128129
{
129130
Version: 1,
130-
Stream: eventstore.StreamID{
131+
Stream: stream.ID{
131132
Type: command.FailedType,
132133
Name: expectedCommand.Payload.Name(),
133134
},
@@ -172,7 +173,7 @@ func TestErrorRecorder(t *testing.T) {
172173
assert.Equal(t, []eventstore.Event{
173174
{
174175
Version: 1,
175-
Stream: eventstore.StreamID{
176+
Stream: stream.ID{
176177
Type: expectedStreamType,
177178
Name: expectedCommand.Payload.Name(),
178179
},
@@ -219,7 +220,7 @@ func TestErrorRecorder(t *testing.T) {
219220
assert.Equal(t, []eventstore.Event{
220221
{
221222
Version: 1,
222-
Stream: eventstore.StreamID{
223+
Stream: stream.ID{
223224
Type: expectedStreamType,
224225
Name: expectedCommand.Payload.(mockCommand).message,
225226
},

eventstore/example_fuse_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ import (
99

1010
func ExampleFused() {
1111
eventStore := inmemory.NewEventStore()
12-
correlatedEventStore := correlation.WrapEventStore(eventStore, func() string {
13-
return "test-id"
14-
})
12+
correlatedEventStore := correlation.EventStoreWrapper{
13+
Appender: eventStore,
14+
Generator: func() string { return "test-id" },
15+
}
1516

1617
aggregate.NewRepository(aggregate.Type{}, eventstore.Fused{
1718
Appender: correlatedEventStore,

eventstore/inmemory/store.go

Lines changed: 67 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/get-eventually/go-eventually"
99
"github.com/get-eventually/go-eventually/eventstore"
10+
"github.com/get-eventually/go-eventually/eventstore/stream"
1011
)
1112

1213
var _ eventstore.Store = &EventStore{}
@@ -28,21 +29,7 @@ func NewEventStore() *EventStore {
2829
}
2930
}
3031

31-
// StreamAll streams all Event Store committed events onto the provided EventStream,
32-
// from the specified Global Sequence Number in `from`.
33-
//
34-
// Note: this call is synchronous, and will return when all the Events
35-
// have been successfully written to the provided EventStream, or when
36-
// the context has been canceled.
37-
//
38-
// An error is returned if one Event in the Event Store does not have a
39-
// Global Sequence Number (which should never happen), or when the context
40-
// is done.
41-
func (s *EventStore) StreamAll(ctx context.Context, es eventstore.EventStream, selectt eventstore.Select) error {
42-
s.mx.RLock()
43-
defer s.mx.RUnlock()
44-
defer close(es)
45-
32+
func (s *EventStore) streamAll(ctx context.Context, es eventstore.EventStream, selectt eventstore.Select) error {
4633
for _, event := range s.events {
4734
if event.SequenceNumber < selectt.From {
4835
continue
@@ -58,26 +45,12 @@ func (s *EventStore) StreamAll(ctx context.Context, es eventstore.EventStream, s
5845
return nil
5946
}
6047

61-
// StreamByType streams all Event Store committed events of a specific Type (or Category)
62-
// onto the provided EventStream, from the specified Global Sequence Number in `from`.
63-
//
64-
// Note: this call is synchronous, and will return when all the Events
65-
// have been successfully written to the provided EventStream, or when
66-
// the context has been canceled.
67-
//
68-
// An error is returned if one Event in the Event Store does not have a
69-
// Global Sequence Number (which should never happen), or when the context
70-
// is done.
71-
func (s *EventStore) StreamByType(
48+
func (s *EventStore) streamByType(
7249
ctx context.Context,
7350
es eventstore.EventStream,
7451
typ string,
7552
selectt eventstore.Select,
7653
) error {
77-
s.mx.RLock()
78-
defer s.mx.RUnlock()
79-
defer close(es)
80-
8154
for _, eventIdx := range s.byType[typ] {
8255
event := s.events[eventIdx]
8356

@@ -95,24 +68,42 @@ func (s *EventStore) StreamByType(
9568
return nil
9669
}
9770

98-
// Stream streams all Domain Events committed in a specific event stream
99-
// onto the provided EventStream channel, from the specified version in `from`.
100-
//
101-
// Note: this call is synchronous, and will return when all the Events
102-
// have been successfully written to the provided EventStream, or when
103-
// the context has been canceled.
104-
//
105-
// An error is returned when the context is done.
106-
func (s *EventStore) Stream(
71+
func (s *EventStore) streamByTypes(
10772
ctx context.Context,
10873
es eventstore.EventStream,
109-
id eventstore.StreamID,
74+
types []string,
11075
selectt eventstore.Select,
11176
) error {
112-
s.mx.RLock()
113-
defer s.mx.RUnlock()
114-
defer close(es)
77+
indexedTypes := make(map[string]struct{})
78+
for _, typ := range types {
79+
indexedTypes[typ] = struct{}{}
80+
}
81+
82+
for _, event := range s.events {
83+
if event.SequenceNumber < selectt.From {
84+
continue
85+
}
11586

87+
if _, ok := indexedTypes[event.Stream.Type]; !ok {
88+
continue
89+
}
90+
91+
select {
92+
case es <- event:
93+
case <-ctx.Done():
94+
return contextErr(ctx)
95+
}
96+
}
97+
98+
return nil
99+
}
100+
101+
func (s *EventStore) streamByID(
102+
ctx context.Context,
103+
es eventstore.EventStream,
104+
id stream.ID,
105+
selectt eventstore.Select,
106+
) error {
116107
if m, ok := s.byTypeAndInstance[id.Type]; !ok || m == nil {
117108
return nil
118109
}
@@ -139,6 +130,38 @@ func (s *EventStore) Stream(
139130
return nil
140131
}
141132

133+
// Stream streams committed events in the Event Store onto the provided EventStream,
134+
// from the specified Global Sequence Number in `from`, based on the provided stream.Target.
135+
//
136+
// Note: this call is synchronous, and will return when all the Events
137+
// have been successfully written to the provided EventStream, or when
138+
// the context has been canceled.
139+
//
140+
// This method fails only when the context is canceled.
141+
func (s *EventStore) Stream(
142+
ctx context.Context,
143+
es eventstore.EventStream,
144+
target stream.Target,
145+
selectt eventstore.Select,
146+
) error {
147+
s.mx.RLock()
148+
defer s.mx.RUnlock()
149+
defer close(es)
150+
151+
switch t := target.(type) {
152+
case stream.All:
153+
return s.streamAll(ctx, es, selectt)
154+
case stream.ByType:
155+
return s.streamByType(ctx, es, string(t), selectt)
156+
case stream.ByTypes:
157+
return s.streamByTypes(ctx, es, []string(t), selectt)
158+
case stream.ByID:
159+
return s.streamByID(ctx, es, stream.ID(t), selectt)
160+
default:
161+
return fmt.Errorf("inmemory.EventStore: unsupported stream target type provided: %T", t)
162+
}
163+
}
164+
142165
// Append inserts the specified Domain Events into the Event Stream specified
143166
// by the current instance, returning the new version of the Event Stream.
144167
//
@@ -153,7 +176,7 @@ func (s *EventStore) Stream(
153176
// version check fails against the current version of the Event Stream.
154177
func (s *EventStore) Append(
155178
ctx context.Context,
156-
id eventstore.StreamID,
179+
id stream.ID,
157180
expected eventstore.VersionCheck,
158181
events ...eventually.Event,
159182
) (int64, error) {

eventstore/inmemory/tracking.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/get-eventually/go-eventually"
88
"github.com/get-eventually/go-eventually/eventstore"
9+
"github.com/get-eventually/go-eventually/eventstore/stream"
910
)
1011

1112
// TrackingEventStore is an Event Store wrapper to track the Events
@@ -45,7 +46,7 @@ func (es *TrackingEventStore) Recorded() []eventstore.Event {
4546
// The recorded events can be accessed by calling Recorded().
4647
func (es *TrackingEventStore) Append(
4748
ctx context.Context,
48-
id eventstore.StreamID,
49+
id stream.ID,
4950
expected eventstore.VersionCheck,
5051
events ...eventually.Event,
5152
) (int64, error) {

eventstore/inmemory/tracking_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/get-eventually/go-eventually"
1010
"github.com/get-eventually/go-eventually/eventstore"
1111
"github.com/get-eventually/go-eventually/eventstore/inmemory"
12+
"github.com/get-eventually/go-eventually/eventstore/stream"
1213
"github.com/get-eventually/go-eventually/internal"
1314
)
1415

@@ -24,7 +25,7 @@ func TestTrackingEventStore(t *testing.T) {
2425
eventStore := inmemory.NewEventStore()
2526
trackingEventStore := inmemory.NewTrackingEventStore(eventStore)
2627

27-
streamID := eventstore.StreamID{
28+
streamID := stream.ID{
2829
Type: "test-type",
2930
Name: "test-instance",
3031
}
@@ -43,7 +44,7 @@ func TestTrackingEventStore(t *testing.T) {
4344

4445
// Compare events in the event store and recorded ones from the tracking store.
4546
events, err := eventstore.StreamToSlice(ctx, func(ctx context.Context, es eventstore.EventStream) error {
46-
return eventStore.Stream(ctx, es, streamID, eventstore.SelectFromBeginning)
47+
return eventStore.Stream(ctx, es, stream.ByID(streamID), eventstore.SelectFromBeginning)
4748
})
4849

4950
assert.NoError(t, err)

0 commit comments

Comments
 (0)