From 65a7dc449c506f9041a68bc4054c2e36f4974a8b Mon Sep 17 00:00:00 2001 From: AppleMint98 Date: Thu, 29 Aug 2024 23:53:16 +0900 Subject: [PATCH 1/5] =?UTF-8?q?feat:=20=EC=95=8C=EB=9E=8C=20=EC=84=9C?= =?UTF-8?q?=EB=B9=84=EC=8A=A4=20=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../domain/ai/service/MusicGenService.java | 9 +++- .../controller/SseController.java | 36 ++++++++++++++ .../notification/model/SseEmitters.java | 47 +++++++++++++++++++ .../common/model/dto/BaseResponseStatus.java | 4 +- 4 files changed, 92 insertions(+), 4 deletions(-) create mode 100644 src/main/java/com/kuit/agarang/domain/notification/controller/SseController.java create mode 100644 src/main/java/com/kuit/agarang/domain/notification/model/SseEmitters.java diff --git a/src/main/java/com/kuit/agarang/domain/ai/service/MusicGenService.java b/src/main/java/com/kuit/agarang/domain/ai/service/MusicGenService.java index 2149e79b..fefeb284 100644 --- a/src/main/java/com/kuit/agarang/domain/ai/service/MusicGenService.java +++ b/src/main/java/com/kuit/agarang/domain/ai/service/MusicGenService.java @@ -5,6 +5,7 @@ import com.kuit.agarang.domain.ai.utils.MusicGenClientUtil; import com.kuit.agarang.domain.memory.model.entity.Memory; import com.kuit.agarang.domain.memory.repository.MemoryRepository; +import com.kuit.agarang.domain.notification.model.SseEmitters; import com.kuit.agarang.global.common.exception.exception.OpenAPIException; import com.kuit.agarang.global.common.model.dto.BaseResponseStatus; import com.kuit.agarang.global.s3.model.dto.S3File; @@ -28,14 +29,15 @@ public class MusicGenService { private final MusicGenClientUtil musicGenClientUtil; private final S3Util s3Util; private final MemoryRepository memoryRepository; + private final SseEmitters sseEmitters; private static final String WEBHOOK_URI = "/api/ai/music-gen/webhook"; private static final Integer MUSIC_DURATION = 40; public String getMusic(String prompt) { MusicGenResponse response - = musicGenClientUtil.post( - new MusicGenRequest(version, baseUrl + WEBHOOK_URI, prompt, MUSIC_DURATION), MusicGenResponse.class); + = musicGenClientUtil.post( + new MusicGenRequest(version, baseUrl + WEBHOOK_URI, prompt, MUSIC_DURATION), MusicGenResponse.class); log.info("created musicgen id : {}", response.getId()); return response.getId(); } @@ -53,6 +55,9 @@ public void saveMusic(MusicGenResponse response) { Memory memory = optionalMemory.get(); memory.setMusicUrl(s3File.getObjectUrl()); memoryRepository.save(memory); + + String message = "음악 생성이 완료되었습니다"; + sseEmitters.sendNotification(message); } private static void checkStatus(MusicGenResponse response) { diff --git a/src/main/java/com/kuit/agarang/domain/notification/controller/SseController.java b/src/main/java/com/kuit/agarang/domain/notification/controller/SseController.java new file mode 100644 index 00000000..3eef76d7 --- /dev/null +++ b/src/main/java/com/kuit/agarang/domain/notification/controller/SseController.java @@ -0,0 +1,36 @@ +package com.kuit.agarang.domain.notification.controller; + +import com.kuit.agarang.domain.notification.model.SseEmitters; +import com.kuit.agarang.global.common.exception.exception.BusinessException; +import com.kuit.agarang.global.common.model.dto.BaseResponseStatus; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.io.IOException; + +@Slf4j +@RestController +@RequiredArgsConstructor +public class SseController { + + private final SseEmitters sseEmitters; + + @GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public SseEmitter connect() { + SseEmitter emitter = new SseEmitter(10 * 60 * 1000L); + sseEmitters.add(emitter); + try { + emitter.send(SseEmitter.event() + .name("connect") + .data("connected!")); // 503에러 방지를 위한 더미데이터 + } catch (IOException e) { + emitter.completeWithError(e); + throw new BusinessException(BaseResponseStatus.FAIL_CREATE_EMITTER); + } + return emitter; + } +} diff --git a/src/main/java/com/kuit/agarang/domain/notification/model/SseEmitters.java b/src/main/java/com/kuit/agarang/domain/notification/model/SseEmitters.java new file mode 100644 index 00000000..b5c6977d --- /dev/null +++ b/src/main/java/com/kuit/agarang/domain/notification/model/SseEmitters.java @@ -0,0 +1,47 @@ +package com.kuit.agarang.domain.notification.model; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +@Slf4j +@Component +public class SseEmitters { + private final List emitters = new CopyOnWriteArrayList<>(); + + public SseEmitter add(SseEmitter emitter) { + this.emitters.add(emitter); + log.info("new emitter added: {}", emitter); + log.info("emitter list size: {}", emitters.size()); + + emitter.onCompletion(() -> { + log.info("onCompletion callback"); + this.emitters.remove(emitter); + }); + + emitter.onTimeout(() -> { + log.info("onTimeout callback"); + emitter.complete(); + }); + + return emitter; + } + + public void sendNotification(String message) { + emitters.forEach(emitter -> { + try { + emitter.send(SseEmitter.event() + .name("notification") + .data(message)); + log.info("Notification sent: {}", message); + } catch (IOException e) { + log.error("Error sending notification: {}", e.getMessage()); + emitter.complete(); + } + }); + } +} diff --git a/src/main/java/com/kuit/agarang/global/common/model/dto/BaseResponseStatus.java b/src/main/java/com/kuit/agarang/global/common/model/dto/BaseResponseStatus.java index a4343b1f..0e968b70 100644 --- a/src/main/java/com/kuit/agarang/global/common/model/dto/BaseResponseStatus.java +++ b/src/main/java/com/kuit/agarang/global/common/model/dto/BaseResponseStatus.java @@ -39,8 +39,8 @@ public enum BaseResponseStatus { FAIL_REDIS_CONNECTION(false, HttpStatus.SERVICE_UNAVAILABLE, 5002, "레디스 서버에 연결 실패했습니다."), FAIL_S3_UPLOAD(false, HttpStatus.SERVICE_UNAVAILABLE, 5003, "S3 파일 서버 업로드에 실패했습니다."), INVALID_GPT_RESPONSE(false, HttpStatus.INTERNAL_SERVER_ERROR, 5004, "GPT 응답이 유효하지 않습니다."), - FAIL_CREATE_MUSIC(false, HttpStatus.INTERNAL_SERVER_ERROR, 5005, "music gen 음악 생성에 실패했습니다."); - + FAIL_CREATE_MUSIC(false, HttpStatus.INTERNAL_SERVER_ERROR, 5005, "music gen 음악 생성에 실패했습니다."), + FAIL_CREATE_EMITTER(false, HttpStatus.INTERNAL_SERVER_ERROR, 5006, "SseEmitter 생성에 실패했습니다."); private final boolean isSuccess; @JsonIgnore From 071957f81781d9d0d7ded9a8c541cff8ef89ff18 Mon Sep 17 00:00:00 2001 From: AppleMint98 Date: Fri, 30 Aug 2024 21:53:56 +0900 Subject: [PATCH 2/5] =?UTF-8?q?feat:=20=EC=95=8C=EB=9E=8C=20=EC=84=9C?= =?UTF-8?q?=EB=B9=84=EC=8A=A4=20=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../domain/ai/service/MusicGenService.java | 8 +-- .../controller/SseController.java | 31 ++++----- .../notification/model/SseEmitters.java | 47 ------------- .../notification/service/SseService.java | 69 +++++++++++++++++++ 4 files changed, 85 insertions(+), 70 deletions(-) delete mode 100644 src/main/java/com/kuit/agarang/domain/notification/model/SseEmitters.java create mode 100644 src/main/java/com/kuit/agarang/domain/notification/service/SseService.java diff --git a/src/main/java/com/kuit/agarang/domain/ai/service/MusicGenService.java b/src/main/java/com/kuit/agarang/domain/ai/service/MusicGenService.java index fefeb284..c276fbf7 100644 --- a/src/main/java/com/kuit/agarang/domain/ai/service/MusicGenService.java +++ b/src/main/java/com/kuit/agarang/domain/ai/service/MusicGenService.java @@ -5,7 +5,7 @@ import com.kuit.agarang.domain.ai.utils.MusicGenClientUtil; import com.kuit.agarang.domain.memory.model.entity.Memory; import com.kuit.agarang.domain.memory.repository.MemoryRepository; -import com.kuit.agarang.domain.notification.model.SseEmitters; +import com.kuit.agarang.domain.notification.service.SseService; import com.kuit.agarang.global.common.exception.exception.OpenAPIException; import com.kuit.agarang.global.common.model.dto.BaseResponseStatus; import com.kuit.agarang.global.s3.model.dto.S3File; @@ -29,7 +29,7 @@ public class MusicGenService { private final MusicGenClientUtil musicGenClientUtil; private final S3Util s3Util; private final MemoryRepository memoryRepository; - private final SseEmitters sseEmitters; + private final SseService sseService; private static final String WEBHOOK_URI = "/api/ai/music-gen/webhook"; private static final Integer MUSIC_DURATION = 40; @@ -56,8 +56,8 @@ public void saveMusic(MusicGenResponse response) { memory.setMusicUrl(s3File.getObjectUrl()); memoryRepository.save(memory); - String message = "음악 생성이 완료되었습니다"; - sseEmitters.sendNotification(message); + String message = "음악 생성 완료!"; + sseService.sendOneNotification(memory.getMember().getId(), message); } private static void checkStatus(MusicGenResponse response) { diff --git a/src/main/java/com/kuit/agarang/domain/notification/controller/SseController.java b/src/main/java/com/kuit/agarang/domain/notification/controller/SseController.java index 3eef76d7..8db2269a 100644 --- a/src/main/java/com/kuit/agarang/domain/notification/controller/SseController.java +++ b/src/main/java/com/kuit/agarang/domain/notification/controller/SseController.java @@ -1,36 +1,29 @@ package com.kuit.agarang.domain.notification.controller; -import com.kuit.agarang.domain.notification.model.SseEmitters; -import com.kuit.agarang.global.common.exception.exception.BusinessException; -import com.kuit.agarang.global.common.model.dto.BaseResponseStatus; +import com.kuit.agarang.domain.login.model.dto.CustomOAuth2User; +import com.kuit.agarang.domain.notification.service.SseService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.MediaType; +import org.springframework.security.core.annotation.AuthenticationPrincipal; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; -import java.io.IOException; - @Slf4j @RestController @RequiredArgsConstructor public class SseController { - private final SseEmitters sseEmitters; + private final SseService sseService; + + @GetMapping(value = "/sse/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public SseEmitter connect(@AuthenticationPrincipal CustomOAuth2User details) { + return sseService.connect(details.getMemberId()); + } - @GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE) - public SseEmitter connect() { - SseEmitter emitter = new SseEmitter(10 * 60 * 1000L); - sseEmitters.add(emitter); - try { - emitter.send(SseEmitter.event() - .name("connect") - .data("connected!")); // 503에러 방지를 위한 더미데이터 - } catch (IOException e) { - emitter.completeWithError(e); - throw new BusinessException(BaseResponseStatus.FAIL_CREATE_EMITTER); - } - return emitter; + @GetMapping(value = "/sse/disconnect") + public void disconnect(@AuthenticationPrincipal CustomOAuth2User details) { + sseService.disconnect(details.getMemberId()); } } diff --git a/src/main/java/com/kuit/agarang/domain/notification/model/SseEmitters.java b/src/main/java/com/kuit/agarang/domain/notification/model/SseEmitters.java deleted file mode 100644 index b5c6977d..00000000 --- a/src/main/java/com/kuit/agarang/domain/notification/model/SseEmitters.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.kuit.agarang.domain.notification.model; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; -import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -@Slf4j -@Component -public class SseEmitters { - private final List emitters = new CopyOnWriteArrayList<>(); - - public SseEmitter add(SseEmitter emitter) { - this.emitters.add(emitter); - log.info("new emitter added: {}", emitter); - log.info("emitter list size: {}", emitters.size()); - - emitter.onCompletion(() -> { - log.info("onCompletion callback"); - this.emitters.remove(emitter); - }); - - emitter.onTimeout(() -> { - log.info("onTimeout callback"); - emitter.complete(); - }); - - return emitter; - } - - public void sendNotification(String message) { - emitters.forEach(emitter -> { - try { - emitter.send(SseEmitter.event() - .name("notification") - .data(message)); - log.info("Notification sent: {}", message); - } catch (IOException e) { - log.error("Error sending notification: {}", e.getMessage()); - emitter.complete(); - } - }); - } -} diff --git a/src/main/java/com/kuit/agarang/domain/notification/service/SseService.java b/src/main/java/com/kuit/agarang/domain/notification/service/SseService.java new file mode 100644 index 00000000..a2efafdd --- /dev/null +++ b/src/main/java/com/kuit/agarang/domain/notification/service/SseService.java @@ -0,0 +1,69 @@ +package com.kuit.agarang.domain.notification.service; + + +import com.kuit.agarang.global.common.exception.exception.BusinessException; +import com.kuit.agarang.global.common.model.dto.BaseResponseStatus; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Service; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +@Service +@RequiredArgsConstructor +public class SseService { + + private final ConcurrentHashMap emitters = new ConcurrentHashMap<>(); + + public SseEmitter connect(Long memberId) { + + SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); + emitters.put(memberId, emitter); + + try { + emitter.send(SseEmitter.event() + .name("connect") + .data("connected!")); // 503에러 방지를 위한 더미데이터 + } catch (IOException e) { + emitter.completeWithError(e); + throw new BusinessException(BaseResponseStatus.FAIL_CREATE_EMITTER); + } + + emitter.onCompletion(() -> { + log.info("onCompletion callback"); + emitters.remove(memberId); + }); + + emitter.onTimeout(() -> { + log.info("onTimeout callback"); + emitters.remove(memberId); + }); + + return emitter; + } + + public void sendOneNotification(Long memberId, String message) { + SseEmitter emitter = emitters.get(memberId); + log.info("memberId : {} , sendNotification", memberId); + if (emitter != null) { + try { + emitter.send(SseEmitter.event() + .name("notification") + .data(message, MediaType.TEXT_PLAIN)); + } catch (IOException e) { + emitters.remove(memberId); + } finally { + emitters.remove(memberId); + } + } + } + + public void disconnect(Long memberId) { + log.info("memberId : {} , disconnect", memberId); + emitters.remove(memberId); + } +} From 18794b3c5681a8810ff4e79cb5419ae826c2c15f Mon Sep 17 00:00:00 2001 From: AppleMint98 Date: Fri, 30 Aug 2024 22:20:20 +0900 Subject: [PATCH 3/5] =?UTF-8?q?fix:=20=EC=97=90=EB=9F=AC=20=EC=B2=98?= =?UTF-8?q?=EB=A6=AC=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../domain/notification/service/SseService.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/kuit/agarang/domain/notification/service/SseService.java b/src/main/java/com/kuit/agarang/domain/notification/service/SseService.java index a2efafdd..920c5651 100644 --- a/src/main/java/com/kuit/agarang/domain/notification/service/SseService.java +++ b/src/main/java/com/kuit/agarang/domain/notification/service/SseService.java @@ -22,27 +22,32 @@ public class SseService { public SseEmitter connect(Long memberId) { SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); - emitters.put(memberId, emitter); try { emitter.send(SseEmitter.event() .name("connect") - .data("connected!")); // 503에러 방지를 위한 더미데이터 + .data("connected!")); // 503에러 방지를 위한 더미 } catch (IOException e) { emitter.completeWithError(e); throw new BusinessException(BaseResponseStatus.FAIL_CREATE_EMITTER); } + emitter.onError(e -> { + log.error("Error on SSE connection for memberId: {}", memberId, e); + emitters.remove(memberId); + }); + emitter.onCompletion(() -> { - log.info("onCompletion callback"); + log.info("onCompletion callback for memberId: {}", memberId); emitters.remove(memberId); }); emitter.onTimeout(() -> { - log.info("onTimeout callback"); - emitters.remove(memberId); + log.info("onTimeout callback for memberId: {}", memberId); + emitter.complete(); }); + emitters.put(memberId, emitter); return emitter; } From 1aa28897b86396786b226cb31f4560d0eaa8129f Mon Sep 17 00:00:00 2001 From: AppleMint98 Date: Sun, 1 Sep 2024 12:58:07 +0900 Subject: [PATCH 4/5] =?UTF-8?q?fix:=20=EC=95=8C=EB=A6=BC=20=EB=A9=94?= =?UTF-8?q?=EC=84=9C=EB=93=9C=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kuit/agarang/domain/ai/service/MusicGenService.java | 4 ++-- .../domain/notification/controller/SseController.java | 6 ++++-- .../agarang/domain/notification/service/SseService.java | 7 ++----- src/main/resources/application-local.yml | 2 +- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/kuit/agarang/domain/ai/service/MusicGenService.java b/src/main/java/com/kuit/agarang/domain/ai/service/MusicGenService.java index c276fbf7..5be95f90 100644 --- a/src/main/java/com/kuit/agarang/domain/ai/service/MusicGenService.java +++ b/src/main/java/com/kuit/agarang/domain/ai/service/MusicGenService.java @@ -56,8 +56,8 @@ public void saveMusic(MusicGenResponse response) { memory.setMusicUrl(s3File.getObjectUrl()); memoryRepository.save(memory); - String message = "음악 생성 완료!"; - sseService.sendOneNotification(memory.getMember().getId(), message); + String message = "MusicGen Complete!"; + sseService.sendNotification(memory.getMember().getId(), message); } private static void checkStatus(MusicGenResponse response) { diff --git a/src/main/java/com/kuit/agarang/domain/notification/controller/SseController.java b/src/main/java/com/kuit/agarang/domain/notification/controller/SseController.java index 8db2269a..b5cdc9be 100644 --- a/src/main/java/com/kuit/agarang/domain/notification/controller/SseController.java +++ b/src/main/java/com/kuit/agarang/domain/notification/controller/SseController.java @@ -7,22 +7,24 @@ import org.springframework.http.MediaType; import org.springframework.security.core.annotation.AuthenticationPrincipal; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; @Slf4j @RestController +@RequestMapping("/sse") @RequiredArgsConstructor public class SseController { private final SseService sseService; - @GetMapping(value = "/sse/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + @GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public SseEmitter connect(@AuthenticationPrincipal CustomOAuth2User details) { return sseService.connect(details.getMemberId()); } - @GetMapping(value = "/sse/disconnect") + @GetMapping(value = "/disconnect") public void disconnect(@AuthenticationPrincipal CustomOAuth2User details) { sseService.disconnect(details.getMemberId()); } diff --git a/src/main/java/com/kuit/agarang/domain/notification/service/SseService.java b/src/main/java/com/kuit/agarang/domain/notification/service/SseService.java index 920c5651..08c6eae2 100644 --- a/src/main/java/com/kuit/agarang/domain/notification/service/SseService.java +++ b/src/main/java/com/kuit/agarang/domain/notification/service/SseService.java @@ -3,7 +3,6 @@ import com.kuit.agarang.global.common.exception.exception.BusinessException; import com.kuit.agarang.global.common.model.dto.BaseResponseStatus; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.MediaType; import org.springframework.stereotype.Service; @@ -14,7 +13,6 @@ @Slf4j @Service -@RequiredArgsConstructor public class SseService { private final ConcurrentHashMap emitters = new ConcurrentHashMap<>(); @@ -51,7 +49,7 @@ public SseEmitter connect(Long memberId) { return emitter; } - public void sendOneNotification(Long memberId, String message) { + public void sendNotification(Long memberId, String message) { SseEmitter emitter = emitters.get(memberId); log.info("memberId : {} , sendNotification", memberId); if (emitter != null) { @@ -60,8 +58,7 @@ public void sendOneNotification(Long memberId, String message) { .name("notification") .data(message, MediaType.TEXT_PLAIN)); } catch (IOException e) { - emitters.remove(memberId); - } finally { + log.error("Error on send notification for memberId: {}", memberId, e); emitters.remove(memberId); } } diff --git a/src/main/resources/application-local.yml b/src/main/resources/application-local.yml index f7c58db0..8942095d 100644 --- a/src/main/resources/application-local.yml +++ b/src/main/resources/application-local.yml @@ -12,7 +12,7 @@ spring: data: redis: host: localhost - port: 6380 + port: 6379 deploy: env: local From c25f05b5b670097aec730e78182e0dfeacfef428 Mon Sep 17 00:00:00 2001 From: AppleMint98 Date: Sun, 1 Sep 2024 12:59:31 +0900 Subject: [PATCH 5/5] =?UTF-8?q?fix:=20local=20yml=20=ED=8C=8C=EC=9D=BC=20?= =?UTF-8?q?=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/resources/application-local.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/resources/application-local.yml b/src/main/resources/application-local.yml index 8942095d..f7c58db0 100644 --- a/src/main/resources/application-local.yml +++ b/src/main/resources/application-local.yml @@ -12,7 +12,7 @@ spring: data: redis: host: localhost - port: 6379 + port: 6380 deploy: env: local