From 8b524d7d0e7ca7380065de00b2d3ca234d447b64 Mon Sep 17 00:00:00 2001 From: thkim7 Date: Sat, 27 Sep 2025 17:25:04 +0900 Subject: [PATCH 1/4] =?UTF-8?q?feat:=20OCR=20=EC=B2=98=EB=A6=AC=EB=90=9C?= =?UTF-8?q?=20=ED=85=8D=EC=8A=A4=ED=8A=B8=20=EB=B8=94=EB=A1=9C=EA=B7=B8=20?= =?UTF-8?q?=EC=BD=98=ED=85=90=EC=B8=A0=20=EC=83=9D=EC=84=B1=EC=97=90=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80(=EC=B4=88=EC=95=88)=20-=20=EC=8A=A4=ED=82=A4?= =?UTF-8?q?=EB=A7=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/pre-processing-service/app/model/schemas.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/apps/pre-processing-service/app/model/schemas.py b/apps/pre-processing-service/app/model/schemas.py index 4001b705..b50077cd 100644 --- a/apps/pre-processing-service/app/model/schemas.py +++ b/apps/pre-processing-service/app/model/schemas.py @@ -186,6 +186,9 @@ class S3ImageInfo(BaseModel): ..., title="원본 URL", description="크롤링된 원본 이미지 URL" ) s3_url: str = Field(..., title="S3 URL", description="S3에서 접근 가능한 URL") + # 새로 추가: 파일 크기 정보 (이미지 선별용) + file_size_kb: Optional[float] = Field(None, title="파일 크기(KB)", description="이미지 파일 크기") + file_name: Optional[str] = Field(None, title="파일명", description="S3에 저장된 파일명") # 상품별 S3 업로드 결과 @@ -274,14 +277,14 @@ class RequestBlogCreate(RequestBase): keyword: Optional[str] = Field( None, title="키워드", description="콘텐츠 생성용 키워드" ) + translation_language: Optional[str] = Field( + None, title="번역한 언어", description="이미지에서 중국어를 한국어로 번역한 언어" + ) product_info: Optional[Dict] = Field( None, title="상품 정보", description="블로그 콘텐츠에 포함할 상품 정보" ) - content_type: Optional[str] = Field( - None, title="콘텐츠 타입", description="생성할 콘텐츠 유형" - ) - target_length: Optional[int] = Field( - None, title="목표 글자 수", description="생성할 콘텐츠의 목표 길이" + uploaded_images: Optional[List[Dict]] = Field( + None, title="업로드된 이미지", description="S3에 업로드된 이미지 목록 (크기 정보 포함)" ) From 11f3d49ef26cdcdaf0451fa1c83b16be395b509c Mon Sep 17 00:00:00 2001 From: thkim7 Date: Sat, 27 Sep 2025 17:25:10 +0900 Subject: [PATCH 2/4] =?UTF-8?q?feat:=20OCR=20=EC=B2=98=EB=A6=AC=EB=90=9C?= =?UTF-8?q?=20=ED=85=8D=EC=8A=A4=ED=8A=B8=20=EB=B8=94=EB=A1=9C=EA=B7=B8=20?= =?UTF-8?q?=EC=BD=98=ED=85=90=EC=B8=A0=20=EC=83=9D=EC=84=B1=EC=97=90=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80(=EC=B4=88=EC=95=88)=20-=20=EC=9C=A0=ED=8B=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../app/utils/s3_upload_util.py | 38 ++++++++++--------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/apps/pre-processing-service/app/utils/s3_upload_util.py b/apps/pre-processing-service/app/utils/s3_upload_util.py index 0aaa5ace..a47f2ae3 100644 --- a/apps/pre-processing-service/app/utils/s3_upload_util.py +++ b/apps/pre-processing-service/app/utils/s3_upload_util.py @@ -159,16 +159,15 @@ def get_s3_url(self, s3_key: str) -> str: return f"{self.base_url}/{s3_key}" async def upload_single_product_images( - self, - session: aiohttp.ClientSession, - product_info: Dict, # 🔸 이름 변경: product_data → product_info (전체 크롤링 데이터) - product_index: int, - keyword: str, # 키워드 파라미터 추가 - base_folder: str = "product", # 🔸 기본 폴더 변경: product-images → product + self, + session: aiohttp.ClientSession, + product_info: Dict, + product_index: int, + keyword: str, + base_folder: str = "product", ) -> Dict: """단일 상품의 모든 데이터(이미지 + JSON)를 S3에 업로드""" - # 🔸 전체 크롤링 데이터에서 필요한 정보 추출 product_detail = product_info.get("product_detail", {}) product_title = product_detail.get("title", "Unknown") product_images = product_detail.get("product_images", []) @@ -179,18 +178,15 @@ async def upload_single_product_images( f"상품 {product_index} 업로드 시작: {len(product_images)}개 이미지, keyword='{keyword}'" ) - # 키워드 기반 폴더명 한 번만 생성 folder_name = self.generate_product_folder_name(product_index, keyword) - fail_count = 0 folder_s3_url = f"{self.base_url}/{base_folder}/{folder_name}" - # 🆕 1. 먼저 상품 데이터 JSON 파일 업로드 + # 1. JSON 파일 업로드 try: - # 전체 크롤링 데이터를 JSON으로 저장 (S3 업로드 메타데이터 추가) product_data_with_meta = { - **product_info, # 전체 크롤링 데이터 (index, url, product_detail, status, crawled_at 포함) - "s3_upload_keyword": keyword, # 추가 메타데이터 + **product_info, + "s3_upload_keyword": keyword, "s3_uploaded_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } @@ -206,7 +202,7 @@ async def upload_single_product_images( except Exception as e: logger.error(f"상품 {product_index} JSON 업로드 오류: {e}") - # 2. 이미지 업로드 (기존 로직) + # 2. 이미지 업로드 if not product_images: logger.warning(f"상품 {product_index}: 업로드할 이미지가 없음") return { @@ -236,7 +232,10 @@ async def upload_single_product_images( fail_count += 1 continue - # S3 키 생성 (키워드 기반 폴더명 사용) + # 파일 크기 계산 (KB 단위) + file_size_kb = len(image_data) / 1024 + + # S3 키 생성 file_extension = self.get_file_extension(original_url) image_file_name = f"image_{img_idx:03d}{file_extension}" s3_key = self.generate_s3_key(base_folder, folder_name, image_file_name) @@ -246,15 +245,18 @@ async def upload_single_product_images( if self.upload_to_s3(image_data, s3_key, content_type): s3_url = self.get_s3_url(s3_key) + # 파일 크기 정보 추가 uploaded_images.append( { "index": img_idx, "original_url": original_url, "s3_url": s3_url, + "file_size_kb": round(file_size_kb, 2), + "file_name": image_file_name, } ) - logger.debug(f"상품 {product_index}, 이미지 {img_idx} 업로드 완료") + logger.debug(f"상품 {product_index}, 이미지 {img_idx} 업로드 완료 ({file_size_kb:.1f}KB)") else: fail_count += 1 @@ -273,8 +275,8 @@ async def upload_single_product_images( "product_index": product_index, "product_title": product_title, "status": "completed", - "folder_s3_url": folder_s3_url, # 🔸 폴더 전체를 가리킴 (이미지 + JSON 포함) - "json_s3_url": f"{folder_s3_url}/product_data.json", # 🆕 JSON 파일 직접 링크 + "folder_s3_url": folder_s3_url, + "json_s3_url": f"{folder_s3_url}/product_data.json", "uploaded_images": uploaded_images, "success_count": len(uploaded_images), "fail_count": fail_count, From 4f9bdc75c647d05d8dcd81b933c97d86589c329d Mon Sep 17 00:00:00 2001 From: thkim7 Date: Sat, 27 Sep 2025 17:25:15 +0900 Subject: [PATCH 3/4] =?UTF-8?q?feat:=20OCR=20=EC=B2=98=EB=A6=AC=EB=90=9C?= =?UTF-8?q?=20=ED=85=8D=EC=8A=A4=ED=8A=B8=20=EB=B8=94=EB=A1=9C=EA=B7=B8=20?= =?UTF-8?q?=EC=BD=98=ED=85=90=EC=B8=A0=20=EC=83=9D=EC=84=B1=EC=97=90=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80(=EC=B4=88=EC=95=88)=20-=20=EC=84=9C=EB=B9=84?= =?UTF-8?q?=EC=8A=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../app/service/blog/blog_create_service.py | 293 +++++++++++++----- 1 file changed, 222 insertions(+), 71 deletions(-) diff --git a/apps/pre-processing-service/app/service/blog/blog_create_service.py b/apps/pre-processing-service/app/service/blog/blog_create_service.py index a66fa609..11d92285 100644 --- a/apps/pre-processing-service/app/service/blog/blog_create_service.py +++ b/apps/pre-processing-service/app/service/blog/blog_create_service.py @@ -1,5 +1,6 @@ -import json import logging +import os +import boto3 from loguru import logger from datetime import datetime from typing import Dict, List, Optional, Any @@ -19,19 +20,94 @@ def __init__(self): if not self.openai_api_key: raise ValueError("OPENAI_API_KEY가 .env.dev 파일에 설정되지 않았습니다.") - # 인스턴스 레벨에서 클라이언트 생성 self.client = OpenAI(api_key=self.openai_api_key) + + # S3 클라이언트 추가 + self.s3_client = boto3.client( + "s3", + aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), + region_name=os.getenv("AWS_REGION", "ap-northeast-2") + ) + self.bucket_name = os.getenv("S3_BUCKET_NAME", "icebang4-dev-bucket") + logging.basicConfig(level=logging.INFO) - def generate_blog_content(self, request: RequestBlogCreate) -> Dict[str, Any]: - """ - 요청 데이터를 기반으로 블로그 콘텐츠 생성 + def _fetch_images_from_s3(self, keyword: str, product_index: int = 1) -> List[Dict]: + """S3에서 해당 상품의 이미지 정보를 조회""" + try: + # 폴더 패턴: 20250922_키워드_1/ 형식으로 검색 + from datetime import datetime + date_str = datetime.now().strftime("%Y%m%d") + + # 키워드 정리 (S3UploadUtil과 동일한 방식) + safe_keyword = ( + keyword.replace("/", "-") + .replace("\\", "-") + .replace(" ", "_") + .replace(":", "-") + .replace("*", "-") + .replace("?", "-") + .replace('"', "-") + .replace("<", "-") + .replace(">", "-") + .replace("|", "-")[:20] + ) + + folder_prefix = f"product/{date_str}_{safe_keyword}_{product_index}/" + + logger.debug(f"S3에서 이미지 조회: {folder_prefix}") + + # S3에서 해당 폴더의 파일 목록 조회 + response = self.s3_client.list_objects_v2( + Bucket=self.bucket_name, + Prefix=folder_prefix + ) + + if 'Contents' not in response: + logger.warning(f"S3에서 이미지를 찾을 수 없음: {folder_prefix}") + return [] + + images = [] + base_url = f"https://{self.bucket_name}.s3.ap-northeast-2.amazonaws.com" + + # 이미지 파일만 필터링 (image_*.jpg 패턴) + for obj in response['Contents']: + key = obj['Key'] + file_name = key.split('/')[-1] # 마지막 부분이 파일명 + + # 이미지 파일인지 확인 + if file_name.startswith('image_') and file_name.endswith(('.jpg', '.jpeg', '.png')): + # 파일 크기 정보 (bytes -> KB) + file_size_kb = obj['Size'] / 1024 - Args: - request: RequestBlogCreate 객체 + # 인덱스 추출 (image_001.jpg -> 1) + try: + index = int(file_name.split('_')[1].split('.')[0]) + except: + index = len(images) + 1 - Returns: - Dict: {"title": str, "content": str, "tags": List[str]} 형태의 결과 + images.append({ + "index": index, + "s3_url": f"{base_url}/{key}", + "file_name": file_name, + "file_size_kb": round(file_size_kb, 2), + "original_url": "" # 원본 URL은 S3에서 조회 불가 + }) + + # 인덱스 순으로 정렬 + images.sort(key=lambda x: x['index']) + + logger.success(f"S3에서 이미지 {len(images)}개 조회 완료") + return images + + except Exception as e: + logger.error(f"S3 이미지 조회 실패: {e}") + return [] + + def generate_blog_content(self, request: RequestBlogCreate) -> Dict[str, Any]: + """ + 요청 데이터를 기반으로 블로그 콘텐츠 생성 (이미지 자동 배치 포함) """ try: logger.debug("[STEP1] 콘텐츠 컨텍스트 준비 시작") @@ -50,6 +126,22 @@ def generate_blog_content(self, request: RequestBlogCreate) -> Dict[str, Any]: result = self._parse_generated_content(generated_content, request) logger.debug("[STEP4 완료]") + # STEP5: S3에서 이미지 정보 조회 (새로 추가) + uploaded_images = request.uploaded_images + if not uploaded_images and request.keyword: + logger.debug("[STEP5-1] S3에서 이미지 정보 조회 시작") + uploaded_images = self._fetch_images_from_s3(request.keyword) + logger.debug(f"[STEP5-1 완료] 조회된 이미지: {len(uploaded_images)}개") + + # STEP6: 이미지 자동 배치 + if uploaded_images and len(uploaded_images) > 0: + logger.debug("[STEP6] 이미지 자동 배치 시작") + result['content'] = self._insert_images_to_content( + result['content'], + uploaded_images + ) + logger.debug("[STEP6 완료] 이미지 배치 완료") + return result except Exception as e: @@ -60,29 +152,29 @@ def _prepare_content_context(self, request: RequestBlogCreate) -> str: """요청 데이터를 콘텐츠 생성용 컨텍스트로 변환""" context_parts = [] - # 키워드 정보 추가 + # 키워드 정보 if request.keyword: context_parts.append(f"주요 키워드: {request.keyword}") - # 상품 정보 추가 + # 상품 정보 if request.product_info: context_parts.append("\n상품 정보:") - # 상품 기본 정보 if request.product_info.get("title"): context_parts.append(f"- 상품명: {request.product_info['title']}") if request.product_info.get("price"): - context_parts.append(f"- 가격: {request.product_info['price']:,}원") + try: + context_parts.append(f"- 가격: {int(request.product_info['price']):,}원") + except Exception: + context_parts.append(f"- 가격: {request.product_info.get('price')}") if request.product_info.get("rating"): context_parts.append(f"- 평점: {request.product_info['rating']}/5.0") - # 상품 상세 정보 if request.product_info.get("description"): context_parts.append(f"- 설명: {request.product_info['description']}") - # 상품 사양 (material_info 등) if request.product_info.get("material_info"): context_parts.append("- 주요 사양:") specs = request.product_info["material_info"] @@ -90,37 +182,135 @@ def _prepare_content_context(self, request: RequestBlogCreate) -> str: for key, value in specs.items(): context_parts.append(f" * {key}: {value}") - # 상품 옵션 if request.product_info.get("options"): options = request.product_info["options"] context_parts.append(f"- 구매 옵션 ({len(options)}개):") - for i, option in enumerate(options[:5], 1): # 최대 5개만 + for i, option in enumerate(options[:5], 1): if isinstance(option, dict): option_name = option.get("name", f"옵션 {i}") context_parts.append(f" {i}. {option_name}") else: context_parts.append(f" {i}. {option}") - # 구매 링크 - if request.product_info.get("url") or request.product_info.get( - "product_url" - ): - url = request.product_info.get("url") or request.product_info.get( - "product_url" - ) + if request.product_info.get("url") or request.product_info.get("product_url"): + url = request.product_info.get("url") or request.product_info.get("product_url") context_parts.append(f"- 구매 링크: {url}") + # 번역 텍스트 (translation_language) 추가 + if request.translation_language: + context_parts.append("\n이미지(OCR)에서 추출·번역된 텍스트:") + context_parts.append(request.translation_language.strip()) + return "\n".join(context_parts) if context_parts else "키워드 기반 콘텐츠 생성" + def _select_best_images(self, uploaded_images: List[Dict], target_count: int = 4) -> List[Dict]: + """크기 기반으로 최적의 이미지 4개 선별""" + if not uploaded_images: + return [] + + logger.debug(f"이미지 선별 시작: 전체 {len(uploaded_images)}개 -> 목표 {target_count}개") + + # 1단계: 너무 작은 이미지 제외 (20KB 이하는 아이콘, 로고 가능성) + filtered = [img for img in uploaded_images if img.get('file_size_kb', 0) > 20] + logger.debug(f"크기 필터링 후: {len(filtered)}개 이미지 남음") + + if len(filtered) == 0: + # 모든 이미지가 너무 작다면 원본에서 선택 + filtered = uploaded_images + + # 2단계: 크기순 정렬 (큰 이미지 = 메인 상품 사진일 가능성) + sorted_images = sorted(filtered, key=lambda x: x.get('file_size_kb', 0), reverse=True) + + # 3단계: 상위 이미지 선택하되, 너무 많으면 균등 분산 + if len(sorted_images) <= target_count: + selected = sorted_images + else: + # 상위 2개 (메인 이미지) + 나머지에서 균등분산으로 2개 + selected = sorted_images[:2] # 큰 이미지 2개 + + remaining = sorted_images[2:] + if len(remaining) >= 2: + step = len(remaining) // 2 + selected.extend([remaining[i * step] for i in range(2)]) + + result = selected[:target_count] + + logger.debug(f"최종 선택된 이미지: {len(result)}개") + for i, img in enumerate(result): + logger.debug(f" {i + 1}. {img.get('file_name', 'unknown')} ({img.get('file_size_kb', 0):.1f}KB)") + + return result + + def _insert_images_to_content(self, content: str, uploaded_images: List[Dict]) -> str: + """AI가 적절한 위치에 이미지 4개를 자동 배치""" + + # 1단계: 최적의 이미지 4개 선별 + selected_images = self._select_best_images(uploaded_images, target_count=4) + + if not selected_images: + logger.warning("선별된 이미지가 없어서 이미지 배치를 건너뜀") + return content + + logger.debug(f"이미지 배치 시작: {len(selected_images)}개 이미지") + + # 2단계: AI에게 이미지 배치 위치 물어보기 + image_placement_prompt = f""" +다음 HTML 콘텐츠에서 이미지 {len(selected_images)}개를 적절한 위치에 배치해주세요. + +콘텐츠: +{content} + +이미지 개수: {len(selected_images)}개 + +요구사항: +- 각 섹션(h2, h3 태그)마다 골고루 분산 배치 +- 너무 몰려있지 않게 적절한 간격 유지 +- 글의 흐름을 방해하지 않는 자연스러운 위치 +- [IMAGE_1], [IMAGE_2], [IMAGE_3], [IMAGE_4] 형식의 플레이스홀더로 표시 + +⚠️ 주의사항: +- 기존 HTML 구조와 내용은 그대로 유지 +- 오직 이미지 플레이스홀더만 적절한 위치에 삽입 +- 코드 블록(```)은 사용하지 말고 수정된 HTML만 반환 + +수정된 HTML을 반환해주세요. +""" + + try: + # 3단계: AI로 배치 위치 결정 + modified_content = self._generate_with_openai(image_placement_prompt) + + # 4단계: 플레이스홀더를 실제 img 태그로 교체 + for i, img in enumerate(selected_images): + img_tag = f''' +
+ 상품 이미지 {i + 1} +
''' + + placeholder = f"[IMAGE_{i + 1}]" + modified_content = modified_content.replace(placeholder, img_tag) + + # 5단계: 남은 플레이스홀더 제거 (혹시 AI가 더 많이 만들었을 경우) + import re + modified_content = re.sub(r'\[IMAGE_\d+\]', '', modified_content) + + logger.success(f"이미지 배치 완료: {len(selected_images)}개 이미지 삽입") + return modified_content + + except Exception as e: + logger.error(f"이미지 배치 중 오류: {e}, 원본 콘텐츠 반환") + return content + def _create_content_prompt(self, context: str, request: RequestBlogCreate) -> str: """콘텐츠 생성용 프롬프트 생성""" # 기본 키워드가 없으면 상품 제목에서 추출 main_keyword = request.keyword if ( - not main_keyword - and request.product_info - and request.product_info.get("title") + not main_keyword + and request.product_info + and request.product_info.get("title") ): main_keyword = request.product_info["title"] @@ -138,7 +328,7 @@ def _create_content_prompt(self, context: str, request: RequestBlogCreate) -> st 작성 요구사항: 1. SEO 친화적이고 클릭하고 싶은 매력적인 제목 2. 독자의 관심을 끄는 도입부 -3. 핵심 특징과 장점을 구체적으로 설명 +3. 핵심 특징과 장점을 구체적으로 설명 (h2, h3 태그로 구조화) 4. 실제 사용 시나리오나 활용 팁 5. 구매 결정에 도움이 되는 정보 @@ -147,6 +337,7 @@ def _create_content_prompt(self, context: str, request: RequestBlogCreate) -> st - 출력 시 ```나 ```html 같은 코드 블록 구문을 포함하지 마세요. - 오직 HTML 태그만 사용하여 구조화된 콘텐츠를 작성해주세요. (예:

,

,

,

    ,
  • 등) +- 이미지는 나중에 자동으로 삽입되므로 img 태그를 작성하지 마세요. """ return prompt @@ -170,11 +361,11 @@ def _generate_with_openai(self, prompt: str) -> str: return response.choices[0].message.content except Exception as e: - self.logger.error(f"OpenAI API 호출 실패: {e}") + logger.error(f"OpenAI API 호출 실패: {e}") raise def _parse_generated_content( - self, content: str, request: RequestBlogCreate + self, content: str, request: RequestBlogCreate ) -> Dict[str, Any]: """생성된 콘텐츠를 파싱하여 구조화""" @@ -303,44 +494,4 @@ def _create_fallback_content(self, request: RequestBlogCreate) -> Dict[str, Any] "title": title, "content": content, "tags": self._generate_tags(request), - } - - -# if __name__ == '__main__': -# # 테스트용 요청 데이터 -# test_request = RequestBlogCreate( -# keyword="아이폰 케이스", -# product_info={ -# "title": "아이폰 15 프로 투명 케이스", -# "price": 29900, -# "rating": 4.8, -# "description": "9H 강화 보호 기능을 제공하는 투명 케이스", -# "material_info": { -# "소재": "TPU + PC", -# "두께": "1.2mm", -# "색상": "투명", -# "호환성": "아이폰 15 Pro" -# }, -# "options": [ -# {"name": "투명"}, -# {"name": "반투명"}, -# {"name": "블랙"} -# ], -# "url": "https://example.com/iphone-case" -# } -# ) -# -# # 서비스 실행 -# service = BlogContentService() -# print("=== 블로그 콘텐츠 생성 테스트 ===") -# print(f"키워드: {test_request.keyword}") -# print(f"상품: {test_request.product_info['title']}") -# print("\n--- 생성 시작 ---") -# -# result = service.generate_blog_content(test_request) -# -# print(f"\n=== 생성 결과 ===") -# print(f"제목: {result['title']}") -# print(f"\n태그: {', '.join(result['tags'])}") -# print(f"\n내용:\n{result['content']}") -# print(f"\n글자수: {len(result['content'])}자") + } \ No newline at end of file From 6544dc8df2d6455c2a9e80dac427920f828f54d9 Mon Sep 17 00:00:00 2001 From: thkim7 Date: Sat, 27 Sep 2025 17:25:31 +0900 Subject: [PATCH 4/4] chore: poetry run black . --- .../app/model/schemas.py | 16 +++- .../app/service/blog/blog_create_service.py | 96 +++++++++++-------- .../app/utils/s3_upload_util.py | 16 ++-- 3 files changed, 79 insertions(+), 49 deletions(-) diff --git a/apps/pre-processing-service/app/model/schemas.py b/apps/pre-processing-service/app/model/schemas.py index b50077cd..4a49ca0e 100644 --- a/apps/pre-processing-service/app/model/schemas.py +++ b/apps/pre-processing-service/app/model/schemas.py @@ -187,8 +187,12 @@ class S3ImageInfo(BaseModel): ) s3_url: str = Field(..., title="S3 URL", description="S3에서 접근 가능한 URL") # 새로 추가: 파일 크기 정보 (이미지 선별용) - file_size_kb: Optional[float] = Field(None, title="파일 크기(KB)", description="이미지 파일 크기") - file_name: Optional[str] = Field(None, title="파일명", description="S3에 저장된 파일명") + file_size_kb: Optional[float] = Field( + None, title="파일 크기(KB)", description="이미지 파일 크기" + ) + file_name: Optional[str] = Field( + None, title="파일명", description="S3에 저장된 파일명" + ) # 상품별 S3 업로드 결과 @@ -278,13 +282,17 @@ class RequestBlogCreate(RequestBase): None, title="키워드", description="콘텐츠 생성용 키워드" ) translation_language: Optional[str] = Field( - None, title="번역한 언어", description="이미지에서 중국어를 한국어로 번역한 언어" + None, + title="번역한 언어", + description="이미지에서 중국어를 한국어로 번역한 언어", ) product_info: Optional[Dict] = Field( None, title="상품 정보", description="블로그 콘텐츠에 포함할 상품 정보" ) uploaded_images: Optional[List[Dict]] = Field( - None, title="업로드된 이미지", description="S3에 업로드된 이미지 목록 (크기 정보 포함)" + None, + title="업로드된 이미지", + description="S3에 업로드된 이미지 목록 (크기 정보 포함)", ) diff --git a/apps/pre-processing-service/app/service/blog/blog_create_service.py b/apps/pre-processing-service/app/service/blog/blog_create_service.py index 11d92285..fdc8b6a0 100644 --- a/apps/pre-processing-service/app/service/blog/blog_create_service.py +++ b/apps/pre-processing-service/app/service/blog/blog_create_service.py @@ -27,7 +27,7 @@ def __init__(self): "s3", aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), - region_name=os.getenv("AWS_REGION", "ap-northeast-2") + region_name=os.getenv("AWS_REGION", "ap-northeast-2"), ) self.bucket_name = os.getenv("S3_BUCKET_NAME", "icebang4-dev-bucket") @@ -38,6 +38,7 @@ def _fetch_images_from_s3(self, keyword: str, product_index: int = 1) -> List[Di try: # 폴더 패턴: 20250922_키워드_1/ 형식으로 검색 from datetime import datetime + date_str = datetime.now().strftime("%Y%m%d") # 키워드 정리 (S3UploadUtil과 동일한 방식) @@ -60,11 +61,10 @@ def _fetch_images_from_s3(self, keyword: str, product_index: int = 1) -> List[Di # S3에서 해당 폴더의 파일 목록 조회 response = self.s3_client.list_objects_v2( - Bucket=self.bucket_name, - Prefix=folder_prefix + Bucket=self.bucket_name, Prefix=folder_prefix ) - if 'Contents' not in response: + if "Contents" not in response: logger.warning(f"S3에서 이미지를 찾을 수 없음: {folder_prefix}") return [] @@ -72,31 +72,35 @@ def _fetch_images_from_s3(self, keyword: str, product_index: int = 1) -> List[Di base_url = f"https://{self.bucket_name}.s3.ap-northeast-2.amazonaws.com" # 이미지 파일만 필터링 (image_*.jpg 패턴) - for obj in response['Contents']: - key = obj['Key'] - file_name = key.split('/')[-1] # 마지막 부분이 파일명 + for obj in response["Contents"]: + key = obj["Key"] + file_name = key.split("/")[-1] # 마지막 부분이 파일명 # 이미지 파일인지 확인 - if file_name.startswith('image_') and file_name.endswith(('.jpg', '.jpeg', '.png')): + if file_name.startswith("image_") and file_name.endswith( + (".jpg", ".jpeg", ".png") + ): # 파일 크기 정보 (bytes -> KB) - file_size_kb = obj['Size'] / 1024 + file_size_kb = obj["Size"] / 1024 # 인덱스 추출 (image_001.jpg -> 1) try: - index = int(file_name.split('_')[1].split('.')[0]) + index = int(file_name.split("_")[1].split(".")[0]) except: index = len(images) + 1 - images.append({ - "index": index, - "s3_url": f"{base_url}/{key}", - "file_name": file_name, - "file_size_kb": round(file_size_kb, 2), - "original_url": "" # 원본 URL은 S3에서 조회 불가 - }) + images.append( + { + "index": index, + "s3_url": f"{base_url}/{key}", + "file_name": file_name, + "file_size_kb": round(file_size_kb, 2), + "original_url": "", # 원본 URL은 S3에서 조회 불가 + } + ) # 인덱스 순으로 정렬 - images.sort(key=lambda x: x['index']) + images.sort(key=lambda x: x["index"]) logger.success(f"S3에서 이미지 {len(images)}개 조회 완료") return images @@ -136,9 +140,8 @@ def generate_blog_content(self, request: RequestBlogCreate) -> Dict[str, Any]: # STEP6: 이미지 자동 배치 if uploaded_images and len(uploaded_images) > 0: logger.debug("[STEP6] 이미지 자동 배치 시작") - result['content'] = self._insert_images_to_content( - result['content'], - uploaded_images + result["content"] = self._insert_images_to_content( + result["content"], uploaded_images ) logger.debug("[STEP6 완료] 이미지 배치 완료") @@ -165,7 +168,9 @@ def _prepare_content_context(self, request: RequestBlogCreate) -> str: if request.product_info.get("price"): try: - context_parts.append(f"- 가격: {int(request.product_info['price']):,}원") + context_parts.append( + f"- 가격: {int(request.product_info['price']):,}원" + ) except Exception: context_parts.append(f"- 가격: {request.product_info.get('price')}") @@ -192,8 +197,12 @@ def _prepare_content_context(self, request: RequestBlogCreate) -> str: else: context_parts.append(f" {i}. {option}") - if request.product_info.get("url") or request.product_info.get("product_url"): - url = request.product_info.get("url") or request.product_info.get("product_url") + if request.product_info.get("url") or request.product_info.get( + "product_url" + ): + url = request.product_info.get("url") or request.product_info.get( + "product_url" + ) context_parts.append(f"- 구매 링크: {url}") # 번역 텍스트 (translation_language) 추가 @@ -203,15 +212,19 @@ def _prepare_content_context(self, request: RequestBlogCreate) -> str: return "\n".join(context_parts) if context_parts else "키워드 기반 콘텐츠 생성" - def _select_best_images(self, uploaded_images: List[Dict], target_count: int = 4) -> List[Dict]: + def _select_best_images( + self, uploaded_images: List[Dict], target_count: int = 4 + ) -> List[Dict]: """크기 기반으로 최적의 이미지 4개 선별""" if not uploaded_images: return [] - logger.debug(f"이미지 선별 시작: 전체 {len(uploaded_images)}개 -> 목표 {target_count}개") + logger.debug( + f"이미지 선별 시작: 전체 {len(uploaded_images)}개 -> 목표 {target_count}개" + ) # 1단계: 너무 작은 이미지 제외 (20KB 이하는 아이콘, 로고 가능성) - filtered = [img for img in uploaded_images if img.get('file_size_kb', 0) > 20] + filtered = [img for img in uploaded_images if img.get("file_size_kb", 0) > 20] logger.debug(f"크기 필터링 후: {len(filtered)}개 이미지 남음") if len(filtered) == 0: @@ -219,7 +232,9 @@ def _select_best_images(self, uploaded_images: List[Dict], target_count: int = 4 filtered = uploaded_images # 2단계: 크기순 정렬 (큰 이미지 = 메인 상품 사진일 가능성) - sorted_images = sorted(filtered, key=lambda x: x.get('file_size_kb', 0), reverse=True) + sorted_images = sorted( + filtered, key=lambda x: x.get("file_size_kb", 0), reverse=True + ) # 3단계: 상위 이미지 선택하되, 너무 많으면 균등 분산 if len(sorted_images) <= target_count: @@ -237,11 +252,15 @@ def _select_best_images(self, uploaded_images: List[Dict], target_count: int = 4 logger.debug(f"최종 선택된 이미지: {len(result)}개") for i, img in enumerate(result): - logger.debug(f" {i + 1}. {img.get('file_name', 'unknown')} ({img.get('file_size_kb', 0):.1f}KB)") + logger.debug( + f" {i + 1}. {img.get('file_name', 'unknown')} ({img.get('file_size_kb', 0):.1f}KB)" + ) return result - def _insert_images_to_content(self, content: str, uploaded_images: List[Dict]) -> str: + def _insert_images_to_content( + self, content: str, uploaded_images: List[Dict] + ) -> str: """AI가 적절한 위치에 이미지 4개를 자동 배치""" # 1단계: 최적의 이미지 4개 선별 @@ -282,18 +301,19 @@ def _insert_images_to_content(self, content: str, uploaded_images: List[Dict]) - # 4단계: 플레이스홀더를 실제 img 태그로 교체 for i, img in enumerate(selected_images): - img_tag = f''' + img_tag = f"""
    상품 이미지 {i + 1} -
    ''' +""" placeholder = f"[IMAGE_{i + 1}]" modified_content = modified_content.replace(placeholder, img_tag) # 5단계: 남은 플레이스홀더 제거 (혹시 AI가 더 많이 만들었을 경우) import re - modified_content = re.sub(r'\[IMAGE_\d+\]', '', modified_content) + + modified_content = re.sub(r"\[IMAGE_\d+\]", "", modified_content) logger.success(f"이미지 배치 완료: {len(selected_images)}개 이미지 삽입") return modified_content @@ -308,9 +328,9 @@ def _create_content_prompt(self, context: str, request: RequestBlogCreate) -> st # 기본 키워드가 없으면 상품 제목에서 추출 main_keyword = request.keyword if ( - not main_keyword - and request.product_info - and request.product_info.get("title") + not main_keyword + and request.product_info + and request.product_info.get("title") ): main_keyword = request.product_info["title"] @@ -365,7 +385,7 @@ def _generate_with_openai(self, prompt: str) -> str: raise def _parse_generated_content( - self, content: str, request: RequestBlogCreate + self, content: str, request: RequestBlogCreate ) -> Dict[str, Any]: """생성된 콘텐츠를 파싱하여 구조화""" @@ -494,4 +514,4 @@ def _create_fallback_content(self, request: RequestBlogCreate) -> Dict[str, Any] "title": title, "content": content, "tags": self._generate_tags(request), - } \ No newline at end of file + } diff --git a/apps/pre-processing-service/app/utils/s3_upload_util.py b/apps/pre-processing-service/app/utils/s3_upload_util.py index a47f2ae3..374bfd8e 100644 --- a/apps/pre-processing-service/app/utils/s3_upload_util.py +++ b/apps/pre-processing-service/app/utils/s3_upload_util.py @@ -159,12 +159,12 @@ def get_s3_url(self, s3_key: str) -> str: return f"{self.base_url}/{s3_key}" async def upload_single_product_images( - self, - session: aiohttp.ClientSession, - product_info: Dict, - product_index: int, - keyword: str, - base_folder: str = "product", + self, + session: aiohttp.ClientSession, + product_info: Dict, + product_index: int, + keyword: str, + base_folder: str = "product", ) -> Dict: """단일 상품의 모든 데이터(이미지 + JSON)를 S3에 업로드""" @@ -256,7 +256,9 @@ async def upload_single_product_images( } ) - logger.debug(f"상품 {product_index}, 이미지 {img_idx} 업로드 완료 ({file_size_kb:.1f}KB)") + logger.debug( + f"상품 {product_index}, 이미지 {img_idx} 업로드 완료 ({file_size_kb:.1f}KB)" + ) else: fail_count += 1