@@ -32,7 +32,7 @@ class ParallelProcessing: ...
3232 HookFunction ,
3333)
3434from typing_extensions import Generic , ParamSpec
35- from typing import List , Callable , Optional , Union , Mapping , Sequence , Tuple
35+ from typing import List , Callable , Optional , Union , Mapping , Sequence , Tuple , Generator
3636
3737
3838Threads : set ['Thread' ] = set ()
@@ -387,15 +387,19 @@ def _wrap_function(self, function: TargetFunction) -> TargetFunction:
387387 @wraps (function )
388388 def wrapper (
389389 index : int ,
390- data_chunk : Sequence [_Dataset_T ],
390+ length : int ,
391+ data_chunk : Generator [_Dataset_T , None , None ],
391392 * args : _Target_P .args ,
392393 ** kwargs : _Target_P .kwargs ,
393394 ) -> List [_Target_T ]:
394395 computed : List [Data_Out ] = []
395- for i , data_entry in enumerate (data_chunk ):
396+
397+ i = 0
398+ for data_entry in data_chunk :
396399 v = function (data_entry , * args , ** kwargs )
397400 computed .append (v )
398- self ._threads [index ].progress = round ((i + 1 ) / len (data_chunk ), 5 )
401+ self ._threads [index ].progress = round ((i + 1 ) / length , 5 )
402+ i += 1
399403
400404 self ._completed += 1
401405 if self ._completed == len (self ._threads ):
@@ -507,15 +511,23 @@ def start(self) -> None:
507511 i : v for i , v in self .overflow_kwargs .items () if i != 'name' and i != 'args'
508512 }
509513
510- for i , data_chunk in enumerate (chunk_split (self .dataset , max_threads )):
514+ i = 0
515+ for chunkStart , chunkEnd in chunk_split (len (self .dataset ), max_threads ):
511516 chunk_thread = Thread (
512517 target = self .function ,
513- args = [i , data_chunk , * parsed_args , * self .overflow_args ],
518+ args = [
519+ i ,
520+ chunkEnd - chunkStart ,
521+ (self .dataset [x ] for x in range (chunkStart , chunkEnd )),
522+ * parsed_args ,
523+ * self .overflow_args ,
524+ ],
514525 name = name_format and name_format % i or None ,
515526 ** self .overflow_kwargs ,
516527 )
517528 self ._threads .append (_ThreadWorker (chunk_thread , 0 ))
518529 chunk_thread .start ()
530+ i += 1
519531
520532
521533# Handle abrupt exit
0 commit comments