Skip to content

Commit b1ace11

Browse files
authored
feat: add Projections support (#95)
* feat(core): add core support for Projections * feat(inmemory): add Projector and ProjectorBuilder support * feat(test): add example of TotalOrderProjection * chore(inmemory): add documentation for inmemory::projector
1 parent af6c6d4 commit b1ace11

12 files changed

Lines changed: 281 additions & 7 deletions

File tree

eventually-core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
//! [`eventually`]: https://crates.io/crates/eventually
1212
1313
pub mod aggregate;
14+
pub mod projection;
1415
pub mod repository;
1516
pub mod store;
1617
pub mod subscription;

eventually-core/src/projection.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
//! Contain support for [`Projection`], an optimized read model
2+
//! of an [`Aggregate`] or of a number of `Aggregate`s.
3+
//!
4+
//! More information about projections can be found here:
5+
//! https://eventstore.com/docs/getting-started/projections/index.html
6+
//!
7+
//! [`Projection`]: trait.Projection.html
8+
//! [`Aggregate`]: ../aggregate/trait.Aggregate.html
9+
10+
use crate::store::Persisted;
11+
12+
/// A `Projection` is an optimized read model (or materialized view)
13+
/// of an [`Aggregate`] model(s), that can be assembled by left-folding
14+
/// its previous state and a number of ordered, consecutive events.
15+
///
16+
/// The events passed to a `Projection` have been persisted onto
17+
/// an [`EventStore`] first.
18+
///
19+
/// [`Aggregate`]: ../aggregate/trait.Aggregate.html
20+
/// [`EventStore`]: ../store/trait.EventStore.html
21+
pub trait Projection: Default {
22+
/// Type of the Source id, typically an [`AggregateId`].
23+
///
24+
/// [`AggregateId`]: ../aggregate/type.AggregateId.html
25+
type SourceId: Eq;
26+
27+
/// Event to be stored in the `EventStore`, typically an [`Aggregate::Event`].
28+
///
29+
/// [`Aggregate::Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
30+
type Event;
31+
32+
/// Updates the next value of the `Projection` using the provided
33+
/// event value.
34+
fn project(self, event: Persisted<Self::SourceId, Self::Event>) -> Self;
35+
}

eventually-test/src/api.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ pub(crate) async fn full_history(req: Request<AppState>) -> Result<Response, Err
4444
.build())
4545
}
4646

