From d7c735daff3b724b8848f1cdbcbe4baeb41bce3b Mon Sep 17 00:00:00 2001 From: minor7295 Date: Wed, 24 Dec 2025 02:18:28 +0900 Subject: [PATCH 1/3] =?UTF-8?q?feat:=20zset=20=EB=AA=A8=EB=93=88=EC=97=90?= =?UTF-8?q?=20zunionstore=20=EC=97=B0=EC=82=B0=20=EC=B2=98=EB=A6=AC=20?= =?UTF-8?q?=EB=A9=94=EC=86=8C=EB=93=9C=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/loopers/zset/RedisZSetTemplate.java | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/modules/redis/src/main/java/com/loopers/zset/RedisZSetTemplate.java b/modules/redis/src/main/java/com/loopers/zset/RedisZSetTemplate.java index 2d762a879..1b31f4eac 100644 --- a/modules/redis/src/main/java/com/loopers/zset/RedisZSetTemplate.java +++ b/modules/redis/src/main/java/com/loopers/zset/RedisZSetTemplate.java @@ -137,4 +137,91 @@ public Long getSize(String key) { return 0L; } } + + /** + * 여러 ZSET을 합쳐서 새로운 ZSET을 생성합니다. + *

+ * ZUNIONSTORE 명령어를 사용하여 여러 소스 ZSET의 점수를 합산합니다. + * 같은 멤버가 여러 ZSET에 있으면 점수가 합산됩니다. + *

+ *

+ * 사용 사례: + *

+ *

+ * + * @param destination 목적지 ZSET 키 + * @param sourceKeys 소스 ZSET 키 목록 + * @return 합쳐진 ZSET의 멤버 수 + */ + public Long unionStore(String destination, List sourceKeys) { + try { + if (sourceKeys.isEmpty()) { + log.warn("소스 키가 비어있습니다: destination={}", destination); + return 0L; + } + + Long result = redisTemplate.opsForZSet().unionAndStore( + sourceKeys.get(0), + sourceKeys.subList(1, sourceKeys.size()), + destination + ); + return result != null ? result : 0L; + } catch (Exception e) { + log.warn("ZSET 합치기 실패: destination={}, sourceKeys={}", destination, sourceKeys, e); + return 0L; + } + } + + /** + * 단일 ZSET을 가중치를 적용하여 목적지 ZSET에 합산합니다. + *

+ * 소스 ZSET의 점수에 가중치를 곱한 후 목적지 ZSET에 합산합니다. + * 목적지 ZSET이 이미 존재하면 기존 점수에 합산됩니다. + *

+ *

+ * 사용 사례: + *

+ *

+ * + * @param destination 목적지 ZSET 키 + * @param sourceKey 소스 ZSET 키 + * @param weight 가중치 (예: 0.1 = 10%) + * @return 합쳐진 ZSET의 멤버 수 + */ + public Long unionStoreWithWeight(String destination, String sourceKey, double weight) { + try { + // ZUNIONSTORE를 사용하여 가중치 적용 + // destination과 sourceKey를 합치되, sourceKey에만 가중치 적용 + // 이를 위해 임시 키를 사용하거나 직접 구현 + + // 방법: sourceKey의 모든 멤버를 읽어서 가중치를 적용한 후 destination에 추가 + Set> sourceMembers = redisTemplate.opsForZSet() + .rangeWithScores(sourceKey, 0, -1); + + if (sourceMembers == null || sourceMembers.isEmpty()) { + return 0L; + } + + // 가중치를 적용하여 destination에 추가 + for (ZSetOperations.TypedTuple tuple : sourceMembers) { + String member = tuple.getValue(); + Double originalScore = tuple.getScore(); + if (member != null && originalScore != null) { + double weightedScore = originalScore * weight; + redisTemplate.opsForZSet().incrementScore(destination, member, weightedScore); + } + } + + return (long) sourceMembers.size(); + } catch (Exception e) { + log.warn("ZSET 가중치 합치기 실패: destination={}, sourceKey={}, weight={}", + destination, sourceKey, weight, e); + return 0L; + } + } } From 06dce08db6aee451db5214177bf1c087b9b26f06 Mon Sep 17 00:00:00 2001 From: minor7295 Date: Wed, 24 Dec 2025 02:39:52 +0900 Subject: [PATCH 2/3] =?UTF-8?q?test:=20=EB=9E=AD=ED=82=B9=20=EC=A7=91?= =?UTF-8?q?=EA=B3=84=EC=97=90=20=ED=95=84=EC=9A=94=ED=95=9C=20=EB=8D=B0?= =?UTF-8?q?=EC=9D=B4=ED=84=B0=20=EC=88=98=EC=A7=91=EA=B3=BC=20=EB=9E=AD?= =?UTF-8?q?=ED=82=B9=20=EA=B3=84=EC=82=B0=20=EB=A1=9C=EC=A7=81=EC=9D=84=20?= =?UTF-8?q?application=20event=20=EA=B8=B0=EC=A4=80=EC=9C=BC=EB=A1=9C=20?= =?UTF-8?q?=EB=B6=84=EB=A6=AC=ED=95=98=EB=8F=84=EB=A1=9D=20=ED=85=8C?= =?UTF-8?q?=EC=8A=A4=ED=8A=B8=20=EC=BD=94=EB=93=9C=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../interfaces/consumer/RankingConsumer.java | 52 +++--- .../ranking/RankingEventHandlerTest.java | 158 ++++++++++++++++++ .../ranking/RankingServiceTest.java | 95 +++++++++++ .../consumer/RankingConsumerTest.java | 42 ++--- 4 files changed, 291 insertions(+), 56 deletions(-) create mode 100644 apps/commerce-streamer/src/test/java/com/loopers/application/ranking/RankingEventHandlerTest.java diff --git a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/RankingConsumer.java b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/RankingConsumer.java index 7b79bc95d..cfe6fd9b7 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/RankingConsumer.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/RankingConsumer.java @@ -2,7 +2,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.loopers.application.eventhandled.EventHandledService; -import com.loopers.application.ranking.RankingService; import com.loopers.confg.kafka.KafkaConfig; import com.loopers.domain.event.LikeEvent; import com.loopers.domain.event.OrderEvent; @@ -11,18 +10,18 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; -import java.time.LocalDate; import java.util.List; /** * 랭킹 집계 Kafka Consumer. *

- * Kafka에서 이벤트를 수취하여 Redis ZSET에 랭킹 점수를 적재합니다. + * Kafka에서 이벤트를 수취하여 Spring ApplicationEvent로 발행합니다. * 조회, 좋아요, 주문 이벤트를 기반으로 실시간 랭킹을 구축합니다. *

*

@@ -43,20 +42,22 @@ *

* 설계 원칙: *

    - *
  • Eventually Consistent: 일시적인 지연/중복 허용
  • - *
  • CQRS Read Model: Write Side(도메인) → Kafka → Read Side(Application) → Redis ZSET
  • + *
  • 관심사 분리: Consumer는 Kafka 메시지 수신/파싱만 담당, 비즈니스 로직은 EventHandler에서 처리
  • + *
  • 이벤트 핸들러 패턴: Kafka Event → Spring ApplicationEvent → RankingEventListener → RankingEventHandler
  • + *
  • Eventually Consistent: 일시적인 지연/중복 허용
  • + *
  • CQRS Read Model: Write Side(도메인) → Kafka → Read Side(Application) → Redis ZSET
  • *
*

* * @author Loopers - * @version 1.0 + * @version 2.0 */ @Slf4j @Component @RequiredArgsConstructor public class RankingConsumer { - private final RankingService rankingService; + private final ApplicationEventPublisher applicationEventPublisher; private final EventHandledService eventHandledService; private final ObjectMapper objectMapper; @@ -104,28 +105,31 @@ public void consumeLikeEvents( Object value = record.value(); String eventType; - LocalDate date = LocalDate.now(); // Spring Kafka가 자동으로 역직렬화한 경우 if (value instanceof LikeEvent.LikeAdded) { LikeEvent.LikeAdded event = (LikeEvent.LikeAdded) value; - rankingService.addLikeScore(event.productId(), date, true); + // Spring ApplicationEvent 발행 (애플리케이션 내부 이벤트) + applicationEventPublisher.publishEvent(event); eventType = "LikeAdded"; } else if (value instanceof LikeEvent.LikeRemoved) { LikeEvent.LikeRemoved event = (LikeEvent.LikeRemoved) value; - rankingService.addLikeScore(event.productId(), date, false); + // Spring ApplicationEvent 발행 (애플리케이션 내부 이벤트) + applicationEventPublisher.publishEvent(event); eventType = "LikeRemoved"; } else { // JSON 문자열인 경우 이벤트 타입 헤더로 구분 String eventTypeHeader = extractEventType(record); if ("LikeRemoved".equals(eventTypeHeader)) { LikeEvent.LikeRemoved event = parseLikeRemovedEvent(value); - rankingService.addLikeScore(event.productId(), date, false); + // Spring ApplicationEvent 발행 (애플리케이션 내부 이벤트) + applicationEventPublisher.publishEvent(event); eventType = "LikeRemoved"; } else { // 기본값은 LikeAdded LikeEvent.LikeAdded event = parseLikeEvent(value); - rankingService.addLikeScore(event.productId(), date, true); + // Spring ApplicationEvent 발행 (애플리케이션 내부 이벤트) + applicationEventPublisher.publishEvent(event); eventType = "LikeAdded"; } } @@ -202,23 +206,8 @@ public void consumeOrderEvents( Object value = record.value(); OrderEvent.OrderCreated event = parseOrderCreatedEvent(value); - LocalDate date = LocalDate.now(); - - // 주문 아이템별로 점수 집계 - // 주의: OrderEvent.OrderCreated에는 개별 상품 가격 정보가 없으므로 - // subtotal을 totalQuantity로 나눠서 평균 단가를 구하고, 각 아이템의 quantity를 곱함 - int totalQuantity = event.orderItems().stream() - .mapToInt(OrderEvent.OrderCreated.OrderItemInfo::quantity) - .sum(); - - if (totalQuantity > 0 && event.subtotal() != null) { - double averagePrice = (double) event.subtotal() / totalQuantity; - - for (OrderEvent.OrderCreated.OrderItemInfo item : event.orderItems()) { - double orderAmount = averagePrice * item.quantity(); - rankingService.addOrderScore(item.productId(), date, orderAmount); - } - } + // Spring ApplicationEvent 발행 (애플리케이션 내부 이벤트) + applicationEventPublisher.publishEvent(event); // 이벤트 처리 기록 저장 eventHandledService.markAsHandled(eventId, "OrderCreated", "order-events"); @@ -284,9 +273,8 @@ public void consumeProductEvents( Object value = record.value(); ProductEvent.ProductViewed event = parseProductViewedEvent(value); - LocalDate date = LocalDate.now(); - - rankingService.addViewScore(event.productId(), date); + // Spring ApplicationEvent 발행 (애플리케이션 내부 이벤트) + applicationEventPublisher.publishEvent(event); // 이벤트 처리 기록 저장 eventHandledService.markAsHandled(eventId, "ProductViewed", "product-events"); diff --git a/apps/commerce-streamer/src/test/java/com/loopers/application/ranking/RankingEventHandlerTest.java b/apps/commerce-streamer/src/test/java/com/loopers/application/ranking/RankingEventHandlerTest.java new file mode 100644 index 000000000..a32182b98 --- /dev/null +++ b/apps/commerce-streamer/src/test/java/com/loopers/application/ranking/RankingEventHandlerTest.java @@ -0,0 +1,158 @@ +package com.loopers.application.ranking; + +import com.loopers.domain.event.LikeEvent; +import com.loopers.domain.event.OrderEvent; +import com.loopers.domain.event.ProductEvent; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.List; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; + +/** + * RankingEventHandler 테스트. + */ +@ExtendWith(MockitoExtension.class) +class RankingEventHandlerTest { + + @Mock + private RankingService rankingService; + + @InjectMocks + private RankingEventHandler rankingEventHandler; + + @DisplayName("좋아요 추가 이벤트를 처리할 수 있다.") + @Test + void canHandleLikeAdded() { + // arrange + Long productId = 1L; + Long userId = 100L; + LikeEvent.LikeAdded event = new LikeEvent.LikeAdded(userId, productId, LocalDateTime.now()); + + // act + rankingEventHandler.handleLikeAdded(event); + + // assert + verify(rankingService).addLikeScore(eq(productId), any(LocalDate.class), eq(true)); + } + + @DisplayName("좋아요 취소 이벤트를 처리할 수 있다.") + @Test + void canHandleLikeRemoved() { + // arrange + Long productId = 1L; + Long userId = 100L; + LikeEvent.LikeRemoved event = new LikeEvent.LikeRemoved(userId, productId, LocalDateTime.now()); + + // act + rankingEventHandler.handleLikeRemoved(event); + + // assert + verify(rankingService).addLikeScore(eq(productId), any(LocalDate.class), eq(false)); + } + + @DisplayName("주문 생성 이벤트를 처리할 수 있다.") + @Test + void canHandleOrderCreated() { + // arrange + Long orderId = 1L; + Long userId = 100L; + OrderEvent.OrderCreated.OrderItemInfo item1 = + new OrderEvent.OrderCreated.OrderItemInfo(1L, 2); + OrderEvent.OrderCreated.OrderItemInfo item2 = + new OrderEvent.OrderCreated.OrderItemInfo(2L, 3); + OrderEvent.OrderCreated event = new OrderEvent.OrderCreated( + orderId, + userId, + null, // couponCode + 10000, // subtotal + null, // usedPointAmount + List.of(item1, item2), + LocalDateTime.now() + ); + + // act + rankingEventHandler.handleOrderCreated(event); + + // assert + // totalQuantity = 2 + 3 = 5 + // averagePrice = 10000 / 5 = 2000 + // item1: 2000 * 2 = 4000 + // item2: 2000 * 3 = 6000 + verify(rankingService).addOrderScore(eq(1L), any(LocalDate.class), eq(4000.0)); + verify(rankingService).addOrderScore(eq(2L), any(LocalDate.class), eq(6000.0)); + } + + @DisplayName("주문 아이템이 없으면 점수를 추가하지 않는다.") + @Test + void doesNothing_whenOrderItemsIsEmpty() { + // arrange + Long orderId = 1L; + Long userId = 100L; + OrderEvent.OrderCreated event = new OrderEvent.OrderCreated( + orderId, + userId, + null, // couponCode + 10000, // subtotal + null, // usedPointAmount + List.of(), + LocalDateTime.now() + ); + + // act + rankingEventHandler.handleOrderCreated(event); + + // assert + verify(rankingService, never()).addOrderScore(any(), any(), anyDouble()); + } + + @DisplayName("주문 subtotal이 null이면 점수를 추가하지 않는다.") + @Test + void doesNothing_whenSubtotalIsNull() { + // arrange + Long orderId = 1L; + Long userId = 100L; + OrderEvent.OrderCreated.OrderItemInfo item = + new OrderEvent.OrderCreated.OrderItemInfo(1L, 2); + OrderEvent.OrderCreated event = new OrderEvent.OrderCreated( + orderId, + userId, + null, // couponCode + null, // subtotal + null, // usedPointAmount + List.of(item), + LocalDateTime.now() + ); + + // act + rankingEventHandler.handleOrderCreated(event); + + // assert + verify(rankingService, never()).addOrderScore(any(), any(), anyDouble()); + } + + @DisplayName("상품 조회 이벤트를 처리할 수 있다.") + @Test + void canHandleProductViewed() { + // arrange + Long productId = 1L; + Long userId = 100L; + ProductEvent.ProductViewed event = new ProductEvent.ProductViewed(productId, userId, LocalDateTime.now()); + + // act + rankingEventHandler.handleProductViewed(event); + + // assert + verify(rankingService).addViewScore(eq(productId), any(LocalDate.class)); + } +} + diff --git a/apps/commerce-streamer/src/test/java/com/loopers/application/ranking/RankingServiceTest.java b/apps/commerce-streamer/src/test/java/com/loopers/application/ranking/RankingServiceTest.java index cad3228aa..3851331be 100644 --- a/apps/commerce-streamer/src/test/java/com/loopers/application/ranking/RankingServiceTest.java +++ b/apps/commerce-streamer/src/test/java/com/loopers/application/ranking/RankingServiceTest.java @@ -11,7 +11,10 @@ import java.time.Duration; import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; @@ -244,4 +247,96 @@ void accumulatesScoresForSameProduct() { // TTL 설정은 각 호출마다 수행됨 (incrementScore 내부에서 호출) verify(zSetTemplate, times(3)).setTtlIfNotExists(eq(expectedKey), eq(Duration.ofDays(2))); } + + @DisplayName("시간 단위 랭킹을 일간 랭킹으로 집계할 수 있다.") + @Test + void canAggregateHourlyToDaily() { + // arrange + LocalDate date = LocalDate.of(2024, 12, 15); + String dailyKey = "ranking:all:20241215"; + List expectedHourlyKeys = new ArrayList<>(); + + // 00시 ~ 23시 키 생성 + for (int hour = 0; hour < 24; hour++) { + LocalDateTime dateTime = date.atTime(hour, 0); + String hourlyKey = "ranking:hourly:" + dateTime.format(java.time.format.DateTimeFormatter.ofPattern("yyyyMMddHH")); + expectedHourlyKeys.add(hourlyKey); + when(keyGenerator.generateHourlyKey(dateTime)).thenReturn(hourlyKey); + } + + when(keyGenerator.generateDailyKey(date)).thenReturn(dailyKey); + when(zSetTemplate.unionStore(eq(dailyKey), any(List.class))).thenReturn(100L); + + // act + Long result = rankingService.aggregateHourlyToDaily(date); + + // assert + assertThat(result).isEqualTo(100L); + verify(keyGenerator).generateDailyKey(date); + + // 24개의 시간 단위 키 생성 확인 + for (int hour = 0; hour < 24; hour++) { + LocalDateTime dateTime = date.atTime(hour, 0); + verify(keyGenerator).generateHourlyKey(dateTime); + } + + // ZUNIONSTORE 호출 확인 + ArgumentCaptor> keysCaptor = ArgumentCaptor.forClass(List.class); + verify(zSetTemplate).unionStore(eq(dailyKey), keysCaptor.capture()); + List capturedKeys = keysCaptor.getValue(); + assertThat(capturedKeys).hasSize(24); + assertThat(capturedKeys).containsAll(expectedHourlyKeys); + + // TTL 설정 확인 + verify(zSetTemplate).setTtlIfNotExists(eq(dailyKey), eq(Duration.ofDays(2))); + } + + @DisplayName("Score Carry-Over로 오늘 랭킹을 내일 랭킹에 반영할 수 있다.") + @Test + void canCarryOverScore() { + // arrange + LocalDate today = LocalDate.of(2024, 12, 15); + LocalDate tomorrow = LocalDate.of(2024, 12, 16); + String todayKey = "ranking:all:20241215"; + String tomorrowKey = "ranking:all:20241216"; + double carryOverWeight = 0.1; // 10% + + when(keyGenerator.generateDailyKey(today)).thenReturn(todayKey); + when(keyGenerator.generateDailyKey(tomorrow)).thenReturn(tomorrowKey); + when(zSetTemplate.unionStoreWithWeight(eq(tomorrowKey), eq(todayKey), eq(carryOverWeight))) + .thenReturn(50L); + + // act + Long result = rankingService.carryOverScore(today, tomorrow, carryOverWeight); + + // assert + assertThat(result).isEqualTo(50L); + verify(keyGenerator).generateDailyKey(today); + verify(keyGenerator).generateDailyKey(tomorrow); + verify(zSetTemplate).unionStoreWithWeight(eq(tomorrowKey), eq(todayKey), eq(carryOverWeight)); + verify(zSetTemplate).setTtlIfNotExists(eq(tomorrowKey), eq(Duration.ofDays(2))); + } + + @DisplayName("Score Carry-Over 가중치가 0일 때도 정상적으로 처리된다.") + @Test + void canCarryOverScore_withZeroWeight() { + // arrange + LocalDate today = LocalDate.of(2024, 12, 15); + LocalDate tomorrow = LocalDate.of(2024, 12, 16); + String todayKey = "ranking:all:20241215"; + String tomorrowKey = "ranking:all:20241216"; + double carryOverWeight = 0.0; + + when(keyGenerator.generateDailyKey(today)).thenReturn(todayKey); + when(keyGenerator.generateDailyKey(tomorrow)).thenReturn(tomorrowKey); + when(zSetTemplate.unionStoreWithWeight(eq(tomorrowKey), eq(todayKey), eq(carryOverWeight))) + .thenReturn(0L); + + // act + Long result = rankingService.carryOverScore(today, tomorrow, carryOverWeight); + + // assert + assertThat(result).isEqualTo(0L); + verify(zSetTemplate).unionStoreWithWeight(eq(tomorrowKey), eq(todayKey), eq(carryOverWeight)); + } } diff --git a/apps/commerce-streamer/src/test/java/com/loopers/interfaces/consumer/RankingConsumerTest.java b/apps/commerce-streamer/src/test/java/com/loopers/interfaces/consumer/RankingConsumerTest.java index 99e91a877..67485df03 100644 --- a/apps/commerce-streamer/src/test/java/com/loopers/interfaces/consumer/RankingConsumerTest.java +++ b/apps/commerce-streamer/src/test/java/com/loopers/interfaces/consumer/RankingConsumerTest.java @@ -2,7 +2,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.loopers.application.eventhandled.EventHandledService; -import com.loopers.application.ranking.RankingService; import com.loopers.domain.event.LikeEvent; import com.loopers.domain.event.OrderEvent; import com.loopers.domain.event.ProductEvent; @@ -17,6 +16,7 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.dao.DataIntegrityViolationException; import org.springframework.kafka.support.Acknowledgment; @@ -35,7 +35,7 @@ class RankingConsumerTest { @Mock - private RankingService rankingService; + private ApplicationEventPublisher applicationEventPublisher; @Mock private EventHandledService eventHandledService; @@ -73,7 +73,7 @@ void canConsumeLikeAddedEvent() { // assert verify(eventHandledService).isAlreadyHandled(eventId); - verify(rankingService).addLikeScore(eq(productId), any(), eq(true)); + verify(applicationEventPublisher).publishEvent(any(LikeEvent.LikeAdded.class)); verify(eventHandledService).markAsHandled(eventId, "LikeAdded", "like-events"); verify(acknowledgment).acknowledge(); } @@ -103,7 +103,7 @@ void canConsumeLikeRemovedEvent() { // assert verify(eventHandledService).isAlreadyHandled(eventId); - verify(rankingService).addLikeScore(eq(productId), any(), eq(false)); + verify(applicationEventPublisher).publishEvent(any(LikeEvent.LikeRemoved.class)); verify(eventHandledService).markAsHandled(eventId, "LikeRemoved", "like-events"); verify(acknowledgment).acknowledge(); } @@ -142,13 +142,7 @@ void canConsumeOrderCreatedEvent() { // assert verify(eventHandledService).isAlreadyHandled(eventId); - - // 평균 단가 계산: 10000 / (3 + 2) = 2000 - // productId1: 2000 * 3 = 6000 - // productId2: 2000 * 2 = 4000 - verify(rankingService).addOrderScore(eq(productId1), any(), eq(6000.0)); - verify(rankingService).addOrderScore(eq(productId2), any(), eq(4000.0)); - + verify(applicationEventPublisher).publishEvent(any(OrderEvent.OrderCreated.class)); verify(eventHandledService).markAsHandled(eventId, "OrderCreated", "order-events"); verify(acknowledgment).acknowledge(); } @@ -177,7 +171,7 @@ void canConsumeProductViewedEvent() { // assert verify(eventHandledService).isAlreadyHandled(eventId); - verify(rankingService).addViewScore(eq(productId), any()); + verify(applicationEventPublisher).publishEvent(any(ProductEvent.ProductViewed.class)); verify(eventHandledService).markAsHandled(eventId, "ProductViewed", "product-events"); verify(acknowledgment).acknowledge(); } @@ -214,8 +208,8 @@ void canConsumeMultipleEvents() { // assert verify(eventHandledService).isAlreadyHandled(eventId1); verify(eventHandledService).isAlreadyHandled(eventId2); - verify(rankingService).addLikeScore(eq(productId), any(), eq(true)); - verify(rankingService).addViewScore(eq(productId), any()); + verify(applicationEventPublisher).publishEvent(any(LikeEvent.LikeAdded.class)); + verify(applicationEventPublisher).publishEvent(any(ProductEvent.ProductViewed.class)); verify(eventHandledService).markAsHandled(eventId1, "LikeAdded", "like-events"); verify(eventHandledService).markAsHandled(eventId2, "ProductViewed", "product-events"); verify(acknowledgment, times(2)).acknowledge(); @@ -245,7 +239,7 @@ void skipsAlreadyHandledEvent() { // assert verify(eventHandledService).isAlreadyHandled(eventId); - verify(rankingService, never()).addLikeScore(any(), any(), anyBoolean()); + verify(applicationEventPublisher, never()).publishEvent(any()); verify(eventHandledService, never()).markAsHandled(any(), any(), any()); verify(acknowledgment).acknowledge(); } @@ -268,7 +262,7 @@ void skipsEventWithoutEventId() { // assert verify(eventHandledService, never()).isAlreadyHandled(any()); - verify(rankingService, never()).addLikeScore(any(), any(), anyBoolean()); + verify(applicationEventPublisher, never()).publishEvent(any()); verify(acknowledgment).acknowledge(); } @@ -291,8 +285,6 @@ void continuesProcessing_whenIndividualEventFails() { when(eventHandledService.isAlreadyHandled(eventId1)).thenReturn(false); when(eventHandledService.isAlreadyHandled(eventId2)).thenReturn(false); - doThrow(new RuntimeException("처리 실패")) - .when(rankingService).addLikeScore(any(), any(), anyBoolean()); List> records = List.of( new ConsumerRecord<>("like-events", 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, "key", invalidEvent, headers1, Optional.empty()), @@ -305,7 +297,9 @@ void continuesProcessing_whenIndividualEventFails() { // assert verify(eventHandledService).isAlreadyHandled(eventId1); verify(eventHandledService).isAlreadyHandled(eventId2); - verify(rankingService, atLeastOnce()).addLikeScore(any(), any(), anyBoolean()); + // 첫 번째 이벤트는 파싱 실패로 publishEvent가 호출되지 않음 + // 두 번째 이벤트는 정상적으로 publishEvent가 호출됨 + verify(applicationEventPublisher, times(1)).publishEvent(any(LikeEvent.LikeAdded.class)); verify(acknowledgment).acknowledge(); } @@ -335,7 +329,7 @@ void handlesDataIntegrityViolationException() { // assert verify(eventHandledService).isAlreadyHandled(eventId); - verify(rankingService).addLikeScore(eq(productId), any(), eq(true)); + verify(applicationEventPublisher).publishEvent(any(LikeEvent.LikeAdded.class)); verify(eventHandledService).markAsHandled(eventId, "LikeAdded", "like-events"); verify(acknowledgment).acknowledge(); } @@ -371,7 +365,7 @@ void doesNotAddScore_whenTotalQuantityIsZero() { // assert verify(eventHandledService).isAlreadyHandled(eventId); - verify(rankingService, never()).addOrderScore(any(), any(), anyDouble()); + verify(applicationEventPublisher).publishEvent(any(OrderEvent.OrderCreated.class)); verify(eventHandledService).markAsHandled(eventId, "OrderCreated", "order-events"); verify(acknowledgment).acknowledge(); } @@ -407,7 +401,7 @@ void doesNotAddScore_whenSubtotalIsNull() { // assert verify(eventHandledService).isAlreadyHandled(eventId); - verify(rankingService, never()).addOrderScore(any(), any(), anyDouble()); + verify(applicationEventPublisher).publishEvent(any(OrderEvent.OrderCreated.class)); verify(eventHandledService).markAsHandled(eventId, "OrderCreated", "order-events"); verify(acknowledgment).acknowledge(); } @@ -444,8 +438,8 @@ void handlesDuplicateMessagesIdempotently() { // isAlreadyHandled는 3번 호출됨 (각 메시지마다) verify(eventHandledService, times(3)).isAlreadyHandled(eventId); - // addLikeScore는 한 번만 호출되어야 함 (첫 번째 메시지만 처리) - verify(rankingService, times(1)).addLikeScore(eq(productId), any(), eq(true)); + // publishEvent는 한 번만 호출되어야 함 (첫 번째 메시지만 처리) + verify(applicationEventPublisher, times(1)).publishEvent(any(LikeEvent.LikeAdded.class)); // markAsHandled는 한 번만 호출되어야 함 (첫 번째 메시지만 처리) verify(eventHandledService, times(1)).markAsHandled(eventId, "LikeAdded", "like-events"); From 472dbc9435b84e2e832a1cf23b5286225cd91349 Mon Sep 17 00:00:00 2001 From: minor7295 Date: Wed, 24 Dec 2025 02:40:13 +0900 Subject: [PATCH 3/3] =?UTF-8?q?feat:=20=EB=9E=AD=ED=82=B9=20=EC=A7=91?= =?UTF-8?q?=EA=B3=84=EC=97=90=20=ED=95=84=EC=9A=94=ED=95=9C=20=EB=8D=B0?= =?UTF-8?q?=EC=9D=B4=ED=84=B0=20=EC=88=98=EC=A7=91=EA=B3=BC=20=EB=9E=AD?= =?UTF-8?q?=ED=82=B9=20=EA=B3=84=EC=82=B0=20=EB=A1=9C=EC=A7=81=EC=9D=84=20?= =?UTF-8?q?application=20event=20=EA=B8=B0=EC=A4=80=EC=9C=BC=EB=A1=9C=20?= =?UTF-8?q?=EB=B6=84=EB=A6=AC=ED=95=98=EB=8F=84=EB=A1=9D=20=ED=95=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ranking/RankingEventHandler.java | 117 +++++++++++++++++ .../application/ranking/RankingService.java | 60 +++++++++ .../event/ranking/RankingEventListener.java | 121 ++++++++++++++++++ 3 files changed, 298 insertions(+) create mode 100644 apps/commerce-streamer/src/main/java/com/loopers/application/ranking/RankingEventHandler.java create mode 100644 apps/commerce-streamer/src/main/java/com/loopers/interfaces/event/ranking/RankingEventListener.java diff --git a/apps/commerce-streamer/src/main/java/com/loopers/application/ranking/RankingEventHandler.java b/apps/commerce-streamer/src/main/java/com/loopers/application/ranking/RankingEventHandler.java new file mode 100644 index 000000000..016670ca7 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/application/ranking/RankingEventHandler.java @@ -0,0 +1,117 @@ +package com.loopers.application.ranking; + +import com.loopers.domain.event.LikeEvent; +import com.loopers.domain.event.OrderEvent; +import com.loopers.domain.event.ProductEvent; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.time.LocalDate; + +/** + * 랭킹 이벤트 핸들러. + *

+ * 좋아요 추가/취소, 주문 생성, 상품 조회 이벤트를 받아 랭킹 점수를 집계하는 애플리케이션 로직을 처리합니다. + *

+ *

+ * DDD/EDA 관점: + *

    + *
  • 책임 분리: RankingService는 랭킹 점수 계산/적재, RankingEventHandler는 이벤트 처리 로직
  • + *
  • 이벤트 핸들러: 이벤트를 받아서 처리하는 역할을 명확히 나타냄
  • + *
  • 도메인 경계 준수: 랭킹은 파생 View로 취급하며, 도메인 이벤트를 구독하여 집계
  • + *
+ *

+ * + * @author Loopers + * @version 1.0 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class RankingEventHandler { + + private final RankingService rankingService; + + /** + * 좋아요 추가 이벤트를 처리하여 랭킹 점수를 추가합니다. + * + * @param event 좋아요 추가 이벤트 + */ + public void handleLikeAdded(LikeEvent.LikeAdded event) { + log.debug("좋아요 추가 이벤트 처리: productId={}, userId={}", + event.productId(), event.userId()); + + LocalDate date = LocalDate.now(); + rankingService.addLikeScore(event.productId(), date, true); + + log.debug("좋아요 점수 추가 완료: productId={}", event.productId()); + } + + /** + * 좋아요 취소 이벤트를 처리하여 랭킹 점수를 차감합니다. + * + * @param event 좋아요 취소 이벤트 + */ + public void handleLikeRemoved(LikeEvent.LikeRemoved event) { + log.debug("좋아요 취소 이벤트 처리: productId={}, userId={}", + event.productId(), event.userId()); + + LocalDate date = LocalDate.now(); + rankingService.addLikeScore(event.productId(), date, false); + + log.debug("좋아요 점수 차감 완료: productId={}", event.productId()); + } + + /** + * 주문 생성 이벤트를 처리하여 랭킹 점수를 추가합니다. + *

+ * 주문 금액 계산: + *

    + *
  • OrderEvent.OrderCreated에는 개별 상품 가격 정보가 없음
  • + *
  • subtotal을 totalQuantity로 나눠서 평균 단가를 구하고, 각 아이템의 quantity를 곱함
  • + *
  • 향후 개선: 주문 이벤트에 개별 상품 가격 정보 추가
  • + *
+ *

+ * + * @param event 주문 생성 이벤트 + */ + public void handleOrderCreated(OrderEvent.OrderCreated event) { + log.debug("주문 생성 이벤트 처리: orderId={}", event.orderId()); + + LocalDate date = LocalDate.now(); + + // 주문 아이템별로 점수 집계 + // 주의: OrderEvent.OrderCreated에는 개별 상품 가격 정보가 없으므로 + // subtotal을 totalQuantity로 나눠서 평균 단가를 구하고, 각 아이템의 quantity를 곱함 + int totalQuantity = event.orderItems().stream() + .mapToInt(OrderEvent.OrderCreated.OrderItemInfo::quantity) + .sum(); + + if (totalQuantity > 0 && event.subtotal() != null) { + double averagePrice = (double) event.subtotal() / totalQuantity; + + for (OrderEvent.OrderCreated.OrderItemInfo item : event.orderItems()) { + double orderAmount = averagePrice * item.quantity(); + rankingService.addOrderScore(item.productId(), date, orderAmount); + } + } + + log.debug("주문 점수 추가 완료: orderId={}", event.orderId()); + } + + /** + * 상품 조회 이벤트를 처리하여 랭킹 점수를 추가합니다. + * + * @param event 상품 조회 이벤트 + */ + public void handleProductViewed(ProductEvent.ProductViewed event) { + log.debug("상품 조회 이벤트 처리: productId={}", event.productId()); + + LocalDate date = LocalDate.now(); + rankingService.addViewScore(event.productId(), date); + + log.debug("조회 점수 추가 완료: productId={}", event.productId()); + } +} + diff --git a/apps/commerce-streamer/src/main/java/com/loopers/application/ranking/RankingService.java b/apps/commerce-streamer/src/main/java/com/loopers/application/ranking/RankingService.java index 4e6e2293b..dfdf5d57e 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/application/ranking/RankingService.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/application/ranking/RankingService.java @@ -7,6 +7,9 @@ import java.time.Duration; import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; import java.util.Map; /** @@ -120,6 +123,63 @@ public void addScoresBatch(Map scoreMap, LocalDate date) { log.debug("배치 점수 적재 완료: date={}, count={}", date, scoreMap.size()); } + /** + * 시간 단위 랭킹을 일간 랭킹으로 집계합니다. + *

+ * 하루의 모든 시간 단위 랭킹을 ZUNIONSTORE로 합쳐서 일간 랭킹을 생성합니다. + *

+ * + * @param date 날짜 + * @return 집계된 멤버 수 + */ + public Long aggregateHourlyToDaily(LocalDate date) { + String dailyKey = keyGenerator.generateDailyKey(date); + List hourlyKeys = new ArrayList<>(); + + // 해당 날짜의 모든 시간 단위 키 생성 (00시 ~ 23시) + for (int hour = 0; hour < 24; hour++) { + LocalDateTime dateTime = date.atTime(hour, 0); + String hourlyKey = keyGenerator.generateHourlyKey(dateTime); + hourlyKeys.add(hourlyKey); + } + + // ZUNIONSTORE로 모든 시간 단위 랭킹을 일간 랭킹으로 집계 + Long result = zSetTemplate.unionStore(dailyKey, hourlyKeys); + + // TTL 설정 + zSetTemplate.setTtlIfNotExists(dailyKey, TTL); + + log.info("시간 단위 랭킹을 일간 랭킹으로 집계 완료: date={}, memberCount={}", date, result); + return result; + } + + /** + * Score Carry-Over: 오늘의 랭킹을 내일 랭킹에 일부 반영합니다. + *

+ * 콜드 스타트 문제를 완화하기 위해 오늘의 랭킹을 가중치를 적용하여 내일 랭킹에 반영합니다. + * 예: 오늘 랭킹의 10%를 내일 랭킹에 반영 + *

+ * + * @param today 오늘 날짜 + * @param tomorrow 내일 날짜 + * @param carryOverWeight Carry-Over 가중치 (예: 0.1 = 10%) + * @return 반영된 멤버 수 + */ + public Long carryOverScore(LocalDate today, LocalDate tomorrow, double carryOverWeight) { + String todayKey = keyGenerator.generateDailyKey(today); + String tomorrowKey = keyGenerator.generateDailyKey(tomorrow); + + // 오늘 랭킹을 가중치를 적용하여 내일 랭킹에 합산 + Long result = zSetTemplate.unionStoreWithWeight(tomorrowKey, todayKey, carryOverWeight); + + // TTL 설정 + zSetTemplate.setTtlIfNotExists(tomorrowKey, TTL); + + log.info("Score Carry-Over 완료: today={}, tomorrow={}, weight={}, memberCount={}", + today, tomorrow, carryOverWeight, result); + return result; + } + /** * ZSET에 점수를 증가시킵니다. *

diff --git a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/event/ranking/RankingEventListener.java b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/event/ranking/RankingEventListener.java new file mode 100644 index 000000000..b72cc4a48 --- /dev/null +++ b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/event/ranking/RankingEventListener.java @@ -0,0 +1,121 @@ +package com.loopers.interfaces.event.ranking; + +import com.loopers.application.ranking.RankingEventHandler; +import com.loopers.domain.event.LikeEvent; +import com.loopers.domain.event.OrderEvent; +import com.loopers.domain.event.ProductEvent; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +/** + * 랭킹 이벤트 리스너. + *

+ * 좋아요 추가/취소, 주문 생성, 상품 조회 이벤트를 받아서 랭킹 점수를 집계하는 인터페이스 레이어의 어댑터입니다. + *

+ *

+ * 레이어 역할: + *

    + *
  • 인터페이스 레이어: 외부 이벤트(도메인 이벤트)를 받아서 애플리케이션 핸들러를 호출하는 어댑터
  • + *
  • 비즈니스 로직 없음: 단순히 이벤트를 받아서 애플리케이션 핸들러를 호출하는 역할만 수행
  • + *
+ *

+ *

+ * EDA 원칙: + *

    + *
  • 비동기 처리: @Async로 집계 처리를 비동기로 실행하여 Kafka Consumer의 성능에 영향 없음
  • + *
  • 이벤트 기반: 좋아요, 주문, 조회 이벤트를 구독하여 랭킹 점수 집계
  • + *
+ *

+ * + * @author Loopers + * @version 1.0 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class RankingEventListener { + + private final RankingEventHandler rankingEventHandler; + + /** + * 좋아요 추가 이벤트를 처리합니다. + *

+ * 비동기로 실행되어 랭킹 점수를 집계합니다. + *

+ * + * @param event 좋아요 추가 이벤트 + */ + @Async + @EventListener + public void handleLikeAdded(LikeEvent.LikeAdded event) { + try { + rankingEventHandler.handleLikeAdded(event); + } catch (Exception e) { + log.error("좋아요 추가 이벤트 처리 중 오류 발생: productId={}, userId={}", + event.productId(), event.userId(), e); + // 이벤트 처리 실패는 다른 리스너에 영향을 주지 않도록 예외를 삼킴 + } + } + + /** + * 좋아요 취소 이벤트를 처리합니다. + *

+ * 비동기로 실행되어 랭킹 점수를 차감합니다. + *

+ * + * @param event 좋아요 취소 이벤트 + */ + @Async + @EventListener + public void handleLikeRemoved(LikeEvent.LikeRemoved event) { + try { + rankingEventHandler.handleLikeRemoved(event); + } catch (Exception e) { + log.error("좋아요 취소 이벤트 처리 중 오류 발생: productId={}, userId={}", + event.productId(), event.userId(), e); + // 이벤트 처리 실패는 다른 리스너에 영향을 주지 않도록 예외를 삼킴 + } + } + + /** + * 주문 생성 이벤트를 처리합니다. + *

+ * 비동기로 실행되어 랭킹 점수를 집계합니다. + *

+ * + * @param event 주문 생성 이벤트 + */ + @Async + @EventListener + public void handleOrderCreated(OrderEvent.OrderCreated event) { + try { + rankingEventHandler.handleOrderCreated(event); + } catch (Exception e) { + log.error("주문 생성 이벤트 처리 중 오류 발생: orderId={}", event.orderId(), e); + // 이벤트 처리 실패는 다른 리스너에 영향을 주지 않도록 예외를 삼킴 + } + } + + /** + * 상품 조회 이벤트를 처리합니다. + *

+ * 비동기로 실행되어 랭킹 점수를 집계합니다. + *

+ * + * @param event 상품 조회 이벤트 + */ + @Async + @EventListener + public void handleProductViewed(ProductEvent.ProductViewed event) { + try { + rankingEventHandler.handleProductViewed(event); + } catch (Exception e) { + log.error("상품 조회 이벤트 처리 중 오류 발생: productId={}", event.productId(), e); + // 이벤트 처리 실패는 다른 리스너에 영향을 주지 않도록 예외를 삼킴 + } + } +} +