@@ -12,8 +12,8 @@ use rand_chacha::ChaCha8Rng;
1212use single_utilities:: traits:: FloatOpsTS ;
1313
1414use crate :: {
15- community_search:: leiden:: { ConsiderComms , LeidenConfig , partition:: VertexPartition } ,
16- network:: { CSRNetwork , grouping:: NetworkGrouping } ,
15+ community_search:: leiden:: { parallel :: { ConflictFreeBatcher , ParallelEvaluator } , partition:: VertexPartition , ConsiderComms , LeidenConfig } ,
16+ network:: { grouping:: NetworkGrouping , CSRNetwork } ,
1717} ;
1818
1919/// Result of evaluating a potential community move for a node.
@@ -195,110 +195,105 @@ impl LeidenOptimizer {
195195 /// Evaluates the quality improvement for moving a node to each candidate
196196 /// community and returns the community and improvement of the best move.
197197 fn find_best_community_move < N , G , P > (
198- & self ,
199- v : usize ,
200- v_comm : usize ,
201- comms : & [ usize ] ,
202- partitions : & mut [ P ] , // Changed to mutable slice
203- layer_weights : & [ N ] ,
204- max_comm_size : Option < usize > ,
205- ) -> anyhow:: Result < ( usize , N ) >
206- where
207- N : FloatOpsTS + ' static ,
208- G : NetworkGrouping ,
209- P : VertexPartition < N , G > ,
210- {
211- let mut max_comm = v_comm;
212- let time = Instant :: now ( ) ;
213- // println!("Finding best community move: {:?}", time.elapsed());
214-
215- // Pre-compute these values once instead of in the loop
216- let v_comm_size = partitions[ 0 ] . csize ( v_comm) ;
217- let epsilon_threshold = N :: from ( 10.0 ) . unwrap ( ) * <N as Float >:: epsilon ( ) ;
218-
219- let mut max_improv = if let Some ( max_size) = max_comm_size {
220- if max_size < v_comm_size {
221- <N as Float >:: neg_infinity ( )
198+ & self ,
199+ v : usize ,
200+ v_comm : usize ,
201+ comms : & [ usize ] ,
202+ partitions : & mut [ P ] ,
203+ layer_weights : & [ N ] ,
204+ max_comm_size : Option < usize > ,
205+ ) -> anyhow:: Result < ( usize , N ) >
206+ where
207+ N : FloatOpsTS + ' static ,
208+ G : NetworkGrouping ,
209+ P : VertexPartition < N , G > ,
210+ {
211+ let mut max_comm = v_comm;
212+ let time = Instant :: now ( ) ;
213+ // println!("Finding best community move: {:?}", time.elapsed());
214+
215+ // Pre-compute these values once instead of in the loop
216+ let v_comm_size = partitions[ 0 ] . csize ( v_comm) ;
217+ let epsilon_threshold = N :: from ( 10.0 ) . unwrap ( ) * <N as Float >:: epsilon ( ) ;
218+
219+ let mut max_improv = if let Some ( max_size) = max_comm_size {
220+ if max_size < v_comm_size {
221+ <N as Float >:: neg_infinity ( )
222+ } else {
223+ epsilon_threshold
224+ }
222225 } else {
223226 epsilon_threshold
227+ } ;
228+
229+ const V_SIZE : usize = 1 ;
230+
231+ if comms. is_empty ( ) {
232+ return Ok ( ( max_comm, max_improv) ) ;
224233 }
225- } else {
226- epsilon_threshold
227- } ;
228234
229- const V_SIZE : usize = 1 ; // Made it a const for better optimization
235+ // println!("Prefiltering valid comms {:?}", time.elapsed());
236+ let valid_comms: Vec < usize > = if let Some ( max_size) = max_comm_size {
237+ comms
238+ . iter ( )
239+ . copied ( )
240+ . filter ( |& comm| partitions[ 0 ] . csize ( comm) + V_SIZE <= max_size)
241+ . collect ( )
242+ } else {
243+ comms. to_vec ( )
244+ } ;
245+ // println!("Filtered valid comms: {:?}", time.elapsed());
230246
231- // Early exit if no communities to check
232- if comms. is_empty ( ) {
233- return Ok ( ( max_comm, max_improv) ) ;
234- }
235-
236- // println!("Prefiltering valid comms {:?}", time.elapsed());
237- // Pre-filter communities by size constraint to avoid repeated checks
238- let valid_comms: Vec < usize > = if let Some ( max_size) = max_comm_size {
239- comms
240- . iter ( )
241- . copied ( )
242- . filter ( |& comm| partitions[ 0 ] . csize ( comm) + V_SIZE <= max_size)
243- . collect ( )
244- } else {
245- comms. to_vec ( )
246- } ;
247- // println!("Filtered valid comms: {:?}", time.elapsed());
248-
249- // Early exit if no valid communities
250- if valid_comms. is_empty ( ) {
251- return Ok ( ( max_comm, max_improv) ) ;
252- }
247+ if valid_comms. is_empty ( ) {
248+ return Ok ( ( max_comm, max_improv) ) ;
249+ }
253250
254- // Optimized single-layer case
255- if partitions. len ( ) == 1 && layer_weights[ 0 ] == N :: one ( ) {
256- // println!("checking valid comms: {:?}", time.elapsed());
257-
258- // Get mutable reference to the single partition
259- let partition = & mut partitions [ 0 ] ;
260-
261- for & comm in & valid_comms {
262- let t = Instant :: now ( ) ;
263- let possible_improv = partition . diff_move ( v , comm ) ;
264- // println!("Executed diff move, took: {:?}", t.elapsed());
265-
266- if possible_improv > max_improv {
267- max_comm = comm ;
268- max_improv = possible_improv ;
251+ // Optimized single-layer case
252+ if partitions. len ( ) == 1 && layer_weights[ 0 ] == N :: one ( ) {
253+ // println!("checking valid comms: {:?}", time.elapsed());
254+
255+ let partition = & mut partitions [ 0 ] ;
256+
257+ for & comm in & valid_comms {
258+ let t = Instant :: now ( ) ;
259+ let possible_improv = partition . diff_move ( v , comm ) ;
260+ // println!("Executed diff move, took: {:?}", t.elapsed() );
261+
262+ if possible_improv > max_improv {
263+ max_comm = comm ;
264+ max_improv = possible_improv ;
265+ }
269266 }
270- }
271- } else {
272- // Multi-layer case
273- for & comm in & valid_comms {
274- let mut possible_improv = N :: zero ( ) ;
275-
276- for layer_idx in 0 ..partitions. len ( ) {
277- // Get mutable reference to current partition
278- let layer_improv = partitions[ layer_idx] . diff_move ( v, comm) ;
279- possible_improv += layer_weights[ layer_idx] * layer_improv;
280-
281- // Early termination optimization
282- if possible_improv + epsilon_threshold < max_improv {
283- let remaining_positive = layer_weights[ layer_idx + 1 ..]
284- . iter ( )
285- . all ( |& w| w >= N :: zero ( ) ) ;
286-
287- if remaining_positive && layer_improv <= N :: zero ( ) {
288- break ;
267+ } else {
268+ // Multi-layer case
269+ for & comm in & valid_comms {
270+ let mut possible_improv = N :: zero ( ) ;
271+
272+ for layer_idx in 0 ..partitions. len ( ) {
273+ let layer_improv = partitions[ layer_idx] . diff_move ( v, comm) ;
274+ possible_improv += layer_weights[ layer_idx] * layer_improv;
275+
276+ // Early termination optimization
277+ if possible_improv + epsilon_threshold < max_improv {
278+ let remaining_positive = layer_weights[ layer_idx + 1 ..]
279+ . iter ( )
280+ . all ( |& w| w >= N :: zero ( ) ) ;
281+
282+ if remaining_positive && layer_improv <= N :: zero ( ) {
283+ break ;
284+ }
289285 }
290286 }
291- }
292287
293- if possible_improv > max_improv {
294- max_comm = comm;
295- max_improv = possible_improv;
288+ if possible_improv > max_improv {
289+ max_comm = comm;
290+ max_improv = possible_improv;
291+ }
296292 }
297293 }
298- }
299294
300- Ok ( ( max_comm, max_improv) )
301- }
295+ Ok ( ( max_comm, max_improv) )
296+ }
302297
303298 /// Collects candidate communities that a node can potentially move to.
304299 ///
@@ -434,7 +429,9 @@ where
434429
435430 for partition in partitions. iter ( ) {
436431 if partition. node_count ( ) != n {
437- panic ! ( "Number of nodes are not equal for all graphs." ) ;
432+ return Err ( anyhow:: anyhow!(
433+ "Number of nodes are not equal for all graphs."
434+ ) ) ;
438435 }
439436 }
440437
@@ -580,6 +577,71 @@ where
580577 Ok ( total_improv)
581578 }
582579
580+
581+ fn move_nodes_parallel < N , G , P > (
582+ & mut self ,
583+ partitions : & mut [ P ] ,
584+ layer_weights : & [ N ] ,
585+ is_membership_fixed : & [ bool ] ,
586+ consider_comms : ConsiderComms ,
587+ consider_empty_community : bool ,
588+ max_comm_size : Option < usize >
589+ ) -> anyhow:: Result < N >
590+ where
591+ N : FloatOpsTS + ' static ,
592+ G : NetworkGrouping ,
593+ P : VertexPartition < N , G > {
594+ let n = partitions[ 0 ] . node_count ( ) ;
595+ let network = partitions[ 0 ] . network ( ) . clone ( ) ;
596+
597+ let mut total_improv = N :: zero ( ) ;
598+ let mut is_node_stable = is_membership_fixed. to_vec ( ) ;
599+
600+ let mut nodes: Vec < usize > = ( 0 ..n)
601+ . filter ( |& v| !is_membership_fixed[ v] )
602+ . collect ( ) ;
603+ nodes. shuffle ( & mut self . rng ) ;
604+
605+ let mut pending_nodes: VecDeque < usize > = nodes. into ( ) ;
606+ let batcher = ConflictFreeBatcher :: new ( 10_000 ) ;
607+
608+ while !pending_nodes. is_empty ( ) {
609+ let current_nodes: Vec < usize > = pending_nodes. drain ( ..) . collect ( ) ;
610+ let batches = batcher. create_batches ( & current_nodes, & network, & is_node_stable) ;
611+
612+ for batch in batches {
613+ let proposed_moves = ParallelEvaluator :: evaluate_batch ( & batch, partitions, layer_weights, consider_comms, consider_empty_community, max_comm_size) ;
614+
615+ for proposed in proposed_moves {
616+ if proposed. is_beneficial ( ) {
617+ total_improv += proposed. improvement ;
618+
619+ for partition in partitions. iter_mut ( ) {
620+ partition. move_node ( proposed. node , proposed. to_comm ) ;
621+ }
622+
623+ is_node_stable[ proposed. node ] = true ;
624+
625+ for ( neighbor, _) in network. neighbors ( proposed. node ) {
626+ if is_node_stable[ neighbor] && partitions[ 0 ] . membership ( neighbor) != proposed. to_comm && !is_membership_fixed[ neighbor] {
627+ pending_nodes. push_back ( neighbor) ;
628+ is_node_stable[ neighbor] = false ;
629+ }
630+ }
631+ }
632+ }
633+ }
634+ }
635+
636+ partitions[ 0 ] . renumber_communities ( ) ;
637+ let membership = partitions[ 0 ] . membership_vector ( ) ;
638+ for partition in partitions. iter_mut ( ) . skip ( 1 ) {
639+ partition. set_membership ( & membership) ;
640+ }
641+
642+ Ok ( total_improv)
643+ }
644+
583645 fn move_nodes_constrained < N , G , P > (
584646 & mut self ,
585647 partitions : & mut [ P ] ,
@@ -1158,13 +1220,12 @@ where
11581220 while aggregate_further {
11591221 println ! ( "Starting iteration {:?}, time: {:?}" , i, time. elapsed( ) ) ;
11601222 let improvement = match self . config . optimise_routine {
1161- super :: OptimiseRoutine :: MoveNodes => self . move_nodes (
1223+ super :: OptimiseRoutine :: MoveNodes => self . move_nodes_parallel (
11621224 & mut collapsed_partitions,
11631225 layer_weights,
11641226 & is_collapsed_membership_fixed,
11651227 self . config . consider_comms ,
11661228 self . config . consider_empty_community ,
1167- false ,
11681229 self . config . max_community_size ,
11691230 ) ?,
11701231 super :: OptimiseRoutine :: MergeNodes => self . merge_nodes (
0 commit comments