47+
pub(crate) async fn total_orders(req: Request<AppState>) -> Result<Response, Error> {
48+
let state = req.state().total_orders_projection.read().await;
49+
50+
Ok(Response::builder(StatusCode::Ok)
51+
.body(Body::from_json(&*state)?)
52+
.build())
53+
}
54+
4755
pub(crate) async fn history(req: Request<AppState>) -> Result<Response, Error> {
4856
#[derive(Deserialize)]
4957
struct Params {
@@ -79,8 +87,6 @@ pub(crate) async fn history(req: Request<AppState>) -> Result<Response, Error> {
7987
pub(crate) async fn get_order(req: Request<AppState>) -> Result<Response, Error> {
8088
let id: String = req.param("id")?;
8189

82-
println!("ASD");
83-
8490
let root = req
8591
.state()
8692
.repository

eventually-test/src/lib.rs

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@ mod state;
66
use std::sync::Arc;
77

88
use eventually::aggregate::Optional;
9-
use eventually::inmemory::EventStoreBuilder;
9+
use eventually::inmemory::{EventStoreBuilder, ProjectorBuilder};
1010
use eventually::{AggregateRootBuilder, Repository};
1111

12+
use futures::stream::StreamExt;
13+
1214
use tokio::sync::RwLock;
1315

1416
use crate::config::Config;
15-
use crate::order::OrderAggregate;
17+
use crate::order::{OrderAggregate, TotalOrdersProjection};
1618

1719
pub async fn run(config: Config) -> anyhow::Result<()> {
1820
femme::with_level(config.log_level);
@@ -33,6 +35,42 @@ pub async fn run(config: Config) -> anyhow::Result<()> {
3335
store.clone(),
3436
)));
3537

38+
// Put the store behind an Arc to allow for clone-ness of a single instance.
39+
let store = Arc::new(store);
40+
41+
// Create a new Projector for the desired projection.
42+
let mut total_orders_projector =
43+
ProjectorBuilder::new(store.clone(), store.clone()).build::<TotalOrdersProjection>();
44+
45+
// Get a watch channel from the Projector: updates to the projector values
46+
// will be sent here.
47+
let mut total_orders_projector_rx = total_orders_projector.watch();
48+
49+
// Keep the projection value in memory.
50+
// We can use it to access it from the context of an endpoint and serialize the read model.
51+
let total_orders_projection = Arc::new(RwLock::new(TotalOrdersProjection::default()));
52+
let total_orders_projection_state = total_orders_projection.clone();
53+
54+
// Spawn a dedicated coroutine to run the projector.
55+
//
56+
// The projector will open its own running subscription, on which
57+
// it will receive all oldest and newest events as they come into the EventStore,
58+
// and it will progressively update the projection as events arrive.
59+
tokio::spawn(async move { total_orders_projector.run().await.expect("should not fail") });
60+
61+
// Spawn a dedicated coroutine to listen to changes to the projection.
62+
//
63+
// In this case we're logging the latest version, but in more advanced
64+
// scenario you might want to do something more with it.
65+
//
66+
// In some cases you might not need to watch the projection changes.
67+
tokio::spawn(async move {
68+
while let Some(total_orders) = total_orders_projector_rx.next().await {
69+
log::info!("Total orders: {:?}", total_orders);
70+
*total_orders_projection_state.write().await = total_orders;
71+
}
72+
});
73+
3674
// Set up the HTTP router.
3775
let mut app = tide::new();
3876

@@ -41,9 +79,11 @@ pub async fn run(config: Config) -> anyhow::Result<()> {
4179
store,
4280
builder: aggregate_root_builder,
4381
repository,
82+
total_orders_projection,
4483
});
4584

4685
api.at("/history").get(api::full_history);
86+
api.at("/total").get(api::total_orders);
4787

4888
api.at("/:id").get(api::get_order);
4989
api.at("/:id/create").post(api::create_order);

eventually-test/src/order.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,31 @@ use futures::{future, future::BoxFuture};
77
use serde::{Deserialize, Serialize};
88

99
use eventually::optional::Aggregate;
10+
use eventually::store::Persisted;
11+
use eventually::Projection;
12+
13+
#[derive(Debug, Default, Clone, Copy, Serialize)]
14+
pub struct TotalOrdersProjection {
15+
created: u64,
16+
completed: u64,
17+
cancelled: u64,
18+
}
19+
20+
impl Projection for TotalOrdersProjection {
21+
type SourceId = String;
22+
type Event = OrderEvent;
23+
24+
fn project(mut self, event: Persisted<Self::SourceId, Self::Event>) -> Self {
25+
match event.take() {
26+
OrderEvent::Created { .. } => self.created += 1,
27+
OrderEvent::Completed { .. } => self.completed += 1,
28+
OrderEvent::Cancelled { .. } => self.cancelled += 1,
29+
_ => (),
30+
};
31+
32+
self
33+
}
34+
}
1035

1136
#[derive(Debug, Clone, Serialize, Deserialize)]
1237
pub struct OrderItem {

eventually-test/src/state.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ pub(crate) type OrderRepository = Repository<OrderAggregate, OrderStore>;
1414

1515
#[derive(Clone)]
1616
pub(crate) struct AppState {
17-
pub store: OrderStore,
17+
pub store: Arc<OrderStore>,
1818
pub builder: AggregateRootBuilder<OrderAggregate>,
1919
pub repository: Arc<RwLock<OrderRepository>>,
20+
pub total_orders_projection: Arc<RwLock<order::TotalOrdersProjection>>,
2021
}

eventually-test/tests/acceptance_tests.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,11 @@ fn setup() {
2323
let config = Config::init().unwrap();
2424
SERVER_STARTED.store(true, std::sync::atomic::Ordering::SeqCst);
2525

26-
smol::run(eventually_test::run(config));
26+
smol::run(eventually_test::run(config)).expect("don't fail :(");
2727
});
2828
});
2929

