Skip to content
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ MANIFEST
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
# Unit tests / coverage reports
htmlcov/
.tox/
.nox/
Expand Down
2 changes: 1 addition & 1 deletion app/api/endpoints/predictions.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def get_redis_client(request: Request) -> redis.Redis:
async def create_prediction_job(job_request: JobRequest, redis_client: redis.Redis = Depends(get_redis_client)):
correlation_id = str(uuid.uuid4())
job = ImageJob(
correlationId=correlationId,
correlationId=correlation_id,
presignedUrl=job_request.presigned_url,
replyQueue=settings.STREAM_RESULT,
callbackUrl=None,
Expand Down
22 changes: 10 additions & 12 deletions app/core/lifespan.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@

import asyncio
from contextlib import asynccontextmanager
import redis.asyncio as redis

import anyio.to_thread
import redis
from fastapi import FastAPI

from app.worker.redis_client import redis_client
from app.core.config import settings
from app.worker.worker import JobWorker

@asynccontextmanager
async def lifespan(app: FastAPI):
redis_client = redis.from_url(settings.REDIS_URL, decode_responses=True)
limiter = anyio.to_thread.current_default_thread_limiter()
limiter.total_tokens = 100

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

스레드 제한 수(100)가 하드코딩되어 있습니다. 이러한 값은 애플리케이션 설정(app/core/config.py)으로 옮겨 관리하는 것이 유지보수 측면에서 더 좋습니다.


try:
await redis_client.xgroup_create(
name=settings.STREAM_JOB,
groupname=settings.GROUP_NAME,
id="$",
mkstream=True
)
except redis.ResponseError as e:
if "BUSYGROUP" not in str(e):
raise
redis_client.xgroup_create(
stream_name=settings.STREAM_JOB,
group_name=settings.GROUP_NAME,
)
Comment on lines +18 to +21

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

redis_client.xgroup_create 메서드는 코루틴이므로 await 키워드를 사용하여 호출해야 합니다. 그렇지 않으면 메서드가 실행되지 않고 coroutine object is not awaited 런타임 경고가 발생합니다.

Suggested change
redis_client.xgroup_create(
stream_name=settings.STREAM_JOB,
group_name=settings.GROUP_NAME,
)
await redis_client.xgroup_create(
stream_name=settings.STREAM_JOB,
group_name=settings.GROUP_NAME,
)


worker = JobWorker(redis_client)
worker_task = asyncio.create_task(worker.run())
Expand Down
5 changes: 3 additions & 2 deletions app/services/predictor_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from PIL import Image
import json
from pathlib import Path
from io import BytesIO

class LightCNN(nn.Module):
def __init__(self, num_classes):
Expand Down Expand Up @@ -53,8 +54,8 @@ def _load_model(self, model_path: Path) -> LightCNN:
model.eval()
return model

def predict(self, image_path: Path) -> tuple[str, str, float]:
image = Image.open(image_path).convert('RGB')
def predict(self, stream_file: BytesIO) -> tuple[str, str, float]:
image = Image.open(stream_file).convert('RGB')
input_tensor = self.transform(image).unsqueeze(0).to(self.device)

with torch.no_grad():
Expand Down
8 changes: 4 additions & 4 deletions app/services/s3_service.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@

import boto3
from botocore.config import Config as BotoConfig
from pathlib import Path
import requests
from io import BytesIO

from app.core.config import settings

Expand All @@ -18,10 +18,10 @@ def __init__(self):
),
)

def download_file_from_presigned_url(self, presigned_url: str, destination: Path):
def download_file_from_presigned_url(self, presigned_url: str) -> BytesIO:
response = requests.get(presigned_url)
response.raise_for_status()
with open(destination, "wb") as f:
f.write(response.content)

return BytesIO(response.content) # response 안의 content Stream으로 처리
Comment on lines +21 to +25

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

requests.get()은 동기적인(blocking) I/O 호출이므로, 비동기 애플리케이션의 이벤트 루프를 막을 수 있습니다. 비동기 HTTP 클라이언트 라이브러리인 httpx나 aiohttp를 사용하여 이 부분을 비동기적으로 구현하는 것을 강력히 권장합니다. 이렇게 하면 asyncio.to_thread를 사용하지 않고도 효율적인 I/O 처리가 가능합니다.

