
Commit 74b2348

Merge pull request #52 from TogetherCrew/feat/mediawiki-batch-ingestion
feat: Implement batch processing for document ingestion in MediawikiETL
2 parents 9c76678 + d377801 commit 74b2348

1 file changed: hivemind_etl/mediawiki/etl.py (23 additions, 1 deletion)
@@ -1,6 +1,7 @@
 import logging
 import os
 import shutil
+from concurrent.futures import ThreadPoolExecutor, as_completed
 
 from llama_index.core import Document
 from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline
@@ -100,7 +101,28 @@ def load(self, documents: list[Document]) -> None:
         ingestion_pipeline = CustomIngestionPipeline(
             self.community_id, collection_name=self.platform_id
         )
-        ingestion_pipeline.run_pipeline(documents)
+
+        # Process batches in parallel using ThreadPoolExecutor
+        batch_size = 1000
+        batches = [documents[i:i + batch_size] for i in range(0, len(documents), batch_size)]
+
+        with ThreadPoolExecutor() as executor:
+            # Submit all batch processing tasks
+            future_to_batch = {
+                executor.submit(ingestion_pipeline.run_pipeline, batch): i
+                for i, batch in enumerate(batches)
+            }
+
+            # Process completed batches and handle any errors
+            for future in as_completed(future_to_batch):
+                batch_idx = future_to_batch[future]
+                try:
+                    future.result()  # This will raise any exceptions that occurred
+                    logging.info(f"Successfully loaded batch {batch_idx} of {len(batches)} documents into Qdrant!")
+                except Exception as e:
+                    logging.error(f"Error processing batch {batch_idx}: {e}")
+                    raise  # Re-raise the exception to stop the process
+
         logging.info(f"Loaded {len(documents)} documents into Qdrant!")
 
         if self.delete_dump_after_load:
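
The core pattern in this change is splitting the document list into fixed-size batches, submitting each batch to a thread pool, and surfacing any per-batch failure before the final log line. Below is a minimal standalone sketch of that pattern; DummyPipeline, load_in_batches, and the sample documents are hypothetical stand-ins for CustomIngestionPipeline and the real MediaWiki documents, and the sketch assumes run_pipeline can safely be called from multiple threads at once.

    import logging
    from concurrent.futures import ThreadPoolExecutor, as_completed

    logging.basicConfig(level=logging.INFO)


    class DummyPipeline:
        """Hypothetical stand-in for CustomIngestionPipeline."""

        def run_pipeline(self, batch: list[str]) -> int:
            # A real pipeline would embed and upsert the batch; here we just count it.
            return len(batch)


    def load_in_batches(pipeline: DummyPipeline, documents: list[str], batch_size: int = 1000) -> None:
        # Split documents into fixed-size batches
        batches = [documents[i:i + batch_size] for i in range(0, len(documents), batch_size)]

        with ThreadPoolExecutor() as executor:
            # Submit one run_pipeline call per batch, remembering each batch's index
            future_to_batch = {
                executor.submit(pipeline.run_pipeline, batch): i
                for i, batch in enumerate(batches)
            }

            # Collect results as they finish; any exception is logged and re-raised
            for future in as_completed(future_to_batch):
                batch_idx = future_to_batch[future]
                try:
                    future.result()
                    logging.info("Loaded batch %d/%d", batch_idx + 1, len(batches))
                except Exception:
                    logging.exception("Error processing batch %d", batch_idx)
                    raise


    if __name__ == "__main__":
        docs = [f"doc-{n}" for n in range(2500)]  # hypothetical sample documents
        load_in_batches(DummyPipeline(), docs, batch_size=1000)

Note that ThreadPoolExecutor() with no arguments picks a default worker count based on the machine's CPU count, so the degree of parallelism is not fixed by this commit.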
