Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class VoiceAnalyzePreviewResponse(BaseModel):
sad_bps: int
neutral_bps: int
angry_bps: int
anxiety_bps: int # fear -> anxiety (출력용)
anxiety_bps: Optional[int] = 0 # fear -> anxiety (출력용)
surprise_bps: int
top_emotion: Optional[str] = None
top_confidence_bps: Optional[int] = None
Expand Down
12 changes: 12 additions & 0 deletions app/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,15 @@ class InternalServerException(AppException):
def __init__(self, message: str):
super().__init__(status_code=500, message=message)


class NotFoundException(AppException):
"""리소스를 찾을 수 없음 (404)"""
def __init__(self, message: str):
super().__init__(status_code=404, message=message)


class ExternalAPIException(AppException):
"""외부 API 호출 오류 (가변 상태 코드)"""
def __init__(self, status_code: int, message: str):
super().__init__(status_code=status_code, message=message)

33 changes: 33 additions & 0 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ async def general_exception_handler(request, exc: Exception):
nlp_router = APIRouter(prefix="/nlp", tags=["nlp"])
test_router = APIRouter(prefix="/test", tags=["test"])
questions_router = APIRouter(prefix="/questions", tags=["questions"])
analyze_router = APIRouter(prefix="/analyze", tags=["analyze"])

