diff --git a/bidcompetition/src/main/java/com/smore/bidcompetition/application/factory/OutboxHandlerFactory.java b/bidcompetition/src/main/java/com/smore/bidcompetition/application/factory/OutboxHandlerFactory.java index cac5b0d1..59fa7c7a 100644 --- a/bidcompetition/src/main/java/com/smore/bidcompetition/application/factory/OutboxHandlerFactory.java +++ b/bidcompetition/src/main/java/com/smore/bidcompetition/application/factory/OutboxHandlerFactory.java @@ -1,11 +1,14 @@ package com.smore.bidcompetition.application.factory; import com.smore.bidcompetition.application.handler.BidResultFinalizedHandler; +import com.smore.bidcompetition.application.handler.ClearStockHandler; import com.smore.bidcompetition.application.handler.InventoryConfirmTimeout; import com.smore.bidcompetition.application.handler.ProductInventoryAdjustedHandler; import com.smore.bidcompetition.application.handler.OutboxHandler; +import com.smore.bidcompetition.application.handler.SaveStockHandler; import com.smore.bidcompetition.application.handler.WinnerCreatedHandler; import com.smore.bidcompetition.domain.model.Outbox; +import com.smore.bidcompetition.infrastructure.redis.StockRedisService; import io.micrometer.tracing.propagation.Propagator; import io.micrometer.tracing.Tracer; import lombok.RequiredArgsConstructor; @@ -32,6 +35,7 @@ public class OutboxHandlerFactory { private final KafkaTemplate kafkaTemplate; private final Tracer tracer; private final Propagator propagator; + private final StockRedisService stockRedisService; public OutboxHandler from(Outbox outbox) { return switch (outbox.getEventType()) { @@ -39,6 +43,8 @@ public OutboxHandler from(Outbox outbox) { case PRODUCT_INVENTORY_ADJUSTED -> new ProductInventoryAdjustedHandler(tracer, propagator, productInventoryAdjustedTopic, kafkaTemplate, outbox); case BID_RESULT_FINALIZED -> new BidResultFinalizedHandler(tracer, propagator, bidResultFinalizedTopic, kafkaTemplate, outbox); case BID_INVENTORY_CONFIRM_TIMEOUT -> new InventoryConfirmTimeout(tracer, propagator, bidInventoryConfirmTimeoutTopic, kafkaTemplate, outbox); + case SAVE_STOCK -> new SaveStockHandler(tracer, propagator, stockRedisService, outbox); + case DELETE_STOCK -> new ClearStockHandler(tracer, propagator, stockRedisService, outbox); default -> throw new IllegalArgumentException( "지원되지 않은 이벤트입니다." + outbox.getEventType() ); diff --git a/bidcompetition/src/main/java/com/smore/bidcompetition/application/handler/ClearStockHandler.java b/bidcompetition/src/main/java/com/smore/bidcompetition/application/handler/ClearStockHandler.java new file mode 100644 index 00000000..b5220238 --- /dev/null +++ b/bidcompetition/src/main/java/com/smore/bidcompetition/application/handler/ClearStockHandler.java @@ -0,0 +1,66 @@ +package com.smore.bidcompetition.application.handler; + +import com.smore.bidcompetition.domain.model.Outbox; +import com.smore.bidcompetition.domain.status.OutboxResult; +import com.smore.bidcompetition.infrastructure.redis.StockRedisService; +import io.micrometer.tracing.Span; +import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.propagation.Propagator; +import lombok.extern.slf4j.Slf4j; + +@Slf4j(topic = "ClearStockHandler") +public class ClearStockHandler implements OutboxHandler{ + + private final StockRedisService stockRedisService; + private final Outbox outbox; + private final Tracer tracer; + private final Propagator propagator; + + public ClearStockHandler(Tracer tracer, Propagator propagator, + StockRedisService stockRedisService, Outbox outbox) { + this.tracer = tracer; + this.propagator = propagator; + this.stockRedisService = stockRedisService; + this.outbox = outbox; + } + + @Override + public OutboxResult execute() { + + Span newSpan = restoreAndStartSpan(); + + try (Tracer.SpanInScope ws = tracer.withSpan(newSpan)) { + try { + boolean deleted = stockRedisService.deleteStock(outbox.getAggregateId()); + if (!deleted) { + log.info("stock 키가 없어 삭제할 게 없습니다. bidId={}", outbox.getAggregateId()); + } else { + log.info("stock 키 삭제 완료. bidId={}", outbox.getAggregateId()); + } + return OutboxResult.SUCCESS; + } catch (Exception e) { + log.error("stock 키 삭제 중 예외. bidId={}", outbox.getAggregateId(), e); + return OutboxResult.FAIL; + } + } finally { + newSpan.end(); + } + } + + private Span restoreAndStartSpan() { + Span.Builder spanBuilder = propagator.extract(outbox, (carrier, key) -> { + if ("X-B3-TraceId".equalsIgnoreCase(key)) { + return carrier.getTraceId(); + } + if ("X-B3-SpanId".equalsIgnoreCase(key)) { + return carrier.getSpanId(); + } + return null; + }); + + Span newSpan = spanBuilder + .name("redis-clear-stock") + .start(); + return newSpan; + } +} diff --git a/bidcompetition/src/main/java/com/smore/bidcompetition/application/handler/SaveStockHandler.java b/bidcompetition/src/main/java/com/smore/bidcompetition/application/handler/SaveStockHandler.java new file mode 100644 index 00000000..9ae4cee5 --- /dev/null +++ b/bidcompetition/src/main/java/com/smore/bidcompetition/application/handler/SaveStockHandler.java @@ -0,0 +1,70 @@ +package com.smore.bidcompetition.application.handler; + +import com.smore.bidcompetition.domain.model.Outbox; +import com.smore.bidcompetition.domain.status.OutboxResult; +import com.smore.bidcompetition.infrastructure.redis.StockRedisService; +import io.micrometer.tracing.Span; +import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.propagation.Propagator; +import lombok.extern.slf4j.Slf4j; + +@Slf4j(topic = "SaveStockHandler") +public class SaveStockHandler implements OutboxHandler{ + + private final StockRedisService stockRedisService; + private final Outbox outbox; + private final Tracer tracer; + private final Propagator propagator; + + public SaveStockHandler(Tracer tracer, Propagator propagator, + StockRedisService stockRedisService, Outbox outbox) { + this.tracer = tracer; + this.propagator = propagator; + this.stockRedisService = stockRedisService; + this.outbox = outbox; + } + + @Override + public OutboxResult execute() { + + Span newSpan = restoreAndStartSpan(); + + try (Tracer.SpanInScope ws = tracer.withSpan(newSpan)) { + try { + long setResult = stockRedisService.setStock(outbox.getAggregateId(), Integer.parseInt(outbox.getPayload())); + + if (setResult == -1L) { + log.error("재고 초기화 실패: bidId={}, stock={}",outbox.getAggregateId(), Integer.parseInt(outbox.getPayload())); + return OutboxResult.FAIL; + } else if (setResult == 0L) { + log.info("이미 재고 키가 존재합니다. bidId={}", outbox.getAggregateId(), Integer.parseInt(outbox.getPayload())); + } else { + log.info("재고 초기화 완료: bidId={}, stock={}", outbox.getAggregateId(), Integer.parseInt(outbox.getPayload())); + } + } catch (Exception e) { + log.error("재고 초기화 중 예외 발생. bidId={}", outbox.getAggregateId(), e); + return OutboxResult.FAIL; + } + return OutboxResult.SUCCESS; + } finally { + newSpan.end(); + } + } + + private Span restoreAndStartSpan() { + Span.Builder spanBuilder = propagator.extract(outbox, (carrier, key) -> { + if ("X-B3-TraceId".equalsIgnoreCase(key)) { + return carrier.getTraceId(); + } + if ("X-B3-SpanId".equalsIgnoreCase(key)) { + return carrier.getSpanId(); + } + return null; + }); + + Span newSpan = spanBuilder + .name("redis-saved-stock") + .start(); + return newSpan; + } +} diff --git a/bidcompetition/src/main/java/com/smore/bidcompetition/application/repository/OutboxRepository.java b/bidcompetition/src/main/java/com/smore/bidcompetition/application/repository/OutboxRepository.java index 7a0db99b..78b2320d 100644 --- a/bidcompetition/src/main/java/com/smore/bidcompetition/application/repository/OutboxRepository.java +++ b/bidcompetition/src/main/java/com/smore/bidcompetition/application/repository/OutboxRepository.java @@ -2,7 +2,9 @@ import com.smore.bidcompetition.domain.status.EventStatus; import com.smore.bidcompetition.domain.model.Outbox; +import java.time.LocalDateTime; import java.util.Collection; +import java.util.List; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; @@ -12,6 +14,8 @@ public interface OutboxRepository { Page findPendingIds(Collection states, Pageable pageable); + List findExpiredProcessingIds(LocalDateTime expiredAt); + Outbox save(Outbox outbox); int claim(Long outboxId, EventStatus eventStatus); @@ -22,4 +26,6 @@ public interface OutboxRepository { int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCount); + int bulkResetExpiredProcessingToReady(List ids); + } diff --git a/bidcompetition/src/main/java/com/smore/bidcompetition/application/service/BidCompetitionService.java b/bidcompetition/src/main/java/com/smore/bidcompetition/application/service/BidCompetitionService.java index fb72a2bb..97a989bc 100644 --- a/bidcompetition/src/main/java/com/smore/bidcompetition/application/service/BidCompetitionService.java +++ b/bidcompetition/src/main/java/com/smore/bidcompetition/application/service/BidCompetitionService.java @@ -89,19 +89,22 @@ public void createBid(BidCreateCommand command) { BidCompetition saved = bidCompetitionRepository.save(newBid); - try { - long setResult = stockRedisService.setStock(saved.getId(), saved.getTotalQuantity()); - - if (setResult == -1L) { - log.error("재고 초기화 실패: bidId={}, stock={}", saved.getId(), saved.getTotalQuantity()); - } else if (setResult == 0L) { - log.info("이미 재고 키가 존재합니다. bidId={}", saved.getId()); - } else { - log.info("재고 초기화 완료: bidId={}, stock={}", saved.getId(), setResult); - } - } catch (Exception e) { - log.error("재고 초기화 중 예외 발생. bidId={}", saved.getId(), e); + Outbox outbox = Outbox.create( + AggregateType.BID, + saved.getId(), + EventType.SAVE_STOCK, + UUID.randomUUID(), + String.valueOf(saved.getTotalQuantity()) + ); + + if (tracer.currentSpan() != null) { + outbox.attachTracing( + tracer.currentSpan().context().traceId(), + tracer.currentSpan().context().spanId() + ); } + + outboxRepository.save(outbox); } @Transactional diff --git a/bidcompetition/src/main/java/com/smore/bidcompetition/application/service/BidEndFinalizer.java b/bidcompetition/src/main/java/com/smore/bidcompetition/application/service/BidEndFinalizer.java index d7bd29e9..c86c7a09 100644 --- a/bidcompetition/src/main/java/com/smore/bidcompetition/application/service/BidEndFinalizer.java +++ b/bidcompetition/src/main/java/com/smore/bidcompetition/application/service/BidEndFinalizer.java @@ -76,6 +76,23 @@ public void finalizeBid(UUID bidId, LocalDateTime now) { } outboxRepository.save(outbox); + + Outbox redisOutbox = Outbox.create( + AggregateType.BID, + bid.getId(), + EventType.DELETE_STOCK, + UUID.randomUUID(), + "" + ); + + if (tracer.currentSpan() != null) { + redisOutbox.attachTracing( + tracer.currentSpan().context().traceId(), + tracer.currentSpan().context().spanId() + ); + } + + outboxRepository.save(redisOutbox); } private String makePayload(BidEvent event) { diff --git a/bidcompetition/src/main/java/com/smore/bidcompetition/domain/status/EventType.java b/bidcompetition/src/main/java/com/smore/bidcompetition/domain/status/EventType.java index 02218ea7..e7b63b41 100644 --- a/bidcompetition/src/main/java/com/smore/bidcompetition/domain/status/EventType.java +++ b/bidcompetition/src/main/java/com/smore/bidcompetition/domain/status/EventType.java @@ -4,7 +4,9 @@ public enum EventType { BID_WINNER_SELECTED("경쟁 승리자 선정"), PRODUCT_INVENTORY_ADJUSTED("환불"), BID_RESULT_FINALIZED("경쟁 결과 최종 확정"), - BID_INVENTORY_CONFIRM_TIMEOUT("재고 확보 실패") + BID_INVENTORY_CONFIRM_TIMEOUT("재고 확보 실패"), + SAVE_STOCK("재고 저장"), + DELETE_STOCK("재고 클리어") ; private final String description; diff --git a/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustom.java b/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustom.java index 209d571c..8f78abd7 100644 --- a/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustom.java +++ b/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustom.java @@ -1,7 +1,9 @@ package com.smore.bidcompetition.infrastructure.persistence.repository.outbox; import com.smore.bidcompetition.domain.status.EventStatus; +import java.time.LocalDateTime; import java.util.Collection; +import java.util.List; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; @@ -9,6 +11,8 @@ public interface OutboxJpaRepositoryCustom { Page findPendingIds(Collection states, Pageable pageable); + List findExpiredProcessingIds(LocalDateTime expiredAt); + int claim(Long outboxId, EventStatus eventStatus); int markSent(Long outboxId, EventStatus eventStatus); @@ -17,4 +21,6 @@ public interface OutboxJpaRepositoryCustom { int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCount); + int bulkResetExpiredProcessingToReady(List ids); + } diff --git a/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustomImpl.java b/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustomImpl.java index d25e39a5..eb1a8239 100644 --- a/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustomImpl.java +++ b/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustomImpl.java @@ -7,6 +7,7 @@ import com.smore.bidcompetition.domain.status.EventStatus; import com.smore.bidcompetition.infrastructure.persistence.entity.QOutboxEntity; import jakarta.persistence.EntityManager; +import java.time.LocalDateTime; import java.util.Collection; import java.util.List; import lombok.RequiredArgsConstructor; @@ -44,12 +45,26 @@ public Page findPendingIds(Collection states, Pageable pageab return PageableExecutionUtils.getPage(content, pageable, countQuery::fetchOne); } + @Override + public List findExpiredProcessingIds(LocalDateTime expiredAt) { + + return queryFactory + .select(outboxEntity.id) + .from(outboxEntity) + .where( + outboxEntity.eventStatus.eq(EventStatus.PROCESSING), + outboxEntity.updatedAt.loe(expiredAt) + ) + .fetch(); + } + @Override public int claim(Long outboxId, EventStatus eventStatus) { long updated = queryFactory .update(outboxEntity) .set(outboxEntity.eventStatus, eventStatus) + .set(outboxEntity.updatedAt, LocalDateTime.now()) .where( outboxEntity.id.eq(outboxId), outboxEntity.eventStatus.eq(EventStatus.PENDING) @@ -67,6 +82,7 @@ public int markSent(Long outboxId, EventStatus eventStatus) { long updated = queryFactory .update(outboxEntity) .set(outboxEntity.eventStatus, eventStatus) + .set(outboxEntity.updatedAt, LocalDateTime.now()) .where( outboxEntity.id.eq(outboxId), outboxEntity.eventStatus.eq(EventStatus.PROCESSING) @@ -85,6 +101,7 @@ public int markRetry(Long outboxId, EventStatus eventStatus) { .update(outboxEntity) .set(outboxEntity.eventStatus, eventStatus) .set(outboxEntity.retryCount, outboxEntity.retryCount.add(1)) + .set(outboxEntity.updatedAt, LocalDateTime.now()) .where( outboxEntity.id.eq(outboxId), outboxEntity.eventStatus.eq(EventStatus.PROCESSING) @@ -102,6 +119,7 @@ public int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCoun long updated = queryFactory .update(outboxEntity) .set(outboxEntity.eventStatus, eventStatus) + .set(outboxEntity.updatedAt, LocalDateTime.now()) .where( outboxEntity.id.eq(outboxId), outboxEntity.eventStatus.eq(EventStatus.PROCESSING), @@ -115,4 +133,23 @@ public int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCoun return (int) updated; } + @Override + public int bulkResetExpiredProcessingToReady(List ids) { + long updated = queryFactory + .update(outboxEntity) + .set(outboxEntity.eventStatus, EventStatus.PENDING) + .set(outboxEntity.retryCount, outboxEntity.retryCount.add(1)) + .set(outboxEntity.updatedAt, LocalDateTime.now()) + .where( + outboxEntity.id.in(ids), + outboxEntity.eventStatus.eq(EventStatus.PROCESSING) + ) + .execute(); + + em.flush(); + em.clear(); + + return (int) updated; + } + } diff --git a/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxRepositoryImpl.java b/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxRepositoryImpl.java index b687a478..03bd5ce6 100644 --- a/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxRepositoryImpl.java +++ b/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxRepositoryImpl.java @@ -9,7 +9,9 @@ import com.smore.bidcompetition.infrastructure.persistence.exception.CreateOutboxFailException; import com.smore.bidcompetition.infrastructure.persistence.exception.NotFoundOutboxException; import com.smore.bidcompetition.infrastructure.persistence.mapper.OutboxMapper; +import java.time.LocalDateTime; import java.util.Collection; +import java.util.List; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.domain.Page; @@ -55,6 +57,11 @@ public Page findPendingIds(Collection states, Pageable pageab return outboxJpaRepository.findPendingIds(states, pageable); } + @Override + public List findExpiredProcessingIds(LocalDateTime expiredAt) { + return outboxJpaRepository.findExpiredProcessingIds(expiredAt); + } + @Override public Outbox save(Outbox outbox) { @@ -125,4 +132,9 @@ public int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCoun return outboxJpaRepository.markFail(outboxId, eventStatus, maxRetryCount); } + + @Override + public int bulkResetExpiredProcessingToReady(List ids) { + return outboxJpaRepository.bulkResetExpiredProcessingToReady(ids); + } } diff --git a/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/redis/StockRedisService.java b/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/redis/StockRedisService.java index f3b98cc4..58058e48 100644 --- a/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/redis/StockRedisService.java +++ b/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/redis/StockRedisService.java @@ -74,4 +74,9 @@ public long setStock(UUID bidId, int stockQuantity) { String.valueOf(stockQuantity) ); } + + public boolean deleteStock(UUID bidId) { + Boolean deleted = redis.delete(keys.stockKey(bidId)); + return Boolean.TRUE.equals(deleted); + } } diff --git a/bidcompetition/src/main/java/com/smore/bidcompetition/presentation/scheduler/OutboxScheduler.java b/bidcompetition/src/main/java/com/smore/bidcompetition/presentation/scheduler/OutboxScheduler.java index 5a704645..05d86e64 100644 --- a/bidcompetition/src/main/java/com/smore/bidcompetition/presentation/scheduler/OutboxScheduler.java +++ b/bidcompetition/src/main/java/com/smore/bidcompetition/presentation/scheduler/OutboxScheduler.java @@ -4,6 +4,8 @@ import com.smore.bidcompetition.application.repository.OutboxRepository; import com.smore.bidcompetition.application.service.OutboxProcessor; import com.smore.bidcompetition.domain.status.EventStatus; +import java.time.LocalDateTime; +import java.util.List; import java.util.Set; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -49,4 +51,17 @@ public void outboxTasks() { page++; } } + + @Scheduled(fixedDelay = 60000) + public void recoverExpiredProcessing() { + LocalDateTime expiredAt = LocalDateTime.now().minusMinutes(2); + + List expiredProcessingIds = outboxRepository.findExpiredProcessingIds(expiredAt); + + if (expiredProcessingIds.isEmpty()) { + return; + } + + int updated = outboxRepository.bulkResetExpiredProcessingToReady(expiredProcessingIds); + } } diff --git a/order/src/main/java/com/smore/order/application/repository/OutboxRepository.java b/order/src/main/java/com/smore/order/application/repository/OutboxRepository.java index c7e9c24c..d55065c3 100644 --- a/order/src/main/java/com/smore/order/application/repository/OutboxRepository.java +++ b/order/src/main/java/com/smore/order/application/repository/OutboxRepository.java @@ -2,7 +2,9 @@ import com.smore.order.domain.model.Outbox; import com.smore.order.domain.status.EventStatus; +import java.time.LocalDateTime; import java.util.Collection; +import java.util.List; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; @@ -12,6 +14,8 @@ public interface OutboxRepository { Page findPendingIds(Collection states, Pageable pageable); + List findExpiredProcessingIds(LocalDateTime expiredAt); + Outbox save(Outbox outbox); int claim(Long outboxId, EventStatus eventStatus); @@ -22,4 +26,6 @@ public interface OutboxRepository { int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCount); + int bulkResetExpiredProcessingToReady(List ids); + } diff --git a/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustom.java b/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustom.java index 356a3bf5..0eaae4b3 100644 --- a/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustom.java +++ b/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustom.java @@ -1,7 +1,9 @@ package com.smore.order.infrastructure.persistence.repository.outbox; import com.smore.order.domain.status.EventStatus; +import java.time.LocalDateTime; import java.util.Collection; +import java.util.List; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; @@ -9,6 +11,8 @@ public interface OutboxJpaRepositoryCustom { Page findPendingIds(Collection states, Pageable pageable); + List findExpiredProcessingIds(LocalDateTime expiredAt); + int claim(Long outboxId, EventStatus eventStatus); int markSent(Long outboxId, EventStatus eventStatus); @@ -17,4 +21,6 @@ public interface OutboxJpaRepositoryCustom { int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCount); + int bulkResetExpiredProcessingToReady(List ids); + } diff --git a/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustomImpl.java b/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustomImpl.java index 9b78699c..d837b07a 100644 --- a/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustomImpl.java +++ b/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustomImpl.java @@ -7,6 +7,7 @@ import com.smore.order.domain.status.EventStatus; import jakarta.persistence.EntityManager; +import java.time.LocalDateTime; import java.util.Collection; import java.util.List; import lombok.RequiredArgsConstructor; @@ -44,12 +45,26 @@ public Page findPendingIds(Collection states, Pageable pageab return PageableExecutionUtils.getPage(content, pageable, countQuery::fetchOne); } + @Override + public List findExpiredProcessingIds(LocalDateTime expiredAt) { + + return queryFactory + .select(outboxEntity.id) + .from(outboxEntity) + .where( + outboxEntity.eventStatus.eq(EventStatus.PROCESSING), + outboxEntity.updatedAt.loe(expiredAt) + ) + .fetch(); + } + @Override public int claim(Long outboxId, EventStatus eventStatus) { long updated = queryFactory .update(outboxEntity) .set(outboxEntity.eventStatus, eventStatus) + .set(outboxEntity.updatedAt, LocalDateTime.now()) .where( outboxEntity.id.eq(outboxId), outboxEntity.eventStatus.eq(EventStatus.PENDING) @@ -67,6 +82,7 @@ public int markSent(Long outboxId, EventStatus eventStatus) { long updated = queryFactory .update(outboxEntity) .set(outboxEntity.eventStatus, eventStatus) + .set(outboxEntity.updatedAt, LocalDateTime.now()) .where( outboxEntity.id.eq(outboxId), outboxEntity.eventStatus.eq(EventStatus.PROCESSING) @@ -85,6 +101,7 @@ public int markRetry(Long outboxId, EventStatus eventStatus) { .update(outboxEntity) .set(outboxEntity.eventStatus, eventStatus) .set(outboxEntity.retryCount, outboxEntity.retryCount.add(1)) + .set(outboxEntity.updatedAt, LocalDateTime.now()) .where( outboxEntity.id.eq(outboxId), outboxEntity.eventStatus.eq(EventStatus.PROCESSING) @@ -102,6 +119,7 @@ public int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCoun long updated = queryFactory .update(outboxEntity) .set(outboxEntity.eventStatus, eventStatus) + .set(outboxEntity.updatedAt, LocalDateTime.now()) .where( outboxEntity.id.eq(outboxId), outboxEntity.eventStatus.eq(EventStatus.PROCESSING), @@ -115,4 +133,23 @@ public int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCoun return (int) updated; } + @Override + public int bulkResetExpiredProcessingToReady(List ids) { + long updated = queryFactory + .update(outboxEntity) + .set(outboxEntity.eventStatus, EventStatus.PENDING) + .set(outboxEntity.retryCount, outboxEntity.retryCount.add(1)) + .set(outboxEntity.updatedAt, LocalDateTime.now()) + .where( + outboxEntity.id.in(ids), + outboxEntity.eventStatus.eq(EventStatus.PROCESSING) + ) + .execute(); + + em.flush(); + em.clear(); + + return (int) updated; + } + } diff --git a/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxRepositoryImpl.java b/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxRepositoryImpl.java index 0c8c07b7..e38d3040 100644 --- a/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxRepositoryImpl.java +++ b/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxRepositoryImpl.java @@ -8,7 +8,9 @@ import com.smore.order.infrastructure.persistence.exception.CreateOutboxFailException; import com.smore.order.infrastructure.persistence.exception.NotFoundOutboxException; import com.smore.order.infrastructure.persistence.mapper.OutboxMapper; +import java.time.LocalDateTime; import java.util.Collection; +import java.util.List; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.domain.Page; @@ -54,6 +56,11 @@ public Page findPendingIds(Collection states, Pageable pageab return outboxJpaRepository.findPendingIds(states, pageable); } + @Override + public List findExpiredProcessingIds(LocalDateTime expiredAt) { + return outboxJpaRepository.findExpiredProcessingIds(expiredAt); + } + @Override public Outbox save(Outbox outbox) { @@ -124,4 +131,9 @@ public int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCoun return outboxJpaRepository.markFail(outboxId, eventStatus, maxRetryCount); } + + @Override + public int bulkResetExpiredProcessingToReady(List ids) { + return outboxJpaRepository.bulkResetExpiredProcessingToReady(ids); + } } diff --git a/order/src/main/java/com/smore/order/presentation/scheduler/OutboxScheduler.java b/order/src/main/java/com/smore/order/presentation/scheduler/OutboxScheduler.java index 2c76db72..27bb0460 100644 --- a/order/src/main/java/com/smore/order/presentation/scheduler/OutboxScheduler.java +++ b/order/src/main/java/com/smore/order/presentation/scheduler/OutboxScheduler.java @@ -4,6 +4,8 @@ import com.smore.order.application.service.OutboxProcessor; import com.smore.order.domain.status.EventStatus; +import java.time.LocalDateTime; +import java.util.List; import java.util.Set; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -50,4 +52,17 @@ public void outboxTasks() { page++; } } + + @Scheduled(fixedDelay = 60000) + public void recoverExpiredProcessing() { + LocalDateTime expiredAt = LocalDateTime.now().minusMinutes(2); + + List expiredProcessingIds = outboxRepository.findExpiredProcessingIds(expiredAt); + + if (expiredProcessingIds.isEmpty()) { + return; + } + + int updated = outboxRepository.bulkResetExpiredProcessingToReady(expiredProcessingIds); + } }