Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 111 additions & 18 deletions app/router/recommendation_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from app.services.model_trainer.recommendation import generate_recommendations
from app.services.preprocess.user.user_preprocess import user_preprocess_data # 사용자 데이터 전처리 모듈 추가
from typing import List, Dict, Any
from datetime import datetime

# 라우터 설정
logger = logging.getLogger("recommendation_api")
Expand All @@ -22,35 +23,83 @@
# 글로벌 변수 초기화
globals_dict = {}
model_initializing = False # 모델 초기화 상태를 추적하는 전역 변수
last_initialization_attempt = None # 마지막 초기화 시도 시간

# 초기 데이터 로딩 및 모델 학습
def initialize_model():
global globals_dict, model_initializing
def initialize_model(force=False):
global globals_dict, model_initializing, last_initialization_attempt

# 이미 초기화 중이면 중복 실행 방지
if model_initializing and not force:
logger.info("모델 초기화가 이미 진행 중입니다.")
return False

# 마지막 초기화 시도 기록
current_time = datetime.now()
last_initialization_attempt = current_time

try:
# 초기화 상태 설정
model_initializing = True
logger.info("모델 초기화 시작")

# 데이터 디렉토리 및 파일 확인
if not os.path.exists(str(RESTAURANTS_DIR)) or not os.path.exists(str(USER_DIR)):
logger.error("필요한 데이터 디렉토리가 없습니다. 데이터 동기화가 완료되었는지 확인하세요.")
model_initializing = False
return False

# 파일 존재 여부 확인
restaurant_files = [f for f in os.listdir(str(RESTAURANTS_DIR)) if f.endswith('.json')]
user_files = [f for f in os.listdir(str(USER_DIR)) if f.endswith('.json')]

if not restaurant_files or not user_files:
logger.error(f"데이터 파일이 충분하지 않습니다. 식당 파일: {len(restaurant_files)}개, 사용자 파일: {len(user_files)}개")
model_initializing = False
return False

# 식당 데이터 로드
df_raw = load_restaurant_json_files(str(RESTAURANTS_DIR))
if df_raw.empty:
logger.error("식당 데이터가 비어 있습니다.")
model_initializing = False
return False

logger.info(f"식당 데이터 로드 완료: {len(df_raw)}개 식당")

# 사용자 데이터 로드 및 전처리
user_data_frames = load_user_json_files(str(USER_DIR))
if not user_data_frames:
logger.warning("사용자 데이터가 비어 있습니다. 기본 추천만 가능합니다.")
# 사용자 데이터가 없어도 계속 진행 (기본 추천만 제공)
else:
logger.info(f"사용자 데이터 로드 완료: {len(user_data_frames)}개 파일")

# 사용자 데이터 전처리 및 특성 추출
# 사용자 데이터 전처리 및 특성 추출 부분 수정
try:
# 전처리된 사용자 특성 파일 경로
user_features_path = os.path.join(str(USER_DIR), "preprocessed_user_features.csv")

# 이미 전처리된 파일이 있는지 확인
if os.path.exists(user_features_path):
# 이미 전처리된 파일이 있고 강제 초기화가 아니면 기존 파일 사용
if os.path.exists(user_features_path) and not force:
logger.info(f"기존 전처리된 사용자 특성 파일 로드: {user_features_path}")
user_features_df = pd.read_csv(user_features_path)
else:
# 사용자 데이터 전처리 실행
# 사용자 데이터 파일 경로 리스트 생성
logger.info("사용자 데이터 전처리 시작")
user_json_files = [
os.path.join(str(USER_DIR), f) for f in os.listdir(str(USER_DIR))
if f.endswith('.json') and not f.startswith('.') # 숨김 파일 제외
]

# 파일 경로 로깅
logger.info(f"처리할 사용자 데이터 파일: {len(user_json_files)}개")
for file in user_json_files[:5]: # 첫 5개만 로깅
logger.debug(f"- {os.path.basename(file)}")

# 파일 경로 리스트를 전달하여 전처리 수행
user_features_df = user_preprocess_data(
user_data_frames,
user_json_files, # 파일 경로 리스트 전달
save_path=user_features_path
)
logger.info(f"사용자 데이터 전처리 완료: {len(user_features_df)}명의 사용자 데이터")
Expand All @@ -73,18 +122,56 @@ def initialize_model():

