diff --git a/apps/pre-processing-service/app/api/endpoints/product.py b/apps/pre-processing-service/app/api/endpoints/product.py index 2812ef79..f5a91272 100644 --- a/apps/pre-processing-service/app/api/endpoints/product.py +++ b/apps/pre-processing-service/app/api/endpoints/product.py @@ -10,6 +10,7 @@ from ...service.search_service import SearchService from ...service.match_service import MatchService from ...service.similarity_service import SimilarityService +from ...service.product_selection_service import ProductSelectionService # from ...service.similarity_service import SimilarityService @@ -121,3 +122,26 @@ async def s3_upload(request: RequestS3Upload): raise HTTPException(status_code=e.status_code, detail=e.detail) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) + + +@router.post( + "/select", response_model=ResponseProductSelect, summary="콘텐츠용 상품 선택" +) +def select_product(request: RequestProductSelect): # async 제거 + """ + S3 업로드 완료 후 콘텐츠 생성을 위한 최적 상품을 선택합니다. + """ + try: + selection_service = ProductSelectionService() + response_data = selection_service.select_product_for_content( + request + ) # await 제거 + + if not response_data: + raise CustomException( + 500, "상품 선택에 실패했습니다.", "PRODUCT_SELECTION_FAILED" + ) + + return response_data + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) diff --git a/apps/pre-processing-service/app/model/schemas.py b/apps/pre-processing-service/app/model/schemas.py index dd49cf44..7487927b 100644 --- a/apps/pre-processing-service/app/model/schemas.py +++ b/apps/pre-processing-service/app/model/schemas.py @@ -161,7 +161,7 @@ class ResponseSadaguCrawl(ResponseBase[SadaguCrawlData]): pass -# ============== S3 이미지 업로드 ============== +# ============== S3 업로드 ============== class RequestS3Upload(RequestBase): @@ -227,12 +227,6 @@ class S3UploadData(BaseModel): uploaded_at: str = Field( ..., title="업로드 완료 시간", description="S3 업로드 완료 시간" ) - # 🆕 임시: 콘텐츠 생성용 단일 상품만 추가 (나중에 삭제 예정) - selected_product_for_content: Optional[Dict] = Field( - None, - title="콘텐츠 생성용 선택 상품", - description="임시: 블로그 콘텐츠 생성을 위해 선택된 단일 상품 정보", - ) # 최종 응답 모델 @@ -242,6 +236,36 @@ class ResponseS3Upload(ResponseBase[S3UploadData]): pass +# ============== 상품 선택 (새로 추가) ============== + + +class RequestProductSelect(RequestBase): + task_run_id: int = Field( + ..., title="Task Run ID", description="상품을 선택할 task_run_id" + ) + selection_criteria: Optional[str] = Field( + None, title="선택 기준", description="특별한 선택 기준 (기본: 이미지 개수 우선)" + ) + + +# 응답 데이터 모델 +class ProductSelectData(BaseModel): + task_run_id: int = Field(..., title="Task Run ID") + selected_product: Dict = Field( + ..., title="선택된 상품", description="콘텐츠 생성용으로 선택된 상품" + ) + total_available_products: int = Field( + ..., title="전체 상품 수", description="선택 가능했던 전체 상품 개수" + ) + + +# 최종 응답 모델 +class ResponseProductSelect(ResponseBase[ProductSelectData]): + """상품 선택 API 응답""" + + pass + + # ============== 블로그 콘텐츠 생성 ============== diff --git a/apps/pre-processing-service/app/service/product_selection_service.py b/apps/pre-processing-service/app/service/product_selection_service.py new file mode 100644 index 00000000..723bd940 --- /dev/null +++ b/apps/pre-processing-service/app/service/product_selection_service.py @@ -0,0 +1,203 @@ +import json +from typing import List, Dict +from loguru import logger +from app.model.schemas import RequestProductSelect +from app.utils.response import Response +from app.db.mariadb_manager import MariadbManager + + +class ProductSelectionService: + """콘텐츠 생성용 단일 상품 선택 서비스""" + + def __init__(self): + self.db_manager = MariadbManager() + + def select_product_for_content(self, request: RequestProductSelect) -> dict: + """ + S3 업로드와 DB 저장 결과를 바탕으로 콘텐츠 생성용 단일 상품을 선택 + """ + try: + task_run_id = request.task_run_id + logger.info(f"콘텐츠용 상품 선택 시작: task_run_id={task_run_id}") + + # 1. DB에서 해당 task_run_id의 모든 상품 조회 + db_products = self._fetch_products_from_db(task_run_id) + + if not db_products: + logger.warning(f"DB에서 상품을 찾을 수 없음: task_run_id={task_run_id}") + return Response.error( + "상품 데이터를 찾을 수 없습니다.", "PRODUCTS_NOT_FOUND" + ) + + # 2. 최적 상품 선택 + selected_product = self._select_best_product(db_products) + + logger.success( + f"콘텐츠용 상품 선택 완료: name={selected_product['name']}, " + f"selection_reason={selected_product['selection_reason']}" + ) + + data = { + "task_run_id": task_run_id, + "selected_product": selected_product, + "total_available_products": len(db_products), + } + + return Response.ok( + data, f"콘텐츠용 상품 선택 완료: {selected_product['name']}" + ) + + except Exception as e: + logger.error(f"콘텐츠용 상품 선택 오류: {e}") + raise + + def _fetch_products_from_db(self, task_run_id: int) -> List[Dict]: + """DB에서 task_run_id에 해당하는 모든 상품 조회""" + try: + sql = """ + SELECT id, \ + name, \ + data_value, \ + created_at + FROM task_io_data + WHERE task_run_id = %s + AND io_type = 'OUTPUT' + AND data_type = 'JSON' + ORDER BY name \ + """ + + with self.db_manager.get_cursor() as cursor: + cursor.execute(sql, (task_run_id,)) + rows = cursor.fetchall() + + products = [] + for row in rows: + try: + # MariaDB에서 반환되는 row는 튜플 형태 + id, name, data_value_str, created_at = row + + # JSON 데이터 파싱 + data_value = json.loads(data_value_str) + + products.append( + { + "id": id, + "name": name, + "data_value": data_value, + "created_at": created_at, + } + ) + except json.JSONDecodeError as e: + logger.warning(f"JSON 파싱 실패: name={name}, error={e}") + continue + except Exception as e: + logger.warning(f"Row 처리 실패: {row}, error={e}") + continue + + logger.info(f"DB에서 {len(products)}개 상품 조회 완료") + return products + + except Exception as e: + logger.error(f"DB 상품 조회 오류: {e}") + return [] + + def _select_best_product(self, db_products: List[Dict]) -> Dict: + """ + 상품 선택 로직: + 1순위: S3 이미지 업로드가 성공하고 이미지가 많은 상품 + 2순위: 크롤링 성공한 첫 번째 상품 + 3순위: 첫 번째 상품 (fallback) + """ + try: + successful_products = [] + + # 1순위: S3 업로드 성공하고 이미지가 있는 상품들 + for product in db_products: + data_value = product.get("data_value", {}) + product_detail = data_value.get("product_detail", {}) + product_images = product_detail.get("product_images", []) + + # 크롤링 성공하고 이미지가 있는 상품 + if ( + data_value.get("status") == "success" + and product_detail + and len(product_images) > 0 + ): + successful_products.append( + { + "product": product, + "image_count": len(product_images), + "title": product_detail.get("title", "Unknown"), + } + ) + + if successful_products: + # 이미지 개수가 가장 많은 상품 선택 + best_product = max(successful_products, key=lambda x: x["image_count"]) + + logger.info( + f"1순위 선택: name={best_product['product']['name']}, " + f"images={best_product['image_count']}개" + ) + + return { + "selection_reason": "s3_upload_success_with_most_images", + "name": best_product["product"]["name"], + "product_info": best_product["product"]["data_value"], + "image_count": best_product["image_count"], + "title": best_product["title"], + } + + # 2순위: 크롤링 성공한 첫 번째 상품 (이미지 없어도) + for product in db_products: + data_value = product.get("data_value", {}) + if data_value.get("status") == "success" and data_value.get( + "product_detail" + ): + product_detail = data_value.get("product_detail", {}) + logger.info(f"2순위 선택: name={product['name']}") + + return { + "selection_reason": "first_crawl_success", + "name": product["name"], + "product_info": data_value, + "image_count": len(product_detail.get("product_images", [])), + "title": product_detail.get("title", "Unknown"), + } + + # 3순위: 첫 번째 상품 (fallback) + if db_products: + first_product = db_products[0] + data_value = first_product.get("data_value", {}) + product_detail = data_value.get("product_detail", {}) + + logger.warning(f"3순위 fallback 선택: name={first_product['name']}") + + return { + "selection_reason": "fallback_first_product", + "name": first_product["name"], + "product_info": data_value, + "image_count": len(product_detail.get("product_images", [])), + "title": product_detail.get("title", "Unknown"), + } + + # 모든 경우 실패 + logger.error("선택할 상품이 없습니다") + return { + "selection_reason": "no_products_available", + "name": None, + "product_info": None, + "image_count": 0, + "title": "Unknown", + } + + except Exception as e: + logger.error(f"상품 선택 로직 오류: {e}") + return { + "selection_reason": "selection_error", + "name": db_products[0]["name"] if db_products else None, + "product_info": db_products[0]["data_value"] if db_products else None, + "image_count": 0, + "title": "Unknown", + "error": str(e), + } diff --git a/apps/pre-processing-service/app/service/s3_upload_service.py b/apps/pre-processing-service/app/service/s3_upload_service.py index 48c84d35..c804a201 100644 --- a/apps/pre-processing-service/app/service/s3_upload_service.py +++ b/apps/pre-processing-service/app/service/s3_upload_service.py @@ -1,41 +1,59 @@ import time +import json import asyncio import aiohttp +import ssl, certifi from typing import List, Dict +from datetime import datetime from loguru import logger from app.errors.CustomException import InvalidItemDataException from app.model.schemas import RequestS3Upload from app.utils.s3_upload_util import S3UploadUtil from app.utils.response import Response +from app.db.mariadb_manager import MariadbManager class S3UploadService: - """6단계: 크롤링된 상품 이미지들과 데이터를 S3에 업로드하는 서비스""" + """6단계: 크롤링된 상품 이미지들과 데이터를 S3에 업로드하고 DB에 저장하는 서비스""" def __init__(self): self.s3_util = S3UploadUtil() + self.db_manager = MariadbManager() async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict: """ - 크롤링된 상품들의 이미지와 데이터를 S3에 업로드하는 비즈니스 로직 (6단계) + 크롤링된 상품들의 이미지와 데이터를 S3에 업로드하고 DB에 저장하는 비즈니스 로직 (6단계) """ - keyword = request.keyword # 키워드 추가 + keyword = request.keyword crawled_products = request.crawled_products - base_folder = ( - request.base_folder or "product" - ) # 🔸 기본값 변경: product-images → product + base_folder = request.base_folder or "product" + + # task_run_id는 자바 워크플로우에서 전달받음 + task_run_id = getattr(request, "task_run_id", None) + if not task_run_id: + # 임시: task_run_id가 없으면 생성 + task_run_id = int(time.time() * 1000) + logger.warning(f"task_run_id가 없어서 임시로 생성: {task_run_id}") + else: + logger.info(f"자바 워크플로우에서 전달받은 task_run_id: {task_run_id}") logger.info( - f"S3 업로드 서비스 시작: keyword='{keyword}', {len(crawled_products)}개 상품" + f"S3 업로드 + DB 저장 서비스 시작: keyword='{keyword}', " + f"{len(crawled_products)}개 상품, task_run_id={task_run_id}" ) upload_results = [] total_success_images = 0 total_fail_images = 0 + db_save_results = [] try: # HTTP 세션을 사용한 이미지 다운로드 - async with aiohttp.ClientSession() as session: + + ssl_context = ssl.create_default_context(cafile=certifi.where()) + connector = aiohttp.TCPConnector(ssl=ssl_context) + + async with aiohttp.ClientSession(connector=connector) as session: # 각 상품별로 순차 업로드 for product_info in crawled_products: @@ -43,7 +61,7 @@ async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict: product_detail = product_info.get("product_detail") logger.info( - f"상품 {product_index}/{len(crawled_products)} S3 업로드 시작" + f"상품 {product_index}/{len(crawled_products)} S3 업로드 + DB 저장 시작" ) # 크롤링 실패한 상품은 스킵 @@ -62,30 +80,44 @@ async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict: "fail_count": 0, } ) + db_save_results.append( + { + "product_index": product_index, + "db_status": "skipped", + "error": "크롤링 실패", + } + ) continue try: - # 상품 이미지 + 데이터 업로드 (키워드 전달 추가!) - # 🔸 전체 크롤링 데이터를 전달 (product_detail이 아닌 product_info 전체) + # 1. 상품 이미지 + 데이터 S3 업로드 upload_result = await self.s3_util.upload_single_product_images( session, product_info, product_index, keyword, - base_folder, # product_detail → product_info + base_folder, ) upload_results.append(upload_result) total_success_images += upload_result["success_count"] total_fail_images += upload_result["fail_count"] + # 2. DB에 상품 데이터 저장 + db_result = self._save_product_to_db( + task_run_id, keyword, product_index, product_info + ) + db_save_results.append(db_result) + logger.success( - f"상품 {product_index} S3 업로드 완료: 성공 {upload_result['success_count']}개, " - f"실패 {upload_result['fail_count']}개" + f"상품 {product_index} S3 업로드 + DB 저장 완료: " + f"이미지 성공 {upload_result['success_count']}개, DB {db_result['db_status']}" ) except Exception as e: - logger.error(f"상품 {product_index} S3 업로드 오류: {e}") + logger.error( + f"상품 {product_index} S3 업로드/DB 저장 오류: {e}" + ) upload_results.append( { "product_index": product_index, @@ -97,122 +129,98 @@ async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict: "fail_count": 0, } ) + db_save_results.append( + { + "product_index": product_index, + "db_status": "error", + "error": str(e), + } + ) # 상품간 간격 (서버 부하 방지) if product_index < len(crawled_products): await asyncio.sleep(1) - # 🆕 임시: 콘텐츠 생성용 단일 상품 선택 로직 - selected_product_for_content = self._select_single_product_for_content( - crawled_products, upload_results - ) - logger.success( - f"S3 업로드 서비스 완료: 총 성공 이미지 {total_success_images}개, 총 실패 이미지 {total_fail_images}개" + f"S3 업로드 + DB 저장 서비스 완료: 총 성공 이미지 {total_success_images}개, " + f"총 실패 이미지 {total_fail_images}개" ) - # 기존 응답 데이터 구성 + # 응답 데이터 구성 data = { "upload_results": upload_results, + "db_save_results": db_save_results, + "task_run_id": task_run_id, "summary": { "total_products": len(crawled_products), "total_success_images": total_success_images, "total_fail_images": total_fail_images, + "db_success_count": len( + [r for r in db_save_results if r.get("db_status") == "success"] + ), + "db_fail_count": len( + [r for r in db_save_results if r.get("db_status") == "error"] + ), }, "uploaded_at": time.strftime("%Y-%m-%d %H:%M:%S"), - # 🆕 임시: 콘텐츠 생성용 단일 상품만 추가 (나중에 삭제 예정) - "selected_product_for_content": selected_product_for_content, } - message = f"S3 업로드 완료: {total_success_images}개 이미지 업로드 성공, 상품 데이터 JSON 파일 포함" + message = f"S3 업로드 + DB 저장 완료: {total_success_images}개 이미지 성공, {len([r for r in db_save_results if r.get('db_status') == 'success'])}개 상품 DB 저장 성공" return Response.ok(data, message) except Exception as e: - logger.error(f"S3 업로드 서비스 전체 오류: {e}") + logger.error(f"S3 업로드 + DB 저장 서비스 전체 오류: {e}") raise InvalidItemDataException() - def _select_single_product_for_content( - self, crawled_products: List[Dict], upload_results: List[Dict] + def _save_product_to_db( + self, task_run_id: int, keyword: str, product_index: int, product_info: Dict ) -> Dict: """ - 🆕 임시: 콘텐츠 생성을 위한 단일 상품 선택 로직 - 우선순위: 1) S3 업로드 성공한 상품 중 이미지 개수가 많은 것 - 2) 없다면 크롤링 성공한 첫 번째 상품 + 상품 데이터를 TASK_IO_DATA 테이블에 저장 (MariaDB) """ try: - # 1순위: S3 업로드 성공하고 이미지가 있는 상품들 - successful_uploads = [ - result - for result in upload_results - if result.get("status") == "completed" - and result.get("success_count", 0) > 0 - ] - - if successful_uploads: - # 이미지 개수가 가장 많은 상품 선택 - best_upload = max( - successful_uploads, key=lambda x: x.get("success_count", 0) + # 상품명 생성 (산리오_01 형식) + product_name = f"{keyword}_{product_index:02d}" + + # data_value에 저장할 JSON 데이터 (전체 product_info) + data_value_json = json.dumps(product_info, ensure_ascii=False) + + # 현재 시간 + created_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + # MariaDB에 저장 + with self.db_manager.get_cursor() as cursor: + sql = """ + INSERT INTO task_io_data + (task_run_id, io_type, name, data_type, data_value, created_at) + VALUES (%s, %s, %s, %s, %s, %s) \ + """ + + cursor.execute( + sql, + ( + task_run_id, + "OUTPUT", + product_name, + "JSON", + data_value_json, + created_at, + ), ) - selected_index = best_upload["product_index"] - # 원본 크롤링 데이터에서 해당 상품 찾기 - for product_info in crawled_products: - if product_info.get("index") == selected_index: - logger.info( - f"콘텐츠 생성용 상품 선택: index={selected_index}, " - f"title='{product_info.get('product_detail', {}).get('title', 'Unknown')[:30]}', " - f"images={best_upload.get('success_count', 0)}개" - ) - return { - "selection_reason": "s3_upload_success_with_most_images", - "product_info": product_info, - "s3_upload_info": best_upload, - } - - # 2순위: 크롤링 성공한 첫 번째 상품 (S3 업로드 실패해도) - for product_info in crawled_products: - if product_info.get("status") == "success" and product_info.get( - "product_detail" - ): - - # 해당 상품의 S3 업로드 정보 찾기 - upload_info = None - for result in upload_results: - if result.get("product_index") == product_info.get("index"): - upload_info = result - break + logger.success(f"상품 {product_index} DB 저장 성공: name={product_name}") - logger.info( - f"콘텐츠 생성용 상품 선택 (fallback): index={product_info.get('index')}, " - f"title='{product_info.get('product_detail', {}).get('title', 'Unknown')[:30]}'" - ) - return { - "selection_reason": "first_crawl_success", - "product_info": product_info, - "s3_upload_info": upload_info, - } - - # 3순위: 아무거나 (모든 상품이 실패한 경우) - if crawled_products: - logger.warning("모든 상품이 크롤링 실패 - 첫 번째 상품으로 fallback") - return { - "selection_reason": "fallback_first_product", - "product_info": crawled_products[0], - "s3_upload_info": upload_results[0] if upload_results else None, - } - - logger.error("선택할 상품이 없습니다") return { - "selection_reason": "no_products_available", - "product_info": None, - "s3_upload_info": None, + "product_index": product_index, + "product_name": product_name, + "db_status": "success", + "task_run_id": task_run_id, } except Exception as e: - logger.error(f"단일 상품 선택 오류: {e}") + logger.error(f"상품 {product_index} DB 저장 오류: {e}") return { - "selection_reason": "selection_error", - "product_info": crawled_products[0] if crawled_products else None, - "s3_upload_info": upload_results[0] if upload_results else None, + "product_index": product_index, + "db_status": "error", "error": str(e), } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java index 419a23a4..8a8008ed 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java @@ -20,7 +20,7 @@ public class BlogRagBodyBuilder implements TaskBodyBuilder { private final ObjectMapper objectMapper; private static final String TASK_NAME = "블로그 RAG 생성 태스크"; private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; - private static final String S3_UPLOAD_SOURCE_TASK = "S3 업로드 태스크"; // 변경: 크롤링 → S3 업로드 + private static final String PRODUCT_SELECT_SOURCE_TASK = "상품 선택 태스크"; // 변경: S3 업로드 → 상품 선택 @Override public boolean supports(String taskName) { @@ -36,14 +36,8 @@ public ObjectNode build(Task task, Map workflowContext) { .map(node -> node.path("data").path("keyword")) .ifPresent(keywordNode -> body.set("keyword", keywordNode)); - // S3 업로드에서 선택된 상품 정보 가져오기 (변경된 부분) - Optional.ofNullable(workflowContext.get(S3_UPLOAD_SOURCE_TASK)) - .map( - node -> - node.path("data") - .path("selected_product_for_content") - .path("product_info") - .path("product_detail")) + Optional.ofNullable(workflowContext.get(PRODUCT_SELECT_SOURCE_TASK)) + .map(node -> node.path("data").path("selected_product")) .ifPresent(productNode -> body.set("product_info", productNode)); // 기본 콘텐츠 설정 diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java new file mode 100644 index 00000000..17934012 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java @@ -0,0 +1,40 @@ +package site.icebang.domain.workflow.runner.fastapi.body; + +import java.util.Map; + +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import lombok.RequiredArgsConstructor; + +import site.icebang.domain.workflow.model.Task; + +@Component +@RequiredArgsConstructor +public class ProductSelectBodyBuilder implements TaskBodyBuilder { + + private final ObjectMapper objectMapper; + private static final String TASK_NAME = "상품 선택 태스크"; + + @Override + public boolean supports(String taskName) { + return TASK_NAME.equals(taskName); + } + + @Override + public ObjectNode build(Task task, Map workflowContext) { + ObjectNode body = objectMapper.createObjectNode(); + + // task_run_id는 현재 실행 중인 task의 run_id를 사용 + // 실제 구현에서는 Task 객체나 워크플로우 컨텍스트에서 가져와야 할 수 있습니다. + body.put("task_run_id", task.getId()); // Task 객체에서 ID를 가져오는 방식으로 가정 + + // 기본 선택 기준 설정 (이미지 개수 우선) + body.put("selection_criteria", "image_count_priority"); + + return body; + } +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java index bd0f823e..7548452a 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java @@ -31,17 +31,19 @@ public boolean supports(String taskName) { public ObjectNode build(Task task, Map workflowContext) { ObjectNode body = objectMapper.createObjectNode(); - // 키워드 정보 가져오기 + // 키워드 정보 가져오기 (폴더명 생성용 - 스키마 주석 참조) Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) .map(node -> node.path("data").path("keyword")) + .filter(node -> !node.isMissingNode() && !node.asText().trim().isEmpty()) .ifPresent(keywordNode -> body.set("keyword", keywordNode)); // 크롤링된 상품 데이터 가져오기 Optional.ofNullable(workflowContext.get(CRAWL_SOURCE_TASK)) .map(node -> node.path("data").path("crawled_products")) + .filter(node -> !node.isMissingNode()) .ifPresent(crawledProductsNode -> body.set("crawled_products", crawledProductsNode)); - // 기본 폴더 설정 + // 기본 폴더 설정 (스키마의 기본값과 일치) body.put("base_folder", "product"); return body;