Skip to content

Commit 1992841

Browse files
authored
refactor: use Interface Segregation to split traits in individual ones (#238)
* refactor(eventually): split event::Store in event::Streamer and event::Appender * refactor(eventually): split aggregate::Repository in aggregate::Saver and aggregate::Getter * fix(examples/bank-accounting): use the new interfaces * fix: tests broken from api changes are now passing
1 parent 2b697b9 commit 1992841

13 files changed

Lines changed: 320 additions & 173 deletions

File tree

eventually-postgres/src/aggregate.rs

Lines changed: 50 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use eventually::{
55
aggregate,
66
aggregate::Aggregate,
77
serde::{Deserializer, Serde, Serializer},
8-
version::{ConflictError, Version},
8+
version,
9+
version::Version,
910
};
1011
use sqlx::{PgPool, Postgres, Row};
1112

@@ -56,19 +57,25 @@ where
5657
}
5758

5859
#[derive(Debug, thiserror::Error)]
59-
pub enum RepositoryError {
60+
pub enum GetError {
6061
#[error("failed to fetch the aggregate state row: {0}")]
6162
FetchAggregateRow(#[source] sqlx::Error),
6263
#[error("failed to deserialize the aggregate state from the database row: {0}")]
6364
DeserializeAggregate(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
6465
#[error("failed to convert the aggregate state into its domain type: {0}")]
6566
ConvertAggregate(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
67+
#[error("database returned an error: {0}")]
68+
Database(#[from] sqlx::Error),
69+
}
70+
71+
#[derive(Debug, thiserror::Error)]
72+
pub enum SaveError {
6673
#[error("failed to begin a new transaction: {0}")]
6774
BeginTransaction(#[source] sqlx::Error),
6875
#[error("conflict error detected: {0})")]
69-
Conflict(#[source] ConflictError),
76+
Conflict(#[source] version::ConflictError),
7077
#[error("concurrent update detected, represented as a conflict error: {0})")]
71-
Concurrency(#[source] ConflictError),
78+
Concurrency(#[source] version::ConflictError),
7279
#[error("failed to save the new aggregate state: {0}")]
7380
SaveAggregateState(#[source] sqlx::Error),
7481
#[error("failed to append a new domain event: {0}")]
@@ -79,11 +86,11 @@ pub enum RepositoryError {
7986
Database(#[from] sqlx::Error),
8087
}
8188

82-
impl From<RepositoryError> for Option<ConflictError> {
83-
fn from(err: RepositoryError) -> Self {
89+
impl From<SaveError> for Option<version::ConflictError> {
90+
fn from(err: SaveError) -> Self {
8491
match err {
85-
RepositoryError::Conflict(v) => Some(v),
86-
RepositoryError::Concurrency(v) => Some(v),
92+
SaveError::Conflict(v) => Some(v),
93+
SaveError::Concurrency(v) => Some(v),
8794
_ => None,
8895
}
8996
}
@@ -104,7 +111,7 @@ where
104111
aggregate_id: &str,
105112
expected_version: Version,
106113
root: &mut aggregate::Root<T>,
107-
) -> Result<(), RepositoryError> {
114+
) -> Result<(), SaveError> {
108115
let out_state = root.to_aggregate_type::<OutT>();
109116
let bytes_state = self.aggregate_serde.serialize(out_state);
110117

@@ -117,13 +124,15 @@ where
117124
.execute(tx)
118125
.await
119126
.map_err(|err| match crate::check_for_conflict_error(&err) {
120-
Some(err) => RepositoryError::Conflict(err),
127+
Some(err) => SaveError::Conflict(err),
121128
None => match err.as_database_error().and_then(|err| err.code()) {
122-
Some(code) if code == "40001" => RepositoryError::Concurrency(ConflictError {
123-
expected: expected_version,
124-
actual: root.version(),
125-
}),
126-
_ => RepositoryError::SaveAggregateState(err),
129+
Some(code) if code == "40001" => {
130+
SaveError::Concurrency(version::ConflictError {
131+
expected: expected_version,
132+
actual: root.version(),
133+
})
134+
}
135+
_ => SaveError::SaveAggregateState(err),
127136
},
128137
})?;
129138

@@ -132,7 +141,7 @@ where
132141
}
133142

134143
#[async_trait]
135-
impl<T, OutT, OutEvt, TSerde, EvtSerde> aggregate::Repository<T>
144+
impl<T, OutT, OutEvt, TSerde, EvtSerde> aggregate::Getter<T>
136145
for Repository<T, OutT, OutEvt, TSerde, EvtSerde>
137146
where
138147
T: Aggregate + TryFrom<OutT> + Send + Sync,
@@ -144,7 +153,7 @@ where
144153
<TSerde as Deserializer<OutT>>::Error: std::error::Error + Send + Sync + 'static,
145154
EvtSerde: Serializer<OutEvt> + Send + Sync,
146155
{
147-
type Error = RepositoryError;
156+
type Error = GetError;
148157

149158
async fn get(
150159
&self,
@@ -163,25 +172,41 @@ where
163172
.await
164173
.map_err(|err| match err {
165174
sqlx::Error::RowNotFound => aggregate::RepositoryGetError::AggregateRootNotFound,
166-
_ => aggregate::RepositoryGetError::Inner(RepositoryError::FetchAggregateRow(err)),
175+
_ => aggregate::RepositoryGetError::Inner(GetError::FetchAggregateRow(err)),
167176
})?;
168177

169-
let version: i32 = row.try_get("version").map_err(RepositoryError::Database)?;
170-
let bytes_state: Vec<u8> = row.try_get("state").map_err(RepositoryError::Database)?;
178+
let version: i32 = row.try_get("version").map_err(GetError::Database)?;
179+
let bytes_state: Vec<u8> = row.try_get("state").map_err(GetError::Database)?;
171180

172181
let aggregate: T = self
173182
.aggregate_serde
174183
.deserialize(bytes_state)
175-
.map_err(|err| RepositoryError::DeserializeAggregate(Box::new(err)))
184+
.map_err(|err| GetError::DeserializeAggregate(Box::new(err)))
176185
.and_then(|out_t| {
177-
T::try_from(out_t).map_err(|err| RepositoryError::ConvertAggregate(Box::new(err)))
186+
T::try_from(out_t).map_err(|err| GetError::ConvertAggregate(Box::new(err)))
178187
})?;
179188

180189
Ok(aggregate::Root::rehydrate_from_state(
181190
version as Version,
182191
aggregate,
183192
))
184193
}
194+
}
195+
196+
#[async_trait]
197+
impl<T, OutT, OutEvt, TSerde, EvtSerde> aggregate::Saver<T>
198+
for Repository<T, OutT, OutEvt, TSerde, EvtSerde>
199+
where
200+
T: Aggregate + TryFrom<OutT> + Send + Sync,
201+
<T as Aggregate>::Id: ToString,
202+
<T as TryFrom<OutT>>::Error: std::error::Error + Send + Sync + 'static,
203+
OutT: From<T> + Send + Sync,
204+
OutEvt: From<T::Event> + Send + Sync,
205+
TSerde: Serde<OutT> + Send + Sync,
206+
<TSerde as Deserializer<OutT>>::Error: std::error::Error + Send + Sync + 'static,
207+
EvtSerde: Serializer<OutEvt> + Send + Sync,
208+
{
209+
type Error = SaveError;
185210

186211
async fn store(&self, root: &mut aggregate::Root<T>) -> Result<(), Self::Error> {
187212
let events_to_commit = root.take_uncommitted_events();
@@ -194,7 +219,7 @@ where
194219
.pool
195220
.begin()
196221
.await
197-
.map_err(RepositoryError::BeginTransaction)?;
222+
.map_err(SaveError::BeginTransaction)?;
198223

199224
sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE DEFERRABLE")
200225
.execute(&mut tx)
@@ -214,11 +239,9 @@ where
214239
events_to_commit,
215240
)
216241
.await
217-
.map_err(RepositoryError::AppendEvent)?;
242+
.map_err(SaveError::AppendEvent)?;
218243

219-
tx.commit()
220-
.await
221-
.map_err(RepositoryError::CommitTransaction)?;
244+
tx.commit().await.map_err(SaveError::CommitTransaction)?;
222245

223246
Ok(())
224247
}

eventually-postgres/src/event.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,7 @@ where
209209
}
210210
}
211211

212-
#[async_trait]
213-
impl<Id, Evt, OutEvt, S> event::Store for Store<Id, Evt, OutEvt, S>
212+
impl<Id, Evt, OutEvt, S> event::Streamer<Id, Evt> for Store<Id, Evt, OutEvt, S>
214213
where
215214
Id: ToString + Clone + Send + Sync,
216215
Evt: TryFrom<OutEvt> + Message + std::fmt::Debug + Send + Sync,
@@ -219,16 +218,9 @@ where
219218
S: Serde<OutEvt> + Send + Sync,
220219
<S as Deserializer<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
221220
{
222-
type StreamId = Id;
223-
type Event = Evt;
224-
type StreamError = StreamError;
225-
type AppendError = AppendError;
221+
type Error = StreamError;
226222

227-
fn stream(
228-
&self,
229-
id: &Self::StreamId,
230-
select: event::VersionSelect,
231-
) -> event::Stream<Self::StreamId, Self::Event, Self::StreamError> {
223+
fn stream(&self, id: &Id, select: event::VersionSelect) -> event::Stream<Id, Evt, Self::Error> {
232224
let from_version: i32 = match select {
233225
event::VersionSelect::All => 0,
234226
event::VersionSelect::From(v) => v as i32,
@@ -251,13 +243,26 @@ where
251243
.and_then(move |row| ready(self.event_row_to_persisted_event(id.clone(), row)))
252244
.boxed()
253245
}
246+
}
247+
248+
#[async_trait]
249+
impl<Id, Evt, OutEvt, S> event::Appender<Id, Evt> for Store<Id, Evt, OutEvt, S>
250+
where
251+
Id: ToString + Clone + Send + Sync,
252+
Evt: TryFrom<OutEvt> + Message + std::fmt::Debug + Send + Sync,
253+
<Evt as TryFrom<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
254+
OutEvt: From<Evt> + Send + Sync,
255+
S: Serde<OutEvt> + Send + Sync,
256+
<S as Deserializer<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
257+
{
258+
type Error = AppendError;
254259

255260
async fn append(
256261
&self,
257-
id: Self::StreamId,
262+
id: Id,
258263
version_check: event::StreamVersionExpected,
259-
events: Vec<event::Envelope<Self::Event>>,
260-
) -> Result<Version, Self::AppendError> {
264+
events: Vec<event::Envelope<Evt>>,
265+
) -> Result<Version, Self::Error> {
261266
let mut tx = self
262267
.pool
263268
.begin()

eventually-postgres/tests/aggregate_repository.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use eventually::{
2-
aggregate::{Repository, RepositoryGetError},
2+
aggregate::{Getter, RepositoryGetError, Saver},
33
serde::json::Json,
44
version,
55
};

eventually-postgres/tests/event_store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::time::{SystemTime, UNIX_EPOCH};
22

33
use eventually::{
4-
event::{Persisted, Store, StreamVersionExpected, VersionSelect},
4+
event::{Appender, Persisted, StreamVersionExpected, Streamer, VersionSelect},
55
serde::json::Json,
66
version,
77
version::Version,

eventually/src/aggregate.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ use crate::{event, message, version::Version};
2929

3030
mod repository;
3131
pub use repository::{
32-
EventSourced as EventSourcedRepository, GetError as RepositoryGetError, Repository,
32+
EventSourced as EventSourcedRepository, GetError as RepositoryGetError, Getter, Repository,
33+
Saver,
3334
};
3435

3536
/// An Aggregate represents a Domain Model that, through an Aggregate [Root],
@@ -390,7 +391,7 @@ mod test {
390391
use crate::{
391392
aggregate,
392393
aggregate::test_user_domain::{User, UserEvent},
393-
aggregate::Repository,
394+
aggregate::{Getter, Saver},
394395
event,
395396
event::store::EventStoreExt,
396397
version,

eventually/src/aggregate/repository.rs

Lines changed: 49 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,44 @@ pub enum GetError<I> {
1717
Inner(#[from] I),
1818
}
1919

20-
/// A Repository is an object that allows to load and save
21-
/// an [Aggregate Root][Root] from and to a persistent data store.
2220
#[async_trait]
23-
pub trait Repository<T>: Send + Sync
21+
pub trait Getter<T>: Send + Sync
2422
where
2523
T: Aggregate,
2624
{
27-
/// The error type that can be returned by the Repository implementation
28-
/// during loading or storing of an Aggregate Root.
29-
type Error;
25+
type Error: Send + Sync;
3026

3127
/// Loads an Aggregate Root instance from the data store,
3228
/// referenced by its unique identifier.
3329
async fn get(&self, id: &T::Id) -> Result<aggregate::Root<T>, GetError<Self::Error>>;
30+
}
31+
32+
#[async_trait]
33+
pub trait Saver<T>: Send + Sync
34+
where
35+
T: Aggregate,
36+
{
37+
type Error: Send + Sync;
3438

3539
/// Stores a new version of an Aggregate Root instance to the data store.
3640
async fn store(&self, root: &mut aggregate::Root<T>) -> Result<(), Self::Error>;
3741
}
3842

43+
/// A Repository is an object that allows to load and save
44+
/// an [Aggregate Root][Root] from and to a persistent data store.
45+
pub trait Repository<T>: Getter<T> + Saver<T> + Send + Sync
46+
where
47+
T: Aggregate,
48+
{
49+
}
50+
51+
impl<T, R> Repository<T> for R
52+
where
53+
T: Aggregate,
54+
R: Getter<T> + Saver<T> + Send + Sync,
55+
{
56+
}
57+
3958
/// List of possible errors that can be returned by an [`EventSourced`] method.
4059
#[derive(Debug, thiserror::Error)]
4160
pub enum EventSourcedError<E, SE, AE> {
@@ -71,7 +90,7 @@ pub enum EventSourcedError<E, SE, AE> {
7190
pub struct EventSourced<T, S>
7291
where
7392
T: Aggregate,
74-
S: event::Store<StreamId = T::Id, Event = T::Event>,
93+
S: event::Store<T::Id, T::Event>,
7594
{
7695
store: S,
7796
aggregate: PhantomData<T>,
@@ -80,7 +99,7 @@ where
8099
impl<T, S> From<S> for EventSourced<T, S>
81100
where
82101
T: Aggregate,
83-
S: event::Store<StreamId = T::Id, Event = T::Event>,
102+
S: event::Store<T::Id, T::Event>,
84103
{
85104
fn from(store: S) -> Self {
86105
Self {
@@ -91,14 +110,18 @@ where
91110
}
92111

93112
#[async_trait]
94-
impl<T, S> Repository<T> for EventSourced<T, S>
113+
impl<T, S> Getter<T> for EventSourced<T, S>
95114
where
96115
T: Aggregate,
97116
T::Id: Clone,
98117
T::Error: Debug,
99-
S: event::Store<StreamId = T::Id, Event = T::Event>,
118+
S: event::Store<T::Id, T::Event>,
100119
{
101-
type Error = EventSourcedError<T::Error, S::StreamError, S::AppendError>;
120+
type Error = EventSourcedError<
121+
T::Error,
122+
<S as event::Streamer<T::Id, T::Event>>::Error,
123+
<S as event::Appender<T::Id, T::Event>>::Error,
124+
>;
102125

103126
async fn get(&self, id: &T::Id) -> Result<aggregate::Root<T>, GetError<Self::Error>> {
104127
let ctx = self
@@ -120,6 +143,21 @@ where
120143

121144
ctx.ok_or(GetError::AggregateRootNotFound)
122145
}
146+
}
147+
148+
#[async_trait]
149+
impl<T, S> Saver<T> for EventSourced<T, S>
150+
where
151+
T: Aggregate,
152+
T::Id: Clone,
153+
T::Error: Debug,
154+
S: event::Store<T::Id, T::Event>,
155+
{
156+
type Error = EventSourcedError<
157+
T::Error,
158+
<S as event::Streamer<T::Id, T::Event>>::Error,
159+
<S as event::Appender<T::Id, T::Event>>::Error,
160+
>;
123161

124162
async fn store(&self, root: &mut aggregate::Root<T>) -> Result<(), Self::Error> {
125163
let events_to_commit = root.take_uncommitted_events();

0 commit comments

Comments
 (0)