55import time
66from pathlib import Path
77from typing import List , Dict , Any , Optional , Callable
8+ from dataclasses import dataclass
89
910from ..config import Config
1011from ..services import QdrantClient
1415from .progressive_metadata import ProgressiveMetadata
1516
1617
@dataclass
class ThroughputStats:
    """Statistics for tracking indexing throughput and throttling.

    A plain value object produced periodically during indexing so progress
    reporting can show current processing rates and surface any detected
    slowdown (e.g. embedding-API rate limiting).
    """

    # Rolling-window processing rates, expressed per minute.
    files_per_minute: float = 0.0
    chunks_per_minute: float = 0.0
    # Estimated embedding API request rate; the producer assumes one
    # request per chunk — TODO confirm against the embedding provider.
    embedding_requests_per_minute: float = 0.0
    # True when the producer detected throttling (API rate limiting or
    # unusually slow per-file processing); reason is human-readable.
    is_throttling: bool = False
    throttle_reason: str = ""
    # Mean wall-clock seconds spent per processed file in the window.
    average_processing_time_per_file: float = 0.0
29+
1730class SmartIndexer (GitAwareDocumentProcessor ):
1831 """Smart indexer with progressive metadata and resumability."""
1932
@@ -197,13 +210,20 @@ def _do_incremental_index(
197210 def _process_files_with_metadata (
198211 self , files : List [Path ], batch_size : int , progress_callback : Optional [Callable ]
199212 ) -> ProcessingStats :
200- """Process files with progressive metadata updates."""
213+ """Process files with progressive metadata updates and throughput monitoring ."""
201214
202215 stats = ProcessingStats ()
203216 stats .start_time = time .time ()
204217
205218 batch_points = []
206219
220+ # Throughput tracking
221+ throughput_window_start = time .time ()
222+ throughput_window_files = 0
223+ throughput_window_chunks = 0
224+ throughput_window_size = 60.0 # 1 minute window
225+ last_throttle_check = time .time ()
226+
207227 def update_metadata (chunks_count = 0 , failed = False ):
208228 """Update metadata after each file."""
209229 self .progressive_metadata .update_progress (
@@ -212,6 +232,55 @@ def update_metadata(chunks_count=0, failed=False):
212232 failed_files = 1 if failed else 0 ,
213233 )
214234
235+ def calculate_throughput () -> ThroughputStats :
236+ """Calculate current throughput and detect throttling."""
237+ current_time = time .time ()
238+ elapsed = current_time - throughput_window_start
239+
240+ if elapsed <= 0 :
241+ return ThroughputStats ()
242+
243+ # Calculate rates per minute
244+ files_per_min = (throughput_window_files / elapsed ) * 60
245+ chunks_per_min = (throughput_window_chunks / elapsed ) * 60
246+ avg_time_per_file = elapsed / max (throughput_window_files , 1 )
247+
248+ # Detect throttling by checking embedding provider
249+ is_throttling = False
250+ throttle_reason = ""
251+
252+ # Check if we're using VoyageAI and detect rate limiting
253+ provider_name = self .embedding_provider .get_provider_name ()
254+ if provider_name == "voyage-ai" :
255+ # Check if rate limiter indicates throttling
256+ if hasattr (self .embedding_provider , "rate_limiter" ):
257+ rate_limiter = self .embedding_provider .rate_limiter
258+ wait_time = rate_limiter .wait_time (100 ) # Estimate for 100 tokens
259+ if wait_time > 0.5 : # If we need to wait more than 0.5 seconds
260+ is_throttling = True
261+ throttle_reason = f"API rate limiting (wait: { wait_time :.1f} s)"
262+ elif rate_limiter .request_tokens < 10 : # Low on request tokens
263+ is_throttling = True
264+ throttle_reason = "API request quota running low"
265+
266+ # Detect slow processing (could indicate network issues or service slowdown)
267+ if (
268+ avg_time_per_file > 5.0 and not is_throttling
269+ ): # More than 5 seconds per file
270+ is_throttling = True
271+ throttle_reason = (
272+ f"Slow processing detected ({ avg_time_per_file :.1f} s/file)"
273+ )
274+
275+ return ThroughputStats (
276+ files_per_minute = files_per_min ,
277+ chunks_per_minute = chunks_per_min ,
278+ embedding_requests_per_minute = chunks_per_min , # Assuming 1 request per chunk
279+ is_throttling = is_throttling ,
280+ throttle_reason = throttle_reason ,
281+ average_processing_time_per_file = avg_time_per_file ,
282+ )
283+
215284 for i , file_path in enumerate (files ):
216285 points = []
217286
@@ -222,9 +291,11 @@ def update_metadata(chunks_count=0, failed=False):
222291 if points :
223292 batch_points .extend (points )
224293 stats .chunks_created += len (points )
294+ throughput_window_chunks += len (points )
225295
226296 stats .files_processed += 1
227297 stats .total_size += file_path .stat ().st_size
298+ throughput_window_files += 1
228299
229300 # Process batch if full
230301 if len (batch_points ) >= batch_size :
@@ -235,9 +306,39 @@ def update_metadata(chunks_count=0, failed=False):
235306 # Update metadata after successful processing
236307 update_metadata (chunks_count = len (points ), failed = False )
237308
238- # Call progress callback
309+ # Calculate throughput every 30 seconds or every 50 files
310+ current_time = time .time ()
311+ if (current_time - last_throttle_check > 30 ) or (i % 50 == 0 and i > 0 ):
312+ throughput_stats = calculate_throughput ()
313+ last_throttle_check = current_time
314+
315+ # Reset throughput window if it's been more than window size
316+ if current_time - throughput_window_start > throughput_window_size :
317+ throughput_window_start = current_time
318+ throughput_window_files = 0
319+ throughput_window_chunks = 0
320+
321+ # Call progress callback with throughput info
239322 if progress_callback :
240- progress_callback (i + 1 , len (files ), file_path )
323+ throughput_stats = calculate_throughput ()
324+
325+ # Create enhanced info string
326+ info_parts = []
327+ if throughput_stats .files_per_minute > 0 :
328+ info_parts .append (
329+ f"{ throughput_stats .files_per_minute :.1f} files/min"
330+ )
331+ if throughput_stats .chunks_per_minute > 0 :
332+ info_parts .append (
333+ f"{ throughput_stats .chunks_per_minute :.1f} chunks/min"
334+ )
335+ if throughput_stats .is_throttling :
336+ info_parts .append (f"🐌 { throughput_stats .throttle_reason } " )
337+ elif throughput_stats .files_per_minute > 60 : # Fast processing
338+ info_parts .append ("🚀 Full speed" )
339+
340+ info = " | " .join (info_parts ) if info_parts else None
341+ progress_callback (i + 1 , len (files ), file_path , info = info )
241342
242343 except Exception as e :
243344 stats .failed_files += 1
0 commit comments