Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package com.loopers.application.ranking;

import com.loopers.domain.event.LikeEvent;
import com.loopers.domain.event.OrderEvent;
import com.loopers.domain.event.ProductEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.time.LocalDate;

/**
* 랭킹 이벤트 핸들러.
* <p>
* 좋아요 추가/취소, 주문 생성, 상품 조회 이벤트를 받아 랭킹 점수를 집계하는 애플리케이션 로직을 처리합니다.
* </p>
* <p>
* <b>DDD/EDA 관점:</b>
* <ul>
* <li><b>책임 분리:</b> RankingService는 랭킹 점수 계산/적재, RankingEventHandler는 이벤트 처리 로직</li>
* <li><b>이벤트 핸들러:</b> 이벤트를 받아서 처리하는 역할을 명확히 나타냄</li>
* <li><b>도메인 경계 준수:</b> 랭킹은 파생 View로 취급하며, 도메인 이벤트를 구독하여 집계</li>
* </ul>
* </p>
*
* @author Loopers
* @version 1.0
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RankingEventHandler {

private final RankingService rankingService;

/**
* 좋아요 추가 이벤트를 처리하여 랭킹 점수를 추가합니다.
*
* @param event 좋아요 추가 이벤트
*/
public void handleLikeAdded(LikeEvent.LikeAdded event) {
log.debug("좋아요 추가 이벤트 처리: productId={}, userId={}",
event.productId(), event.userId());

LocalDate date = LocalDate.now();
rankingService.addLikeScore(event.productId(), date, true);

log.debug("좋아요 점수 추가 완료: productId={}", event.productId());
}

/**
* 좋아요 취소 이벤트를 처리하여 랭킹 점수를 차감합니다.
*
* @param event 좋아요 취소 이벤트
*/
public void handleLikeRemoved(LikeEvent.LikeRemoved event) {
log.debug("좋아요 취소 이벤트 처리: productId={}, userId={}",
event.productId(), event.userId());

LocalDate date = LocalDate.now();
rankingService.addLikeScore(event.productId(), date, false);

log.debug("좋아요 점수 차감 완료: productId={}", event.productId());
}

/**
* 주문 생성 이벤트를 처리하여 랭킹 점수를 추가합니다.
* <p>
* <b>주문 금액 계산:</b>
* <ul>
* <li>OrderEvent.OrderCreated에는 개별 상품 가격 정보가 없음</li>
* <li>subtotal을 totalQuantity로 나눠서 평균 단가를 구하고, 각 아이템의 quantity를 곱함</li>
* <li>향후 개선: 주문 이벤트에 개별 상품 가격 정보 추가</li>
* </ul>
* </p>
*
* @param event 주문 생성 이벤트
*/
public void handleOrderCreated(OrderEvent.OrderCreated event) {
log.debug("주문 생성 이벤트 처리: orderId={}", event.orderId());

LocalDate date = LocalDate.now();

// 주문 아이템별로 점수 집계
// 주의: OrderEvent.OrderCreated에는 개별 상품 가격 정보가 없으므로
// subtotal을 totalQuantity로 나눠서 평균 단가를 구하고, 각 아이템의 quantity를 곱함
int totalQuantity = event.orderItems().stream()
.mapToInt(OrderEvent.OrderCreated.OrderItemInfo::quantity)
.sum();

if (totalQuantity > 0 && event.subtotal() != null) {
double averagePrice = (double) event.subtotal() / totalQuantity;

for (OrderEvent.OrderCreated.OrderItemInfo item : event.orderItems()) {
double orderAmount = averagePrice * item.quantity();
rankingService.addOrderScore(item.productId(), date, orderAmount);
}
}

log.debug("주문 점수 추가 완료: orderId={}", event.orderId());
}

