From eb4933649dd61f82fa3b1b3dd123b5435659efa5 Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 31 Aug 2025 02:34:14 +0900 Subject: [PATCH 01/31] =?UTF-8?q?feat:=20=EB=AC=B4=EC=A4=91=EB=8B=A8=20?= =?UTF-8?q?=EB=B0=B0=ED=8F=AC=20=EC=8A=A4=ED=81=AC=EB=A6=BD=ED=8A=B8=20?= =?UTF-8?q?=EC=9E=91=EC=84=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- deployment/deploy.sh | 78 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 deployment/deploy.sh diff --git a/deployment/deploy.sh b/deployment/deploy.sh new file mode 100644 index 0000000..39c4ac0 --- /dev/null +++ b/deployment/deploy.sh @@ -0,0 +1,78 @@ +#!/bin/bash + +set -e + +ERR_MSG='' + +trap 'echo "Error occured: $ERR_MSG. Exiting deploy script."; exit 1' ERR + + +# 현재 포트 파악 +if sudo docker ps --filter "name=app-blue" --quiet | grep -E .; then + echo "Blue down, Green Up " + BEFORE_COMPOSE_COLOR="blue" + AFTER_COMPOSE_COLOR="green" + HOST_PORT="8001" +else + echo "Green down, Blue up" + BEFORE_COMPOSE_COLOR="green" + AFTER_COMPOSE_COLOR="blue" + HOST_PORT="8000" +fi + +echo "Pulling image: ${IMAGE}" +# docker pull +docker pull ${ECR_URI}/dearbelly-cv:latest + +# 새 컨테이너 실행 +docker run --gpus all -d \ + --name "app-${AFTER_COLOR}" \ + --env-file ./.env \ + -p "${HOST_PORT}:8000" \ + ${ECR_URI}/dearbelly-cv:latest + +# 새 컨테이너가 running 될 때까지 대기 +for i in $(seq 1 60); do + if docker ps --filter "name=^/app-${AFTER_COLOR}$" --filter "status=running" --format '{{.Names}}' | grep -q .; then + echo "New app-${AFTER_COLOR} container is running." + break + fi + sleep 1 + if [ "$i" -eq 60 ]; then + echo "New container failed to start in time." >&2 + exit 1 + fi +done + +# 새로운 컨테이너 확인 후 Nginx 설정 변경 +if docker ps --filter "name=app-${AFTER_COMPOSE_COLOR}" --filter "status=running" --format '{{.Names}}' | grep -q .; then + echo "New app-${AFTER_COMPOSE_COLOR} container is running." + # reload nginx + NGINX_ID=$(sudo docker ps --filter "name=nginx" --quiet) + NGINX_CONFIG="/home/ubuntu/deployment/nginx.conf" + + echo "Switching Nginx upstream config..." + if ! sed -i "s/app-${BEFORE_COMPOSE_COLOR}:8000/app-${AFTER_COMPOSE_COLOR}:8000/" $NGINX_CONFIG; then + echo "Error occured: Failed to update Nginx config. Exiting deploy script." + exit 1 + fi + + echo "Reloding Nginx in Container" + if ! docker exec $NGINX_ID nginx -s reload; then + ERR_MSG='Failed to update Nginx config' + exit 1 + fi + + if ! docker compose restart nginx; then + ERR_MSG='Failed to reload Nginx' + exit 1 + fi + + # 이전 컨테이너 종료 + docker stop app-${BEFORE_COMPOSE_COLOR} + docker rm app-${BEFORE_COMPOSE_COLOR} + docker image prune -af +fi + +echo "Deployment success." +exit 0 \ No newline at end of file From 999c6ce12c481670f045b62e0247e6579c29ea2d Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 31 Aug 2025 02:34:26 +0900 Subject: [PATCH 02/31] =?UTF-8?q?feat:=20=ED=8F=AC=ED=8A=B8=20=EB=A7=A4?= =?UTF-8?q?=ED=95=91=EC=9D=84=20=EC=9C=84=ED=95=9C=20Nginx=20=EC=84=A4?= =?UTF-8?q?=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- deployment/nginx.conf | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 deployment/nginx.conf diff --git a/deployment/nginx.conf b/deployment/nginx.conf new file mode 100644 index 0000000..a4bff5b --- /dev/null +++ b/deployment/nginx.conf @@ -0,0 +1,21 @@ +events {} + +http { + upstream backend { + server app-blue:8000; + } + + server { + listen 80; + + location / { + proxy_pass http://backend/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + + } + } +} \ No newline at end of file From 494d3c142b67d3483b61e76f1ddeacf39d6203ec Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 31 Aug 2025 02:34:38 +0900 Subject: [PATCH 03/31] =?UTF-8?q?feat:=20=EC=8A=A4=ED=81=AC=EB=A6=BD?= =?UTF-8?q?=ED=8A=B8=20=EC=8B=A4=ED=96=89=EC=9C=BC=EB=A1=9C=20=EB=B3=80?= =?UTF-8?q?=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/workflow.yml | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index 09809c7..fbabb99 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -37,6 +37,14 @@ jobs: - name: Checkout Code uses: actions/checkout@v4 + - name: Transfer deploy file to EC2 + uses: appleboy/scp-action@master + with: + host: ${{ secrets.REMOTE_HOST }} + username: ${{ secrets.REMOTE_USER }} + key: ${{ secrets.SSH_PRIVATE_KEY }} + source: ./deployment/ + target: /home/ubuntu/deployment deploy: name: Deploy @@ -52,8 +60,8 @@ jobs: port: 22 script: | # 1. cd - mkdir -p ~/dearbelly - cd ~/dearbelly + mkdir -p ~/dearbelly/deployment + cd ~/dearbelly/deployment # 2. .env file echo "${{ secrets.ENV }}" > .env @@ -69,5 +77,5 @@ jobs: docker pull ${{ secrets.ECR_URI }}/dearbelly-cv:latest # 6. docker start - # TODO : 스크립트 작성 - docker run -d --name app-blue -p 8000:8000 ${{ secrets.ECR_URI }}/dearbelly-cv:latest \ No newline at end of file + chmod +x deploy.sh + source deploy.sh \ No newline at end of file From b5a123657f997f363a439fca1db9e0e4523ba70c Mon Sep 17 00:00:00 2001 From: kite_U Date: Tue, 2 Sep 2025 00:14:41 +0900 Subject: [PATCH 04/31] =?UTF-8?q?feat:=20docker-compose.yml=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- deployment/docker-compose.yml | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 deployment/docker-compose.yml diff --git a/deployment/docker-compose.yml b/deployment/docker-compose.yml new file mode 100644 index 0000000..4d256c2 --- /dev/null +++ b/deployment/docker-compose.yml @@ -0,0 +1,30 @@ +services: + app-blue: + image: ${ECR_URI}/dearbelly-dv:latest + ports: + - "8000:8000" + env_file: + - /home/ubuntu/dearbelly/deployment/.env + deploy: + resources: + reservations: + devices: + - driver: nvidia + count: 1 + capabilities: [gpu] + container_name: app-blue + + app-green: + image: ${ECR_URI}/dearbelly-dv:latest + ports: + - "8001:8000" + env_file: + - /home/ubuntu/dearbelly/deployment/.env + deploy: + resources: + reservations: + devices: + - driver: nvidia + count: 1 + capabilities: [ gpu ] + container_name: app-green \ No newline at end of file From 71b6f134bc0d7158223d59e8327cb59715e7424c Mon Sep 17 00:00:00 2001 From: kite_U Date: Tue, 2 Sep 2025 00:17:31 +0900 Subject: [PATCH 05/31] =?UTF-8?q?feat:=20docker-compose.yml=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- deployment/deploy.sh | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/deployment/deploy.sh b/deployment/deploy.sh index 39c4ac0..0e5933f 100644 --- a/deployment/deploy.sh +++ b/deployment/deploy.sh @@ -22,14 +22,9 @@ fi echo "Pulling image: ${IMAGE}" # docker pull -docker pull ${ECR_URI}/dearbelly-cv:latest +docker compose pull ${ECR_URI}/dearbelly-cv:latest +docker compose up -d --no-deps --force-recreate app-${AFTER_COMPOSE_COLOR} -# 새 컨테이너 실행 -docker run --gpus all -d \ - --name "app-${AFTER_COLOR}" \ - --env-file ./.env \ - -p "${HOST_PORT}:8000" \ - ${ECR_URI}/dearbelly-cv:latest # 새 컨테이너가 running 될 때까지 대기 for i in $(seq 1 60); do From 3982bac7b94be7301e30db76d3f18e16f144888c Mon Sep 17 00:00:00 2001 From: kite_U Date: Wed, 3 Sep 2025 00:24:41 +0900 Subject: [PATCH 06/31] =?UTF-8?q?fix:=20docker=20compose=20pull=20?= =?UTF-8?q?=EC=9D=B4=EB=AF=B8=EC=A7=80=EB=AA=85=20->=20=EC=84=9C=EB=B9=84?= =?UTF-8?q?=EC=8A=A4=EB=AA=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- deployment/deploy.sh | 26 +++----------------------- 1 file changed, 3 insertions(+), 23 deletions(-) diff --git a/deployment/deploy.sh b/deployment/deploy.sh index 0e5933f..7cbf316 100644 --- a/deployment/deploy.sh +++ b/deployment/deploy.sh @@ -22,12 +22,12 @@ fi echo "Pulling image: ${IMAGE}" # docker pull -docker compose pull ${ECR_URI}/dearbelly-cv:latest +docker compose pull app-${AFTER_COMPOSE_COLOR} docker compose up -d --no-deps --force-recreate app-${AFTER_COMPOSE_COLOR} # 새 컨테이너가 running 될 때까지 대기 -for i in $(seq 1 60); do +for i in $(seq 1 600); do if docker ps --filter "name=^/app-${AFTER_COLOR}$" --filter "status=running" --format '{{.Names}}' | grep -q .; then echo "New app-${AFTER_COLOR} container is running." break @@ -50,24 +50,4 @@ if docker ps --filter "name=app-${AFTER_COMPOSE_COLOR}" --filter "status=running if ! sed -i "s/app-${BEFORE_COMPOSE_COLOR}:8000/app-${AFTER_COMPOSE_COLOR}:8000/" $NGINX_CONFIG; then echo "Error occured: Failed to update Nginx config. Exiting deploy script." exit 1 - fi - - echo "Reloding Nginx in Container" - if ! docker exec $NGINX_ID nginx -s reload; then - ERR_MSG='Failed to update Nginx config' - exit 1 - fi - - if ! docker compose restart nginx; then - ERR_MSG='Failed to reload Nginx' - exit 1 - fi - - # 이전 컨테이너 종료 - docker stop app-${BEFORE_COMPOSE_COLOR} - docker rm app-${BEFORE_COMPOSE_COLOR} - docker image prune -af -fi - -echo "Deployment success." -exit 0 \ No newline at end of file + fi \ No newline at end of file From ff22c6e18cec9505a5605fe7537d845df7521852 Mon Sep 17 00:00:00 2001 From: kite_U Date: Wed, 3 Sep 2025 00:24:50 +0900 Subject: [PATCH 07/31] =?UTF-8?q?fix:=20=EC=9D=B4=EB=AF=B8=EC=A7=80=20?= =?UTF-8?q?=EC=9D=B4=EB=A6=84=20=EC=98=A4=ED=83=80=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- deployment/docker-compose.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deployment/docker-compose.yml b/deployment/docker-compose.yml index 4d256c2..b7d1c2b 100644 --- a/deployment/docker-compose.yml +++ b/deployment/docker-compose.yml @@ -1,6 +1,6 @@ services: app-blue: - image: ${ECR_URI}/dearbelly-dv:latest + image: ${ECR_URI}/dearbelly-cv:latest ports: - "8000:8000" env_file: @@ -15,7 +15,7 @@ services: container_name: app-blue app-green: - image: ${ECR_URI}/dearbelly-dv:latest + image: ${ECR_URI}/dearbelly-cv:latest ports: - "8001:8000" env_file: From 4e683186e6132cdcd4643718746182270cf5d358 Mon Sep 17 00:00:00 2001 From: kite_U Date: Wed, 3 Sep 2025 03:50:19 +0900 Subject: [PATCH 08/31] =?UTF-8?q?feat:=20pytest=20=ED=85=8C=EC=8A=A4?= =?UTF-8?q?=ED=8A=B8=20=EC=BD=94=EB=93=9C=20=EC=9E=91=EC=84=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 +- tests/s3_service_test.py | 39 +++++++++++++++++++++++++++++++++++++++ tests/test_main.py | 10 ++++++++++ 3 files changed, 50 insertions(+), 1 deletion(-) create mode 100644 tests/s3_service_test.py create mode 100644 tests/test_main.py diff --git a/.gitignore b/.gitignore index 3b1d388..debaf2a 100644 --- a/.gitignore +++ b/.gitignore @@ -40,7 +40,7 @@ MANIFEST pip-log.txt pip-delete-this-directory.txt -# Unit test / coverage reports +# Unit tests / coverage reports htmlcov/ .tox/ .nox/ diff --git a/tests/s3_service_test.py b/tests/s3_service_test.py new file mode 100644 index 0000000..fe884b2 --- /dev/null +++ b/tests/s3_service_test.py @@ -0,0 +1,39 @@ +import io +import pytest +from unittest.mock import patch, MagicMock + +from app.services.s3_service import s3_service + +class TestDownloadFile: + @patch("app.services.s3_service.requests.get") + def test_download_file_success(self, mock_get): + # arrange + mock_get.return_value = MagicMock() + mock_get.return_value.content = b"test file content" + mock_get.return_value.raise_for_status = MagicMock() + + # act + obj = s3_service + result = obj.download_file_from_presigned_url( + "http://fake-url.com" + ) + + # assert + assert isinstance(result, io.BytesIO) + assert result.getvalue() == b"test file content" + mock_get.assert_called_once_with( + "http://fake-url.com" + ) + mock_get.return_value.raise_for_status.assert_called_once() + + @patch("app.services.s3_service.requests.get") + def test_download_file_http_error(self, mock_get): + # arrange + mock_get.return_value = MagicMock() + mock_get.return_value.raise_for_status.side_effect = Exception("HTTP Error") + + obj = s3_service + + # act & assert + with pytest.raises(Exception, match="HTTP Error"): + obj.download_file_from_presigned_url("http://fake-url.com") \ No newline at end of file diff --git a/tests/test_main.py b/tests/test_main.py new file mode 100644 index 0000000..d0ad861 --- /dev/null +++ b/tests/test_main.py @@ -0,0 +1,10 @@ +from fastapi.testclient import TestClient + +from app.main import app + +client = TestClient(app=app) + +def test_health_check(): + response = client.get("/api/v1/health") + assert response.status_code == 200 + assert response.json() == {"status": "ok"} \ No newline at end of file From fcb9b54dd10e596b626d78c742d0a214efa9d668 Mon Sep 17 00:00:00 2001 From: kite_U Date: Wed, 3 Sep 2025 03:50:27 +0900 Subject: [PATCH 09/31] =?UTF-8?q?feat:=20pytest=20=ED=85=8C=EC=8A=A4?= =?UTF-8?q?=ED=8A=B8=20=EC=BD=94=EB=93=9C=20=EC=9E=91=EC=84=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 tests/__init__.py diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 From 2dbc3a914f1893bf9660fbeeeff368e30528736d Mon Sep 17 00:00:00 2001 From: kite_U Date: Wed, 3 Sep 2025 03:51:35 +0900 Subject: [PATCH 10/31] =?UTF-8?q?fix:=20S3=EC=9D=98=20=EC=9D=B4=EB=AF=B8?= =?UTF-8?q?=EC=A7=80=20Path=20=EC=A0=80=EC=9E=A5=20=EC=82=AD=EC=A0=9C=20->?= =?UTF-8?q?=20=EC=9D=B4=EB=AF=B8=EC=A7=80=20=EC=9E=90=EC=B2=B4=EB=A5=BC=20?= =?UTF-8?q?=EC=9D=B8=EC=9E=90=EB=A1=9C=20=EC=84=A4=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/endpoints/predictions.py | 2 +- app/services/s3_service.py | 8 ++++---- app/worker/tasks.py | 12 +++--------- 3 files changed, 8 insertions(+), 14 deletions(-) diff --git a/app/api/endpoints/predictions.py b/app/api/endpoints/predictions.py index bfa2e55..06fd1b2 100644 --- a/app/api/endpoints/predictions.py +++ b/app/api/endpoints/predictions.py @@ -14,7 +14,7 @@ def get_redis_client(request: Request) -> redis.Redis: async def create_prediction_job(job_request: JobRequest, redis_client: redis.Redis = Depends(get_redis_client)): correlation_id = str(uuid.uuid4()) job = ImageJob( - correlationId=correlationId, + correlationId=correlation_id, presignedUrl=job_request.presigned_url, replyQueue=settings.STREAM_RESULT, callbackUrl=None, diff --git a/app/services/s3_service.py b/app/services/s3_service.py index b85d4fc..8cf7719 100644 --- a/app/services/s3_service.py +++ b/app/services/s3_service.py @@ -1,8 +1,8 @@ import boto3 from botocore.config import Config as BotoConfig -from pathlib import Path import requests +from io import BytesIO from app.core.config import settings @@ -18,10 +18,10 @@ def __init__(self): ), ) - def download_file_from_presigned_url(self, presigned_url: str, destination: Path): + def download_file_from_presigned_url(self, presigned_url: str) -> BytesIO: response = requests.get(presigned_url) response.raise_for_status() - with open(destination, "wb") as f: - f.write(response.content) + + return BytesIO(response.content) # response 안의 content Stream으로 처리 s3_service = S3Service() diff --git a/app/worker/tasks.py b/app/worker/tasks.py index f3b2eed..a2a3bf2 100644 --- a/app/worker/tasks.py +++ b/app/worker/tasks.py @@ -1,7 +1,5 @@ import asyncio -import json -from pathlib import Path import redis.asyncio as redis from datetime import datetime @@ -13,13 +11,10 @@ async def process_image_scan(job: ImageJob, redis_client: redis.Redis): correlation_id = job.correlationId print(f"[task] Start image scan for job_id={correlation_id}") - - temp_image_path = Path(f"/tmp/{correlation_id}.jpg") - try: - s3_service.download_file_from_presigned_url(job.presignedUrl, temp_image_path) + stream_file = await asyncio.to_thread(s3_service.download_file_from_presigned_url(job.presignedUrl)) - pill_name, label, confidence = predictor_service.predict(temp_image_path) + pill_name, label, confidence = await asyncio.to_thread(predictor_service.predict(stream_file)) finished_at = datetime.utcnow().isoformat() @@ -43,5 +38,4 @@ async def process_image_scan(job: ImageJob, redis_client: redis.Redis): except Exception as e: print(f"[task] Failed to process job_id={correlation_id}: {e}") finally: - if temp_image_path.exists(): - temp_image_path.unlink() + print(f"[task] Image scan finished for job_id={correlation_id}") From c4d5482eedee301639e7c0fd4757a3cbaf8edff2 Mon Sep 17 00:00:00 2001 From: kite_U Date: Wed, 3 Sep 2025 03:53:38 +0900 Subject: [PATCH 11/31] =?UTF-8?q?fix:=20S3=EC=9D=98=20=EC=9D=B4=EB=AF=B8?= =?UTF-8?q?=EC=A7=80=20Path=20=EC=A0=80=EC=9E=A5=20=EC=82=AD=EC=A0=9C=20->?= =?UTF-8?q?=20=EC=9D=B4=EB=AF=B8=EC=A7=80=20=EC=9E=90=EC=B2=B4=EB=A5=BC=20?= =?UTF-8?q?=EC=9D=B8=EC=9E=90=EB=A1=9C=20=EC=84=A4=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/services/predictor_service.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/app/services/predictor_service.py b/app/services/predictor_service.py index c06efcd..caa94c2 100644 --- a/app/services/predictor_service.py +++ b/app/services/predictor_service.py @@ -6,6 +6,7 @@ from PIL import Image import json from pathlib import Path +from io import BytesIO class LightCNN(nn.Module): def __init__(self, num_classes): @@ -53,8 +54,8 @@ def _load_model(self, model_path: Path) -> LightCNN: model.eval() return model - def predict(self, image_path: Path) -> tuple[str, str, float]: - image = Image.open(image_path).convert('RGB') + async def predict(self, stream_file: BytesIO) -> tuple[str, str, float]: + image = Image.open(stream_file).convert('RGB') input_tensor = self.transform(image).unsqueeze(0).to(self.device) with torch.no_grad(): From a47a0569bb8aad117acfb44fb2cac3a6896021c8 Mon Sep 17 00:00:00 2001 From: kite_U Date: Wed, 3 Sep 2025 03:54:02 +0900 Subject: [PATCH 12/31] =?UTF-8?q?fix:=20=ED=95=98=EB=82=98=EC=9D=98=20?= =?UTF-8?q?=EA=B3=B5=ED=86=B5=EB=90=9C=20AsyncRedis=EB=A5=BC=20=EC=82=AC?= =?UTF-8?q?=EC=9A=A9=ED=95=98=EB=8F=84=EB=A1=9D=20=EC=84=A4=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/lifespan.py | 4 +- app/worker/redis_client.py | 95 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 93 insertions(+), 6 deletions(-) diff --git a/app/core/lifespan.py b/app/core/lifespan.py index 9bdc15f..528e0bd 100644 --- a/app/core/lifespan.py +++ b/app/core/lifespan.py @@ -4,12 +4,14 @@ import redis.asyncio as redis from fastapi import FastAPI +from app.worker.redis_client import redis_client from app.core.config import settings from app.worker.worker import JobWorker @asynccontextmanager async def lifespan(app: FastAPI): - redis_client = redis.from_url(settings.REDIS_URL, decode_responses=True) + limiter = asyncio.to_thread.current_default_thread_limiter() + limiter.total_tokens = 50 try: await redis_client.xgroup_create( diff --git a/app/worker/redis_client.py b/app/worker/redis_client.py index 8b7339b..8f75b56 100644 --- a/app/worker/redis_client.py +++ b/app/worker/redis_client.py @@ -1,10 +1,95 @@ +import asyncio import redis + +from redis.asyncio import Redis as AsyncRedis from pydantic import BaseModel, Field -from typing import Any, Dict +from typing import Any, Dict, Optional, List from app.core.config import settings -redis_client = redis.asyncio.from_url(settings.REDIS_URL, decode_responses=True) - +# Redis Stream Definition class PublishRequest(BaseModel): - stream: str = Field(default=settings.JOB_STREAM, description="Redis Stream Job name") - payload: Dict[str, Any] \ No newline at end of file + stream: str = Field(default=settings.STREAM_JOB, description="Redis Stream Job name") + payload: Dict[str, Any] + +class RedisStreamClient: + + def __init__(self): + self.redis_client = AsyncRedis.from_url( + url=settings.REDIS_URL, + decode_responses=True, + ) + + @classmethod + def init(cls): + broker = cls() + return broker + + # Fast API 에서 Publish + async def xadd(self, stream_name: str, fields: Dict[str, Any]) -> str: + await self.redis_client.xadd(stream_name, fields) + + # Group 단위로 읽어오기 + async def xreadgroup( + self, + group_name: str, + consumer_name: str, + stream_name: str, + count: Optional[int] = None, + block: Optional[int] = None, + id: str = ">", + ) -> List[tuple]: + try: + # Create the consumer group ( 존재하지 않을 때 ) + self.redis_client.xgroup_create(stream_name, group_name, id="$", mkstream=True) + except redis.exceptions.ResponseError as e: + # 이미 존재할 때 + if "BUSYGROUP" not in str(e): + raise e + + response = self.redis_client.xreadgroup( + groupname=group_name, + consumername=consumer_name, + streams={stream_name: id}, + count=count, + block=block, + ) + return response + + # Consumer 처리 완료 + def xack(self, stream_name: str, group_name: str, message_ids: List[str]) -> int: + return self.redis_client.xack(stream_name, group_name, *message_ids) + + # 완료 시 삭제 + def xack_and_del(self, stream_name: str, group_name: str, message_ids: List[str]) -> int: + + acked_count = self.redis_client.xack(stream_name, group_name, *message_ids) + + # XACK가 성공하면 스트림에서 해당 메시지를 삭제 (XDEL) + if acked_count > 0: + self.redis_client.xdel(stream_name, *message_ids) + + return acked_count + + + # 메시지 재처리 지원 + def xclaim( + self, + stream_name: str, + group_name: str, + consumer_name: str, + min_idle_time: int, + message_ids: List[str], + ) -> List[tuple]: + + return self.redis_client.xclaim( + name=stream_name, + groupname=group_name, + consumername=consumer_name, + min_idle_time=min_idle_time, + message_ids=message_ids, + ) + + def close(self): + self.redis_client.disconnect() + +redis_client = RedisStreamClient.init() \ No newline at end of file From f7376e2f53054261a082694a3bba05c118b3513d Mon Sep 17 00:00:00 2001 From: kite_U Date: Wed, 3 Sep 2025 03:54:39 +0900 Subject: [PATCH 13/31] =?UTF-8?q?feat:=20XACK=20=EC=9D=B4=ED=9B=84=20XDEL?= =?UTF-8?q?=EC=9D=84=20=ED=86=B5=ED=95=9C=20=EC=99=84=EB=A3=8C=EC=97=90=20?= =?UTF-8?q?=EB=8C=80=ED=95=9C=20=EB=A9=94=EC=8B=9C=EC=A7=80=20=EC=82=AD?= =?UTF-8?q?=EC=A0=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/worker/worker.py | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/app/worker/worker.py b/app/worker/worker.py index e48e62c..fc9966d 100644 --- a/app/worker/worker.py +++ b/app/worker/worker.py @@ -1,12 +1,12 @@ import asyncio -import redis.asyncio as redis -from app.core.config import Settings +from app.worker.redis_client import redis_client +from app.core.config import settings from app.schemas.job import ImageJob from app.worker.tasks import process_image_scan class JobWorker: - def __init__(self, redis_client: redis.Redis): + def __init__(self, redis_client: redis_client): self.redis_client = redis_client async def run(self): @@ -28,9 +28,15 @@ async def run(self): for msg_id, fields in entries: try: job = ImageJob.model_validate_json(fields["json"]) - await process_image_scan(job, redis_client) - # 처리 성공 시에만 ack - await redis_client.xack(settings.STREAM_JOB, settings.GROUP_NAME, msg_id) + task = asyncio.create_task(process_image_scan(job, redis_client)) + print(f"[worker] {task} 발행 성공") + # 처리 성공 시에만 ack 후 del + task.add_done_callback(lambda t: asyncio.create_task( + self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, msg_id) + if not t.exception() else + self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ", + {"id": msg_id, "error": str(t.exception()), **fields}) + )) except asyncio.CancelledError: # 취소되면 재전송되도록 ack 하지 않음 raise @@ -54,8 +60,15 @@ async def run(self): for msg_id, fields in claimed: try: job = ImageJob.model_validate_json(fields["json"]) - asyncio.create_task(process_image_scan(job, self.redis_client)) - await self.redis_client.xack(settings.STREAM_JOB, settings.GROUP_NAME, msg_id) + task = asyncio.create_task(process_image_scan(job, redis_client)) + print(f"[worker] {task} 발행 성공") + # 처리 성공 시에만 ack 후 del + task.add_done_callback(lambda t: asyncio.create_task( + self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, msg_id) + if not t.exception() else + self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ", + {"id": msg_id, "error": str(t.exception()), **fields}) + )) except Exception as e: await self.redis_client.xadd( f"{settings.STREAM_JOB}:DLQ", From b915cb5676a36022f6633d118b4c0d4e182800ad Mon Sep 17 00:00:00 2001 From: kite_U Date: Wed, 3 Sep 2025 04:31:56 +0900 Subject: [PATCH 14/31] =?UTF-8?q?fix:=20Redis=EB=A5=BC=20=EB=B9=84?= =?UTF-8?q?=EB=8F=99=EA=B8=B0=20=EA=B8=B0=EB=B0=98=EC=9C=BC=EB=A1=9C=20?= =?UTF-8?q?=EB=B3=80=ED=99=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/lifespan.py | 22 ++++++--------- app/worker/redis_client.py | 56 +++++++++++++++++++++++--------------- app/worker/worker.py | 6 ++-- 3 files changed, 46 insertions(+), 38 deletions(-) diff --git a/app/core/lifespan.py b/app/core/lifespan.py index 528e0bd..2db715d 100644 --- a/app/core/lifespan.py +++ b/app/core/lifespan.py @@ -1,7 +1,9 @@ import asyncio from contextlib import asynccontextmanager -import redis.asyncio as redis + +import anyio.to_thread +import redis from fastapi import FastAPI from app.worker.redis_client import redis_client @@ -10,19 +12,13 @@ @asynccontextmanager async def lifespan(app: FastAPI): - limiter = asyncio.to_thread.current_default_thread_limiter() - limiter.total_tokens = 50 + limiter = anyio.to_thread.current_default_thread_limiter() + limiter.total_tokens = 100 - try: - await redis_client.xgroup_create( - name=settings.STREAM_JOB, - groupname=settings.GROUP_NAME, - id="$", - mkstream=True - ) - except redis.ResponseError as e: - if "BUSYGROUP" not in str(e): - raise + redis_client.xgroup_create( + stream_name=settings.STREAM_JOB, + group_name=settings.GROUP_NAME, + ) worker = JobWorker(redis_client) worker_task = asyncio.create_task(worker.run()) diff --git a/app/worker/redis_client.py b/app/worker/redis_client.py index 8f75b56..01a4859 100644 --- a/app/worker/redis_client.py +++ b/app/worker/redis_client.py @@ -26,7 +26,7 @@ def init(cls): # Fast API 에서 Publish async def xadd(self, stream_name: str, fields: Dict[str, Any]) -> str: - await self.redis_client.xadd(stream_name, fields) + return await self.redis_client.xadd(stream_name, fields) # Group 단위로 읽어오기 async def xreadgroup( @@ -35,44 +35,56 @@ async def xreadgroup( consumer_name: str, stream_name: str, count: Optional[int] = None, - block: Optional[int] = None, - id: str = ">", + block: Optional[int] = None, # ms 단위 + id: str = ">", # 새 메시지만 읽기 ) -> List[tuple]: try: - # Create the consumer group ( 존재하지 않을 때 ) - self.redis_client.xgroup_create(stream_name, group_name, id="$", mkstream=True) + # Create the consumer group (존재하지 않을 때) + self.redis_client.xgroup_create( + stream_name, group_name, id="$", mkstream=True + ) except redis.exceptions.ResponseError as e: # 이미 존재할 때 if "BUSYGROUP" not in str(e): - raise e + raise - response = self.redis_client.xreadgroup( - groupname=group_name, - consumername=consumer_name, - streams={stream_name: id}, + streams = {stream_name: id} + response = await self.redis_client.xreadgroup( + group_name, # groupname (positional) + consumer_name, # consumername (positional) + streams, # {stream: id} (positional) count=count, block=block, ) return response # Consumer 처리 완료 - def xack(self, stream_name: str, group_name: str, message_ids: List[str]) -> int: - return self.redis_client.xack(stream_name, group_name, *message_ids) + async def xack(self, stream_name: str, group_name: str, message_ids: List[str]) -> int: + return await self.redis_client.xack(stream_name, group_name, *message_ids) # 완료 시 삭제 - def xack_and_del(self, stream_name: str, group_name: str, message_ids: List[str]) -> int: + async def xack_and_del(self, stream_name: str, group_name: str, message_ids: List[str]) -> int: - acked_count = self.redis_client.xack(stream_name, group_name, *message_ids) + acked_count = await self.redis_client.xack(stream_name, group_name, *message_ids) # XACK가 성공하면 스트림에서 해당 메시지를 삭제 (XDEL) if acked_count > 0: - self.redis_client.xdel(stream_name, *message_ids) + await self.redis_client.xdel(stream_name, *message_ids) return acked_count + async def xgroup_create(self, stream_name: str, group_name: str, id: str = "$") -> bool: + try: + self.redis_client.xgroup_create(stream_name, group_name, id, mkstream=True) + return True + except redis.exceptions.ResponseError as e: + if "BUSYGROUP" in str(e): + print(f"Consumer group '{group_name}' already exists.") + return False + raise e # 메시지 재처리 지원 - def xclaim( + async def xclaim( self, stream_name: str, group_name: str, @@ -81,15 +93,15 @@ def xclaim( message_ids: List[str], ) -> List[tuple]: - return self.redis_client.xclaim( - name=stream_name, - groupname=group_name, - consumername=consumer_name, + return await self.redis_client.xclaim( + stream_name=stream_name, + group_name=group_name, + consumer_name=consumer_name, min_idle_time=min_idle_time, message_ids=message_ids, ) - def close(self): - self.redis_client.disconnect() + async def aclose(self): + await self.redis_client.close() redis_client = RedisStreamClient.init() \ No newline at end of file diff --git a/app/worker/worker.py b/app/worker/worker.py index fc9966d..fd398fc 100644 --- a/app/worker/worker.py +++ b/app/worker/worker.py @@ -17,9 +17,9 @@ async def run(self): while True: try: resp = await self.redis_client.xreadgroup( - groupname=settings.GROUP_NAME, - consumername=settings.CONSUMER_NAME, - streams={settings.STREAM_JOB: ">"}, + group_name=settings.GROUP_NAME, + consumer_name=settings.CONSUMER_NAME, + stream_name={settings.STREAM_JOB: ">"}, count=10, block=5000, ) From 46c32a104bf8c63995d24ab633936be161ce730b Mon Sep 17 00:00:00 2001 From: youyeon11 Date: Wed, 3 Sep 2025 17:37:47 +0900 Subject: [PATCH 15/31] =?UTF-8?q?fix:=20predict=20=ED=95=A8=EC=88=98=20asy?= =?UTF-8?q?nc=20=EC=82=AD=EC=A0=9C(Pillow,=20pytorch=EC=99=80=EC=9D=98=20?= =?UTF-8?q?=ED=98=B8=ED=99=98)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/services/predictor_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/services/predictor_service.py b/app/services/predictor_service.py index caa94c2..175003c 100644 --- a/app/services/predictor_service.py +++ b/app/services/predictor_service.py @@ -54,7 +54,7 @@ def _load_model(self, model_path: Path) -> LightCNN: model.eval() return model - async def predict(self, stream_file: BytesIO) -> tuple[str, str, float]: + def predict(self, stream_file: BytesIO) -> tuple[str, str, float]: image = Image.open(stream_file).convert('RGB') input_tensor = self.transform(image).unsqueeze(0).to(self.device) From 437caf4c556aafc3f4ee0eeb8de4f47059000195 Mon Sep 17 00:00:00 2001 From: youyeon11 Date: Wed, 3 Sep 2025 17:38:32 +0900 Subject: [PATCH 16/31] =?UTF-8?q?fix:=20msg=20List->=20str=20=EB=B3=80?= =?UTF-8?q?=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/worker/redis_client.py | 2 +- app/worker/worker.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/app/worker/redis_client.py b/app/worker/redis_client.py index 01a4859..4f56c75 100644 --- a/app/worker/redis_client.py +++ b/app/worker/redis_client.py @@ -63,7 +63,7 @@ async def xack(self, stream_name: str, group_name: str, message_ids: List[str]) return await self.redis_client.xack(stream_name, group_name, *message_ids) # 완료 시 삭제 - async def xack_and_del(self, stream_name: str, group_name: str, message_ids: List[str]) -> int: + async def xack_and_del(self, stream_name: str, group_name: str, message_ids: str) -> int: acked_count = await self.redis_client.xack(stream_name, group_name, *message_ids) diff --git a/app/worker/worker.py b/app/worker/worker.py index fd398fc..2db59b1 100644 --- a/app/worker/worker.py +++ b/app/worker/worker.py @@ -19,7 +19,7 @@ async def run(self): resp = await self.redis_client.xreadgroup( group_name=settings.GROUP_NAME, consumer_name=settings.CONSUMER_NAME, - stream_name={settings.STREAM_JOB: ">"}, + stream_name=settings.STREAM_JOB, count=10, block=5000, ) From 58398109265d5cbfad681f4a36659720fdeda2fc Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 15:33:36 +0900 Subject: [PATCH 17/31] =?UTF-8?q?refactor:=20snake=20case=20->=20camel=20c?= =?UTF-8?q?ase=EB=A1=9C=20=EB=B3=80=EA=B2=BD=20(Spring=20=EC=84=9C?= =?UTF-8?q?=EB=B2=84=EC=99=80=EC=9D=98=20=ED=86=B5=EC=9D=BC=EC=84=B1)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/schemas/job.py | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/app/schemas/job.py b/app/schemas/job.py index 4313bc5..f75ed14 100644 --- a/app/schemas/job.py +++ b/app/schemas/job.py @@ -2,20 +2,25 @@ from pydantic import BaseModel, Field class JobRequest(BaseModel): - presigned_url: str = Field(..., description="다운로드할 이미지의 Presigned URL") + presignedUrl: str = Field(..., description="다운로드할 이미지의 Presigned URL") +""" +FastAPI가 image.results Stream에 발행하는 메시지 +""" class JobResult(BaseModel): - pill_name: str - correlation_id: str - label: str - confidence: float - finished_at: str + correlationId: str + pillName: str + isSafe: int + description: str + finishedAt: str +""" +Spring으로부터 받는 Job(image.jobs 구독) +""" class ImageJob(BaseModel): - correlation_id: str = Field(alias="correlationId") - presigned_url: str = Field(alias="presignedUrl") - reply_queue: str = Field(alias="replyQueue") - callback_url: str | None = Field(alias="callbackUrl") - content_type: str = Field(alias="contentType") - created_at: str = Field(alias="createdAt") - ttl_sec: int = Field(alias="ttlSec") + correlationId: str = Field(alias="correlationId") + presignedUrl: str = Field(alias="presignedUrl") + replyQueue: str = Field(alias="replyQueue") + contentType: str = Field(alias="contentType") + createdAt: str = Field(alias="createdAt") + ttlSec: int = Field(alias="ttlSec") From 39bb4ff379145e716b59e152173c89486ecd85e5 Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 15:34:19 +0900 Subject: [PATCH 18/31] =?UTF-8?q?refactor:=20snake=20case=20->=20camel=20c?= =?UTF-8?q?ase=EB=A1=9C=20=EB=B3=80=EA=B2=BD=20(Spring=20=EC=84=9C?= =?UTF-8?q?=EB=B2=84=EC=99=80=EC=9D=98=20=ED=86=B5=EC=9D=BC=EC=84=B1)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/endpoints/predictions.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/app/api/endpoints/predictions.py b/app/api/endpoints/predictions.py index 06fd1b2..4b8c66f 100644 --- a/app/api/endpoints/predictions.py +++ b/app/api/endpoints/predictions.py @@ -12,13 +12,12 @@ def get_redis_client(request: Request) -> redis.Redis: @router.post("/predict", status_code=202) async def create_prediction_job(job_request: JobRequest, redis_client: redis.Redis = Depends(get_redis_client)): - correlation_id = str(uuid.uuid4()) + correlationId = str(uuid.uuid4()) job = ImageJob( - correlationId=correlation_id, - presignedUrl=job_request.presigned_url, + correlationId=correlationId, + presignedUrl=job_request.presignedUrl, replyQueue=settings.STREAM_RESULT, - callbackUrl=None, - contentType="image/jpeg", + contentType=job_request.contentType, createdAt=datetime.utcnow().isoformat(), ttlSec=3600, ) @@ -30,4 +29,6 @@ async def create_prediction_job(job_request: JobRequest, redis_client: redis.Red approximate=True, ) - return {"job_id": correlation_id} \ No newline at end of file + print("분석 결과 발행 완료...") + + return {"job_id": correlationId} \ No newline at end of file From 43cb041e63d10c734afe1b0047c1dbbc7c0bba2d Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 15:34:51 +0900 Subject: [PATCH 19/31] =?UTF-8?q?fix:=20Message=20=EC=9E=91=EC=84=B1=20?= =?UTF-8?q?=EB=B0=8F=20=ED=95=84=EB=93=9C=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/endpoints/predictions.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/app/api/endpoints/predictions.py b/app/api/endpoints/predictions.py index 4b8c66f..6015b35 100644 --- a/app/api/endpoints/predictions.py +++ b/app/api/endpoints/predictions.py @@ -4,12 +4,16 @@ import redis.asyncio as redis import uuid from datetime import datetime +import json router = APIRouter() def get_redis_client(request: Request) -> redis.Redis: return request.app.state.redis_client +""" +테스트를 위한 임시적인 API +""" @router.post("/predict", status_code=202) async def create_prediction_job(job_request: JobRequest, redis_client: redis.Redis = Depends(get_redis_client)): correlationId = str(uuid.uuid4()) @@ -22,9 +26,17 @@ async def create_prediction_job(job_request: JobRequest, redis_client: redis.Red ttlSec=3600, ) - await redis_client.xadd( + # 타입에 맞도록 넣어주기 + correlationId = job.correlationId + payload = json.dumps(job.dict()) + print("분석 결과 발행 시작...") + entry_id = await redis_client.xadd( settings.STREAM_JOB, - {"json": job.model_dump_json()}, + { + "type": "image_results", + "payload": payload, + "correlationId": correlationId, + }, maxlen=10_000, approximate=True, ) From fd8dea87d1d0a10f90db0a5755cf86fc37253c0a Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 15:36:55 +0900 Subject: [PATCH 20/31] =?UTF-8?q?fix:=20Message=EC=97=90=20=EB=8C=80?= =?UTF-8?q?=ED=95=9C=20=EC=A0=84=EC=B2=98=EB=A6=AC=20=EC=B6=94=EA=B0=80(?= =?UTF-8?q?=EC=95=88=EC=A0=84=ED=95=9C=20=EC=A0=84=EC=B2=98=EB=A6=AC=20?= =?UTF-8?q?=ED=9B=84=20=EC=9D=BD=EA=B8=B0)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/worker/redis_client.py | 141 +++++++++++++++++++++++++++---------- 1 file changed, 103 insertions(+), 38 deletions(-) diff --git a/app/worker/redis_client.py b/app/worker/redis_client.py index 4f56c75..1a79bcb 100644 --- a/app/worker/redis_client.py +++ b/app/worker/redis_client.py @@ -1,9 +1,9 @@ -import asyncio import redis +import json from redis.asyncio import Redis as AsyncRedis from pydantic import BaseModel, Field -from typing import Any, Dict, Optional, List +from typing import Any, Dict, List, Optional, Union from app.core.config import settings # Redis Stream Definition @@ -12,7 +12,6 @@ class PublishRequest(BaseModel): payload: Dict[str, Any] class RedisStreamClient: - def __init__(self): self.redis_client = AsyncRedis.from_url( url=settings.REDIS_URL, @@ -21,38 +20,74 @@ def __init__(self): @classmethod def init(cls): - broker = cls() - return broker + return cls() + + @staticmethod + def _to_scalar(v: Any) -> Union[str, bytes, int, float]: + # Redis XADD : str/bytes/int/float 허용 + if isinstance(v, (str, bytes, int, float)): + return v + # 그 외는 JSON 문자열로 직렬화 (ensure_ascii=False로 한글 보존) + return json.dumps(v, ensure_ascii=False) + + + @classmethod + def _sanitize_fields_for_xadd(cls, fields: Dict[str, Any]) -> Dict[str, Union[str, bytes, int, float]]: + clean: Dict[str, Union[str, bytes, int, float]] = {} + for k, v in fields.items(): + if not isinstance(k, str): + k = str(k) + clean[k] = cls._to_scalar(v) + return clean + # Fast API 에서 Publish - async def xadd(self, stream_name: str, fields: Dict[str, Any]) -> str: - return await self.redis_client.xadd(stream_name, fields) + async def xadd( + self, + stream_name: str, + fields: Dict[str, Any], + *, + maxlen: Optional[int] = 10_000, + approximate: bool = True, + nomkstream: bool = False, + id: str = "*", + ) -> str: + safe_fields = self._sanitize_fields_for_xadd(fields) + return await self.redis_client.xadd( + stream_name, + safe_fields, + id=id, + maxlen=maxlen, + approximate=approximate, + nomkstream=nomkstream, + ) # Group 단위로 읽어오기 async def xreadgroup( - self, - group_name: str, - consumer_name: str, - stream_name: str, - count: Optional[int] = None, - block: Optional[int] = None, # ms 단위 - id: str = ">", # 새 메시지만 읽기 + self, + group_name: str, + consumer_name: str, + stream_name: str, + count: Optional[int] = None, + block: Optional[int] = None, # ms ) -> List[tuple]: + # Consumer Group 생성 (없으면) try: - # Create the consumer group (존재하지 않을 때) - self.redis_client.xgroup_create( - stream_name, group_name, id="$", mkstream=True + await self.redis_client.xgroup_create( + name=stream_name, + groupname=group_name, + id="0", + mkstream=True, ) except redis.exceptions.ResponseError as e: - # 이미 존재할 때 if "BUSYGROUP" not in str(e): raise - streams = {stream_name: id} + streams = {stream_name: ">"} # 신규 메시지 response = await self.redis_client.xreadgroup( - group_name, # groupname (positional) - consumer_name, # consumername (positional) - streams, # {stream: id} (positional) + group_name, + consumer_name, + streams, count=count, block=block, ) @@ -63,36 +98,38 @@ async def xack(self, stream_name: str, group_name: str, message_ids: List[str]) return await self.redis_client.xack(stream_name, group_name, *message_ids) # 완료 시 삭제 - async def xack_and_del(self, stream_name: str, group_name: str, message_ids: str) -> int: - - acked_count = await self.redis_client.xack(stream_name, group_name, *message_ids) - - # XACK가 성공하면 스트림에서 해당 메시지를 삭제 (XDEL) + async def xack_and_del( + self, + stream_name: str, + group_name: str, + message_ids: Union[str, List[str]], + ) -> int: + ids = [message_ids] if isinstance(message_ids, str) else list(message_ids) + acked_count = await self.redis_client.xack(stream_name, group_name, *ids) if acked_count > 0: - await self.redis_client.xdel(stream_name, *message_ids) - + await self.redis_client.xdel(stream_name, *ids) return acked_count + # Group 생성 async def xgroup_create(self, stream_name: str, group_name: str, id: str = "$") -> bool: try: - self.redis_client.xgroup_create(stream_name, group_name, id, mkstream=True) + await self.redis_client.xgroup_create(stream_name, group_name, id, mkstream=True) return True except redis.exceptions.ResponseError as e: if "BUSYGROUP" in str(e): print(f"Consumer group '{group_name}' already exists.") return False - raise e + raise # 메시지 재처리 지원 async def xclaim( - self, - stream_name: str, - group_name: str, - consumer_name: str, - min_idle_time: int, - message_ids: List[str], + self, + stream_name: str, + group_name: str, + consumer_name: str, + min_idle_time: int, + message_ids: List[str], ) -> List[tuple]: - return await self.redis_client.xclaim( stream_name=stream_name, group_name=group_name, @@ -101,6 +138,34 @@ async def xclaim( message_ids=message_ids, ) + # 자동으로 재청구 + async def xautoclaim( + self, + name: str, + groupname: str, + consumername: str, + min_idle_time: int, + start_id: str = "0-0", + count: Optional[int] = None, + justid: bool = False, + ): + res = await self.redis_client.xautoclaim( + name=name, + groupname=groupname, + consumername=consumername, + min_idle_time=min_idle_time, + start_id=start_id, + count=count, + justid=justid, + ) + # 2-튜플/3-튜플 호환하도록 전처리 + if isinstance(res, (list, tuple)) and len(res) == 3: + next_id, messages, _deleted = res + return next_id, messages + return res + + + # 종료 async def aclose(self): await self.redis_client.close() From 35cf9a4855e2c81e962f431071c7f864077dd72a Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 15:38:03 +0900 Subject: [PATCH 21/31] =?UTF-8?q?fix:=20=EB=B0=9C=ED=96=89=ED=95=98?= =?UTF-8?q?=EB=8A=94=20Message=20=EB=82=B4=EC=9A=A9=20=EC=88=98=EC=A0=95(c?= =?UTF-8?q?orrelationId,=20type,=20payload=20=EA=B0=81=EA=B0=81=20?= =?UTF-8?q?=ED=95=84=EB=93=9C=EB=A1=9C=20=EB=B0=9C=ED=96=89)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/worker/tasks.py | 47 +++++++++++++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/app/worker/tasks.py b/app/worker/tasks.py index a2a3bf2..3a22e33 100644 --- a/app/worker/tasks.py +++ b/app/worker/tasks.py @@ -8,34 +8,53 @@ from app.services.predictor_service import predictor_service from app.services.s3_service import s3_service +""" +이미지를 다운 -> 다운 한 것에 대하여 모델 분석 요청 +""" async def process_image_scan(job: ImageJob, redis_client: redis.Redis): - correlation_id = job.correlationId - print(f"[task] Start image scan for job_id={correlation_id}") + correlationId = job.correlationId + print(f"[task] Start image scan for job_id={correlationId}") try: - stream_file = await asyncio.to_thread(s3_service.download_file_from_presigned_url(job.presignedUrl)) - pill_name, label, confidence = await asyncio.to_thread(predictor_service.predict(stream_file)) + stream_file = await asyncio.to_thread( + s3_service.download_file_from_presigned_url, + job.presignedUrl + ) + + stream_file.seek(0) + + pillName, label, confidence = await asyncio.to_thread( + predictor_service.predict, + stream_file + ) + + # TODO: ChatGPT에 요청 결과 출력 - finished_at = datetime.utcnow().isoformat() + isSafe = 0 + description = "일단은 테스트입니다. 추후에 GPT 부분 추가할 예정" + finishedAt = datetime.utcnow().isoformat() result = JobResult( - pill_name=pill_name, - correlation_id=correlation_id, - label=label, - confidence=confidence, - finished_at=finished_at, + correlationId=correlationId, + pillName=pillName, + isSafe=isSafe, + description=description, + finishedAt=finishedAt, ) await redis_client.xadd( settings.STREAM_RESULT, - {"json": result.model_dump_json()}, + { + "correlationId": correlationId, + "type": "image_results", + "payload": result.model_dump_json()}, maxlen=10_000, approximate=True, ) - print(f"[task] Image scan finished for job_id={correlation_id}") + print(f"[task] Image scan successfully finished for job_id={correlationId}") except Exception as e: - print(f"[task] Failed to process job_id={correlation_id}: {e}") + print(f"[task] Failed to process job_id={correlationId}: {e}") finally: - print(f"[task] Image scan finished for job_id={correlation_id}") + print(f"[task] Image scan finished for job_id={correlationId}") From e8e0651fb684b41ccdb9e1e2e18e424589df3950 Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 15:38:34 +0900 Subject: [PATCH 22/31] =?UTF-8?q?fix:=20=EB=A9=94=EC=8B=9C=EC=A7=80=20Cons?= =?UTF-8?q?umer=EC=9D=98=20=EC=95=88=EC=A0=84=ED=95=9C=20=EB=A9=94?= =?UTF-8?q?=EC=8B=9C=EC=A7=80=20=EC=9D=BD=EA=B8=B0=20=EC=A0=84=EC=B2=98?= =?UTF-8?q?=EB=A6=AC=20=ED=95=A8=EC=88=98=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/worker/worker.py | 136 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 108 insertions(+), 28 deletions(-) diff --git a/app/worker/worker.py b/app/worker/worker.py index 2db59b1..fbdec09 100644 --- a/app/worker/worker.py +++ b/app/worker/worker.py @@ -1,10 +1,49 @@ - +import json import asyncio from app.worker.redis_client import redis_client from app.core.config import settings from app.schemas.job import ImageJob from app.worker.tasks import process_image_scan +""" +Redis Stream에 정의한 유효한 형식 메시지를 위한 전처리 함수 +""" +def _to_scalr(v): + # XADD 허용 타입: str, bytes, int, float + if isinstance(v, (str, bytes, int, float)): + return v + # 그 외는 JSON str + return json.dumps(v, ensure_ascii=False) + +""" +Decoding +""" +def _decode(b): + if isinstance(b, (bytes, bytearray)): + return b.decode() + else: + return b + + +def _sanitize_fields_for_xadd(fields: dict) -> dict: + # 정제 + cleaned = {} + for k, v in fields.items(): + k = _decode(k) + if isinstance(v, (bytes, bytearray)): + try: + v = v.decode() + except Exception: + pass + else: + v = _to_scalr(v) + cleaned[k] = v + return cleaned + + +""" +"image.jobs"를 구독 +""" class JobWorker: def __init__(self, redis_client: redis_client): self.redis_client = redis_client @@ -16,6 +55,7 @@ async def run(self): while True: try: + # Consumer의 메시지 읽기 resp = await self.redis_client.xreadgroup( group_name=settings.GROUP_NAME, consumer_name=settings.CONSUMER_NAME, @@ -27,25 +67,52 @@ async def run(self): _, entries = resp[0] for msg_id, fields in entries: try: - job = ImageJob.model_validate_json(fields["json"]) - task = asyncio.create_task(process_image_scan(job, redis_client)) - print(f"[worker] {task} 발행 성공") - # 처리 성공 시에만 ack 후 del - task.add_done_callback(lambda t: asyncio.create_task( - self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, msg_id) - if not t.exception() else - self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ", - {"id": msg_id, "error": str(t.exception()), **fields}) - )) + job_type = fields.get(b"type") or fields.get("type") + correlation_id = fields.get(b"correlationId") or fields.get("correlationId") + payload = fields.get(b"payload") or fields.get("payload") + + # type 검증 + if job_type in (b"image_jobs", "image_jobs"): + + # payload 전처리 + if isinstance(payload, (bytes, bytearray)): + payload_str = payload.decode() + else: + payload_str = payload if isinstance(payload, str) else json.dumps(payload) + + # 최종 반환 data + data = json.loads(payload_str) + print(f"Job received id={msg_id} correlationId={correlation_id} payload={data}") + + job = data + # XADD까지 호출 + task = asyncio.create_task(process_image_scan(job, redis_client)) + print(f"[worker] {task} 발행 성공") + + # 처리 성공 시에만 ack 후 del + task.add_done_callback(lambda t: asyncio.create_task( + self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, msg_id) + if not t.exception() else + self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ", + {"id": msg_id, "error": str(t.exception()), **fields}) + )) + + else: + # job_type 불일치 경우 -> DLQ + clean = _sanitize_fields_for_xadd(fields) + clean.update({"id": _decode(msg_id), "error": "unexpected job type"}) + await self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ", clean) + except asyncio.CancelledError: # 취소되면 재전송되도록 ack 하지 않음 raise + except Exception as e: - await self.redis_client.xadd( - f"{settings.STREAM_JOB}:DLQ", - {"id": msg_id, "error": str(e), **fields}, - ) + clean = _sanitize_fields_for_xadd(fields) + clean.update({"id": _decode(msg_id), "error": str(e)}) + await self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ", clean) + # 주기적으로 AutoClaim now = asyncio.get_event_loop().time() if now - last_reclaim > reclaim_every_sec: last_reclaim = now @@ -59,21 +126,34 @@ async def run(self): ) for msg_id, fields in claimed: try: - job = ImageJob.model_validate_json(fields["json"]) - task = asyncio.create_task(process_image_scan(job, redis_client)) + payload = fields.get(b"payload") or fields.get("payload") + if isinstance(payload, (bytes, bytearray)): + payload = payload.decode() + job = ImageJob.model_validate_json(payload) + + task = asyncio.create_task(process_image_scan(job, self.redis_client)) print(f"[worker] {task} 발행 성공") - # 처리 성공 시에만 ack 후 del - task.add_done_callback(lambda t: asyncio.create_task( - self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, msg_id) - if not t.exception() else - self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ", - {"id": msg_id, "error": str(t.exception()), **fields}) - )) + + def _on_done(t: asyncio.Task, *, msg_id=msg_id, fields=fields): + async def _ack_or_dlq(): + exc = t.exception() + if exc is None: + await self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, + msg_id) + else: + clean = _sanitize_fields_for_xadd(fields) + clean.update({"id": _decode(msg_id), "error": str(exc)}) + await self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ", clean) + + asyncio.create_task(_ack_or_dlq()) + + task.add_done_callback(_on_done) + except Exception as e: - await self.redis_client.xadd( - f"{settings.STREAM_JOB}:DLQ", - {"id": msg_id, "error": str(e), **fields}, - ) + clean = _sanitize_fields_for_xadd(fields) + clean.update({"id": _decode(msg_id), "error": str(e)}) + await self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ", clean) + except asyncio.CancelledError: print("[worker] cancelled; bye") break From bfe07a6d796e3625d22c4583e36d9d243347affb Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 16:31:21 +0900 Subject: [PATCH 23/31] =?UTF-8?q?feat:=20BaseModle=EB=A1=9C=20validate=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/worker/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/worker/worker.py b/app/worker/worker.py index fbdec09..751455a 100644 --- a/app/worker/worker.py +++ b/app/worker/worker.py @@ -84,7 +84,7 @@ async def run(self): data = json.loads(payload_str) print(f"Job received id={msg_id} correlationId={correlation_id} payload={data}") - job = data + job = ImageJob.model_validate(data) # XADD까지 호출 task = asyncio.create_task(process_image_scan(job, redis_client)) print(f"[worker] {task} 발행 성공") From bb4eb0a55360411601d79188e34f01fa8e24dc86 Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 17:25:04 +0900 Subject: [PATCH 24/31] =?UTF-8?q?feat:=20open=20AI=20API=20kye=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/config.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/app/core/config.py b/app/core/config.py index 5e336f3..00a7b51 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -5,14 +5,11 @@ load_dotenv() redis_url = os.getenv("REDIS_SERVER_URL") -bucket_name = os.getenv("BUCKET_NAME") -s3_region = os.getenv("S3_REGION") - +oepnai_key = os.getenv("OPENAI_API_KEY") class Settings(BaseSettings): REDIS_URL: str = redis_url - S3_REGION: str = bucket_name - BUCKET_NAME: str = s3_region + OPENAI_KEY: str = oepnai_key STREAM_JOB: str = "image.jobs" # SpringBoot에서 job 발행 (FastAPI에서 listen) STREAM_RESULT: str = "image.results" # FastAPI에서 결과 발행 (SpringBoot에서 listen) From 38eaf14edea8525167b23eb17aabf9202897cdec Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 17:25:17 +0900 Subject: [PATCH 25/31] =?UTF-8?q?feat:=20openAI=20=EB=9D=BC=EC=9D=B4?= =?UTF-8?q?=EB=B8=8C=EB=9F=AC=EB=A6=AC=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- requirements.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index 5cf02b2..d06a886 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,9 +3,9 @@ uvicorn redis pydantic pydantic-settings -boto3 requests torch==2.8.0 torchvision==0.23.0 Pillow==11.3.0 -dotenv \ No newline at end of file +dotenv +openai \ No newline at end of file From db909042e755301c4f973a383adce92734b8f3cb Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 17:25:35 +0900 Subject: [PATCH 26/31] =?UTF-8?q?del:=20=EB=B6=88=ED=95=84=EC=9A=94?= =?UTF-8?q?=ED=95=9C=20boto=20=EB=9D=BC=EC=9D=B4=EB=B8=8C=EB=9F=AC?= =?UTF-8?q?=EB=A6=AC=20=EC=82=AD=EC=A0=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/services/s3_service.py | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/app/services/s3_service.py b/app/services/s3_service.py index 8cf7719..1e8debb 100644 --- a/app/services/s3_service.py +++ b/app/services/s3_service.py @@ -1,27 +1,15 @@ -import boto3 -from botocore.config import Config as BotoConfig import requests from io import BytesIO -from app.core.config import settings - class S3Service: def __init__(self): - self.client = boto3.client( - "s3", - region_name=settings.S3_REGION, - config=BotoConfig( - retries={"max_attempts": 5, "mode": "standard"}, - read_timeout=30, - connect_timeout=5, - ), - ) + pass def download_file_from_presigned_url(self, presigned_url: str) -> BytesIO: response = requests.get(presigned_url) response.raise_for_status() - - return BytesIO(response.content) # response 안의 content Stream으로 처리 + # response의 content를 BytesIO로 감싸 반환 + return BytesIO(response.content) s3_service = S3Service() From e885d49792cc576efb9872f5ed3248e370218ce4 Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 17:25:54 +0900 Subject: [PATCH 27/31] =?UTF-8?q?feat:=20OpenAI=20=EC=A7=88=EB=AC=B8=20?= =?UTF-8?q?=ED=9B=84=20=EC=84=A4=EB=AA=85=20=EB=B0=98=ED=99=98=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/services/openai_service.py | 66 ++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 app/services/openai_service.py diff --git a/app/services/openai_service.py b/app/services/openai_service.py new file mode 100644 index 0000000..7455f19 --- /dev/null +++ b/app/services/openai_service.py @@ -0,0 +1,66 @@ +from app.core.config import settings +from openai import OpenAI +import json + +client = OpenAI(api_key=settings.OPENAI_KEY) + +class PregnancySafetyChecker: + def __init__(self, client: OpenAI): + self.client = client + + """ + - isSafe: 안전하면 1, 안전하지 않으면 0 + - description: 복용 가능 여부 설명 + """ + def ask_chatgpt_about_pregnancy_safety(self, pill_name: str) -> tuple[str, int]: + prompt = f""" + 약 이름: {pill_name} + 질문: 이 약은 임산부가 복용해도 안전한가요? 복용 가능 여부와 주의사항을 알려주세요. + description 안에는 문장마다 \\n 을 적용하세요. + 결과를 JSON 형식으로 정확히 반환하세요. 설명이나 다른 텍스트를 절대 덧붙이지 마세요. + 스키마: + {{ + "description": "복용 가능 여부 및 주의사항에 대한 설명", + "isSafe": 1 또는 0 + }} + """ + + response = self.client.chat.completions.create( + model="gpt-4o-mini", + messages=[{"role": "user", "content": prompt}], + temperature=0, + max_tokens=600, + response_format={"type": "json_object"} + ) + print("GPT Asking 성공...") + raw = response.choices[0].message.content.strip() + + try: + data = json.loads(raw) + except json.JSONDecodeError: + start = raw.find("{") + end = raw.rfind("}") + if start != -1 and end != -1 and start < end: + data = json.loads(raw[start:end+1]) + else: + # 디버깅 + preview = raw[:200].replace("\n", "\\n") + raise ValueError(f"응답이 유효한 JSON이 아닙니다. preview='{preview}'") + + description = data.get("description") + isSafe = data.get("isSafe") + + if isinstance(isSafe, bool): + isSafe = 1 if isSafe else 0 + elif isinstance(isSafe, str): + isSafe = 1 if isSafe.strip() in {"1", "true", "True"} else 0 + elif not isinstance(isSafe, int): + isSafe = 0 + + if not isinstance(description, str): + description = "" # 안전장치 + + return description, int(isSafe) + + +checker = PregnancySafetyChecker(client) \ No newline at end of file From 022e9f3099f86d3f31b17dd56f2698aaacbf7266 Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 17:26:02 +0900 Subject: [PATCH 28/31] =?UTF-8?q?feat:=20OpenAI=20=EC=A7=88=EB=AC=B8=20?= =?UTF-8?q?=ED=9B=84=20=EC=84=A4=EB=AA=85=20=EB=B0=98=ED=99=98=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/worker/tasks.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/app/worker/tasks.py b/app/worker/tasks.py index 3a22e33..63dfdc5 100644 --- a/app/worker/tasks.py +++ b/app/worker/tasks.py @@ -5,6 +5,7 @@ from app.core.config import settings from app.schemas.job import ImageJob, JobResult +from app.services.openai_service import checker from app.services.predictor_service import predictor_service from app.services.s3_service import s3_service @@ -27,11 +28,8 @@ async def process_image_scan(job: ImageJob, redis_client: redis.Redis): predictor_service.predict, stream_file ) - - # TODO: ChatGPT에 요청 결과 출력 - - isSafe = 0 - description = "일단은 테스트입니다. 추후에 GPT 부분 추가할 예정" + print(f"[task] Start Asking GPT for job_id={correlationId}") + description, isSafe = checker.ask_chatgpt_about_pregnancy_safety(pillName) finishedAt = datetime.utcnow().isoformat() result = JobResult( From 0541d2784de708e619b1a59371de0df3a5add296 Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 17:44:25 +0900 Subject: [PATCH 29/31] =?UTF-8?q?fix:=20=EB=B3=80=EC=88=98=EB=AA=85=20?= =?UTF-8?q?=EC=98=A4=ED=83=80=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/core/config.py b/app/core/config.py index 00a7b51..1bf7e00 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -5,11 +5,11 @@ load_dotenv() redis_url = os.getenv("REDIS_SERVER_URL") -oepnai_key = os.getenv("OPENAI_API_KEY") +openai_key = os.getenv("OPENAI_API_KEY") class Settings(BaseSettings): REDIS_URL: str = redis_url - OPENAI_KEY: str = oepnai_key + OPENAI_KEY: str = openai_key STREAM_JOB: str = "image.jobs" # SpringBoot에서 job 발행 (FastAPI에서 listen) STREAM_RESULT: str = "image.results" # FastAPI에서 결과 발행 (SpringBoot에서 listen) From 5d56d32027e2d4ce154fef44c6e2c1a6aea4ea01 Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 17:51:50 +0900 Subject: [PATCH 30/31] =?UTF-8?q?fix:=20=EA=B6=8C=ED=95=9C=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/workflow.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index fbabb99..d502246 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -77,5 +77,5 @@ jobs: docker pull ${{ secrets.ECR_URI }}/dearbelly-cv:latest # 6. docker start - chmod +x deploy.sh + sudo chmod +x deploy.sh source deploy.sh \ No newline at end of file From 1125d0d476a23e29f4d42880566bf07eba1c99ff Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 18:00:51 +0900 Subject: [PATCH 31/31] =?UTF-8?q?fix:=20=EC=8A=A4=ED=81=AC=EB=A6=BD?= =?UTF-8?q?=ED=8A=B8=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- deployment/deploy.sh | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/deployment/deploy.sh b/deployment/deploy.sh index 7cbf316..9f012d4 100644 --- a/deployment/deploy.sh +++ b/deployment/deploy.sh @@ -20,7 +20,7 @@ else HOST_PORT="8000" fi -echo "Pulling image: ${IMAGE}" +echo "Pulling new image" # docker pull docker compose pull app-${AFTER_COMPOSE_COLOR} docker compose up -d --no-deps --force-recreate app-${AFTER_COMPOSE_COLOR} @@ -28,7 +28,7 @@ docker compose up -d --no-deps --force-recreate app-${AFTER_COMPOSE_COLOR} # 새 컨테이너가 running 될 때까지 대기 for i in $(seq 1 600); do - if docker ps --filter "name=^/app-${AFTER_COLOR}$" --filter "status=running" --format '{{.Names}}' | grep -q .; then + if docker ps --filter "name=^/app-${AFTER_COMPOSE_COLOR}$" --filter "status=running" --format '{{.Names}}' | grep -q .; then echo "New app-${AFTER_COLOR} container is running." break fi @@ -39,15 +39,13 @@ for i in $(seq 1 600); do fi done -# 새로운 컨테이너 확인 후 Nginx 설정 변경 -if docker ps --filter "name=app-${AFTER_COMPOSE_COLOR}" --filter "status=running" --format '{{.Names}}' | grep -q .; then - echo "New app-${AFTER_COMPOSE_COLOR} container is running." - # reload nginx - NGINX_ID=$(sudo docker ps --filter "name=nginx" --quiet) - NGINX_CONFIG="/home/ubuntu/deployment/nginx.conf" - - echo "Switching Nginx upstream config..." - if ! sed -i "s/app-${BEFORE_COMPOSE_COLOR}:8000/app-${AFTER_COMPOSE_COLOR}:8000/" $NGINX_CONFIG; then - echo "Error occured: Failed to update Nginx config. Exiting deploy script." - exit 1 - fi \ No newline at end of file +# 이전 컨테이너 종료 및 정리 +if docker ps --filter "name=app-${AFTER_COMPOSE_COLOR}" --filter "status=running" | grep -q .; then + echo "Stopping old container app-${BEFORE_COMPOSE_COLOR}" + docker stop app-${BEFORE_COMPOSE_COLOR} || true + docker rm app-${BEFORE_COMPOSE_COLOR} || true + docker image prune -af +fi + +echo "Deployment success." +exit 0 \ No newline at end of file