Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions app/core/logging_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from logging.config import dictConfig

"""
애플리케이션 전체에서의 로깅 레벨 설정
"""
def setup_logging():
    """Configure application-wide logging levels via dictConfig.

    Root and uvicorn loggers emit INFO records through a single console
    StreamHandler; the "app" logger gets its own handler and does not
    propagate, so application records are printed exactly once.
    """
    log_format = "%(asctime)s %(levelname)s [%(name)s] %(message)s"
    config = {
        "version": 1,
        # Keep loggers created before this call (e.g. by imported modules) alive.
        "disable_existing_loggers": False,
        "formatters": {
            "default": {"format": log_format},
        },
        "handlers": {
            "console": {"class": "logging.StreamHandler", "formatter": "default"},
        },
        "root": {"level": "INFO", "handlers": ["console"]},
        "loggers": {
            # uvicorn loggers have no handlers here; they propagate to root.
            "uvicorn": {"level": "INFO"},
            "uvicorn.error": {"level": "INFO"},
            "uvicorn.access": {"level": "INFO"},
            "app": {"level": "INFO", "handlers": ["console"], "propagate": False},
        },
    }
    dictConfig(config)
2 changes: 2 additions & 0 deletions app/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from fastapi import FastAPI, Request
from app.core.lifespan import lifespan
from app.api.endpoints import predictions
from app.core.logging_config import setup_logging

setup_logging()
app = FastAPI(
title="DearBelly CV API",
description="DearBelly CV를 위한 Swagger 입니다.",
Expand Down
6 changes: 5 additions & 1 deletion app/services/openai_service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from app.core.config import settings
from openai import OpenAI
import json
import logging

logger = logging.getLogger(__name__)

client = OpenAI(api_key=settings.OPENAI_KEY)

Expand Down Expand Up @@ -32,7 +35,7 @@ def ask_chatgpt_about_pregnancy_safety(self, pill_name: str) -> tuple[str, int]:
max_tokens=600,
response_format={"type": "json_object"}
)
print("GPT Asking 성공...")
logger.info("GPT Asking 성공...")
raw = response.choices[0].message.content.strip()

try:
Expand All @@ -45,6 +48,7 @@ def ask_chatgpt_about_pregnancy_safety(self, pill_name: str) -> tuple[str, int]:
else:
# 디버깅
preview = raw[:200].replace("\n", "\\n")
logger.warning("JSON 파싱 실패")
raise ValueError(f"응답이 유효한 JSON이 아닙니다. preview='{preview}'")

description = data.get("description")
Expand Down
133 changes: 94 additions & 39 deletions app/services/predictor_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,21 @@
import json
from pathlib import Path
from io import BytesIO
import logging

logger = logging.getLogger(__name__)

class EfficientNetBaseline(nn.Module):
    def __init__(self, num_classes, pretrained=True, dropout=0.2):
        """Build an EfficientNet-B3 classifier head.

        Args:
            num_classes: size of the final linear layer's output.
            pretrained: whether timm loads pretrained backbone weights.
            dropout: dropout probability applied before the classifier.
        """
        super().__init__()
        logger.info(
            f"Initializing EfficientNetBaseline with num_classes={num_classes}, pretrained={pretrained}, dropout={dropout}")

        # num_classes=0 + global_pool="avg" makes timm return pooled features
        # (no classifier), so we can attach our own head below.
        self.backbone = timm.create_model(
            "efficientnet_b3", pretrained=pretrained, num_classes=0, global_pool="avg"
        )
        feat_dim = self.backbone.num_features

        # Head: BatchNorm -> Dropout -> Linear. NOTE: submodule assignment
        # order also fixes the state_dict/parameter ordering.
        self.bn = nn.BatchNorm1d(feat_dim)
        self.dp = nn.Dropout(dropout)
        self.fc = nn.Linear(feat_dim, num_classes)
Expand All @@ -28,66 +35,114 @@ def forward(self, x):

class PredictorService:
def __init__(self, model_path: Path, json_path: Path):

logger.info(f"Initializing PredictorService with model_path={model_path}, json_path={json_path}")

self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# cuda 모델 확인
logger.info(f"Using device: {self.device}")

self.idx2label = self._load_idx2label(json_path)
self.num_classes = len(self.idx2label)
logger.info(f"Loaded {self.num_classes} classes")

self.model = self._load_model(model_path)
self.transform = transforms.Compose([
transforms.Resize((64, 64)),
transforms.ToTensor(),
])
logger.info("PredictorService initialized successfully")

def _load_idx2label(self, json_path: Path) -> dict:
with open(json_path, "r", encoding="utf-8") as f:
data = json.load(f)
idx2label = data.get("idx2label")
if not idx2label:
unique_labels = sorted(set(sample["label"] for sample in data["samples"]))
idx2label = {str(label): f"K-{label:06d}" for label in unique_labels}
return idx2label

# json 제대로 읽었는지 확인
logger.info(f"Loading idx2label from {json_path}")

