diff --git a/apps/commerce-api/build.gradle.kts b/apps/commerce-api/build.gradle.kts index 461c16ef1..ccaf93ceb 100644 --- a/apps/commerce-api/build.gradle.kts +++ b/apps/commerce-api/build.gradle.kts @@ -1,6 +1,6 @@ dependencies { implementation(project(":apps:commerce-core")) - + // add-ons implementation(project(":modules:jpa")) implementation(project(":modules:redis")) @@ -19,6 +19,9 @@ dependencies { exclude(group = "org.yaml", module = "snakeyaml") } + // Spring Batch + implementation("org.springframework.boot:spring-boot-starter-batch") + // Resilience4j implementation("io.github.resilience4j:resilience4j-spring-boot3:${project.properties["resilience4jVersion"]}") implementation("org.springframework.boot:spring-boot-starter-aop") @@ -29,6 +32,9 @@ dependencies { annotationProcessor("jakarta.persistence:jakarta.persistence-api") annotationProcessor("jakarta.annotation:jakarta.annotation-api") + // Spring Batch Test (테스트용) + testImplementation("org.springframework.batch:spring-batch-test") + // test-fixtures testImplementation(testFixtures(project(":modules:jpa"))) testImplementation(testFixtures(project(":modules:redis"))) diff --git a/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingAggregationInfo.java b/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingAggregationInfo.java new file mode 100644 index 000000000..d260e65a5 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingAggregationInfo.java @@ -0,0 +1,68 @@ +package com.loopers.application.ranking; + +import lombok.Getter; +import org.springframework.batch.core.JobExecution; + +/** + * 랭킹 집계 배치 실행 결과 정보 + */ +@Getter +public class RankingAggregationInfo { + + private final Long jobExecutionId; + private final String jobName; + private final String status; + private final String exitStatus; + private final String period; // "weekly" or "monthly" + private final String message; + + private RankingAggregationInfo( + Long jobExecutionId, + String jobName, + String status, + String exitStatus, + String period, + String message + ) { + this.jobExecutionId = jobExecutionId; + this.jobName = jobName; + this.status = status; + this.exitStatus = exitStatus; + this.period = period; + this.message = message; + } + + public static RankingAggregationInfo from(JobExecution jobExecution, String period) { + return new RankingAggregationInfo( + jobExecution.getId(), + jobExecution.getJobInstance().getJobName(), + jobExecution.getStatus().name(), + jobExecution.getExitStatus() != null ? jobExecution.getExitStatus().getExitCode() : null, + period, + jobExecution.getExitStatus() != null ? jobExecution.getExitStatus().getExitDescription() : null + ); + } + + public static RankingAggregationInfo combined( + RankingAggregationInfo weekly, + RankingAggregationInfo monthly + ) { + boolean bothSuccess = "COMPLETED".equals(weekly.status) && "COMPLETED".equals(monthly.status); + String combinedStatus = bothSuccess ? "COMPLETED" : "FAILED"; + String combinedMessage = String.format( + "Weekly: %s, Monthly: %s", + weekly.status, + monthly.status + ); + + return new RankingAggregationInfo( + null, // combined에는 jobExecutionId 없음 + "weeklyAndMonthlyRankingJob", + combinedStatus, + bothSuccess ? "COMPLETED" : "FAILED", + "both", + combinedMessage + ); + } +} + diff --git a/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingAggregationScheduler.java b/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingAggregationScheduler.java new file mode 100644 index 000000000..8f2ff34e4 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingAggregationScheduler.java @@ -0,0 +1,47 @@ +package com.loopers.application.ranking; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.time.ZonedDateTime; + +/** + * 주간, 월간 랭킹 스케줄러 + *

+ * 매일 자정에 일간 집계 완료 후 주간, 월간 랭킹을 갱신합니다. + * 스프링 배치를 사용하여 tb_product_metrics_daily의 일간 데이터를 집계하고, + * 주간 및 월간 랭킹을 계산합니다. + * 이 작업은 Chunk 단위로 처리하여 대량의 데이터를 효율적으로 처리합니다. + * spring batch read process write 패턴을 따릅니다. + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class RankingAggregationScheduler { + + private final RankingAggregationService rankingAggregationService; + + /** + * 매일 자정에 실행 (일간 집계 완료 후) + * 주간, 월간 랭킹 갱신 + */ + @Scheduled(cron = "0 0 0 * * *") // 매일 자정 + public void aggregateWeeklyAndMonthlyRankings() { + try { + log.info("스케줄러: 주간 및 월간 랭킹 집계 배치 실행 시작"); + + // 어제 날짜 기준으로 집계 (오늘 자정에 실행되므로 어제 데이터까지 집계) + ZonedDateTime targetDate = ZonedDateTime.now().minusDays(1); + + rankingAggregationService.executeWeeklyAndMonthlyRanking(targetDate); + + log.info("스케줄러: 주간 및 월간 랭킹 집계 배치 실행 완료"); + + } catch (Exception e) { + log.error("스케줄러: 주간 및 월간 랭킹 집계 배치 실행 실패", e); + // 스케줄러는 예외를 던지지 않고 로그만 남김 (다음 실행에 영향 주지 않음) + } + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingAggregationService.java b/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingAggregationService.java new file mode 100644 index 000000000..773fe66fc --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingAggregationService.java @@ -0,0 +1,172 @@ +package com.loopers.application.ranking; + +import com.loopers.support.error.CoreException; +import com.loopers.support.error.ErrorType; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +import java.time.DayOfWeek; +import java.time.ZonedDateTime; +import java.time.temporal.TemporalAdjusters; + +/** + * 랭킹 집계 배치 서비스 + *

+ * 주간/월간 랭킹 집계 배치를 실행하는 서비스를 제공합니다. + */ +@Slf4j +@Component +public class RankingAggregationService { + + private final JobLauncher jobLauncher; + private final Job rankingJob; + + public RankingAggregationService( + JobLauncher jobLauncher, + @Qualifier("rankingJob") Job rankingJob + ) { + this.jobLauncher = jobLauncher; + this.rankingJob = rankingJob; + } + + /** + * 주간 랭킹 집계 배치 실행 + * + * @param targetDate 집계 대상 날짜 (이 날짜가 속한 주간을 집계) + * @return JobExecution 실행 결과 + * @note Spring Batch는 자체 트랜잭션을 관리하므로 @Transactional을 사용하지 않습니다. + */ + public RankingAggregationInfo executeWeeklyRanking(ZonedDateTime targetDate) { + if (targetDate == null) { + throw new CoreException(ErrorType.BAD_REQUEST, "집계 대상 날짜는 필수입니다."); + } + + log.info("주간 랭킹 집계 배치 실행 시작: targetDate={}", targetDate); + + try { + // 주간 시작일/종료일 계산 + ZonedDateTime weekStart = targetDate.with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY)) + .toLocalDate() + .atStartOfDay(targetDate.getZone()); + ZonedDateTime weekEnd = targetDate.with(TemporalAdjusters.nextOrSame(DayOfWeek.SUNDAY)) + .toLocalDate() + .atTime(23, 59, 59) + .atZone(targetDate.getZone()); + ZonedDateTime rankingDate = weekEnd; + + // Job 파라미터 생성 + JobParameters jobParameters = new JobParametersBuilder() + .addString("periodType", "weekly") + .addString("rankingDate", rankingDate.toString()) + .addString("startDate", weekStart.toLocalDate().toString()) + .addString("endDate", weekEnd.toLocalDate().toString()) + .addLong("timestamp", System.currentTimeMillis()) + .toJobParameters(); + + // 배치 실행 + JobExecution jobExecution = jobLauncher.run(rankingJob, jobParameters); + + log.info("주간 랭킹 집계 배치 실행 완료: jobExecutionId={}, status={}, exitStatus={}", + jobExecution.getId(), + jobExecution.getStatus(), + jobExecution.getExitStatus()); + + return RankingAggregationInfo.from(jobExecution, "weekly"); + + } catch (Exception e) { + log.error("주간 랭킹 집계 배치 실행 실패: targetDate={}", targetDate, e); + throw new CoreException(ErrorType.INTERNAL_ERROR, + "주간 랭킹 집계 배치 실행에 실패했습니다: " + e.getMessage()); + } + } + + /** + * 월간 랭킹 집계 배치 실행 + * + * @param targetDate 집계 대상 날짜 (이 날짜가 속한 월간을 집계) + * @return JobExecution 실행 결과 + * @note Spring Batch는 자체 트랜잭션을 관리하므로 @Transactional을 사용하지 않습니다. + */ + public RankingAggregationInfo executeMonthlyRanking(ZonedDateTime targetDate) { + if (targetDate == null) { + throw new CoreException(ErrorType.BAD_REQUEST, "집계 대상 날짜는 필수입니다."); + } + + log.info("월간 랭킹 집계 배치 실행 시작: targetDate={}", targetDate); + + try { + // 월간 시작일/종료일 계산 + ZonedDateTime monthStart = targetDate.with(TemporalAdjusters.firstDayOfMonth()) + .toLocalDate() + .atStartOfDay(targetDate.getZone()); + ZonedDateTime monthEnd = targetDate.with(TemporalAdjusters.lastDayOfMonth()) + .toLocalDate() + .atTime(23, 59, 59) + .atZone(targetDate.getZone()); + ZonedDateTime rankingDate = monthEnd; + + // Job 파라미터 생성 + // monthly는 Reader에서 최근 30일로 자동 계산하므로 startDate/endDate 생략 + JobParameters jobParameters = new JobParametersBuilder() + .addString("periodType", "monthly") + .addString("rankingDate", rankingDate.toString()) + // startDate, endDate는 Reader에서 자동 계산 (최근 30일) + .addLong("timestamp", System.currentTimeMillis()) + .toJobParameters(); + + // 배치 실행 + JobExecution jobExecution = jobLauncher.run(rankingJob, jobParameters); + + log.info("월간 랭킹 집계 배치 실행 완료: jobExecutionId={}, status={}, exitStatus={}", + jobExecution.getId(), + jobExecution.getStatus(), + jobExecution.getExitStatus()); + + return RankingAggregationInfo.from(jobExecution, "monthly"); + + } catch (Exception e) { + log.error("월간 랭킹 집계 배치 실행 실패: targetDate={}", targetDate, e); + throw new CoreException(ErrorType.INTERNAL_ERROR, + "월간 랭킹 집계 배치 실행에 실패했습니다: " + e.getMessage()); + } + } + + /** + * 주간 및 월간 랭킹 집계 배치 실행 + * + * @param targetDate 집계 대상 날짜 + * @return 배치 실행 결과 + * @note Spring Batch는 자체 트랜잭션을 관리하므로 @Transactional을 사용하지 않습니다. + */ + public RankingAggregationInfo executeWeeklyAndMonthlyRanking(ZonedDateTime targetDate) { + if (targetDate == null) { + throw new CoreException(ErrorType.BAD_REQUEST, "집계 대상 날짜는 필수입니다."); + } + + log.info("주간 및 월간 랭킹 집계 배치 실행 시작: targetDate={}", targetDate); + + try { + // 주간 랭킹 집계 + RankingAggregationInfo weeklyResult = executeWeeklyRanking(targetDate); + + // 월간 랭킹 집계 + RankingAggregationInfo monthlyResult = executeMonthlyRanking(targetDate); + + log.info("주간 및 월간 랭킹 집계 배치 실행 완료"); + + return RankingAggregationInfo.combined(weeklyResult, monthlyResult); + + } catch (Exception e) { + log.error("주간 및 월간 랭킹 집계 배치 실행 실패: targetDate={}", targetDate, e); + throw new CoreException(ErrorType.INTERNAL_ERROR, + "주간 및 월간 랭킹 집계 배치 실행에 실패했습니다: " + e.getMessage()); + } + } +} + diff --git a/apps/commerce-api/src/main/java/com/loopers/application/ranking/batch/ProductMetricsDailyReaderFactory.java b/apps/commerce-api/src/main/java/com/loopers/application/ranking/batch/ProductMetricsDailyReaderFactory.java new file mode 100644 index 000000000..7d25de0a8 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/application/ranking/batch/ProductMetricsDailyReaderFactory.java @@ -0,0 +1,136 @@ +package com.loopers.application.ranking.batch; + +import com.loopers.domain.metrics.product.ProductMetricsDailyAggregated; +import com.loopers.domain.metrics.product.ProductMetricsDailyRepository; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.database.AbstractPagingItemReader; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Pageable; +import org.springframework.stereotype.Component; + +import java.time.LocalDate; +import java.util.concurrent.CopyOnWriteArrayList; + +@Slf4j +@Component +@RequiredArgsConstructor +public class ProductMetricsDailyReaderFactory { + private final ProductMetricsDailyRepository repository; + + @Bean + @StepScope + public AbstractPagingItemReader productMetricsDailyReader( + @Value("#{jobParameters['periodType'] ?: 'weekly'}") String periodType, + @Value("#{jobParameters['startDate'] ?: null}") String startDateStr, + @Value("#{jobParameters['endDate'] ?: null}") String endDateStr, + @Value("#{jobParameters['pageSize'] ?: 1000}") int pageSize + ) { + LocalDate startDate; + LocalDate endDate; + + // periodType에 따라 날짜 범위 자동 계산 + // monthly는 최근 30일, weekly는 최근 7일로 자동 계산 + endDate = LocalDate.now(); + if ("monthly".equals(periodType)) { + startDate = endDate.minusDays(30); // 최근 30일 + log.info("[ProductMetricsDailyReaderFactory] Auto-calculated date range for monthly: startDate={}, endDate={}", startDate, endDate); + } else { + // weekly: Job 파라미터에서 날짜가 제공된 경우 사용, 없으면 최근 7일 + if (startDateStr != null && endDateStr != null) { + startDate = LocalDate.parse(startDateStr); + endDate = LocalDate.parse(endDateStr); + log.info("[ProductMetricsDailyReaderFactory] Using provided date range for weekly: startDate={}, endDate={}", startDate, endDate); + } else { + startDate = endDate.minusDays(7); // 최근 7일 + log.info("[ProductMetricsDailyReaderFactory] Auto-calculated date range for weekly: startDate={}, endDate={}", startDate, endDate); + } + } + + // Reader 생성 및 설정 + ProductMetricsDailyReader reader = new ProductMetricsDailyReader(repository); + reader.setDateRange(startDate, endDate); + reader.setPageSize(pageSize); + reader.setSaveState(true); // 재시작을 위한 상태 저장 + reader.setName("productMetricsDailyReader"); + + return reader; + } + + @RequiredArgsConstructor + public static class ProductMetricsDailyReader extends AbstractPagingItemReader { + + private final ProductMetricsDailyRepository repository; + + private LocalDate startDate; + private LocalDate endDate; + + /** + * 날짜 범위 설정 + * StepScope Bean으로 생성될 때 Job 파라미터에서 주입받음 + */ + public void setDateRange(LocalDate startDate, LocalDate endDate) { + this.startDate = startDate; + this.endDate = endDate; + } + + /** + * 한 페이지의 데이터를 로드하여 results에 저장 + * AbstractPagingItemReader가 자동으로 호출 + */ + @Override + protected void doReadPage() { + if (startDate == null || endDate == null) { + throw new IllegalStateException("Date range must be set before reading. Use setDateRange() method."); + } + + // 현재 페이지 번호는 부모 클래스의 page 필드에서 가져옴 + Pageable pageable = PageRequest.of(getPage(), getPageSize()); + + // Repository에서 페이징 조회 + Page page = + repository.findAggregatedByDateBetweenPaged(startDate, endDate, pageable); + + // 결과를 results에 저장 (부모 클래스의 필드) + if (results == null) { + results = new CopyOnWriteArrayList<>(); + } else { + results.clear(); + } + + results.addAll(page.getContent()); + + log.info("Loaded page {}: {} items (total: {})", + getPage(), page.getContent().size(), page.getTotalElements()); + + // 더 이상 읽을 페이지가 없으면 종료 + if (!page.hasNext()) { + log.info("No more pages to read. Total items loaded: {}", page.getTotalElements()); + } + } + + /** + * Reader 초기화 + * Step 시작 시 호출됨 + */ + @Override + protected void doOpen() throws Exception { + super.doOpen(); + log.info("ProductMetricsDailyReader opened: startDate={}, endDate={}", startDate, endDate); + } + + /** + * Reader 종료 + * Step 종료 시 호출됨 + */ + @Override + protected void doClose() throws Exception { + super.doClose(); + log.info("ProductMetricsDailyReader closed"); + } + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/application/ranking/batch/RankingAggregationJobConfig.java b/apps/commerce-api/src/main/java/com/loopers/application/ranking/batch/RankingAggregationJobConfig.java new file mode 100644 index 000000000..f1bdf13d2 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/application/ranking/batch/RankingAggregationJobConfig.java @@ -0,0 +1,27 @@ +package com.loopers.application.ranking.batch; + +import lombok.RequiredArgsConstructor; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecutionListener; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@RequiredArgsConstructor +public class RankingAggregationJobConfig { + + private final JobRepository jobRepository; + private final Step rankingChunkStep; + private final JobExecutionListener rankingJobExecutionListener; + + @Bean + public Job rankingJob() { + return new JobBuilder("rankingJob", jobRepository) + .start(rankingChunkStep) + .listener(rankingJobExecutionListener) + .build(); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/application/ranking/batch/RankingJobExecutionListener.java b/apps/commerce-api/src/main/java/com/loopers/application/ranking/batch/RankingJobExecutionListener.java new file mode 100644 index 000000000..c4edb670c --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/application/ranking/batch/RankingJobExecutionListener.java @@ -0,0 +1,206 @@ +package com.loopers.application.ranking.batch; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.loopers.domain.ranking.MvProductRankMonthly; +import com.loopers.domain.ranking.MvProductRankMonthlyRepository; +import com.loopers.domain.ranking.MvProductRankWeekly; +import com.loopers.domain.ranking.MvProductRankWeeklyRepository; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobExecutionListener; +import org.springframework.batch.core.configuration.annotation.JobScope; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Map; + +@Slf4j +@Component +@JobScope +@RequiredArgsConstructor +public class RankingJobExecutionListener implements JobExecutionListener { + + private final MvProductRankWeeklyRepository weeklyRepository; + private final MvProductRankMonthlyRepository monthlyRepository; + private final RankingJsonConverter jsonConverter; + + @Value("#{jobParameters['rankingDate']}") + private ZonedDateTime rankingDate; + + @Value("#{jobParameters['periodType'] ?: 'weekly'}") + private String periodType; + + @Override + public void beforeJob(JobExecution jobExecution) { + String periodName = "weekly".equals(periodType) ? "주간" : "월간"; + log.info("[RankingJobExecutionListener] beforeJob: {} 랭킹 작업 시작 - rankingDate={}, jobExecutionId={}, thread={}", + periodName, rankingDate, jobExecution.getId(), Thread.currentThread().getName()); + if (rankingDate == null) { + log.error("[RankingJobExecutionListener] beforeJob: rankingDate가 NULL입니다! Job parameters: {}", + jobExecution.getJobParameters().getParameters()); + } + if (!"weekly".equals(periodType) && !"monthly".equals(periodType)) { + log.error("[RankingJobExecutionListener] beforeJob: 잘못된 periodType={}, 'weekly' 또는 'monthly'여야 합니다", periodType); + } + } + + @Override + @Transactional + public void afterJob(JobExecution jobExecution) { + String periodName = "weekly".equals(periodType) ? "주간" : "월간"; + log.info("[RankingJobExecutionListener] afterJob: {} 랭킹 작업 완료. TOP 100 데이터베이스 저장 중", periodName); + log.info("[RankingJobExecutionListener] afterJob: 작업 실행 상세 - jobExecutionId={}, status={}, exitStatus={}, thread={}", + jobExecution.getId(), jobExecution.getStatus(), jobExecution.getExitStatus(), Thread.currentThread().getName()); + + try { + ExecutionContext executionContext = jobExecution.getExecutionContext(); + log.info("[RankingJobExecutionListener] afterJob: ExecutionContext 키: {}", + executionContext.entrySet().stream().map(Map.Entry::getKey).toList()); + + if ("weekly".equals(periodType)) { + processWeeklyRanking(executionContext, jobExecution); + } else if ("monthly".equals(periodType)) { + processMonthlyRanking(executionContext, jobExecution); + } else { + log.error("[RankingJobExecutionListener] afterJob: 잘못된 periodType={}, 'weekly' 또는 'monthly'여야 합니다", periodType); + jobExecution.addFailureException(new IllegalArgumentException("Invalid periodType: " + periodType)); + } + + } catch (Exception e) { + log.error("[RankingJobExecutionListener] afterJob: {} 랭킹 데이터베이스 저장 실패", periodName, e); + jobExecution.addFailureException(e); + } + } + + private void processWeeklyRanking(ExecutionContext executionContext, JobExecution jobExecution) throws JsonProcessingException { + Object jsonObj = executionContext.get("weeklyRankingTop100"); + Object countObj = executionContext.get("weeklyRankingCount"); + + log.info("[RankingJobExecutionListener] processWeeklyRanking: ExecutionContext에서 조회 - jsonObj={}, countObj={}", + jsonObj != null ? "not null (type: " + jsonObj.getClass().getName() + ")" : "null", + countObj != null ? "not null (value: " + countObj + ")" : "null"); + + if (jsonObj == null) { + log.error("[RankingJobExecutionListener] processWeeklyRanking: ExecutionContext에 주간 랭킹 데이터가 없습니다. 스텝이 실행되지 않았을 수 있습니다."); + log.error("[RankingJobExecutionListener] processWeeklyRanking: ExecutionContext에서 사용 가능한 키: {}", + executionContext.entrySet().stream().map(Map.Entry::getKey).toList()); + return; + } + + String json = jsonObj.toString(); + Integer count = countObj != null ? Integer.parseInt(countObj.toString()) : 0; + + log.info("[RankingJobExecutionListener] processWeeklyRanking: JSON 문자열 길이: {}, 건수: {}", json.length(), count); + + if (json.isEmpty()) { + log.error("[RankingJobExecutionListener] processWeeklyRanking: ExecutionContext의 주간 랭킹 데이터가 비어있습니다"); + return; + } + + log.info("[RankingJobExecutionListener] processWeeklyRanking: ExecutionContext에서 {}건 발견", count); + log.debug("[RankingJobExecutionListener] processWeeklyRanking: JSON 미리보기 (처음 200자): {}", + json.length() > 200 ? json.substring(0, 200) + "..." : json); + + log.info("[RankingJobExecutionListener] processWeeklyRanking: JSON을 List로 역직렬화 중..."); + List top100 = jsonConverter.fromJsonWeekly(json); + log.info("[RankingJobExecutionListener] processWeeklyRanking: 역직렬화 완료 - {}건", top100.size()); + + if (top100.isEmpty()) { + log.error("[RankingJobExecutionListener] processWeeklyRanking: 역직렬화된 리스트가 비어있습니다!"); + return; + } + + for (int i = 0; i < Math.min(3, top100.size()); i++) { + MvProductRankWeekly item = top100.get(i); + log.info("[RankingJobExecutionListener] processWeeklyRanking: 아이템[{}] - productId={}, productName={}, brandName={}, score={}, rankingDate={}", + i, item.getProductId(), item.getProductName(), item.getBrandName(), item.getScore(), item.getRankingDate()); + } + + log.info("[RankingJobExecutionListener] processWeeklyRanking: {}건에 순위 할당 중...", top100.size()); + for (int i = 0; i < top100.size(); i++) { + top100.get(i).setRanking(i + 1); + } + log.info("[RankingJobExecutionListener] processWeeklyRanking: 순위 할당 완료"); + + log.info("[RankingJobExecutionListener] processWeeklyRanking: 기존 주간 랭킹 데이터 삭제 중 - 날짜: {}", rankingDate); + weeklyRepository.deleteByRankingDate(rankingDate); + log.info("[RankingJobExecutionListener] processWeeklyRanking: 기존 주간 랭킹 데이터 삭제 완료 - 날짜: {}", rankingDate); + + log.info("[RankingJobExecutionListener] processWeeklyRanking: {}건 데이터베이스 저장 중...", top100.size()); + weeklyRepository.saveAll(top100); + log.info("[RankingJobExecutionListener] processWeeklyRanking: 주간 랭킹 {}건 데이터베이스 저장 완료", top100.size()); + + List saved = weeklyRepository.findTop100ByRankingDateOrderByRankingAsc(rankingDate); + log.info("[RankingJobExecutionListener] processWeeklyRanking: 검증 - rankingDate={}에 대해 데이터베이스에서 {}건 발견", + rankingDate, saved.size()); + } + + private void processMonthlyRanking(ExecutionContext executionContext, JobExecution jobExecution) throws JsonProcessingException { + Object jsonObj = executionContext.get("monthlyRankingTop100"); + Object countObj = executionContext.get("monthlyRankingCount"); + + log.info("[RankingJobExecutionListener] processMonthlyRanking: ExecutionContext에서 조회 - jsonObj={}, countObj={}", + jsonObj != null ? "not null (type: " + jsonObj.getClass().getName() + ")" : "null", + countObj != null ? "not null (value: " + countObj + ")" : "null"); + + if (jsonObj == null) { + log.error("[RankingJobExecutionListener] processMonthlyRanking: ExecutionContext에 월간 랭킹 데이터가 없습니다. 스텝이 실행되지 않았을 수 있습니다."); + log.error("[RankingJobExecutionListener] processMonthlyRanking: ExecutionContext에서 사용 가능한 키: {}", + executionContext.entrySet().stream().map(Map.Entry::getKey).toList()); + return; + } + + String json = jsonObj.toString(); + Integer count = countObj != null ? Integer.parseInt(countObj.toString()) : 0; + + log.info("[RankingJobExecutionListener] processMonthlyRanking: JSON 문자열 길이: {}, 건수: {}", json.length(), count); + + if (json.isEmpty()) { + log.error("[RankingJobExecutionListener] processMonthlyRanking: ExecutionContext의 월간 랭킹 데이터가 비어있습니다"); + return; + } + + log.info("[RankingJobExecutionListener] processMonthlyRanking: ExecutionContext에서 {}건 발견", count); + log.debug("[RankingJobExecutionListener] processMonthlyRanking: JSON 미리보기 (처음 200자): {}", + json.length() > 200 ? json.substring(0, 200) + "..." : json); + + log.info("[RankingJobExecutionListener] processMonthlyRanking: JSON을 List로 역직렬화 중..."); + List top100 = jsonConverter.fromJsonMonthly(json); + log.info("[RankingJobExecutionListener] processMonthlyRanking: 역직렬화 완료 - {}건", top100.size()); + + if (top100.isEmpty()) { + log.error("[RankingJobExecutionListener] processMonthlyRanking: 역직렬화된 리스트가 비어있습니다!"); + return; + } + + for (int i = 0; i < Math.min(3, top100.size()); i++) { + MvProductRankMonthly item = top100.get(i); + log.info("[RankingJobExecutionListener] processMonthlyRanking: 아이템[{}] - productId={}, productName={}, brandName={}, score={}, rankingDate={}", + i, item.getProductId(), item.getProductName(), item.getBrandName(), item.getScore(), item.getRankingDate()); + } + + log.info("[RankingJobExecutionListener] processMonthlyRanking: {}건에 순위 할당 중...", top100.size()); + for (int i = 0; i < top100.size(); i++) { + top100.get(i).setRanking(i + 1); + } + log.info("[RankingJobExecutionListener] processMonthlyRanking: 순위 할당 완료"); + + log.info("[RankingJobExecutionListener] processMonthlyRanking: 기존 월간 랭킹 데이터 삭제 중 - 날짜: {}", rankingDate); + monthlyRepository.deleteByRankingDate(rankingDate); + log.info("[RankingJobExecutionListener] processMonthlyRanking: 기존 월간 랭킹 데이터 삭제 완료 - 날짜: {}", rankingDate); + + log.info("[RankingJobExecutionListener] processMonthlyRanking: {}건 데이터베이스 저장 중...", top100.size()); + monthlyRepository.saveAll(top100); + log.info("[RankingJobExecutionListener] processMonthlyRanking: 월간 랭킹 {}건 데이터베이스 저장 완료", top100.size()); + + List saved = monthlyRepository.findTop100ByRankingDateOrderByRankingAsc(rankingDate); + log.info("[RankingJobExecutionListener] processMonthlyRanking: 검증 - rankingDate={}에 대해 데이터베이스에서 {}건 발견", + rankingDate, saved.size()); + } +} + diff --git a/apps/commerce-api/src/main/java/com/loopers/application/ranking/batch/RankingJsonConverter.java b/apps/commerce-api/src/main/java/com/loopers/application/ranking/batch/RankingJsonConverter.java new file mode 100644 index 000000000..3c6ea8722 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/application/ranking/batch/RankingJsonConverter.java @@ -0,0 +1,41 @@ +package com.loopers.application.ranking.batch; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.loopers.domain.ranking.MvProductRankMonthly; +import com.loopers.domain.ranking.MvProductRankWeekly; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Component +@RequiredArgsConstructor +public class RankingJsonConverter { + + private final ObjectMapper objectMapper; + + public String toJson(List items) throws JsonProcessingException { + return objectMapper.writeValueAsString(items); + } + + public List fromJsonWeekly(String json) throws JsonProcessingException { + return objectMapper.readValue( + json, + objectMapper.getTypeFactory().constructCollectionType( + List.class, + MvProductRankWeekly.class + ) + ); + } + + public List fromJsonMonthly(String json) throws JsonProcessingException { + return objectMapper.readValue( + json, + objectMapper.getTypeFactory().constructCollectionType( + List.class, + MvProductRankMonthly.class + ) + ); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/application/ranking/batch/RankingProcessor.java b/apps/commerce-api/src/main/java/com/loopers/application/ranking/batch/RankingProcessor.java new file mode 100644 index 000000000..c925a9560 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/application/ranking/batch/RankingProcessor.java @@ -0,0 +1,113 @@ +package com.loopers.application.ranking.batch; + +import com.loopers.domain.metrics.product.ProductMetricsDailyAggregated; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.annotation.AfterChunk; +import org.springframework.batch.core.annotation.BeforeStep; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; + +@Slf4j +@Component +@StepScope +@RequiredArgsConstructor +public class RankingProcessor implements ItemProcessor { + + private static final ThreadLocal> top100Queue = + ThreadLocal.withInitial(() -> new PriorityQueue<>( + 100, + Comparator.comparing(ProductMetricsDailyAggregated::calculateScore) + )); + + private StepExecution stepExecution; + + @BeforeStep + public void beforeStep(StepExecution stepExecution) { + this.stepExecution = stepExecution; + top100Queue.remove(); + top100Queue.set(new PriorityQueue<>( + 100, + Comparator.comparing(ProductMetricsDailyAggregated::calculateScore) + )); + log.info("[RankingProcessor] beforeStep: PriorityQueue 초기화 완료, stepName={}, thread={}", + stepExecution.getStepName(), Thread.currentThread().getName()); + } + + @Override + public ProductMetricsDailyAggregated process(ProductMetricsDailyAggregated item) throws Exception { + if (item == null) { + log.warn("[RankingProcessor] process: null 아이템 수신"); + return null; + } + + log.info("[RankingProcessor] process: 아이템 처리 중 - productId={}, score={}, likeCount={}, viewCount={}, soldCount={}", + item.getProductId(), item.calculateScore(), item.getTotalLikeCount(), item.getTotalViewCount(), item.getTotalSoldCount()); + + PriorityQueue queue = top100Queue.get(); + if (queue == null) { + log.error("[RankingProcessor] process: PriorityQueue가 null입니다! 초기화 중..."); + queue = new PriorityQueue<>(100, Comparator.comparing(ProductMetricsDailyAggregated::calculateScore)); + top100Queue.set(queue); + } + + synchronized (queue) { + if (queue.size() < 100) { + queue.offer(item); + log.info("[RankingProcessor] process: 큐에 아이템 추가 - productId={}, score={}, queueSize={}", + item.getProductId(), item.calculateScore(), queue.size()); + } else { + ProductMetricsDailyAggregated min = queue.peek(); + if (min != null && item.calculateScore() > min.calculateScore()) { + ProductMetricsDailyAggregated removed = queue.poll(); + queue.offer(item); + log.info("[RankingProcessor] process: 큐에서 아이템 교체 - 제거된 productId={} (score={}), 추가된 productId={} (score={}), queueSize={}", + removed.getProductId(), removed.calculateScore(), item.getProductId(), item.calculateScore(), queue.size()); + } else { + log.debug("[RankingProcessor] process: 아이템 거부됨 - productId={}, score={}, minScore={}", + item.getProductId(), item.calculateScore(), min != null ? min.calculateScore() : "null"); + } + } + } + + return null; + } + + @AfterChunk + public void afterChunk(ChunkContext chunkContext) { + PriorityQueue queue = top100Queue.get(); + if (queue == null) { + log.warn("[RankingProcessor] afterChunk: PriorityQueue가 null입니다!"); + } else if (queue.isEmpty()) { + log.warn("[RankingProcessor] afterChunk: PriorityQueue가 비어있습니다!"); + } else { + log.info("[RankingProcessor] afterChunk: 청크 처리 완료 - queue size={}, thread={}", + queue.size(), Thread.currentThread().getName()); + ProductMetricsDailyAggregated min = queue.peek(); + List items = new ArrayList<>(queue); + items.sort(Comparator.comparingDouble(ProductMetricsDailyAggregated::calculateScore).reversed()); + if (!items.isEmpty()) { + log.info("[RankingProcessor] afterChunk: 큐 통계 - minScore={}, maxScore={}, top3ProductIds={}", + min != null ? min.calculateScore() : "null", + items.get(0).calculateScore(), + items.stream().limit(3).map(ProductMetricsDailyAggregated::getProductId).toList()); + } + } + } + + public static PriorityQueue getTop100Queue() { + return top100Queue.get(); + } + + public static void clearTop100Queue() { + top100Queue.remove(); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/application/ranking/batch/RankingStepConfig.java b/apps/commerce-api/src/main/java/com/loopers/application/ranking/batch/RankingStepConfig.java new file mode 100644 index 000000000..a363c4136 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/application/ranking/batch/RankingStepConfig.java @@ -0,0 +1,39 @@ +package com.loopers.application.ranking.batch; + +import com.loopers.domain.metrics.product.ProductMetricsDailyAggregated; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.item.ItemReader; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; + +@Slf4j +@Configuration +@RequiredArgsConstructor +public class RankingStepConfig { + + private final JobRepository jobRepository; + private final PlatformTransactionManager transactionManager; + private final ItemReader productMetricsDailyReader; + private final RankingProcessor rankingProcessor; + private final RankingStepExecutionListener rankingStepExecutionListener; + + @Bean + public Step rankingChunkStep() { + return new StepBuilder("rankingChunkStep", jobRepository) + .chunk(1000, transactionManager) + .reader(productMetricsDailyReader) + .processor(rankingProcessor) + .writer(chunk -> { + if (!chunk.isEmpty()) { + log.debug("Writing ranking chunk of size: {}", chunk.size()); + } + }) + .listener(rankingStepExecutionListener) + .build(); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/application/ranking/batch/RankingStepExecutionListener.java b/apps/commerce-api/src/main/java/com/loopers/application/ranking/batch/RankingStepExecutionListener.java new file mode 100644 index 000000000..ab1584ed0 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/application/ranking/batch/RankingStepExecutionListener.java @@ -0,0 +1,236 @@ +package com.loopers.application.ranking.batch; + +import com.loopers.domain.brand.Brand; +import com.loopers.domain.brand.BrandService; +import com.loopers.domain.metrics.product.ProductMetricsDailyAggregated; +import com.loopers.domain.product.Product; +import com.loopers.domain.product.ProductService; +import com.loopers.domain.ranking.MvProductRankMonthly; +import com.loopers.domain.ranking.MvProductRankWeekly; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.StepExecutionListener; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.time.ZonedDateTime; +import java.util.*; + +@Slf4j +@Component +@StepScope +@RequiredArgsConstructor +public class RankingStepExecutionListener implements StepExecutionListener { + + private final ProductService productService; + private final BrandService brandService; + private final RankingJsonConverter jsonConverter; + + @Value("#{jobParameters['rankingDate']}") + private ZonedDateTime rankingDate; + + @Value("#{jobParameters['periodType'] ?: 'weekly'}") + private String periodType; // "weekly" or "monthly" + + @Override + public void beforeStep(StepExecution stepExecution) { + String periodName = "weekly".equals(periodType) ? "주간" : "월간"; + log.info("[RankingStepExecutionListener] beforeStep: {} 랭킹 스텝 시작 - rankingDate={}, stepName={}, thread={}", + periodName, rankingDate, stepExecution.getStepName(), Thread.currentThread().getName()); + if (rankingDate == null) { + log.error("[RankingStepExecutionListener] beforeStep: rankingDate가 NULL입니다! Job parameters: {}", + stepExecution.getJobExecution().getJobParameters().getParameters()); + } + if (!"weekly".equals(periodType) && !"monthly".equals(periodType)) { + log.error("[RankingStepExecutionListener] beforeStep: 잘못된 periodType={}, 'weekly' 또는 'monthly'여야 합니다", periodType); + } + } + + @Override + public ExitStatus afterStep(StepExecution stepExecution) { + String periodName = "weekly".equals(periodType) ? "주간" : "월간"; + log.info("[RankingStepExecutionListener] afterStep: {} 랭킹 스텝 완료. PriorityQueue에서 TOP 100 추출 중", periodName); + log.info("[RankingStepExecutionListener] afterStep: 스텝 실행 상세 - stepName={}, readCount={}, writeCount={}, commitCount={}, thread={}", + stepExecution.getStepName(), stepExecution.getReadCount(), stepExecution.getWriteCount(), + stepExecution.getCommitCount(), Thread.currentThread().getName()); + + try { + log.info("[RankingStepExecutionListener] afterStep: RankingProcessor에서 PriorityQueue 가져오는 중..."); + PriorityQueue queue = RankingProcessor.getTop100Queue(); + + if (queue == null) { + log.error("[RankingStepExecutionListener] afterStep: PriorityQueue가 null입니다!"); + return stepExecution.getExitStatus(); + } + + if (queue.isEmpty()) { + log.warn("[RankingStepExecutionListener] afterStep: PriorityQueue가 비어있습니다!"); + return stepExecution.getExitStatus(); + } + + log.info("[RankingStepExecutionListener] afterStep: PriorityQueue 가져오기 성공 - size={}", queue.size()); + List allItems = new ArrayList<>(queue); + log.info("[RankingStepExecutionListener] afterStep: PriorityQueue에서 총 {}건 조회", allItems.size()); + + for (int i = 0; i < Math.min(5, allItems.size()); i++) { + ProductMetricsDailyAggregated item = allItems.get(i); + log.info("[RankingStepExecutionListener] afterStep: 아이템[{}] - productId={}, score={}, likeCount={}, viewCount={}, soldCount={}", + i, item.getProductId(), item.calculateScore(), item.getTotalLikeCount(), + item.getTotalViewCount(), item.getTotalSoldCount()); + } + + log.info("[RankingStepExecutionListener] afterStep: 점수 기준 내림차순 정렬 중..."); + allItems.sort(Comparator.comparingDouble(ProductMetricsDailyAggregated::calculateScore).reversed()); + log.info("[RankingStepExecutionListener] afterStep: 정렬 완료. 상위 3개 점수: {}", + allItems.stream().limit(3).map(item -> item.calculateScore()).toList()); + List top100 = allItems.stream() + .limit(100) + .toList(); + + log.info("[RankingStepExecutionListener] afterStep: TOP 100 선택 완료 (실제 건수: {})", top100.size()); + if ("weekly".equals(periodType)) { + log.info("[RankingStepExecutionListener] afterStep: {}건에 대해 MvProductRankWeekly 객체 생성 중...", top100.size()); + List weeklyRankings = createWeeklyRankings(top100); + String json = jsonConverter.toJson(weeklyRankings); + log.info("[RankingStepExecutionListener] afterStep: JSON 변환 완료 - 길이={} 문자", json.length()); + + ExecutionContext jobExecutionContext = stepExecution.getJobExecution().getExecutionContext(); + jobExecutionContext.put("weeklyRankingTop100", json); + jobExecutionContext.put("weeklyRankingCount", weeklyRankings.size()); + + log.info("[RankingStepExecutionListener] afterStep: 주간 집계 {}건 저장됨 to JobExecution ExecutionContext", weeklyRankings.size()); + + } else if ("monthly".equals(periodType)) { + log.info("[RankingStepExecutionListener] afterStep: {}건에 대해 MvProductRankMonthly 객체 생성 중...", top100.size()); + List monthlyRankings = createMonthlyRankings(top100); + String json = jsonConverter.toJson(monthlyRankings); + log.info("[RankingStepExecutionListener] afterStep: JSON 변환 완료 - 길이={} 문자", json.length()); + + ExecutionContext jobExecutionContext = stepExecution.getJobExecution().getExecutionContext(); + jobExecutionContext.put("monthlyRankingTop100", json); + jobExecutionContext.put("monthlyRankingCount", monthlyRankings.size()); + + log.info("[RankingStepExecutionListener] afterStep: 월간 집계 {}건 저장됨 to JobExecution ExecutionContext", monthlyRankings.size()); + + } else { + log.error("[RankingStepExecutionListener] afterStep: 잘못된 periodType={}, 'weekly' 또는 'monthly'여야 합니다", periodType); + stepExecution.addFailureException(new IllegalArgumentException("Invalid periodType: " + periodType)); + return ExitStatus.FAILED; + } + + } catch (Exception e) { + log.error("[RankingStepExecutionListener] afterStep: {} 랭킹 처리 실패", periodName, e); + stepExecution.addFailureException(e); + return ExitStatus.FAILED; + } finally { + RankingProcessor.clearTop100Queue(); + log.info("[RankingStepExecutionListener] afterStep: RankingProcessor의 PriorityQueue 초기화 완료"); + } + + return stepExecution.getExitStatus(); + } + + private List createWeeklyRankings(List top100) { + int successCount = 0; + int failureCount = 0; + List weeklyRankings = new ArrayList<>(); + + for (int i = 0; i < top100.size(); i++) { + ProductMetricsDailyAggregated aggregated = top100.get(i); + try { + log.debug("[RankingStepExecutionListener] afterStep: 아이템[{}] 처리 중 - productId={}", i, aggregated.getProductId()); + Product product = productService.getProductById(aggregated.getProductId()) + .orElseThrow(() -> new RuntimeException("Product not found: " + aggregated.getProductId())); + Brand brand = brandService.getBrandById(product.getBrandId()); + + MvProductRankWeekly weekly = new MvProductRankWeekly(); + weekly.setProductId(aggregated.getProductId()); + weekly.setProductName(product.getName()); + weekly.setBrandId(product.getBrandId()); + weekly.setBrandName(brand.getName()); + weekly.setScore(aggregated.calculateScore()); + weekly.setLikeCount(aggregated.getTotalLikeCount().intValue()); + weekly.setViewCount(aggregated.getTotalViewCount().intValue()); + weekly.setOrderCount(aggregated.getTotalSoldCount().intValue()); + weekly.setRankingDate(rankingDate); + + if (weekly.getRankingDate() == null) { + log.error("[RankingStepExecutionListener] afterStep: 아이템[{}]의 rankingDate가 NULL입니다 - productId={}", + i, weekly.getProductId()); + } + + weeklyRankings.add(weekly); + successCount++; + + if (i < 3) { + log.info("[RankingStepExecutionListener] afterStep: MvProductRankWeekly[{}] 생성 완료 - productId={}, productName={}, brandName={}, score={}, rankingDate={}", + i, weekly.getProductId(), weekly.getProductName(), weekly.getBrandName(), weekly.getScore(), weekly.getRankingDate()); + } + + } catch (Exception e) { + failureCount++; + log.error("[RankingStepExecutionListener] afterStep: productId={}에 대한 MvProductRankWeekly 생성 실패", + aggregated.getProductId(), e); + } + } + + log.info("[RankingStepExecutionListener] afterStep: MvProductRankWeekly 생성 완료 - 성공: {}, 실패: {}, 총: {}", + successCount, failureCount, weeklyRankings.size()); + + return weeklyRankings; + } + + private List createMonthlyRankings(List top100) { + int successCount = 0; + int failureCount = 0; + List monthlyRankings = new ArrayList<>(); + + for (int i = 0; i < top100.size(); i++) { + ProductMetricsDailyAggregated aggregated = top100.get(i); + try { + log.debug("[RankingStepExecutionListener] afterStep: 아이템[{}] 처리 중 - productId={}", i, aggregated.getProductId()); + Product product = productService.getProductById(aggregated.getProductId()) + .orElseThrow(() -> new RuntimeException("Product not found: " + aggregated.getProductId())); + Brand brand = brandService.getBrandById(product.getBrandId()); + + MvProductRankMonthly monthly = new MvProductRankMonthly(); + monthly.setProductId(aggregated.getProductId()); + monthly.setProductName(product.getName()); + monthly.setBrandId(product.getBrandId()); + monthly.setBrandName(brand.getName()); + monthly.setScore(aggregated.calculateScore()); + monthly.setLikeCount(aggregated.getTotalLikeCount().intValue()); + monthly.setViewCount(aggregated.getTotalViewCount().intValue()); + monthly.setOrderCount(aggregated.getTotalSoldCount().intValue()); + monthly.setRankingDate(rankingDate); + + if (monthly.getRankingDate() == null) { + log.error("[RankingStepExecutionListener] afterStep: 아이템[{}]의 rankingDate가 NULL입니다 - productId={}", + i, monthly.getProductId()); + } + + monthlyRankings.add(monthly); + successCount++; + + if (i < 3) { + log.info("[RankingStepExecutionListener] afterStep: MvProductRankMonthly[{}] 생성 완료 - productId={}, productName={}, brandName={}, score={}, rankingDate={}", + i, monthly.getProductId(), monthly.getProductName(), monthly.getBrandName(), monthly.getScore(), monthly.getRankingDate()); + } + + } catch (Exception e) { + failureCount++; + log.error("[RankingStepExecutionListener] afterStep: productId={}에 대한 MvProductRankMonthly 생성 실패", + aggregated.getProductId(), e); + } + } + + log.info("[RankingStepExecutionListener] afterStep: MvProductRankMonthly 생성 완료 - 성공: {}, 실패: {}, 총: {}", + successCount, failureCount, monthlyRankings.size()); + + return monthlyRankings; + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/batch/BatchConfig.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/batch/BatchConfig.java new file mode 100644 index 000000000..3ea7d349c --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/batch/BatchConfig.java @@ -0,0 +1,52 @@ +package com.loopers.infrastructure.batch; + +import lombok.RequiredArgsConstructor; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.explore.JobExplorer; +import org.springframework.batch.core.explore.support.JobExplorerFactoryBean; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.core.launch.support.TaskExecutorJobLauncher; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; +import org.springframework.batch.support.DatabaseType; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; + +import javax.sql.DataSource; + +@Configuration +@EnableBatchProcessing +@RequiredArgsConstructor +public class BatchConfig { + private final DataSource dataSource; + private final PlatformTransactionManager transactionManager; + + @Bean + public JobRepository jobRepository() throws Exception { + JobRepositoryFactoryBean factoryBean = new JobRepositoryFactoryBean(); + factoryBean.setDataSource(dataSource); + factoryBean.setTransactionManager(transactionManager); + factoryBean.setDatabaseType(DatabaseType.MYSQL.getProductName()); + factoryBean.setTablePrefix("BATCH_"); + factoryBean.afterPropertiesSet(); + return factoryBean.getObject(); + } + + @Bean + public JobLauncher jobLauncher(JobRepository jobRepository) { + TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher(); + jobLauncher.setJobRepository(jobRepository); + return jobLauncher; + } + + @Bean + public JobExplorer jobExplorer() throws Exception { + JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean(); + factoryBean.setDataSource(dataSource); + factoryBean.setTablePrefix("BATCH_"); + factoryBean.setTransactionManager(transactionManager); + factoryBean.afterPropertiesSet(); + return factoryBean.getObject(); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingBatchV1ApiSpec.java b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingBatchV1ApiSpec.java new file mode 100644 index 000000000..7c4cfef6e --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingBatchV1ApiSpec.java @@ -0,0 +1,44 @@ +package com.loopers.interfaces.api.ranking; + +import com.loopers.interfaces.api.ApiResponse; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.media.Schema; + +@Schema(description = "랭킹 배치 API 스펙") +public interface RankingBatchV1ApiSpec { + + @Operation( + method = "POST", + summary = "주간 랭킹 집계 배치 실행", + description = "지정한 날짜가 속한 주간의 랭킹을 집계합니다." + ) + ApiResponse executeWeeklyRanking( + @Parameter(description = "집계 대상 날짜 (yyyy-MM-dd 또는 ISO_ZONED_DATE_TIME 형식, 미지정 시 현재 시간)", example = "2024-12-19") + @Schema(description = "집계 대상 날짜 (yyyy-MM-dd 또는 ISO_ZONED_DATE_TIME 형식, 미지정 시 현재 시간)", example = "2024-12-19") + String targetDate + ); + + @Operation( + method = "POST", + summary = "월간 랭킹 집계 배치 실행", + description = "지정한 날짜가 속한 월간의 랭킹을 집계합니다." + ) + ApiResponse executeMonthlyRanking( + @Parameter(description = "집계 대상 날짜 (yyyy-MM-dd 또는 ISO_ZONED_DATE_TIME 형식, 미지정 시 현재 시간)", example = "2024-12-19") + @Schema(description = "집계 대상 날짜 (yyyy-MM-dd 또는 ISO_ZONED_DATE_TIME 형식, 미지정 시 현재 시간)", example = "2024-12-19") + String targetDate + ); + + @Operation( + method = "POST", + summary = "주간 및 월간 랭킹 집계 배치 실행", + description = "지정한 날짜가 속한 주간과 월간의 랭킹을 모두 집계합니다." + ) + ApiResponse executeWeeklyAndMonthlyRanking( + @Parameter(description = "집계 대상 날짜 (yyyy-MM-dd 또는 ISO_ZONED_DATE_TIME 형식, 미지정 시 현재 시간)", example = "2024-12-19") + @Schema(description = "집계 대상 날짜 (yyyy-MM-dd 또는 ISO_ZONED_DATE_TIME 형식, 미지정 시 현재 시간)", example = "2024-12-19") + String targetDate + ); +} + diff --git a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingBatchV1Controller.java b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingBatchV1Controller.java new file mode 100644 index 000000000..e4bd6affa --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingBatchV1Controller.java @@ -0,0 +1,82 @@ +package com.loopers.interfaces.api.ranking; + +import com.loopers.application.ranking.RankingAggregationInfo; +import com.loopers.application.ranking.RankingAggregationService; +import com.loopers.interfaces.api.ApiResponse; +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; + +@RequiredArgsConstructor +@RestController +@RequestMapping("/api/v1/rankings/batch") +public class RankingBatchV1Controller implements RankingBatchV1ApiSpec { + + private final RankingAggregationService rankingAggregationService; + + private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ISO_ZONED_DATE_TIME; + private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + + @PostMapping("/weekly") + @Override + public ApiResponse executeWeeklyRanking( + @RequestParam(value = "targetDate", required = false) String targetDate + ) { + ZonedDateTime date = parseDateTime(targetDate); + RankingAggregationInfo result = rankingAggregationService.executeWeeklyRanking(date); + return ApiResponse.success(RankingBatchV1Dto.BatchExecutionResponse.from(result)); + } + + @PostMapping("/monthly") + @Override + public ApiResponse executeMonthlyRanking( + @RequestParam(value = "targetDate", required = false) String targetDate + ) { + ZonedDateTime date = parseDateTime(targetDate); + RankingAggregationInfo result = rankingAggregationService.executeMonthlyRanking(date); + return ApiResponse.success(RankingBatchV1Dto.BatchExecutionResponse.from(result)); + } + + @PostMapping("/weekly-and-monthly") + @Override + public ApiResponse executeWeeklyAndMonthlyRanking( + @RequestParam(value = "targetDate", required = false) String targetDate + ) { + ZonedDateTime date = parseDateTime(targetDate); + RankingAggregationInfo result = rankingAggregationService.executeWeeklyAndMonthlyRanking(date); + return ApiResponse.success(RankingBatchV1Dto.BatchExecutionResponse.from(result)); + } + + /** + * 날짜 문자열을 ZonedDateTime으로 파싱 + * 미지정 시 현재 시간 반환 + * yyyy-MM-dd 형식 또는 ISO_ZONED_DATE_TIME 형식 지원 + */ + private ZonedDateTime parseDateTime(String dateStr) { + if (dateStr == null || dateStr.isEmpty()) { + return ZonedDateTime.now(); + } + + try { + // ISO_ZONED_DATE_TIME 형식 시도 + return ZonedDateTime.parse(dateStr, DATE_TIME_FORMATTER); + } catch (DateTimeParseException e1) { + try { + // yyyy-MM-dd 형식 시도 (자정으로 변환) + return java.time.LocalDate.parse(dateStr, DATE_FORMATTER) + .atStartOfDay(java.time.ZoneId.systemDefault()); + } catch (DateTimeParseException e2) { + throw new IllegalArgumentException( + "날짜 형식이 올바르지 않습니다. yyyy-MM-dd 또는 ISO_ZONED_DATE_TIME 형식으로 입력해주세요. 입력값: " + dateStr + ); + } + } + } +} + diff --git a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingBatchV1Dto.java b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingBatchV1Dto.java new file mode 100644 index 000000000..d9c2fe646 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingBatchV1Dto.java @@ -0,0 +1,41 @@ +package com.loopers.interfaces.api.ranking; + +import com.loopers.application.ranking.RankingAggregationInfo; +import io.swagger.v3.oas.annotations.media.Schema; + +/** + * 랭킹 배치 API DTO + */ +public class RankingBatchV1Dto { + + /** + * 배치 실행 응답 + */ + @Schema(description = "배치 실행 응답") + public record BatchExecutionResponse( + @Schema(description = "Job Execution ID") + Long jobExecutionId, + @Schema(description = "Job 이름") + String jobName, + @Schema(description = "배치 실행 상태 (COMPLETED, FAILED 등)") + String status, + @Schema(description = "종료 상태") + String exitStatus, + @Schema(description = "집계 기간 (weekly, monthly, both)") + String period, + @Schema(description = "메시지") + String message + ) { + public static BatchExecutionResponse from(RankingAggregationInfo info) { + return new BatchExecutionResponse( + info.getJobExecutionId(), + info.getJobName(), + info.getStatus(), + info.getExitStatus(), + info.getPeriod(), + info.getMessage() + ); + } + } +} + diff --git a/apps/commerce-core/src/main/java/com/loopers/domain/metrics/product/ProductMetrics.java b/apps/commerce-core/src/main/java/com/loopers/domain/metrics/product/ProductMetrics.java index 603f19dae..87ebba615 100644 --- a/apps/commerce-core/src/main/java/com/loopers/domain/metrics/product/ProductMetrics.java +++ b/apps/commerce-core/src/main/java/com/loopers/domain/metrics/product/ProductMetrics.java @@ -12,6 +12,7 @@ @Table( name = "tb_product_metrics", indexes = { + @Index(name = "idx_product_metrics_product_id", columnList = "product_id"), @Index(name = "idx_product_metrics_like_count", columnList = "like_count"), @Index(name = "idx_product_metrics_brand_id", columnList = "brand_id"), @Index(name = "idx_product_metrics_brand_like_count", columnList = "brand_id,like_count") diff --git a/apps/commerce-core/src/main/java/com/loopers/domain/metrics/product/ProductMetricsDaily.java b/apps/commerce-core/src/main/java/com/loopers/domain/metrics/product/ProductMetricsDaily.java new file mode 100644 index 000000000..4afd0cb06 --- /dev/null +++ b/apps/commerce-core/src/main/java/com/loopers/domain/metrics/product/ProductMetricsDaily.java @@ -0,0 +1,96 @@ +package com.loopers.domain.metrics.product; + +import com.loopers.domain.BaseEntity; +import com.loopers.support.error.CoreException; +import com.loopers.support.error.ErrorType; +import jakarta.persistence.*; +import lombok.Getter; + +import java.time.LocalDate; + +@Entity +@Table( + name = "tb_product_metrics_daily", + indexes = { + @Index(name = "idx_product_metrics_daily_product_id", columnList = "product_id"), + @Index(name = "idx_product_metrics_daily_date", columnList = "date"), + @Index(name = "idx_product_metrics_daily_product_date", columnList = "product_id,date") + }, + uniqueConstraints = { + @UniqueConstraint( + name = "uk_product_metrics_daily_product_date", + columnNames = {"product_id", "date"} + ) + } +) +@Getter +public class ProductMetricsDaily extends BaseEntity { + + @Column(name = "product_id", nullable = false) + private Long productId; + + @Column(name = "date", nullable = false) + private LocalDate date; // yyyy-MM-dd 형식 + + @Column(name = "like_count", nullable = false) + private Integer likeCount; + + @Column(name = "view_count", nullable = false) + private Long viewCount; + + @Column(name = "sold_count", nullable = false) + private Long soldCount; + + protected ProductMetricsDaily() { + } + + public static ProductMetricsDaily create( + Long productId, + LocalDate date + ) { + if (productId == null || productId <= 0) { + throw new CoreException(ErrorType.BAD_REQUEST, "상품 ID는 1 이상이어야 합니다."); + } + if (date == null) { + throw new CoreException(ErrorType.BAD_REQUEST, "날짜는 필수입니다."); + } + + ProductMetricsDaily daily = new ProductMetricsDaily(); + daily.productId = productId; + daily.date = date; + daily.likeCount = 0; + daily.viewCount = 0L; + daily.soldCount = 0L; + return daily; + } + + public void incrementLikeCount() { + this.likeCount += 1; + } + + public void decrementLikeCount() { + if (this.likeCount <= 0) { + throw new CoreException(ErrorType.BAD_REQUEST, "좋아요 수는 0 미만으로 내려갈 수 없습니다."); + } + this.likeCount -= 1; + } + + public void incrementViewCount() { + if (this.viewCount == null) { + this.viewCount = 0L; + } + this.viewCount += 1; + } + + public void incrementSoldCount(Long quantity) { + if (this.soldCount == null) { + this.soldCount = 0L; + } + if (quantity == null || quantity <= 0) { + throw new CoreException(ErrorType.BAD_REQUEST, "판매 수량은 1 이상이어야 합니다."); + } + this.soldCount += quantity; + } +} + + diff --git a/apps/commerce-core/src/main/java/com/loopers/domain/metrics/product/ProductMetricsDailyAggregated.java b/apps/commerce-core/src/main/java/com/loopers/domain/metrics/product/ProductMetricsDailyAggregated.java new file mode 100644 index 000000000..6edde7cb0 --- /dev/null +++ b/apps/commerce-core/src/main/java/com/loopers/domain/metrics/product/ProductMetricsDailyAggregated.java @@ -0,0 +1,23 @@ +package com.loopers.domain.metrics.product; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * 일별 메트릭 집계 결과 DTO + * 배치에서 사용하기 위한 집계 데이터 + */ +@Getter +@AllArgsConstructor +public class ProductMetricsDailyAggregated { + private Long productId; + private Long totalLikeCount; + private Long totalViewCount; + private Long totalSoldCount; + + public double calculateScore() { + return totalViewCount * 0.1 + totalLikeCount * 0.2 + totalSoldCount * 0.7; + } +} + + diff --git a/apps/commerce-core/src/main/java/com/loopers/domain/metrics/product/ProductMetricsDailyRepository.java b/apps/commerce-core/src/main/java/com/loopers/domain/metrics/product/ProductMetricsDailyRepository.java new file mode 100644 index 000000000..e7e4ff142 --- /dev/null +++ b/apps/commerce-core/src/main/java/com/loopers/domain/metrics/product/ProductMetricsDailyRepository.java @@ -0,0 +1,55 @@ +package com.loopers.domain.metrics.product; + +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; + +import java.time.LocalDate; +import java.util.Collection; +import java.util.List; +import java.util.Optional; + +public interface ProductMetricsDailyRepository { + /** + * 특정 상품의 특정 날짜 메트릭 조회 + */ + Optional findByProductIdAndDate(Long productId, LocalDate date); + + /** + * 특정 상품의 특정 날짜 메트릭 조회 (락) + */ + Optional findByProductIdAndDateForUpdate(Long productId, LocalDate date); + + /** + * 특정 날짜 범위의 메트릭 조회 (배치용) + */ + List findByDateBetween(LocalDate startDate, LocalDate endDate); + + /** + * 특정 날짜 범위의 메트릭을 product_id별로 집계하여 조회 (배치용) + */ + List findAggregatedByDateBetween( + LocalDate startDate, + LocalDate endDate + ); + + /** + * 특정 날짜 범위의 메트릭을 product_id별로 집계하여 페이징 조회 (배치용) + */ + Page findAggregatedByDateBetweenPaged( + LocalDate startDate, + LocalDate endDate, + Pageable pageable + ); + + /** + * 메트릭 저장 + */ + ProductMetricsDaily save(ProductMetricsDaily daily); + + /** + * 메트릭 일괄 저장 + */ + List saveAll(Collection dailies); +} + + diff --git a/apps/commerce-core/src/main/java/com/loopers/domain/metrics/product/ProductMetricsDailyService.java b/apps/commerce-core/src/main/java/com/loopers/domain/metrics/product/ProductMetricsDailyService.java new file mode 100644 index 000000000..77b3e2beb --- /dev/null +++ b/apps/commerce-core/src/main/java/com/loopers/domain/metrics/product/ProductMetricsDailyService.java @@ -0,0 +1,96 @@ +package com.loopers.domain.metrics.product; + +import com.loopers.support.error.CoreException; +import com.loopers.support.error.ErrorType; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDate; + +@Slf4j +@Component +@RequiredArgsConstructor +public class ProductMetricsDailyService { + + private final ProductMetricsDailyRepository repository; + + /** + * 일별 메트릭 조회 또는 생성 + * + * @param productId 상품 ID + * @param brandId 브랜드 ID + * @param date 날짜 + * @return 일별 메트릭 (없으면 생성) + */ + @Transactional + public ProductMetricsDaily getOrCreate(Long productId, Long brandId, LocalDate date) { + return repository.findByProductIdAndDate(productId, date) + .orElseGet(() -> { + ProductMetricsDaily daily = ProductMetricsDaily.create(productId, date); + return repository.save(daily); + }); + } + + /** + * 좋아요 수 증가 (락 사용) + */ + @Transactional + public void incrementLikeCount(Long productId, LocalDate date) { + ProductMetricsDaily daily = repository.findByProductIdAndDateForUpdate(productId, date) + .orElseGet(() -> { + ProductMetricsDaily newDaily = ProductMetricsDaily.create(productId, date); + return repository.save(newDaily); + }); + daily.incrementLikeCount(); + repository.save(daily); + log.debug("Incremented daily like count: productId={}, date={}", productId, date); + } + + /** + * 좋아요 수 감소 (락 사용) + */ + @Transactional + public void decrementLikeCount(Long productId, LocalDate date) { + ProductMetricsDaily daily = repository.findByProductIdAndDateForUpdate(productId, date) + .orElseThrow(() -> new CoreException(ErrorType.NOT_FOUND, + "해당 상품의 일별 메트릭 정보를 찾을 수 없습니다.")); + daily.decrementLikeCount(); + repository.save(daily); + log.debug("Decremented daily like count: productId={}, date={}", productId, date); + } + + /** + * 조회 수 증가 (락 사용) + */ + @Transactional + public void incrementViewCount(Long productId, LocalDate date) { + ProductMetricsDaily daily = repository.findByProductIdAndDateForUpdate(productId, date) + .orElseGet(() -> { + ProductMetricsDaily newDaily = ProductMetricsDaily.create(productId, date); + return repository.save(newDaily); + }); + daily.incrementViewCount(); + repository.save(daily); + log.debug("Incremented daily view count: productId={}, date={}", productId, date); + } + + /** + * 판매 수 증가 (락 사용) + */ + @Transactional + public void incrementSoldCount(Long productId, LocalDate date, Long quantity) { + ProductMetricsDaily daily = repository.findByProductIdAndDateForUpdate(productId, date) + .orElseGet(() -> { + ProductMetricsDaily newDaily = ProductMetricsDaily.create(productId, date); + return repository.save(newDaily); + }); + daily.incrementSoldCount(quantity); + repository.save(daily); + log.debug("Incremented daily sold count: productId={}, date={}, quantity={}", + productId, date, quantity); + } +} + + diff --git a/apps/commerce-core/src/main/java/com/loopers/domain/ranking/MvProductRankMonthly.java b/apps/commerce-core/src/main/java/com/loopers/domain/ranking/MvProductRankMonthly.java new file mode 100644 index 000000000..ca30f4051 --- /dev/null +++ b/apps/commerce-core/src/main/java/com/loopers/domain/ranking/MvProductRankMonthly.java @@ -0,0 +1,26 @@ +package com.loopers.domain.ranking; + +import com.loopers.domain.BaseEntity; +import jakarta.persistence.Entity; +import jakarta.persistence.Table; +import lombok.Getter; +import lombok.Setter; + +import java.time.ZonedDateTime; + +@Entity +@Table(name = "tb_mv_product_rank_monthly") +@Getter +@Setter +public class MvProductRankMonthly extends BaseEntity { + private Long productId; + private String productName; + private Long brandId; + private String brandName; + private Double score; + private Integer likeCount; + private Integer viewCount; + private Integer orderCount; + private Integer ranking; + private ZonedDateTime rankingDate; +} diff --git a/apps/commerce-core/src/main/java/com/loopers/domain/ranking/MvProductRankMonthlyRepository.java b/apps/commerce-core/src/main/java/com/loopers/domain/ranking/MvProductRankMonthlyRepository.java new file mode 100644 index 000000000..aed461f1a --- /dev/null +++ b/apps/commerce-core/src/main/java/com/loopers/domain/ranking/MvProductRankMonthlyRepository.java @@ -0,0 +1,12 @@ +package com.loopers.domain.ranking; + +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Optional; + +public interface MvProductRankMonthlyRepository { + List findTop100ByRankingDateOrderByRankingAsc(ZonedDateTime rankingDate); + Optional findByProductIdAndRankingDate(Long productId, ZonedDateTime rankingDate); + void deleteByRankingDate(ZonedDateTime rankingDate); + void saveAll(List rankings); +} diff --git a/apps/commerce-core/src/main/java/com/loopers/domain/ranking/MvProductRankWeekly.java b/apps/commerce-core/src/main/java/com/loopers/domain/ranking/MvProductRankWeekly.java new file mode 100644 index 000000000..71356b0e6 --- /dev/null +++ b/apps/commerce-core/src/main/java/com/loopers/domain/ranking/MvProductRankWeekly.java @@ -0,0 +1,31 @@ +package com.loopers.domain.ranking; + +import com.loopers.domain.BaseEntity; +import jakarta.persistence.Entity; +import jakarta.persistence.Index; +import jakarta.persistence.Table; +import lombok.Getter; +import lombok.Setter; + +import java.time.ZonedDateTime; + +@Entity +@Table(name = "tb_mv_product_rank_weekly", indexes = { + @Index(name = "idx_mv_product_rank_weekly_ranking_date", columnList = "ranking_date"), + @Index(name = "idx_mv_product_rank_weekly_product_date", columnList = "product_id,ranking_date"), + @Index(name = "idx_mv_product_rank_weekly_date_ranking", columnList = "ranking_date,ranking") +}) +@Getter +@Setter +public class MvProductRankWeekly extends BaseEntity { + private Long productId; + private String productName; + private Long brandId; + private String brandName; + private Double score; + private Integer likeCount; + private Integer viewCount; + private Integer orderCount; + private Integer ranking; + private ZonedDateTime rankingDate; +} diff --git a/apps/commerce-core/src/main/java/com/loopers/domain/ranking/MvProductRankWeeklyRepository.java b/apps/commerce-core/src/main/java/com/loopers/domain/ranking/MvProductRankWeeklyRepository.java new file mode 100644 index 000000000..30f9319af --- /dev/null +++ b/apps/commerce-core/src/main/java/com/loopers/domain/ranking/MvProductRankWeeklyRepository.java @@ -0,0 +1,27 @@ +package com.loopers.domain.ranking; + +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Optional; + +public interface MvProductRankWeeklyRepository { + /** + * 특정 날짜의 주간 랭킹 TOP 100 조회 (rank 오름차순) + */ + List findTop100ByRankingDateOrderByRankingAsc(ZonedDateTime rankingDate); + + /** + * 특정 상품의 주간 랭킹 조회 + */ + Optional findByProductIdAndRankingDate(Long productId, ZonedDateTime rankingDate); + + /** + * 특정 날짜의 주간 랭킹 데이터 삭제 (배치 재실행 전) + */ + void deleteByRankingDate(ZonedDateTime rankingDate); + + /** + * 주간 랭킹 데이터 일괄 저장 + */ + void saveAll(List rankings); +} diff --git a/apps/commerce-core/src/main/java/com/loopers/infrastructure/metrics/product/ProductMetricsDailyJpaRepository.java b/apps/commerce-core/src/main/java/com/loopers/infrastructure/metrics/product/ProductMetricsDailyJpaRepository.java new file mode 100644 index 000000000..a8382f554 --- /dev/null +++ b/apps/commerce-core/src/main/java/com/loopers/infrastructure/metrics/product/ProductMetricsDailyJpaRepository.java @@ -0,0 +1,79 @@ +package com.loopers.infrastructure.metrics.product; + +import com.loopers.domain.metrics.product.ProductMetricsDaily; +import jakarta.persistence.LockModeType; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Lock; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; + +import java.time.LocalDate; +import java.util.List; +import java.util.Optional; + +public interface ProductMetricsDailyJpaRepository extends JpaRepository { + + Optional findByProductIdAndDate(Long productId, LocalDate date); + + @Lock(LockModeType.PESSIMISTIC_WRITE) + @Query("SELECT pmd FROM ProductMetricsDaily pmd WHERE pmd.productId = :productId AND pmd.date = :date") + Optional findByProductIdAndDateForUpdate( + @Param("productId") Long productId, + @Param("date") LocalDate date + ); + + List findByDateBetween(LocalDate startDate, LocalDate endDate); + + /** + * 특정 날짜 범위의 메트릭을 product_id별로 집계 + */ + @Query( + "SELECT " + + " pmd.productId as productId, " + + " SUM(pmd.likeCount) as totalLikeCount, " + + " SUM(pmd.viewCount) as totalViewCount, " + + " SUM(pmd.soldCount) as totalSoldCount " + + "FROM ProductMetricsDaily pmd " + + "WHERE pmd.date >= :startDate AND pmd.date <= :endDate " + + "GROUP BY pmd.productId " + ) + List findAggregatedByDateBetween( + @Param("startDate") LocalDate startDate, + @Param("endDate") LocalDate endDate + ); + + /** + * 특정 날짜 범위의 메트릭을 product_id별로 집계하여 페이징 조회 (배치용) + * + * @param startDate 시작일 + * @param endDate 종료일 + * @param pageable 페이징 정보 + * @return 집계된 메트릭 페이지 + */ + @Query( + value = + "SELECT " + + " pmd.product_id as productId, " + + " SUM(pmd.like_count) as totalLikeCount, " + + " SUM(pmd.view_count) as totalViewCount, " + + " SUM(pmd.sold_count) as totalSoldCount " + + "FROM tb_product_metrics_daily pmd " + + "WHERE pmd.date >= :startDate AND pmd.date <= :endDate " + + "GROUP BY pmd.product_id " + + "ORDER BY pmd.product_id", + countQuery = + "SELECT COUNT(DISTINCT pmd.product_id) " + + "FROM tb_product_metrics_daily pmd " + + "WHERE pmd.date >= :startDate AND pmd.date <= :endDate", + nativeQuery = true + ) + Page findAggregatedByDateBetweenPaged( + @Param("startDate") LocalDate startDate, + @Param("endDate") LocalDate endDate, + Pageable pageable + ); +} + + diff --git a/apps/commerce-core/src/main/java/com/loopers/infrastructure/metrics/product/ProductMetricsDailyRepositoryImpl.java b/apps/commerce-core/src/main/java/com/loopers/infrastructure/metrics/product/ProductMetricsDailyRepositoryImpl.java new file mode 100644 index 000000000..ef6b3b61c --- /dev/null +++ b/apps/commerce-core/src/main/java/com/loopers/infrastructure/metrics/product/ProductMetricsDailyRepositoryImpl.java @@ -0,0 +1,87 @@ +package com.loopers.infrastructure.metrics.product; + +import com.loopers.domain.metrics.product.ProductMetricsDaily; +import com.loopers.domain.metrics.product.ProductMetricsDailyAggregated; +import com.loopers.domain.metrics.product.ProductMetricsDailyRepository; +import lombok.RequiredArgsConstructor; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDate; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +@Component +@RequiredArgsConstructor +public class ProductMetricsDailyRepositoryImpl implements ProductMetricsDailyRepository { + + private final ProductMetricsDailyJpaRepository jpaRepository; + + @Override + @Transactional(readOnly = true) + public Optional findByProductIdAndDate(Long productId, LocalDate date) { + return jpaRepository.findByProductIdAndDate(productId, date); + } + + @Override + @Transactional + public Optional findByProductIdAndDateForUpdate(Long productId, LocalDate date) { + return jpaRepository.findByProductIdAndDateForUpdate(productId, date); + } + + @Override + @Transactional(readOnly = true) + public List findByDateBetween(LocalDate startDate, LocalDate endDate) { + return jpaRepository.findByDateBetween(startDate, endDate); + } + + @Override + @Transactional(readOnly = true) + public List findAggregatedByDateBetween( + LocalDate startDate, + LocalDate endDate + ) { + List results = jpaRepository.findAggregatedByDateBetween(startDate, endDate); + return results.stream() + .map(row -> new ProductMetricsDailyAggregated( + ((Number) row[0]).longValue(), // productId + ((Number) row[1]).longValue(), // totalLikeCount + ((Number) row[2]).longValue(), // totalViewCount + ((Number) row[3]).longValue() // totalSoldCount + )) + .collect(Collectors.toList()); + } + + @Override + @Transactional(readOnly = true) + public Page findAggregatedByDateBetweenPaged(LocalDate startDate, LocalDate endDate, Pageable pageable) { + Page results = jpaRepository.findAggregatedByDateBetweenPaged( + startDate, endDate, pageable + ); + + return results.map(row -> new ProductMetricsDailyAggregated( + ((Number) row[0]).longValue(), // productId + ((Number) row[1]).longValue(), // totalLikeCount + ((Number) row[2]).longValue(), // totalViewCount + ((Number) row[3]).longValue() // totalSoldCount + )); + } + + @Override + @Transactional + public ProductMetricsDaily save(ProductMetricsDaily daily) { + return jpaRepository.save(daily); + } + + @Override + @Transactional + public List saveAll(Collection dailies) { + return jpaRepository.saveAll(dailies); + } +} + + diff --git a/apps/commerce-core/src/main/java/com/loopers/infrastructure/ranking/MvProductRankMonthlyJpaRepository.java b/apps/commerce-core/src/main/java/com/loopers/infrastructure/ranking/MvProductRankMonthlyJpaRepository.java new file mode 100644 index 000000000..cb760de1b --- /dev/null +++ b/apps/commerce-core/src/main/java/com/loopers/infrastructure/ranking/MvProductRankMonthlyJpaRepository.java @@ -0,0 +1,21 @@ +package com.loopers.infrastructure.ranking; + +import com.loopers.domain.ranking.MvProductRankMonthly; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; + +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Optional; + +public interface MvProductRankMonthlyJpaRepository extends JpaRepository { + List findTop100ByRankingDateOrderByRankingAsc(ZonedDateTime rankingDate); + + Optional findByProductIdAndRankingDate(Long productId, ZonedDateTime rankingDate); + + @Modifying + @Query("DELETE FROM MvProductRankMonthly r WHERE r.rankingDate = :rankingDate") + void deleteByRankingDate(@Param("rankingDate") ZonedDateTime rankingDate); +} diff --git a/apps/commerce-core/src/main/java/com/loopers/infrastructure/ranking/MvProductRankMonthlyRepositoryImpl.java b/apps/commerce-core/src/main/java/com/loopers/infrastructure/ranking/MvProductRankMonthlyRepositoryImpl.java new file mode 100644 index 000000000..cfeafe55c --- /dev/null +++ b/apps/commerce-core/src/main/java/com/loopers/infrastructure/ranking/MvProductRankMonthlyRepositoryImpl.java @@ -0,0 +1,41 @@ +package com.loopers.infrastructure.ranking; + +import com.loopers.domain.ranking.MvProductRankMonthly; +import com.loopers.domain.ranking.MvProductRankMonthlyRepository; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Optional; + +@Component +@RequiredArgsConstructor +public class MvProductRankMonthlyRepositoryImpl implements MvProductRankMonthlyRepository { + private final MvProductRankMonthlyJpaRepository jpaRepository; + + @Override + @Transactional(readOnly = true) + public List findTop100ByRankingDateOrderByRankingAsc(ZonedDateTime rankingDate) { + return jpaRepository.findTop100ByRankingDateOrderByRankingAsc(rankingDate); + } + + @Override + @Transactional(readOnly = true) + public Optional findByProductIdAndRankingDate(Long productId, ZonedDateTime rankingDate) { + return jpaRepository.findByProductIdAndRankingDate(productId, rankingDate); + } + + @Override + @Transactional + public void deleteByRankingDate(ZonedDateTime rankingDate) { + jpaRepository.deleteByRankingDate(rankingDate); + } + + @Override + @Transactional + public void saveAll(List rankings) { + jpaRepository.saveAll(rankings); + } +} diff --git a/apps/commerce-core/src/main/java/com/loopers/infrastructure/ranking/MvProductRankWeeklyJpaRepository.java b/apps/commerce-core/src/main/java/com/loopers/infrastructure/ranking/MvProductRankWeeklyJpaRepository.java new file mode 100644 index 000000000..e368f7e08 --- /dev/null +++ b/apps/commerce-core/src/main/java/com/loopers/infrastructure/ranking/MvProductRankWeeklyJpaRepository.java @@ -0,0 +1,21 @@ +package com.loopers.infrastructure.ranking; + +import com.loopers.domain.ranking.MvProductRankWeekly; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; + +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Optional; + +public interface MvProductRankWeeklyJpaRepository extends JpaRepository { + List findTop100ByRankingDateOrderByRankingAsc(ZonedDateTime rankingDate); + + Optional findByProductIdAndRankingDate(Long productId, ZonedDateTime rankingDate); + + @Modifying + @Query("DELETE FROM MvProductRankWeekly r WHERE r.rankingDate = :rankingDate") + void deleteByRankingDate(@Param("rankingDate") ZonedDateTime rankingDate); +} diff --git a/apps/commerce-core/src/main/java/com/loopers/infrastructure/ranking/MvProductRankWeeklyRepositoryImpl.java b/apps/commerce-core/src/main/java/com/loopers/infrastructure/ranking/MvProductRankWeeklyRepositoryImpl.java new file mode 100644 index 000000000..1bc99f1bb --- /dev/null +++ b/apps/commerce-core/src/main/java/com/loopers/infrastructure/ranking/MvProductRankWeeklyRepositoryImpl.java @@ -0,0 +1,41 @@ +package com.loopers.infrastructure.ranking; + +import com.loopers.domain.ranking.MvProductRankWeekly; +import com.loopers.domain.ranking.MvProductRankWeeklyRepository; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Optional; + +@Component +@RequiredArgsConstructor +public class MvProductRankWeeklyRepositoryImpl implements MvProductRankWeeklyRepository { + private final MvProductRankWeeklyJpaRepository jpaRepository; + + @Override + @Transactional(readOnly = true) + public List findTop100ByRankingDateOrderByRankingAsc(ZonedDateTime rankingDate) { + return jpaRepository.findTop100ByRankingDateOrderByRankingAsc(rankingDate); + } + + @Override + @Transactional(readOnly = true) + public Optional findByProductIdAndRankingDate(Long productId, ZonedDateTime rankingDate) { + return jpaRepository.findByProductIdAndRankingDate(productId, rankingDate); + } + + @Override + @Transactional + public void deleteByRankingDate(ZonedDateTime rankingDate) { + jpaRepository.deleteByRankingDate(rankingDate); + } + + @Override + @Transactional + public void saveAll(List rankings) { + jpaRepository.saveAll(rankings); + } +} diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/listener/CatalogEventListener.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/listener/CatalogEventListener.java index ac6ab4bf4..c09090e00 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/listener/CatalogEventListener.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/listener/CatalogEventListener.java @@ -1,6 +1,7 @@ package com.loopers.infrastructure.listener; import com.loopers.confg.kafka.KafkaConfig; +import com.loopers.domain.metrics.product.ProductMetricsDailyService; import com.loopers.infrastructure.dlq.DlqService; import com.loopers.infrastructure.idempotency.IdempotencyService; import com.loopers.domain.metrics.product.ProductMetricsService; @@ -13,6 +14,7 @@ import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; +import java.time.LocalDate; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -23,7 +25,8 @@ public class CatalogEventListener { private final IdempotencyService idempotencyService; - private final ProductMetricsService productMetricsService; + private final ProductMetricsService productMetricsService; // 누계용 + private final ProductMetricsDailyService productMetricsDailyService; // 일별 집계용 private final DlqService dlqService; private final ObjectMapper objectMapper; @@ -56,6 +59,8 @@ public void handleCatalogEvents( ) { log.debug("Received {} messages from catalog-events", records.size()); + LocalDate today = LocalDate.now(); + for (ConsumerRecord record : records) { try { String key = record.key() != null ? record.key().toString() : null; @@ -82,7 +87,7 @@ public void handleCatalogEvents( } // 이벤트 타입별 처리 - handleEvent(eventType, aggregateId, message); + handleEvent(eventType, aggregateId, today); // 처리 완료 기록 (비즈니스 로직과 같은 트랜잭션) idempotencyService.markAsHandled( @@ -137,20 +142,23 @@ public void handleCatalogEvents( log.debug("Acknowledged {} messages", records.size()); } - private void handleEvent(String eventType, String aggregateId, Map message) { + private void handleEvent(String eventType, String aggregateId, LocalDate date) { Long productId = Long.parseLong(aggregateId); switch (eventType) { case "ProductLiked": productMetricsService.incrementLikeCount(productId); + productMetricsDailyService.incrementLikeCount(productId, date); log.debug("Incremented like count for product: productId={}", productId); break; case "ProductUnliked": productMetricsService.decrementLikeCount(productId); + productMetricsDailyService.decrementLikeCount(productId, date); log.debug("Decremented like count for product: productId={}", productId); break; case "ProductViewed": productMetricsService.incrementViewCount(productId); + productMetricsDailyService.incrementViewCount(productId, date); log.debug("Incremented view count for product: productId={}", productId); break; default: diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/listener/OrderEventListener.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/listener/OrderEventListener.java index 7a7d12c5a..99a8e5bbc 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/listener/OrderEventListener.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/listener/OrderEventListener.java @@ -1,6 +1,7 @@ package com.loopers.infrastructure.listener; import com.loopers.confg.kafka.KafkaConfig; +import com.loopers.domain.metrics.product.ProductMetricsDailyService; import com.loopers.domain.order.Order; import com.loopers.domain.order.OrderRepository; import com.loopers.infrastructure.dlq.DlqService; @@ -17,6 +18,7 @@ import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; +import java.time.LocalDate; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -28,6 +30,7 @@ public class OrderEventListener { private final IdempotencyService idempotencyService; private final ProductMetricsService productMetricsService; + private final ProductMetricsDailyService productMetricsDailyService; private final OrderRepository orderRepository; private final DlqService dlqService; private final ObjectMapper objectMapper; @@ -199,6 +202,7 @@ private void handleOrderPaidEvent(Map message) { try { productMetricsService.incrementSoldCount(productId, Long.valueOf(quantity)); + productMetricsDailyService.incrementSoldCount(productId, LocalDate.now(), Long.valueOf(quantity)); log.debug("Incremented sold count for product: productId={}, quantity={}", productId, quantity); } catch (Exception e) {