Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feat] SSE 알림 서비스 구현 #122

Merged
merged 5 commits into from
Sep 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.kuit.agarang.domain.ai.utils.MusicGenClientUtil;
import com.kuit.agarang.domain.memory.model.entity.Memory;
import com.kuit.agarang.domain.memory.repository.MemoryRepository;
import com.kuit.agarang.domain.notification.model.SseEmitters;
import com.kuit.agarang.global.common.exception.exception.OpenAPIException;
import com.kuit.agarang.global.common.model.dto.BaseResponseStatus;
import com.kuit.agarang.global.s3.model.dto.S3File;
Expand All @@ -28,14 +29,15 @@ public class MusicGenService {
private final MusicGenClientUtil musicGenClientUtil;
private final S3Util s3Util;
private final MemoryRepository memoryRepository;
private final SseEmitters sseEmitters;

private static final String WEBHOOK_URI = "/api/ai/music-gen/webhook";
private static final Integer MUSIC_DURATION = 40;

public String getMusic(String prompt) {
MusicGenResponse response
= musicGenClientUtil.post(
new MusicGenRequest(version, baseUrl + WEBHOOK_URI, prompt, MUSIC_DURATION), MusicGenResponse.class);
= musicGenClientUtil.post(
new MusicGenRequest(version, baseUrl + WEBHOOK_URI, prompt, MUSIC_DURATION), MusicGenResponse.class);
log.info("created musicgen id : {}", response.getId());
return response.getId();
}
Expand All @@ -53,6 +55,9 @@ public void saveMusic(MusicGenResponse response) {
Memory memory = optionalMemory.get();
memory.setMusicUrl(s3File.getObjectUrl());
memoryRepository.save(memory);

String message = "음악 생성이 완료되었습니다";
sseEmitters.sendNotification(message);
}

private static void checkStatus(MusicGenResponse response) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.kuit.agarang.domain.notification.controller;

import com.kuit.agarang.domain.notification.model.SseEmitters;
import com.kuit.agarang.global.common.exception.exception.BusinessException;
import com.kuit.agarang.global.common.model.dto.BaseResponseStatus;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;

@Slf4j
@RestController
@RequiredArgsConstructor
public class SseController {

private final SseEmitters sseEmitters;

@GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter connect() {
SseEmitter emitter = new SseEmitter(10 * 60 * 1000L);
Copy link
Collaborator

@JGoo99 JGoo99 Aug 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

연결 시에 회원 식별값에 대한 정보 없이도 해당 기기로 알림이 갈 수 있나요??

음악 생성이 비동기로 진행되기 때문에 여러 구독이 발생할 수 있고,
구독된 이벤트 중 특정 이벤트만 발생시켜야 한다면 식별정보가 필요할 것 같아서 여쭤봅니다!

만약 식별정보를 추가한다면 ConcurrentHashMap 같은 것도 좋을 것 같아요. (key : memberId, value: SseEmitter)

sseEmitters.add(emitter);
try {
emitter.send(SseEmitter.event()
.name("connect")
.data("connected!")); // 503에러 방지를 위한 더미데이터
} catch (IOException e) {
emitter.completeWithError(e);
throw new BusinessException(BaseResponseStatus.FAIL_CREATE_EMITTER);
}
return emitter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.kuit.agarang.domain.notification.model;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

@Slf4j
@Component
public class SseEmitters {
private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();

public SseEmitter add(SseEmitter emitter) {
this.emitters.add(emitter);
log.info("new emitter added: {}", emitter);
log.info("emitter list size: {}", emitters.size());

emitter.onCompletion(() -> {
log.info("onCompletion callback");
this.emitters.remove(emitter);
});

emitter.onTimeout(() -> {
log.info("onTimeout callback");
emitter.complete();
});
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onCompletion, onTimeout 이랑 추가로 onError 핸들러를 호출한 이후에
this.emitters.add(emitter); 를 마지막에 호출하는 건 어떤가요?


그리고 onTimeout 핸들러에서는 emitters.remove 가 아닌 emitter.complete() 를 호출하셨는데,
이유가 아래 흐름대로 동작하기를 유도하신 건가요?? 이 방식대로 흘러가는게 맞는지도 궁금합니다..!!

onTimeout 핸들러 호출 -> emitter.complete() -> onCompletion 핸들러 호출 -> this.emitters.remove(emitter);


return emitter;
}

public void sendNotification(String message) {
emitters.forEach(emitter -> {
try {
emitter.send(SseEmitter.event()
.name("notification")
.data(message));
log.info("Notification sent: {}", message);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

음악 생성의 경우 알림 1회 전송이 끝인데, 알림 구독을 취소하는(emitters 맵에서 삭제하는) 로직은 없어도 되나요??

만약 구독 취소를 구현해야 한다면 유지보수를 위해서 따로 구독취소 컨트롤러를 생성하면 좋을 것 같아요. (n회 알림이 필요한 경우 수정하지 않아도되기 때문에)

} catch (IOException e) {
log.error("Error sending notification: {}", e.getMessage());
emitter.complete();
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public enum BaseResponseStatus {
FAIL_REDIS_CONNECTION(false, HttpStatus.SERVICE_UNAVAILABLE, 5002, "레디스 서버에 연결 실패했습니다."),
FAIL_S3_UPLOAD(false, HttpStatus.SERVICE_UNAVAILABLE, 5003, "S3 파일 서버 업로드에 실패했습니다."),
INVALID_GPT_RESPONSE(false, HttpStatus.INTERNAL_SERVER_ERROR, 5004, "GPT 응답이 유효하지 않습니다."),
FAIL_CREATE_MUSIC(false, HttpStatus.INTERNAL_SERVER_ERROR, 5005, "music gen 음악 생성에 실패했습니다.");

FAIL_CREATE_MUSIC(false, HttpStatus.INTERNAL_SERVER_ERROR, 5005, "music gen 음악 생성에 실패했습니다."),
FAIL_CREATE_EMITTER(false, HttpStatus.INTERNAL_SERVER_ERROR, 5006, "SseEmitter 생성에 실패했습니다.");

private final boolean isSuccess;
@JsonIgnore
Expand Down