Skip to content

Commit d24a80e

Browse files
KostassoidKonstantin Alexandrovar3s3ru
authored
Allow to wrap Event appending function for Postgres EventStore (#59)
* Allow to wrap append to store function in postgres.EventStore * Cleanup * Update eventstore/postgres/store.go Co-authored-by: Danilo Cianfrone <danilocianfr@gmail.com> * Add documentation, fix linter issues Co-authored-by: Konstantin Alexandrov <kale@hellofresh.com> Co-authored-by: Danilo Cianfrone <danilocianfr@gmail.com>
1 parent 985eabf commit d24a80e

4 files changed

Lines changed: 98 additions & 34 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ project adheres to [Semantic Versioning](https://semver.org/).
77

88
## [Unreleased]
99
### Added
10-
- ...
10+
- An option to override Event appending logic in Postgres EventStore implementation.
1111

1212
### Changed
1313
- ...

eventstore/postgres/go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF
3030
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
3131
github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po=
3232
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
33-
github.com/get-eventually/go-eventually v0.0.0-20210830220720-a038785a5316 h1:mdvJ8lYRc0BXPnkQczyF7EeV9xNnKTMh459lnetvDus=
34-
github.com/get-eventually/go-eventually v0.0.0-20210830220720-a038785a5316/go.mod h1:yOBDlloZ9BP1Hbmdp0lvkCC49esxLizZuytfjakJ5J4=
3533
github.com/get-eventually/go-eventually v0.0.0-20210903204041-6da801fa7f4e h1:9+YQjGoKqBYE0I2GPW3HVNb6k6T6dQHkvEcLORoIOgg=
3634
github.com/get-eventually/go-eventually v0.0.0-20210903204041-6da801fa7f4e/go.mod h1:yOBDlloZ9BP1Hbmdp0lvkCC49esxLizZuytfjakJ5J4=
3735
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=

eventstore/postgres/store.go

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,47 @@ var (
2020
_ eventstore.SequenceNumberGetter = &EventStore{}
2121
)
2222

23+
// AppendToStoreFunc represents a function type for persisting an instance of eventually.Event in postgres.EventStore.
24+
type AppendToStoreFunc func(
25+
ctx context.Context,
26+
tx *sql.Tx,
27+
id stream.ID,
28+
expected eventstore.VersionCheck,
29+
event eventually.Event,
30+
) (int64, error)
31+
2332
// EventStore is an eventstore.Store implementation which uses
2433
// PostgreSQL as backend datastore.
2534
type EventStore struct {
26-
db *sql.DB
27-
registry eventstore.Registry
35+
db *sql.DB
36+
registry eventstore.Registry
37+
appendToStore AppendToStoreFunc
38+
}
39+
40+
// Option defines a type for providing additional constructor adjustments for postgres.EventStore.
41+
type Option func(EventStore) EventStore
42+
43+
// WithAppendMiddleware allows overriding the internal logic for appending events within a transaction.
44+
func WithAppendMiddleware(wrap func(AppendToStoreFunc) AppendToStoreFunc) Option {
45+
return func(store EventStore) EventStore {
46+
store.appendToStore = wrap(store.appendToStore)
47+
return store
48+
}
2849
}
2950

3051
// NewEventStore creates a new EventStore using the database connection pool provided.
31-
func NewEventStore(db *sql.DB) EventStore {
32-
return EventStore{
33-
db: db,
34-
registry: eventstore.NewRegistry(json.Unmarshal),
52+
func NewEventStore(db *sql.DB, options ...Option) EventStore {
53+
store := EventStore{
54+
db: db,
55+
registry: eventstore.NewRegistry(json.Unmarshal),
56+
appendToStore: appendEvent,
3557
}
58+
59+
for _, option := range options {
60+
store = option(store)
61+
}
62+
63+
return store
3664
}
3765

3866
// Register registers Domain Events used by the application in order to decode events
@@ -129,7 +157,7 @@ func (st EventStore) Append(
129157
}()
130158

131159
for _, event := range events {
132-
if v, err = st.appendEvent(ctx, tx, id, expected, event); err != nil {
160+
if v, err = st.appendToStore(ctx, tx, id, expected, event); err != nil {
133161
return 0, err
134162
}
135163

@@ -145,7 +173,7 @@ func (st EventStore) Append(
145173
}
146174

147175
// TODO(ar3s3ru): add the ErrConflict error in case of optimistic concurrency issues.
148-
func (st *EventStore) appendEvent(
176+
func appendEvent(
149177
ctx context.Context,
150178
tx *sql.Tx,
151179
id stream.ID,

eventstore/postgres/store_test.go

Lines changed: 61 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,29 @@ func obtainEventStore(t *testing.T) (*sql.DB, postgres.EventStore) {
4040
db, err := sql.Open("postgres", url)
4141
require.NoError(t, err)
4242

43+
handleError := func(err error) {
44+
if !assert.NoError(t, err) {
45+
t.FailNow()
46+
}
47+
}
48+
49+
tx, err := db.Begin()
50+
require.NoError(t, err)
51+
52+
// Reset checkpoints for subscriptions.
53+
_, err = tx.Exec("DELETE FROM subscriptions_checkpoints")
54+
require.NoError(t, err)
55+
56+
// Reset committed events and streams.
57+
_, err = tx.Exec("DELETE FROM streams")
58+
require.NoError(t, err)
59+
60+
// Reset the global sequence number to 1.
61+
_, err = tx.Exec("ALTER SEQUENCE events_global_sequence_number_seq RESTART WITH 1")
62+
require.NoError(t, err)
63+
64+
handleError(tx.Commit())
65+
4366
return db, postgres.NewEventStore(db)
4467
}
4568

@@ -50,29 +73,6 @@ func TestStoreSuite(t *testing.T) {
5073
require.NoError(t, store.Register(internal.IntPayload(0)))
5174

5275
suite.Run(t, eventstore.NewStoreSuite(func() eventstore.Store {
53-
handleError := func(err error) {
54-
if !assert.NoError(t, err) {
55-
t.FailNow()
56-
}
57-
}
58-
59-
tx, err := db.Begin()
60-
require.NoError(t, err)
61-
62-
// Reset checkpoints for subscriptions.
63-
_, err = tx.Exec("DELETE FROM subscriptions_checkpoints")
64-
require.NoError(t, err)
65-
66-
// Reset committed events and streams.
67-
_, err = tx.Exec("DELETE FROM streams")
68-
require.NoError(t, err)
69-
70-
// Reset the global sequence number to 1.
71-
_, err = tx.Exec("ALTER SEQUENCE events_global_sequence_number_seq RESTART WITH 1")
72-
require.NoError(t, err)
73-
74-
handleError(tx.Commit())
75-
7676
return store
7777
}))
7878
}
@@ -111,3 +111,41 @@ func TestLatestSequenceNumber(t *testing.T) {
111111
assert.NoError(t, err)
112112
assert.Equal(t, latestSequenceNumber, actual)
113113
}
114+
115+
func TestAppendToStoreWrapperOption(t *testing.T) {
116+
db, _ := obtainEventStore(t)
117+
defer func() { assert.NoError(t, db.Close()) }()
118+
119+
triggered := false
120+
121+
store := postgres.NewEventStore(
122+
db,
123+
postgres.WithAppendMiddleware(func(super postgres.AppendToStoreFunc) postgres.AppendToStoreFunc {
124+
return func(
125+
ctx context.Context,
126+
tx *sql.Tx,
127+
id stream.ID,
128+
expected eventstore.VersionCheck,
129+
event eventually.Event,
130+
) (int64, error) {
131+
triggered = true
132+
return super(ctx, tx, id, expected, event)
133+
}
134+
}),
135+
)
136+
137+
require.NoError(t, store.Register(internal.IntPayload(0)))
138+
139+
ctx := context.Background()
140+
141+
_, _ = store.Append(
142+
ctx,
143+
firstInstance,
144+
eventstore.VersionCheck(int64(-1)),
145+
eventually.Event{Payload: internal.IntPayload(13)},
146+
)
147+
148+
latestSequenceNumber, _ := store.LatestSequenceNumber(ctx)
149+
assert.Equal(t, int64(1), latestSequenceNumber)
150+
assert.True(t, triggered)
151+
}

0 commit comments

Comments
 (0)