# 원본 사용자 데이터 저장 (필요시)
globals_dict["user_data_frames"] = user_data_frames
globals_dict["last_update"] = datetime.now()

logger.info("모델 초기화 성공")
# 초기화 완료 상태로 설정
model_initializing = False
return True
except Exception as e:
logger.error(f"Error during initialization: {e}", exc_info=True)
logger.error(f"모델 초기화 중 오류 발생: {e}", exc_info=True)
globals_dict = {}
# 초기화 실패 상태로 설정
model_initializing = False
return False

# 서버 시작 시 모델 초기화
initialize_model()
# 모델 재초기화 엔드포인트 추가 (관리자용)
@router.post("/reload", response_model=Dict[str, str])
async def reload_model(force: bool = True):
"""모델을 강제로 다시 로드합니다. 관리자 전용 API입니다."""
result = initialize_model(force=force)

if result:
return {"status": "success", "message": "모델 재초기화가 완료되었습니다."}
else:
raise HTTPException(
status_code=503,
detail="모델 재초기화에 실패했습니다. 로그를 확인하세요."
)

# 모델 상태 확인 엔드포인트 추가 (상태 모니터링용)
@router.get("/status", response_model=Dict[str, Any])
async def check_model_status():
"""현재 모델의 초기화 상태를 확인합니다."""
global globals_dict, model_initializing, last_initialization_attempt

is_initialized = "stacking_reg" in globals_dict and "df_model" in globals_dict

status = {
"initialized": is_initialized,
"initializing": model_initializing,
"last_attempt": last_initialization_attempt.isoformat() if last_initialization_attempt else None,
"last_update": globals_dict.get("last_update").isoformat() if globals_dict.get("last_update") else None
}

if is_initialized:
# 기본 모델 통계 추가
status.update({
"restaurant_count": len(globals_dict.get("df_model", [])),
"user_count": len(globals_dict.get("user_features_df", [])) if globals_dict.get("user_features_df") is not None else 0
})

return status

# recommend 함수 내부 수정
@router.post("",
Expand All @@ -97,7 +184,7 @@ def initialize_model():

async def recommend(user_data: UserData, background_tasks: BackgroundTasks):
"""사용자 데이터를 받아 개인화된 추천 결과를 생성하고, 결과를 파일로 저장합니다."""
global model_initializing
global model_initializing, globals_dict

try:
# 모델 초기화 상태 확인
Expand All @@ -110,12 +197,18 @@ async def recommend(user_data: UserData, background_tasks: BackgroundTasks):
headers={"Retry-After": "30"} # 30초 후 재시도 권장
)
else:
# 초기화 실패 또는 아직 시작되지 않음
raise HTTPException(
status_code=503,
detail="모델 데이터가 초기화되지 않았습니다. 서버 관리자에게 문의하세요.",
headers={"Retry-After": "300"} # 5분 후 재시도 권장
)
# 초기화 실패했다면 자동으로 다시 시도
logger.info("모델이 초기화되지 않았습니다. 자동으로 초기화를 시도합니다.")
initialize_result = initialize_model()

if not initialize_result:
# 다시 시도해도 실패한 경우
raise HTTPException(
status_code=503,
detail="모델 데이터가 초기화되지 않았습니다. 서버 관리자에게 문의하세요.",
headers={"Retry-After": "300"} # 5분 후 재시도 권장
)
# 초기화 성공했다면 계속 진행

