Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import com.smore.bidcompetition.domain.status.EventStatus;
import com.smore.bidcompetition.domain.model.Outbox;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;

Expand All @@ -12,6 +14,8 @@ public interface OutboxRepository {

Page<Long> findPendingIds(Collection<EventStatus> states, Pageable pageable);

List<Long> findExpiredProcessingIds(LocalDateTime expiredAt);

Outbox save(Outbox outbox);

int claim(Long outboxId, EventStatus eventStatus);
Expand All @@ -22,4 +26,6 @@ public interface OutboxRepository {

int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCount);

int bulkResetExpiredProcessingToReady(List<Long> ids);

}
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package com.smore.bidcompetition.infrastructure.persistence.repository.outbox;

import com.smore.bidcompetition.domain.status.EventStatus;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;

public interface OutboxJpaRepositoryCustom {

Page<Long> findPendingIds(Collection<EventStatus> states, Pageable pageable);

List<Long> findExpiredProcessingIds(LocalDateTime expiredAt);

int claim(Long outboxId, EventStatus eventStatus);

int markSent(Long outboxId, EventStatus eventStatus);
Expand All @@ -17,4 +21,6 @@ public interface OutboxJpaRepositoryCustom {

int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCount);

int bulkResetExpiredProcessingToReady(List<Long> ids);

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.smore.bidcompetition.domain.status.EventStatus;
import com.smore.bidcompetition.infrastructure.persistence.entity.QOutboxEntity;
import jakarta.persistence.EntityManager;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -44,12 +45,26 @@ public Page<Long> findPendingIds(Collection<EventStatus> states, Pageable pageab
return PageableExecutionUtils.getPage(content, pageable, countQuery::fetchOne);
}

@Override
public List<Long> findExpiredProcessingIds(LocalDateTime expiredAt) {

return queryFactory
.select(outboxEntity.id)
.from(outboxEntity)
.where(
outboxEntity.eventStatus.eq(EventStatus.PROCESSING),
outboxEntity.updatedAt.loe(expiredAt)
)
.fetch();
}

@Override
public int claim(Long outboxId, EventStatus eventStatus) {

long updated = queryFactory
.update(outboxEntity)
.set(outboxEntity.eventStatus, eventStatus)
.set(outboxEntity.updatedAt, LocalDateTime.now())
.where(
outboxEntity.id.eq(outboxId),
outboxEntity.eventStatus.eq(EventStatus.PENDING)
Expand All @@ -67,6 +82,7 @@ public int markSent(Long outboxId, EventStatus eventStatus) {
long updated = queryFactory
.update(outboxEntity)
.set(outboxEntity.eventStatus, eventStatus)
.set(outboxEntity.updatedAt, LocalDateTime.now())
.where(
outboxEntity.id.eq(outboxId),
outboxEntity.eventStatus.eq(EventStatus.PROCESSING)
Expand All @@ -85,6 +101,7 @@ public int markRetry(Long outboxId, EventStatus eventStatus) {
.update(outboxEntity)
.set(outboxEntity.eventStatus, eventStatus)
.set(outboxEntity.retryCount, outboxEntity.retryCount.add(1))
.set(outboxEntity.updatedAt, LocalDateTime.now())
.where(
outboxEntity.id.eq(outboxId),
outboxEntity.eventStatus.eq(EventStatus.PROCESSING)
Expand All @@ -102,6 +119,7 @@ public int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCoun
long updated = queryFactory
.update(outboxEntity)
.set(outboxEntity.eventStatus, eventStatus)
.set(outboxEntity.updatedAt, LocalDateTime.now())
.where(
outboxEntity.id.eq(outboxId),
outboxEntity.eventStatus.eq(EventStatus.PROCESSING),
Expand All @@ -115,4 +133,23 @@ public int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCoun
return (int) updated;
}

@Override
public int bulkResetExpiredProcessingToReady(List<Long> ids) {
long updated = queryFactory
.update(outboxEntity)
.set(outboxEntity.eventStatus, EventStatus.PENDING)
.set(outboxEntity.retryCount, outboxEntity.retryCount.add(1))
.set(outboxEntity.updatedAt, LocalDateTime.now())
.where(
outboxEntity.id.in(ids),
outboxEntity.eventStatus.eq(EventStatus.PROCESSING)
)
.execute();

em.flush();
em.clear();

return (int) updated;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import com.smore.bidcompetition.infrastructure.persistence.exception.CreateOutboxFailException;
import com.smore.bidcompetition.infrastructure.persistence.exception.NotFoundOutboxException;
import com.smore.bidcompetition.infrastructure.persistence.mapper.OutboxMapper;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Page;
Expand Down Expand Up @@ -55,6 +57,11 @@ public Page<Long> findPendingIds(Collection<EventStatus> states, Pageable pageab
return outboxJpaRepository.findPendingIds(states, pageable);
}

@Override
public List<Long> findExpiredProcessingIds(LocalDateTime expiredAt) {
return outboxJpaRepository.findExpiredProcessingIds(expiredAt);
}

@Override
public Outbox save(Outbox outbox) {

Expand Down Expand Up @@ -125,4 +132,9 @@ public int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCoun

return outboxJpaRepository.markFail(outboxId, eventStatus, maxRetryCount);
}

@Override
public int bulkResetExpiredProcessingToReady(List<Long> ids) {
return outboxJpaRepository.bulkResetExpiredProcessingToReady(ids);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.smore.bidcompetition.application.repository.OutboxRepository;
import com.smore.bidcompetition.application.service.OutboxProcessor;
import com.smore.bidcompetition.domain.status.EventStatus;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -49,4 +51,17 @@ public void outboxTasks() {
page++;
}
}

@Scheduled(fixedDelay = 60000)
public void recoverExpiredProcessing() {
LocalDateTime expiredAt = LocalDateTime.now().minusMinutes(2);

List<Long> expiredProcessingIds = outboxRepository.findExpiredProcessingIds(expiredAt);

if (expiredProcessingIds.isEmpty()) {
return;
}

int updated = outboxRepository.bulkResetExpiredProcessingToReady(expiredProcessingIds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import com.smore.order.domain.model.Outbox;
import com.smore.order.domain.status.EventStatus;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;

Expand All @@ -12,6 +14,8 @@ public interface OutboxRepository {

Page<Long> findPendingIds(Collection<EventStatus> states, Pageable pageable);

List<Long> findExpiredProcessingIds(LocalDateTime expiredAt);

Outbox save(Outbox outbox);

int claim(Long outboxId, EventStatus eventStatus);
Expand All @@ -22,4 +26,6 @@ public interface OutboxRepository {

int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCount);

int bulkResetExpiredProcessingToReady(List<Long> ids);

}
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package com.smore.order.infrastructure.persistence.repository.outbox;

import com.smore.order.domain.status.EventStatus;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;

public interface OutboxJpaRepositoryCustom {

Page<Long> findPendingIds(Collection<EventStatus> states, Pageable pageable);

List<Long> findExpiredProcessingIds(LocalDateTime expiredAt);

int claim(Long outboxId, EventStatus eventStatus);

int markSent(Long outboxId, EventStatus eventStatus);
Expand All @@ -17,4 +21,6 @@ public interface OutboxJpaRepositoryCustom {

int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCount);

int bulkResetExpiredProcessingToReady(List<Long> ids);

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.smore.order.domain.status.EventStatus;
import jakarta.persistence.EntityManager;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -44,12 +45,26 @@ public Page<Long> findPendingIds(Collection<EventStatus> states, Pageable pageab
return PageableExecutionUtils.getPage(content, pageable, countQuery::fetchOne);
}

@Override
public List<Long> findExpiredProcessingIds(LocalDateTime expiredAt) {

return queryFactory
.select(outboxEntity.id)
.from(outboxEntity)
.where(
outboxEntity.eventStatus.eq(EventStatus.PROCESSING),
outboxEntity.updatedAt.loe(expiredAt)
)
.fetch();
}

@Override
public int claim(Long outboxId, EventStatus eventStatus) {

long updated = queryFactory
.update(outboxEntity)
.set(outboxEntity.eventStatus, eventStatus)
.set(outboxEntity.updatedAt, LocalDateTime.now())
.where(
outboxEntity.id.eq(outboxId),
outboxEntity.eventStatus.eq(EventStatus.PENDING)
Expand All @@ -67,6 +82,7 @@ public int markSent(Long outboxId, EventStatus eventStatus) {
long updated = queryFactory
.update(outboxEntity)
.set(outboxEntity.eventStatus, eventStatus)
.set(outboxEntity.updatedAt, LocalDateTime.now())
.where(
outboxEntity.id.eq(outboxId),
outboxEntity.eventStatus.eq(EventStatus.PROCESSING)
Expand All @@ -85,6 +101,7 @@ public int markRetry(Long outboxId, EventStatus eventStatus) {
.update(outboxEntity)
.set(outboxEntity.eventStatus, eventStatus)
.set(outboxEntity.retryCount, outboxEntity.retryCount.add(1))
.set(outboxEntity.updatedAt, LocalDateTime.now())
.where(
outboxEntity.id.eq(outboxId),
outboxEntity.eventStatus.eq(EventStatus.PROCESSING)
Expand All @@ -102,6 +119,7 @@ public int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCoun
long updated = queryFactory
.update(outboxEntity)
.set(outboxEntity.eventStatus, eventStatus)
.set(outboxEntity.updatedAt, LocalDateTime.now())
.where(
outboxEntity.id.eq(outboxId),
outboxEntity.eventStatus.eq(EventStatus.PROCESSING),
Expand All @@ -115,4 +133,23 @@ public int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCoun
return (int) updated;
}

@Override
public int bulkResetExpiredProcessingToReady(List<Long> ids) {
long updated = queryFactory
.update(outboxEntity)
.set(outboxEntity.eventStatus, EventStatus.PENDING)
.set(outboxEntity.retryCount, outboxEntity.retryCount.add(1))
.set(outboxEntity.updatedAt, LocalDateTime.now())
.where(
outboxEntity.id.in(ids),
outboxEntity.eventStatus.eq(EventStatus.PROCESSING)
)
.execute();

em.flush();
em.clear();

return (int) updated;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import com.smore.order.infrastructure.persistence.exception.CreateOutboxFailException;
import com.smore.order.infrastructure.persistence.exception.NotFoundOutboxException;
import com.smore.order.infrastructure.persistence.mapper.OutboxMapper;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Page;
Expand Down Expand Up @@ -54,6 +56,11 @@ public Page<Long> findPendingIds(Collection<EventStatus> states, Pageable pageab
return outboxJpaRepository.findPendingIds(states, pageable);
}

@Override
public List<Long> findExpiredProcessingIds(LocalDateTime expiredAt) {
return outboxJpaRepository.findExpiredProcessingIds(expiredAt);
}

@Override
public Outbox save(Outbox outbox) {

Expand Down Expand Up @@ -124,4 +131,9 @@ public int markFail(Long outboxId, EventStatus eventStatus, Integer maxRetryCoun

return outboxJpaRepository.markFail(outboxId, eventStatus, maxRetryCount);
}

@Override
public int bulkResetExpiredProcessingToReady(List<Long> ids) {
return outboxJpaRepository.bulkResetExpiredProcessingToReady(ids);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.smore.order.application.service.OutboxProcessor;
import com.smore.order.domain.status.EventStatus;

import java.time.LocalDateTime;
import java.util.List;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -50,4 +52,17 @@ public void outboxTasks() {
page++;
}
}

@Scheduled(fixedDelay = 60000)
public void recoverExpiredProcessing() {
LocalDateTime expiredAt = LocalDateTime.now().minusMinutes(2);

List<Long> expiredProcessingIds = outboxRepository.findExpiredProcessingIds(expiredAt);

if (expiredProcessingIds.isEmpty()) {
return;
}

int updated = outboxRepository.bulkResetExpiredProcessingToReady(expiredProcessingIds);
}
}