Skip to content

Commit

Permalink
Bring back the leaky bucket as this limitation is still in place
Browse files Browse the repository at this point in the history
  • Loading branch information
leshchenko1979 committed Mar 3, 2024
1 parent d77c3b2 commit 1d6ab40
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 17 deletions.
2 changes: 1 addition & 1 deletion fast_bitrix24/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.6b1"
__version__ = "1.6b2"
20 changes: 13 additions & 7 deletions fast_bitrix24/srh.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
ClientResponseError,
)

from .throttle import SlidingWindowThrottler
from .throttle import SlidingWindowThrottler, LeakyBucketThrottler
from .logger import logger
from .utils import _url_valid

BITRIX_POOL_SIZE = 50
BITRIX_RPS = 2.0

BITRIX_MAX_BATCH_SIZE = 50
BITRIX_MAX_CONCURRENT_REQUESTS = 50

Expand Down Expand Up @@ -78,7 +81,9 @@ def __init__(self, webhook, respect_velocity_policy, client):
self.successive_results = 0

# rate throttlers by method
self.throttlers = {} # dict[str, LeakyBucketLimiter]
self.method_throttlers = {} # dict[str, LeakyBucketLimiter]

self.leaky_bucket_throttler = LeakyBucketThrottler(BITRIX_POOL_SIZE, BITRIX_RPS)

@staticmethod
def standardize_webhook(webhook):
Expand Down Expand Up @@ -153,7 +158,8 @@ async def request_attempt(self, method, params=None) -> dict:
logger.debug("Response: %s", json)

request_run_time = json["time"]["operating"]
self.throttlers[method].add_request_record(request_run_time)
self.method_throttlers[method].add_request_record(request_run_time)
self.leaky_bucket_throttler.add_request_record()

return json

Expand Down Expand Up @@ -185,14 +191,14 @@ async def acquire(self, method: str):

await self.autothrottle()

