From 0fd54d66959a0eb63bceabcf160b83b06fd78f3b Mon Sep 17 00:00:00 2001 From: thkim7 Date: Thu, 25 Sep 2025 15:34:10 +0900 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20S3=20=ED=95=98=EA=B3=A0=20RDB=20?= =?UTF-8?q?=EC=97=B0=EB=8F=99=201.=20body=20builder=20=EC=B6=94=EA=B0=80?= =?UTF-8?q?=202.=20s3=5Fupload=20+=20=EC=9E=84=EC=8B=9C=201=EA=B0=9C=20?= =?UTF-8?q?=EB=BD=91=EA=B8=B0=20->=20rag=5Fcreate=20=EB=A5=BC=20s3=5Fuploa?= =?UTF-8?q?d=20+=20rdb=20=EB=8D=B0=EC=9D=B4=ED=84=B0=20=EC=82=BD=EC=9E=85?= =?UTF-8?q?=20->=20=EC=9E=84=EC=8B=9C=201=EA=B0=9C=20=EB=BD=91=EA=B8=B0=20?= =?UTF-8?q?->=20rag=5Fcreate=EB=A1=9C=20=EB=B3=80=EA=B2=BD=203.=20?= =?UTF-8?q?=EB=B3=80=EA=B2=BD=EC=97=90=20=EB=94=B0=EB=9D=BC=20schemas=20?= =?UTF-8?q?=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../app/api/endpoints/product.py | 17 ++ .../app/model/schemas.py | 28 ++- .../app/service/product_selection_service.py | 191 ++++++++++++++++ .../app/service/s3_upload_service.py | 204 +++++++++--------- .../fastapi/body/BlogRagBodyBuilder.java | 66 +++--- .../body/ProductSelectBodyBuilder.java | 40 ++++ .../fastapi/body/S3UploadBodyBuilder.java | 62 +++--- 7 files changed, 434 insertions(+), 174 deletions(-) create mode 100644 apps/pre-processing-service/app/service/product_selection_service.py create mode 100644 apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java diff --git a/apps/pre-processing-service/app/api/endpoints/product.py b/apps/pre-processing-service/app/api/endpoints/product.py index 2812ef79..0b9e888f 100644 --- a/apps/pre-processing-service/app/api/endpoints/product.py +++ b/apps/pre-processing-service/app/api/endpoints/product.py @@ -10,6 +10,7 @@ from ...service.search_service import SearchService from ...service.match_service import MatchService from ...service.similarity_service import SimilarityService +from ...service.product_selection_service import ProductSelectionService # from ...service.similarity_service import SimilarityService @@ -121,3 +122,19 @@ async def s3_upload(request: RequestS3Upload): raise HTTPException(status_code=e.status_code, detail=e.detail) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) + +@router.post("/select", response_model=ResponseProductSelect, summary="콘텐츠용 상품 선택") +def select_product(request: RequestProductSelect): # async 제거 + """ + S3 업로드 완료 후 콘텐츠 생성을 위한 최적 상품을 선택합니다. + """ + try: + selection_service = ProductSelectionService() + response_data = selection_service.select_product_for_content(request) # await 제거 + + if not response_data: + raise CustomException(500, "상품 선택에 실패했습니다.", "PRODUCT_SELECTION_FAILED") + + return response_data + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file diff --git a/apps/pre-processing-service/app/model/schemas.py b/apps/pre-processing-service/app/model/schemas.py index dd49cf44..a555dfe5 100644 --- a/apps/pre-processing-service/app/model/schemas.py +++ b/apps/pre-processing-service/app/model/schemas.py @@ -161,7 +161,7 @@ class ResponseSadaguCrawl(ResponseBase[SadaguCrawlData]): pass -# ============== S3 이미지 업로드 ============== +# ============== S3 업로드 ============== class RequestS3Upload(RequestBase): @@ -227,12 +227,6 @@ class S3UploadData(BaseModel): uploaded_at: str = Field( ..., title="업로드 완료 시간", description="S3 업로드 완료 시간" ) - # 🆕 임시: 콘텐츠 생성용 단일 상품만 추가 (나중에 삭제 예정) - selected_product_for_content: Optional[Dict] = Field( - None, - title="콘텐츠 생성용 선택 상품", - description="임시: 블로그 콘텐츠 생성을 위해 선택된 단일 상품 정보", - ) # 최종 응답 모델 @@ -241,6 +235,26 @@ class ResponseS3Upload(ResponseBase[S3UploadData]): pass +# ============== 상품 선택 (새로 추가) ============== + +class RequestProductSelect(RequestBase): + task_run_id: int = Field(..., title="Task Run ID", description="상품을 선택할 task_run_id") + selection_criteria: Optional[str] = Field( + None, title="선택 기준", description="특별한 선택 기준 (기본: 이미지 개수 우선)" + ) + + +# 응답 데이터 모델 +class ProductSelectData(BaseModel): + task_run_id: int = Field(..., title="Task Run ID") + selected_product: Dict = Field(..., title="선택된 상품", description="콘텐츠 생성용으로 선택된 상품") + total_available_products: int = Field(..., title="전체 상품 수", description="선택 가능했던 전체 상품 개수") + + +# 최종 응답 모델 +class ResponseProductSelect(ResponseBase[ProductSelectData]): + """상품 선택 API 응답""" + pass # ============== 블로그 콘텐츠 생성 ============== diff --git a/apps/pre-processing-service/app/service/product_selection_service.py b/apps/pre-processing-service/app/service/product_selection_service.py new file mode 100644 index 00000000..96093707 --- /dev/null +++ b/apps/pre-processing-service/app/service/product_selection_service.py @@ -0,0 +1,191 @@ +import json +from typing import List, Dict +from loguru import logger +from app.model.schemas import RequestProductSelect +from app.utils.response import Response +from app.db.mariadb_manager import MariadbManager + + +class ProductSelectionService: + """콘텐츠 생성용 단일 상품 선택 서비스""" + + def __init__(self): + self.db_manager = MariadbManager() + + def select_product_for_content(self, request: RequestProductSelect) -> dict: + """ + S3 업로드와 DB 저장 결과를 바탕으로 콘텐츠 생성용 단일 상품을 선택 + """ + try: + task_run_id = request.task_run_id + logger.info(f"콘텐츠용 상품 선택 시작: task_run_id={task_run_id}") + + # 1. DB에서 해당 task_run_id의 모든 상품 조회 + db_products = self._fetch_products_from_db(task_run_id) + + if not db_products: + logger.warning(f"DB에서 상품을 찾을 수 없음: task_run_id={task_run_id}") + return Response.error("상품 데이터를 찾을 수 없습니다.", "PRODUCTS_NOT_FOUND") + + # 2. 최적 상품 선택 + selected_product = self._select_best_product(db_products) + + logger.success( + f"콘텐츠용 상품 선택 완료: name={selected_product['name']}, " + f"selection_reason={selected_product['selection_reason']}" + ) + + data = { + "task_run_id": task_run_id, + "selected_product": selected_product, + "total_available_products": len(db_products), + } + + return Response.ok(data, f"콘텐츠용 상품 선택 완료: {selected_product['name']}") + + except Exception as e: + logger.error(f"콘텐츠용 상품 선택 오류: {e}") + raise + + def _fetch_products_from_db(self, task_run_id: int) -> List[Dict]: + """DB에서 task_run_id에 해당하는 모든 상품 조회""" + try: + sql = """ + SELECT id, \ + name, \ + data_value, \ + created_at + FROM task_io_data + WHERE task_run_id = %s + AND io_type = 'OUTPUT' + AND data_type = 'JSON' + ORDER BY name \ + """ + + with self.db_manager.get_cursor() as cursor: + cursor.execute(sql, (task_run_id,)) + rows = cursor.fetchall() + + products = [] + for row in rows: + try: + # MariaDB에서 반환되는 row는 튜플 형태 + id, name, data_value_str, created_at = row + + # JSON 데이터 파싱 + data_value = json.loads(data_value_str) + + products.append({ + "id": id, + "name": name, + "data_value": data_value, + "created_at": created_at + }) + except json.JSONDecodeError as e: + logger.warning(f"JSON 파싱 실패: name={name}, error={e}") + continue + except Exception as e: + logger.warning(f"Row 처리 실패: {row}, error={e}") + continue + + logger.info(f"DB에서 {len(products)}개 상품 조회 완료") + return products + + except Exception as e: + logger.error(f"DB 상품 조회 오류: {e}") + return [] + + def _select_best_product(self, db_products: List[Dict]) -> Dict: + """ + 상품 선택 로직: + 1순위: S3 이미지 업로드가 성공하고 이미지가 많은 상품 + 2순위: 크롤링 성공한 첫 번째 상품 + 3순위: 첫 번째 상품 (fallback) + """ + try: + successful_products = [] + + # 1순위: S3 업로드 성공하고 이미지가 있는 상품들 + for product in db_products: + data_value = product.get("data_value", {}) + product_detail = data_value.get("product_detail", {}) + product_images = product_detail.get("product_images", []) + + # 크롤링 성공하고 이미지가 있는 상품 + if (data_value.get("status") == "success" and + product_detail and len(product_images) > 0): + successful_products.append({ + "product": product, + "image_count": len(product_images), + "title": product_detail.get("title", "Unknown") + }) + + if successful_products: + # 이미지 개수가 가장 많은 상품 선택 + best_product = max(successful_products, key=lambda x: x["image_count"]) + + logger.info( + f"1순위 선택: name={best_product['product']['name']}, " + f"images={best_product['image_count']}개" + ) + + return { + "selection_reason": "s3_upload_success_with_most_images", + "name": best_product["product"]["name"], + "product_info": best_product["product"]["data_value"], + "image_count": best_product["image_count"], + "title": best_product["title"] + } + + # 2순위: 크롤링 성공한 첫 번째 상품 (이미지 없어도) + for product in db_products: + data_value = product.get("data_value", {}) + if (data_value.get("status") == "success" and + data_value.get("product_detail")): + product_detail = data_value.get("product_detail", {}) + logger.info(f"2순위 선택: name={product['name']}") + + return { + "selection_reason": "first_crawl_success", + "name": product["name"], + "product_info": data_value, + "image_count": len(product_detail.get("product_images", [])), + "title": product_detail.get("title", "Unknown") + } + + # 3순위: 첫 번째 상품 (fallback) + if db_products: + first_product = db_products[0] + data_value = first_product.get("data_value", {}) + product_detail = data_value.get("product_detail", {}) + + logger.warning(f"3순위 fallback 선택: name={first_product['name']}") + + return { + "selection_reason": "fallback_first_product", + "name": first_product["name"], + "product_info": data_value, + "image_count": len(product_detail.get("product_images", [])), + "title": product_detail.get("title", "Unknown") + } + + # 모든 경우 실패 + logger.error("선택할 상품이 없습니다") + return { + "selection_reason": "no_products_available", + "name": None, + "product_info": None, + "image_count": 0, + "title": "Unknown" + } + + except Exception as e: + logger.error(f"상품 선택 로직 오류: {e}") + return { + "selection_reason": "selection_error", + "name": db_products[0]["name"] if db_products else None, + "product_info": db_products[0]["data_value"] if db_products else None, + "image_count": 0, + "title": "Unknown", + "error": str(e) + } \ No newline at end of file diff --git a/apps/pre-processing-service/app/service/s3_upload_service.py b/apps/pre-processing-service/app/service/s3_upload_service.py index 48c84d35..7e52152c 100644 --- a/apps/pre-processing-service/app/service/s3_upload_service.py +++ b/apps/pre-processing-service/app/service/s3_upload_service.py @@ -1,41 +1,59 @@ import time +import json import asyncio import aiohttp +import ssl, certifi from typing import List, Dict +from datetime import datetime from loguru import logger from app.errors.CustomException import InvalidItemDataException from app.model.schemas import RequestS3Upload from app.utils.s3_upload_util import S3UploadUtil from app.utils.response import Response +from app.db.mariadb_manager import MariadbManager class S3UploadService: - """6단계: 크롤링된 상품 이미지들과 데이터를 S3에 업로드하는 서비스""" + """6단계: 크롤링된 상품 이미지들과 데이터를 S3에 업로드하고 DB에 저장하는 서비스""" def __init__(self): self.s3_util = S3UploadUtil() + self.db_manager = MariadbManager() async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict: """ - 크롤링된 상품들의 이미지와 데이터를 S3에 업로드하는 비즈니스 로직 (6단계) + 크롤링된 상품들의 이미지와 데이터를 S3에 업로드하고 DB에 저장하는 비즈니스 로직 (6단계) """ - keyword = request.keyword # 키워드 추가 + keyword = request.keyword crawled_products = request.crawled_products - base_folder = ( - request.base_folder or "product" - ) # 🔸 기본값 변경: product-images → product + base_folder = request.base_folder or "product" + + # task_run_id는 자바 워크플로우에서 전달받음 + task_run_id = getattr(request, 'task_run_id', None) + if not task_run_id: + # 임시: task_run_id가 없으면 생성 + task_run_id = int(time.time() * 1000) + logger.warning(f"task_run_id가 없어서 임시로 생성: {task_run_id}") + else: + logger.info(f"자바 워크플로우에서 전달받은 task_run_id: {task_run_id}") logger.info( - f"S3 업로드 서비스 시작: keyword='{keyword}', {len(crawled_products)}개 상품" + f"S3 업로드 + DB 저장 서비스 시작: keyword='{keyword}', " + f"{len(crawled_products)}개 상품, task_run_id={task_run_id}" ) upload_results = [] total_success_images = 0 total_fail_images = 0 + db_save_results = [] try: # HTTP 세션을 사용한 이미지 다운로드 - async with aiohttp.ClientSession() as session: + + ssl_context = ssl.create_default_context(cafile=certifi.where()) + connector = aiohttp.TCPConnector(ssl=ssl_context) + + async with aiohttp.ClientSession(connector=connector) as session: # 각 상품별로 순차 업로드 for product_info in crawled_products: @@ -43,7 +61,7 @@ async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict: product_detail = product_info.get("product_detail") logger.info( - f"상품 {product_index}/{len(crawled_products)} S3 업로드 시작" + f"상품 {product_index}/{len(crawled_products)} S3 업로드 + DB 저장 시작" ) # 크롤링 실패한 상품은 스킵 @@ -62,30 +80,43 @@ async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict: "fail_count": 0, } ) + db_save_results.append({ + "product_index": product_index, + "db_status": "skipped", + "error": "크롤링 실패" + }) continue try: - # 상품 이미지 + 데이터 업로드 (키워드 전달 추가!) - # 🔸 전체 크롤링 데이터를 전달 (product_detail이 아닌 product_info 전체) + # 1. 상품 이미지 + 데이터 S3 업로드 upload_result = await self.s3_util.upload_single_product_images( session, product_info, product_index, keyword, - base_folder, # product_detail → product_info + base_folder, ) upload_results.append(upload_result) total_success_images += upload_result["success_count"] total_fail_images += upload_result["fail_count"] + # 2. DB에 상품 데이터 저장 + db_result = self._save_product_to_db( + task_run_id, + keyword, + product_index, + product_info + ) + db_save_results.append(db_result) + logger.success( - f"상품 {product_index} S3 업로드 완료: 성공 {upload_result['success_count']}개, " - f"실패 {upload_result['fail_count']}개" + f"상품 {product_index} S3 업로드 + DB 저장 완료: " + f"이미지 성공 {upload_result['success_count']}개, DB {db_result['db_status']}" ) except Exception as e: - logger.error(f"상품 {product_index} S3 업로드 오류: {e}") + logger.error(f"상품 {product_index} S3 업로드/DB 저장 오류: {e}") upload_results.append( { "product_index": product_index, @@ -97,122 +128,93 @@ async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict: "fail_count": 0, } ) + db_save_results.append({ + "product_index": product_index, + "db_status": "error", + "error": str(e) + }) # 상품간 간격 (서버 부하 방지) if product_index < len(crawled_products): await asyncio.sleep(1) - # 🆕 임시: 콘텐츠 생성용 단일 상품 선택 로직 - selected_product_for_content = self._select_single_product_for_content( - crawled_products, upload_results - ) - logger.success( - f"S3 업로드 서비스 완료: 총 성공 이미지 {total_success_images}개, 총 실패 이미지 {total_fail_images}개" + f"S3 업로드 + DB 저장 서비스 완료: 총 성공 이미지 {total_success_images}개, " + f"총 실패 이미지 {total_fail_images}개" ) - # 기존 응답 데이터 구성 + # 응답 데이터 구성 data = { "upload_results": upload_results, + "db_save_results": db_save_results, + "task_run_id": task_run_id, "summary": { "total_products": len(crawled_products), "total_success_images": total_success_images, "total_fail_images": total_fail_images, + "db_success_count": len([r for r in db_save_results if r.get("db_status") == "success"]), + "db_fail_count": len([r for r in db_save_results if r.get("db_status") == "error"]), }, "uploaded_at": time.strftime("%Y-%m-%d %H:%M:%S"), - # 🆕 임시: 콘텐츠 생성용 단일 상품만 추가 (나중에 삭제 예정) - "selected_product_for_content": selected_product_for_content, } - message = f"S3 업로드 완료: {total_success_images}개 이미지 업로드 성공, 상품 데이터 JSON 파일 포함" + message = f"S3 업로드 + DB 저장 완료: {total_success_images}개 이미지 성공, {len([r for r in db_save_results if r.get('db_status') == 'success'])}개 상품 DB 저장 성공" return Response.ok(data, message) except Exception as e: - logger.error(f"S3 업로드 서비스 전체 오류: {e}") + logger.error(f"S3 업로드 + DB 저장 서비스 전체 오류: {e}") raise InvalidItemDataException() - def _select_single_product_for_content( - self, crawled_products: List[Dict], upload_results: List[Dict] + def _save_product_to_db( + self, + task_run_id: int, + keyword: str, + product_index: int, + product_info: Dict ) -> Dict: """ - 🆕 임시: 콘텐츠 생성을 위한 단일 상품 선택 로직 - 우선순위: 1) S3 업로드 성공한 상품 중 이미지 개수가 많은 것 - 2) 없다면 크롤링 성공한 첫 번째 상품 + 상품 데이터를 TASK_IO_DATA 테이블에 저장 (MariaDB) """ try: - # 1순위: S3 업로드 성공하고 이미지가 있는 상품들 - successful_uploads = [ - result - for result in upload_results - if result.get("status") == "completed" - and result.get("success_count", 0) > 0 - ] - - if successful_uploads: - # 이미지 개수가 가장 많은 상품 선택 - best_upload = max( - successful_uploads, key=lambda x: x.get("success_count", 0) - ) - selected_index = best_upload["product_index"] - - # 원본 크롤링 데이터에서 해당 상품 찾기 - for product_info in crawled_products: - if product_info.get("index") == selected_index: - logger.info( - f"콘텐츠 생성용 상품 선택: index={selected_index}, " - f"title='{product_info.get('product_detail', {}).get('title', 'Unknown')[:30]}', " - f"images={best_upload.get('success_count', 0)}개" - ) - return { - "selection_reason": "s3_upload_success_with_most_images", - "product_info": product_info, - "s3_upload_info": best_upload, - } - - # 2순위: 크롤링 성공한 첫 번째 상품 (S3 업로드 실패해도) - for product_info in crawled_products: - if product_info.get("status") == "success" and product_info.get( - "product_detail" - ): - - # 해당 상품의 S3 업로드 정보 찾기 - upload_info = None - for result in upload_results: - if result.get("product_index") == product_info.get("index"): - upload_info = result - break + # 상품명 생성 (산리오_01 형식) + product_name = f"{keyword}_{product_index:02d}" + + # data_value에 저장할 JSON 데이터 (전체 product_info) + data_value_json = json.dumps(product_info, ensure_ascii=False) + + # 현재 시간 + created_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + # MariaDB에 저장 + with self.db_manager.get_cursor() as cursor: + sql = """ + INSERT INTO task_io_data + (task_run_id, io_type, name, data_type, data_value, created_at) + VALUES (%s, %s, %s, %s, %s, %s) \ + """ + + cursor.execute(sql, ( + task_run_id, + "OUTPUT", + product_name, + "JSON", + data_value_json, + created_at + )) + + logger.success(f"상품 {product_index} DB 저장 성공: name={product_name}") - logger.info( - f"콘텐츠 생성용 상품 선택 (fallback): index={product_info.get('index')}, " - f"title='{product_info.get('product_detail', {}).get('title', 'Unknown')[:30]}'" - ) - return { - "selection_reason": "first_crawl_success", - "product_info": product_info, - "s3_upload_info": upload_info, - } - - # 3순위: 아무거나 (모든 상품이 실패한 경우) - if crawled_products: - logger.warning("모든 상품이 크롤링 실패 - 첫 번째 상품으로 fallback") - return { - "selection_reason": "fallback_first_product", - "product_info": crawled_products[0], - "s3_upload_info": upload_results[0] if upload_results else None, - } - - logger.error("선택할 상품이 없습니다") return { - "selection_reason": "no_products_available", - "product_info": None, - "s3_upload_info": None, + "product_index": product_index, + "product_name": product_name, + "db_status": "success", + "task_run_id": task_run_id, } except Exception as e: - logger.error(f"단일 상품 선택 오류: {e}") + logger.error(f"상품 {product_index} DB 저장 오류: {e}") return { - "selection_reason": "selection_error", - "product_info": crawled_products[0] if crawled_products else None, - "s3_upload_info": upload_results[0] if upload_results else None, - "error": str(e), - } + "product_index": product_index, + "db_status": "error", + "error": str(e) + } \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java index 419a23a4..87c838a5 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java @@ -17,39 +17,33 @@ @RequiredArgsConstructor public class BlogRagBodyBuilder implements TaskBodyBuilder { - private final ObjectMapper objectMapper; - private static final String TASK_NAME = "블로그 RAG 생성 태스크"; - private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; - private static final String S3_UPLOAD_SOURCE_TASK = "S3 업로드 태스크"; // 변경: 크롤링 → S3 업로드 - - @Override - public boolean supports(String taskName) { - return TASK_NAME.equals(taskName); - } - - @Override - public ObjectNode build(Task task, Map workflowContext) { - ObjectNode body = objectMapper.createObjectNode(); - - // 키워드 정보 가져오기 - Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) - .map(node -> node.path("data").path("keyword")) - .ifPresent(keywordNode -> body.set("keyword", keywordNode)); - - // S3 업로드에서 선택된 상품 정보 가져오기 (변경된 부분) - Optional.ofNullable(workflowContext.get(S3_UPLOAD_SOURCE_TASK)) - .map( - node -> - node.path("data") - .path("selected_product_for_content") - .path("product_info") - .path("product_detail")) - .ifPresent(productNode -> body.set("product_info", productNode)); - - // 기본 콘텐츠 설정 - body.put("content_type", "review_blog"); - body.put("target_length", 1000); - - return body; - } -} + private final ObjectMapper objectMapper; + private static final String TASK_NAME = "블로그 RAG 생성 태스크"; + private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; + private static final String PRODUCT_SELECT_SOURCE_TASK = "상품 선택 태스크"; // 변경: S3 업로드 → 상품 선택 + + @Override + public boolean supports(String taskName) { + return TASK_NAME.equals(taskName); + } + + @Override + public ObjectNode build(Task task, Map workflowContext) { + ObjectNode body = objectMapper.createObjectNode(); + + // 키워드 정보 가져오기 + Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) + .map(node -> node.path("data").path("keyword")) + .ifPresent(keywordNode -> body.set("keyword", keywordNode)); + + Optional.ofNullable(workflowContext.get(PRODUCT_SELECT_SOURCE_TASK)) + .map(node -> node.path("data").path("selected_product")) + .ifPresent(productNode -> body.set("product_info", productNode)); + + // 기본 콘텐츠 설정 + body.put("content_type", "review_blog"); + body.put("target_length", 1000); + + return body; + } +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java new file mode 100644 index 00000000..1d2ef5bf --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java @@ -0,0 +1,40 @@ +package site.icebang.domain.workflow.runner.fastapi.body; + +import java.util.Map; + +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import lombok.RequiredArgsConstructor; + +import site.icebang.domain.workflow.model.Task; + +@Component +@RequiredArgsConstructor +public class ProductSelectBodyBuilder implements TaskBodyBuilder { + + private final ObjectMapper objectMapper; + private static final String TASK_NAME = "상품 선택 태스크"; + + @Override + public boolean supports(String taskName) { + return TASK_NAME.equals(taskName); + } + + @Override + public ObjectNode build(Task task, Map workflowContext) { + ObjectNode body = objectMapper.createObjectNode(); + + // task_run_id는 현재 실행 중인 task의 run_id를 사용 + // 실제 구현에서는 Task 객체나 워크플로우 컨텍스트에서 가져와야 할 수 있습니다. + body.put("task_run_id", task.getId()); // Task 객체에서 ID를 가져오는 방식으로 가정 + + // 기본 선택 기준 설정 (이미지 개수 우선) + body.put("selection_criteria", "image_count_priority"); + + return body; + } +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java index bd0f823e..7b927dff 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java @@ -17,33 +17,35 @@ @RequiredArgsConstructor public class S3UploadBodyBuilder implements TaskBodyBuilder { - private final ObjectMapper objectMapper; - private static final String TASK_NAME = "S3 업로드 태스크"; - private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; - private static final String CRAWL_SOURCE_TASK = "상품 정보 크롤링 태스크"; - - @Override - public boolean supports(String taskName) { - return TASK_NAME.equals(taskName); - } - - @Override - public ObjectNode build(Task task, Map workflowContext) { - ObjectNode body = objectMapper.createObjectNode(); - - // 키워드 정보 가져오기 - Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) - .map(node -> node.path("data").path("keyword")) - .ifPresent(keywordNode -> body.set("keyword", keywordNode)); - - // 크롤링된 상품 데이터 가져오기 - Optional.ofNullable(workflowContext.get(CRAWL_SOURCE_TASK)) - .map(node -> node.path("data").path("crawled_products")) - .ifPresent(crawledProductsNode -> body.set("crawled_products", crawledProductsNode)); - - // 기본 폴더 설정 - body.put("base_folder", "product"); - - return body; - } -} + private final ObjectMapper objectMapper; + private static final String TASK_NAME = "S3 업로드 태스크"; + private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; + private static final String CRAWL_SOURCE_TASK = "상품 정보 크롤링 태스크"; + + @Override + public boolean supports(String taskName) { + return TASK_NAME.equals(taskName); + } + + @Override + public ObjectNode build(Task task, Map workflowContext) { + ObjectNode body = objectMapper.createObjectNode(); + + // 키워드 정보 가져오기 (폴더명 생성용 - 스키마 주석 참조) + Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) + .map(node -> node.path("data").path("keyword")) + .filter(node -> !node.isMissingNode() && !node.asText().trim().isEmpty()) + .ifPresent(keywordNode -> body.set("keyword", keywordNode)); + + // 크롤링된 상품 데이터 가져오기 + Optional.ofNullable(workflowContext.get(CRAWL_SOURCE_TASK)) + .map(node -> node.path("data").path("crawled_products")) + .filter(node -> !node.isMissingNode()) + .ifPresent(crawledProductsNode -> body.set("crawled_products", crawledProductsNode)); + + // 기본 폴더 설정 (스키마의 기본값과 일치) + body.put("base_folder", "product"); + + return body; + } +} \ No newline at end of file From 9076e8a9d0081c34147f491a8d8b8b79e4cb8d69 Mon Sep 17 00:00:00 2001 From: thkim7 Date: Thu, 25 Sep 2025 15:36:04 +0900 Subject: [PATCH 2/2] chore: poetry run black . & spotlessApply --- .../app/api/endpoints/product.py | 15 ++-- .../app/model/schemas.py | 16 ++++- .../app/service/product_selection_service.py | 58 +++++++++------ .../app/service/s3_upload_service.py | 72 ++++++++++--------- .../fastapi/body/BlogRagBodyBuilder.java | 60 ++++++++-------- .../body/ProductSelectBodyBuilder.java | 34 ++++----- .../fastapi/body/S3UploadBodyBuilder.java | 64 ++++++++--------- 7 files changed, 177 insertions(+), 142 deletions(-) diff --git a/apps/pre-processing-service/app/api/endpoints/product.py b/apps/pre-processing-service/app/api/endpoints/product.py index 0b9e888f..f5a91272 100644 --- a/apps/pre-processing-service/app/api/endpoints/product.py +++ b/apps/pre-processing-service/app/api/endpoints/product.py @@ -123,18 +123,25 @@ async def s3_upload(request: RequestS3Upload): except Exception as e: raise HTTPException(status_code=500, detail=str(e)) -@router.post("/select", response_model=ResponseProductSelect, summary="콘텐츠용 상품 선택") + +@router.post( + "/select", response_model=ResponseProductSelect, summary="콘텐츠용 상품 선택" +) def select_product(request: RequestProductSelect): # async 제거 """ S3 업로드 완료 후 콘텐츠 생성을 위한 최적 상품을 선택합니다. """ try: selection_service = ProductSelectionService() - response_data = selection_service.select_product_for_content(request) # await 제거 + response_data = selection_service.select_product_for_content( + request + ) # await 제거 if not response_data: - raise CustomException(500, "상품 선택에 실패했습니다.", "PRODUCT_SELECTION_FAILED") + raise CustomException( + 500, "상품 선택에 실패했습니다.", "PRODUCT_SELECTION_FAILED" + ) return response_data except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file + raise HTTPException(status_code=500, detail=str(e)) diff --git a/apps/pre-processing-service/app/model/schemas.py b/apps/pre-processing-service/app/model/schemas.py index a555dfe5..7487927b 100644 --- a/apps/pre-processing-service/app/model/schemas.py +++ b/apps/pre-processing-service/app/model/schemas.py @@ -235,10 +235,14 @@ class ResponseS3Upload(ResponseBase[S3UploadData]): pass + # ============== 상품 선택 (새로 추가) ============== + class RequestProductSelect(RequestBase): - task_run_id: int = Field(..., title="Task Run ID", description="상품을 선택할 task_run_id") + task_run_id: int = Field( + ..., title="Task Run ID", description="상품을 선택할 task_run_id" + ) selection_criteria: Optional[str] = Field( None, title="선택 기준", description="특별한 선택 기준 (기본: 이미지 개수 우선)" ) @@ -247,15 +251,21 @@ class RequestProductSelect(RequestBase): # 응답 데이터 모델 class ProductSelectData(BaseModel): task_run_id: int = Field(..., title="Task Run ID") - selected_product: Dict = Field(..., title="선택된 상품", description="콘텐츠 생성용으로 선택된 상품") - total_available_products: int = Field(..., title="전체 상품 수", description="선택 가능했던 전체 상품 개수") + selected_product: Dict = Field( + ..., title="선택된 상품", description="콘텐츠 생성용으로 선택된 상품" + ) + total_available_products: int = Field( + ..., title="전체 상품 수", description="선택 가능했던 전체 상품 개수" + ) # 최종 응답 모델 class ResponseProductSelect(ResponseBase[ProductSelectData]): """상품 선택 API 응답""" + pass + # ============== 블로그 콘텐츠 생성 ============== diff --git a/apps/pre-processing-service/app/service/product_selection_service.py b/apps/pre-processing-service/app/service/product_selection_service.py index 96093707..723bd940 100644 --- a/apps/pre-processing-service/app/service/product_selection_service.py +++ b/apps/pre-processing-service/app/service/product_selection_service.py @@ -25,7 +25,9 @@ def select_product_for_content(self, request: RequestProductSelect) -> dict: if not db_products: logger.warning(f"DB에서 상품을 찾을 수 없음: task_run_id={task_run_id}") - return Response.error("상품 데이터를 찾을 수 없습니다.", "PRODUCTS_NOT_FOUND") + return Response.error( + "상품 데이터를 찾을 수 없습니다.", "PRODUCTS_NOT_FOUND" + ) # 2. 최적 상품 선택 selected_product = self._select_best_product(db_products) @@ -41,7 +43,9 @@ def select_product_for_content(self, request: RequestProductSelect) -> dict: "total_available_products": len(db_products), } - return Response.ok(data, f"콘텐츠용 상품 선택 완료: {selected_product['name']}") + return Response.ok( + data, f"콘텐츠용 상품 선택 완료: {selected_product['name']}" + ) except Exception as e: logger.error(f"콘텐츠용 상품 선택 오류: {e}") @@ -75,12 +79,14 @@ def _fetch_products_from_db(self, task_run_id: int) -> List[Dict]: # JSON 데이터 파싱 data_value = json.loads(data_value_str) - products.append({ - "id": id, - "name": name, - "data_value": data_value, - "created_at": created_at - }) + products.append( + { + "id": id, + "name": name, + "data_value": data_value, + "created_at": created_at, + } + ) except json.JSONDecodeError as e: logger.warning(f"JSON 파싱 실패: name={name}, error={e}") continue @@ -112,13 +118,18 @@ def _select_best_product(self, db_products: List[Dict]) -> Dict: product_images = product_detail.get("product_images", []) # 크롤링 성공하고 이미지가 있는 상품 - if (data_value.get("status") == "success" and - product_detail and len(product_images) > 0): - successful_products.append({ - "product": product, - "image_count": len(product_images), - "title": product_detail.get("title", "Unknown") - }) + if ( + data_value.get("status") == "success" + and product_detail + and len(product_images) > 0 + ): + successful_products.append( + { + "product": product, + "image_count": len(product_images), + "title": product_detail.get("title", "Unknown"), + } + ) if successful_products: # 이미지 개수가 가장 많은 상품 선택 @@ -134,14 +145,15 @@ def _select_best_product(self, db_products: List[Dict]) -> Dict: "name": best_product["product"]["name"], "product_info": best_product["product"]["data_value"], "image_count": best_product["image_count"], - "title": best_product["title"] + "title": best_product["title"], } # 2순위: 크롤링 성공한 첫 번째 상품 (이미지 없어도) for product in db_products: data_value = product.get("data_value", {}) - if (data_value.get("status") == "success" and - data_value.get("product_detail")): + if data_value.get("status") == "success" and data_value.get( + "product_detail" + ): product_detail = data_value.get("product_detail", {}) logger.info(f"2순위 선택: name={product['name']}") @@ -150,7 +162,7 @@ def _select_best_product(self, db_products: List[Dict]) -> Dict: "name": product["name"], "product_info": data_value, "image_count": len(product_detail.get("product_images", [])), - "title": product_detail.get("title", "Unknown") + "title": product_detail.get("title", "Unknown"), } # 3순위: 첫 번째 상품 (fallback) @@ -166,7 +178,7 @@ def _select_best_product(self, db_products: List[Dict]) -> Dict: "name": first_product["name"], "product_info": data_value, "image_count": len(product_detail.get("product_images", [])), - "title": product_detail.get("title", "Unknown") + "title": product_detail.get("title", "Unknown"), } # 모든 경우 실패 @@ -176,7 +188,7 @@ def _select_best_product(self, db_products: List[Dict]) -> Dict: "name": None, "product_info": None, "image_count": 0, - "title": "Unknown" + "title": "Unknown", } except Exception as e: @@ -187,5 +199,5 @@ def _select_best_product(self, db_products: List[Dict]) -> Dict: "product_info": db_products[0]["data_value"] if db_products else None, "image_count": 0, "title": "Unknown", - "error": str(e) - } \ No newline at end of file + "error": str(e), + } diff --git a/apps/pre-processing-service/app/service/s3_upload_service.py b/apps/pre-processing-service/app/service/s3_upload_service.py index 7e52152c..c804a201 100644 --- a/apps/pre-processing-service/app/service/s3_upload_service.py +++ b/apps/pre-processing-service/app/service/s3_upload_service.py @@ -29,7 +29,7 @@ async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict: base_folder = request.base_folder or "product" # task_run_id는 자바 워크플로우에서 전달받음 - task_run_id = getattr(request, 'task_run_id', None) + task_run_id = getattr(request, "task_run_id", None) if not task_run_id: # 임시: task_run_id가 없으면 생성 task_run_id = int(time.time() * 1000) @@ -80,11 +80,13 @@ async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict: "fail_count": 0, } ) - db_save_results.append({ - "product_index": product_index, - "db_status": "skipped", - "error": "크롤링 실패" - }) + db_save_results.append( + { + "product_index": product_index, + "db_status": "skipped", + "error": "크롤링 실패", + } + ) continue try: @@ -103,10 +105,7 @@ async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict: # 2. DB에 상품 데이터 저장 db_result = self._save_product_to_db( - task_run_id, - keyword, - product_index, - product_info + task_run_id, keyword, product_index, product_info ) db_save_results.append(db_result) @@ -116,7 +115,9 @@ async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict: ) except Exception as e: - logger.error(f"상품 {product_index} S3 업로드/DB 저장 오류: {e}") + logger.error( + f"상품 {product_index} S3 업로드/DB 저장 오류: {e}" + ) upload_results.append( { "product_index": product_index, @@ -128,11 +129,13 @@ async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict: "fail_count": 0, } ) - db_save_results.append({ - "product_index": product_index, - "db_status": "error", - "error": str(e) - }) + db_save_results.append( + { + "product_index": product_index, + "db_status": "error", + "error": str(e), + } + ) # 상품간 간격 (서버 부하 방지) if product_index < len(crawled_products): @@ -152,8 +155,12 @@ async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict: "total_products": len(crawled_products), "total_success_images": total_success_images, "total_fail_images": total_fail_images, - "db_success_count": len([r for r in db_save_results if r.get("db_status") == "success"]), - "db_fail_count": len([r for r in db_save_results if r.get("db_status") == "error"]), + "db_success_count": len( + [r for r in db_save_results if r.get("db_status") == "success"] + ), + "db_fail_count": len( + [r for r in db_save_results if r.get("db_status") == "error"] + ), }, "uploaded_at": time.strftime("%Y-%m-%d %H:%M:%S"), } @@ -166,11 +173,7 @@ async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict: raise InvalidItemDataException() def _save_product_to_db( - self, - task_run_id: int, - keyword: str, - product_index: int, - product_info: Dict + self, task_run_id: int, keyword: str, product_index: int, product_info: Dict ) -> Dict: """ 상품 데이터를 TASK_IO_DATA 테이블에 저장 (MariaDB) @@ -193,14 +196,17 @@ def _save_product_to_db( VALUES (%s, %s, %s, %s, %s, %s) \ """ - cursor.execute(sql, ( - task_run_id, - "OUTPUT", - product_name, - "JSON", - data_value_json, - created_at - )) + cursor.execute( + sql, + ( + task_run_id, + "OUTPUT", + product_name, + "JSON", + data_value_json, + created_at, + ), + ) logger.success(f"상품 {product_index} DB 저장 성공: name={product_name}") @@ -216,5 +222,5 @@ def _save_product_to_db( return { "product_index": product_index, "db_status": "error", - "error": str(e) - } \ No newline at end of file + "error": str(e), + } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java index 87c838a5..8a8008ed 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java @@ -17,33 +17,33 @@ @RequiredArgsConstructor public class BlogRagBodyBuilder implements TaskBodyBuilder { - private final ObjectMapper objectMapper; - private static final String TASK_NAME = "블로그 RAG 생성 태스크"; - private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; - private static final String PRODUCT_SELECT_SOURCE_TASK = "상품 선택 태스크"; // 변경: S3 업로드 → 상품 선택 - - @Override - public boolean supports(String taskName) { - return TASK_NAME.equals(taskName); - } - - @Override - public ObjectNode build(Task task, Map workflowContext) { - ObjectNode body = objectMapper.createObjectNode(); - - // 키워드 정보 가져오기 - Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) - .map(node -> node.path("data").path("keyword")) - .ifPresent(keywordNode -> body.set("keyword", keywordNode)); - - Optional.ofNullable(workflowContext.get(PRODUCT_SELECT_SOURCE_TASK)) - .map(node -> node.path("data").path("selected_product")) - .ifPresent(productNode -> body.set("product_info", productNode)); - - // 기본 콘텐츠 설정 - body.put("content_type", "review_blog"); - body.put("target_length", 1000); - - return body; - } -} \ No newline at end of file + private final ObjectMapper objectMapper; + private static final String TASK_NAME = "블로그 RAG 생성 태스크"; + private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; + private static final String PRODUCT_SELECT_SOURCE_TASK = "상품 선택 태스크"; // 변경: S3 업로드 → 상품 선택 + + @Override + public boolean supports(String taskName) { + return TASK_NAME.equals(taskName); + } + + @Override + public ObjectNode build(Task task, Map workflowContext) { + ObjectNode body = objectMapper.createObjectNode(); + + // 키워드 정보 가져오기 + Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) + .map(node -> node.path("data").path("keyword")) + .ifPresent(keywordNode -> body.set("keyword", keywordNode)); + + Optional.ofNullable(workflowContext.get(PRODUCT_SELECT_SOURCE_TASK)) + .map(node -> node.path("data").path("selected_product")) + .ifPresent(productNode -> body.set("product_info", productNode)); + + // 기본 콘텐츠 설정 + body.put("content_type", "review_blog"); + body.put("target_length", 1000); + + return body; + } +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java index 1d2ef5bf..17934012 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java @@ -16,25 +16,25 @@ @RequiredArgsConstructor public class ProductSelectBodyBuilder implements TaskBodyBuilder { - private final ObjectMapper objectMapper; - private static final String TASK_NAME = "상품 선택 태스크"; + private final ObjectMapper objectMapper; + private static final String TASK_NAME = "상품 선택 태스크"; - @Override - public boolean supports(String taskName) { - return TASK_NAME.equals(taskName); - } + @Override + public boolean supports(String taskName) { + return TASK_NAME.equals(taskName); + } - @Override - public ObjectNode build(Task task, Map workflowContext) { - ObjectNode body = objectMapper.createObjectNode(); + @Override + public ObjectNode build(Task task, Map workflowContext) { + ObjectNode body = objectMapper.createObjectNode(); - // task_run_id는 현재 실행 중인 task의 run_id를 사용 - // 실제 구현에서는 Task 객체나 워크플로우 컨텍스트에서 가져와야 할 수 있습니다. - body.put("task_run_id", task.getId()); // Task 객체에서 ID를 가져오는 방식으로 가정 + // task_run_id는 현재 실행 중인 task의 run_id를 사용 + // 실제 구현에서는 Task 객체나 워크플로우 컨텍스트에서 가져와야 할 수 있습니다. + body.put("task_run_id", task.getId()); // Task 객체에서 ID를 가져오는 방식으로 가정 - // 기본 선택 기준 설정 (이미지 개수 우선) - body.put("selection_criteria", "image_count_priority"); + // 기본 선택 기준 설정 (이미지 개수 우선) + body.put("selection_criteria", "image_count_priority"); - return body; - } -} \ No newline at end of file + return body; + } +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java index 7b927dff..7548452a 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java @@ -17,35 +17,35 @@ @RequiredArgsConstructor public class S3UploadBodyBuilder implements TaskBodyBuilder { - private final ObjectMapper objectMapper; - private static final String TASK_NAME = "S3 업로드 태스크"; - private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; - private static final String CRAWL_SOURCE_TASK = "상품 정보 크롤링 태스크"; - - @Override - public boolean supports(String taskName) { - return TASK_NAME.equals(taskName); - } - - @Override - public ObjectNode build(Task task, Map workflowContext) { - ObjectNode body = objectMapper.createObjectNode(); - - // 키워드 정보 가져오기 (폴더명 생성용 - 스키마 주석 참조) - Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) - .map(node -> node.path("data").path("keyword")) - .filter(node -> !node.isMissingNode() && !node.asText().trim().isEmpty()) - .ifPresent(keywordNode -> body.set("keyword", keywordNode)); - - // 크롤링된 상품 데이터 가져오기 - Optional.ofNullable(workflowContext.get(CRAWL_SOURCE_TASK)) - .map(node -> node.path("data").path("crawled_products")) - .filter(node -> !node.isMissingNode()) - .ifPresent(crawledProductsNode -> body.set("crawled_products", crawledProductsNode)); - - // 기본 폴더 설정 (스키마의 기본값과 일치) - body.put("base_folder", "product"); - - return body; - } -} \ No newline at end of file + private final ObjectMapper objectMapper; + private static final String TASK_NAME = "S3 업로드 태스크"; + private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; + private static final String CRAWL_SOURCE_TASK = "상품 정보 크롤링 태스크"; + + @Override + public boolean supports(String taskName) { + return TASK_NAME.equals(taskName); + } + + @Override + public ObjectNode build(Task task, Map workflowContext) { + ObjectNode body = objectMapper.createObjectNode(); + + // 키워드 정보 가져오기 (폴더명 생성용 - 스키마 주석 참조) + Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) + .map(node -> node.path("data").path("keyword")) + .filter(node -> !node.isMissingNode() && !node.asText().trim().isEmpty()) + .ifPresent(keywordNode -> body.set("keyword", keywordNode)); + + // 크롤링된 상품 데이터 가져오기 + Optional.ofNullable(workflowContext.get(CRAWL_SOURCE_TASK)) + .map(node -> node.path("data").path("crawled_products")) + .filter(node -> !node.isMissingNode()) + .ifPresent(crawledProductsNode -> body.set("crawled_products", crawledProductsNode)); + + // 기본 폴더 설정 (스키마의 기본값과 일치) + body.put("base_folder", "product"); + + return body; + } +}