Skip to content

Commit 3e8f966

Browse files
authored
feat(postgres): add Serde interface (#62)
* feat: add serde * fix: migration casts correctly from JSONB to BYTEA * fix: migrations can be applied correctly * fix: linting errors * refactor: change event appending middleware to use the serialized payload * chore: add coverpkg flag
1 parent 76dc654 commit 3e8f966

14 files changed

Lines changed: 889 additions & 242 deletions

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@ project adheres to [Semantic Versioning](https://semver.org/).
77

88
## [Unreleased]
99
### Added
10-
- An option to override Event appending logic in Postgres EventStore implementation.
10+
- An option to override Event appending logic in Postgres EventStore implementation.
11+
- `postgres.Serde` interface to support more serialization formats.
1112

1213
### Changed
1314
- Existing `Event-Id` value in Event Metadata does not get overwritten in correlation.EventStoreWrapper.
15+
- `postgres.EventStore` now uses the `Serde` interface for serializing to and deserializing from byte array.
16+
- `postgres.Registry` is now called `postgres.JSONRegistry` and implements thenew `postgres.Serde` interface.
1417

1518
### Deprecated
1619
- ...

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
GO_TEST_FLAGS := -race -v
1+
GO_TEST_FLAGS := -race -v -coverpkg=./...
22
GOLANGCI_YML ?= $(shell find ~+ -name .golangci.yml)
33

44
.PHONY: run-linter

eventstore/postgres/checkpointer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
)
1313

1414
func TestCheckpointer(t *testing.T) {
15-
db, _ := obtainEventStore(t)
15+
db, _, _ := obtainEventStore(t)
1616
defer func() { assert.NoError(t, db.Close()) }()
1717

1818
log := zaplogger.Wrap(zap.NewNop())

eventstore/postgres/migrations/2_append_to_store.up.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
-- NOTE: this function has been superseded by the version in 7_convert_payload_to_byte_array.up!
12
CREATE OR REPLACE FUNCTION append_to_store(
23
_stream_type TEXT,
34
stream_id TEXT,

eventstore/postgres/migrations/5_correlated_events.up.sql

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ CREATE TABLE correlated_events (
1111

1212
CREATE MATERIALIZED VIEW correlated_events_view AS
1313
SELECT ce.correlation_id, e.*
14-
FROM correlated_events ce
15-
INNER JOIN events e ON e.stream_type = ce.event_stream_type
16-
AND e.stream_id = ce.event_stream_id
17-
AND e.version = ce.event_stream_version;
14+
FROM correlated_events ce INNER JOIN events e
15+
ON e.stream_type = ce.event_stream_type
16+
AND e.stream_id = ce.event_stream_id
17+
AND e.version = ce.event_stream_version;
1818

1919
CREATE OR REPLACE FUNCTION project_correlated_event()
2020
RETURNS TRIGGER
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
-- NOTE: this migration is IRREVERSIBLE!
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
-- correlated_events_view is using the old event column. The quickest way
2+
-- to solve the conflicting issue is to drop the materialized view and rebuild it.
3+
DROP MATERIALIZED VIEW correlated_events_view;
4+
5+
-- Change the events table into a byte array.
6+
ALTER TABLE events
7+
ALTER COLUMN "event" TYPE BYTEA USING decode("event"::TEXT, 'escape');
8+
9+
-- Recreate the materialized view.
10+
CREATE MATERIALIZED VIEW correlated_events_view AS
11+
SELECT ce.correlation_id, e.*
12+
FROM correlated_events ce INNER JOIN events e
13+
ON e.stream_type = ce.event_stream_type
14+
AND e.stream_id = ce.event_stream_id
15+
AND e.version = ce.event_stream_version;
16+
17+
-- Due to using the same name and number of parameters, it's better to drop
18+
-- the previous version of the function explicitly, and recreate it after
19+
-- with the new signature.
20+
DROP FUNCTION append_to_store(TEXT, TEXT, INTEGER, TEXT, JSONB, JSONB);
21+
22+
CREATE FUNCTION append_to_store(
23+
_stream_type TEXT,
24+
stream_id TEXT,
25+
version_check INTEGER,
26+
event_name TEXT,
27+
event_payload BYTEA,
28+
metadata JSONB
29+
) RETURNS TABLE (
30+
"version" INTEGER
31+
) AS $$
32+
DECLARE
33+
last_stream_version INTEGER;
34+
BEGIN
35+
36+
-- Retrieve the latest stream version for the specified stream.
37+
SELECT s."version"
38+
INTO last_stream_version
39+
FROM streams s
40+
WHERE id = stream_id AND s.stream_type = _stream_type;
41+
42+
IF NOT FOUND THEN
43+
-- Create a new entry for the desired stream.
44+
INSERT INTO streams (id, stream_type)
45+
VALUES (stream_id, _stream_type);
46+
47+
-- Make sure to initialize the stream version in this case.
48+
last_stream_version = 0;
49+
END IF;
50+
51+
-- Perform optimistic concurrency check.
52+
IF version_check <> -1 AND version_check <> last_stream_version THEN
53+
RAISE EXCEPTION 'stream version check failed, expected: %, current: %', version_check, last_stream_version;
54+
END IF;
55+
56+
-- Increment the stream version prior to inserting the new event.
57+
last_stream_version = last_stream_version + 1;
58+
59+
-- Add a recorded_at timestamp in the metadata.
60+
metadata = metadata || ('{"Recorded-At": ' || to_json(NOW()) || '}')::JSONB;
61+
62+
-- Insert the event into the events table.
63+
-- Version numbers should start from 1.
64+
INSERT INTO events (
65+
stream_type,
66+
stream_id,
67+
"version",
68+
event_type,
69+
"event",
70+
metadata
71+
) VALUES (
72+
_stream_type,
73+
stream_id,
74+
last_stream_version,
75+
event_name,
76+
event_payload,
77+
metadata
78+
);
79+
80+
-- Update the stream with the latest version computed.
81+
UPDATE streams s
82+
SET "version" = last_stream_version
83+
WHERE id = stream_id AND s.stream_type = _stream_type;
84+
85+
RETURN QUERY
86+
SELECT last_stream_version;
87+
88+
END;
89+
$$ LANGUAGE PLPGSQL;

0 commit comments

Comments
 (0)