try:
with open(json_path, "r", encoding="utf-8") as f:
data = json.load(f)
idx2label = data.get("idx2label")
if not idx2label:
logger.warning("idx2label not found in JSON, generating from samples")
unique_labels = sorted(set(sample["label"] for sample in data["samples"]))
idx2label = {str(label): f"K-{label:06d}" for label in unique_labels}
return idx2label

# 예외 사항 추가
except FileNotFoundError:
logger.error(f"JSON file not found: {json_path}")
raise
except json.JSONDecodeError as e:
logger.error(f"Failed to parse JSON file: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error loading idx2label: {e}", exc_info=True)
raise

def _load_model(self, model_path: Path) -> EfficientNetBaseline:
# model path 확인하기
logger.info(f"Loading model from {model_path}")

import __main__
__main__.EfficientNetBaseline = EfficientNetBaseline

object = torch.load(model_path, map_location=self.device, weights_only=False)

if isinstance(object, nn.Module) :
# 그 자체로 모델일 때
model = object.to(self.device)
elif isinstance(object, dict) :
# 반환 타입이 state_dict
state_dict = object
for k in ['state_dict', 'model_state_dict', 'model']:
if k in object and isinstance(object[k], dict):
state_dict = object[k]
break
try:
object = torch.load(model_path, map_location=self.device, weights_only=False)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

torch.load에서 weights_only=False를 사용하는 것은 신뢰할 수 없는 소스의 모델 파일을 로드할 때 임의 코드 실행으로 이어질 수 있는 보안 위험을 초래할 수 있습니다. 모델의 state_dict만 로드하는 것이 더 안전합니다. weights_only=True로 설정하고 state_dict를 로드하도록 코드를 수정하는 것을 강력히 권장합니다.

Suggested change
object = torch.load(model_path, map_location=self.device, weights_only=False)
object = torch.load(model_path, map_location=self.device, weights_only=True)


if isinstance(object, nn.Module) :
# 그 자체로 모델일 때
model = object.to(self.device)
elif isinstance(object, dict) :
# 반환 타입이 state_dict
state_dict = object
for k in ['state_dict', 'model_state_dict', 'model']:
if k in object and isinstance(object[k], dict):
state_dict = object[k]
break

model = EfficientNetBaseline(self.num_classes).to(self.device)

missing, unexpected = model.load_state_dict(state_dict, strict=False)
if missing or unexpected:
logger.warning(f"[load_state_dict] missing keys: {missing}, unexpected keys: {unexpected}")
else:
# type 일치하지 않음
error_msg = f"Unsupported checkpoint type: {type(object)}"
logger.error(error_msg)
raise TypeError(f"Unsupported checkpoint type: {type(object)}")

# 완료
model.eval()
logger.info("Model loaded and set to evaluation mode")
return model

except FileNotFoundError:
logger.error(f"Model file not found: {model_path}")
raise
except Exception as e:
logger.error(f"Failed to load model: {e}", exc_info=True)
raise

model = EfficientNetBaseline(self.num_classes).to(self.device)

missing, unexpected = model.load_state_dict(state_dict, strict=False)
if missing or unexpected:
print(f"[load_state_dict] missing keys: {missing}, unexpected keys: {unexpected}")
else:
# type 일치하지 않음
raise TypeError(f"Unsupported checkpoint type: {type(object)}")

model.eval()
return model

def predict(self, stream_file: BytesIO) -> tuple[str, str, float]:
image = Image.open(stream_file).convert('RGB')
input_tensor = self.transform(image).unsqueeze(0).to(self.device)

with torch.no_grad():
output = self.model(input_tensor)
predicted_idx = torch.argmax(output, dim=1).item()
confidence = torch.softmax(output, dim=1)[0][predicted_idx].item()
try :
image = Image.open(stream_file).convert('RGB')
input_tensor = self.transform(image).unsqueeze(0).to(self.device)

with torch.no_grad():
output = self.model(input_tensor)
predicted_idx = torch.argmax(output, dim=1).item()
confidence = torch.softmax(output, dim=1)[0][predicted_idx].item()

label = str(predicted_idx)
pill_name = self.idx2label.get(label, f"Unknown Label: {label}")
label = str(predicted_idx)
pill_name = self.idx2label.get(label, f"Unknown Label: {label}")
logger.info(f"Prediction completed - pill_name: {pill_name}, label: {label}, confidence: {confidence:.4f}")

return pill_name, label, confidence
return pill_name, label, confidence
except Exception as e:
logger.error(f"Prediction failed: {e}", exc_info=True)
raise


HERE = Path(__file__).resolve().parent.parent
Expand Down
13 changes: 8 additions & 5 deletions app/worker/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
from app.services.openai_service import checker
from app.services.predictor_service import predictor_service
from app.services.s3_service import s3_service
import logging

logger = logging.getLogger(__name__)

"""
이미지를 다운 -> 다운 한 것에 대하여 모델 분석 요청
"""
async def process_image_scan(job: ImageJob, redis_client: redis.Redis):
correlationId = job.correlationId
print(f"[task] Start image scan for job_id={correlationId}")
logging.info(f"[task] Start image scan for job_id={correlationId}")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