예시:

import httpx
from io import BytesIO

async def download_file_from_presigned_url(self, presigned_url: str) -> BytesIO:
    async with httpx.AsyncClient() as client:
        response = await client.get(presigned_url)
        response.raise_for_status()
        return BytesIO(response.content)


s3_service = S3Service()
107 changes: 102 additions & 5 deletions app/worker/redis_client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,107 @@
import asyncio
import redis

from redis.asyncio import Redis as AsyncRedis
from pydantic import BaseModel, Field
from typing import Any, Dict
from typing import Any, Dict, Optional, List
from app.core.config import settings

redis_client = redis.asyncio.from_url(settings.REDIS_URL, decode_responses=True)

# Redis Stream Definition
class PublishRequest(BaseModel):
stream: str = Field(default=settings.JOB_STREAM, description="Redis Stream Job name")
payload: Dict[str, Any]
stream: str = Field(default=settings.STREAM_JOB, description="Redis Stream Job name")
payload: Dict[str, Any]

class RedisStreamClient:

def __init__(self):
    """Create the async Redis client shared by all stream helpers.

    The client is built from ``settings.REDIS_URL`` with
    ``decode_responses=True`` and stored on ``self.redis_client``.
    """
    connection = AsyncRedis.from_url(url=settings.REDIS_URL, decode_responses=True)
    self.redis_client = connection

@classmethod
def init(cls):
    """Alternate constructor: instantiate ``cls`` with no arguments and return it."""
    return cls()

# Publish path used from the FastAPI side.
async def xadd(self, stream_name: str, fields: Dict[str, Any]) -> str:
    """Append ``fields`` as a new entry on ``stream_name``.

    Delegates to the wrapped client's ``xadd`` and returns its result
    unchanged.
    """
    entry_id = await self.redis_client.xadd(stream_name, fields)
    return entry_id

# Group 단위로 읽어오기
async def xreadgroup(
self,
group_name: str,
consumer_name: str,
stream_name: str,
count: Optional[int] = None,
block: Optional[int] = None, # ms 단위
id: str = ">", # 새 메시지만 읽기
) -> List[tuple]:
try:
# Create the consumer group (존재하지 않을 때)
self.redis_client.xgroup_create(
stream_name, group_name, id="$", mkstream=True
)
except redis.exceptions.ResponseError as e:
# 이미 존재할 때
if "BUSYGROUP" not in str(e):
raise
Comment on lines +41 to +49

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

xreadgroup을 호출할 때마다 소비자 그룹이 존재하는지 확인하고 생성하는 로직은 비효율적입니다. 그룹 생성은 애플리케이션 시작 시 lifespan 컨텍스트에서 한 번만 수행하는 것이 좋습니다. 또한 43행의 self.redis_client.xgroup_create 호출에 await가 누락되어 코드가 올바르게 동작하지 않습니다. 이 try-except 블록은 제거하는 것이 바람직합니다.


streams = {stream_name: id}
response = await self.redis_client.xreadgroup(
group_name, # groupname (positional)
consumer_name, # consumername (positional)
streams, # {stream: id} (positional)
count=count,
block=block,
)
return response

# Mark entries as done once the consumer has finished processing them.
async def xack(self, stream_name: str, group_name: str, message_ids: List[str]) -> int:
    """Acknowledge ``message_ids`` for ``group_name`` on ``stream_name``.

    Every id is forwarded as its own positional argument to the wrapped
    client's ``xack``; that call's result is returned unchanged.
    """
    acked = await self.redis_client.xack(stream_name, group_name, *message_ids)
    return acked

