Skip to content

Commit 63c86d8

Browse files
authored
feat: move versioning check on the EventStore (#79)
* feat(eventually-core): EventStore use current versioning checks * feat(eventually-util): fix InMemoryRepository with new versioning strategy * feat(eventually-core): PersistentEvent implements Deref to inner event * fix(eventually-test): new features support and disable Postgres * fix(eventually-postgres): disable append, waiting for crate overhaul
1 parent 88dc4c2 commit 63c86d8

9 files changed

Lines changed: 318 additions & 176 deletions

File tree

eventually-core/src/repository.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::fmt::Debug;
1212
use futures::stream::TryStreamExt;
1313

1414
use crate::aggregate::{Aggregate, AggregateRoot, AggregateRootBuilder};
15-
use crate::store::{EventStore, Select};
15+
use crate::store::{EventStore, Expected, Select};
1616
use crate::versioning::Versioned;
1717

1818
/// Error type returned by the [`Repository`].
@@ -145,11 +145,10 @@ where
145145

146146
if let Some(events) = events_to_commit {
147147
if !events.is_empty() {
148-
// Version is incremented at each events flush.
149-
version += 1;
150-
151-
self.store
152-
.append(root.id().clone(), version, events)
148+
// Version is incremented at each events flush by the EventStore.
149+
version = self
150+
.store
151+
.append(root.id().clone(), Expected::Exact(version), events)
153152
.await
154153
.map_err(Error::Store)?;
155154
}

eventually-core/src/store.rs

Lines changed: 139 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,108 @@
22
//!
33
//! [`Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
44
5+
use std::ops::Deref;
6+
57
use futures::future::BoxFuture;
68
use futures::stream::BoxStream;
79

810
use serde::{Deserialize, Serialize};
911

1012
use crate::versioning::Versioned;
1113

14+
/// Contains a type-state builder for [`PersistentEvent`] type.
15+
///
16+
/// [`PersistentEvent`]: struct.PersistedEvent.html
17+
pub mod persistent {
18+
/// Creates a new [`PersistedEvent`] by wrapping an Event value.
19+
///
20+
/// [`PersistentEvent`]: ../struct.PersistedEvent.html
21+
pub struct EventBuilder<T> {
22+
pub(super) event: T,
23+
}
24+
25+
impl<T> From<T> for EventBuilder<T> {
26+
#[inline]
27+
fn from(event: T) -> Self {
28+
Self { event }
29+
}
30+
}
31+
32+
impl<T> EventBuilder<T> {
33+
/// Specifies the [`PersistentEvent`] version and moves to the next
34+
/// builder state.
35+
///
36+
/// [`PersistentEvent`]: ../struct.PersistedEvent.html
37+
#[inline]
38+
pub fn version(self, value: u32) -> EventBuilderWithVersion<T> {
39+
EventBuilderWithVersion {
40+
version: value,
41+
event: self.event,
42+
}
43+
}
44+
45+
/// Specifies the [`PersistentEvent`] sequence number and moves to the next
46+
/// builder state.
47+
///
48+
/// [`PersistentEvent`]: ../struct.PersistedEvent.html
49+
#[inline]
50+
pub fn sequence_number(self, value: u32) -> EventBuilderWithSequenceNumber<T> {
51+
EventBuilderWithSequenceNumber {
52+
sequence_number: value,
53+
event: self.event,
54+
}
55+
}
56+
}
57+
58+
/// Next step in creating a new [`PersistedEvent`] carrying an Event value
59+
/// and its version.
60+
///
61+
/// [`PersistentEvent`]: ../struct.PersistedEvent.html
62+
pub struct EventBuilderWithVersion<T> {
63+
version: u32,
64+
event: T,
65+
}
66+
67+
impl<T> EventBuilderWithVersion<T> {
68+
/// Specifies the [`PersistentEvent`] sequence number and moves to the next
69+
/// builder state.
70+
///
71+
/// [`PersistentEvent`]: ../struct.PersistedEvent.html
72+
#[inline]
73+
pub fn sequence_number(self, value: u32) -> super::PersistedEvent<T> {
74+
super::PersistedEvent {
75+
version: self.version,
76+
event: self.event,
77+
sequence_number: value,
78+
}
79+
}
80+
}
81+
82+
/// Next step in creating a new [`PersistedEvent`] carrying an Event value
83+
/// and its sequence number.
84+
///
85+
/// [`PersistentEvent`]: ../struct.PersistedEvent.html
86+
pub struct EventBuilderWithSequenceNumber<T> {
87+
sequence_number: u32,
88+
event: T,
89+
}
90+
91+
impl<T> EventBuilderWithSequenceNumber<T> {
92+
/// Specifies the [`PersistentEvent`] version and moves to the next
93+
/// builder state.
94+
///
95+
/// [`PersistentEvent`]: ../struct.PersistedEvent.html
96+
#[inline]
97+
pub fn version(self, value: u32) -> super::PersistedEvent<T> {
98+
super::PersistedEvent {
99+
version: value,
100+
event: self.event,
101+
sequence_number: self.sequence_number,
102+
}
103+
}
104+
}
105+
}
106+
12107
/// An [`Event`] wrapper for events that have been
13108
/// successfully committed to the [`EventStore`].
14109
///
@@ -25,37 +120,28 @@ pub struct PersistedEvent<T> {
25120
event: T,
26121
}
27122

28-
impl<T> From<T> for PersistedEvent<T> {
29-
#[inline]
30-
fn from(event: T) -> Self {
31-
Self {
32-
event,
33-
version: 0,
34-
sequence_number: 0,
35-
}
36-
}
37-
}
38-
39123
impl<T> Versioned for PersistedEvent<T> {
40124
#[inline]
41125
fn version(&self) -> u32 {
42126
self.version
43127
}
44128
}
45129

46-
impl<T> PersistedEvent<T> {
47-
/// Updates the event version to the one specified.
48-
#[inline]
49-
pub fn with_version(mut self, version: u32) -> Self {
50-
self.version = version;
51-
self
130+
impl<T> Deref for PersistedEvent<T> {
131+
type Target = T;
132+
133+
fn deref(&self) -> &Self::Target {
134+
&self.event
52135
}
136+
}
53137

54-
/// Updates the sequence number version to the one specified.
138+
impl<T> PersistedEvent<T> {
139+
/// Creates a new [`EventBuilder`] from the provided Event value.
140+
///
141+
/// [`EventBuilder`]: persistent/struct.EventBuilder.html
55142
#[inline]
56-
pub fn with_sequence_number(mut self, sequence_number: u32) -> Self {
57-
self.sequence_number = sequence_number;
58-
self
143+
pub fn from(event: T) -> persistent::EventBuilder<T> {
144+
persistent::EventBuilder { event }
59145
}
60146

61147
/// Returns the event sequence number.
@@ -93,6 +179,27 @@ pub enum Select {
93179
From(u32),
94180
}
95181

182+
/// Specifies the optimistic locking level when performing [`append`] from
183+
/// an [`EventStore`].
184+
///
185+
/// Check out [`append`] documentation for more info.
186+
///
187+
/// [`append`]: trait.EventStore.html#method.append
188+
/// [`EventStore`]: trait.EventStore.html
189+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
190+
pub enum Expected {
191+
/// Append events disregarding the current [`Aggregate`] version.
192+
///
193+
/// [`Aggregate`]: ../aggregate/trait.Aggregate.html
194+
Any,
195+
196+
/// Append events only if the current version of the [`Aggregate`]
197+
/// is the one specified by the value provided here.
198+
///
199+
/// [`Aggregate`]: ../aggregate/trait.Aggregate.html
200+
Exact(u32),
201+
}
202+
96203
/// Stream type returned by the [`EventStore::stream`] method.
97204
///
98205
/// [`EventStore::stream`]: trait.EventStore.html#method.stream
@@ -142,7 +249,15 @@ pub trait EventStore {
142249
/// `append` is a transactional operation: it either appends all the events,
143250
/// or none at all and returns an [`AppendError`].
144251
///
145-
/// The desired version for the new [`Event`]s to append must be specified.
252+
/// The desired version for the new [`Event`]s to append must be specified
253+
/// through an [`Expected`] element.
254+
///
255+
/// When using `Expected::Any`, no checks on the current [`Aggregate`]
256+
/// values will be performed, disregarding optimistic locking.
257+
///
258+
/// When using `Expected::Exact`, the Store will check that the current
259+
/// version of the [`Aggregate`] is _exactly_ the one specified.
260+
///
146261
/// If the version is not the one expected from the Store, implementations
147262
/// should raise an [`AppendError::Conflict`] error.
148263
///
@@ -156,9 +271,9 @@ pub trait EventStore {
156271
fn append(
157272
&mut self,
158273
id: Self::SourceId,
159-
version: u32,
274+
version: Expected,
160275
events: Vec<Self::Event>,
161-
) -> BoxFuture<Result<(), Self::Error>>;
276+
) -> BoxFuture<Result<u32, Self::Error>>;
162277

163278
/// Streams a list of [`Event`]s from the `EventStore` back to the application,
164279
/// by specifying the desired [`SourceId`] and [`Offset`].

eventually-postgres/src/lib.rs

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
5454
use std::sync::Arc;
5555

56-
use eventually::store::{AppendError, EventStream, PersistedEvent, Select};
56+
use eventually::store::{AppendError, EventStream, Expected, PersistedEvent, Select};
5757
use eventually::{Aggregate, AggregateId};
5858

5959
use futures::future::BoxFuture;
@@ -241,31 +241,34 @@ where
241241
fn append(
242242
&mut self,
243243
id: Self::SourceId,
244-
version: u32,
244+
version: Expected,
245245
events: Vec<Self::Event>,
246-
) -> BoxFuture<Result<(), Self::Error>> {
247-
let serialized = events
248-
.into_iter()
249-
.enumerate()
250-
.map(|(i, event)| serde_json::to_value(event).map(|value| (i, value)))
251-
.collect::<Result<Vec<_>, _>>()
252-
.unwrap();
246+
) -> BoxFuture<Result<u32, Self::Error>> {
247+
// FIXME(ar3s3ru): this crate needs a new overhaul.
248+
unimplemented!("crate needs a new overhaul, after the changes with the EventStore")
253249

254-
Box::pin(async move {
255-
let mut tx = self.client.write().await;
256-
let tx = tx.transaction().await.map_err(EventStoreError::from)?;
250+
// let serialized = events
251+
// .into_iter()
252+
// .enumerate()
253+
// .map(|(i, event)| serde_json::to_value(event).map(|value| (i, value)))
254+
// .collect::<Result<Vec<_>, _>>()
255+
// .unwrap();
257256

258-
for (i, event) in serialized {
259-
tx.execute(
260-
&*self.append_query,
261-
&[&id.to_string(), &event, &version, &(i as u32)],
262-
)
263-
.await
264-
.map_err(EventStoreError::from)?;
265-
}
257+
// Box::pin(async move {
258+
// let mut tx = self.client.write().await;
259+
// let tx = tx.transaction().await.map_err(EventStoreError::from)?;
266260

267-
tx.commit().await.map_err(EventStoreError::from)
268-
})
261+
// for (i, event) in serialized {
262+
// tx.execute(
263+
// &*self.append_query,
264+
// &[&id.to_string(), &event, &version, &(i as u32)],
265+
// )
266+
// .await
267+
// .map_err(EventStoreError::from)?;
268+
// }
269+
270+
// tx.commit().await.map_err(EventStoreError::from)
271+
// })
269272
}
270273

271274
fn stream(
@@ -292,8 +295,8 @@ where
292295
let event: Event = serde_json::from_value(row.get("event")).unwrap();
293296

294297
PersistedEvent::from(event)
295-
.with_version(row.get("version"))
296-
.with_sequence_number(row.get("offset"))
298+
.version(row.get("version"))
299+
.sequence_number(row.get("offset"))
297300
})
298301
.map_err(EventStoreError::from)
299302
.boxed())

0 commit comments

Comments
 (0)