30+
// Busy loading :(
3031
while !SERVER_STARTED.load(std::sync::atomic::Ordering::SeqCst) {}
3132
}
3233

eventually-util/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ parking_lot = "0.11.0"
2020
serde = { version = "1.0", features = ["derive"], optional = true }
2121
thiserror = "1.0"
2222
tokio = { version = "0.2", features = ["sync"] }
23+
anyhow = "1.0"
2324

2425
[dev-dependencies]
25-
anyhow = "1.0"
2626
tokio = { version = "0.2", features = ["macros"] }
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
//! Contains supporting entities using an in-memory backend.
2+
3+
mod projector;
4+
mod store;
5+
6+
pub use projector::*;
7+
pub use store::*;
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
use std::error::Error as StdError;
2+
use std::fmt::Debug;
3+
use std::sync::atomic::{AtomicU32, Ordering};
4+
use std::sync::Arc;
5+
6+
use eventually_core::projection::Projection;
7+
use eventually_core::store::{EventStore, Select};
8+
use eventually_core::subscription::EventSubscriber;
9+
10+
use futures::stream::{Stream, StreamExt, TryStreamExt};
11+
12+
use tokio::sync::watch::{channel, Receiver, Sender};
13+
14+
/// Reusable builder for multiple [`Projector`] instances.
15+
///
16+
/// [`Projector`]: struct.Projector.html
17+
pub struct ProjectorBuilder<Store, Subscriber> {
18+
store: Arc<Store>,
19+
subscriber: Arc<Subscriber>,
20+
}
21+
22+
impl<Store, Subscriber> ProjectorBuilder<Store, Subscriber> {
23+
/// Creates a new builder instance using the provided [`EventStore`]
24+
/// and [`EventSubscriber`].
25+
///
26+
/// [`EventStore`]: ../../../eventually-core/store/trait.EventStore.html
27+
/// [`EventSubscriber`]: ../../../eventually-core/subscription/trait.EventSubscriber.html
28+
pub fn new(store: Arc<Store>, subscriber: Arc<Subscriber>) -> Self {
29+
Self { store, subscriber }
30+
}
31+
32+
/// Builds a new [`Projector`] for the [`Projection`]
33+
/// specified in the function type.
34+
///
35+
/// [`Projector`]: struct.Projector.html
36+
/// [`Projection`]: ../../../eventually-core/projection/trait.Projection.html
37+
pub fn build<P>(&self) -> Projector<P, Store, Subscriber>
38+
where
39+
// NOTE: these bounds are required for Projector::run.
40+
P: Projection + Debug + Clone,
41+
Store: EventStore<SourceId = P::SourceId, Event = P::Event>,
42+
Subscriber: EventSubscriber<SourceId = P::SourceId, Event = P::Event>,
43+
<Store as EventStore>::Error: StdError + Send + Sync + 'static,
44+
<Subscriber as EventSubscriber>::Error: StdError + Send + Sync + 'static,
45+
{
46+
Projector::new(self.store.clone(), self.subscriber.clone())
47+
}
48+
}
49+
50+
/// A `Projector` manages the state of a single [`Projection`]
51+
/// by opening a long-running stream of all events coming from the [`EventStore`].
52+
///
53+
/// New instances of a `Projector` are obtainable through a [`ProjectorBuilder`]
54+
/// instance.
55+
///
56+
/// The `Projector` will start updating the [`Projection`] state when [`run`]
57+
/// is called.
58+
///
59+
/// At each update, the `Projector` will broadcast the latest version of the
60+
/// [`Projection`] on a `Stream` obtainable through [`watch`].
61+
///
62+
/// [`Projection`]: ../../../eventually-core/projection/trait.Projection.html
63+
/// [`EventStore`]: ../../../eventually-core/store/trait.EventStore.html
64+
/// [`ProjectorBuilder`]: struct.ProjectorBuilder.html
65+
/// [`run`]: struct.Projector.html#method.run
66+
/// [`watch`]: struct.Projector.html#method.watch
67+
pub struct Projector<P, Store, Subscriber>
68+
where
69+
P: Projection,
70+
{
71+
tx: Sender<P>,
72+
rx: Receiver<P>, // Keep the receiver to be able to clone it in watch().
73+
store: Arc<Store>,
74+
subscriber: Arc<Subscriber>,
75+
state: P,
76+
last_sequence_number: AtomicU32,
77+
projection: std::marker::PhantomData<P>,
78+
}
79+
80+
impl<P, Store, Subscriber> Projector<P, Store, Subscriber>
81+
where
82+
P: Projection + Debug + Clone,
83+
Store: EventStore<SourceId = P::SourceId, Event = P::Event>,
84+
Subscriber: EventSubscriber<SourceId = P::SourceId, Event = P::Event>,
85+
// NOTE: these bounds are needed for anyhow::Error conversion.
86+
<Store as EventStore>::Error: StdError + Send + Sync + 'static,
87+
<Subscriber as EventSubscriber>::Error: StdError + Send + Sync + 'static,
88+
{
89+
fn new(store: Arc<Store>, subscriber: Arc<Subscriber>) -> Self {
90+
let state: P = Default::default();
91+
let (tx, rx) = channel(state.clone());
92+
93+
Self {
94+
tx,
95+
rx,
96+
store,
97+
subscriber,
98+
state,
99+
last_sequence_number: Default::default(),
100+
projection: std::marker::PhantomData,
101+
}
102+
}
103+
104+
/// Provides a `Stream` that receives the latest copy of the `Projection` state.
105+
pub fn watch(&self) -> impl Stream<Item = P> {
106+
self.rx.clone()
107+
}
108+
109+
/// Starts the update of the `Projection` by processing all the events
110+
/// coming from the [`EventStore`].
111+
///
112+
/// [`EventStore`]: ../../../eventually-core/store/trait.EventStore.html
113+
pub async fn run(&mut self) -> anyhow::Result<()> {
114+
// Create the Subscription first, so that once the future has been resolved
115+
// we'll start receiving events right away.
116+
//
117+
// This is to avoid losing events when waiting for the one-off stream
118+
// to resolve its future.
119+
//
120+
// The impact is that we _might_ get duplicated events from the one-off stream
121+
// and the subscription stream. Luckily, we can discard those by
122+
// keeping an internal state of the last processed sequence number,
123+
// and discard all those events that are found.
124+
let subscription = self.subscriber.subscribe_all().await?;
125+
let one_off_stream = self.store.stream_all(Select::All).await?;
126+
127+
let mut stream = one_off_stream
128+
.map_err(anyhow::Error::from)
129+
.chain(subscription.map_err(anyhow::Error::from));
130+
131+
while let Some(event) = stream.next().await {
132+
let event = event?;
133+
let expected_sequence_number = self.last_sequence_number.load(Ordering::SeqCst);
134+
let event_sequence_number = event.sequence_number();
135+
136+
if event_sequence_number < expected_sequence_number {
137+
continue; // Duplicated event detected, let's skip it.
138+
}
139+
140+
self.state = P::project(self.state.clone(), event);
141+
142+
self.last_sequence_number.compare_and_swap(
143+
expected_sequence_number,
144+
event_sequence_number,
145+
Ordering::SeqCst,
146+
);
147+
148+
// Notify watchers of the latest projection state.
149+
self.tx.broadcast(self.state.clone()).expect(
150+
"since this struct holds the original receiver, failures should not happen",
151+
);
152+
}
153+
154+
Ok(())
155+
}
156+
}

0 commit comments

Comments
 (0)