/**
* 상품 조회 이벤트를 처리하여 랭킹 점수를 추가합니다.
*
* @param event 상품 조회 이벤트
*/
public void handleProductViewed(ProductEvent.ProductViewed event) {
log.debug("상품 조회 이벤트 처리: productId={}", event.productId());

LocalDate date = LocalDate.now();
rankingService.addViewScore(event.productId(), date);

log.debug("조회 점수 추가 완료: productId={}", event.productId());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -120,6 +123,63 @@ public void addScoresBatch(Map<Long, Double> scoreMap, LocalDate date) {
log.debug("배치 점수 적재 완료: date={}, count={}", date, scoreMap.size());
}

/**
* 시간 단위 랭킹을 일간 랭킹으로 집계합니다.
* <p>
* 하루의 모든 시간 단위 랭킹을 ZUNIONSTORE로 합쳐서 일간 랭킹을 생성합니다.
* </p>
*
* @param date 날짜
* @return 집계된 멤버 수
*/
public Long aggregateHourlyToDaily(LocalDate date) {
String dailyKey = keyGenerator.generateDailyKey(date);
List<String> hourlyKeys = new ArrayList<>();

// 해당 날짜의 모든 시간 단위 키 생성 (00시 ~ 23시)
for (int hour = 0; hour < 24; hour++) {
LocalDateTime dateTime = date.atTime(hour, 0);
String hourlyKey = keyGenerator.generateHourlyKey(dateTime);
hourlyKeys.add(hourlyKey);
}

// ZUNIONSTORE로 모든 시간 단위 랭킹을 일간 랭킹으로 집계
Long result = zSetTemplate.unionStore(dailyKey, hourlyKeys);

// TTL 설정
zSetTemplate.setTtlIfNotExists(dailyKey, TTL);

log.info("시간 단위 랭킹을 일간 랭킹으로 집계 완료: date={}, memberCount={}", date, result);
return result;
}

/**
* Score Carry-Over: 오늘의 랭킹을 내일 랭킹에 일부 반영합니다.
* <p>
* 콜드 스타트 문제를 완화하기 위해 오늘의 랭킹을 가중치를 적용하여 내일 랭킹에 반영합니다.
* 예: 오늘 랭킹의 10%를 내일 랭킹에 반영
* </p>
*
* @param today 오늘 날짜
* @param tomorrow 내일 날짜
* @param carryOverWeight Carry-Over 가중치 (예: 0.1 = 10%)
* @return 반영된 멤버 수
*/
public Long carryOverScore(LocalDate today, LocalDate tomorrow, double carryOverWeight) {
String todayKey = keyGenerator.generateDailyKey(today);
String tomorrowKey = keyGenerator.generateDailyKey(tomorrow);

// 오늘 랭킹을 가중치를 적용하여 내일 랭킹에 합산
Long result = zSetTemplate.unionStoreWithWeight(tomorrowKey, todayKey, carryOverWeight);

// TTL 설정
zSetTemplate.setTtlIfNotExists(tomorrowKey, TTL);

log.info("Score Carry-Over 완료: today={}, tomorrow={}, weight={}, memberCount={}",
today, tomorrow, carryOverWeight, result);
return result;
}

/**
* ZSET에 점수를 증가시킵니다.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.loopers.application.eventhandled.EventHandledService;
import com.loopers.application.ranking.RankingService;
import com.loopers.confg.kafka.KafkaConfig;
import com.loopers.domain.event.LikeEvent;
import com.loopers.domain.event.OrderEvent;
Expand All @@ -11,18 +10,18 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.util.List;

/**
* 랭킹 집계 Kafka Consumer.
* <p>
* Kafka에서 이벤트를 수취하여 Redis ZSET에 랭킹 점수를 적재합니다.
* Kafka에서 이벤트를 수취하여 Spring ApplicationEvent로 발행합니다.
* 조회, 좋아요, 주문 이벤트를 기반으로 실시간 랭킹을 구축합니다.
* </p>
* <p>
Expand All @@ -43,20 +42,22 @@
* <p>
* <b>설계 원칙:</b>
* <ul>
* <li>Eventually Consistent: 일시적인 지연/중복 허용</li>
* <li>CQRS Read Model: Write Side(도메인) → Kafka → Read Side(Application) → Redis ZSET</li>
* <li><b>관심사 분리:</b> Consumer는 Kafka 메시지 수신/파싱만 담당, 비즈니스 로직은 EventHandler에서 처리</li>
* <li><b>이벤트 핸들러 패턴:</b> Kafka Event → Spring ApplicationEvent → RankingEventListener → RankingEventHandler</li>
* <li><b>Eventually Consistent:</b> 일시적인 지연/중복 허용</li>
* <li><b>CQRS Read Model:</b> Write Side(도메인) → Kafka → Read Side(Application) → Redis ZSET</li>
* </ul>
* </p>
*
* @author Loopers
* @version 1.0
* @version 2.0
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RankingConsumer {

private final RankingService rankingService;
private final ApplicationEventPublisher applicationEventPublisher;
private final EventHandledService eventHandledService;
private final ObjectMapper objectMapper;

Expand Down Expand Up @@ -104,28 +105,31 @@ public void consumeLikeEvents(

Object value = record.value();
String eventType;
LocalDate date = LocalDate.now();

// Spring Kafka가 자동으로 역직렬화한 경우
if (value instanceof LikeEvent.LikeAdded) {
LikeEvent.LikeAdded event = (LikeEvent.LikeAdded) value;
rankingService.addLikeScore(event.productId(), date, true);
// Spring ApplicationEvent 발행 (애플리케이션 내부 이벤트)
applicationEventPublisher.publishEvent(event);
eventType = "LikeAdded";
} else if (value instanceof LikeEvent.LikeRemoved) {
LikeEvent.LikeRemoved event = (LikeEvent.LikeRemoved) value;
rankingService.addLikeScore(event.productId(), date, false);
// Spring ApplicationEvent 발행 (애플리케이션 내부 이벤트)
applicationEventPublisher.publishEvent(event);
eventType = "LikeRemoved";
} else {
// JSON 문자열인 경우 이벤트 타입 헤더로 구분
String eventTypeHeader = extractEventType(record);
if ("LikeRemoved".equals(eventTypeHeader)) {
LikeEvent.LikeRemoved event = parseLikeRemovedEvent(value);
rankingService.addLikeScore(event.productId(), date, false);
// Spring ApplicationEvent 발행 (애플리케이션 내부 이벤트)
applicationEventPublisher.publishEvent(event);
eventType = "LikeRemoved";
} else {
// 기본값은 LikeAdded
LikeEvent.LikeAdded event = parseLikeEvent(value);
rankingService.addLikeScore(event.productId(), date, true);
// Spring ApplicationEvent 발행 (애플리케이션 내부 이벤트)
applicationEventPublisher.publishEvent(event);
eventType = "LikeAdded";
}
}
Expand Down Expand Up @@ -202,23 +206,8 @@ public void consumeOrderEvents(
Object value = record.value();
OrderEvent.OrderCreated event = parseOrderCreatedEvent(value);

LocalDate date = LocalDate.now();

// 주문 아이템별로 점수 집계
// 주의: OrderEvent.OrderCreated에는 개별 상품 가격 정보가 없으므로
// subtotal을 totalQuantity로 나눠서 평균 단가를 구하고, 각 아이템의 quantity를 곱함
int totalQuantity = event.orderItems().stream()
.mapToInt(OrderEvent.OrderCreated.OrderItemInfo::quantity)
.sum();

if (totalQuantity > 0 && event.subtotal() != null) {
double averagePrice = (double) event.subtotal() / totalQuantity;

for (OrderEvent.OrderCreated.OrderItemInfo item : event.orderItems()) {
double orderAmount = averagePrice * item.quantity();
rankingService.addOrderScore(item.productId(), date, orderAmount);
}
}
// Spring ApplicationEvent 발행 (애플리케이션 내부 이벤트)
applicationEventPublisher.publishEvent(event);

// 이벤트 처리 기록 저장
eventHandledService.markAsHandled(eventId, "OrderCreated", "order-events");
Expand Down Expand Up @@ -284,9 +273,8 @@ public void consumeProductEvents(
Object value = record.value();
ProductEvent.ProductViewed event = parseProductViewedEvent(value);

LocalDate date = LocalDate.now();

rankingService.addViewScore(event.productId(), date);
// Spring ApplicationEvent 발행 (애플리케이션 내부 이벤트)
applicationEventPublisher.publishEvent(event);

// 이벤트 처리 기록 저장
eventHandledService.markAsHandled(eventId, "ProductViewed", "product-events");
Expand Down
Loading