The request rate limiter using Leaky-bucket algorithm
This module can be used to apply rate-limit for API request. User defines window duration and the limit of function calls within such interval.
from pyrate_limiter import (
BucketFullException,
Duration,
RequestRate,
Limiter,
MemoryListBucket,
MemoryQueueBucket,
SQLiteBucket,
RedisBucket,
RedisClusterBucket,
)A few different bucket backends are available, which can be selected using the bucket_class
argument for Limiter. Any additional backend-specific arguments can be passed
via bucket_kwargs.
The default bucket is stored in memory, backed by a queue.Queue. A list implementation is also available:
from pyrate_limiter import Limiter, MemoryListBucket
limiter = Limiter(bucket_class=MemoryListBucket)If you need to persist the bucket state, a SQLite backend is available.
By default it will store the state in the system temp directory, and you can use
the path argument to use a different location:
from pyrate_limiter import Limiter, SQLiteBucket
limiter = Limiter(
bucket_class=SQLiteBucket,
bucket_kwargs={'path': '/tmp/pyrate_limiter.sqlite'},
)If you have a larger, distributed application, Redis is an ideal backend. This option requires redis-py.
You can use the redis_pool argument to pass any connection settings:
from pyrate_limiter import Limiter, RedisBucket
from redis import ConnectionPool
redis_pool = ConnectionPool(host='localhost', port=6379, db=0)
limiter = Limiter(
bucket_class=RedisBucket,
bucket_kwargs={'redis_pool': redis_pool},
)Redis clusters are also supported, which requires redis-py-cluster:
from pyrate_limiter import Limiter, RedisClusterBucket
limiter = Limiter(bucket_class=RedisClusterBucket)If these don't suit your needs, you can also create your own bucket backend by extending pyrate_limiter.bucket.AbstractBucket.
Considering API throttling logic for usual business models of Subscription, we usually see strategies somewhat similar to these.
Some commercial/free API (Linkedin, Github etc)
- 500 requests/hour, and
- 1000 requests/day, and
- maximum 10,000 requests/month-
RequestRateclass is designed to describe this strategies - eg for the above strategies we have a Rate-Limiter defined as following
hourly_rate = RequestRate(500, Duration.HOUR) # maximum 500 requests/hour
daily_rate = RequestRate(1000, Duration.DAY) # maximum 1000 requests/day
monthly_rate = RequestRate(10000, Duration.MONTH) # and so on
limiter = Limiter(hourly_rate, daily_rate, monthly_rate, *other_rates, bucket_class=MemoryListBucket) # default is MemoryQueueBucket
# usage
identity = user_id # or ip-address, or maybe both
limiter.try_acquire(identity)As the logic is pretty self-explainatory, note that the superior rate-limit must come after the inferiors, ie 1000 req/day must be declared after an hourly-rate-limit, and the daily-limit must be larger than hourly-limit.
-
bucket_classis the type of bucket that holds request. It could be an in-memory data structure like Python List (MemoryListBucket), or QueueMemoryQueueBucket. -
For microservices or decentralized platform, multiple rate-Limiter may share a single store for storing request-rate history, ie
Redis. This lib provides ready-to-useRedisBucket(redis-pyis required), andRedisClusterBucket(withredis-py-clusterbeing required). The usage difference is when using Redis, a namingprefixmust be provide so the keys can be distinct for each item's identity.
from pyrate_limiter.bucket import RedisBucket, RedisClusterBucket
from redis import ConnectionPool
redis_pool = ConnectionPool.from_url('redis://localhost:6379')
rate = RequestRate(3, 5 * Duration.SECOND)
bucket_kwargs = {
"redis_pool": redis_pool,
"bucket_name": "my-ultimate-bucket-prefix"
}
# so each item buckets will have a key name as
# my-ultimate-bucket-prefix__item-identity
limiter = Limiter(rate, bucket_class=RedisBucket, bucket_kwargs=bucket_kwargs)
# or RedisClusterBucket when used with a redis cluster
# limiter = Limiter(rate, bucket_class=RedisClusterBucket, bucket_kwargs=bucket_kwargs)
item = 'vutran_item'
limiter.try_acquire(item)If the Bucket is full, an exception BucketFullException will be raised, with meta-info about the identity it received, the rate that has raised, and the remaining time until the next request can be processed.
rate = RequestRate(3, 5 * Duration.SECOND)
limiter = Limiter(rate)
item = 'vutran'
has_raised = False
try:
for _ in range(4):
limiter.try_acquire(item)
sleep(1)
except BucketFullException as err:
has_raised = True
assert str(err)
# Bucket for vutran with Rate 3/5 is already full
assert isinstance(err.meta_info, dict)
# {'error': 'Bucket for vutran with Rate 3/5 is already full', 'identity': 'tranvu', 'rate': '5/5', 'remaining_time': 2}- *RequestRate may be required to
reseton a fixed schedule, eg: every first-day of a month
Rate-limiting is also available in decorator form, using Limiter.ratelimit. Example:
@limiter.ratelimit(item)
def my_function():
do_stuff()As with Limiter.try_acquire, if calls to the wrapped function exceed the rate limits you
defined, a BucketFullException will be raised.
In some cases, you may want to simply slow down your calls to stay within the rate limits instead of
canceling them. In that case you can use the delay flag, optionally with a max_delay
(in seconds) that you are willing to wait in between calls.
Example:
@limiter.ratelimit(item, delay=True, max_delay=10)
def my_function():
do_stuff()In this case, calls may be delayed by at most 10 seconds to stay within the rate limits; any longer
than that, and a BucketFullException will be raised instead. Without specifying max_delay, calls
will be delayed as long as necessary.
Limiter.ratelimit also works as a contextmanager:
def my_function():
with limiter.ratelimit(item, delay=True):
do_stuff()All the above features of Limiter.ratelimit also work on async functions:
@limiter.ratelimit(item, delay=True)
async def my_function():
await do_stuff()
async def my_function():
async with limiter.ratelimit(item):
await do_stuff()When delays are enabled, asyncio.sleep will be used instead of time.sleep.
To prove that pyrate-limiter is working as expected, here is a complete example to demonstrate rate-limiting with delays:
from time import perf_counter as time
from pyrate_limiter import Duration, Limiter, RequestRate
limiter = Limiter(RequestRate(5, Duration.SECOND))
n_requests = 27
@limiter.ratelimit("test", delay=True)
def limited_function(start_time):
print(f"t + {(time() - start_time):.5f}")
start_time = time()
for _ in range(n_requests):
limited_function(start_time)
print(f"Ran {n_requests} requests in {time() - start_time:.5f} seconds")And an equivalent example for async usage:
import asyncio
from time import perf_counter as time
from pyrate_limiter import Duration, Limiter, RequestRate
limiter = Limiter(RequestRate(5, Duration.SECOND))
n_requests = 27
@limiter.ratelimit("test", delay=True)
async def limited_function(start_time):
print(f"t + {(time() - start_time):.5f}")
async def test_ratelimit():
start_time = time()
tasks = [limited_function(start_time) for _ in range(n_requests)]
await asyncio.gather(*tasks)
print(f"Ran {n_requests} requests in {time() - start_time:.5f} seconds")
asyncio.run(test_ratelimit())Limiter can be used with any custom time source. For example user may want to use
Redis time to use same time in distributed installation. By default time.monotonic is
used. To adjust time source use time_function parameter with any no arguments function.
from datetime import datetime
from pyrate_limiter import Duration, Limiter, RequestRate
from time import time
limiter_datetime = Limiter(RequestRate(5, Duration.SECOND), time_function=lambda: datetime.utcnow().timestamp())
limiter_time = Limiter(RequestRate(5, Duration.SECOND), time_function=time)- Sometimes, we need a rate-limiter to protect our API from spamming/ddos attack. Some usual strategies for this could be as following
1. No more than 100 requests/minute, or
2. 100 request per minute, and no more than 300 request per hourWhen the number of incoming requets go beyond the limit, we can either do..
1. Raise a 429 Http Error, or
2. Keep the incoming requests, wait then slowly process them one by one.https://www.keycdn.com/support/rate-limiting#types-of-rate-limits
- *Sometimes, we may need to apply specific rate-limiting strategies based on schedules/region or some other metrics. It
requires the capability to
switchthe strategies instantly without re-deploying the whole service.
- To setup local development, Poetry and Python 3.6 is required. Python can be installed using Pyenv or normal installation from binary source. To install poetry, follow the official guideline (https://python-poetry.org/docs/#installation).
Then, in the repository directory...
$ poetry install- Other than built-in Poetry commands, there are some custom commands defined in scripts.py. What you should care about are:
- Run test with:
poetry run test - Format code base:
poetry run format - To run test with coverage:
poetry run cover - Check for lint error:
poetry run lint
- Run test with:
We have CICD running on Travis to do the checking, testing and publishing work. So, there are few small notes when making Pull Request:
- All existing tests must pass (Of course!)
- Reduction in Coverage shall result in failure. (below 98% is not accepted)
- When you are making bug fixes, or adding more features, remember to bump the version number in pyproject.toml. The number should follow semantic-versioning rules
Todo-items marked with (*) are planned for v3 release.