로그 메시지에 [task] 접두사를 수동으로 추가하고 있습니다. 로깅 설정(logging_config.py)에서 포맷터가 %(name)s을 통해 로거 이름(예: app.worker.tasks)을 자동으로 포함하므로, 이 접두사는 중복 정보를 만듭니다. 또한 현재 코드는 모듈에 정의한 `logger`가 아니라 루트 로거(`logging.info`)를 직접 호출하고 있어 %(name)s이 `root`로 찍히고 "app" 로거 설정도 적용되지 않습니다. `logger.info(...)`로 교체하고, 가독성을 높이기 위해 이 파일의 다른 로그 메시지(34, 56, 59, 61행)에서도 [task] 접두사를 제거하는 것을 권장합니다.

Suggested change
logging.info(f"[task] Start image scan for job_id={correlationId}")
logging.info(f"Start image scan for job_id={correlationId}")

try:

stream_file = await asyncio.to_thread(
Expand All @@ -28,7 +31,7 @@ async def process_image_scan(job: ImageJob, redis_client: redis.Redis):
predictor_service.predict,
stream_file
)
print(f"[task] Start Asking GPT for job_id={correlationId}")
logging.info(f"[task] Start Asking GPT for job_id={correlationId}")
description, isSafe = checker.ask_chatgpt_about_pregnancy_safety(pillName)
finishedAt = datetime.utcnow().isoformat()

Expand All @@ -50,9 +53,9 @@ async def process_image_scan(job: ImageJob, redis_client: redis.Redis):
approximate=True,
)

print(f"[task] Image scan successfully finished for job_id={correlationId}")
logging.info(f"[task] Image scan successfully finished for job_id={correlationId}")

except Exception as e:
print(f"[task] Failed to process job_id={correlationId}: {e}")
logging.warning(f"[task] Failed to process job_id={correlationId}: {e}")
finally:
print(f"[task] Image scan finished for job_id={correlationId}")
logging.info(f"[task] Image scan finished for job_id={correlationId}")
15 changes: 9 additions & 6 deletions app/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from app.core.config import settings
from app.schemas.job import ImageJob
from app.worker.tasks import process_image_scan
import logging

logger = logging.getLogger(__name__)

"""
Redis Stream에 정의한 유효한 형식 메시지를 위한 전처리 함수
Expand Down Expand Up @@ -49,7 +52,7 @@ def __init__(self, redis_client: redis_client):
self.redis_client = redis_client

async def run(self):
print(f"[worker] start consumer={settings.CONSUMER_NAME} group={settings.GROUP_NAME} stream={settings.STREAM_JOB}")
logging.info(f"[worker] start consumer={settings.CONSUMER_NAME} group={settings.GROUP_NAME} stream={settings.STREAM_JOB}")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

로그 메시지에 [worker] 접두사를 수동으로 추가하고 있습니다. 로깅 설정(logging_config.py)에서 포맷터가 %(name)s을 통해 로거 이름(예: app.worker.worker)을 자동으로 포함하므로, 이 접두사는 중복 정보를 만듭니다. 또한 이 파일도 정의해 둔 `logger` 대신 루트 로거(`logging.info`)를 호출하고 있어 %(name)s이 `root`로 표시됩니다. `logger.info(...)`로 교체하고, [worker] 접두사가 포함된 이 파일의 다른 로그 메시지(93, 138, 161, 164행)에서도 접두사를 제거하는 것을 권장합니다.

Suggested change
logging.info(f"[worker] start consumer={settings.CONSUMER_NAME} group={settings.GROUP_NAME} stream={settings.STREAM_JOB}")
logging.info(f"start consumer={settings.CONSUMER_NAME} group={settings.GROUP_NAME} stream={settings.STREAM_JOB}")

reclaim_every_sec = 30
last_reclaim = 0.0

Expand Down Expand Up @@ -82,12 +85,12 @@ async def run(self):

# 최종 반환 data
data = json.loads(payload_str)
print(f"Job received id={msg_id} correlationId={correlation_id} payload={data}")
logging.info(f"Job received id={msg_id} correlationId={correlation_id} payload={data}")

job = ImageJob.model_validate(data)
# XADD까지 호출
task = asyncio.create_task(process_image_scan(job, redis_client))
print(f"[worker] {task} 발행 성공")
logging.info(f"[worker] {task} 발행 성공")

# 처리 성공 시에만 ack 후 del
task.add_done_callback(lambda t: asyncio.create_task(
Expand Down Expand Up @@ -132,7 +135,7 @@ async def run(self):
job = ImageJob.model_validate_json(payload)

task = asyncio.create_task(process_image_scan(job, self.redis_client))
print(f"[worker] {task} 발행 성공")
logging.info(f"[worker] {task} 발행 성공")

def _on_done(t: asyncio.Task, *, msg_id=msg_id, fields=fields):
async def _ack_or_dlq():
Expand All @@ -155,8 +158,8 @@ async def _ack_or_dlq():
await self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ", clean)

except asyncio.CancelledError:
print("[worker] cancelled; bye")
logging.warning("[worker] cancelled; bye")
break
except Exception as e:
print(f"[worker] error: {e}")
logging.warning(f"[worker] error: {e}")
await asyncio.sleep(1)
Loading