Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package com.smore.bidcompetition.application.factory;

import com.smore.bidcompetition.application.handler.BidResultFinalizedHandler;
import com.smore.bidcompetition.application.handler.ClearStockHandler;
import com.smore.bidcompetition.application.handler.InventoryConfirmTimeout;
import com.smore.bidcompetition.application.handler.ProductInventoryAdjustedHandler;
import com.smore.bidcompetition.application.handler.OutboxHandler;
import com.smore.bidcompetition.application.handler.SaveStockHandler;
import com.smore.bidcompetition.application.handler.WinnerCreatedHandler;
import com.smore.bidcompetition.domain.model.Outbox;
import com.smore.bidcompetition.infrastructure.redis.StockRedisService;
import io.micrometer.tracing.propagation.Propagator;
import io.micrometer.tracing.Tracer;
import lombok.RequiredArgsConstructor;
Expand All @@ -32,13 +35,16 @@ public class OutboxHandlerFactory {
private final KafkaTemplate<String, String> kafkaTemplate;
private final Tracer tracer;
private final Propagator propagator;
private final StockRedisService stockRedisService;

public OutboxHandler from(Outbox outbox) {
return switch (outbox.getEventType()) {
case BID_WINNER_SELECTED -> new WinnerCreatedHandler(tracer, propagator, winerCreatedTopic, kafkaTemplate, outbox);
case PRODUCT_INVENTORY_ADJUSTED -> new ProductInventoryAdjustedHandler(tracer, propagator, productInventoryAdjustedTopic, kafkaTemplate, outbox);
case BID_RESULT_FINALIZED -> new BidResultFinalizedHandler(tracer, propagator, bidResultFinalizedTopic, kafkaTemplate, outbox);
case BID_INVENTORY_CONFIRM_TIMEOUT -> new InventoryConfirmTimeout(tracer, propagator, bidInventoryConfirmTimeoutTopic, kafkaTemplate, outbox);
case SAVE_STOCK -> new SaveStockHandler(tracer, propagator, stockRedisService, outbox);
case DELETE_STOCK -> new ClearStockHandler(tracer, propagator, stockRedisService, outbox);
default -> throw new IllegalArgumentException(
"지원되지 않은 이벤트입니다." + outbox.getEventType()
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.smore.bidcompetition.application.handler;

import com.smore.bidcompetition.domain.model.Outbox;
import com.smore.bidcompetition.domain.status.OutboxResult;
import com.smore.bidcompetition.infrastructure.redis.StockRedisService;
import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.propagation.Propagator;
import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "ClearStockHandler")
public class ClearStockHandler implements OutboxHandler{

private final StockRedisService stockRedisService;
private final Outbox outbox;
private final Tracer tracer;
private final Propagator propagator;

public ClearStockHandler(Tracer tracer, Propagator propagator,
StockRedisService stockRedisService, Outbox outbox) {
this.tracer = tracer;
this.propagator = propagator;
this.stockRedisService = stockRedisService;
this.outbox = outbox;
}

@Override
public OutboxResult execute() {

Span newSpan = restoreAndStartSpan();

try (Tracer.SpanInScope ws = tracer.withSpan(newSpan)) {
try {
boolean deleted = stockRedisService.deleteStock(outbox.getAggregateId());
if (!deleted) {
log.info("stock 키가 없어 삭제할 게 없습니다. bidId={}", outbox.getAggregateId());
} else {
log.info("stock 키 삭제 완료. bidId={}", outbox.getAggregateId());
}
return OutboxResult.SUCCESS;
} catch (Exception e) {
log.error("stock 키 삭제 중 예외. bidId={}", outbox.getAggregateId(), e);
return OutboxResult.FAIL;
}
} finally {
newSpan.end();
}
}

private Span restoreAndStartSpan() {
Span.Builder spanBuilder = propagator.extract(outbox, (carrier, key) -> {
if ("X-B3-TraceId".equalsIgnoreCase(key)) {
return carrier.getTraceId();
}
if ("X-B3-SpanId".equalsIgnoreCase(key)) {
return carrier.getSpanId();
}
return null;
});

Span newSpan = spanBuilder
.name("redis-clear-stock")
.start();
return newSpan;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.smore.bidcompetition.application.handler;

import com.smore.bidcompetition.domain.model.Outbox;
import com.smore.bidcompetition.domain.status.OutboxResult;
import com.smore.bidcompetition.infrastructure.redis.StockRedisService;
import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.propagation.Propagator;
import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "SaveStockHandler")
public class SaveStockHandler implements OutboxHandler{

private final StockRedisService stockRedisService;
private final Outbox outbox;
private final Tracer tracer;
private final Propagator propagator;

public SaveStockHandler(Tracer tracer, Propagator propagator,
StockRedisService stockRedisService, Outbox outbox) {
this.tracer = tracer;
this.propagator = propagator;
this.stockRedisService = stockRedisService;
this.outbox = outbox;
}

@Override
public OutboxResult execute() {

Span newSpan = restoreAndStartSpan();

try (Tracer.SpanInScope ws = tracer.withSpan(newSpan)) {
try {
long setResult = stockRedisService.setStock(outbox.getAggregateId(), Integer.parseInt(outbox.getPayload()));

if (setResult == -1L) {
log.error("재고 초기화 실패: bidId={}, stock={}",outbox.getAggregateId(), Integer.parseInt(outbox.getPayload()));
return OutboxResult.FAIL;
} else if (setResult == 0L) {
log.info("이미 재고 키가 존재합니다. bidId={}", outbox.getAggregateId(), Integer.parseInt(outbox.getPayload()));
} else {
log.info("재고 초기화 완료: bidId={}, stock={}", outbox.getAggregateId(), Integer.parseInt(outbox.getPayload()));
}
} catch (Exception e) {
log.error("재고 초기화 중 예외 발생. bidId={}", outbox.getAggregateId(), e);
return OutboxResult.FAIL;
}
return OutboxResult.SUCCESS;
} finally {
newSpan.end();
}
}

private Span restoreAndStartSpan() {
Span.Builder spanBuilder = propagator.extract(outbox, (carrier, key) -> {
if ("X-B3-TraceId".equalsIgnoreCase(key)) {
return carrier.getTraceId();
}
if ("X-B3-SpanId".equalsIgnoreCase(key)) {
return carrier.getSpanId();
}
return null;
});

Span newSpan = spanBuilder
.name("redis-saved-stock")
.start();
return newSpan;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import com.smore.bidcompetition.domain.status.EventStatus;
import com.smore.bidcompetition.domain.model.Outbox;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;

Expand All @@ -12,6 +14,8 @@ public interface OutboxRepository {

Page<Long> findPendingIds(Collection<EventStatus> states, Pageable pageable);

List<Long> findExpiredProcessingIds(LocalDateTime expiredAt);

Outbox save(Outbox outbox);

int claim(Long outboxId, EventStatus eventStatus);
Expand All @@ -22,4 +26,6 @@ public interface OutboxRepository {

int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCount);

int bulkResetExpiredProcessingToReady(List<Long> ids);

}
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,22 @@ public void createBid(BidCreateCommand command) {

BidCompetition saved = bidCompetitionRepository.save(newBid);

try {
long setResult = stockRedisService.setStock(saved.getId(), saved.getTotalQuantity());

if (setResult == -1L) {
log.error("재고 초기화 실패: bidId={}, stock={}", saved.getId(), saved.getTotalQuantity());
} else if (setResult == 0L) {
log.info("이미 재고 키가 존재합니다. bidId={}", saved.getId());
} else {
log.info("재고 초기화 완료: bidId={}, stock={}", saved.getId(), setResult);
}
} catch (Exception e) {
log.error("재고 초기화 중 예외 발생. bidId={}", saved.getId(), e);
Outbox outbox = Outbox.create(
AggregateType.BID,
saved.getId(),
EventType.SAVE_STOCK,
UUID.randomUUID(),
String.valueOf(saved.getTotalQuantity())
);

if (tracer.currentSpan() != null) {
outbox.attachTracing(
tracer.currentSpan().context().traceId(),
tracer.currentSpan().context().spanId()
);
}

outboxRepository.save(outbox);
}

@Transactional
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,23 @@ public void finalizeBid(UUID bidId, LocalDateTime now) {
}

outboxRepository.save(outbox);

Outbox redisOutbox = Outbox.create(
AggregateType.BID,
bid.getId(),
EventType.DELETE_STOCK,
UUID.randomUUID(),
""
);

if (tracer.currentSpan() != null) {
redisOutbox.attachTracing(
tracer.currentSpan().context().traceId(),
tracer.currentSpan().context().spanId()
);
}

outboxRepository.save(redisOutbox);
}

private String makePayload(BidEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ public enum EventType {
BID_WINNER_SELECTED("경쟁 승리자 선정"),
PRODUCT_INVENTORY_ADJUSTED("환불"),
BID_RESULT_FINALIZED("경쟁 결과 최종 확정"),
BID_INVENTORY_CONFIRM_TIMEOUT("재고 확보 실패")
BID_INVENTORY_CONFIRM_TIMEOUT("재고 확보 실패"),
SAVE_STOCK("재고 저장"),
DELETE_STOCK("재고 클리어")
;

private final String description;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package com.smore.bidcompetition.infrastructure.persistence.repository.outbox;

import com.smore.bidcompetition.domain.status.EventStatus;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;

public interface OutboxJpaRepositoryCustom {

Page<Long> findPendingIds(Collection<EventStatus> states, Pageable pageable);

List<Long> findExpiredProcessingIds(LocalDateTime expiredAt);

int claim(Long outboxId, EventStatus eventStatus);

int markSent(Long outboxId, EventStatus eventStatus);
Expand All @@ -17,4 +21,6 @@ public interface OutboxJpaRepositoryCustom {

int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCount);

int bulkResetExpiredProcessingToReady(List<Long> ids);

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.smore.bidcompetition.domain.status.EventStatus;
import com.smore.bidcompetition.infrastructure.persistence.entity.QOutboxEntity;
import jakarta.persistence.EntityManager;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -44,12 +45,26 @@ public Page<Long> findPendingIds(Collection<EventStatus> states, Pageable pageab
return PageableExecutionUtils.getPage(content, pageable, countQuery::fetchOne);
}

@Override
public List<Long> findExpiredProcessingIds(LocalDateTime expiredAt) {

return queryFactory
.select(outboxEntity.id)
.from(outboxEntity)
.where(
outboxEntity.eventStatus.eq(EventStatus.PROCESSING),
outboxEntity.updatedAt.loe(expiredAt)
)
.fetch();
}

@Override
public int claim(Long outboxId, EventStatus eventStatus) {

long updated = queryFactory
.update(outboxEntity)
.set(outboxEntity.eventStatus, eventStatus)
.set(outboxEntity.updatedAt, LocalDateTime.now())
.where(
outboxEntity.id.eq(outboxId),
outboxEntity.eventStatus.eq(EventStatus.PENDING)
Expand All @@ -67,6 +82,7 @@ public int markSent(Long outboxId, EventStatus eventStatus) {
long updated = queryFactory
.update(outboxEntity)
.set(outboxEntity.eventStatus, eventStatus)
.set(outboxEntity.updatedAt, LocalDateTime.now())
.where(
outboxEntity.id.eq(outboxId),
outboxEntity.eventStatus.eq(EventStatus.PROCESSING)
Expand All @@ -85,6 +101,7 @@ public int markRetry(Long outboxId, EventStatus eventStatus) {
.update(outboxEntity)
.set(outboxEntity.eventStatus, eventStatus)
.set(outboxEntity.retryCount, outboxEntity.retryCount.add(1))
.set(outboxEntity.updatedAt, LocalDateTime.now())
.where(
outboxEntity.id.eq(outboxId),
outboxEntity.eventStatus.eq(EventStatus.PROCESSING)
Expand All @@ -102,6 +119,7 @@ public int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCoun
long updated = queryFactory
.update(outboxEntity)
.set(outboxEntity.eventStatus, eventStatus)
.set(outboxEntity.updatedAt, LocalDateTime.now())
.where(
outboxEntity.id.eq(outboxId),
outboxEntity.eventStatus.eq(EventStatus.PROCESSING),
Expand All @@ -115,4 +133,23 @@ public int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCoun
return (int) updated;
}

@Override
public int bulkResetExpiredProcessingToReady(List<Long> ids) {
long updated = queryFactory
.update(outboxEntity)
.set(outboxEntity.eventStatus, EventStatus.PENDING)
.set(outboxEntity.retryCount, outboxEntity.retryCount.add(1))
.set(outboxEntity.updatedAt, LocalDateTime.now())
.where(
outboxEntity.id.in(ids),
outboxEntity.eventStatus.eq(EventStatus.PROCESSING)
)
.execute();

em.flush();
em.clear();

return (int) updated;
}

}
Loading