Skip to content

Commit af6c6d4

Browse files
authored
feat: add EventSubscriber trait and inmemory impl (#94)
* feat: add EventSubscriber trait * feat(inmemory): EventStore implements EventSubscriber interface * fix(util): disable benchmarks temporarily * fix(postgres): failing doc-tests * chore: add documentation for new Subscriber * chore(subscription): add missing documentation
1 parent b8e1598 commit af6c6d4

8 files changed

Lines changed: 369 additions & 218 deletions

File tree

eventually-core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@
1313
pub mod aggregate;
1414
pub mod repository;
1515
pub mod store;
16+
pub mod subscription;
1617
pub mod versioning;
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
//! Module for creating and managing long-running Subscriptions
2+
//! to incoming events in the [`EventStore`].
3+
//!
4+
//! [`EventStore`]: ../store/trait.EventStore.html
5+
6+
use futures::future::BoxFuture;
7+
use futures::stream::BoxStream;
8+
9+
use crate::store::Persisted;
10+
11+
/// Stream of events returned by the [`EventSubscriber::subscribe_all`] method.
12+
///
13+
/// [`EventSubscriber::subscribe_all`]: trait.EventSubscriber.html#method.subscribe_all
14+
pub type EventStream<'a, S> = BoxStream<
15+
'a,
16+
Result<
17+
Persisted<<S as EventSubscriber>::SourceId, <S as EventSubscriber>::Event>,
18+
<S as EventSubscriber>::Error,
19+
>,
20+
>;
21+
22+
/// Component to let users subscribe to newly-inserted events into the [`EventStore`].
23+
///
24+
/// Check out [`subscribe_all`] for more information.
25+
///
26+
/// Additional information can be found in the [_Volatile Subscription_] section
27+
/// of eventstore.com
28+
///
29+
/// [_Volatile Subscription_]: https://eventstore.com/docs/getting-started/reading-subscribing-events/index.html#volatile-subscriptions
30+
/// [`EventStore`]: ../store/trait.EventStore.html
31+
pub trait EventSubscriber {
32+
/// Type of the Source id, typically an [`AggregateId`].
33+
///
34+
/// [`AggregateId`]: ../aggregate/type.AggregateId.html
35+
type SourceId: Eq;
36+
37+
/// Event type stored in the [`EventStore`], typically an [`Aggregate::Event`].
38+
///
39+
/// [`Aggregate::Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
40+
/// [`EventStore`]: ../store/trait.EventStore.html
41+
type Event;
42+
43+
/// Possible errors returned when receiving events from the notification channel.
44+
type Error;
45+
46+
/// Subscribes to all new events persisted in the [`EventStore`], from
47+
/// the moment of calling this function, in the future.
48+
///
49+
/// Since this is a long-running stream, make sure not to *block*
50+
/// or await the full computation of the stream.
51+
///
52+
/// Prefer using a `while let` consumer for this [`EventStream`]:
53+
///
54+
/// ```text
55+
/// let stream = subscriber.subscribe_all().await?;
56+
///
57+
/// while let Some(event) = stream.next().await {
58+
/// // Do stuff with the received event...
59+
/// }
60+
/// ```
61+
///
62+
/// [`EventStore`]: ../store/trait.EventStore.html
63+
/// [`EventStream`]: type.EventStream.html
64+
fn subscribe_all(&self) -> BoxFuture<Result<EventStream<Self>, Self::Error>>;
65+
}

eventually-postgres/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,6 @@ tokio-postgres = { version = "0.5", features = ["with-serde_json-1"] }
2121
thiserror = "1.0"
2222
refinery = { version = "0.3.0", features = ["tokio-postgres"] }
2323
anyhow = "1.0.32"
24+
25+
[dev-dependencies]
26+
tokio = { version = "0.2", features = ["sync"] }

eventually-postgres/src/lib.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,11 @@
99
//!
1010
//! ```no_run
1111
//! # use std::sync::Arc;
12-
//! # use tokio::sync::RwLock;
1312
//! # use eventually_postgres::EventStoreBuilder;
1413
//! #
1514
//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1615
//! // Open a connection with Postgres.
17-
//! let (client, connection) =
16+
//! let (mut client, connection) =
1817
//! tokio_postgres::connect("postgres://user@pass:localhost:5432/db", tokio_postgres::NoTls)
1918
//! .await
2019
//! .map_err(|err| {
@@ -45,7 +44,7 @@
4544
//! // to distinguish between different aggregates.
4645
//! //
4746
//! // You can also use std::any::type_name for that.
48-
//! let store = builder::build::<String, SomeEvent>("aggregate-name").await?
47+
//! let store = builder.build::<String, SomeEvent>("aggregate-name").await?;
4948
//!
5049
//! # Ok(())
5150
//! # }

eventually-util/Cargo.toml

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,8 @@ futures = { version = "0.3", features = ["async-await"] }
1919
parking_lot = "0.11.0"
2020
serde = { version = "1.0", features = ["derive"], optional = true }
2121
thiserror = "1.0"
22+
tokio = { version = "0.2", features = ["sync"] }
2223

2324
[dev-dependencies]
2425
anyhow = "1.0"
25-
criterion = "0.3"
26-
tokio-test = "0.2"
27-
28-
[[bench]]
29-
name = "event-store-inmemory-bench"
30-
harness = false
26+
tokio = { version = "0.2", features = ["macros"] }

eventually-util/benches/event-store-inmemory-bench.rs

Lines changed: 0 additions & 141 deletions
This file was deleted.

0 commit comments

Comments
 (0)