# Acknowledge entries and delete them from the stream once processing is done.
async def xack_and_del(self, stream_name: str, group_name: str, message_ids: List[str]) -> int:
    """Acknowledge ``message_ids`` and, if any were acknowledged, delete them.

    Args:
        stream_name: Stream the entries belong to.
        group_name: Consumer group to acknowledge for.
        message_ids: Entry ids to acknowledge. A single id passed as a plain
            string is also accepted and treated as a one-element list.

    Returns:
        The count returned by ``xack``; ``0`` when no ids were supplied, in
        which case the client is not called at all.
    """
    # A bare string would be splatted character by character below, turning
    # one id into many bogus one-character ids, so wrap it first.
    ids = [message_ids] if isinstance(message_ids, str) else list(message_ids)
    if not ids:
        return 0

    acked_count = await self.redis_client.xack(stream_name, group_name, *ids)

    # Only remove the entries from the stream (XDEL) when XACK succeeded.
    if acked_count > 0:
        await self.redis_client.xdel(stream_name, *ids)

    return acked_count

async def xgroup_create(self, stream_name: str, group_name: str, id: str = "$") -> bool:
try:
self.redis_client.xgroup_create(stream_name, group_name, id, mkstream=True)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

self.redis_client는 AsyncRedis 인스턴스이므로, xgroup_create 메서드는 코루틴입니다. await 키워드를 사용하여 호출해야 합니다.

Suggested change
self.redis_client.xgroup_create(stream_name, group_name, id, mkstream=True)
await self.redis_client.xgroup_create(stream_name, group_name, id, mkstream=True)

return True
except redis.exceptions.ResponseError as e:
if "BUSYGROUP" in str(e):
print(f"Consumer group '{group_name}' already exists.")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

애플리케이션 로깅에 print()를 사용하는 것보다 표준 logging 모듈을 사용하는 것이 좋습니다. 로깅 레벨, 포맷, 출력 위치(파일, 콘솔 등)를 유연하게 관리할 수 있어 운영 및 디버깅에 더 유리합니다.

return False
raise e

# Supports re-processing of messages by claiming them for another consumer.
async def xclaim(
    self,
    stream_name: str,
    group_name: str,
    consumer_name: str,
    min_idle_time: int,
    message_ids: List[str],
) -> List[tuple]:
    """Claim pending entries for ``consumer_name`` via the wrapped client.

    Args:
        stream_name: Stream holding the pending entries.
        group_name: Consumer group that owns the pending entries.
        consumer_name: Consumer that should take ownership.
        min_idle_time: Minimum idle-time filter forwarded to the client
            (milliseconds, per redis-py's documentation).
        message_ids: Ids of the entries to claim.

    Returns:
        Whatever the wrapped client's ``xclaim`` returns.
    """
    # redis-py names these parameters name/groupname/consumername; the
    # wrapper's own stream_name/group_name/consumer_name spellings are not
    # accepted as keywords and raise TypeError.
    return await self.redis_client.xclaim(
        name=stream_name,
        groupname=group_name,
        consumername=consumer_name,
        min_idle_time=min_idle_time,
        message_ids=message_ids,
    )

async def aclose(self):
    """Close the wrapped Redis client.

    Prefers the client's ``aclose()`` coroutine and falls back to ``close()``
    for client versions that do not provide ``aclose``; newer redis-py
    releases deprecate ``close()`` on the asyncio client.
    """
    closer = getattr(self.redis_client, "aclose", None)
    if closer is None:
        closer = self.redis_client.close
    await closer()

redis_client = RedisStreamClient.init()
12 changes: 3 additions & 9 deletions app/worker/tasks.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@

import asyncio
import json
from pathlib import Path
import redis.asyncio as redis
from datetime import datetime

Expand All @@ -13,13 +11,10 @@
async def process_image_scan(job: ImageJob, redis_client: redis.Redis):
correlation_id = job.correlationId
print(f"[task] Start image scan for job_id={correlation_id}")

temp_image_path = Path(f"/tmp/{correlation_id}.jpg")

try:
s3_service.download_file_from_presigned_url(job.presignedUrl, temp_image_path)
stream_file = await asyncio.to_thread(s3_service.download_file_from_presigned_url(job.presignedUrl))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

asyncio.to_thread의 사용법이 잘못되었습니다. to_thread는 첫 번째 인자로 실행할 동기 함수를 받고, 나머지 인자들은 그 함수에 전달될 인자들입니다. 현재 코드는 s3_service.download_file_from_presigned_url 함수를 먼저 호출하여 이벤트 루프를 블로킹하고, 그 반환 값을 to_thread로 넘기고 있습니다.

