diff --git a/.github/workflows/ci-python.yml b/.github/workflows/ci-python.yml index ad50c51b..54d1ab1a 100644 --- a/.github/workflows/ci-python.yml +++ b/.github/workflows/ci-python.yml @@ -2,6 +2,8 @@ name: CI (Python/FastAPI) on: push: + branches: + - feature/onnx tags: - 'pre-processing-v*' pull_request: diff --git a/.github/workflows/deploy-java.yml b/.github/workflows/deploy-java.yml index 9c876f2f..d7526506 100644 --- a/.github/workflows/deploy-java.yml +++ b/.github/workflows/deploy-java.yml @@ -28,6 +28,9 @@ jobs: echo "DB_USER=${{ secrets.DB_USER }}" >> .env.prod echo "DB_PASS=${{ secrets.DB_PASS }}" >> .env.prod echo "DB_NAME=${{ secrets.DB_NAME }}" >> .env.prod + echo "ENV_NAME=${{ secrets.LOKI_URL }}" >> .env.prod + echo "ENV_NAME=${{ secrets.LOKI_USERNAME }}" >> .env.prod + echo "ENV_NAME=${{ secrets.LOKI_PASSWORD }}" >> .env.prod echo "ENV_NAME=${{ secrets.ENV_NAME }}" >> .env.prod - name: Set repo lowercase @@ -52,6 +55,27 @@ jobs: target: "~/app/docker/production/" overwrite: true + - name: Copy Caddyfile to EC2 + uses: appleboy/scp-action@v0.1.7 + with: + host: ${{ secrets.SERVER_HOST }} + username: ubuntu + key: ${{ secrets.SERVER_SSH_KEY }} + source: "docker/production/Caddyfile" + target: "~/app/docker/production/" + overwrite: true + + - name: Copy promtail-config to EC2 + uses: appleboy/scp-action@v0.1.7 + with: + host: ${{ secrets.SERVER_HOST }} + username: ubuntu + key: ${{ secrets.SERVER_SSH_KEY }} + source: "docker/production/promtail-config.yml" + target: "~/app/docker/production/" + overwrite: true + + - name: Deploy on EC2 uses: appleboy/ssh-action@v1.0.3 with: diff --git a/apps/pre-processing-service/app/api/endpoints/blog.py b/apps/pre-processing-service/app/api/endpoints/blog.py index 04ae0b14..d0d078e8 100644 --- a/apps/pre-processing-service/app/api/endpoints/blog.py +++ b/apps/pre-processing-service/app/api/endpoints/blog.py @@ -4,16 +4,16 @@ from ...model.schemas import * from app.service.blog.tistory_blog_post_service import TistoryBlogPostService from app.service.blog.naver_blog_post_service import NaverBlogPostService -from ...service.blog.blogger_blog_post_service import BloggerBlogPostService +from ...service.blog.blogger_blog_post_adapter import ( + BloggerBlogPostAdapter, +) # 수정된 import +from app.utils.response import Response +from app.service.blog.blog_create_service import BlogContentService +from app.service.blog.blog_publish_service import BlogPublishService router = APIRouter() -@router.get("/", summary="블로그 API 상태 확인") -async def root(): - return {"message": "blog API"} - - @router.post( "/rag/create", response_model=ResponseBlogCreate, @@ -23,7 +23,10 @@ async def rag_create(request: RequestBlogCreate): """ RAG 기반 블로그 콘텐츠 생성 """ - return {"message": "blog API"} + blog_service = BlogContentService() + response_data = blog_service.generate_blog_content(request) + + return Response.ok(response_data) @router.post( @@ -37,52 +40,7 @@ async def publish(request: RequestBlogPublish): 네이버 블로그와 티스토리 블로그를 지원하며, 현재는 생성된 콘텐츠가 아닌 임의의 제목, 내용, 태그를 배포합니다. """ - if request.tag == "naver": - naver_service = NaverBlogPostService() - result = naver_service.post_content( - title=request.post_title, - content=request.post_content, - tags=request.post_tags, - ) - - if not result: - raise CustomException( - "네이버 블로그 포스팅에 실패했습니다.", status_code=500 - ) - return ResponseBlogPublish( - job_id=1, schedule_id=1, schedule_his_id=1, status="200", metadata=result - ) - - elif request.tag == "tistory": - tistory_service = TistoryBlogPostService() - result = tistory_service.post_content( - title=request.post_title, - content=request.post_content, - tags=request.post_tags, - ) - - if not result: - raise CustomException( - "티스토리 블로그 포스팅에 실패했습니다.", status_code=500 - ) - - return ResponseBlogPublish( - job_id=1, schedule_id=1, schedule_his_id=1, status="200", metadata=result - ) - - elif request.tag == "blogger": - blogger_service = BloggerBlogPostService() - result = blogger_service.post_content( - title=request.post_title, - content=request.post_content, - tags=request.post_tags, - ) - - if not result: - raise CustomException( - "블로거 블로그 포스팅에 실패했습니다.", status_code=500 - ) + publish_service = BlogPublishService() + response_data = publish_service.publish_content(request) - return ResponseBlogPublish( - job_id=1, schedule_id=1, schedule_his_id=1, status="200", metadata=result - ) + return Response.ok(response_data) diff --git a/apps/pre-processing-service/app/api/endpoints/keywords.py b/apps/pre-processing-service/app/api/endpoints/keywords.py index 2b407d6d..6c1627bd 100644 --- a/apps/pre-processing-service/app/api/endpoints/keywords.py +++ b/apps/pre-processing-service/app/api/endpoints/keywords.py @@ -6,14 +6,6 @@ router = APIRouter() -@router.get("/", summary="키워드 API 상태 확인") -async def root(): - """ - 키워드 API가 정상 동작하는지 확인 - """ - return {"message": "keyword API"} - - @router.post( "/search", response_model=ResponseNaverSearch, summary="네이버 키워드 검색" ) @@ -23,9 +15,6 @@ async def search(request: RequestNaverSearch): 요청 예시: { - "job_id": 1, - "schedule_id": 1, - "schedule_his_id": 1, "tag": "naver", "category": "50000000", "start_date": "2025-09-01", @@ -34,15 +23,3 @@ async def search(request: RequestNaverSearch): """ response_data = await keyword_search(request) return response_data - - -@router.post( - "/ssadagu/validate", - response_model=ResponseNaverSearch, - summary="사다구몰 키워드 검증", -) -async def ssadagu_validate(request: RequestNaverSearch): - """ - 사다구몰 키워드 검증 테스트용 엔드포인트 - """ - return ResponseNaverSearch() diff --git a/apps/pre-processing-service/app/api/endpoints/product.py b/apps/pre-processing-service/app/api/endpoints/product.py index ceb55c9d..32a4dcbe 100644 --- a/apps/pre-processing-service/app/api/endpoints/product.py +++ b/apps/pre-processing-service/app/api/endpoints/product.py @@ -8,6 +8,7 @@ from ...service.crawl_service import CrawlService from ...service.search_service import SearchService from ...service.match_service import MatchService +from ...service.similarity_service import SimilarityService # from ...service.similarity_service import SimilarityService @@ -16,14 +17,6 @@ router = APIRouter() -@router.get("/", summary="상품 API 상태 확인") -async def root(): - """ - 상품 API 서버 상태 확인용 엔드포인트 - """ - return {"message": "product API"} - - @router.post("/search", response_model=ResponseSadaguSearch, summary="상품 검색") async def search(request: RequestSadaguSearch): """ @@ -31,12 +24,12 @@ async def search(request: RequestSadaguSearch): """ try: search_service = SearchService() - result = await search_service.search_products(request) + response_data = await search_service.search_products(request) - if not result: + if not response_data: raise CustomException(500, "상품 검색에 실패했습니다.", "SEARCH_FAILED") - return result + return response_data except InvalidItemDataException as e: raise HTTPException(status_code=e.status_code, detail=e.detail) except Exception as e: @@ -50,56 +43,56 @@ async def match(request: RequestSadaguMatch): """ try: match_service = MatchService() - result = match_service.match_products(request) + response_data = match_service.match_products(request) - if not result: + if not response_data: raise CustomException(500, "상품 매칭에 실패했습니다.", "MATCH_FAILED") - return result + return response_data except InvalidItemDataException as e: raise HTTPException(status_code=e.status_code, detail=e.detail) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) -# @router.post( -# "/similarity", response_model=ResponseSadaguSimilarity, summary="상품 유사도 분석" -# ) -# async def similarity(request: RequestSadaguSimilarity): -# """ -# 매칭된 상품들 중 키워드와의 유사도를 계산하여 최적의 상품을 선택합니다. -# """ -# try: -# similarity_service = SimilarityService() -# result = similarity_service.select_product_by_similarity(request) -# -# if not result: -# raise CustomException( -# 500, "유사도 분석에 실패했습니다.", "SIMILARITY_FAILED" -# ) -# -# return result -# except InvalidItemDataException as e: -# raise HTTPException(status_code=e.status_code, detail=e.detail) -# except Exception as e: -# raise HTTPException(status_code=500, detail=str(e)) +@router.post( + "/similarity", response_model=ResponseSadaguSimilarity, summary="상품 유사도 분석" +) +async def similarity(request: RequestSadaguSimilarity): + """ + 매칭된 상품들 중 키워드와의 유사도를 계산하여 최적의 상품을 선택합니다. + """ + try: + similarity_service = SimilarityService() + response_data = similarity_service.select_product_by_similarity(request) + + if not response_data: + raise CustomException( + 500, "유사도 분석에 실패했습니다.", "SIMILARITY_FAILED" + ) + + return response_data + except InvalidItemDataException as e: + raise HTTPException(status_code=e.status_code, detail=e.detail) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) @router.post( "/crawl", response_model=ResponseSadaguCrawl, summary="상품 상세 정보 크롤링" ) -async def crawl(request: Request, body: RequestSadaguCrawl): +async def crawl(body: RequestSadaguCrawl): """ 상품 상세 페이지를 크롤링하여 상세 정보를 수집합니다. """ try: crawl_service = CrawlService() - result = await crawl_service.crawl_product_detail(body) + response_data = await crawl_service.crawl_product_detail(body) - if not result: + if not response_data: raise CustomException(500, "상품 크롤링에 실패했습니다.", "CRAWL_FAILED") - return result + return response_data except InvalidItemDataException as e: raise HTTPException(status_code=e.status_code, detail=e.detail) except ItemNotFoundException as e: diff --git a/apps/pre-processing-service/app/api/endpoints/sample.py b/apps/pre-processing-service/app/api/endpoints/sample.py new file mode 100644 index 00000000..f6d586fb --- /dev/null +++ b/apps/pre-processing-service/app/api/endpoints/sample.py @@ -0,0 +1,45 @@ +from fastapi import APIRouter +from ...model.schemas import * +from app.utils.response import Response + +router = APIRouter() + + +@router.get("/") +async def root(): + return {"message": "sample API"} + + +@router.post("/keywords/search", summary="네이버 키워드 검색") +async def search(request: RequestNaverSearch): + return Response.ok({"test": "hello world"}) + + +@router.post("/blogs/rag/create", summary="RAG 기반 블로그 콘텐츠 생성") +async def rag_create(request: RequestBlogCreate): + return Response.ok({"test": "hello world"}) + + +@router.post("/blogs/publish", summary="블로그 콘텐츠 배포") +async def publish(request: RequestBlogPublish): + return Response.ok({"test": "hello world"}) + + +@router.post("/products/search", summary="상품 검색") +async def product_search(request: RequestSadaguSearch): + return Response.ok({"test": "hello world"}) + + +@router.post("/products/match", summary="상품 매칭") +async def product_match(request: RequestSadaguMatch): + return Response.ok({"test": "hello world"}) + + +@router.post("/products/similarity", summary="상품 유사도 분석") +async def product_similarity(request: RequestSadaguSimilarity): + return Response.ok({"test": "hello world"}) + + +@router.post("/products/crawl", summary="상품 상세 정보 크롤링") +async def product_crawl(request: RequestSadaguCrawl): + return Response.ok({"test": "hello world"}) diff --git a/apps/pre-processing-service/app/api/endpoints/test.py b/apps/pre-processing-service/app/api/endpoints/test.py index e26bd203..91977a3f 100644 --- a/apps/pre-processing-service/app/api/endpoints/test.py +++ b/apps/pre-processing-service/app/api/endpoints/test.py @@ -21,11 +21,6 @@ router = APIRouter() -@router.get("/") -async def root(): - return {"message": "테스트 API"} - - @router.get("/hello/{name}", tags=["hello"]) # @log_api_call async def say_hello(name: str): @@ -67,11 +62,6 @@ def with_meta(data: Mapping[str, Any], meta: Mapping[str, Any]) -> Dict[str, Any @router.get("/tester", response_model=None) async def processing_tester(): - meta = { - "job_id": 1, - "schedule_id": 1, - "schedule_his_id": 1, # ✅ 타이포 수정 - } request_dict = { "tag": "naver", "category": "50000000", @@ -79,7 +69,7 @@ async def processing_tester(): "end_date": "2025-09-02", } # 네이버 키워드 검색 - naver_request = RequestNaverSearch(**with_meta(meta, request_dict)) + naver_request = RequestNaverSearch(**with_meta(request_dict)) response_data = await keyword_search(naver_request) keyword = response_data.get("keyword") loguru.logger.info(keyword) @@ -89,21 +79,21 @@ async def processing_tester(): } # 싸다구 상품 검색 - sadagu_request = RequestSadaguSearch(**with_meta(meta, keyword)) + sadagu_request = RequestSadaguSearch(**with_meta(keyword)) search_service = SearchService() keyword_result = await search_service.search_products(sadagu_request) loguru.logger.info(keyword_result) # 싸다구 상품 매치 keyword["search_results"] = keyword_result.get("search_results") - keyword_match_request = RequestSadaguMatch(**with_meta(meta, keyword)) + keyword_match_request = RequestSadaguMatch(**with_meta(keyword)) match_service = MatchService() keyword_match_response = match_service.match_products(keyword_match_request) loguru.logger.info(keyword_match_response) # 싸다구 상품 유사도 분석 keyword["matched_products"] = keyword_match_response.get("matched_products") - keyword_similarity_request = RequestSadaguSimilarity(**with_meta(meta, keyword)) + keyword_similarity_request = RequestSadaguSimilarity(**with_meta(keyword)) # similarity_service = SimilarityService() # keyword_similarity_response = similarity_service.select_product_by_similarity( # keyword_similarity_request diff --git a/apps/pre-processing-service/app/api/router.py b/apps/pre-processing-service/app/api/router.py index 99286cf6..c1a2fcb4 100644 --- a/apps/pre-processing-service/app/api/router.py +++ b/apps/pre-processing-service/app/api/router.py @@ -1,6 +1,6 @@ # app/api/router.py from fastapi import APIRouter -from .endpoints import keywords, blog, product, test +from .endpoints import keywords, blog, product, test, sample from ..core.config import settings api_router = APIRouter() @@ -17,6 +17,8 @@ # 모듈 테스터를 위한 endpoint -> 추후 삭제 예정 api_router.include_router(test.router, prefix="/tests", tags=["Test"]) +api_router.include_router(sample.router, prefix="/v0", tags=["Sample"]) + @api_router.get("/ping") async def root(): diff --git a/apps/pre-processing-service/app/core/config.py b/apps/pre-processing-service/app/core/config.py index ed54cc69..2de3833a 100644 --- a/apps/pre-processing-service/app/core/config.py +++ b/apps/pre-processing-service/app/core/config.py @@ -76,10 +76,18 @@ class BaseSettingsConfig(BaseSettings): db_pass: str db_name: str env_name: str + app_name: str # MeCab 사전 경로 (자동 감지) mecab_path: Optional[str] = None + # Loki 설정 + loki_host: str = "localhost" + loki_port: int = 3100 + + # 테스트/추가용 필드 + openai_api_key: Optional[str] = None # << 이 부분 추가 + def __init__(self, **kwargs): super().__init__(**kwargs) diff --git a/apps/pre-processing-service/app/core/logging_config.py b/apps/pre-processing-service/app/core/logging_config.py new file mode 100644 index 00000000..b2c10d88 --- /dev/null +++ b/apps/pre-processing-service/app/core/logging_config.py @@ -0,0 +1,95 @@ +import os +from loguru import logger +import sys +from contextvars import ContextVar + +# trace_id context 변수 import +try: + from app.middleware.ServiceLoggerMiddleware import trace_id_context +except ImportError: + # 모듈이 아직 로드되지 않은 경우를 위한 기본값 + trace_id_context: ContextVar[str] = ContextVar("trace_id", default="") + + +def setup_file_logging(): + """ + PromTail을 통해 Loki로 전송하기 위한 파일 로깅 설정 + """ + # 기존 loguru 핸들러 제거 (기본 콘솔 출력 제거) + logger.remove() + + # 환경변수로 로그 디렉토리 설정 (기본값: logs/develop) + log_dir = "../../docker/local/logs/develop" + + # 로그 디렉토리가 없으면 생성 + + # 로그 파일 경로 설정 + log_file_path = log_dir + "/pre-processing-app.log" + error_log_file_path = log_dir + "/pre-processing-app-error.log" + + # trace_id를 포함한 간단한 포맷 문자열 사용 + def add_trace_id_filter(record): + try: + current_trace_id = trace_id_context.get() + if current_trace_id: + record["extra"]["trace_id"] = current_trace_id + else: + record["extra"]["trace_id"] = "" + except LookupError: + record["extra"]["trace_id"] = "" + return record + + # 파일 로깅에서 LoggingMiddleware 제외하는 필터 + def exclude_logging_middleware_filter(record): + # LoggingMiddleware의 로그는 파일에 기록하지 않음 + if record["name"] == "app.middleware.logging": + return False + return add_trace_id_filter(record) + + # 파일 로깅 핸들러 추가 - trace_id 포함, LoggingMiddleware 제외 + logger.add( + log_file_path, + format="[{extra[trace_id]}] {time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {name}:{function}:{line} | {message}", + level="DEBUG", + rotation="100 MB", # 100MB마다 로테이션 + retention="7 days", # 7일간 보관 + compression="zip", # 압축 + enqueue=True, # 멀티프로세스 안전 + serialize=False, # JSON 직렬화 비활성화 (PromTail에서 파싱) + backtrace=True, # 백트레이스 포함 + diagnose=True, # 진단 정보 포함 + filter=exclude_logging_middleware_filter, + ) + + # 에러 레벨 이상은 별도 파일에도 기록 - trace_id 포함, LoggingMiddleware 제외 + logger.add( + error_log_file_path, + format="[{extra[trace_id]}] {time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {name}:{function}:{line} | {message}", + level="ERROR", + rotation="50 MB", + retention="30 days", + compression="zip", + enqueue=True, + serialize=False, + backtrace=True, + diagnose=True, + filter=exclude_logging_middleware_filter, + ) + + # 개발 환경에서는 콘솔 출력도 유지 + if os.getenv("ENVIRONMENT", "development") == "development": + logger.add( + sys.stdout, + format="[{extra[trace_id]}] {time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} | {message}", + level="DEBUG", + colorize=False, # colorize 비활성화하여 태그 충돌 방지 + filter=add_trace_id_filter, + ) + + logger.info("File logging setup completed for PromTail integration") + return logger + + +def get_logger(): + """구성된 로거 인스턴스 반환""" + return logger diff --git a/apps/pre-processing-service/app/main.py b/apps/pre-processing-service/app/main.py index 9865d845..4bbf3ff1 100644 --- a/apps/pre-processing-service/app/main.py +++ b/apps/pre-processing-service/app/main.py @@ -5,6 +5,11 @@ from fastapi.exceptions import RequestValidationError from app.middleware.ServiceLoggerMiddleware import ServiceLoggerMiddleware +# 파일 로깅 설정 초기화 +from app.core.logging_config import setup_file_logging + +setup_file_logging() + # --- 애플리케이션 구성 요소 임포트 --- from app.api.router import api_router from app.middleware.logging import LoggingMiddleware diff --git a/apps/pre-processing-service/app/middleware/ServiceLoggerMiddleware.py b/apps/pre-processing-service/app/middleware/ServiceLoggerMiddleware.py index acb120fa..30d3475b 100644 --- a/apps/pre-processing-service/app/middleware/ServiceLoggerMiddleware.py +++ b/apps/pre-processing-service/app/middleware/ServiceLoggerMiddleware.py @@ -8,8 +8,12 @@ import json import time +import asyncio -trace_id_context: ContextVar[str] = ContextVar("trace_id", default="NO_TRACE_ID") +from app.middleware.rds_logger import RDSLogger +from app.middleware.loki_logger import LokiLogger + +trace_id_context: ContextVar[str] = ContextVar("trace_id", default="") class ServiceLoggerMiddleware(BaseHTTPMiddleware): @@ -18,9 +22,17 @@ class ServiceLoggerMiddleware(BaseHTTPMiddleware): URL 패턴을 기반으로 자동으로 서비스 타입 식별 및 로깅 """ - def __init__(self, app, service_mappings: Dict[str, Dict] = None): + def __init__( + self, + app, + service_mappings: Dict[str, Dict] = None, + enable_rds: bool = True, + enable_loki: bool = True, + ): """ :param service_mappings: URL 패턴별 서비스 설정 + :param enable_rds: RDS 로깅 활성화 여부 + :param enable_loki: Loki 로깅 활성화 여부 예: { "/keywords/search": { "service_type": "NAVER_CRAWLING", @@ -31,13 +43,23 @@ def __init__(self, app, service_mappings: Dict[str, Dict] = None): """ super().__init__(app) self.service_mappings = service_mappings or self._default_mappings() + self.enable_rds = enable_rds + self.enable_loki = enable_loki + + # 로거 인스턴스 초기화 + self.rds_logger = RDSLogger() if enable_rds else None + # Loki 직접 로깅 비활성화 - PromTail을 통해 파일로 로깅 + # self.loki_logger = LokiLogger() if enable_loki else None + self.loki_logger = None def _default_mappings(self) -> Dict[str, Dict]: """기본 서비스 매핑 설정""" return { + # 네이버 키워드 검색 "/keywords/search": { "service_type": "NAVER_CRAWLING", "track_params": [ + "tag", "keyword", "category", "startDate", @@ -45,25 +67,121 @@ def _default_mappings(self) -> Dict[str, Dict]: "job_id", "schedule_id", ], - "response_trackers": ["keyword", "total_keywords", "results_count"], + "response_trackers": ["keyword", "total_keyword", "success", "status"], }, + # 블로그 RAG 콘텐츠 생성 + "/blogs/rag/create": { + "service_type": "BLOG_RAG_CREATE", + "track_params": [ + "keyword", + "product_info", + "content_type", + "target_length", + "job_id", + "schedule_id", + "schedule_his_id", + ], + "response_trackers": [ + "title", + "content_length", + "tags_count", + "success", + "status", + ], + }, + # 블로그 배포 "/blogs/publish": { "service_type": "BLOG_PUBLISH", "track_params": [ "tag", - "title", - "content", - "tags", + "blog_id", + "post_title", + "post_content", + "post_tags", + "job_id", + "schedule_id", + "schedule_his_id", + ], + "response_trackers": [ + "tag", + "post_title", + "post_url", + "published_at", + "publish_success", + "metadata", + "success", + "status", + ], + }, + # 상품 검색 + "/products/search": { + "service_type": "PRODUCT_SEARCH", + "track_params": [ + "keyword", "job_id", "schedule_id", "schedule_his_id", ], "response_trackers": [ + "keyword", + "search_results_count", + "success", + "status", + ], + }, + # 상품 매칭 + "/products/match": { + "service_type": "PRODUCT_MATCH", + "track_params": [ + "keyword", + "search_results", "job_id", "schedule_id", "schedule_his_id", + ], + "response_trackers": [ + "keyword", + "matched_products_count", + "success", + "status", + ], + }, + # 상품 유사도 분석 + "/products/similarity": { + "service_type": "PRODUCT_SIMILARITY", + "track_params": [ + "keyword", + "matched_products", + "search_results", + "job_id", + "schedule_id", + "schedule_his_id", + ], + "response_trackers": [ + "keyword", + "selected_product", + "reason", + "success", + "status", + ], + }, + # 상품 크롤링 + "/products/crawl": { + "service_type": "PRODUCT_CRAWL", + "track_params": [ + "tag", + "product_url", + "job_id", + "schedule_id", + "schedule_his_id", + ], + "response_trackers": [ + "tag", + "product_url", + "product_detail", + "crawled_at", + "success", "status", - "metadata", ], }, } @@ -78,7 +196,8 @@ async def dispatch(self, request: Request, call_next): return await call_next(request) # 2. 시작 로깅 - trace_id = trace_id_context.get("NO_TRACE_ID") + trace_id = request.headers.get("X-Request-ID", "") + trace_id_context.set(trace_id) start_time = time.time() # 파라미터 추출 및 시작 로그 @@ -91,11 +210,23 @@ async def dispatch(self, request: Request, call_next): service_type = service_config["service_type"] logger.info(f"[{service_type}_START] trace_id={trace_id}{param_str}") + # source_id 추출 (job_id, schedule_id 등에서) + source_id = self._extract_source_id(params) + run_id = params.get("run_id") + + # RDS 및 Loki에 시작 로그 전송 + start_message = f"[{service_type}_START]{param_str}" + await self._log_to_external_systems( + "start", service_type, source_id, trace_id, start_message, run_id, params + ) + # 3. 요청 처리 try: response = await call_next(request) # 4. 성공 로깅 + duration_ms = int((time.time() - start_time) * 1000) + if 200 <= response.status_code < 300: await self._log_success_response( service_type, @@ -105,16 +236,60 @@ async def dispatch(self, request: Request, call_next): response, service_config["response_trackers"], ) + + # 외부 로깅 시스템에 성공 로그 전송 + success_message = f"[{service_type}_SUCCESS]{param_str} status_code={response.status_code}" + await self._log_to_external_systems( + "success", + service_type, + source_id, + trace_id, + success_message, + run_id, + params, + duration_ms=duration_ms, + ) else: await self._log_error_response( service_type, trace_id, start_time, param_str, response ) + # 외부 로깅 시스템에 에러 로그 전송 + error_message = f"[{service_type}_ERROR]{param_str} status_code={response.status_code}" + await self._log_to_external_systems( + "error", + service_type, + source_id, + trace_id, + error_message, + run_id, + params, + duration_ms=duration_ms, + error_code=f"HTTP_{response.status_code}", + ) + return response except Exception as e: # 5. 예외 로깅 + duration_ms = int((time.time() - start_time) * 1000) await self._log_exception(service_type, trace_id, start_time, param_str, e) + + # 외부 로깅 시스템에 예외 로그 전송 + exception_message = ( + f"[{service_type}_EXCEPTION]{param_str} exception={str(e)}" + ) + await self._log_to_external_systems( + "error", + service_type, + source_id, + trace_id, + exception_message, + run_id, + params, + duration_ms=duration_ms, + error_code="EXCEPTION", + ) raise def _get_service_config(self, url_path: str) -> Optional[Dict]: @@ -248,3 +423,94 @@ async def _log_exception( f"execution_time={duration:.4f}s{param_str} " f"exception={str(exception)}" ) + + def _extract_source_id(self, params: Dict[str, Any]) -> int: + """파라미터에서 source_id 추출 (job_id, schedule_id 등 우선순위)""" + for key in ["job_id", "schedule_id", "task_id", "workflow_id"]: + if key in params and params[key]: + try: + return int(params[key]) + except (ValueError, TypeError): + continue + return 0 # 기본값 + + async def _log_to_external_systems( + self, + log_type: str, # start, success, error + service_type: str, + source_id: int, + trace_id: str, + message: str, + run_id: Optional[int] = None, + params: Optional[Dict[str, Any]] = None, + duration_ms: Optional[int] = None, + error_code: Optional[str] = None, + ): + """RDS와 Loki에 로그 전송""" + tasks = [] + + # 로깅할 추가 데이터 준비 + additional_data = params.copy() if params else {} + + if self.rds_logger: + if log_type == "start": + task = self.rds_logger.log_start( + service_type, source_id, trace_id, message, run_id, additional_data + ) + elif log_type == "success": + task = self.rds_logger.log_success( + service_type, + source_id, + trace_id, + message, + duration_ms, + run_id, + additional_data, + ) + elif log_type == "error": + task = self.rds_logger.log_error( + service_type, + source_id, + trace_id, + message, + error_code, + duration_ms, + run_id, + additional_data, + ) + tasks.append(task) + + if self.loki_logger: + if log_type == "start": + task = self.loki_logger.log_start( + service_type, source_id, trace_id, message, run_id, additional_data + ) + elif log_type == "success": + task = self.loki_logger.log_success( + service_type, + source_id, + trace_id, + message, + duration_ms, + run_id, + additional_data, + ) + elif log_type == "error": + task = self.loki_logger.log_error( + service_type, + source_id, + trace_id, + message, + error_code, + duration_ms, + run_id, + additional_data, + ) + tasks.append(task) + + # 비동기로 병렬 실행 (로깅 실패가 메인 로직에 영향을 주지 않도록) + if tasks: + try: + await asyncio.gather(*tasks, return_exceptions=True) + except Exception as e: + logger.debug(f"외부 로깅 시스템 전송 중 일부 실패: {e}") diff --git a/apps/pre-processing-service/app/middleware/logging.py b/apps/pre-processing-service/app/middleware/logging.py index 9a8cb6a0..15bfd757 100644 --- a/apps/pre-processing-service/app/middleware/logging.py +++ b/apps/pre-processing-service/app/middleware/logging.py @@ -3,11 +3,24 @@ from loguru import logger from starlette.middleware.base import BaseHTTPMiddleware +# trace_id context 변수 import +try: + from app.middleware.ServiceLoggerMiddleware import trace_id_context +except ImportError: + from contextvars import ContextVar + + trace_id_context: ContextVar[str] = ContextVar("trace_id", default="") + class LoggingMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): start_time = time.time() + # trace_id 설정 (X-Request-ID 헤더에서) + current_trace_id = request.headers.get("X-Request-ID", "") + if current_trace_id: + trace_id_context.set(current_trace_id) + # 1. 요청 시작 로그 logger.info( "요청 시작: IP='{}' 메서드='{}' URL='{}'", diff --git a/apps/pre-processing-service/app/middleware/loki_logger.py b/apps/pre-processing-service/app/middleware/loki_logger.py new file mode 100644 index 00000000..0a4d603c --- /dev/null +++ b/apps/pre-processing-service/app/middleware/loki_logger.py @@ -0,0 +1,198 @@ +import json +import aiohttp +import asyncio +from typing import Dict, List, Any, Optional +from datetime import datetime +from loguru import logger + +from app.model.execution_log import ExecutionLog +from app.core.config import settings + + +class LokiLogger: + """Loki에 로그를 전송하는 클래스""" + + def __init__(self): + self.loki_url = f"{settings.loki_host}:{settings.loki_port}/loki/api/v1/push" + self.app_name = settings.app_name + self.session = None + + async def _get_session(self) -> aiohttp.ClientSession: + """aiohttp 세션 관리""" + if self.session is None or self.session.closed: + self.session = aiohttp.ClientSession() + return self.session + + async def close(self): + """세션 종료""" + if self.session and not self.session.closed: + await self.session.close() + + async def send_log( + self, + execution_type: str, + source_id: int, + log_level: str, + log_message: str, + trace_id: Optional[str] = None, + run_id: Optional[int] = None, + status: Optional[str] = None, + duration_ms: Optional[int] = None, + error_code: Optional[str] = None, + additional_data: Optional[dict] = None, + ) -> bool: + """ + Loki로 로그 전송 + + Args: + execution_type: task, schedule, job, workflow + source_id: 모든 데이터에 대한 ID + log_level: INFO, ERROR, WARNING, DEBUG + log_message: 로그 메시지 + trace_id: 추적 ID + run_id: 실행 ID + status: SUCCESS, ERROR, RUNNING, PENDING + duration_ms: 실행 시간(밀리초) + error_code: 에러 코드 + additional_data: 추가 데이터 + + Returns: + bool: 전송 성공 여부 + """ + try: + execution_log = ExecutionLog( + execution_type=execution_type, + source_id=source_id, + log_level=log_level, + executed_at=datetime.now(), + log_message=log_message, + trace_id=trace_id, + run_id=run_id, + status=status, + duration_ms=duration_ms, + error_code=error_code, + reserved4=additional_data, + ) + + loki_data = execution_log.to_loki_format(self.app_name) + + # Loki push API 형식으로 변환 + payload = { + "streams": [ + { + "stream": loki_data["labels"], + "values": [ + [ + str(loki_data["log"]["timestamp"]), + json.dumps(loki_data["log"], ensure_ascii=False), + ] + ], + } + ] + } + + session = await self._get_session() + + async with session.post( + self.loki_url, + json=payload, + headers={"Content-Type": "application/json"}, + timeout=aiohttp.ClientTimeout(total=5), + ) as response: + if response.status == 204: + # logger.debug(f"Loki 로그 전송 성공: {execution_type} - {log_message[:50]}...") + return True + else: + response_text = await response.text() + logger.error( + f"Loki 로그 전송 실패: status={response.status}, response={response_text}" + ) + return False + + except asyncio.TimeoutError: + logger.error("Loki 로그 전송 타임아웃") + return False + except Exception as e: + logger.error(f"Loki 로그 전송 실패: {str(e)}") + return False + + async def log_start( + self, + execution_type: str, + source_id: int, + trace_id: str, + log_message: str, + run_id: Optional[int] = None, + additional_data: Optional[dict] = None, + ) -> bool: + """시작 로그 전송""" + return await self.send_log( + execution_type=execution_type, + source_id=source_id, + log_level="INFO", + log_message=log_message, + trace_id=trace_id, + run_id=run_id, + status="RUNNING", + additional_data=additional_data, + ) + + async def log_success( + self, + execution_type: str, + source_id: int, + trace_id: str, + log_message: str, + duration_ms: int, + run_id: Optional[int] = None, + additional_data: Optional[dict] = None, + ) -> bool: + """성공 로그 전송""" + return await self.send_log( + execution_type=execution_type, + source_id=source_id, + log_level="INFO", + log_message=log_message, + trace_id=trace_id, + run_id=run_id, + status="SUCCESS", + duration_ms=duration_ms, + additional_data=additional_data, + ) + + async def log_error( + self, + execution_type: str, + source_id: int, + trace_id: str, + log_message: str, + error_code: str, + duration_ms: Optional[int] = None, + run_id: Optional[int] = None, + additional_data: Optional[dict] = None, + ) -> bool: + """에러 로그 전송""" + return await self.send_log( + execution_type=execution_type, + source_id=source_id, + log_level="ERROR", + log_message=log_message, + trace_id=trace_id, + run_id=run_id, + status="ERROR", + duration_ms=duration_ms, + error_code=error_code, + additional_data=additional_data, + ) + + def __del__(self): + """소멸자에서 세션 정리""" + if self.session and not self.session.closed: + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + loop.create_task(self.session.close()) + else: + loop.run_until_complete(self.session.close()) + except: + pass diff --git a/apps/pre-processing-service/app/middleware/rds_logger.py b/apps/pre-processing-service/app/middleware/rds_logger.py new file mode 100644 index 00000000..66bad19c --- /dev/null +++ b/apps/pre-processing-service/app/middleware/rds_logger.py @@ -0,0 +1,153 @@ +from typing import Optional +from datetime import datetime +import traceback +from loguru import logger + +from app.db.mariadb_manager import MariadbManager +from app.model.execution_log import ExecutionLog + + +class RDSLogger: + """RDS(MariaDB)에 로그를 저장하는 클래스""" + + def __init__(self): + self.db_manager = MariadbManager() + + async def log_execution( + self, + execution_type: str, + source_id: int, + log_level: str, + log_message: str, + trace_id: Optional[str] = None, + run_id: Optional[int] = None, + status: Optional[str] = None, + duration_ms: Optional[int] = None, + error_code: Optional[str] = None, + additional_data: Optional[dict] = None, + ) -> bool: + """ + execution_log 테이블에 로그 저장 + + Args: + execution_type: task, schedule, job, workflow + source_id: 모든 데이터에 대한 ID + log_level: INFO, ERROR, WARNING, DEBUG + log_message: 로그 메시지 + trace_id: 추적 ID + run_id: 실행 ID + status: SUCCESS, ERROR, RUNNING, PENDING + duration_ms: 실행 시간(밀리초) + error_code: 에러 코드 + additional_data: 추가 데이터 (reserved4에 JSON으로 저장) + + Returns: + bool: 저장 성공 여부 + """ + try: + execution_log = ExecutionLog( + execution_type=execution_type, + source_id=source_id, + log_level=log_level, + executed_at=datetime.now(), + log_message=log_message, + trace_id=trace_id, + run_id=run_id, + status=status, + duration_ms=duration_ms, + error_code=error_code, + reserved4=additional_data, + ) + + log_data = execution_log.to_dict() + + # 컬럼명과 값 분리 + columns = list(log_data.keys()) + values = list(log_data.values()) + placeholders = ", ".join(["%s"] * len(values)) + columns_str = ", ".join(columns) + + insert_query = f""" + INSERT INTO execution_log ({columns_str}) + VALUES ({placeholders}) + """ + + with self.db_manager.get_cursor() as cursor: + cursor.execute(insert_query, values) + + # logger.debug(f"RDS 로그 저장 성공: {execution_type} - {log_message[:50]}...") + return True + + except Exception as e: + logger.error(f"RDS 로그 저장 실패: {str(e)}") + logger.error(f"Traceback: {traceback.format_exc()}") + return False + + async def log_start( + self, + execution_type: str, + source_id: int, + trace_id: str, + log_message: str, + run_id: Optional[int] = None, + additional_data: Optional[dict] = None, + ) -> bool: + """시작 로그 저장""" + return await self.log_execution( + execution_type=execution_type, + source_id=source_id, + log_level="INFO", + log_message=log_message, + trace_id=trace_id, + run_id=run_id, + status="RUNNING", + additional_data=additional_data, + ) + + async def log_success( + self, + execution_type: str, + source_id: int, + trace_id: str, + log_message: str, + duration_ms: int, + run_id: Optional[int] = None, + additional_data: Optional[dict] = None, + ) -> bool: + """성공 로그 저장""" + return await self.log_execution( + execution_type=execution_type, + source_id=source_id, + log_level="INFO", + log_message=log_message, + trace_id=trace_id, + run_id=run_id, + status="SUCCESS", + duration_ms=duration_ms, + additional_data=additional_data, + ) + + async def log_error( + self, + execution_type: str, + source_id: int, + trace_id: str, + log_message: str, + error_code: str, + duration_ms: Optional[int] = None, + run_id: Optional[int] = None, + additional_data: Optional[dict] = None, + ) -> bool: + """에러 로그 저장""" + return await self.log_execution( + execution_type=execution_type, + source_id=source_id, + log_level="ERROR", + log_message=log_message, + trace_id=trace_id, + run_id=run_id, + status="ERROR", + duration_ms=duration_ms, + error_code=error_code, + additional_data=additional_data, + ) diff --git a/apps/pre-processing-service/app/model/execution_log.py b/apps/pre-processing-service/app/model/execution_log.py new file mode 100644 index 00000000..c1bef2e0 --- /dev/null +++ b/apps/pre-processing-service/app/model/execution_log.py @@ -0,0 +1,79 @@ +from dataclasses import dataclass +from datetime import datetime +from typing import Optional, Dict, Any +import json + + +@dataclass +class ExecutionLog: + """execution_log 테이블에 대응하는 데이터 모델""" + + execution_type: str # task, schedule, job, workflow + source_id: int # 모든 데이터에 대한 ID + log_level: str # INFO, ERROR, WARNING, DEBUG + executed_at: datetime + log_message: str + span_id: str = "" # 테스트값 + trace_id: Optional[str] = None + run_id: Optional[int] = None + status: Optional[str] = None # SUCCESS, ERROR, RUNNING, PENDING + duration_ms: Optional[int] = None + error_code: Optional[str] = None + reserved1: Optional[str] = None + reserved2: Optional[str] = None + reserved3: Optional[int] = None + reserved4: Optional[Dict[str, Any]] = None # JSON 데이터 + reserved5: Optional[datetime] = None + id: Optional[int] = None # auto_increment + + def to_dict(self) -> Dict[str, Any]: + """딕셔너리로 변환 (DB 삽입용)""" + data = { + "execution_type": self.execution_type, + "source_id": self.source_id, + "log_level": self.log_level, + "executed_at": self.executed_at, + "log_message": self.log_message, + "trace_id": self.trace_id, + "run_id": self.run_id, + "status": self.status, + "duration_ms": self.duration_ms, + "error_code": self.error_code, + "reserved1": self.span_id, + "reserved2": self.reserved2, + "reserved3": self.reserved3, + "reserved4": json.dumps(self.reserved4) if self.reserved4 else None, + "reserved5": self.reserved5, + } + return {k: v for k, v in data.items() if v is not None} + + def to_loki_format( + self, app_name: str = "pre-processing-service" + ) -> Dict[str, Any]: + """Loki 형식으로 변환""" + + labels = { + "app": app_name, + "env": "develop", + "traceId": self.trace_id or "", + "spanId": self.span_id, # 필요시 추가 + "executionType": self.execution_type, + "sourceId": str(self.source_id), + "runId": str(self.run_id) if self.run_id else "", + } + + log_data = { + "timestamp": int(self.executed_at.timestamp() * 1000000000), # nanoseconds + "level": self.log_level, + "message": self.log_message, + "execution_type": self.execution_type, + "source_id": self.source_id, + "status": self.status, + "duration_ms": self.duration_ms, + "error_code": self.error_code, + } + + if self.reserved4: + log_data.update(self.reserved4) + + return {"labels": labels, "log": log_data} diff --git a/apps/pre-processing-service/app/model/schemas.py b/apps/pre-processing-service/app/model/schemas.py index 61720cb6..18d0d99f 100644 --- a/apps/pre-processing-service/app/model/schemas.py +++ b/apps/pre-processing-service/app/model/schemas.py @@ -1,70 +1,71 @@ from datetime import datetime -from typing import Optional, List, Dict, Any +from typing import Optional, List, Dict, Any, TypeVar, Generic from pydantic import BaseModel, Field, HttpUrl +# 제네릭 타입 변수 정의 +T = TypeVar("T") + # 기본 요청 class RequestBase(BaseModel): - job_id: int = Field( - ..., title="작업 ID", description="현재 실행 중인 작업의 고유 식별자" - ) - schedule_id: int = Field( - ..., title="스케줄 ID", description="예약된 스케줄의 고유 식별자" - ) - schedule_his_id: Optional[int] = Field( - None, title="스케줄 히스토리 ID", description="스케줄 실행 이력의 고유 식별자" - ) + pass # 기본 응답 -class ResponseBase(BaseModel): - job_id: int = Field( - ..., title="작업 ID", description="현재 실행 중인 작업의 고유 식별자" - ) - schedule_id: int = Field( - ..., title="스케줄 ID", description="예약된 스케줄의 고유 식별자" - ) - schedule_his_id: Optional[int] = Field( - None, title="스케줄 히스토리 ID", description="스케줄 실행 이력의 고유 식별자" - ) +class ResponseBase(BaseModel, Generic[T]): + success: bool = Field(..., title="성공유무", description="true,false") + data: T = Field(..., title="응답 데이터") status: str = Field(..., title="상태", description="요청 처리 상태") + message: str = Field(..., title="메시지", description="메시지입니다.") + + +# ============== 1단계: 네이버 키워드 추출 ============== -# 네이버 키워드 추출 class RequestNaverSearch(RequestBase): tag: str = Field(..., title="태그", description="데이터랩/스토어 태그 구분") - category: Optional[str] = Field( - None, title="카테고리", description="검색할 카테고리" - ) - start_date: Optional[str] = Field( - None, title="시작일", description="검색 시작 날짜 (YYYY-MM-DD)" - ) - end_date: Optional[str] = Field( - None, title="종료일", description="검색 종료 날짜 (YYYY-MM-DD)" - ) -class ResponseNaverSearch(ResponseBase): - category: Optional[str] = Field(None, title="카테고리", description="검색 카테고리") +# 응답 데이터 모델 +class NaverSearchData(BaseModel): keyword: str = Field(..., title="키워드", description="검색에 사용된 키워드") total_keyword: Dict[int, str] = Field( ..., title="총 키워드", description="키워드별 총 검색 결과" ) -# 2단계: 검색 +# 최종 응답 모델 +class ResponseNaverSearch(ResponseBase[NaverSearchData]): + """네이버 키워드 검색 API 응답""" + + pass + + +# ============== 2단계: 사다구 검색 ============== + + class RequestSadaguSearch(RequestBase): keyword: str = Field(..., title="검색 키워드", description="상품을 검색할 키워드") -class ResponseSadaguSearch(ResponseBase): +# 응답 데이터 모델 +class SadaguSearchData(BaseModel): keyword: str = Field(..., title="검색 키워드", description="검색에 사용된 키워드") search_results: List[Dict] = Field( ..., title="검색 결과", description="검색된 상품 목록" ) -# 3단계: 매칭 +# 최종 응답 모델 +class ResponseSadaguSearch(ResponseBase[SadaguSearchData]): + """사다구 상품 검색 API 응답""" + + pass + + +# ============== 3단계: 사다구 매칭 ============== + + class RequestSadaguMatch(RequestBase): keyword: str = Field(..., title="매칭 키워드", description="상품과 매칭할 키워드") search_results: List[Dict] = Field( @@ -72,14 +73,24 @@ class RequestSadaguMatch(RequestBase): ) -class ResponseSadaguMatch(ResponseBase): +# 응답 데이터 모델 +class SadaguMatchData(BaseModel): keyword: str = Field(..., title="매칭 키워드", description="매칭에 사용된 키워드") matched_products: List[Dict] = Field( ..., title="매칭된 상품", description="키워드와 매칭된 상품 목록" ) -# 4단계: 유사도 +# 최종 응답 모델 +class ResponseSadaguMatch(ResponseBase[SadaguMatchData]): + """사다구 상품 매칭 API 응답""" + + pass + + +# ============== 4단계: 사다구 유사도 ============== + + class RequestSadaguSimilarity(RequestBase): keyword: str = Field( ..., title="유사도 분석 키워드", description="유사도 분석할 키워드" @@ -94,7 +105,8 @@ class RequestSadaguSimilarity(RequestBase): ) -class ResponseSadaguSimilarity(ResponseBase): +# 응답 데이터 모델 +class SadaguSimilarityData(BaseModel): keyword: str = Field( ..., title="분석 키워드", description="유사도 분석에 사용된 키워드" ) @@ -106,7 +118,16 @@ class ResponseSadaguSimilarity(ResponseBase): ) -# 사다구몰 크롤링 +# 최종 응답 모델 +class ResponseSadaguSimilarity(ResponseBase[SadaguSimilarityData]): + """사다구 상품 유사도 분석 API 응답""" + + pass + + +# ============== 사다구몰 크롤링 ============== + + class RequestSadaguCrawl(RequestBase): tag: str = Field( ..., @@ -118,7 +139,8 @@ class RequestSadaguCrawl(RequestBase): ) -class ResponseSadaguCrawl(ResponseBase): +# 응답 데이터 모델 +class SadaguCrawlData(BaseModel): tag: str = Field(..., title="크롤링 태그", description="크롤링 유형 태그") product_url: str = Field(..., title="상품 URL", description="크롤링된 상품 URL") product_detail: Optional[Dict] = Field( @@ -129,30 +151,79 @@ class ResponseSadaguCrawl(ResponseBase): ) -# 블로그 콘텐츠 생성 -class RequestBlogCreate(RequestBase): +# 최종 응답 모델 +class ResponseSadaguCrawl(ResponseBase[SadaguCrawlData]): + """사다구몰 크롤링 API 응답""" + pass -class ResponseBlogCreate(ResponseBase): +# ============== 블로그 콘텐츠 생성 ============== + + +class RequestBlogCreate(RequestBase): + keyword: Optional[str] = Field( + None, title="키워드", description="콘텐츠 생성용 키워드" + ) + product_info: Optional[Dict] = Field( + None, title="상품 정보", description="블로그 콘텐츠에 포함할 상품 정보" + ) + content_type: Optional[str] = Field( + None, title="콘텐츠 타입", description="생성할 콘텐츠 유형" + ) + target_length: Optional[int] = Field( + None, title="목표 글자 수", description="생성할 콘텐츠의 목표 길이" + ) + + +# 응답 데이터 모델 +class BlogCreateData(BaseModel): + title: str = Field(..., title="블로그 제목", description="생성된 블로그 제목") + content: str = Field(..., title="블로그 내용", description="생성된 블로그 내용") + tags: List[str] = Field( + default_factory=list, title="추천 태그", description="콘텐츠에 적합한 태그 목록" + ) + + +# 최종 응답 모델 +class ResponseBlogCreate(ResponseBase[BlogCreateData]): + """블로그 콘텐츠 생성 API 응답""" + pass -# 블로그 배포 +# ============== 블로그 배포 ============== + + class RequestBlogPublish(RequestBase): tag: str = Field(..., title="블로그 태그", description="블로그 플랫폼 종류") blog_id: str = Field(..., description="블로그 아이디") blog_pw: str = Field(..., description="블로그 비밀번호") post_title: str = Field(..., description="포스팅 제목") post_content: str = Field(..., description="포스팅 내용") - post_tags: List[str] = Field(default=[], description="포스팅 태그 목록") + post_tags: List[str] = Field(default_factory=list, description="포스팅 태그 목록") -class ResponseBlogPublish(ResponseBase): - # 디버깅 용 +# 응답 데이터 모델 +class BlogPublishData(BaseModel): + tag: str = Field(..., title="블로그 태그", description="블로그 플랫폼 종류") + post_title: str = Field(..., title="포스팅 제목", description="배포된 포스팅 제목") + post_url: Optional[str] = Field( + None, title="포스팅 URL", description="배포된 포스팅 URL" + ) + published_at: Optional[str] = Field( + None, title="배포 시간", description="포스팅 배포 완료 시간" + ) + publish_success: bool = Field(..., title="배포 성공 여부") + + # 디버깅 용 (Optional로 변경) metadata: Optional[Dict[str, Any]] = Field( None, description="포스팅 관련 메타데이터" ) - # 프로덕션 용 - # post_url: str = Field(..., description="포스팅 URL") + +# 최종 응답 모델 +class ResponseBlogPublish(ResponseBase[BlogPublishData]): + """블로그 배포 API 응답""" + + pass diff --git a/apps/pre-processing-service/app/service/blog/base_blog_post_service.py b/apps/pre-processing-service/app/service/blog/base_blog_post_service.py index ff4b2754..f55bdba0 100644 --- a/apps/pre-processing-service/app/service/blog/base_blog_post_service.py +++ b/apps/pre-processing-service/app/service/blog/base_blog_post_service.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Dict, List, Optional +from typing import Dict from app.utils.crawling_util import CrawlingUtil from app.errors.BlogPostingException import * @@ -11,51 +11,39 @@ class BaseBlogPostService(ABC): 블로그 포스팅 서비스 추상 클래스 """ - def __init__(self, config_file="blog_config.json"): - """공통 초기화 로직""" - # Selenium 기반 서비스를 위한 초기화 - if self._requires_webdriver(): + def __init__(self, use_webdriver=True): + """ + 공통 초기화 로직 + :param use_webdriver: 웹드라이버 사용 여부 (API 서비스의 경우 False) + """ + self.use_webdriver = use_webdriver + + if self.use_webdriver: try: - self.crawling_service = CrawlingUtil() + # 블로그 포스팅용 설정으로 초기화 + self.crawling_service = CrawlingUtil( + headless=False, # 네이버 탐지 우회를 위해 headless 비활성화 + for_blog_posting=True, + ) self.web_driver = self.crawling_service.get_driver() self.wait_driver = self.crawling_service.get_wait() except Exception: raise WebDriverConnectionException() else: - # API 기반 서비스의 경우 WebDriver가 필요 없음 self.crawling_service = None self.web_driver = None self.wait_driver = None - # API 기반 서비스를 위한 초기화 - self.config_file = config_file - self.config = {} - self.current_upload_account = None - - # API 관련 속성들 (사용하지 않는 서비스에서는 None으로 유지) - self.blogger_service = None - self.blog_id = None - self.scopes = None - self._load_config() - def _requires_webdriver(self) -> bool: - """ - 서브클래스에서 WebDriver가 필요한지 여부를 반환 - 기본값은 True (Selenium 기반), API 기반 서비스에서는 False로 오버라이드 - """ - return True - @abstractmethod def _load_config(self) -> None: """플랫폼별 설정 로드""" pass + @abstractmethod def _login(self) -> None: - """ - 플랫폼별 로그인 구현 (API 기반 서비스의 경우 인증으로 대체) - 기본 구현은 아무것도 하지 않음 (API 서비스용) - """ + """플랫폼별 로그인 구현""" pass @abstractmethod @@ -83,6 +71,14 @@ def _validate_content( :param content: 포스트 내용 :param tags: 포스트 태그 리스트 """ + # if not title or not title.strip(): + # raise BlogContentValidationException("title", "제목이 비어있습니다") + # + # if not content or not content.strip(): + # raise BlogContentValidationException("content", "내용이 비어있습니다") + # + # if tags is None: + # raise BlogContentValidationException("tags", "태그가 비어있습니다") pass def post_content(self, title: str, content: str, tags: List[str] = None) -> Dict: @@ -96,7 +92,7 @@ def post_content(self, title: str, content: str, tags: List[str] = None) -> Dict # 1. 콘텐츠 유효성 검사 self._validate_content(title, content, tags) - # 2. 로그인 (Selenium 기반) 또는 인증 (API 기반) + # 2. 로그인 self._login() # 3. 포스트 작성 및 발행 diff --git a/apps/pre-processing-service/app/service/blog/blog_create_service.py b/apps/pre-processing-service/app/service/blog/blog_create_service.py new file mode 100644 index 00000000..29ce12b7 --- /dev/null +++ b/apps/pre-processing-service/app/service/blog/blog_create_service.py @@ -0,0 +1,345 @@ +import json +import logging +import os +from datetime import datetime +from typing import Dict, List, Optional, Any + +from openai import OpenAI +from dotenv import load_dotenv + +from app.model.schemas import RequestBlogCreate +from app.errors.BlogPostingException import * + +# 환경변수 로드 +load_dotenv(".env.dev") + + +class BlogContentService: + """RAG를 사용한 블로그 콘텐츠 생성 전용 서비스""" + + def __init__(self): + # OpenAI API 키 설정 + self.openai_api_key = os.getenv("OPENAI_API_KEY") + if not self.openai_api_key: + raise ValueError("OPENAI_API_KEY가 .env.dev 파일에 설정되지 않았습니다.") + + # 인스턴스 레벨에서 클라이언트 생성 + self.client = OpenAI(api_key=self.openai_api_key) + logging.basicConfig(level=logging.INFO) + self.logger = logging.getLogger(__name__) + + def generate_blog_content(self, request: RequestBlogCreate) -> Dict[str, Any]: + """ + 요청 데이터를 기반으로 블로그 콘텐츠 생성 + + Args: + request: RequestBlogCreate 객체 + + Returns: + Dict: {"title": str, "content": str, "tags": List[str]} 형태의 결과 + """ + try: + # 1. 콘텐츠 정보 정리 + content_context = self._prepare_content_context(request) + + # 2. 프롬프트 생성 + prompt = self._create_content_prompt(content_context, request) + + # 3. GPT를 통한 콘텐츠 생성 + generated_content = self._generate_with_openai(prompt) + + # 4. 콘텐츠 파싱 및 구조화 + return self._parse_generated_content(generated_content, request) + + except Exception as e: + self.logger.error(f"콘텐츠 생성 실패: {e}") + return self._create_fallback_content(request) + + def _prepare_content_context(self, request: RequestBlogCreate) -> str: + """요청 데이터를 콘텐츠 생성용 컨텍스트로 변환""" + context_parts = [] + + # 키워드 정보 추가 + if request.keyword: + context_parts.append(f"주요 키워드: {request.keyword}") + + # 상품 정보 추가 + if request.product_info: + context_parts.append("\n상품 정보:") + + # 상품 기본 정보 + if request.product_info.get("title"): + context_parts.append(f"- 상품명: {request.product_info['title']}") + + if request.product_info.get("price"): + context_parts.append(f"- 가격: {request.product_info['price']:,}원") + + if request.product_info.get("rating"): + context_parts.append(f"- 평점: {request.product_info['rating']}/5.0") + + # 상품 상세 정보 + if request.product_info.get("description"): + context_parts.append(f"- 설명: {request.product_info['description']}") + + # 상품 사양 (material_info 등) + if request.product_info.get("material_info"): + context_parts.append("- 주요 사양:") + specs = request.product_info["material_info"] + if isinstance(specs, dict): + for key, value in specs.items(): + context_parts.append(f" * {key}: {value}") + + # 상품 옵션 + if request.product_info.get("options"): + options = request.product_info["options"] + context_parts.append(f"- 구매 옵션 ({len(options)}개):") + for i, option in enumerate(options[:5], 1): # 최대 5개만 + if isinstance(option, dict): + option_name = option.get("name", f"옵션 {i}") + context_parts.append(f" {i}. {option_name}") + else: + context_parts.append(f" {i}. {option}") + + # 구매 링크 + if request.product_info.get("url") or request.product_info.get( + "product_url" + ): + url = request.product_info.get("url") or request.product_info.get( + "product_url" + ) + context_parts.append(f"- 구매 링크: {url}") + + return "\n".join(context_parts) if context_parts else "키워드 기반 콘텐츠 생성" + + def _create_content_prompt(self, context: str, request: RequestBlogCreate) -> str: + """콘텐츠 생성용 프롬프트 생성""" + + # 기본 키워드가 없으면 상품 제목에서 추출 + main_keyword = request.keyword + if ( + not main_keyword + and request.product_info + and request.product_info.get("title") + ): + main_keyword = request.product_info["title"] + + prompt = f""" +다음 정보를 바탕으로 매력적인 블로그 포스트를 작성해주세요. + +정보: +{context} + +작성 가이드라인: +- 스타일: 친근하면서도 신뢰할 수 있는, 정보 제공 중심 +- 길이: 1200자 내외의 적당한 길이 +- 톤: 독자의 관심을 끄는 자연스러운 어조 + +작성 요구사항: +1. SEO 친화적이고 클릭하고 싶은 매력적인 제목 +2. 독자의 관심을 끄는 도입부 +3. 핵심 특징과 장점을 구체적으로 설명 +4. 실제 사용 시나리오나 활용 팁 +5. 구매 결정에 도움이 되는 정보 + +⚠️ 주의: +- 절대로 마지막에 'HTML 구조는…' 같은 자기 평가 문장을 추가하지 마세요. +- 출력 시 ```나 ```html 같은 코드 블록 구문을 포함하지 마세요. +- 오직 HTML 태그만 사용하여 구조화된 콘텐츠를 작성해주세요. +(예:

,

,

,