user_id = user_data.user_id
preferred_categories = user_data.preferred_categories
Expand Down Expand Up @@ -174,5 +267,5 @@ async def save_recommendation():
# 이미 생성된 HTTPException은 그대로 다시 발생시킴
raise
except Exception as e:
logger.error(f"Error in recommendation endpoint: {e}", exc_info=True)
logger.error(f"추천 API 처리 중 오류 발생: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
63 changes: 52 additions & 11 deletions app/services/background_tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# app/servies/background_tasks.py

# app/services/background_tasks.py
import asyncio
import logging
from datetime import datetime
Expand All @@ -10,15 +9,31 @@

logger = logging.getLogger(__name__)

# 초기 동기화 상태를 추적하는 변수
_initial_sync_completed = False

# 첫 번째 주기적 동기화 지연 시간(시간)
FIRST_PERIODIC_SYNC_DELAY = 1.0 # 첫 주기적 동기화는 1시간 후에 시작

async def periodic_data_sync(hours_interval=24):
"""주기적인 데이터 동기화 및 모델 재학습 수행"""
global _initial_sync_completed

try:
# 초기 지연 (5초)
await asyncio.sleep(5)
# 초기 동기화가 완료될 때까지 대기
while not _initial_sync_completed:
logger.info("초기 동기화가 완료되지 않았습니다. 주기적 동기화는 초기 동기화 완료 후 시작됩니다.")
await asyncio.sleep(10) # 10초마다 확인

# 첫 주기적 동기화를 위한 추가 지연
logger.info(f"첫 주기적 동기화는 {FIRST_PERIODIC_SYNC_DELAY}시간 후에 시작됩니다.")
await asyncio.sleep(FIRST_PERIODIC_SYNC_DELAY * 3600) # 시간을 초로 변환

while True:
start_time = datetime.now()
try:
logger.info(f"주기적 데이터 동기화 시작 (간격: {hours_interval}시간)")

# MongoDB에서 데이터 가져오기
sync_result = await asyncio.get_event_loop().run_in_executor(None, fetch_data_from_mongodb)

Expand All @@ -33,28 +48,54 @@ async def periodic_data_sync(hours_interval=24):
except Exception as e:
logger.error(f"주기적 동기화 중 오류: {str(e)}", exc_info=True)

# 다음 실행까지 대기 (시간 -> 초)
next_sync_seconds = hours_interval * 3600
logger.info(f"다음 동기화까지 {next_sync_seconds}초 대기")
await asyncio.sleep(next_sync_seconds)
# 다음 실행 시간 계산 (실행 소요 시간 고려)
end_time = datetime.now()
elapsed_seconds = (end_time - start_time).total_seconds()

# 간격에서 이미 소요된 시간을 뺀 만큼만 대기
wait_seconds = max(1, hours_interval * 3600 - elapsed_seconds) # 최소 1초 대기
logger.info(f"다음 동기화까지 {wait_seconds/3600:.2f}시간 대기")
await asyncio.sleep(wait_seconds)

except asyncio.CancelledError:
logger.info("주기적 동기화 태스크 취소됨")
except Exception as e:
logger.error(f"주기적 동기화 태스크 오류: {e}", exc_info=True)
# 태스크가 중단되지 않도록 다시 시작 (5초 후)
await asyncio.sleep(5)
asyncio.create_task(periodic_data_sync(hours_interval))

async def run_initial_sync():
"""애플리케이션 시작 시 데이터 동기화 및 모델 초기화 수행"""
global _initial_sync_completed

try:
logger.info("초기 데이터 동기화 시작")

# 이미 초기화가 완료되었는지 확인
if _initial_sync_completed:
logger.info("초기 동기화가 이미 완료되었습니다. 건너뜁니다.")
return True

# MongoDB에서 데이터 가져오기
sync_result = await asyncio.get_event_loop().run_in_executor(None, fetch_data_from_mongodb)

if sync_result:
logger.info("초기 데이터 동기화 완료, 모델 초기화 시작")
# 직접 MongoDB 사용 옵션을 True로 설정하여 MongoDB에서 직접 데이터를 가져와 모델 초기화
await initialize_model(force_reload=True, use_direct_mongodb=True)
logger.info("초기화 완료")
init_result = await initialize_model(force_reload=True, use_direct_mongodb=True)

if init_result:
logger.info("초기화 완료")
_initial_sync_completed = True # 초기화 완료 상태 설정
return True
else:
logger.error("모델 초기화 실패")
return False
else:
logger.error("초기 데이터 동기화 실패, 모델 초기화 스킵")
return False

except Exception as e:
logger.error(f"초기 동기화 중 오류: {str(e)}", exc_info=True)
logger.error(f"초기 동기화 중 오류: {str(e)}", exc_info=True)
return False
Loading