Suggested change
stream_file = await asyncio.to_thread(s3_service.download_file_from_presigned_url(job.presignedUrl))
stream_file = await asyncio.to_thread(s3_service.download_file_from_presigned_url, job.presignedUrl)


pill_name, label, confidence = predictor_service.predict(temp_image_path)
pill_name, label, confidence = await asyncio.to_thread(predictor_service.predict(stream_file))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

asyncio.to_thread는 동기 함수를 별도의 스레드에서 실행하기 위한 것입니다. 하지만 predictor_service.predict는 async def로 정의된 코루틴이므로 to_thread로 호출할 수 없습니다. 이는 TypeError를 발생시킵니다. predictor_service.predict를 일반 동기 함수로 수정한 후 to_thread를 올바르게 사용해야 합니다.

Suggested change
pill_name, label, confidence = await asyncio.to_thread(predictor_service.predict(stream_file))
pill_name, label, confidence = await asyncio.to_thread(predictor_service.predict, stream_file)


finished_at = datetime.utcnow().isoformat()

Expand All @@ -43,5 +38,4 @@ async def process_image_scan(job: ImageJob, redis_client: redis.Redis):
except Exception as e:
print(f"[task] Failed to process job_id={correlation_id}: {e}")
finally:
if temp_image_path.exists():
temp_image_path.unlink()
print(f"[task] Image scan finished for job_id={correlation_id}")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

이 로그 메시지는 36행에 이미 존재하여 중복됩니다. finally 블록은 항상 실행되므로 36행의 로그를 제거하고 이 로그만 남기는 것이 더 깔끔합니다. 또한 print 대신 logging 모듈 사용을 고려해보세요.

35 changes: 24 additions & 11 deletions app/worker/worker.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@

import asyncio
import redis.asyncio as redis
from app.core.config import Settings
from app.worker.redis_client import redis_client
from app.core.config import settings
from app.schemas.job import ImageJob
from app.worker.tasks import process_image_scan

class JobWorker:
def __init__(self, redis_client: redis.Redis):
def __init__(self, redis_client: redis_client):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

redis_client의 타입 힌트가 redis_client 인스턴스 자체로 되어 있습니다. RedisStreamClient 클래스로 지정해야 올바른 타입 힌트가 됩니다. 파일 상단에 from app.worker.redis_client import RedisStreamClient를 추가하고 타입 힌트를 수정해주세요.

Suggested change
def __init__(self, redis_client: redis_client):
def __init__(self, redis_client: "RedisStreamClient"):

self.redis_client = redis_client

async def run(self):
Expand All @@ -17,9 +17,9 @@ async def run(self):
while True:
try:
resp = await self.redis_client.xreadgroup(
groupname=settings.GROUP_NAME,
consumername=settings.CONSUMER_NAME,
streams={settings.STREAM_JOB: ">"},
group_name=settings.GROUP_NAME,
consumer_name=settings.CONSUMER_NAME,
stream_name=settings.STREAM_JOB,
count=10,
block=5000,
)
Expand All @@ -28,9 +28,15 @@ async def run(self):
for msg_id, fields in entries:
try:
job = ImageJob.model_validate_json(fields["json"])
await process_image_scan(job, redis_client)
# 처리 성공 시에만 ack
await redis_client.xack(settings.STREAM_JOB, settings.GROUP_NAME, msg_id)
task = asyncio.create_task(process_image_scan(job, redis_client))
print(f"[worker] {task} 발행 성공")
# 처리 성공 시에만 ack 후 del
task.add_done_callback(lambda t: asyncio.create_task(
self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, msg_id)
if not t.exception() else
self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ",
{"id": msg_id, "error": str(t.exception()), **fields})
))
Comment on lines +34 to +39

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

self.redis_client.xack_and_del 메서드는 message_ids 인자로 문자열 리스트(List[str])를 기대하지만, 단일 문자열 msg_id가 전달되고 있습니다. [msg_id]와 같이 리스트로 감싸서 전달해야 합니다.

