11use std:: error:: Error as StdError ;
2- use std:: fmt:: Debug ;
32use std:: sync:: atomic:: { AtomicU32 , Ordering } ;
43use std:: sync:: Arc ;
54
65use eventually_core:: projection:: Projection ;
76use eventually_core:: store:: { EventStore , Select } ;
87use eventually_core:: subscription:: EventSubscriber ;
98
10- use futures:: stream:: { Stream , StreamExt , TryStreamExt } ;
9+ use futures:: stream:: { StreamExt , TryStreamExt } ;
1110
12- use tokio:: sync:: watch :: { channel , Receiver , Sender } ;
11+ use tokio:: sync:: RwLock ;
1312
1413/// Reusable builder for multiple [`Projector`] instances.
1514///
@@ -29,21 +28,21 @@ impl<Store, Subscriber> ProjectorBuilder<Store, Subscriber> {
2928 Self { store, subscriber }
3029 }
3130
32- /// Builds a new [`Projector`] for the [`Projection`]
33- /// specified in the function type.
31+ /// Builds a new [`Projector`] for the [`Projection`] specified in the function type.
3432 ///
3533 /// [`Projector`]: struct.Projector.html
3634 /// [`Projection`]: ../../../eventually-core/projection/trait.Projection.html
37- pub fn build < P > ( & self ) -> Projector < P , Store , Subscriber >
35+ pub fn build < P > ( & self , projection : Arc < RwLock < P > > ) -> Projector < P , Store , Subscriber >
3836 where
3937 // NOTE: these bounds are required for Projector::run.
40- P : Projection + Debug + Clone ,
38+ P : Projection ,
4139 Store : EventStore < SourceId = P :: SourceId , Event = P :: Event > ,
4240 Subscriber : EventSubscriber < SourceId = P :: SourceId , Event = P :: Event > ,
41+ <P as Projection >:: Error : StdError + Send + Sync + ' static ,
4342 <Store as EventStore >:: Error : StdError + Send + Sync + ' static ,
4443 <Subscriber as EventSubscriber >:: Error : StdError + Send + Sync + ' static ,
4544 {
46- Projector :: new ( self . store . clone ( ) , self . subscriber . clone ( ) )
45+ Projector :: new ( projection , self . store . clone ( ) , self . subscriber . clone ( ) )
4746 }
4847}
4948
@@ -68,44 +67,31 @@ pub struct Projector<P, Store, Subscriber>
6867where
6968 P : Projection ,
7069{
71- tx : Sender < P > ,
72- rx : Receiver < P > , // Keep the receiver to be able to clone it in watch().
70+ projection : Arc < RwLock < P > > ,
7371 store : Arc < Store > ,
7472 subscriber : Arc < Subscriber > ,
75- state : P ,
7673 last_sequence_number : AtomicU32 ,
77- projection : std:: marker:: PhantomData < P > ,
7874}
7975
8076impl < P , Store , Subscriber > Projector < P , Store , Subscriber >
8177where
82- P : Projection + Debug + Clone ,
78+ P : Projection ,
8379 Store : EventStore < SourceId = P :: SourceId , Event = P :: Event > ,
8480 Subscriber : EventSubscriber < SourceId = P :: SourceId , Event = P :: Event > ,
8581 // NOTE: these bounds are needed for anyhow::Error conversion.
82+ <P as Projection >:: Error : StdError + Send + Sync + ' static ,
8683 <Store as EventStore >:: Error : StdError + Send + Sync + ' static ,
8784 <Subscriber as EventSubscriber >:: Error : StdError + Send + Sync + ' static ,
8885{
89- fn new ( store : Arc < Store > , subscriber : Arc < Subscriber > ) -> Self {
90- let state: P = Default :: default ( ) ;
91- let ( tx, rx) = channel ( state. clone ( ) ) ;
92-
86+ fn new ( projection : Arc < RwLock < P > > , store : Arc < Store > , subscriber : Arc < Subscriber > ) -> Self {
9387 Self {
94- tx,
95- rx,
9688 store,
9789 subscriber,
98- state ,
90+ projection ,
9991 last_sequence_number : Default :: default ( ) ,
100- projection : std:: marker:: PhantomData ,
10192 }
10293 }
10394
104- /// Provides a `Stream` that receives the latest copy of the `Projection` state.
105- pub fn watch ( & self ) -> impl Stream < Item = P > {
106- self . rx . clone ( )
107- }
108-
10995 /// Starts the update of the `Projection` by processing all the events
11096 /// coming from the [`EventStore`].
11197 ///
@@ -124,32 +110,30 @@ where
124110 let subscription = self . subscriber . subscribe_all ( ) . await ?;
125111 let one_off_stream = self . store . stream_all ( Select :: All ) . await ?;
126112
127- let mut stream = one_off_stream
113+ let stream = one_off_stream
128114 . map_err ( anyhow:: Error :: from)
129115 . chain ( subscription. map_err ( anyhow:: Error :: from) ) ;
130116
131- while let Some ( event ) = stream. next ( ) . await {
132- let event = event? ;
133- let expected_sequence_number = self . last_sequence_number . load ( Ordering :: SeqCst ) ;
134- let event_sequence_number = event. sequence_number ( ) ;
117+ stream
118+ . try_for_each ( | event| async {
119+ let expected_sequence_number = self . last_sequence_number . load ( Ordering :: SeqCst ) ;
120+ let event_sequence_number = event. sequence_number ( ) ;
135121
136- if event_sequence_number < expected_sequence_number {
137- continue ; // Duplicated event detected, let's skip it.
138- }
122+ if event_sequence_number < expected_sequence_number {
123+ return Ok ( ( ) ) ; // Duplicated event detected, let's skip it.
124+ }
139125
140- self . state = P :: project ( self . state . clone ( ) , event) ;
126+ self . projection . write ( ) . await . project ( event) . await ? ;
141127
142- self . last_sequence_number . compare_and_swap (
143- expected_sequence_number,
144- event_sequence_number,
145- Ordering :: SeqCst ,
146- ) ;
128+ self . last_sequence_number . compare_and_swap (
129+ expected_sequence_number,
130+ event_sequence_number,
131+ Ordering :: SeqCst ,
132+ ) ;
147133
148- // Notify watchers of the latest projection state.
149- self . tx . broadcast ( self . state . clone ( ) ) . expect (
150- "since this struct holds the original receiver, failures should not happen" ,
151- ) ;
152- }
134+ Ok ( ( ) )
135+ } )
136+ . await ?;
153137
154138 Ok ( ( ) )
155139 }
0 commit comments