diff --git a/taskiq_redis/schedule_source.py b/taskiq_redis/schedule_source.py index bc53141..fd3d922 100644 --- a/taskiq_redis/schedule_source.py +++ b/taskiq_redis/schedule_source.py @@ -117,7 +117,6 @@ def __init__( self, url: str, prefix: str = "schedule", - buffer_size: int = 50, serializer: Optional[TaskiqSerializer] = None, **connection_kwargs: Any, ) -> None: @@ -126,7 +125,6 @@ def __init__( url, **connection_kwargs, ) - self.buffer_size = buffer_size if serializer is None: serializer = PickleSerializer() self.serializer = serializer @@ -156,19 +154,14 @@ async def get_schedules(self) -> List[ScheduledTask]: :return: list of schedules. """ schedules = [] - buffer = [] async for key in self.redis.scan_iter(f"{self.prefix}:*"): # type: ignore[attr-defined] - buffer.append(key) - if len(buffer) >= self.buffer_size: - schedules.extend(await self.redis.mget(buffer)) # type: ignore[attr-defined] - buffer = [] - if buffer: - schedules.extend(await self.redis.mget(buffer)) # type: ignore[attr-defined] - return [ - model_validate(ScheduledTask, self.serializer.loadb(schedule)) - for schedule in schedules - if schedule - ] + raw_schedule = await self.redis.get(key) # type: ignore[attr-defined] + parsed_schedule = model_validate( + ScheduledTask, + self.serializer.loadb(raw_schedule), + ) + schedules.append(parsed_schedule) + return schedules async def post_send(self, task: ScheduledTask) -> None: """Delete a task after it's completed.""" diff --git a/tests/test_schedule_source.py b/tests/test_schedule_source.py index ed245fd..fafb17b 100644 --- a/tests/test_schedule_source.py +++ b/tests/test_schedule_source.py @@ -196,10 +196,23 @@ async def test_cluster_post_run_time(redis_cluster_url: str) -> None: @pytest.mark.anyio -async def test_cluster_buffer(redis_cluster_url: str) -> None: +async def test_cluster_get_schedules(redis_cluster_url: str) -> None: + """ + Test of a redis cluster source. + + This test checks that if the schedules are located on different nodes, + the source will still be able to get them all. + + To simulate this we set a specific shard key for each schedule. + The shard keys are from this gist: + + https://gist.githubusercontent.com/dvirsky/93f43277317f629bb06e858946416f7e/raw/b0438faf6f5a0020c12a0730f6cd6ac4bdc4b171/crc16_slottable.h + + """ prefix = uuid.uuid4().hex - source = RedisClusterScheduleSource(redis_cluster_url, prefix=prefix, buffer_size=1) + source = RedisClusterScheduleSource(redis_cluster_url, prefix=prefix) schedule1 = ScheduledTask( + schedule_id=r"id-{06S}", task_name="test_task1", labels={}, args=[], @@ -207,6 +220,7 @@ async def test_cluster_buffer(redis_cluster_url: str) -> None: cron="* * * * *", ) schedule2 = ScheduledTask( + schedule_id=r"id-{4Rs}", task_name="test_task2", labels={}, args=[],