Suggested change
task.add_done_callback(lambda t: asyncio.create_task(
self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, msg_id)
if not t.exception() else
self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ",
{"id": msg_id, "error": str(t.exception()), **fields})
))
task.add_done_callback(lambda t: asyncio.create_task(
self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, [msg_id])
if not t.exception() else
self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ",
{"id": msg_id, "error": str(t.exception()), **fields})
))

except asyncio.CancelledError:
# 취소되면 재전송되도록 ack 하지 않음
raise
Expand All @@ -54,8 +60,15 @@ async def run(self):
for msg_id, fields in claimed:
try:
job = ImageJob.model_validate_json(fields["json"])
asyncio.create_task(process_image_scan(job, self.redis_client))
await self.redis_client.xack(settings.STREAM_JOB, settings.GROUP_NAME, msg_id)
task = asyncio.create_task(process_image_scan(job, redis_client))
print(f"[worker] {task} 발행 성공")
# 처리 성공 시에만 ack 후 del
task.add_done_callback(lambda t: asyncio.create_task(
self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, msg_id)
if not t.exception() else
self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ",
{"id": msg_id, "error": str(t.exception()), **fields})
))
Comment on lines +66 to +71

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

여기에서도 self.redis_client.xack_and_del 메서드에 msg_id를 리스트([msg_id])로 전달해야 합니다. 또한, 28-48행의 로직과 이 부분이 거의 동일하므로, 중복을 제거하기 위해 별도의 헬퍼 메서드로 추출하는 것을 고려해보세요.

Suggested change
task.add_done_callback(lambda t: asyncio.create_task(
self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, msg_id)
if not t.exception() else
self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ",
{"id": msg_id, "error": str(t.exception()), **fields})
))
task.add_done_callback(lambda t: asyncio.create_task(
self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, [msg_id])
if not t.exception() else
self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ",
{"id": msg_id, "error": str(t.exception()), **fields})
))

except Exception as e:
await self.redis_client.xadd(
f"{settings.STREAM_JOB}:DLQ",
Expand Down
Empty file added tests/__init__.py
Empty file.
39 changes: 39 additions & 0 deletions tests/s3_service_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import io
import pytest
from unittest.mock import patch, MagicMock

from app.services.s3_service import s3_service

class TestDownloadFile:
@patch("app.services.s3_service.requests.get")
def test_download_file_success(self, mock_get):
    """A successful GET yields a BytesIO wrapping the response body."""
    url = "http://fake-url.com"
    payload = b"test file content"

    # arrange: a fake response whose raise_for_status is a no-op mock
    fake_response = MagicMock()
    fake_response.content = payload
    fake_response.raise_for_status = MagicMock()
    mock_get.return_value = fake_response

    # act
    result = s3_service.download_file_from_presigned_url(url)

    # assert
    assert isinstance(result, io.BytesIO)
    assert result.getvalue() == payload
    mock_get.assert_called_once_with(url)
    fake_response.raise_for_status.assert_called_once()

@patch("app.services.s3_service.requests.get")
def test_download_file_http_error(self, mock_get):
# arrange
mock_get.return_value = MagicMock()
mock_get.return_value.raise_for_status.side_effect = Exception("HTTP Error")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

HTTP 오류를 테스트할 때 requests.exceptions.HTTPError와 같이 더 구체적인 예외 타입을 확인하는 것이 좋습니다. Exception은 너무 광범위하여 의도치 않은 다른 예외 상황에서도 테스트가 통과될 수 있습니다.


obj = s3_service

# act & assert
with pytest.raises(Exception, match="HTTP Error"):
obj.download_file_from_presigned_url("http://fake-url.com")
10 changes: 10 additions & 0 deletions tests/test_main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from fastapi.testclient import TestClient

from app.main import app

client = TestClient(app=app)

def test_health_check():
response = client.get("/api/v1/health")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

app/main.py에 정의된 health check 엔드포인트는 /health입니다. 테스트 코드에서 /api/v1/health로 잘못된 URL을 호출하고 있습니다.

Suggested change
response = client.get("/api/v1/health")
response = client.get("/health")

assert response.status_code == 200
assert response.json() == {"status": "ok"}