async with self.limit_concurrent_requests():
async with self.limit_concurrent_requests(), self.leaky_bucket_throttler.acquire():
if self.respect_velocity_policy:
if method not in self.throttlers:
self.throttlers[method] = SlidingWindowThrottler(
if method not in self.method_throttlers:
self.method_throttlers[method] = SlidingWindowThrottler(
BITRIX_MAX_REQUEST_RUNNING_TIME, BITRIX_MEASUREMENT_PERIOD
)

async with self.throttlers[method].acquire():
async with self.method_throttlers[method].acquire():
yield

else:
Expand Down
57 changes: 52 additions & 5 deletions fast_bitrix24/throttle.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import contextlib
import time


# A single request's log entry: when it started and how long it ran.
RequestRecord = collections.namedtuple("RequestRecord", "when, duration")


Expand Down Expand Up @@ -47,15 +46,63 @@ def _calculate_needed_sleep_time(self) -> float:

def _remove_stale_records(self):
    """Remove all stale records from the record register.

    A record is stale once its start time falls outside the sliding
    measurement window. The deque keeps the most recent record at the
    left end, so stale records are popped from the right.
    """
    # NOTE(review): the guard inside the `while` condition makes a separate
    # empty-history early return redundant, so it is omitted here.
    cut_off = time.monotonic() - self._measurement_period
    while self._request_history and self._request_history[-1].when < cut_off:
        self._request_history.pop()

def add_request_record(self, request_duration: float):
    """Register how long the last request has taken.

    Appends a (timestamp, duration) record at the left (most recent)
    end of the request history.
    """
    self._request_history.appendleft(
        RequestRecord(time.monotonic(), request_duration)
    )


class LeakyBucketThrottler:
    """Implements the leaky bucket rate-limiting algorithm.

    The consumer may burst up to `pool_size` requests; once the bucket
    is full, further requests are throttled to `requests_per_second`.
    Callers must invoke `add_request_record()` after each request so
    the throttler can track usage — `acquire()` does not record one
    automatically.
    """

    def __init__(self, pool_size: int, requests_per_second: float):
        # how many requests can be in the bucket at once
        self._pool_size = pool_size

        # how many requests are removed from the bucket per second
        self._requests_per_second = requests_per_second

        # request timestamps. left - most recent, right - least recent
        self._request_history = collections.deque()

    @contextlib.asynccontextmanager
    async def acquire(self):
        """A context manager that will wait until it's safe to make
        the next request, then yield."""
        await asyncio.sleep(self._calculate_needed_sleep_time())

        try:
            yield
        finally:
            # prune records that have already "leaked out" of the bucket
            self._remove_stale_records()

    def _calculate_needed_sleep_time(self) -> float:
        """How much time to sleep before it's safe to make a request.

        Returns 0 while the bucket still has capacity; otherwise the
        remainder of the per-request interval measured from the most
        recent request.
        """
        # The original while-loop here could never iterate more than once
        # (every path returned or broke), so a plain `if` is equivalent.
        if len(self._request_history) < self._pool_size:
            return 0

        # bucket is full: enforce the steady-state rate relative to
        # the most recent request (leftmost history entry)
        time_from_last_request = time.monotonic() - self._request_history[0]
        time_to_wait = 1 / self._requests_per_second - time_from_last_request
        return max(time_to_wait, 0)

    def add_request_record(self):
        """Register when the last request was made."""
        self._request_history.appendleft(time.monotonic())

    def _remove_stale_records(self):
        """Remove all records older than the bucket's drain window
        (pool_size / requests_per_second seconds)."""
        cut_off = time.monotonic() - self._pool_size / self._requests_per_second
        while self._request_history and self._request_history[-1] < cut_off:
            self._request_history.pop()
2 changes: 1 addition & 1 deletion tests/test_srh.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ async def mock_post(url, json):

# Assert the expected behavior
assert result == {'time': {'operating': 1000}}
assert "method" in handler.throttlers
assert "method" in handler.method_throttlers
48 changes: 45 additions & 3 deletions tests/test_throttle.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,49 @@

import pytest

from fast_bitrix24.throttle import SlidingWindowThrottler, RequestRecord
from fast_bitrix24.throttle import LeakyBucketThrottler, SlidingWindowThrottler


# Test the acquire method of the LeakyBucketThrottler class
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "pool_size, requests_per_second, sleep_time, test_id",
    [
        (5, 1.0, 0, "acquire_happy_path_no_wait"),
        (5, 1.0, 1, "acquire_happy_path_with_wait"),
        (1, 0.1, 10, "acquire_edge_long_wait"),
    ],
)
async def test_leaky_bucket(
    pool_size, requests_per_second, sleep_time, test_id, monkeypatch
):
    """With time frozen and the bucket filled to capacity, acquire()
    must sleep exactly one per-request interval (1 / requests_per_second)
    and keep the full history (no records are stale under frozen time)."""
    # Set up mocks: freeze the clock so the throttling math is deterministic
    start_time = time.monotonic()

    def fake_time():
        return start_time

    monkeypatch.setattr(time, "monotonic", fake_time)

    # Capture requested sleep durations instead of actually sleeping
    sleep_log = []

    async def fake_sleep(duration):
        sleep_log.append(duration)

    monkeypatch.setattr(asyncio, "sleep", fake_sleep)

    # Arrange: fill the bucket to capacity
    throttler = LeakyBucketThrottler(pool_size, requests_per_second)
    for _ in range(pool_size):
        throttler.add_request_record()
    # NOTE: asyncio.sleep is patched above, so this records sleep_time in
    # sleep_log but does not advance time; kept for parametrize compatibility
    await asyncio.sleep(sleep_time)

    # Act
    async with throttler.acquire() as _:
        pass

    # Assert: with a full bucket and zero elapsed time, acquire() must have
    # requested a wait of exactly one request interval...
    assert sleep_log[-1] == 1 / requests_per_second
    # ...and pruning must not drop anything, since no record is stale yet
    assert len(throttler._request_history) == pool_size


@pytest.mark.parametrize(
Expand Down Expand Up @@ -57,10 +99,10 @@ def test_needed_sleep_time(
(10, 20, [5, 5.1, 5], 9.9, "happy-2"),
# Edge cases
(10, 20, [10], 10, "edge-1"),
(10, 20, [10, 0.1], 9.9, "edge-2"),
(10, 20, [10, 0.1], 9.9, "edge-2"),
],
)
async def test_leaky_bucket_limiter(
async def test_sliding_window(
max_request_running_time,
measurement_period,
request_durations,
Expand Down

0 comments on commit 1d6ab40

Please sign in to comment.