# Health
@app.get("/health")
Expand Down Expand Up @@ -972,6 +973,37 @@ async def test_fcm_send(
result = svc.send_notification_to_tokens([token], title, body)
return {"success": True, "result": result}

def get_analyze_chat_service_dep(db: Session = Depends(get_db)):
"""AnalyzeChatService 의존성 함수"""
from .services.analyze_chat_service import get_analyze_chat_service
return get_analyze_chat_service(db)


@analyze_router.post("/chat")
async def analyze_chat(
session_id: str = Form(...),
user_id: str = Form(...),
question: str = Form(...),
file: UploadFile = File(...),
analyze_chat_service: "AnalyzeChatService" = Depends(get_analyze_chat_service_dep)
):
"""
음성 파일을 받아 STT, 감정 분석 후 외부 chatbot API로 전송
"""
try:
return await analyze_chat_service.analyze_and_send(
file=file,
session_id=session_id,
user_id=user_id,
question=question
)
except AppException:
raise
except Exception as exc:
raise InternalServerException(
"Internal server error while analyzing chat"
) from exc

# ---------------- router 등록 ----------------
app.include_router(users_router)
app.include_router(care_router)
Expand All @@ -980,3 +1012,4 @@ async def test_fcm_send(
app.include_router(test_router)
app.include_router(questions_router)
app.include_router(composite_router.router)
app.include_router(analyze_router)
283 changes: 283 additions & 0 deletions app/services/analyze_chat_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
import os
import asyncio
import uuid
from typing import Dict, Any
from io import BytesIO
from datetime import datetime
from sqlalchemy.orm import Session
import httpx
from fastapi import UploadFile

from ..services.va_fusion import fuse_VA
from ..nlp_service import analyze_text_sentiment
from ..emotion_service import analyze_voice_emotion
from ..stt_service import transcribe_voice
from ..s3_service import upload_fileobj
from ..auth_service import get_auth_service
from ..exceptions import (
ValidationException,
InternalServerException,
RuntimeException,
NotFoundException,
ExternalAPIException,
)

class AnalyzeChatService:
"""음성 파일 분석 및 chatbot API 전송 서비스"""

def __init__(self, db: Session):
self.db = db

async def analyze_and_send(
self,
file: UploadFile,
session_id: str,
user_id: str,
question: str
) -> Dict[str, Any]:
"""
음성 파일을 분석하고 외부 chatbot API로 전송

Args:
file: 업로드된 음성 파일
session_id: 세션 ID
user_id: 사용자 ID
question: 질문 내용

Returns:
Dict: 외부 API 응답
"""
# 파일 내용을 메모리에 저장 (여러 번 사용하기 위해)
file_content = await file.read()
# 파일명을 현재 시간과 UUID를 포함한 형식으로 생성 (중복 방지)
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
unique_id = str(uuid.uuid4())[:8] # UUID 앞 8자리만 사용
if file.filename:
name, ext = os.path.splitext(file.filename)
filename = f"{name}_{current_time}_{unique_id}{ext}"
else:
filename = f"audio_{current_time}_{unique_id}.wav"

# 1. S3 파일 업로드 (별도 스레드에서 실행)
await asyncio.to_thread(
self._upload_to_s3,
file_content,
filename,
session_id,
user_id,
file.content_type
)

# 2. STT (음성 → 텍스트) - 별도 스레드에서 실행
content = await asyncio.to_thread(
self._transcribe_audio,
file_content,
filename,
file.content_type
)

# 3. 음성 감정 분석 - 별도 스레드에서 실행
emotion_data = await asyncio.to_thread(
self._analyze_emotion,
file_content,
filename,
file.content_type,
content
)

# 4. 사용자 정보 조회 (DB 세션은 스레드 안전하지 않으므로 메인 스레드에서 실행)
user_name = self._get_user_name(user_id)

# 5. 외부 API 호출
return await self._send_to_chatbot(
content=content,
emotion=emotion_data,
question=question,
user_id=user_id,
user_name=user_name
)

def _upload_to_s3(
self,
file_content: bytes,
filename: str,
session_id: str,
user_id: str,
content_type: str
) -> str:
"""S3에 파일 업로드"""
bucket = os.getenv("S3_BUCKET_NAME")
if not bucket:
raise InternalServerException("S3_BUCKET_NAME not configured")

# S3 키 생성: {session_id}/{user_id}/{filename}
s3_key = f"{session_id}/{user_id}/{filename}" if session_id and user_id else f"chat/{filename}"
Copy link

Copilot AI Nov 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The S3 key construction using user-provided session_id and user_id without sanitization could lead to path traversal vulnerabilities. If these values contain characters like ../, it could allow access to unintended S3 paths. Consider sanitizing these inputs by removing or replacing special characters before constructing the S3 key.

Copilot uses AI. Check for mistakes.

# S3 업로드
file_obj = BytesIO(file_content)
upload_fileobj(bucket=bucket, key=s3_key, fileobj=file_obj, content_type=content_type)

return s3_key

def _transcribe_audio(
self,
file_content: bytes,
filename: str,
content_type: str
) -> str:
"""STT로 음성을 텍스트로 변환"""
# 파일 내용을 BytesIO로 래핑하여 UploadFile처럼 사용
class FileWrapper:
def __init__(self, content: bytes, filename: str, content_type: str):
self.file = BytesIO(content)
self.filename = filename
self.content_type = content_type

wrapped_file = FileWrapper(file_content, filename, content_type)
stt_result = transcribe_voice(wrapped_file)

if stt_result.get("error"):
raise RuntimeException(f"STT failed: {stt_result.get('error')}")

content = stt_result.get("transcript", "")
if not content:
raise ValidationException("STT result is empty")

return content

def _analyze_emotion(
self,
file_content: bytes,
filename: str,
content_type: str,
text_content: str
) -> Dict[str, Any]:
"""음성 및 텍스트 감정 분석"""

class FileWrapper:
def __init__(self, content: bytes, filename: str, content_type: str):
self.file = BytesIO(content)
self.filename = filename
self.content_type = content_type
Comment on lines +157 to +161
Copy link

Copilot AI Nov 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The FileWrapper class is duplicated in both _transcribe_audio and _analyze_emotion methods. This code duplication should be refactored by extracting FileWrapper as a module-level class or a separate helper method to improve maintainability.

Copilot uses AI. Check for mistakes.

wrapped_file = FileWrapper(file_content, filename, content_type)

# 3-1. Audio 감정 분석
wrapped_file.file.seek(0)
emotion_result = analyze_voice_emotion(wrapped_file)
if emotion_result.get("error"):
raise RuntimeException(f"Emotion analysis failed: {emotion_result.get('error')}")

audio_probs = emotion_result.get("emotion_scores", {})

# 3-2. 텍스트 감정 분석
text_sentiment = analyze_text_sentiment(text_content, language_code="ko")
if text_sentiment.get("error"):
raise RuntimeException(f"Text sentiment analysis failed: {text_sentiment.get('error')}")

sentiment_data = text_sentiment.get("sentiment", {})
text_score = sentiment_data.get("score", 0.0) # [-1, 1]
text_magnitude = sentiment_data.get("magnitude", 0.0) # [0, +inf)

# 3-3. VA Fusion으로 arousal, valence 계산
va_result = fuse_VA(audio_probs, text_score, text_magnitude)

# arousal, valence를 [0, 1] 범위로 변환 ([-1, 1] -> [0, 1])
valence = (va_result.get("V_final", 0.0) + 1.0) / 2.0 # [-1, 1] -> [0, 1]
arousal = (va_result.get("A_final", 0.0) + 1.0) / 2.0 # [-1, 1] -> [0, 1]

# emotion details 구성 (bps를 [0, 1] 범위로 변환)
per_emotion_bps = va_result.get("per_emotion_bps", {})
details = {
"angry": per_emotion_bps.get("angry", 0) / 10000.0,
"anxiety": per_emotion_bps.get("fear", 0) / 10000.0, # fear -> anxiety
"happy": per_emotion_bps.get("happy", 0) / 10000.0,
"neutral": per_emotion_bps.get("neutral", 0) / 10000.0,
"sad": per_emotion_bps.get("sad", 0) / 10000.0,
"surprise": per_emotion_bps.get("surprise", 0) / 10000.0,
}

top_emotion = va_result.get("top_emotion", "neutral")
# fear -> anxiety 변환
if top_emotion == "fear":
top_emotion = "anxiety"

top_confidence_bps = va_result.get("top_confidence_bps", 0)
confidence = top_confidence_bps / 10000.0 # [0, 1]

return {
"arousal": round(arousal, 2),
"confidence": round(confidence, 2),
"details": {k: round(v, 2) for k, v in details.items()},
"top_emotion": top_emotion,
"valence": round(valence, 2)
}

def _get_user_name(self, user_id: str) -> str:
"""사용자 이름 조회"""
if not user_id:
raise ValidationException("user_id is required")

auth_service = get_auth_service(self.db)
user = auth_service.get_user_by_username(user_id)
if not user:
raise NotFoundException(f"User not found: {user_id}")

return user.name

async def _send_to_chatbot(
self,
content: str,
emotion: Dict[str, Any],
question: str,
user_id: str,
user_name: str
) -> Dict[str, Any]:
"""외부 chatbot API로 전송"""
recorded_at = datetime.now().isoformat()

request_payload = {
"content": content,
"emotion": emotion,
"question": question,
"recorded_at": recorded_at,
"user_id": user_id,
"user_name": user_name
}

chatbot_url = os.getenv("CHATBOT_API_URL")

if not chatbot_url:
raise InternalServerException("CHATBOT_API_URL not configured")

try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
chatbot_url,
json=request_payload,
headers={"accept": "application/json", "Content-Type": "application/json"}
)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as exc:
error_message = self._extract_external_error(exc)
raise ExternalAPIException(status_code=exc.response.status_code, message=error_message)
except httpx.RequestError as exc:
raise ExternalAPIException(status_code=500, message=f"External API request failed: {str(exc)}")

@staticmethod
def _extract_external_error(exc: httpx.HTTPStatusError) -> str:
"""외부 API 예외 메시지를 문자열로 변환"""
try:
json_body = exc.response.json()
if isinstance(json_body, dict) and "message" in json_body:
return str(json_body["message"])
return str(json_body)
except Exception:
return f"External API error: {exc.response.text}"


def get_analyze_chat_service(db: Session) -> AnalyzeChatService:
"""AnalyzeChatService 인스턴스 생성"""
return AnalyzeChatService(db)

Loading