diff --git a/src/phoenix/experimental/evals/functions/classify.py b/src/phoenix/experimental/evals/functions/classify.py
index 10cbf1da30..a8e36d3c07 100644
--- a/src/phoenix/experimental/evals/functions/classify.py
+++ b/src/phoenix/experimental/evals/functions/classify.py
@@ -103,12 +103,16 @@ async def producer(
         self,
         inputs: Sequence[Any],
         queue: asyncio.Queue[Tuple[int, Any]],
+        max_fill: int,
         done_producing: asyncio.Event,
     ) -> None:
         try:
             for index, input in enumerate(inputs):
                 if self._TERMINATE.is_set():
                     break
+                while queue.qsize() >= max_fill:
+                    # keep room in the queue for requeues
+                    await asyncio.sleep(1)
                 await queue.put((index, input))
         finally:
             done_producing.set()
@@ -141,7 +145,7 @@ async def consumer(
             termination_signal_task = asyncio.create_task(self._TERMINATE.wait())
             done, pending = await asyncio.wait(
                 [generate_task, termination_signal_task],
-                timeout=60,
+                timeout=120,
                 return_when=asyncio.FIRST_COMPLETED,
             )
             if generate_task in done:
@@ -161,7 +165,7 @@
                         marked_done = True
                     continue
                 else:
-                    tqdm.write(f"Worker timeout, requeuing: {payload}")
+                    tqdm.write("Worker timeout, requeuing")
                     await queue.put(item)
         except Exception:
             tqdm.write(f"Exception in worker: {traceback.format_exc()}")
@@ -180,10 +184,12 @@ async def execute(self, inputs: Sequence[Any]) -> List[Any]:
         outputs = [self.fallback_return_value] * len(inputs)
         progress_bar = tqdm(total=len(inputs), bar_format=self.tqdm_bar_format)

-        queue: asyncio.Queue[Tuple[int, Any]] = asyncio.Queue(maxsize=2 * self.concurrency)
+        max_queue_size = 5 * self.concurrency  # limit the queue to bound memory usage
+        max_fill = max_queue_size - (2 * self.concurrency)  # ensure there is always room to requeue
+        queue: asyncio.Queue[Tuple[int, Any]] = asyncio.Queue(maxsize=max_queue_size)
         done_producing = asyncio.Event()
-        producer = asyncio.create_task(self.producer(inputs, queue, done_producing))
+        producer = asyncio.create_task(self.producer(inputs, queue, max_fill, done_producing))

         consumers = [
             asyncio.create_task(self.consumer(outputs, queue, done_producing, progress_bar))
             for _ in range(self.concurrency)
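
Note on the classify.py change: the producer now applies backpressure. It stops filling the queue once it holds max_fill items, which leaves max_queue_size - max_fill = 2 * concurrency slots free, so a consumer can always requeue a timed-out item without blocking. The sketch below reproduces the pattern in isolation; the names (produce, consume, main) and the 0.01 s poll interval are illustrative, not part of the Phoenix API, and only stdlib asyncio is assumed.

    from __future__ import annotations

    import asyncio
    from typing import Any, List, Sequence, Tuple


    async def produce(
        inputs: Sequence[Any],
        queue: asyncio.Queue[Tuple[int, Any]],
        max_fill: int,
    ) -> None:
        # Stop filling once the queue holds max_fill items so that
        # maxsize - max_fill slots stay free for consumer requeues.
        for index, item in enumerate(inputs):
            while queue.qsize() >= max_fill:
                await asyncio.sleep(0.01)
            await queue.put((index, item))


    async def consume(queue: asyncio.Queue[Tuple[int, Any]], outputs: List[Any]) -> None:
        while True:
            index, item = await queue.get()
            outputs[index] = item * 2  # stand-in for the real LLM call
            queue.task_done()


    async def main() -> None:
        concurrency = 4
        max_queue_size = 5 * concurrency             # bounds memory usage
        max_fill = max_queue_size - 2 * concurrency  # headroom for requeues
        queue: asyncio.Queue[Tuple[int, Any]] = asyncio.Queue(maxsize=max_queue_size)
        inputs = list(range(100))
        outputs: List[Any] = [None] * len(inputs)
        consumers = [asyncio.create_task(consume(queue, outputs)) for _ in range(concurrency)]
        await produce(inputs, queue, max_fill)
        await queue.join()  # wait until every queued item has been processed
        for task in consumers:
            task.cancel()
        await asyncio.gather(*consumers, return_exceptions=True)
        assert outputs == [i * 2 for i in inputs]


    asyncio.run(main())
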
diff --git a/src/phoenix/experimental/evals/models/rate_limiters.py b/src/phoenix/experimental/evals/models/rate_limiters.py
index d54e70f560..bdd3168d49 100644
--- a/src/phoenix/experimental/evals/models/rate_limiters.py
+++ b/src/phoenix/experimental/evals/models/rate_limiters.py
@@ -38,14 +38,17 @@ def __init__(
         self,
         initial_per_second_request_rate: float,
         maximum_per_second_request_rate: float = 1000,
+        minimum_per_second_request_rate: float = 0.1,
         enforcement_window_minutes: float = 1,
         rate_reduction_factor: float = 0.5,
         rate_increase_factor: float = 0.01,
         cooldown_seconds: float = 5,
     ):
         now = time.time()
+        self._initial_rate = initial_per_second_request_rate
         self.rate = initial_per_second_request_rate
         self.maximum_rate = maximum_per_second_request_rate
+        self.minimum_rate = minimum_per_second_request_rate
         self.rate_reduction_factor = rate_reduction_factor
         self.enforcement_window = enforcement_window_minutes * 60
         self.rate_increase_factor = rate_increase_factor
@@ -57,29 +60,27 @@ def increase_rate(self) -> None:
         time_since_last_update = time.time() - self.last_rate_update
-        self.rate *= exp(self.rate_increase_factor * time_since_last_update)
-        self.rate = min(self.rate, self.maximum_rate)
+        if time_since_last_update > self.enforcement_window:
+            self.rate = self._initial_rate
+        else:
+            self.rate *= exp(self.rate_increase_factor * time_since_last_update)
+            self.rate = min(self.rate, self.maximum_rate)
         self.last_rate_update = time.time()

     def on_rate_limit_error(self, request_start_time: float, verbose: bool = False) -> None:
         now = time.time()
-        if self.cooldown > abs(request_start_time - self.last_error):
+        if request_start_time < (self.last_error + self.cooldown):
             # do not reduce the rate for concurrent requests
             return

         original_rate = self.rate
-        # the enforced rate is too high, infer an effective rate instead
-        requests_handled = self.max_tokens() - self.available_requests()
-        effective_rate = requests_handled / self.enforcement_window
-
-        self.rate = effective_rate * self.rate_reduction_factor
+        self.rate = original_rate * self.rate_reduction_factor
         printif(
             verbose, f"Reducing rate from {original_rate} to {self.rate} after rate limit error"
         )
-        # the enforcement window determines the minimum rate
-        self.rate = max(self.rate, 1 / self.enforcement_window)
+        self.rate = max(self.rate, self.minimum_rate)

         # reset request tokens on a rate limit error
         self.tokens = 0
@@ -119,7 +120,7 @@ def wait_until_ready(
     async def async_wait_until_ready(
         self,
-        max_wait_time: float = 300,
+        max_wait_time: float = 10,  # at very low rates (<0.1 req/s) this timeout effectively defeats the token bucket
     ) -> None:
         start = time.time()
         while (time.time() - start) < max_wait_time:
@@ -209,7 +210,7 @@ async def wrapper(*args: Any, **kwargs: Any) -> GenericType:
             for _attempt in range(self._max_rate_limit_retries):
                 try:
                     request_start_time = time.time()
-                    self._throttler.wait_until_ready()
+                    await self._throttler.async_wait_until_ready()
                     return await fn(*args, **kwargs)
                 except self._rate_limit_error:
                     self._throttler.on_rate_limit_error(
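
Note on the rate_limiters.py change: the bucket now follows three rules. It grows the rate exponentially with the time since the last update (capped at maximum_rate), cuts it multiplicatively on a rate limit error (floored at minimum_rate instead of the old usage-derived estimate), and forgets the learned rate entirely after a full enforcement window of inactivity. The toy class below reimplements just those rules to make the dynamics concrete; it is not the Phoenix class, and all constants are examples.

    import time
    from math import exp, isclose


    class ToyAdaptiveRate:
        """Illustrative only: mirrors the three rate rules in the diff above."""

        def __init__(self, initial: float = 1.0, maximum: float = 1000.0,
                     minimum: float = 0.1, window_seconds: float = 60.0,
                     increase_factor: float = 0.01, reduction_factor: float = 0.5):
            self.initial = initial
            self.rate = initial
            self.maximum = maximum
            self.minimum = minimum
            self.window = window_seconds
            self.increase_factor = increase_factor
            self.reduction_factor = reduction_factor
            self.last_update = time.time()

        def increase(self, now: float) -> None:
            elapsed = now - self.last_update
            if elapsed > self.window:
                # a full window of inactivity: forget the learned rate
                self.rate = self.initial
            else:
                # smooth exponential ramp-up, capped at the maximum
                self.rate = min(self.rate * exp(self.increase_factor * elapsed), self.maximum)
            self.last_update = now

        def on_error(self) -> None:
            # multiplicative back-off with a hard floor
            self.rate = max(self.rate * self.reduction_factor, self.minimum)


    r = ToyAdaptiveRate(initial=10.0)
    r.on_error()                         # 10.0 -> 5.0
    assert r.rate == 5.0
    r.increase(now=r.last_update + 30)   # 5.0 * exp(0.01 * 30) ~= 6.75
    assert isclose(r.rate, 5.0 * exp(0.3))
    r.increase(now=r.last_update + 120)  # 120 s > 60 s window: reset to 10.0
    assert r.rate == 10.0
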
diff --git a/tests/experimental/evals/models/test_rate_limiters.py b/tests/experimental/evals/models/test_rate_limiters.py
index e639347492..6facded6b5 100644
--- a/tests/experimental/evals/models/test_rate_limiters.py
+++ b/tests/experimental/evals/models/test_rate_limiters.py
@@ -282,6 +282,30 @@ def test_token_bucket_does_not_increase_rate_past_maximum():
     assert isclose(bucket.rate, rate * 2)


+def test_token_bucket_resets_rate_after_inactivity():
+    start = time.time()
+
+    with freeze_time(start):
+        rate = 0.1
+        bucket = AdaptiveTokenBucket(
+            initial_per_second_request_rate=rate,
+            maximum_per_second_request_rate=rate * 2,
+            enforcement_window_minutes=1,
+            rate_reduction_factor=1,
+            rate_increase_factor=100,
+            cooldown_seconds=5,
+        )
+
+    with warp_time(start + 5):
+        assert bucket.available_requests() == 0.5, "should have accumulated half a request"
+        bucket.wait_until_ready()
+        assert isclose(bucket.rate, rate * 2)
+
+    with warp_time(start + 100):
+        bucket.wait_until_ready()
+        assert isclose(bucket.rate, rate)
+
+
 def test_token_bucket_decreases_rate():
     start = time.time()

@@ -321,35 +345,10 @@ def test_token_bucket_decreases_rate_once_per_cooldown_period():
         bucket.on_rate_limit_error(request_start_time=time.time())
         assert isclose(bucket.rate, 25), "3 seconds is still within the cooldown period"

-    with warp_time(start + 6):
-        # mock "available_requests" to simulate full usage
-        mock_available_requests = mock.MagicMock()
-        mock_available_requests.return_value = 0
-        bucket.available_requests = mock_available_requests
+    with warp_time(start - 6):
         bucket.on_rate_limit_error(request_start_time=time.time())
-        assert isclose(bucket.rate, 6.25)
-
-
-def test_token_bucket_decreases_rate_depending_on_usage():
-    start = time.time()
-
-    with warp_time(start):
-        rate = 100
-        bucket = AdaptiveTokenBucket(
-            initial_per_second_request_rate=rate,
-            maximum_per_second_request_rate=rate * 2,
-            enforcement_window_minutes=1,
-            rate_reduction_factor=0.25,
-            rate_increase_factor=0.01,
-            cooldown_seconds=5,
-        )
-        # mock "available_requests" to simulate actual usage
-        mock_available_requests = mock.MagicMock()
-        mock_available_requests.return_value = rate * 30
-        bucket.available_requests = mock_available_requests
-
-        effective_rate = rate * 30 / 60
-        expected_rate = effective_rate * 0.25
+        assert isclose(bucket.rate, 25), "requests before the rate limited request are ignored"
+
+    with warp_time(start + 6):
         bucket.on_rate_limit_error(request_start_time=time.time())
-        assert isclose(bucket.rate, expected_rate)
+        assert isclose(bucket.rate, 6.25)
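
For reference, the arithmetic the new inactivity test relies on, spelled out as a small self-contained check; the constants mirror the test above, but the bucket itself is not imported, and the reasoning assumes the bucket semantics shown in the rate_limiters.py diff.

    from math import exp, isclose

    # Constants mirror test_token_bucket_resets_rate_after_inactivity.
    rate = 0.1  # requests per second

    # After 5 seconds at 0.1 req/s the bucket holds half a request token.
    assert isclose(rate * 5, 0.5)

    # With rate_increase_factor=100, a single increase after 5 s would multiply
    # the rate by exp(100 * 5), so the min() clamp pins it at the maximum (2x).
    assert min(rate * exp(100 * 5), rate * 2) == rate * 2

    # At start + 100, 95 s have passed since the last update, which exceeds the
    # 60 s enforcement window, so the rate snaps back to its initial value.
    assert 100 - 5 > 60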