hourlyKeys = new ArrayList<>();
+
+ // 해당 날짜의 모든 시간 단위 키 생성 (00시 ~ 23시)
+ for (int hour = 0; hour < 24; hour++) {
+ LocalDateTime dateTime = date.atTime(hour, 0);
+ String hourlyKey = keyGenerator.generateHourlyKey(dateTime);
+ hourlyKeys.add(hourlyKey);
+ }
+
+ // ZUNIONSTORE로 모든 시간 단위 랭킹을 일간 랭킹으로 집계
+ Long result = zSetTemplate.unionStore(dailyKey, hourlyKeys);
+
+ // TTL 설정
+ zSetTemplate.setTtlIfNotExists(dailyKey, TTL);
+
+ log.info("시간 단위 랭킹을 일간 랭킹으로 집계 완료: date={}, memberCount={}", date, result);
+ return result;
+ }
+
+ /**
+ * Score Carry-Over: 오늘의 랭킹을 내일 랭킹에 일부 반영합니다.
+ *
+ * 콜드 스타트 문제를 완화하기 위해 오늘의 랭킹을 가중치를 적용하여 내일 랭킹에 반영합니다.
+ * 예: 오늘 랭킹의 10%를 내일 랭킹에 반영
+ *
+ *
+ * @param today 오늘 날짜
+ * @param tomorrow 내일 날짜
+ * @param carryOverWeight Carry-Over 가중치 (예: 0.1 = 10%)
+ * @return 반영된 멤버 수
+ */
+ public Long carryOverScore(LocalDate today, LocalDate tomorrow, double carryOverWeight) {
+ String todayKey = keyGenerator.generateDailyKey(today);
+ String tomorrowKey = keyGenerator.generateDailyKey(tomorrow);
+
+ // 오늘 랭킹을 가중치를 적용하여 내일 랭킹에 합산
+ Long result = zSetTemplate.unionStoreWithWeight(tomorrowKey, todayKey, carryOverWeight);
+
+ // TTL 설정
+ zSetTemplate.setTtlIfNotExists(tomorrowKey, TTL);
+
+ log.info("Score Carry-Over 완료: today={}, tomorrow={}, weight={}, memberCount={}",
+ today, tomorrow, carryOverWeight, result);
+ return result;
+ }
+
/**
* ZSET에 점수를 증가시킵니다.
*
diff --git a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/RankingConsumer.java b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/RankingConsumer.java
index 7b79bc95d..cfe6fd9b7 100644
--- a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/RankingConsumer.java
+++ b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/RankingConsumer.java
@@ -2,7 +2,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.loopers.application.eventhandled.EventHandledService;
-import com.loopers.application.ranking.RankingService;
import com.loopers.confg.kafka.KafkaConfig;
import com.loopers.domain.event.LikeEvent;
import com.loopers.domain.event.OrderEvent;
@@ -11,18 +10,18 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
+import org.springframework.context.ApplicationEventPublisher;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
-import java.time.LocalDate;
import java.util.List;
/**
* 랭킹 집계 Kafka Consumer.
*
- * Kafka에서 이벤트를 수취하여 Redis ZSET에 랭킹 점수를 적재합니다.
+ * Kafka에서 이벤트를 수취하여 Spring ApplicationEvent로 발행합니다.
* 조회, 좋아요, 주문 이벤트를 기반으로 실시간 랭킹을 구축합니다.
*
*
@@ -43,20 +42,22 @@
*
* 설계 원칙:
*
- * - Eventually Consistent: 일시적인 지연/중복 허용
- * - CQRS Read Model: Write Side(도메인) → Kafka → Read Side(Application) → Redis ZSET
+ * - 관심사 분리: Consumer는 Kafka 메시지 수신/파싱만 담당, 비즈니스 로직은 EventHandler에서 처리
+ * - 이벤트 핸들러 패턴: Kafka Event → Spring ApplicationEvent → RankingEventListener → RankingEventHandler
+ * - Eventually Consistent: 일시적인 지연/중복 허용
+ * - CQRS Read Model: Write Side(도메인) → Kafka → Read Side(Application) → Redis ZSET
*
*
*
* @author Loopers
- * @version 1.0
+ * @version 2.0
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RankingConsumer {
- private final RankingService rankingService;
+ private final ApplicationEventPublisher applicationEventPublisher;
private final EventHandledService eventHandledService;
private final ObjectMapper objectMapper;
@@ -104,28 +105,31 @@ public void consumeLikeEvents(
Object value = record.value();
String eventType;
- LocalDate date = LocalDate.now();
// Spring Kafka가 자동으로 역직렬화한 경우
if (value instanceof LikeEvent.LikeAdded) {
LikeEvent.LikeAdded event = (LikeEvent.LikeAdded) value;
- rankingService.addLikeScore(event.productId(), date, true);
+ // Spring ApplicationEvent 발행 (애플리케이션 내부 이벤트)
+ applicationEventPublisher.publishEvent(event);
eventType = "LikeAdded";
} else if (value instanceof LikeEvent.LikeRemoved) {
LikeEvent.LikeRemoved event = (LikeEvent.LikeRemoved) value;
- rankingService.addLikeScore(event.productId(), date, false);
+ // Spring ApplicationEvent 발행 (애플리케이션 내부 이벤트)
+ applicationEventPublisher.publishEvent(event);
eventType = "LikeRemoved";
} else {
// JSON 문자열인 경우 이벤트 타입 헤더로 구분
String eventTypeHeader = extractEventType(record);
if ("LikeRemoved".equals(eventTypeHeader)) {
LikeEvent.LikeRemoved event = parseLikeRemovedEvent(value);
- rankingService.addLikeScore(event.productId(), date, false);
+ // Spring ApplicationEvent 발행 (애플리케이션 내부 이벤트)
+ applicationEventPublisher.publishEvent(event);
eventType = "LikeRemoved";
} else {
// 기본값은 LikeAdded
LikeEvent.LikeAdded event = parseLikeEvent(value);
- rankingService.addLikeScore(event.productId(), date, true);
+ // Spring ApplicationEvent 발행 (애플리케이션 내부 이벤트)
+ applicationEventPublisher.publishEvent(event);
eventType = "LikeAdded";
}
}
@@ -202,23 +206,8 @@ public void consumeOrderEvents(
Object value = record.value();
OrderEvent.OrderCreated event = parseOrderCreatedEvent(value);
- LocalDate date = LocalDate.now();
-
- // 주문 아이템별로 점수 집계
- // 주의: OrderEvent.OrderCreated에는 개별 상품 가격 정보가 없으므로
- // subtotal을 totalQuantity로 나눠서 평균 단가를 구하고, 각 아이템의 quantity를 곱함
- int totalQuantity = event.orderItems().stream()
- .mapToInt(OrderEvent.OrderCreated.OrderItemInfo::quantity)
- .sum();
-
- if (totalQuantity > 0 && event.subtotal() != null) {
- double averagePrice = (double) event.subtotal() / totalQuantity;
-
- for (OrderEvent.OrderCreated.OrderItemInfo item : event.orderItems()) {
- double orderAmount = averagePrice * item.quantity();
- rankingService.addOrderScore(item.productId(), date, orderAmount);
- }
- }
+ // Spring ApplicationEvent 발행 (애플리케이션 내부 이벤트)
+ applicationEventPublisher.publishEvent(event);
// 이벤트 처리 기록 저장
eventHandledService.markAsHandled(eventId, "OrderCreated", "order-events");
@@ -284,9 +273,8 @@ public void consumeProductEvents(
Object value = record.value();
ProductEvent.ProductViewed event = parseProductViewedEvent(value);
- LocalDate date = LocalDate.now();
-
- rankingService.addViewScore(event.productId(), date);
+ // Spring ApplicationEvent 발행 (애플리케이션 내부 이벤트)
+ applicationEventPublisher.publishEvent(event);
// 이벤트 처리 기록 저장
eventHandledService.markAsHandled(eventId, "ProductViewed", "product-events");
diff --git a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/event/ranking/RankingEventListener.java b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/event/ranking/RankingEventListener.java
new file mode 100644
index 000000000..b72cc4a48
--- /dev/null
+++ b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/event/ranking/RankingEventListener.java
@@ -0,0 +1,121 @@
+package com.loopers.interfaces.event.ranking;
+
+import com.loopers.application.ranking.RankingEventHandler;
+import com.loopers.domain.event.LikeEvent;
+import com.loopers.domain.event.OrderEvent;
+import com.loopers.domain.event.ProductEvent;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Component;
+
+/**
+ * 랭킹 이벤트 리스너.
+ *
+ * 좋아요 추가/취소, 주문 생성, 상품 조회 이벤트를 받아서 랭킹 점수를 집계하는 인터페이스 레이어의 어댑터입니다.
+ *
+ *
+ * 레이어 역할:
+ *
+ * - 인터페이스 레이어: 외부 이벤트(도메인 이벤트)를 받아서 애플리케이션 핸들러를 호출하는 어댑터
+ * - 비즈니스 로직 없음: 단순히 이벤트를 받아서 애플리케이션 핸들러를 호출하는 역할만 수행
+ *
+ *
+ *
+ * EDA 원칙:
+ *
+ * - 비동기 처리: @Async로 집계 처리를 비동기로 실행하여 Kafka Consumer의 성능에 영향 없음
+ * - 이벤트 기반: 좋아요, 주문, 조회 이벤트를 구독하여 랭킹 점수 집계
+ *
+ *
+ *
+ * @author Loopers
+ * @version 1.0
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class RankingEventListener {
+
+ private final RankingEventHandler rankingEventHandler;
+
+ /**
+ * 좋아요 추가 이벤트를 처리합니다.
+ *
+ * 비동기로 실행되어 랭킹 점수를 집계합니다.
+ *
+ *
+ * @param event 좋아요 추가 이벤트
+ */
+ @Async
+ @EventListener
+ public void handleLikeAdded(LikeEvent.LikeAdded event) {
+ try {
+ rankingEventHandler.handleLikeAdded(event);
+ } catch (Exception e) {
+ log.error("좋아요 추가 이벤트 처리 중 오류 발생: productId={}, userId={}",
+ event.productId(), event.userId(), e);
+ // 이벤트 처리 실패는 다른 리스너에 영향을 주지 않도록 예외를 삼킴
+ }
+ }
+
+ /**
+ * 좋아요 취소 이벤트를 처리합니다.
+ *
+ * 비동기로 실행되어 랭킹 점수를 차감합니다.
+ *
+ *
+ * @param event 좋아요 취소 이벤트
+ */
+ @Async
+ @EventListener
+ public void handleLikeRemoved(LikeEvent.LikeRemoved event) {
+ try {
+ rankingEventHandler.handleLikeRemoved(event);
+ } catch (Exception e) {
+ log.error("좋아요 취소 이벤트 처리 중 오류 발생: productId={}, userId={}",
+ event.productId(), event.userId(), e);
+ // 이벤트 처리 실패는 다른 리스너에 영향을 주지 않도록 예외를 삼킴
+ }
+ }
+
+ /**
+ * 주문 생성 이벤트를 처리합니다.
+ *
+ * 비동기로 실행되어 랭킹 점수를 집계합니다.
+ *
+ *
+ * @param event 주문 생성 이벤트
+ */
+ @Async
+ @EventListener
+ public void handleOrderCreated(OrderEvent.OrderCreated event) {
+ try {
+ rankingEventHandler.handleOrderCreated(event);
+ } catch (Exception e) {
+ log.error("주문 생성 이벤트 처리 중 오류 발생: orderId={}", event.orderId(), e);
+ // 이벤트 처리 실패는 다른 리스너에 영향을 주지 않도록 예외를 삼킴
+ }
+ }
+
+ /**
+ * 상품 조회 이벤트를 처리합니다.
+ *
+ * 비동기로 실행되어 랭킹 점수를 집계합니다.
+ *
+ *
+ * @param event 상품 조회 이벤트
+ */
+ @Async
+ @EventListener
+ public void handleProductViewed(ProductEvent.ProductViewed event) {
+ try {
+ rankingEventHandler.handleProductViewed(event);
+ } catch (Exception e) {
+ log.error("상품 조회 이벤트 처리 중 오류 발생: productId={}", event.productId(), e);
+ // 이벤트 처리 실패는 다른 리스너에 영향을 주지 않도록 예외를 삼킴
+ }
+ }
+}
+
diff --git a/apps/commerce-streamer/src/test/java/com/loopers/application/ranking/RankingEventHandlerTest.java b/apps/commerce-streamer/src/test/java/com/loopers/application/ranking/RankingEventHandlerTest.java
new file mode 100644
index 000000000..a32182b98
--- /dev/null
+++ b/apps/commerce-streamer/src/test/java/com/loopers/application/ranking/RankingEventHandlerTest.java
@@ -0,0 +1,158 @@
+package com.loopers.application.ranking;
+
+import com.loopers.domain.event.LikeEvent;
+import com.loopers.domain.event.OrderEvent;
+import com.loopers.domain.event.ProductEvent;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.*;
+
+/**
+ * RankingEventHandler 테스트.
+ */
+@ExtendWith(MockitoExtension.class)
+class RankingEventHandlerTest {
+
+ @Mock
+ private RankingService rankingService;
+
+ @InjectMocks
+ private RankingEventHandler rankingEventHandler;
+
+ @DisplayName("좋아요 추가 이벤트를 처리할 수 있다.")
+ @Test
+ void canHandleLikeAdded() {
+ // arrange
+ Long productId = 1L;
+ Long userId = 100L;
+ LikeEvent.LikeAdded event = new LikeEvent.LikeAdded(userId, productId, LocalDateTime.now());
+
+ // act
+ rankingEventHandler.handleLikeAdded(event);
+
+ // assert
+ verify(rankingService).addLikeScore(eq(productId), any(LocalDate.class), eq(true));
+ }
+
+ @DisplayName("좋아요 취소 이벤트를 처리할 수 있다.")
+ @Test
+ void canHandleLikeRemoved() {
+ // arrange
+ Long productId = 1L;
+ Long userId = 100L;
+ LikeEvent.LikeRemoved event = new LikeEvent.LikeRemoved(userId, productId, LocalDateTime.now());
+
+ // act
+ rankingEventHandler.handleLikeRemoved(event);
+
+ // assert
+ verify(rankingService).addLikeScore(eq(productId), any(LocalDate.class), eq(false));
+ }
+
+ @DisplayName("주문 생성 이벤트를 처리할 수 있다.")
+ @Test
+ void canHandleOrderCreated() {
+ // arrange
+ Long orderId = 1L;
+ Long userId = 100L;
+ OrderEvent.OrderCreated.OrderItemInfo item1 =
+ new OrderEvent.OrderCreated.OrderItemInfo(1L, 2);
+ OrderEvent.OrderCreated.OrderItemInfo item2 =
+ new OrderEvent.OrderCreated.OrderItemInfo(2L, 3);
+ OrderEvent.OrderCreated event = new OrderEvent.OrderCreated(
+ orderId,
+ userId,
+ null, // couponCode
+ 10000, // subtotal
+ null, // usedPointAmount
+ List.of(item1, item2),
+ LocalDateTime.now()
+ );
+
+ // act
+ rankingEventHandler.handleOrderCreated(event);
+
+ // assert
+ // totalQuantity = 2 + 3 = 5
+ // averagePrice = 10000 / 5 = 2000
+ // item1: 2000 * 2 = 4000
+ // item2: 2000 * 3 = 6000
+ verify(rankingService).addOrderScore(eq(1L), any(LocalDate.class), eq(4000.0));
+ verify(rankingService).addOrderScore(eq(2L), any(LocalDate.class), eq(6000.0));
+ }
+
+ @DisplayName("주문 아이템이 없으면 점수를 추가하지 않는다.")
+ @Test
+ void doesNothing_whenOrderItemsIsEmpty() {
+ // arrange
+ Long orderId = 1L;
+ Long userId = 100L;
+ OrderEvent.OrderCreated event = new OrderEvent.OrderCreated(
+ orderId,
+ userId,
+ null, // couponCode
+ 10000, // subtotal
+ null, // usedPointAmount
+ List.of(),
+ LocalDateTime.now()
+ );
+
+ // act
+ rankingEventHandler.handleOrderCreated(event);
+
+ // assert
+ verify(rankingService, never()).addOrderScore(any(), any(), anyDouble());
+ }
+
+ @DisplayName("주문 subtotal이 null이면 점수를 추가하지 않는다.")
+ @Test
+ void doesNothing_whenSubtotalIsNull() {
+ // arrange
+ Long orderId = 1L;
+ Long userId = 100L;
+ OrderEvent.OrderCreated.OrderItemInfo item =
+ new OrderEvent.OrderCreated.OrderItemInfo(1L, 2);
+ OrderEvent.OrderCreated event = new OrderEvent.OrderCreated(
+ orderId,
+ userId,
+ null, // couponCode
+ null, // subtotal
+ null, // usedPointAmount
+ List.of(item),
+ LocalDateTime.now()
+ );
+
+ // act
+ rankingEventHandler.handleOrderCreated(event);
+
+ // assert
+ verify(rankingService, never()).addOrderScore(any(), any(), anyDouble());
+ }
+
+ @DisplayName("상품 조회 이벤트를 처리할 수 있다.")
+ @Test
+ void canHandleProductViewed() {
+ // arrange
+ Long productId = 1L;
+ Long userId = 100L;
+ ProductEvent.ProductViewed event = new ProductEvent.ProductViewed(productId, userId, LocalDateTime.now());
+
+ // act
+ rankingEventHandler.handleProductViewed(event);
+
+ // assert
+ verify(rankingService).addViewScore(eq(productId), any(LocalDate.class));
+ }
+}
+
diff --git a/apps/commerce-streamer/src/test/java/com/loopers/application/ranking/RankingServiceTest.java b/apps/commerce-streamer/src/test/java/com/loopers/application/ranking/RankingServiceTest.java
index cad3228aa..3851331be 100644
--- a/apps/commerce-streamer/src/test/java/com/loopers/application/ranking/RankingServiceTest.java
+++ b/apps/commerce-streamer/src/test/java/com/loopers/application/ranking/RankingServiceTest.java
@@ -11,7 +11,10 @@
import java.time.Duration;
import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@@ -244,4 +247,96 @@ void accumulatesScoresForSameProduct() {
// TTL 설정은 각 호출마다 수행됨 (incrementScore 내부에서 호출)
verify(zSetTemplate, times(3)).setTtlIfNotExists(eq(expectedKey), eq(Duration.ofDays(2)));
}
+
+ @DisplayName("시간 단위 랭킹을 일간 랭킹으로 집계할 수 있다.")
+ @Test
+ void canAggregateHourlyToDaily() {
+ // arrange
+ LocalDate date = LocalDate.of(2024, 12, 15);
+ String dailyKey = "ranking:all:20241215";
+ List expectedHourlyKeys = new ArrayList<>();
+
+ // 00시 ~ 23시 키 생성
+ for (int hour = 0; hour < 24; hour++) {
+ LocalDateTime dateTime = date.atTime(hour, 0);
+ String hourlyKey = "ranking:hourly:" + dateTime.format(java.time.format.DateTimeFormatter.ofPattern("yyyyMMddHH"));
+ expectedHourlyKeys.add(hourlyKey);
+ when(keyGenerator.generateHourlyKey(dateTime)).thenReturn(hourlyKey);
+ }
+
+ when(keyGenerator.generateDailyKey(date)).thenReturn(dailyKey);
+ when(zSetTemplate.unionStore(eq(dailyKey), any(List.class))).thenReturn(100L);
+
+ // act
+ Long result = rankingService.aggregateHourlyToDaily(date);
+
+ // assert
+ assertThat(result).isEqualTo(100L);
+ verify(keyGenerator).generateDailyKey(date);
+
+ // 24개의 시간 단위 키 생성 확인
+ for (int hour = 0; hour < 24; hour++) {
+ LocalDateTime dateTime = date.atTime(hour, 0);
+ verify(keyGenerator).generateHourlyKey(dateTime);
+ }
+
+ // ZUNIONSTORE 호출 확인
+ ArgumentCaptor> keysCaptor = ArgumentCaptor.forClass(List.class);
+ verify(zSetTemplate).unionStore(eq(dailyKey), keysCaptor.capture());
+ List capturedKeys = keysCaptor.getValue();
+ assertThat(capturedKeys).hasSize(24);
+ assertThat(capturedKeys).containsAll(expectedHourlyKeys);
+
+ // TTL 설정 확인
+ verify(zSetTemplate).setTtlIfNotExists(eq(dailyKey), eq(Duration.ofDays(2)));
+ }
+
+ @DisplayName("Score Carry-Over로 오늘 랭킹을 내일 랭킹에 반영할 수 있다.")
+ @Test
+ void canCarryOverScore() {
+ // arrange
+ LocalDate today = LocalDate.of(2024, 12, 15);
+ LocalDate tomorrow = LocalDate.of(2024, 12, 16);
+ String todayKey = "ranking:all:20241215";
+ String tomorrowKey = "ranking:all:20241216";
+ double carryOverWeight = 0.1; // 10%
+
+ when(keyGenerator.generateDailyKey(today)).thenReturn(todayKey);
+ when(keyGenerator.generateDailyKey(tomorrow)).thenReturn(tomorrowKey);
+ when(zSetTemplate.unionStoreWithWeight(eq(tomorrowKey), eq(todayKey), eq(carryOverWeight)))
+ .thenReturn(50L);
+
+ // act
+ Long result = rankingService.carryOverScore(today, tomorrow, carryOverWeight);
+
+ // assert
+ assertThat(result).isEqualTo(50L);
+ verify(keyGenerator).generateDailyKey(today);
+ verify(keyGenerator).generateDailyKey(tomorrow);
+ verify(zSetTemplate).unionStoreWithWeight(eq(tomorrowKey), eq(todayKey), eq(carryOverWeight));
+ verify(zSetTemplate).setTtlIfNotExists(eq(tomorrowKey), eq(Duration.ofDays(2)));
+ }
+
+ @DisplayName("Score Carry-Over 가중치가 0일 때도 정상적으로 처리된다.")
+ @Test
+ void canCarryOverScore_withZeroWeight() {
+ // arrange
+ LocalDate today = LocalDate.of(2024, 12, 15);
+ LocalDate tomorrow = LocalDate.of(2024, 12, 16);
+ String todayKey = "ranking:all:20241215";
+ String tomorrowKey = "ranking:all:20241216";
+ double carryOverWeight = 0.0;
+
+ when(keyGenerator.generateDailyKey(today)).thenReturn(todayKey);
+ when(keyGenerator.generateDailyKey(tomorrow)).thenReturn(tomorrowKey);
+ when(zSetTemplate.unionStoreWithWeight(eq(tomorrowKey), eq(todayKey), eq(carryOverWeight)))
+ .thenReturn(0L);
+
+ // act
+ Long result = rankingService.carryOverScore(today, tomorrow, carryOverWeight);
+
+ // assert
+ assertThat(result).isEqualTo(0L);
+ verify(zSetTemplate).unionStoreWithWeight(eq(tomorrowKey), eq(todayKey), eq(carryOverWeight));
+ }
}
diff --git a/apps/commerce-streamer/src/test/java/com/loopers/interfaces/consumer/RankingConsumerTest.java b/apps/commerce-streamer/src/test/java/com/loopers/interfaces/consumer/RankingConsumerTest.java
index 99e91a877..67485df03 100644
--- a/apps/commerce-streamer/src/test/java/com/loopers/interfaces/consumer/RankingConsumerTest.java
+++ b/apps/commerce-streamer/src/test/java/com/loopers/interfaces/consumer/RankingConsumerTest.java
@@ -2,7 +2,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.loopers.application.eventhandled.EventHandledService;
-import com.loopers.application.ranking.RankingService;
import com.loopers.domain.event.LikeEvent;
import com.loopers.domain.event.OrderEvent;
import com.loopers.domain.event.ProductEvent;
@@ -17,6 +16,7 @@
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.context.ApplicationEventPublisher;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.kafka.support.Acknowledgment;
@@ -35,7 +35,7 @@
class RankingConsumerTest {
@Mock
- private RankingService rankingService;
+ private ApplicationEventPublisher applicationEventPublisher;
@Mock
private EventHandledService eventHandledService;
@@ -73,7 +73,7 @@ void canConsumeLikeAddedEvent() {
// assert
verify(eventHandledService).isAlreadyHandled(eventId);
- verify(rankingService).addLikeScore(eq(productId), any(), eq(true));
+ verify(applicationEventPublisher).publishEvent(any(LikeEvent.LikeAdded.class));
verify(eventHandledService).markAsHandled(eventId, "LikeAdded", "like-events");
verify(acknowledgment).acknowledge();
}
@@ -103,7 +103,7 @@ void canConsumeLikeRemovedEvent() {
// assert
verify(eventHandledService).isAlreadyHandled(eventId);
- verify(rankingService).addLikeScore(eq(productId), any(), eq(false));
+ verify(applicationEventPublisher).publishEvent(any(LikeEvent.LikeRemoved.class));
verify(eventHandledService).markAsHandled(eventId, "LikeRemoved", "like-events");
verify(acknowledgment).acknowledge();
}
@@ -142,13 +142,7 @@ void canConsumeOrderCreatedEvent() {
// assert
verify(eventHandledService).isAlreadyHandled(eventId);
-
- // 평균 단가 계산: 10000 / (3 + 2) = 2000
- // productId1: 2000 * 3 = 6000
- // productId2: 2000 * 2 = 4000
- verify(rankingService).addOrderScore(eq(productId1), any(), eq(6000.0));
- verify(rankingService).addOrderScore(eq(productId2), any(), eq(4000.0));
-
+ verify(applicationEventPublisher).publishEvent(any(OrderEvent.OrderCreated.class));
verify(eventHandledService).markAsHandled(eventId, "OrderCreated", "order-events");
verify(acknowledgment).acknowledge();
}
@@ -177,7 +171,7 @@ void canConsumeProductViewedEvent() {
// assert
verify(eventHandledService).isAlreadyHandled(eventId);
- verify(rankingService).addViewScore(eq(productId), any());
+ verify(applicationEventPublisher).publishEvent(any(ProductEvent.ProductViewed.class));
verify(eventHandledService).markAsHandled(eventId, "ProductViewed", "product-events");
verify(acknowledgment).acknowledge();
}
@@ -214,8 +208,8 @@ void canConsumeMultipleEvents() {
// assert
verify(eventHandledService).isAlreadyHandled(eventId1);
verify(eventHandledService).isAlreadyHandled(eventId2);
- verify(rankingService).addLikeScore(eq(productId), any(), eq(true));
- verify(rankingService).addViewScore(eq(productId), any());
+ verify(applicationEventPublisher).publishEvent(any(LikeEvent.LikeAdded.class));
+ verify(applicationEventPublisher).publishEvent(any(ProductEvent.ProductViewed.class));
verify(eventHandledService).markAsHandled(eventId1, "LikeAdded", "like-events");
verify(eventHandledService).markAsHandled(eventId2, "ProductViewed", "product-events");
verify(acknowledgment, times(2)).acknowledge();
@@ -245,7 +239,7 @@ void skipsAlreadyHandledEvent() {
// assert
verify(eventHandledService).isAlreadyHandled(eventId);
- verify(rankingService, never()).addLikeScore(any(), any(), anyBoolean());
+ verify(applicationEventPublisher, never()).publishEvent(any());
verify(eventHandledService, never()).markAsHandled(any(), any(), any());
verify(acknowledgment).acknowledge();
}
@@ -268,7 +262,7 @@ void skipsEventWithoutEventId() {
// assert
verify(eventHandledService, never()).isAlreadyHandled(any());
- verify(rankingService, never()).addLikeScore(any(), any(), anyBoolean());
+ verify(applicationEventPublisher, never()).publishEvent(any());
verify(acknowledgment).acknowledge();
}
@@ -291,8 +285,6 @@ void continuesProcessing_whenIndividualEventFails() {
when(eventHandledService.isAlreadyHandled(eventId1)).thenReturn(false);
when(eventHandledService.isAlreadyHandled(eventId2)).thenReturn(false);
- doThrow(new RuntimeException("처리 실패"))
- .when(rankingService).addLikeScore(any(), any(), anyBoolean());
List> records = List.of(
new ConsumerRecord<>("like-events", 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, "key", invalidEvent, headers1, Optional.empty()),
@@ -305,7 +297,9 @@ void continuesProcessing_whenIndividualEventFails() {
// assert
verify(eventHandledService).isAlreadyHandled(eventId1);
verify(eventHandledService).isAlreadyHandled(eventId2);
- verify(rankingService, atLeastOnce()).addLikeScore(any(), any(), anyBoolean());
+ // 첫 번째 이벤트는 파싱 실패로 publishEvent가 호출되지 않음
+ // 두 번째 이벤트는 정상적으로 publishEvent가 호출됨
+ verify(applicationEventPublisher, times(1)).publishEvent(any(LikeEvent.LikeAdded.class));
verify(acknowledgment).acknowledge();
}
@@ -335,7 +329,7 @@ void handlesDataIntegrityViolationException() {
// assert
verify(eventHandledService).isAlreadyHandled(eventId);
- verify(rankingService).addLikeScore(eq(productId), any(), eq(true));
+ verify(applicationEventPublisher).publishEvent(any(LikeEvent.LikeAdded.class));
verify(eventHandledService).markAsHandled(eventId, "LikeAdded", "like-events");
verify(acknowledgment).acknowledge();
}
@@ -371,7 +365,7 @@ void doesNotAddScore_whenTotalQuantityIsZero() {
// assert
verify(eventHandledService).isAlreadyHandled(eventId);
- verify(rankingService, never()).addOrderScore(any(), any(), anyDouble());
+ verify(applicationEventPublisher).publishEvent(any(OrderEvent.OrderCreated.class));
verify(eventHandledService).markAsHandled(eventId, "OrderCreated", "order-events");
verify(acknowledgment).acknowledge();
}
@@ -407,7 +401,7 @@ void doesNotAddScore_whenSubtotalIsNull() {
// assert
verify(eventHandledService).isAlreadyHandled(eventId);
- verify(rankingService, never()).addOrderScore(any(), any(), anyDouble());
+ verify(applicationEventPublisher).publishEvent(any(OrderEvent.OrderCreated.class));
verify(eventHandledService).markAsHandled(eventId, "OrderCreated", "order-events");
verify(acknowledgment).acknowledge();
}
@@ -444,8 +438,8 @@ void handlesDuplicateMessagesIdempotently() {
// isAlreadyHandled는 3번 호출됨 (각 메시지마다)
verify(eventHandledService, times(3)).isAlreadyHandled(eventId);
- // addLikeScore는 한 번만 호출되어야 함 (첫 번째 메시지만 처리)
- verify(rankingService, times(1)).addLikeScore(eq(productId), any(), eq(true));
+ // publishEvent는 한 번만 호출되어야 함 (첫 번째 메시지만 처리)
+ verify(applicationEventPublisher, times(1)).publishEvent(any(LikeEvent.LikeAdded.class));
// markAsHandled는 한 번만 호출되어야 함 (첫 번째 메시지만 처리)
verify(eventHandledService, times(1)).markAsHandled(eventId, "LikeAdded", "like-events");
diff --git a/modules/redis/src/main/java/com/loopers/zset/RedisZSetTemplate.java b/modules/redis/src/main/java/com/loopers/zset/RedisZSetTemplate.java
index 2d762a879..1b31f4eac 100644
--- a/modules/redis/src/main/java/com/loopers/zset/RedisZSetTemplate.java
+++ b/modules/redis/src/main/java/com/loopers/zset/RedisZSetTemplate.java
@@ -137,4 +137,91 @@ public Long getSize(String key) {
return 0L;
}
}
+
+ /**
+ * 여러 ZSET을 합쳐서 새로운 ZSET을 생성합니다.
+ *
+ * ZUNIONSTORE 명령어를 사용하여 여러 소스 ZSET의 점수를 합산합니다.
+ * 같은 멤버가 여러 ZSET에 있으면 점수가 합산됩니다.
+ *
+ *
+ * 사용 사례:
+ *
+ * - 시간 단위 랭킹을 일간 랭킹으로 집계
+ * - Score Carry-Over: 오늘 랭킹을 내일 랭킹에 일부 반영
+ *
+ *
+ *
+ * @param destination 목적지 ZSET 키
+ * @param sourceKeys 소스 ZSET 키 목록
+ * @return 합쳐진 ZSET의 멤버 수
+ */
+ public Long unionStore(String destination, List sourceKeys) {
+ try {
+ if (sourceKeys.isEmpty()) {
+ log.warn("소스 키가 비어있습니다: destination={}", destination);
+ return 0L;
+ }
+
+ Long result = redisTemplate.opsForZSet().unionAndStore(
+ sourceKeys.get(0),
+ sourceKeys.subList(1, sourceKeys.size()),
+ destination
+ );
+ return result != null ? result : 0L;
+ } catch (Exception e) {
+ log.warn("ZSET 합치기 실패: destination={}, sourceKeys={}", destination, sourceKeys, e);
+ return 0L;
+ }
+ }
+
+ /**
+ * 단일 ZSET을 가중치를 적용하여 목적지 ZSET에 합산합니다.
+ *
+ * 소스 ZSET의 점수에 가중치를 곱한 후 목적지 ZSET에 합산합니다.
+ * 목적지 ZSET이 이미 존재하면 기존 점수에 합산됩니다.
+ *
+ *
+ * 사용 사례:
+ *
+ * - Score Carry-Over: 오늘 랭킹을 0.1 배율로 내일 랭킹에 반영
+ *
+ *
+ *
+ * @param destination 목적지 ZSET 키
+ * @param sourceKey 소스 ZSET 키
+ * @param weight 가중치 (예: 0.1 = 10%)
+ * @return 합쳐진 ZSET의 멤버 수
+ */
+ public Long unionStoreWithWeight(String destination, String sourceKey, double weight) {
+ try {
+ // ZUNIONSTORE를 사용하여 가중치 적용
+ // destination과 sourceKey를 합치되, sourceKey에만 가중치 적용
+ // 이를 위해 임시 키를 사용하거나 직접 구현
+
+ // 방법: sourceKey의 모든 멤버를 읽어서 가중치를 적용한 후 destination에 추가
+ Set> sourceMembers = redisTemplate.opsForZSet()
+ .rangeWithScores(sourceKey, 0, -1);
+
+ if (sourceMembers == null || sourceMembers.isEmpty()) {
+ return 0L;
+ }
+
+ // 가중치를 적용하여 destination에 추가
+ for (ZSetOperations.TypedTuple tuple : sourceMembers) {
+ String member = tuple.getValue();
+ Double originalScore = tuple.getScore();
+ if (member != null && originalScore != null) {
+ double weightedScore = originalScore * weight;
+ redisTemplate.opsForZSet().incrementScore(destination, member, weightedScore);
+ }
+ }
+
+ return (long) sourceMembers.size();
+ } catch (Exception e) {
+ log.warn("ZSET 가중치 합치기 실패: destination={}, sourceKey={}, weight={}",
+ destination, sourceKey, weight, e);
+ return 0L;
+ }
+ }
}