
Commit 2530569

fix: Rate limiter will sometimes tank request rate with many concurrent requests (#1855)

* Do not allow old requests to decrease the rate limit

* Reset the rate limit cap after inactivity

* Ruff 🐶

* Always leave room for requeues

* Pass in max_queue_size

* Ruff 🐶

* Xfail test due to dependency update

* Double the headroom available to workers for re-queuing

* The rate limiter should always consume a token per request

* Remove xfail

* Modify rate limiting behavior at low request rates

* Explicitly enforce minimum rate
anticorrelator authored Dec 6, 2023
1 parent d6e88f5 commit 2530569
Showing 3 changed files with 51 additions and 45 deletions.
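Taken together, the commits change AdaptiveTokenBucket in four ways: a rate limit error now halves the currently enforced rate instead of inferring an "effective" rate from token usage; errors raised by requests that started before the last reduction (plus a short cooldown) no longer compound the cut; the rate resets to its initial value after a full enforcement window of inactivity; and an explicit minimum rate replaces the old 1 / enforcement_window floor. Below is a minimal standalone sketch of those rules using the defaults visible in the diff; class and attribute names are illustrative, the real implementation is in the rate_limiters.py diff further down.

import time
from math import exp


class AdaptiveRateSketch:
    """Illustrative distillation of the new AdaptiveTokenBucket rules."""

    def __init__(
        self,
        initial_rate: float,
        minimum_rate: float = 0.1,
        maximum_rate: float = 1000.0,
        enforcement_window: float = 60.0,  # seconds
        cooldown: float = 5.0,  # seconds
    ):
        self._initial_rate = initial_rate
        self.rate = initial_rate
        self.minimum_rate = minimum_rate
        self.maximum_rate = maximum_rate
        self.enforcement_window = enforcement_window
        self.cooldown = cooldown
        self.last_error = -cooldown
        self.last_rate_update = time.time()

    def increase_rate(self) -> None:
        idle = time.time() - self.last_rate_update
        if idle > self.enforcement_window:
            self.rate = self._initial_rate  # reset the cap after inactivity
        else:
            # gentle exponential ramp-up, capped at the maximum rate
            self.rate = min(self.rate * exp(0.01 * idle), self.maximum_rate)
        self.last_rate_update = time.time()

    def on_rate_limit_error(self, request_start_time: float) -> None:
        if request_start_time < self.last_error + self.cooldown:
            return  # a stale concurrent request must not shrink the rate again
        # halve the enforced rate, but never drop below the explicit minimum
        # (the real class also zeroes its token count at this point)
        self.rate = max(self.rate * 0.5, self.minimum_rate)
        self.last_error = time.time()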
14 changes: 10 additions & 4 deletions src/phoenix/experimental/evals/functions/classify.py
@@ -103,12 +103,16 @@ async def producer(
         self,
         inputs: Sequence[Any],
         queue: asyncio.Queue[Tuple[int, Any]],
+        max_fill: int,
         done_producing: asyncio.Event,
     ) -> None:
         try:
             for index, input in enumerate(inputs):
                 if self._TERMINATE.is_set():
                     break
+                while queue.qsize() >= max_fill:
+                    # keep room in the queue for requeues
+                    await asyncio.sleep(1)
                 await queue.put((index, input))
         finally:
             done_producing.set()
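The new while loop implements the "Always leave room for requeues" commit: the producer now backs off once the queue reaches max_fill, so the slots between max_fill and maxsize stay reserved for workers re-queuing timed-out items. Without that headroom, a worker's own await queue.put(item) could block against a queue the producer keeps full, and progress would hinge entirely on the remaining workers draining it.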
@@ -141,7 +145,7 @@ async def consumer(
                 termination_signal_task = asyncio.create_task(self._TERMINATE.wait())
                 done, pending = await asyncio.wait(
                     [generate_task, termination_signal_task],
-                    timeout=60,
+                    timeout=120,
                     return_when=asyncio.FIRST_COMPLETED,
                 )
                 if generate_task in done:
@@ -161,7 +165,7 @@
                     marked_done = True
                     continue
                 else:
-                    tqdm.write(f"Worker timeout, requeuing: {payload}")
+                    tqdm.write("Worker timeout, requeuing")
                     await queue.put(item)
             except Exception:
                 tqdm.write(f"Exception in worker: {traceback.format_exc()}")
@@ -180,10 +184,12 @@ async def execute(self, inputs: Sequence[Any]) -> List[Any]:
         outputs = [self.fallback_return_value] * len(inputs)
         progress_bar = tqdm(total=len(inputs), bar_format=self.tqdm_bar_format)

-        queue: asyncio.Queue[Tuple[int, Any]] = asyncio.Queue(maxsize=2 * self.concurrency)
+        max_queue_size = 5 * self.concurrency  # limit the queue to bound memory usage
+        max_fill = max_queue_size - (2 * self.concurrency)  # ensure there is always room to requeue
+        queue: asyncio.Queue[Tuple[int, Any]] = asyncio.Queue(maxsize=max_queue_size)
         done_producing = asyncio.Event()

-        producer = asyncio.create_task(self.producer(inputs, queue, done_producing))
+        producer = asyncio.create_task(self.producer(inputs, queue, max_fill, done_producing))
         consumers = [
             asyncio.create_task(self.consumer(outputs, queue, done_producing, progress_bar))
             for _ in range(self.concurrency)
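To put numbers on the sizing: with concurrency = 10, max_queue_size is 50, the producer stops feeding at 30, and the remaining 20 slots are the "doubled headroom" from the commit list, twice what is needed even if all 10 workers requeue a timed-out payload at the same moment.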
25 changes: 13 additions & 12 deletions src/phoenix/experimental/evals/models/rate_limiters.py
@@ -38,14 +38,17 @@ def __init__(
         self,
         initial_per_second_request_rate: float,
         maximum_per_second_request_rate: float = 1000,
+        minimum_per_second_request_rate: float = 0.1,
         enforcement_window_minutes: float = 1,
         rate_reduction_factor: float = 0.5,
         rate_increase_factor: float = 0.01,
         cooldown_seconds: float = 5,
     ):
         now = time.time()
+        self._initial_rate = initial_per_second_request_rate
         self.rate = initial_per_second_request_rate
         self.maximum_rate = maximum_per_second_request_rate
+        self.minimum_rate = minimum_per_second_request_rate
         self.rate_reduction_factor = rate_reduction_factor
         self.enforcement_window = enforcement_window_minutes * 60
         self.rate_increase_factor = rate_increase_factor
@@ -57,29 +60,27 @@ def __init__(

     def increase_rate(self) -> None:
         time_since_last_update = time.time() - self.last_rate_update
-        self.rate *= exp(self.rate_increase_factor * time_since_last_update)
-        self.rate = min(self.rate, self.maximum_rate)
+        if time_since_last_update > self.enforcement_window:
+            self.rate = self._initial_rate
+        else:
+            self.rate *= exp(self.rate_increase_factor * time_since_last_update)
+            self.rate = min(self.rate, self.maximum_rate)
         self.last_rate_update = time.time()

     def on_rate_limit_error(self, request_start_time: float, verbose: bool = False) -> None:
         now = time.time()
-        if self.cooldown > abs(request_start_time - self.last_error):
+        if request_start_time < (self.last_error + self.cooldown):
             # do not reduce the rate for concurrent requests
             return

         original_rate = self.rate

-        # the enforced rate is too high, infer an effective rate instead
-        requests_handled = self.max_tokens() - self.available_requests()
-        effective_rate = requests_handled / self.enforcement_window
-
-        self.rate = effective_rate * self.rate_reduction_factor
+        self.rate = original_rate * self.rate_reduction_factor
         printif(
             verbose, f"Reducing rate from {original_rate} to {self.rate} after rate limit error"
         )

-        # the enforcement window determines the minimum rate
-        self.rate = max(self.rate, 1 / self.enforcement_window)
+        self.rate = max(self.rate, self.minimum_rate)

         # reset request tokens on a rate limit error
         self.tokens = 0
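A back-of-the-envelope trace of the new adjustment logic, using the defaults visible in the diff (rate_reduction_factor=0.5, cooldown_seconds=5, rate_increase_factor=0.01) and a hypothetical limiter running at 100 requests per second:

from math import exp

rate = 100.0
# a 429 arrives: the enforced rate is halved
rate *= 0.5                                # 100 -> 50
# a second 429 from a request that started before the first reduction lands
# inside the 5 s cooldown and is ignored, so the rate stays at 50 instead of
# cascading to 25, 12.5, ... (the "tanking" in the commit title)
dt = 30.0                                  # error-free seconds since last update
rate = min(rate * exp(0.01 * dt), 1000.0)  # 50 * e**0.3, about 67.5
rate = max(rate, 0.1)                      # explicit minimum_per_second_request_rate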
@@ -119,7 +120,7 @@ def wait_until_ready(

     async def async_wait_until_ready(
         self,
-        max_wait_time: float = 300,
+        max_wait_time: float = 10,  # defeat the token bucket rate limiter at low rates (<.1 req/s)
     ) -> None:
         start = time.time()
         while (time.time() - start) < max_wait_time:
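The shrunken default is tied to the new minimum rate: at 0.1 requests per second a token accrues every 10 seconds, so a waiter on a healthy bucket should never need longer than that. For user-configured rates below 0.1 req/s the wait simply expires and the request proceeds anyway, which appears to be the sense in which the inline comment "defeats" the token bucket, rather than parking callers for up to the old 300 seconds.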
@@ -209,7 +210,7 @@ async def wrapper(*args: Any, **kwargs: Any) -> GenericType:
             for _attempt in range(self._max_rate_limit_retries):
                 try:
                     request_start_time = time.time()
-                    self._throttler.wait_until_ready()
+                    await self._throttler.async_wait_until_ready()
                     return await fn(*args, **kwargs)
                 except self._rate_limit_error:
                     self._throttler.on_rate_limit_error(
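Beyond correctness, switching the async wrapper from the blocking wait_until_ready to await async_wait_until_ready matters for throughput: the synchronous variant sleeps on the event-loop thread (presumably via time.sleep), freezing every other in-flight coroutine while a single request waits for a token, whereas the awaitable variant yields control back to the loop so the other workers keep making progress.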
57 changes: 28 additions & 29 deletions tests/experimental/evals/models/test_rate_limiters.py
@@ -282,6 +282,30 @@ def test_token_bucket_does_not_increase_rate_past_maximum():
         assert isclose(bucket.rate, rate * 2)


+def test_token_bucket_resets_rate_after_inactivity():
+    start = time.time()
+
+    with freeze_time(start):
+        rate = 0.1
+        bucket = AdaptiveTokenBucket(
+            initial_per_second_request_rate=rate,
+            maximum_per_second_request_rate=rate * 2,
+            enforcement_window_minutes=1,
+            rate_reduction_factor=1,
+            rate_increase_factor=100,
+            cooldown_seconds=5,
+        )
+
+    with warp_time(start + 5):
+        assert bucket.available_requests() == 0.5, "should have accumulated half a request"
+        bucket.wait_until_ready()
+        assert isclose(bucket.rate, rate * 2)
+
+    with warp_time(start + 100):
+        bucket.wait_until_ready()
+        assert isclose(bucket.rate, rate)
+
+
 def test_token_bucket_decreases_rate():
     start = time.time()

@@ -321,35 +345,10 @@ def test_token_bucket_decreases_rate_once_per_cooldown_period():
         bucket.on_rate_limit_error(request_start_time=time.time())
         assert isclose(bucket.rate, 25), "3 seconds is still within the cooldown period"

-    with warp_time(start + 6):
-        # mock "available_requests" to simulate full usage
-        mock_available_requests = mock.MagicMock()
-        mock_available_requests.return_value = 0
-        bucket.available_requests = mock_available_requests
+    with warp_time(start - 6):
         bucket.on_rate_limit_error(request_start_time=time.time())
-        assert isclose(bucket.rate, 6.25)
-
-
-def test_token_bucket_decreases_rate_depending_on_usage():
-    start = time.time()
-
-    with warp_time(start):
-        rate = 100
-        bucket = AdaptiveTokenBucket(
-            initial_per_second_request_rate=rate,
-            maximum_per_second_request_rate=rate * 2,
-            enforcement_window_minutes=1,
-            rate_reduction_factor=0.25,
-            rate_increase_factor=0.01,
-            cooldown_seconds=5,
-        )
-        # mock "available_requests" to simulate actual usage
-        mock_available_requests = mock.MagicMock()
-        mock_available_requests.return_value = rate * 30
-        bucket.available_requests = mock_available_requests
-
-        effective_rate = rate * 30 / 60
-        expected_rate = effective_rate * 0.25
-
+        assert isclose(bucket.rate, 25), "requests before the rate limited request are ignored"

+    with warp_time(start + 6):
         bucket.on_rate_limit_error(request_start_time=time.time())
-        assert isclose(bucket.rate, expected_rate)
+        assert isclose(bucket.rate, 6.25)
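The deleted test_token_bucket_decreases_rate_depending_on_usage covered the old usage-inferred reduction, which no longer exists; its scenario is folded into the cooldown test above, including the new warp_time(start - 6) case asserting that an error attributed to a request started before the rate-limited one leaves the rate untouched.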
