diff --git a/src/main/java/com/sofa/linkiving/domain/link/config/SummaryWorkerProperties.java b/src/main/java/com/sofa/linkiving/domain/link/config/SummaryWorkerProperties.java new file mode 100644 index 00000000..b0ab6a0f --- /dev/null +++ b/src/main/java/com/sofa/linkiving/domain/link/config/SummaryWorkerProperties.java @@ -0,0 +1,16 @@ +package com.sofa.linkiving.domain.link.config; + +import java.time.Duration; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties(prefix = "summary.worker") +public record SummaryWorkerProperties( + Duration sleepDuration +) { + public SummaryWorkerProperties { + if (sleepDuration == null || sleepDuration.isZero() || sleepDuration.isNegative()) { + throw new IllegalArgumentException("sleepDuration must be positive"); + } + } +} diff --git a/src/main/java/com/sofa/linkiving/domain/link/event/LinkCreatedEvent.java b/src/main/java/com/sofa/linkiving/domain/link/event/LinkCreatedEvent.java new file mode 100644 index 00000000..11668f3c --- /dev/null +++ b/src/main/java/com/sofa/linkiving/domain/link/event/LinkCreatedEvent.java @@ -0,0 +1,10 @@ +package com.sofa.linkiving.domain.link.event; + +/** + * 링크 생성 완료 이벤트 + * 트랜잭션 커밋 이후 발행되는 이벤트 + */ +public record LinkCreatedEvent( + Long linkId +) { +} diff --git a/src/main/java/com/sofa/linkiving/domain/link/event/LinkEventListener.java b/src/main/java/com/sofa/linkiving/domain/link/event/LinkEventListener.java new file mode 100644 index 00000000..e138444e --- /dev/null +++ b/src/main/java/com/sofa/linkiving/domain/link/event/LinkEventListener.java @@ -0,0 +1,63 @@ +package com.sofa.linkiving.domain.link.event; + +import org.springframework.stereotype.Component; +import org.springframework.transaction.event.TransactionPhase; +import org.springframework.transaction.event.TransactionalEventListener; + +import com.sofa.linkiving.domain.link.worker.SummaryQueue; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * 링크 도메인 이벤트 리스너 + * 트랜잭션 커밋 후 이벤트를 처리하여 데이터 일관성 보장 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class LinkEventListener { + + private final SummaryQueue summaryQueue; + + /** + * 링크 생성 완료 이벤트 처리 + * 트랜잭션 커밋 후에만 실행되어 롤백 시 큐에 추가되지 않음 + */ + @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) + public void handleLinkCreated(LinkCreatedEvent event) { + log.info("Link created event received (after commit) - linkId: {}", event.linkId()); + + int maxRetries = 3; + int retryCount = 0; + boolean success = false; + + while (retryCount < maxRetries && !success) { + try { + summaryQueue.addToQueue(event.linkId()); + success = true; + } catch (Exception e) { + retryCount++; + log.warn("Failed to add link to summary queue (attempt {}/{}): linkId={}, error={}", + retryCount, maxRetries, event.linkId(), e.getMessage()); + + if (retryCount >= maxRetries) { + // 최종 실패 시 에러 로그 및 모니터링 알림 + log.error("Failed to add link to summary queue after {} retries - linkId: {}. " + + "Summary generation will be skipped for this link.", + maxRetries, event.linkId(), e); + // TODO: 관리자 알림 또는 실패 큐에 저장하여 수동 처리 가능하도록 개선 필요 + } else { + // 재시도 전 짧은 대기 + try { + Thread.sleep(100L * retryCount); // 100ms, 200ms, 300ms + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + log.error("Retry interrupted for linkId: {}", event.linkId()); + break; + } + } + } + } + } +} diff --git a/src/main/java/com/sofa/linkiving/domain/link/service/LinkService.java b/src/main/java/com/sofa/linkiving/domain/link/service/LinkService.java index c11bc521..198e8f36 100644 --- a/src/main/java/com/sofa/linkiving/domain/link/service/LinkService.java +++ b/src/main/java/com/sofa/linkiving/domain/link/service/LinkService.java @@ -1,5 +1,6 @@ package com.sofa.linkiving.domain.link.service; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.stereotype.Service; @@ -8,6 +9,7 @@ import com.sofa.linkiving.domain.link.dto.response.LinkRes; import com.sofa.linkiving.domain.link.entity.Link; import com.sofa.linkiving.domain.link.error.LinkErrorCode; +import com.sofa.linkiving.domain.link.event.LinkCreatedEvent; import com.sofa.linkiving.domain.member.entity.Member; import com.sofa.linkiving.global.error.exception.BusinessException; @@ -21,6 +23,7 @@ public class LinkService { private final LinkCommandService linkCommandService; private final LinkQueryService linkQueryService; + private final ApplicationEventPublisher eventPublisher; public LinkRes createLink(Member member, String url, String title, String memo, String imageUrl) { if (linkQueryService.existsByUrl(member, url)) { @@ -30,6 +33,9 @@ public LinkRes createLink(Member member, String url, String title, String memo, Link link = linkCommandService.saveLink(member, url, title, memo, imageUrl); log.info("Link created - id: {}, memberId: {}, url: {}", link.getId(), member.getId(), url); + // 트랜잭션 커밋 후 요약 대기 큐에 추가되도록 이벤트 발행 + eventPublisher.publishEvent(new LinkCreatedEvent(link.getId())); + return LinkRes.from(link); } diff --git a/src/main/java/com/sofa/linkiving/domain/link/worker/SummaryQueue.java b/src/main/java/com/sofa/linkiving/domain/link/worker/SummaryQueue.java new file mode 100644 index 00000000..510bf396 --- /dev/null +++ b/src/main/java/com/sofa/linkiving/domain/link/worker/SummaryQueue.java @@ -0,0 +1,31 @@ +package com.sofa.linkiving.domain.link.worker; + +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +public class SummaryQueue { + + private final Queue summaryQueue = new ConcurrentLinkedQueue<>(); + + /** + * 요약 대기 큐에 링크 ID 추가 + */ + public void addToQueue(Long linkId) { + summaryQueue.offer(linkId); + log.info("Link added to summary queue - linkId: {}", linkId); + } + + /** + * 요약 대기 큐에서 링크 ID 꺼내기 + */ + public Optional pollFromQueue() { + return Optional.ofNullable(summaryQueue.poll()); + } +} diff --git a/src/main/java/com/sofa/linkiving/domain/link/worker/SummaryWorker.java b/src/main/java/com/sofa/linkiving/domain/link/worker/SummaryWorker.java new file mode 100644 index 00000000..18afee09 --- /dev/null +++ b/src/main/java/com/sofa/linkiving/domain/link/worker/SummaryWorker.java @@ -0,0 +1,128 @@ +package com.sofa.linkiving.domain.link.worker; + +import java.util.Optional; + +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import com.sofa.linkiving.domain.link.config.SummaryWorkerProperties; +import com.sofa.linkiving.domain.link.entity.Link; +import com.sofa.linkiving.domain.link.entity.Summary; +import com.sofa.linkiving.domain.link.enums.Format; +import com.sofa.linkiving.domain.link.repository.LinkRepository; +import com.sofa.linkiving.domain.link.repository.SummaryRepository; +import com.sofa.linkiving.infra.feign.AiServerClient; +import com.sofa.linkiving.infra.feign.dto.SummaryRequest; +import com.sofa.linkiving.infra.feign.dto.SummaryResponse; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@EnableConfigurationProperties(SummaryWorkerProperties.class) +@RequiredArgsConstructor +public class SummaryWorker { + + private final SummaryQueue summaryQueue; + private final SummaryWorkerProperties properties; + private final LinkRepository linkRepository; + private final SummaryRepository summaryRepository; + private final AiServerClient aiServerClient; + private volatile boolean running = true; + private Thread workerThread; + + @PostConstruct + public void startWorker() { + workerThread = new Thread(() -> { + log.info("Summary worker thread started"); + while (running) { + try { + processQueue(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.info("Summary worker thread interrupted"); + break; + } catch (Exception e) { + log.error("Error in summary worker thread", e); + } + } + log.info("Summary worker thread stopped"); + }); + workerThread.setName("summary-worker"); + workerThread.setDaemon(true); + workerThread.start(); + } + + @PreDestroy + public void stopWorker() { + log.info("Stopping summary worker thread"); + running = false; + if (workerThread != null) { + workerThread.interrupt(); + } + } + + private void processQueue() throws InterruptedException { + Optional linkIdOpt = summaryQueue.pollFromQueue(); + + if (linkIdOpt.isEmpty()) { + // 큐가 비어있으면 대기 + Thread.sleep(properties.sleepDuration().toMillis()); + return; + } + + Long linkId = linkIdOpt.get(); + log.info("Processing link for summary - linkId: {}", linkId); + + try { + generateAndSaveSummary(linkId); + } catch (Exception e) { + log.error("Failed to generate summary for linkId: {}", linkId, e); + } + } + + @Transactional + public void generateAndSaveSummary(Long linkId) { + // 1. Link 조회 + Link link = linkRepository.findById(linkId) + .orElseThrow(() -> new IllegalArgumentException("Link not found: " + linkId)); + + log.debug("Link found - url: {}, title: {}", link.getUrl(), link.getTitle()); + + // 2. RAG 서버에 요약 요청 + SummaryRequest request = SummaryRequest.of( + link.getId(), + link.getMember().getId(), + link.getUrl(), + link.getTitle(), + link.getMemo() + ); + log.info("Requesting summary to AI server - linkId: {}, userId: {}", request.linkId(), request.userId()); + SummaryResponse[] responses = aiServerClient.generateSummary(request); + if (responses == null || responses.length == 0) { + log.warn("AI server returned empty summary response - linkId: {}", linkId); + return; + } + if (responses.length > 1) { + log.warn("AI server returned multiple summaries, using the first - linkId: {}, size: {}", linkId, + responses.length); + } + SummaryResponse response = responses[0]; + + log.info("Summary generated for linkId: {}", linkId); + + // 3. Summary 엔티티 생성 및 저장 + Summary summary = Summary.builder() + .link(link) + .format(Format.CONCISE) + .content(response.summary()) + .build(); + + summaryRepository.save(summary); + log.info("Summary saved for linkId: {}", linkId); + } +} diff --git a/src/main/java/com/sofa/linkiving/infra/feign/AiServerClient.java b/src/main/java/com/sofa/linkiving/infra/feign/AiServerClient.java new file mode 100644 index 00000000..893b4552 --- /dev/null +++ b/src/main/java/com/sofa/linkiving/infra/feign/AiServerClient.java @@ -0,0 +1,19 @@ +package com.sofa.linkiving.infra.feign; + +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; + +import com.sofa.linkiving.infra.feign.dto.SummaryRequest; +import com.sofa.linkiving.infra.feign.dto.SummaryResponse; + +@FeignClient( + name = "aiServerClient", + url = "${ai.server.url}", + configuration = GlobalFeignConfig.class +) +public interface AiServerClient { + + @PostMapping("/webhook/summary-initial") + SummaryResponse[] generateSummary(@RequestBody SummaryRequest request); +} diff --git a/src/main/java/com/sofa/linkiving/infra/feign/GlobalFeignConfig.java b/src/main/java/com/sofa/linkiving/infra/feign/GlobalFeignConfig.java index 64a1dac9..2a7af62b 100644 --- a/src/main/java/com/sofa/linkiving/infra/feign/GlobalFeignConfig.java +++ b/src/main/java/com/sofa/linkiving/infra/feign/GlobalFeignConfig.java @@ -22,6 +22,6 @@ public Logger.Level feignLoggerLevel() { @Bean public Request.Options feignRequestOptions() { - return new Request.Options(3000, 5000); + return new Request.Options(5000, 60000); } } diff --git a/src/main/java/com/sofa/linkiving/infra/feign/dto/SummaryRequest.java b/src/main/java/com/sofa/linkiving/infra/feign/dto/SummaryRequest.java new file mode 100644 index 00000000..42fcc000 --- /dev/null +++ b/src/main/java/com/sofa/linkiving/infra/feign/dto/SummaryRequest.java @@ -0,0 +1,13 @@ +package com.sofa.linkiving.infra.feign.dto; + +public record SummaryRequest( + Long linkId, + Long userId, + String url, + String title, + String memo +) { + public static SummaryRequest of(Long linkId, Long userId, String url, String title, String memo) { + return new SummaryRequest(linkId, userId, url, title, memo); + } +} diff --git a/src/main/java/com/sofa/linkiving/infra/feign/dto/SummaryResponse.java b/src/main/java/com/sofa/linkiving/infra/feign/dto/SummaryResponse.java new file mode 100644 index 00000000..2eba886d --- /dev/null +++ b/src/main/java/com/sofa/linkiving/infra/feign/dto/SummaryResponse.java @@ -0,0 +1,6 @@ +package com.sofa.linkiving.infra.feign.dto; + +public record SummaryResponse( + String summary +) { +} diff --git a/src/test/java/com/sofa/linkiving/domain/link/config/SummaryWorkerPropertiesTest.java b/src/test/java/com/sofa/linkiving/domain/link/config/SummaryWorkerPropertiesTest.java new file mode 100644 index 00000000..34239084 --- /dev/null +++ b/src/test/java/com/sofa/linkiving/domain/link/config/SummaryWorkerPropertiesTest.java @@ -0,0 +1,50 @@ +package com.sofa.linkiving.domain.link.config; + +import static org.assertj.core.api.Assertions.*; + +import java.time.Duration; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +@DisplayName("SummaryWorkerProperties 단위 테스트") +class SummaryWorkerPropertiesTest { + + @Test + @DisplayName("양수 값으로 Properties를 생성할 수 있다") + void shouldCreatePropertiesWithPositiveValue() { + // when + SummaryWorkerProperties properties = new SummaryWorkerProperties(Duration.ofSeconds(1)); + + // then + assertThat(properties.sleepDuration()).isEqualTo(Duration.ofSeconds(1)); + } + + @Test + @DisplayName("0 이하의 값으로 Properties 생성 시 예외가 발생한다 - 0") + void shouldThrowExceptionWhenSleepMsIsZero() { + // when & then + assertThatThrownBy(() -> new SummaryWorkerProperties(Duration.ZERO)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("sleepDuration must be positive"); + } + + @Test + @DisplayName("0 이하의 값으로 Properties 생성 시 예외가 발생한다 - 음수") + void shouldThrowExceptionWhenSleepMsIsNegative() { + // when & then + assertThatThrownBy(() -> new SummaryWorkerProperties(Duration.ofMillis(-100))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("sleepDuration must be positive"); + } + + @Test + @DisplayName("최소 양수 값으로 Properties를 생성할 수 있다") + void shouldCreatePropertiesWithMinimumPositiveValue() { + // when + SummaryWorkerProperties properties = new SummaryWorkerProperties(Duration.ofMillis(1)); + + // then + assertThat(properties.sleepDuration()).isEqualTo(Duration.ofMillis(1)); + } +} diff --git a/src/test/java/com/sofa/linkiving/domain/link/event/LinkEventListenerTest.java b/src/test/java/com/sofa/linkiving/domain/link/event/LinkEventListenerTest.java new file mode 100644 index 00000000..436dd7fb --- /dev/null +++ b/src/test/java/com/sofa/linkiving/domain/link/event/LinkEventListenerTest.java @@ -0,0 +1,111 @@ +package com.sofa.linkiving.domain.link.event; + +import static org.mockito.BDDMockito.*; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.sofa.linkiving.domain.link.worker.SummaryQueue; + +@ExtendWith(MockitoExtension.class) +@DisplayName("LinkEventListener 단위 테스트") +class LinkEventListenerTest { + + @InjectMocks + private LinkEventListener linkEventListener; + + @Mock + private SummaryQueue summaryQueue; + + @Test + @DisplayName("링크 생성 이벤트 수신 시 큐에 추가한다") + void shouldAddToQueueWhenLinkCreatedEventReceived() { + // given + Long linkId = 123L; + LinkCreatedEvent event = new LinkCreatedEvent(linkId); + + // when + linkEventListener.handleLinkCreated(event); + + // then + verify(summaryQueue, times(1)).addToQueue(linkId); + } + + @Test + @DisplayName("여러 링크 생성 이벤트를 순차적으로 처리한다") + void shouldHandleMultipleLinkCreatedEvents() { + // given + LinkCreatedEvent event1 = new LinkCreatedEvent(1L); + LinkCreatedEvent event2 = new LinkCreatedEvent(2L); + LinkCreatedEvent event3 = new LinkCreatedEvent(3L); + + // when + linkEventListener.handleLinkCreated(event1); + linkEventListener.handleLinkCreated(event2); + linkEventListener.handleLinkCreated(event3); + + // then + verify(summaryQueue, times(1)).addToQueue(1L); + verify(summaryQueue, times(1)).addToQueue(2L); + verify(summaryQueue, times(1)).addToQueue(3L); + } + + @Test + @DisplayName("큐 추가 실패 시 최대 3번까지 재시도한다") + void shouldRetryWhenAddToQueueFails() { + // given + Long linkId = 123L; + LinkCreatedEvent event = new LinkCreatedEvent(linkId); + + // 첫 2번 실패, 3번째 성공 + willThrow(new RuntimeException("Queue full")) + .willThrow(new RuntimeException("Queue full")) + .willDoNothing() + .given(summaryQueue).addToQueue(linkId); + + // when + linkEventListener.handleLinkCreated(event); + + // then + verify(summaryQueue, times(3)).addToQueue(linkId); + } + + @Test + @DisplayName("큐 추가가 3번 모두 실패하면 재시도를 중단하고 에러 로그를 남긴다") + void shouldStopRetryingAfterMaxAttempts() { + // given + Long linkId = 123L; + LinkCreatedEvent event = new LinkCreatedEvent(linkId); + + // 3번 모두 실패 + willThrow(new RuntimeException("Queue full")) + .given(summaryQueue).addToQueue(linkId); + + // when + linkEventListener.handleLinkCreated(event); + + // then + verify(summaryQueue, times(3)).addToQueue(linkId); // 최대 3번 시도 + } + + @Test + @DisplayName("첫 번째 시도에서 성공하면 재시도하지 않는다") + void shouldNotRetryWhenFirstAttemptSucceeds() { + // given + Long linkId = 123L; + LinkCreatedEvent event = new LinkCreatedEvent(linkId); + + willDoNothing().given(summaryQueue).addToQueue(linkId); + + // when + linkEventListener.handleLinkCreated(event); + + // then + verify(summaryQueue, times(1)).addToQueue(linkId); // 1번만 시도 + } +} + diff --git a/src/test/java/com/sofa/linkiving/domain/link/service/LinkServiceTest.java b/src/test/java/com/sofa/linkiving/domain/link/service/LinkServiceTest.java index 0626a91e..77e454df 100644 --- a/src/test/java/com/sofa/linkiving/domain/link/service/LinkServiceTest.java +++ b/src/test/java/com/sofa/linkiving/domain/link/service/LinkServiceTest.java @@ -12,6 +12,7 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.PageRequest; @@ -37,6 +38,9 @@ class LinkServiceTest { @Mock private LinkQueryService linkQueryService; + @Mock + private ApplicationEventPublisher eventPublisher; + @Test @DisplayName("링크를 생성할 수 있다") void shouldCreateLink() { diff --git a/src/test/java/com/sofa/linkiving/domain/link/worker/SummaryQueueTest.java b/src/test/java/com/sofa/linkiving/domain/link/worker/SummaryQueueTest.java new file mode 100644 index 00000000..7c993763 --- /dev/null +++ b/src/test/java/com/sofa/linkiving/domain/link/worker/SummaryQueueTest.java @@ -0,0 +1,91 @@ +package com.sofa.linkiving.domain.link.worker; + +import static org.assertj.core.api.Assertions.*; + +import java.util.Optional; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +@DisplayName("SummaryQueue 단위 테스트") +class SummaryQueueTest { + + private SummaryQueue summaryQueue; + + @BeforeEach + void setUp() { + summaryQueue = new SummaryQueue(); + } + + @Test + @DisplayName("큐에 링크 ID를 추가할 수 있다") + void shouldAddLinkIdToQueue() { + // given + Long linkId = 123L; + + // when + summaryQueue.addToQueue(linkId); + + // then + Optional result = summaryQueue.pollFromQueue(); + assertThat(result).isPresent(); + assertThat(result.get()).isEqualTo(123L); + } + + @Test + @DisplayName("큐에서 링크 ID를 꺼낼 수 있다") + void shouldPollLinkIdFromQueue() { + // given + summaryQueue.addToQueue(456L); + + // when + Optional result = summaryQueue.pollFromQueue(); + + // then + assertThat(result).isPresent(); + assertThat(result.get()).isEqualTo(456L); + } + + @Test + @DisplayName("빈 큐에서 poll 시 empty를 반환한다") + void shouldReturnEmptyWhenQueueIsEmpty() { + // when + Optional result = summaryQueue.pollFromQueue(); + + // then + assertThat(result).isEmpty(); + } + + @Test + @DisplayName("여러 링크 ID를 FIFO 순서로 처리한다") + void shouldProcessLinksInFifoOrder() { + // given + summaryQueue.addToQueue(1L); + summaryQueue.addToQueue(2L); + summaryQueue.addToQueue(3L); + + // when & then + assertThat(summaryQueue.pollFromQueue().get()).isEqualTo(1L); + assertThat(summaryQueue.pollFromQueue().get()).isEqualTo(2L); + assertThat(summaryQueue.pollFromQueue().get()).isEqualTo(3L); + assertThat(summaryQueue.pollFromQueue()).isEmpty(); + } + + @Test + @DisplayName("동일한 링크 ID를 여러 번 추가할 수 있다") + void shouldAddSameLinkIdMultipleTimes() { + // given + Long linkId = 999L; + + // when + summaryQueue.addToQueue(linkId); + summaryQueue.addToQueue(linkId); + + // then + assertThat(summaryQueue.pollFromQueue().get()).isEqualTo(999L); + assertThat(summaryQueue.pollFromQueue().get()).isEqualTo(999L); + assertThat(summaryQueue.pollFromQueue()).isEmpty(); + } +} + diff --git a/src/test/java/com/sofa/linkiving/domain/link/worker/SummaryWorkerTest.java b/src/test/java/com/sofa/linkiving/domain/link/worker/SummaryWorkerTest.java new file mode 100644 index 00000000..a5917bc8 --- /dev/null +++ b/src/test/java/com/sofa/linkiving/domain/link/worker/SummaryWorkerTest.java @@ -0,0 +1,159 @@ +package com.sofa.linkiving.domain.link.worker; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.BDDMockito.*; + +import java.time.Duration; +import java.util.Optional; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.sofa.linkiving.domain.link.config.SummaryWorkerProperties; +import com.sofa.linkiving.domain.link.repository.LinkRepository; +import com.sofa.linkiving.domain.link.repository.SummaryRepository; +import com.sofa.linkiving.infra.feign.AiServerClient; + +@ExtendWith(MockitoExtension.class) +@DisplayName("SummaryWorker 단위 테스트") +class SummaryWorkerTest { + + @Mock + private SummaryQueue summaryQueue; + + @Mock + private LinkRepository linkRepository; + + @Mock + private SummaryRepository summaryRepository; + + @Mock + private AiServerClient aiServerClient; + + private SummaryWorker summaryWorker; + private SummaryWorkerProperties properties; + + @BeforeEach + void setUp() { + properties = new SummaryWorkerProperties(Duration.ofMillis(100)); // 테스트용 짧은 sleep 시간 + summaryWorker = new SummaryWorker(summaryQueue, properties, linkRepository, summaryRepository, + aiServerClient); + } + + @AfterEach + void tearDown() { + if (summaryWorker != null) { + summaryWorker.stopWorker(); + } + } + + @Test + @DisplayName("워커 시작 시 백그라운드 쓰레드가 생성된다") + void shouldStartWorkerThread() throws InterruptedException { + // given + given(summaryQueue.pollFromQueue()).willReturn(Optional.empty()); + + // when + summaryWorker.startWorker(); + Thread.sleep(50); // 워커 쓰레드가 시작될 시간 대기 + + // then + verify(summaryQueue, atLeastOnce()).pollFromQueue(); + } + + @Test + @DisplayName("큐에 데이터가 있으면 처리한다") + void shouldProcessLinkFromQueue() throws InterruptedException { + // given + given(summaryQueue.pollFromQueue()) + .willReturn(Optional.of(123L)) + .willReturn(Optional.empty()); + + // when + summaryWorker.startWorker(); + Thread.sleep(150); // 처리 시간 대기 + + // then + verify(summaryQueue, atLeast(2)).pollFromQueue(); + } + + @Test + @DisplayName("큐가 비어있으면 설정된 시간만큼 대기한다") + void shouldSleepWhenQueueIsEmpty() throws InterruptedException { + // given + given(summaryQueue.pollFromQueue()).willReturn(Optional.empty()); + + // when + summaryWorker.startWorker(); + long startTime = System.currentTimeMillis(); + Thread.sleep(250); // sleep(100ms) * 2회 이상 호출될 시간 대기 + long endTime = System.currentTimeMillis(); + + // then + long elapsed = endTime - startTime; + assertThat(elapsed).isGreaterThanOrEqualTo(200); // 최소 2번의 sleep(100ms) + verify(summaryQueue, atLeast(2)).pollFromQueue(); + } + + @Test + @DisplayName("워커 종료 시 쓰레드가 정상적으로 중단된다") + void shouldStopWorkerThread() throws InterruptedException { + // given + given(summaryQueue.pollFromQueue()).willReturn(Optional.empty()); + summaryWorker.startWorker(); + Thread.sleep(50); // 워커 시작 대기 + + // when + summaryWorker.stopWorker(); + Thread.sleep(50); // 종료 대기 + + // then + int invocationsBefore = mockingDetails(summaryQueue).getInvocations().size(); + Thread.sleep(150); // 추가 대기 + int invocationsAfter = mockingDetails(summaryQueue).getInvocations().size(); + + // 워커가 중단되었으므로 추가 호출이 없어야 함 + assertThat(invocationsAfter).isEqualTo(invocationsBefore); + } + + @Test + @DisplayName("여러 링크를 순차적으로 처리한다") + void shouldProcessMultipleLinks() throws InterruptedException { + // given + given(summaryQueue.pollFromQueue()) + .willReturn(Optional.of(1L)) + .willReturn(Optional.of(2L)) + .willReturn(Optional.of(3L)) + .willReturn(Optional.empty()); + + // when + summaryWorker.startWorker(); + Thread.sleep(200); // 여러 링크 처리 시간 대기 + + // then + verify(summaryQueue, atLeast(4)).pollFromQueue(); + } + + @Test + @DisplayName("에러 발생 시에도 워커는 계속 동작한다") + void shouldContinueWorkingAfterError() throws InterruptedException { + // given + given(summaryQueue.pollFromQueue()) + .willThrow(new RuntimeException("Test exception")) + .willReturn(Optional.of(123L)) + .willReturn(Optional.empty()); + + // when + summaryWorker.startWorker(); + Thread.sleep(200); // 에러 발생 및 복구 시간 대기 + + // then + // 에러가 발생해도 워커가 계속 동작하여 다음 pollFromQueue 호출 + verify(summaryQueue, atLeast(3)).pollFromQueue(); + } +}