Skip to content

Commit 89615b6

Browse files
authored
feat: add EventStore::stream_all operation (#82)
* feat(store): add EventStore::stream_all method * feat(inmemory): add implementation for InMemoryEventStore::stream_all * chore(postgres): add blanket impl for EventStore::stream_all * chore(test): update to tide 0.13 * feat(test): add example of full event store history endpoint * docs(inmemory): add documentation for new Error type
1 parent 63c86d8 commit 89615b6

9 files changed

Lines changed: 387 additions & 200 deletions

File tree

eventually-core/src/store.rs

Lines changed: 62 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,30 @@ pub mod persistent {
1818
/// Creates a new [`PersistedEvent`] by wrapping an Event value.
1919
///
2020
/// [`PersistentEvent`]: ../struct.PersistedEvent.html
21-
pub struct EventBuilder<T> {
21+
pub struct EventBuilder<SourceId, T> {
2222
pub(super) event: T,
23+
pub(super) source_id: SourceId,
2324
}
2425

25-
impl<T> From<T> for EventBuilder<T> {
26+
impl<SourceId, T> From<(SourceId, T)> for EventBuilder<SourceId, T> {
2627
#[inline]
27-
fn from(event: T) -> Self {
28-
Self { event }
28+
fn from(value: (SourceId, T)) -> Self {
29+
let (source_id, event) = value;
30+
Self { source_id, event }
2931
}
3032
}
3133

32-
impl<T> EventBuilder<T> {
34+
impl<SourceId, T> EventBuilder<SourceId, T> {
3335
/// Specifies the [`PersistentEvent`] version and moves to the next
3436
/// builder state.
3537
///
3638
/// [`PersistentEvent`]: ../struct.PersistedEvent.html
3739
#[inline]
38-
pub fn version(self, value: u32) -> EventBuilderWithVersion<T> {
40+
pub fn version(self, value: u32) -> EventBuilderWithVersion<SourceId, T> {
3941
EventBuilderWithVersion {
4042
version: value,
4143
event: self.event,
44+
source_id: self.source_id,
4245
}
4346
}
4447

@@ -47,10 +50,11 @@ pub mod persistent {
4750
///
4851
/// [`PersistentEvent`]: ../struct.PersistedEvent.html
4952
#[inline]
50-
pub fn sequence_number(self, value: u32) -> EventBuilderWithSequenceNumber<T> {
53+
pub fn sequence_number(self, value: u32) -> EventBuilderWithSequenceNumber<SourceId, T> {
5154
EventBuilderWithSequenceNumber {
5255
sequence_number: value,
5356
event: self.event,
57+
source_id: self.source_id,
5458
}
5559
}
5660
}
@@ -59,21 +63,23 @@ pub mod persistent {
5963
/// and its version.
6064
///
6165
/// [`PersistentEvent`]: ../struct.PersistedEvent.html
62-
pub struct EventBuilderWithVersion<T> {
66+
pub struct EventBuilderWithVersion<SourceId, T> {
6367
version: u32,
6468
event: T,
69+
source_id: SourceId,
6570
}
6671

67-
impl<T> EventBuilderWithVersion<T> {
72+
impl<SourceId, T> EventBuilderWithVersion<SourceId, T> {
6873
/// Specifies the [`PersistentEvent`] sequence number and moves to the next
6974
/// builder state.
7075
///
7176
/// [`PersistentEvent`]: ../struct.PersistedEvent.html
7277
#[inline]
73-
pub fn sequence_number(self, value: u32) -> super::PersistedEvent<T> {
78+
pub fn sequence_number(self, value: u32) -> super::PersistedEvent<SourceId, T> {
7479
super::PersistedEvent {
7580
version: self.version,
7681
event: self.event,
82+
source_id: self.source_id,
7783
sequence_number: value,
7884
}
7985
}
@@ -83,21 +89,23 @@ pub mod persistent {
8389
/// and its sequence number.
8490
///
8591
/// [`PersistentEvent`]: ../struct.PersistedEvent.html
86-
pub struct EventBuilderWithSequenceNumber<T> {
92+
pub struct EventBuilderWithSequenceNumber<SourceId, T> {
8793
sequence_number: u32,
8894
event: T,
95+
source_id: SourceId,
8996
}
9097

91-
impl<T> EventBuilderWithSequenceNumber<T> {
98+
impl<SourceId, T> EventBuilderWithSequenceNumber<SourceId, T> {
9299
/// Specifies the [`PersistentEvent`] version and moves to the next
93100
/// builder state.
94101
///
95102
/// [`PersistentEvent`]: ../struct.PersistedEvent.html
96103
#[inline]
97-
pub fn version(self, value: u32) -> super::PersistedEvent<T> {
104+
pub fn version(self, value: u32) -> super::PersistedEvent<SourceId, T> {
98105
super::PersistedEvent {
99106
version: value,
100107
event: self.event,
108+
source_id: self.source_id,
101109
sequence_number: self.sequence_number,
102110
}
103111
}
@@ -113,35 +121,36 @@ pub mod persistent {
113121
/// [`EventStream`]: type.EventStream.html
114122
#[derive(Debug, Clone, PartialEq, Eq)]
115123
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
116-
pub struct PersistedEvent<T> {
124+
pub struct PersistedEvent<SourceId, T> {
125+
source_id: SourceId,
117126
version: u32,
118127
sequence_number: u32,
119128
#[serde(flatten)]
120129
event: T,
121130
}
122131

123-
impl<T> Versioned for PersistedEvent<T> {
132+
impl<SourceId, T> Versioned for PersistedEvent<SourceId, T> {
124133
#[inline]
125134
fn version(&self) -> u32 {
126135
self.version
127136
}
128137
}
129138

130-
impl<T> Deref for PersistedEvent<T> {
139+
impl<SourceId, T> Deref for PersistedEvent<SourceId, T> {
131140
type Target = T;
132141

133142
fn deref(&self) -> &Self::Target {
134143
&self.event
135144
}
136145
}
137146

138-
impl<T> PersistedEvent<T> {
147+
impl<SourceId, T> PersistedEvent<SourceId, T> {
139148
/// Creates a new [`EventBuilder`] from the provided Event value.
140149
///
141150
/// [`EventBuilder`]: persistent/struct.EventBuilder.html
142151
#[inline]
143-
pub fn from(event: T) -> persistent::EventBuilder<T> {
144-
persistent::EventBuilder { event }
152+
pub fn from(source_id: SourceId, event: T) -> persistent::EventBuilder<SourceId, T> {
153+
persistent::EventBuilder { source_id, event }
145154
}
146155

147156
/// Returns the event sequence number.
@@ -150,6 +159,14 @@ impl<T> PersistedEvent<T> {
150159
self.sequence_number
151160
}
152161

162+
/// Returns the [`SourceId`] of the persisted event.
163+
///
164+
/// [`SourceId`]: trait.EventStore.html#associatedType.SourceId
165+
#[inline]
166+
pub fn source_id(&self) -> &SourceId {
167+
&self.source_id
168+
}
169+
153170
/// Unwraps the inner [`Event`] from the `PersistedEvent` wrapper.
154171
///
155172
/// [`Event`]: trait.EventStore.html#associatedtype.Event
@@ -203,8 +220,13 @@ pub enum Expected {
203220
/// Stream type returned by the [`EventStore::stream`] method.
204221
///
205222
/// [`EventStore::stream`]: trait.EventStore.html#method.stream
206-
pub type EventStream<'a, S> =
207-
BoxStream<'a, Result<PersistedEvent<<S as EventStore>::Event>, <S as EventStore>::Error>>;
223+
pub type EventStream<'a, S> = BoxStream<
224+
'a,
225+
Result<
226+
PersistedEvent<<S as EventStore>::SourceId, <S as EventStore>::Event>,
227+
<S as EventStore>::Error,
228+
>,
229+
>;
208230

209231
/// Error type returned by [`append`] in [`EventStore`] implementations.
210232
///
@@ -270,13 +292,13 @@ pub trait EventStore {
270292
/// [`AppendError::Conflict`]: enum.AppendError.html
271293
fn append(
272294
&mut self,
273-
id: Self::SourceId,
295+
source_id: Self::SourceId,
274296
version: Expected,
275297
events: Vec<Self::Event>,
276298
) -> BoxFuture<Result<u32, Self::Error>>;
277299

278300
/// Streams a list of [`Event`]s from the `EventStore` back to the application,
279-
/// by specifying the desired [`SourceId`] and [`Offset`].
301+
/// by specifying the desired [`SourceId`] and [`Select`] operation.
280302
///
281303
/// [`SourceId`] will be used to request a particular `EventStream`.
282304
///
@@ -290,14 +312,29 @@ pub trait EventStore {
290312
/// [`EventStream`]: type.EventStream.html
291313
fn stream(
292314
&self,
293-
id: Self::SourceId,
315+
source_id: Self::SourceId,
294316
select: Select,
295317
) -> BoxFuture<Result<EventStream<Self>, Self::Error>>;
296318

319+
/// Streams a list of [`Event`]s from the `EventStore` back to the application,
320+
/// disregarding the [`SourceId`] values but using a [`Select`] operation.
321+
///
322+
/// [`SourceId`] will be used to request a particular `EventStream`.
323+
///
324+
/// [`Select`] specifies the selection strategy for the [`Event`]s
325+
/// in the returned [`EventStream`]: take a look at type documentation
326+
/// for all the available options.
327+
///
328+
/// [`Event`]: trait.EventStore.html#associatedtype.Event
329+
/// [`SourceId`]: trait.EventStore.html#associatedtype.SourceId
330+
/// [`Select`]: enum.Select.html
331+
/// [`EventStream`]: type.EventStream.html
332+
fn stream_all(&self, select: Select) -> BoxFuture<Result<EventStream<Self>, Self::Error>>;
333+
297334
/// Drops all the [`Event`]s related to one `Source`, specified by
298335
/// the provided [`SourceId`].
299336
///
300337
/// [`Event`]: trait.EventStore.html#associatedtype.Event
301338
/// [`SourceId`]: trait.EventStore.html#associatedtype.SourceId
302-
fn remove(&mut self, id: Self::SourceId) -> BoxFuture<Result<(), Self::Error>>;
339+
fn remove(&mut self, source_id: Self::SourceId) -> BoxFuture<Result<(), Self::Error>>;
303340
}

eventually-postgres/src/lib.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
//! [`eventually`]: https://docs.rs/eventually
5252
//! [`EventStore`]: struct.EventStore.html
5353
54+
use std::convert::TryFrom;
55+
use std::fmt::Display;
5456
use std::sync::Arc;
5557

5658
use eventually::store::{AppendError, EventStream, Expected, PersistedEvent, Select};
@@ -197,7 +199,7 @@ pub struct EventStore<Id, Event> {
197199

198200
impl<Id, Event> EventStore<Id, Event>
199201
where
200-
Id: ToString + Eq + Send + Sync,
202+
Id: TryFrom<String> + Display + Eq + Send + Sync,
201203
{
202204
/// Creates a new table in the database for the provided Stream name
203205
/// during initialization.
@@ -230,7 +232,7 @@ where
230232

231233
impl<Id, Event> eventually::EventStore for EventStore<Id, Event>
232234
where
233-
Id: ToString + Eq + Send + Sync,
235+
Id: TryFrom<String, Error = ()> + Display + Eq + Send + Sync,
234236
Event: Serialize + Send + Sync,
235237
for<'de> Event: Deserialize<'de>,
236238
{
@@ -293,8 +295,9 @@ where
293295
.map_err(EventStoreError::from)?
294296
.map_ok(|row| {
295297
let event: Event = serde_json::from_value(row.get("event")).unwrap();
298+
let id: String = row.get("source_id");
296299

297-
PersistedEvent::from(event)
300+
PersistedEvent::from(Id::try_from(id).unwrap(), event)
298301
.version(row.get("version"))
299302
.sequence_number(row.get("offset"))
300303
})
@@ -303,6 +306,10 @@ where
303306
})
304307
}
305308

309+
fn stream_all(&self, select: Select) -> BoxFuture<Result<EventStream<Self>, Self::Error>> {
310+
unimplemented!()
311+
}
312+
306313
fn remove(&mut self, id: Self::SourceId) -> BoxFuture<Result<(), Self::Error>> {
307314
Box::pin(async move {
308315
self.client

eventually-test/Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ license = "MIT"
77

88
[dependencies]
99
eventually = { version = "0.4", path = "../eventually", features = ["full"] }
10-
eventually-postgres = { version = "0.1", path = "../eventually-postgres" }
10+
# eventually-postgres = { version = "0.1", path = "../eventually-postgres" }
1111

1212
chrono = { version = "0.4", features = ["serde"] }
1313
env_logger = "0.7"
@@ -18,6 +18,8 @@ log = "0.4"
1818
serde = { version = "1.0", features = ["derive"] }
1919
serde_json = "1.0"
2020
smol = { version = "0.3", features = ["tokio02"] }
21-
tide = "0.8.1"
21+
tide = "0.13"
2222
tokio = { version = "0.2", features = ["sync"] }
2323
tokio-postgres = "0.5"
24+
femme = "2.1.0"
25+
anyhow = "1.0"

0 commit comments

Comments
 (0)