Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.sofa.linkiving.domain.link.config;

import java.time.Duration;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "summary.worker")
public record SummaryWorkerProperties(
Duration sleepDuration
) {
public SummaryWorkerProperties {
if (sleepDuration == null || sleepDuration.isZero() || sleepDuration.isNegative()) {
throw new IllegalArgumentException("sleepDuration must be positive");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.sofa.linkiving.domain.link.event;

/**
* 링크 생성 완료 이벤트
* 트랜잭션 커밋 이후 발행되는 이벤트
*/
public record LinkCreatedEvent(
Long linkId
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.sofa.linkiving.domain.link.event;

import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

import com.sofa.linkiving.domain.link.worker.SummaryQueue;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
* 링크 도메인 이벤트 리스너
* 트랜잭션 커밋 후 이벤트를 처리하여 데이터 일관성 보장
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class LinkEventListener {

private final SummaryQueue summaryQueue;

/**
* 링크 생성 완료 이벤트 처리
* 트랜잭션 커밋 후에만 실행되어 롤백 시 큐에 추가되지 않음
*/
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleLinkCreated(LinkCreatedEvent event) {
log.info("Link created event received (after commit) - linkId: {}", event.linkId());

int maxRetries = 3;
int retryCount = 0;
boolean success = false;

while (retryCount < maxRetries && !success) {
try {
summaryQueue.addToQueue(event.linkId());
success = true;
} catch (Exception e) {
retryCount++;
log.warn("Failed to add link to summary queue (attempt {}/{}): linkId={}, error={}",
retryCount, maxRetries, event.linkId(), e.getMessage());

if (retryCount >= maxRetries) {
// 최종 실패 시 에러 로그 및 모니터링 알림
log.error("Failed to add link to summary queue after {} retries - linkId: {}. "
+ "Summary generation will be skipped for this link.",
maxRetries, event.linkId(), e);
// TODO: 관리자 알림 또는 실패 큐에 저장하여 수동 처리 가능하도록 개선 필요
} else {
// 재시도 전 짧은 대기
try {
Thread.sleep(100L * retryCount); // 100ms, 200ms, 300ms
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.error("Retry interrupted for linkId: {}", event.linkId());
break;
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.sofa.linkiving.domain.link.service;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
Expand All @@ -8,6 +9,7 @@
import com.sofa.linkiving.domain.link.dto.response.LinkRes;
import com.sofa.linkiving.domain.link.entity.Link;
import com.sofa.linkiving.domain.link.error.LinkErrorCode;
import com.sofa.linkiving.domain.link.event.LinkCreatedEvent;
import com.sofa.linkiving.domain.member.entity.Member;
import com.sofa.linkiving.global.error.exception.BusinessException;

Expand All @@ -21,6 +23,7 @@ public class LinkService {

private final LinkCommandService linkCommandService;
private final LinkQueryService linkQueryService;
private final ApplicationEventPublisher eventPublisher;

public LinkRes createLink(Member member, String url, String title, String memo, String imageUrl) {
if (linkQueryService.existsByUrl(member, url)) {
Expand All @@ -30,6 +33,9 @@ public LinkRes createLink(Member member, String url, String title, String memo,
Link link = linkCommandService.saveLink(member, url, title, memo, imageUrl);
log.info("Link created - id: {}, memberId: {}, url: {}", link.getId(), member.getId(), url);

// 트랜잭션 커밋 후 요약 대기 큐에 추가되도록 이벤트 발행
eventPublisher.publishEvent(new LinkCreatedEvent(link.getId()));

return LinkRes.from(link);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.sofa.linkiving.domain.link.worker;

import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class SummaryQueue {

private final Queue<Long> summaryQueue = new ConcurrentLinkedQueue<>();

/**
* 요약 대기 큐에 링크 ID 추가
*/
public void addToQueue(Long linkId) {
summaryQueue.offer(linkId);
log.info("Link added to summary queue - linkId: {}", linkId);
}

/**
* 요약 대기 큐에서 링크 ID 꺼내기
*/
public Optional<Long> pollFromQueue() {
return Optional.ofNullable(summaryQueue.poll());
}
}
128 changes: 128 additions & 0 deletions src/main/java/com/sofa/linkiving/domain/link/worker/SummaryWorker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package com.sofa.linkiving.domain.link.worker;

import java.util.Optional;

import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import com.sofa.linkiving.domain.link.config.SummaryWorkerProperties;
import com.sofa.linkiving.domain.link.entity.Link;
import com.sofa.linkiving.domain.link.entity.Summary;
import com.sofa.linkiving.domain.link.enums.Format;
import com.sofa.linkiving.domain.link.repository.LinkRepository;
import com.sofa.linkiving.domain.link.repository.SummaryRepository;
import com.sofa.linkiving.infra.feign.AiServerClient;
import com.sofa.linkiving.infra.feign.dto.SummaryRequest;
import com.sofa.linkiving.infra.feign.dto.SummaryResponse;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
@EnableConfigurationProperties(SummaryWorkerProperties.class)
@RequiredArgsConstructor
public class SummaryWorker {

private final SummaryQueue summaryQueue;
private final SummaryWorkerProperties properties;
private final LinkRepository linkRepository;
private final SummaryRepository summaryRepository;
private final AiServerClient aiServerClient;
private volatile boolean running = true;
private Thread workerThread;

@PostConstruct
public void startWorker() {
workerThread = new Thread(() -> {
log.info("Summary worker thread started");
while (running) {
try {
processQueue();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.info("Summary worker thread interrupted");
break;
} catch (Exception e) {
log.error("Error in summary worker thread", e);
}
}
log.info("Summary worker thread stopped");
});
workerThread.setName("summary-worker");
workerThread.setDaemon(true);
workerThread.start();
}

@PreDestroy
public void stopWorker() {
log.info("Stopping summary worker thread");
running = false;
if (workerThread != null) {
workerThread.interrupt();
}
}

private void processQueue() throws InterruptedException {
Optional<Long> linkIdOpt = summaryQueue.pollFromQueue();

if (linkIdOpt.isEmpty()) {
// 큐가 비어있으면 대기
Thread.sleep(properties.sleepDuration().toMillis());
return;
}

Long linkId = linkIdOpt.get();
log.info("Processing link for summary - linkId: {}", linkId);

try {
generateAndSaveSummary(linkId);
} catch (Exception e) {
log.error("Failed to generate summary for linkId: {}", linkId, e);
}
}

@Transactional
public void generateAndSaveSummary(Long linkId) {
// 1. Link 조회
Link link = linkRepository.findById(linkId)
.orElseThrow(() -> new IllegalArgumentException("Link not found: " + linkId));

log.debug("Link found - url: {}, title: {}", link.getUrl(), link.getTitle());

// 2. RAG 서버에 요약 요청
SummaryRequest request = SummaryRequest.of(
link.getId(),
link.getMember().getId(),
link.getUrl(),
link.getTitle(),
link.getMemo()
);
log.info("Requesting summary to AI server - linkId: {}, userId: {}", request.linkId(), request.userId());
SummaryResponse[] responses = aiServerClient.generateSummary(request);
if (responses == null || responses.length == 0) {
log.warn("AI server returned empty summary response - linkId: {}", linkId);
return;
}
if (responses.length > 1) {
log.warn("AI server returned multiple summaries, using the first - linkId: {}, size: {}", linkId,
responses.length);
}
SummaryResponse response = responses[0];

log.info("Summary generated for linkId: {}", linkId);

// 3. Summary 엔티티 생성 및 저장
Summary summary = Summary.builder()
.link(link)
.format(Format.CONCISE)
.content(response.summary())
.build();

summaryRepository.save(summary);
log.info("Summary saved for linkId: {}", linkId);
}
}
19 changes: 19 additions & 0 deletions src/main/java/com/sofa/linkiving/infra/feign/AiServerClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.sofa.linkiving.infra.feign;

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

import com.sofa.linkiving.infra.feign.dto.SummaryRequest;
import com.sofa.linkiving.infra.feign.dto.SummaryResponse;

@FeignClient(
name = "aiServerClient",
url = "${ai.server.url}",
configuration = GlobalFeignConfig.class
)
public interface AiServerClient {

@PostMapping("/webhook/summary-initial")
SummaryResponse[] generateSummary(@RequestBody SummaryRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ public Logger.Level feignLoggerLevel() {

@Bean
public Request.Options feignRequestOptions() {
return new Request.Options(3000, 5000);
return new Request.Options(5000, 60000);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.sofa.linkiving.infra.feign.dto;

public record SummaryRequest(
Long linkId,
Long userId,
String url,
String title,
String memo
) {
public static SummaryRequest of(Long linkId, Long userId, String url, String title, String memo) {
return new SummaryRequest(linkId, userId, url, title, memo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.sofa.linkiving.infra.feign.dto;

public record SummaryResponse(
String summary
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.sofa.linkiving.domain.link.config;

import static org.assertj.core.api.Assertions.*;

import java.time.Duration;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

@DisplayName("SummaryWorkerProperties 단위 테스트")
class SummaryWorkerPropertiesTest {

@Test
@DisplayName("양수 값으로 Properties를 생성할 수 있다")
void shouldCreatePropertiesWithPositiveValue() {
// when
SummaryWorkerProperties properties = new SummaryWorkerProperties(Duration.ofSeconds(1));

// then
assertThat(properties.sleepDuration()).isEqualTo(Duration.ofSeconds(1));
}

@Test
@DisplayName("0 이하의 값으로 Properties 생성 시 예외가 발생한다 - 0")
void shouldThrowExceptionWhenSleepMsIsZero() {
// when & then
assertThatThrownBy(() -> new SummaryWorkerProperties(Duration.ZERO))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("sleepDuration must be positive");
}

@Test
@DisplayName("0 이하의 값으로 Properties 생성 시 예외가 발생한다 - 음수")
void shouldThrowExceptionWhenSleepMsIsNegative() {
// when & then
assertThatThrownBy(() -> new SummaryWorkerProperties(Duration.ofMillis(-100)))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("sleepDuration must be positive");
}

@Test
@DisplayName("최소 양수 값으로 Properties를 생성할 수 있다")
void shouldCreatePropertiesWithMinimumPositiveValue() {
// when
SummaryWorkerProperties properties = new SummaryWorkerProperties(Duration.ofMillis(1));

// then
assertThat(properties.sleepDuration()).isEqualTo(Duration.ofMillis(1));
}
}
Loading