diff --git a/bidcompetition/src/main/java/com/smore/bidcompetition/application/repository/OutboxRepository.java b/bidcompetition/src/main/java/com/smore/bidcompetition/application/repository/OutboxRepository.java index 7a0db99b..78b2320d 100644 --- a/bidcompetition/src/main/java/com/smore/bidcompetition/application/repository/OutboxRepository.java +++ b/bidcompetition/src/main/java/com/smore/bidcompetition/application/repository/OutboxRepository.java @@ -2,7 +2,9 @@ import com.smore.bidcompetition.domain.status.EventStatus; import com.smore.bidcompetition.domain.model.Outbox; +import java.time.LocalDateTime; import java.util.Collection; +import java.util.List; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; @@ -12,6 +14,8 @@ public interface OutboxRepository { Page findPendingIds(Collection states, Pageable pageable); + List findExpiredProcessingIds(LocalDateTime expiredAt); + Outbox save(Outbox outbox); int claim(Long outboxId, EventStatus eventStatus); @@ -22,4 +26,6 @@ public interface OutboxRepository { int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCount); + int bulkResetExpiredProcessingToReady(List ids); + } diff --git a/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustom.java b/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustom.java index 209d571c..8f78abd7 100644 --- a/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustom.java +++ b/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustom.java @@ -1,7 +1,9 @@ package com.smore.bidcompetition.infrastructure.persistence.repository.outbox; import com.smore.bidcompetition.domain.status.EventStatus; +import java.time.LocalDateTime; import java.util.Collection; +import java.util.List; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; @@ -9,6 +11,8 @@ public interface OutboxJpaRepositoryCustom { Page findPendingIds(Collection states, Pageable pageable); + List findExpiredProcessingIds(LocalDateTime expiredAt); + int claim(Long outboxId, EventStatus eventStatus); int markSent(Long outboxId, EventStatus eventStatus); @@ -17,4 +21,6 @@ public interface OutboxJpaRepositoryCustom { int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCount); + int bulkResetExpiredProcessingToReady(List ids); + } diff --git a/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustomImpl.java b/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustomImpl.java index d25e39a5..eb1a8239 100644 --- a/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustomImpl.java +++ b/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustomImpl.java @@ -7,6 +7,7 @@ import com.smore.bidcompetition.domain.status.EventStatus; import com.smore.bidcompetition.infrastructure.persistence.entity.QOutboxEntity; import jakarta.persistence.EntityManager; +import java.time.LocalDateTime; import java.util.Collection; import java.util.List; import lombok.RequiredArgsConstructor; @@ -44,12 +45,26 @@ public Page findPendingIds(Collection states, Pageable pageab return PageableExecutionUtils.getPage(content, pageable, countQuery::fetchOne); } + @Override + public List findExpiredProcessingIds(LocalDateTime expiredAt) { + + return queryFactory + .select(outboxEntity.id) + .from(outboxEntity) + .where( + outboxEntity.eventStatus.eq(EventStatus.PROCESSING), + outboxEntity.updatedAt.loe(expiredAt) + ) + .fetch(); + } + @Override public int claim(Long outboxId, EventStatus eventStatus) { long updated = queryFactory .update(outboxEntity) .set(outboxEntity.eventStatus, eventStatus) + .set(outboxEntity.updatedAt, LocalDateTime.now()) .where( outboxEntity.id.eq(outboxId), outboxEntity.eventStatus.eq(EventStatus.PENDING) @@ -67,6 +82,7 @@ public int markSent(Long outboxId, EventStatus eventStatus) { long updated = queryFactory .update(outboxEntity) .set(outboxEntity.eventStatus, eventStatus) + .set(outboxEntity.updatedAt, LocalDateTime.now()) .where( outboxEntity.id.eq(outboxId), outboxEntity.eventStatus.eq(EventStatus.PROCESSING) @@ -85,6 +101,7 @@ public int markRetry(Long outboxId, EventStatus eventStatus) { .update(outboxEntity) .set(outboxEntity.eventStatus, eventStatus) .set(outboxEntity.retryCount, outboxEntity.retryCount.add(1)) + .set(outboxEntity.updatedAt, LocalDateTime.now()) .where( outboxEntity.id.eq(outboxId), outboxEntity.eventStatus.eq(EventStatus.PROCESSING) @@ -102,6 +119,7 @@ public int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCoun long updated = queryFactory .update(outboxEntity) .set(outboxEntity.eventStatus, eventStatus) + .set(outboxEntity.updatedAt, LocalDateTime.now()) .where( outboxEntity.id.eq(outboxId), outboxEntity.eventStatus.eq(EventStatus.PROCESSING), @@ -115,4 +133,23 @@ public int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCoun return (int) updated; } + @Override + public int bulkResetExpiredProcessingToReady(List ids) { + long updated = queryFactory + .update(outboxEntity) + .set(outboxEntity.eventStatus, EventStatus.PENDING) + .set(outboxEntity.retryCount, outboxEntity.retryCount.add(1)) + .set(outboxEntity.updatedAt, LocalDateTime.now()) + .where( + outboxEntity.id.in(ids), + outboxEntity.eventStatus.eq(EventStatus.PROCESSING) + ) + .execute(); + + em.flush(); + em.clear(); + + return (int) updated; + } + } diff --git a/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxRepositoryImpl.java b/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxRepositoryImpl.java index b687a478..03bd5ce6 100644 --- a/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxRepositoryImpl.java +++ b/bidcompetition/src/main/java/com/smore/bidcompetition/infrastructure/persistence/repository/outbox/OutboxRepositoryImpl.java @@ -9,7 +9,9 @@ import com.smore.bidcompetition.infrastructure.persistence.exception.CreateOutboxFailException; import com.smore.bidcompetition.infrastructure.persistence.exception.NotFoundOutboxException; import com.smore.bidcompetition.infrastructure.persistence.mapper.OutboxMapper; +import java.time.LocalDateTime; import java.util.Collection; +import java.util.List; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.domain.Page; @@ -55,6 +57,11 @@ public Page findPendingIds(Collection states, Pageable pageab return outboxJpaRepository.findPendingIds(states, pageable); } + @Override + public List findExpiredProcessingIds(LocalDateTime expiredAt) { + return outboxJpaRepository.findExpiredProcessingIds(expiredAt); + } + @Override public Outbox save(Outbox outbox) { @@ -125,4 +132,9 @@ public int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCoun return outboxJpaRepository.markFail(outboxId, eventStatus, maxRetryCount); } + + @Override + public int bulkResetExpiredProcessingToReady(List ids) { + return outboxJpaRepository.bulkResetExpiredProcessingToReady(ids); + } } diff --git a/bidcompetition/src/main/java/com/smore/bidcompetition/presentation/scheduler/OutboxScheduler.java b/bidcompetition/src/main/java/com/smore/bidcompetition/presentation/scheduler/OutboxScheduler.java index 5a704645..05d86e64 100644 --- a/bidcompetition/src/main/java/com/smore/bidcompetition/presentation/scheduler/OutboxScheduler.java +++ b/bidcompetition/src/main/java/com/smore/bidcompetition/presentation/scheduler/OutboxScheduler.java @@ -4,6 +4,8 @@ import com.smore.bidcompetition.application.repository.OutboxRepository; import com.smore.bidcompetition.application.service.OutboxProcessor; import com.smore.bidcompetition.domain.status.EventStatus; +import java.time.LocalDateTime; +import java.util.List; import java.util.Set; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -49,4 +51,17 @@ public void outboxTasks() { page++; } } + + @Scheduled(fixedDelay = 60000) + public void recoverExpiredProcessing() { + LocalDateTime expiredAt = LocalDateTime.now().minusMinutes(2); + + List expiredProcessingIds = outboxRepository.findExpiredProcessingIds(expiredAt); + + if (expiredProcessingIds.isEmpty()) { + return; + } + + int updated = outboxRepository.bulkResetExpiredProcessingToReady(expiredProcessingIds); + } } diff --git a/order/src/main/java/com/smore/order/application/repository/OutboxRepository.java b/order/src/main/java/com/smore/order/application/repository/OutboxRepository.java index c7e9c24c..d55065c3 100644 --- a/order/src/main/java/com/smore/order/application/repository/OutboxRepository.java +++ b/order/src/main/java/com/smore/order/application/repository/OutboxRepository.java @@ -2,7 +2,9 @@ import com.smore.order.domain.model.Outbox; import com.smore.order.domain.status.EventStatus; +import java.time.LocalDateTime; import java.util.Collection; +import java.util.List; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; @@ -12,6 +14,8 @@ public interface OutboxRepository { Page findPendingIds(Collection states, Pageable pageable); + List findExpiredProcessingIds(LocalDateTime expiredAt); + Outbox save(Outbox outbox); int claim(Long outboxId, EventStatus eventStatus); @@ -22,4 +26,6 @@ public interface OutboxRepository { int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCount); + int bulkResetExpiredProcessingToReady(List ids); + } diff --git a/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustom.java b/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustom.java index 356a3bf5..0eaae4b3 100644 --- a/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustom.java +++ b/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustom.java @@ -1,7 +1,9 @@ package com.smore.order.infrastructure.persistence.repository.outbox; import com.smore.order.domain.status.EventStatus; +import java.time.LocalDateTime; import java.util.Collection; +import java.util.List; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; @@ -9,6 +11,8 @@ public interface OutboxJpaRepositoryCustom { Page findPendingIds(Collection states, Pageable pageable); + List findExpiredProcessingIds(LocalDateTime expiredAt); + int claim(Long outboxId, EventStatus eventStatus); int markSent(Long outboxId, EventStatus eventStatus); @@ -17,4 +21,6 @@ public interface OutboxJpaRepositoryCustom { int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCount); + int bulkResetExpiredProcessingToReady(List ids); + } diff --git a/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustomImpl.java b/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustomImpl.java index 9b78699c..d837b07a 100644 --- a/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustomImpl.java +++ b/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxJpaRepositoryCustomImpl.java @@ -7,6 +7,7 @@ import com.smore.order.domain.status.EventStatus; import jakarta.persistence.EntityManager; +import java.time.LocalDateTime; import java.util.Collection; import java.util.List; import lombok.RequiredArgsConstructor; @@ -44,12 +45,26 @@ public Page findPendingIds(Collection states, Pageable pageab return PageableExecutionUtils.getPage(content, pageable, countQuery::fetchOne); } + @Override + public List findExpiredProcessingIds(LocalDateTime expiredAt) { + + return queryFactory + .select(outboxEntity.id) + .from(outboxEntity) + .where( + outboxEntity.eventStatus.eq(EventStatus.PROCESSING), + outboxEntity.updatedAt.loe(expiredAt) + ) + .fetch(); + } + @Override public int claim(Long outboxId, EventStatus eventStatus) { long updated = queryFactory .update(outboxEntity) .set(outboxEntity.eventStatus, eventStatus) + .set(outboxEntity.updatedAt, LocalDateTime.now()) .where( outboxEntity.id.eq(outboxId), outboxEntity.eventStatus.eq(EventStatus.PENDING) @@ -67,6 +82,7 @@ public int markSent(Long outboxId, EventStatus eventStatus) { long updated = queryFactory .update(outboxEntity) .set(outboxEntity.eventStatus, eventStatus) + .set(outboxEntity.updatedAt, LocalDateTime.now()) .where( outboxEntity.id.eq(outboxId), outboxEntity.eventStatus.eq(EventStatus.PROCESSING) @@ -85,6 +101,7 @@ public int markRetry(Long outboxId, EventStatus eventStatus) { .update(outboxEntity) .set(outboxEntity.eventStatus, eventStatus) .set(outboxEntity.retryCount, outboxEntity.retryCount.add(1)) + .set(outboxEntity.updatedAt, LocalDateTime.now()) .where( outboxEntity.id.eq(outboxId), outboxEntity.eventStatus.eq(EventStatus.PROCESSING) @@ -102,6 +119,7 @@ public int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCoun long updated = queryFactory .update(outboxEntity) .set(outboxEntity.eventStatus, eventStatus) + .set(outboxEntity.updatedAt, LocalDateTime.now()) .where( outboxEntity.id.eq(outboxId), outboxEntity.eventStatus.eq(EventStatus.PROCESSING), @@ -115,4 +133,23 @@ public int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCoun return (int) updated; } + @Override + public int bulkResetExpiredProcessingToReady(List ids) { + long updated = queryFactory + .update(outboxEntity) + .set(outboxEntity.eventStatus, EventStatus.PENDING) + .set(outboxEntity.retryCount, outboxEntity.retryCount.add(1)) + .set(outboxEntity.updatedAt, LocalDateTime.now()) + .where( + outboxEntity.id.in(ids), + outboxEntity.eventStatus.eq(EventStatus.PROCESSING) + ) + .execute(); + + em.flush(); + em.clear(); + + return (int) updated; + } + } diff --git a/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxRepositoryImpl.java b/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxRepositoryImpl.java index 0c8c07b7..e38d3040 100644 --- a/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxRepositoryImpl.java +++ b/order/src/main/java/com/smore/order/infrastructure/persistence/repository/outbox/OutboxRepositoryImpl.java @@ -8,7 +8,9 @@ import com.smore.order.infrastructure.persistence.exception.CreateOutboxFailException; import com.smore.order.infrastructure.persistence.exception.NotFoundOutboxException; import com.smore.order.infrastructure.persistence.mapper.OutboxMapper; +import java.time.LocalDateTime; import java.util.Collection; +import java.util.List; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.domain.Page; @@ -54,6 +56,11 @@ public Page findPendingIds(Collection states, Pageable pageab return outboxJpaRepository.findPendingIds(states, pageable); } + @Override + public List findExpiredProcessingIds(LocalDateTime expiredAt) { + return outboxJpaRepository.findExpiredProcessingIds(expiredAt); + } + @Override public Outbox save(Outbox outbox) { @@ -124,4 +131,9 @@ public int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCoun return outboxJpaRepository.markFail(outboxId, eventStatus, maxRetryCount); } + + @Override + public int bulkResetExpiredProcessingToReady(List ids) { + return outboxJpaRepository.bulkResetExpiredProcessingToReady(ids); + } } diff --git a/order/src/main/java/com/smore/order/presentation/scheduler/OutboxScheduler.java b/order/src/main/java/com/smore/order/presentation/scheduler/OutboxScheduler.java index 2c76db72..27bb0460 100644 --- a/order/src/main/java/com/smore/order/presentation/scheduler/OutboxScheduler.java +++ b/order/src/main/java/com/smore/order/presentation/scheduler/OutboxScheduler.java @@ -4,6 +4,8 @@ import com.smore.order.application.service.OutboxProcessor; import com.smore.order.domain.status.EventStatus; +import java.time.LocalDateTime; +import java.util.List; import java.util.Set; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -50,4 +52,17 @@ public void outboxTasks() { page++; } } + + @Scheduled(fixedDelay = 60000) + public void recoverExpiredProcessing() { + LocalDateTime expiredAt = LocalDateTime.now().minusMinutes(2); + + List expiredProcessingIds = outboxRepository.findExpiredProcessingIds(expiredAt); + + if (expiredProcessingIds.isEmpty()) { + return; + } + + int updated = outboxRepository.bulkResetExpiredProcessingToReady(expiredProcessingIds); + } }