From 4e683186e6132cdcd4643718746182270cf5d358 Mon Sep 17 00:00:00 2001 From: kite_U Date: Wed, 3 Sep 2025 03:50:19 +0900 Subject: [PATCH 1/9] =?UTF-8?q?feat:=20pytest=20=ED=85=8C=EC=8A=A4?= =?UTF-8?q?=ED=8A=B8=20=EC=BD=94=EB=93=9C=20=EC=9E=91=EC=84=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 +- tests/s3_service_test.py | 39 +++++++++++++++++++++++++++++++++++++++ tests/test_main.py | 10 ++++++++++ 3 files changed, 50 insertions(+), 1 deletion(-) create mode 100644 tests/s3_service_test.py create mode 100644 tests/test_main.py diff --git a/.gitignore b/.gitignore index 3b1d388..debaf2a 100644 --- a/.gitignore +++ b/.gitignore @@ -40,7 +40,7 @@ MANIFEST pip-log.txt pip-delete-this-directory.txt -# Unit test / coverage reports +# Unit tests / coverage reports htmlcov/ .tox/ .nox/ diff --git a/tests/s3_service_test.py b/tests/s3_service_test.py new file mode 100644 index 0000000..fe884b2 --- /dev/null +++ b/tests/s3_service_test.py @@ -0,0 +1,39 @@ +import io +import pytest +from unittest.mock import patch, MagicMock + +from app.services.s3_service import s3_service + +class TestDownloadFile: + @patch("app.services.s3_service.requests.get") + def test_download_file_success(self, mock_get): + # arrange + mock_get.return_value = MagicMock() + mock_get.return_value.content = b"test file content" + mock_get.return_value.raise_for_status = MagicMock() + + # act + obj = s3_service + result = obj.download_file_from_presigned_url( + "http://fake-url.com" + ) + + # assert + assert isinstance(result, io.BytesIO) + assert result.getvalue() == b"test file content" + mock_get.assert_called_once_with( + "http://fake-url.com" + ) + mock_get.return_value.raise_for_status.assert_called_once() + + @patch("app.services.s3_service.requests.get") + def test_download_file_http_error(self, mock_get): + # arrange + mock_get.return_value = MagicMock() + mock_get.return_value.raise_for_status.side_effect = Exception("HTTP Error") + + obj = s3_service + + # act & assert + with pytest.raises(Exception, match="HTTP Error"): + obj.download_file_from_presigned_url("http://fake-url.com") \ No newline at end of file diff --git a/tests/test_main.py b/tests/test_main.py new file mode 100644 index 0000000..d0ad861 --- /dev/null +++ b/tests/test_main.py @@ -0,0 +1,10 @@ +from fastapi.testclient import TestClient + +from app.main import app + +client = TestClient(app=app) + +def test_health_check(): + response = client.get("/api/v1/health") + assert response.status_code == 200 + assert response.json() == {"status": "ok"} \ No newline at end of file From fcb9b54dd10e596b626d78c742d0a214efa9d668 Mon Sep 17 00:00:00 2001 From: kite_U Date: Wed, 3 Sep 2025 03:50:27 +0900 Subject: [PATCH 2/9] =?UTF-8?q?feat:=20pytest=20=ED=85=8C=EC=8A=A4?= =?UTF-8?q?=ED=8A=B8=20=EC=BD=94=EB=93=9C=20=EC=9E=91=EC=84=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 tests/__init__.py diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 From 2dbc3a914f1893bf9660fbeeeff368e30528736d Mon Sep 17 00:00:00 2001 From: kite_U Date: Wed, 3 Sep 2025 03:51:35 +0900 Subject: [PATCH 3/9] =?UTF-8?q?fix:=20S3=EC=9D=98=20=EC=9D=B4=EB=AF=B8?= =?UTF-8?q?=EC=A7=80=20Path=20=EC=A0=80=EC=9E=A5=20=EC=82=AD=EC=A0=9C=20->?= =?UTF-8?q?=20=EC=9D=B4=EB=AF=B8=EC=A7=80=20=EC=9E=90=EC=B2=B4=EB=A5=BC=20?= =?UTF-8?q?=EC=9D=B8=EC=9E=90=EB=A1=9C=20=EC=84=A4=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/endpoints/predictions.py | 2 +- app/services/s3_service.py | 8 ++++---- app/worker/tasks.py | 12 +++--------- 3 files changed, 8 insertions(+), 14 deletions(-) diff --git a/app/api/endpoints/predictions.py b/app/api/endpoints/predictions.py index bfa2e55..06fd1b2 100644 --- a/app/api/endpoints/predictions.py +++ b/app/api/endpoints/predictions.py @@ -14,7 +14,7 @@ def get_redis_client(request: Request) -> redis.Redis: async def create_prediction_job(job_request: JobRequest, redis_client: redis.Redis = Depends(get_redis_client)): correlation_id = str(uuid.uuid4()) job = ImageJob( - correlationId=correlationId, + correlationId=correlation_id, presignedUrl=job_request.presigned_url, replyQueue=settings.STREAM_RESULT, callbackUrl=None, diff --git a/app/services/s3_service.py b/app/services/s3_service.py index b85d4fc..8cf7719 100644 --- a/app/services/s3_service.py +++ b/app/services/s3_service.py @@ -1,8 +1,8 @@ import boto3 from botocore.config import Config as BotoConfig -from pathlib import Path import requests +from io import BytesIO from app.core.config import settings @@ -18,10 +18,10 @@ def __init__(self): ), ) - def download_file_from_presigned_url(self, presigned_url: str, destination: Path): + def download_file_from_presigned_url(self, presigned_url: str) -> BytesIO: response = requests.get(presigned_url) response.raise_for_status() - with open(destination, "wb") as f: - f.write(response.content) + + return BytesIO(response.content) # response 안의 content Stream으로 처리 s3_service = S3Service() diff --git a/app/worker/tasks.py b/app/worker/tasks.py index f3b2eed..a2a3bf2 100644 --- a/app/worker/tasks.py +++ b/app/worker/tasks.py @@ -1,7 +1,5 @@ import asyncio -import json -from pathlib import Path import redis.asyncio as redis from datetime import datetime @@ -13,13 +11,10 @@ async def process_image_scan(job: ImageJob, redis_client: redis.Redis): correlation_id = job.correlationId print(f"[task] Start image scan for job_id={correlation_id}") - - temp_image_path = Path(f"/tmp/{correlation_id}.jpg") - try: - s3_service.download_file_from_presigned_url(job.presignedUrl, temp_image_path) + stream_file = await asyncio.to_thread(s3_service.download_file_from_presigned_url(job.presignedUrl)) - pill_name, label, confidence = predictor_service.predict(temp_image_path) + pill_name, label, confidence = await asyncio.to_thread(predictor_service.predict(stream_file)) finished_at = datetime.utcnow().isoformat() @@ -43,5 +38,4 @@ async def process_image_scan(job: ImageJob, redis_client: redis.Redis): except Exception as e: print(f"[task] Failed to process job_id={correlation_id}: {e}") finally: - if temp_image_path.exists(): - temp_image_path.unlink() + print(f"[task] Image scan finished for job_id={correlation_id}") From c4d5482eedee301639e7c0fd4757a3cbaf8edff2 Mon Sep 17 00:00:00 2001 From: kite_U Date: Wed, 3 Sep 2025 03:53:38 +0900 Subject: [PATCH 4/9] =?UTF-8?q?fix:=20S3=EC=9D=98=20=EC=9D=B4=EB=AF=B8?= =?UTF-8?q?=EC=A7=80=20Path=20=EC=A0=80=EC=9E=A5=20=EC=82=AD=EC=A0=9C=20->?= =?UTF-8?q?=20=EC=9D=B4=EB=AF=B8=EC=A7=80=20=EC=9E=90=EC=B2=B4=EB=A5=BC=20?= =?UTF-8?q?=EC=9D=B8=EC=9E=90=EB=A1=9C=20=EC=84=A4=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/services/predictor_service.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/app/services/predictor_service.py b/app/services/predictor_service.py index c06efcd..caa94c2 100644 --- a/app/services/predictor_service.py +++ b/app/services/predictor_service.py @@ -6,6 +6,7 @@ from PIL import Image import json from pathlib import Path +from io import BytesIO class LightCNN(nn.Module): def __init__(self, num_classes): @@ -53,8 +54,8 @@ def _load_model(self, model_path: Path) -> LightCNN: model.eval() return model - def predict(self, image_path: Path) -> tuple[str, str, float]: - image = Image.open(image_path).convert('RGB') + async def predict(self, stream_file: BytesIO) -> tuple[str, str, float]: + image = Image.open(stream_file).convert('RGB') input_tensor = self.transform(image).unsqueeze(0).to(self.device) with torch.no_grad(): From a47a0569bb8aad117acfb44fb2cac3a6896021c8 Mon Sep 17 00:00:00 2001 From: kite_U Date: Wed, 3 Sep 2025 03:54:02 +0900 Subject: [PATCH 5/9] =?UTF-8?q?fix:=20=ED=95=98=EB=82=98=EC=9D=98=20?= =?UTF-8?q?=EA=B3=B5=ED=86=B5=EB=90=9C=20AsyncRedis=EB=A5=BC=20=EC=82=AC?= =?UTF-8?q?=EC=9A=A9=ED=95=98=EB=8F=84=EB=A1=9D=20=EC=84=A4=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/lifespan.py | 4 +- app/worker/redis_client.py | 95 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 93 insertions(+), 6 deletions(-) diff --git a/app/core/lifespan.py b/app/core/lifespan.py index 9bdc15f..528e0bd 100644 --- a/app/core/lifespan.py +++ b/app/core/lifespan.py @@ -4,12 +4,14 @@ import redis.asyncio as redis from fastapi import FastAPI +from app.worker.redis_client import redis_client from app.core.config import settings from app.worker.worker import JobWorker @asynccontextmanager async def lifespan(app: FastAPI): - redis_client = redis.from_url(settings.REDIS_URL, decode_responses=True) + limiter = asyncio.to_thread.current_default_thread_limiter() + limiter.total_tokens = 50 try: await redis_client.xgroup_create( diff --git a/app/worker/redis_client.py b/app/worker/redis_client.py index 8b7339b..8f75b56 100644 --- a/app/worker/redis_client.py +++ b/app/worker/redis_client.py @@ -1,10 +1,95 @@ +import asyncio import redis + +from redis.asyncio import Redis as AsyncRedis from pydantic import BaseModel, Field -from typing import Any, Dict +from typing import Any, Dict, Optional, List from app.core.config import settings -redis_client = redis.asyncio.from_url(settings.REDIS_URL, decode_responses=True) - +# Redis Stream Definition class PublishRequest(BaseModel): - stream: str = Field(default=settings.JOB_STREAM, description="Redis Stream Job name") - payload: Dict[str, Any] \ No newline at end of file + stream: str = Field(default=settings.STREAM_JOB, description="Redis Stream Job name") + payload: Dict[str, Any] + +class RedisStreamClient: + + def __init__(self): + self.redis_client = AsyncRedis.from_url( + url=settings.REDIS_URL, + decode_responses=True, + ) + + @classmethod + def init(cls): + broker = cls() + return broker + + # Fast API 에서 Publish + async def xadd(self, stream_name: str, fields: Dict[str, Any]) -> str: + await self.redis_client.xadd(stream_name, fields) + + # Group 단위로 읽어오기 + async def xreadgroup( + self, + group_name: str, + consumer_name: str, + stream_name: str, + count: Optional[int] = None, + block: Optional[int] = None, + id: str = ">", + ) -> List[tuple]: + try: + # Create the consumer group ( 존재하지 않을 때 ) + self.redis_client.xgroup_create(stream_name, group_name, id="$", mkstream=True) + except redis.exceptions.ResponseError as e: + # 이미 존재할 때 + if "BUSYGROUP" not in str(e): + raise e + + response = self.redis_client.xreadgroup( + groupname=group_name, + consumername=consumer_name, + streams={stream_name: id}, + count=count, + block=block, + ) + return response + + # Consumer 처리 완료 + def xack(self, stream_name: str, group_name: str, message_ids: List[str]) -> int: + return self.redis_client.xack(stream_name, group_name, *message_ids) + + # 완료 시 삭제 + def xack_and_del(self, stream_name: str, group_name: str, message_ids: List[str]) -> int: + + acked_count = self.redis_client.xack(stream_name, group_name, *message_ids) + + # XACK가 성공하면 스트림에서 해당 메시지를 삭제 (XDEL) + if acked_count > 0: + self.redis_client.xdel(stream_name, *message_ids) + + return acked_count + + + # 메시지 재처리 지원 + def xclaim( + self, + stream_name: str, + group_name: str, + consumer_name: str, + min_idle_time: int, + message_ids: List[str], + ) -> List[tuple]: + + return self.redis_client.xclaim( + name=stream_name, + groupname=group_name, + consumername=consumer_name, + min_idle_time=min_idle_time, + message_ids=message_ids, + ) + + def close(self): + self.redis_client.disconnect() + +redis_client = RedisStreamClient.init() \ No newline at end of file From f7376e2f53054261a082694a3bba05c118b3513d Mon Sep 17 00:00:00 2001 From: kite_U Date: Wed, 3 Sep 2025 03:54:39 +0900 Subject: [PATCH 6/9] =?UTF-8?q?feat:=20XACK=20=EC=9D=B4=ED=9B=84=20XDEL?= =?UTF-8?q?=EC=9D=84=20=ED=86=B5=ED=95=9C=20=EC=99=84=EB=A3=8C=EC=97=90=20?= =?UTF-8?q?=EB=8C=80=ED=95=9C=20=EB=A9=94=EC=8B=9C=EC=A7=80=20=EC=82=AD?= =?UTF-8?q?=EC=A0=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/worker/worker.py | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/app/worker/worker.py b/app/worker/worker.py index e48e62c..fc9966d 100644 --- a/app/worker/worker.py +++ b/app/worker/worker.py @@ -1,12 +1,12 @@ import asyncio -import redis.asyncio as redis -from app.core.config import Settings +from app.worker.redis_client import redis_client +from app.core.config import settings from app.schemas.job import ImageJob from app.worker.tasks import process_image_scan class JobWorker: - def __init__(self, redis_client: redis.Redis): + def __init__(self, redis_client: redis_client): self.redis_client = redis_client async def run(self): @@ -28,9 +28,15 @@ async def run(self): for msg_id, fields in entries: try: job = ImageJob.model_validate_json(fields["json"]) - await process_image_scan(job, redis_client) - # 처리 성공 시에만 ack - await redis_client.xack(settings.STREAM_JOB, settings.GROUP_NAME, msg_id) + task = asyncio.create_task(process_image_scan(job, redis_client)) + print(f"[worker] {task} 발행 성공") + # 처리 성공 시에만 ack 후 del + task.add_done_callback(lambda t: asyncio.create_task( + self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, msg_id) + if not t.exception() else + self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ", + {"id": msg_id, "error": str(t.exception()), **fields}) + )) except asyncio.CancelledError: # 취소되면 재전송되도록 ack 하지 않음 raise @@ -54,8 +60,15 @@ async def run(self): for msg_id, fields in claimed: try: job = ImageJob.model_validate_json(fields["json"]) - asyncio.create_task(process_image_scan(job, self.redis_client)) - await self.redis_client.xack(settings.STREAM_JOB, settings.GROUP_NAME, msg_id) + task = asyncio.create_task(process_image_scan(job, redis_client)) + print(f"[worker] {task} 발행 성공") + # 처리 성공 시에만 ack 후 del + task.add_done_callback(lambda t: asyncio.create_task( + self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, msg_id) + if not t.exception() else + self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ", + {"id": msg_id, "error": str(t.exception()), **fields}) + )) except Exception as e: await self.redis_client.xadd( f"{settings.STREAM_JOB}:DLQ", From b915cb5676a36022f6633d118b4c0d4e182800ad Mon Sep 17 00:00:00 2001 From: kite_U Date: Wed, 3 Sep 2025 04:31:56 +0900 Subject: [PATCH 7/9] =?UTF-8?q?fix:=20Redis=EB=A5=BC=20=EB=B9=84=EB=8F=99?= =?UTF-8?q?=EA=B8=B0=20=EA=B8=B0=EB=B0=98=EC=9C=BC=EB=A1=9C=20=EB=B3=80?= =?UTF-8?q?=ED=99=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/lifespan.py | 22 ++++++--------- app/worker/redis_client.py | 56 +++++++++++++++++++++++--------------- app/worker/worker.py | 6 ++-- 3 files changed, 46 insertions(+), 38 deletions(-) diff --git a/app/core/lifespan.py b/app/core/lifespan.py index 528e0bd..2db715d 100644 --- a/app/core/lifespan.py +++ b/app/core/lifespan.py @@ -1,7 +1,9 @@ import asyncio from contextlib import asynccontextmanager -import redis.asyncio as redis + +import anyio.to_thread +import redis from fastapi import FastAPI from app.worker.redis_client import redis_client @@ -10,19 +12,13 @@ @asynccontextmanager async def lifespan(app: FastAPI): - limiter = asyncio.to_thread.current_default_thread_limiter() - limiter.total_tokens = 50 + limiter = anyio.to_thread.current_default_thread_limiter() + limiter.total_tokens = 100 - try: - await redis_client.xgroup_create( - name=settings.STREAM_JOB, - groupname=settings.GROUP_NAME, - id="$", - mkstream=True - ) - except redis.ResponseError as e: - if "BUSYGROUP" not in str(e): - raise + redis_client.xgroup_create( + stream_name=settings.STREAM_JOB, + group_name=settings.GROUP_NAME, + ) worker = JobWorker(redis_client) worker_task = asyncio.create_task(worker.run()) diff --git a/app/worker/redis_client.py b/app/worker/redis_client.py index 8f75b56..01a4859 100644 --- a/app/worker/redis_client.py +++ b/app/worker/redis_client.py @@ -26,7 +26,7 @@ def init(cls): # Fast API 에서 Publish async def xadd(self, stream_name: str, fields: Dict[str, Any]) -> str: - await self.redis_client.xadd(stream_name, fields) + return await self.redis_client.xadd(stream_name, fields) # Group 단위로 읽어오기 async def xreadgroup( @@ -35,44 +35,56 @@ async def xreadgroup( consumer_name: str, stream_name: str, count: Optional[int] = None, - block: Optional[int] = None, - id: str = ">", + block: Optional[int] = None, # ms 단위 + id: str = ">", # 새 메시지만 읽기 ) -> List[tuple]: try: - # Create the consumer group ( 존재하지 않을 때 ) - self.redis_client.xgroup_create(stream_name, group_name, id="$", mkstream=True) + # Create the consumer group (존재하지 않을 때) + self.redis_client.xgroup_create( + stream_name, group_name, id="$", mkstream=True + ) except redis.exceptions.ResponseError as e: # 이미 존재할 때 if "BUSYGROUP" not in str(e): - raise e + raise - response = self.redis_client.xreadgroup( - groupname=group_name, - consumername=consumer_name, - streams={stream_name: id}, + streams = {stream_name: id} + response = await self.redis_client.xreadgroup( + group_name, # groupname (positional) + consumer_name, # consumername (positional) + streams, # {stream: id} (positional) count=count, block=block, ) return response # Consumer 처리 완료 - def xack(self, stream_name: str, group_name: str, message_ids: List[str]) -> int: - return self.redis_client.xack(stream_name, group_name, *message_ids) + async def xack(self, stream_name: str, group_name: str, message_ids: List[str]) -> int: + return await self.redis_client.xack(stream_name, group_name, *message_ids) # 완료 시 삭제 - def xack_and_del(self, stream_name: str, group_name: str, message_ids: List[str]) -> int: + async def xack_and_del(self, stream_name: str, group_name: str, message_ids: List[str]) -> int: - acked_count = self.redis_client.xack(stream_name, group_name, *message_ids) + acked_count = await self.redis_client.xack(stream_name, group_name, *message_ids) # XACK가 성공하면 스트림에서 해당 메시지를 삭제 (XDEL) if acked_count > 0: - self.redis_client.xdel(stream_name, *message_ids) + await self.redis_client.xdel(stream_name, *message_ids) return acked_count + async def xgroup_create(self, stream_name: str, group_name: str, id: str = "$") -> bool: + try: + self.redis_client.xgroup_create(stream_name, group_name, id, mkstream=True) + return True + except redis.exceptions.ResponseError as e: + if "BUSYGROUP" in str(e): + print(f"Consumer group '{group_name}' already exists.") + return False + raise e # 메시지 재처리 지원 - def xclaim( + async def xclaim( self, stream_name: str, group_name: str, @@ -81,15 +93,15 @@ def xclaim( message_ids: List[str], ) -> List[tuple]: - return self.redis_client.xclaim( - name=stream_name, - groupname=group_name, - consumername=consumer_name, + return await self.redis_client.xclaim( + stream_name=stream_name, + group_name=group_name, + consumer_name=consumer_name, min_idle_time=min_idle_time, message_ids=message_ids, ) - def close(self): - self.redis_client.disconnect() + async def aclose(self): + await self.redis_client.close() redis_client = RedisStreamClient.init() \ No newline at end of file diff --git a/app/worker/worker.py b/app/worker/worker.py index fc9966d..fd398fc 100644 --- a/app/worker/worker.py +++ b/app/worker/worker.py @@ -17,9 +17,9 @@ async def run(self): while True: try: resp = await self.redis_client.xreadgroup( - groupname=settings.GROUP_NAME, - consumername=settings.CONSUMER_NAME, - streams={settings.STREAM_JOB: ">"}, + group_name=settings.GROUP_NAME, + consumer_name=settings.CONSUMER_NAME, + stream_name={settings.STREAM_JOB: ">"}, count=10, block=5000, ) From 46c32a104bf8c63995d24ab633936be161ce730b Mon Sep 17 00:00:00 2001 From: youyeon11 Date: Wed, 3 Sep 2025 17:37:47 +0900 Subject: [PATCH 8/9] =?UTF-8?q?fix:=20predict=20=ED=95=A8=EC=88=98=20async?= =?UTF-8?q?=20=EC=82=AD=EC=A0=9C(Pillow,=20pytorch=EC=99=80=EC=9D=98=20?= =?UTF-8?q?=ED=98=B8=ED=99=98)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/services/predictor_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/services/predictor_service.py b/app/services/predictor_service.py index caa94c2..175003c 100644 --- a/app/services/predictor_service.py +++ b/app/services/predictor_service.py @@ -54,7 +54,7 @@ def _load_model(self, model_path: Path) -> LightCNN: model.eval() return model - async def predict(self, stream_file: BytesIO) -> tuple[str, str, float]: + def predict(self, stream_file: BytesIO) -> tuple[str, str, float]: image = Image.open(stream_file).convert('RGB') input_tensor = self.transform(image).unsqueeze(0).to(self.device) From 437caf4c556aafc3f4ee0eeb8de4f47059000195 Mon Sep 17 00:00:00 2001 From: youyeon11 Date: Wed, 3 Sep 2025 17:38:32 +0900 Subject: [PATCH 9/9] =?UTF-8?q?fix:=20msg=20List->=20str=20=EB=B3=80?= =?UTF-8?q?=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/worker/redis_client.py | 2 +- app/worker/worker.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/app/worker/redis_client.py b/app/worker/redis_client.py index 01a4859..4f56c75 100644 --- a/app/worker/redis_client.py +++ b/app/worker/redis_client.py @@ -63,7 +63,7 @@ async def xack(self, stream_name: str, group_name: str, message_ids: List[str]) return await self.redis_client.xack(stream_name, group_name, *message_ids) # 완료 시 삭제 - async def xack_and_del(self, stream_name: str, group_name: str, message_ids: List[str]) -> int: + async def xack_and_del(self, stream_name: str, group_name: str, message_ids: str) -> int: acked_count = await self.redis_client.xack(stream_name, group_name, *message_ids) diff --git a/app/worker/worker.py b/app/worker/worker.py index fd398fc..2db59b1 100644 --- a/app/worker/worker.py +++ b/app/worker/worker.py @@ -19,7 +19,7 @@ async def run(self): resp = await self.redis_client.xreadgroup( group_name=settings.GROUP_NAME, consumer_name=settings.CONSUMER_NAME, - stream_name={settings.STREAM_JOB: ">"}, + stream_name=settings.STREAM_JOB, count=10, block=5000, )