diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 00000000..22c49f41 Binary files /dev/null and b/.DS_Store differ diff --git a/Botanify_ERD.png b/Botanify_ERD.png new file mode 100644 index 00000000..c00e7f94 Binary files /dev/null and b/Botanify_ERD.png differ diff --git a/README.md b/README.md index 21173b02..945cc84e 100644 --- a/README.md +++ b/README.md @@ -8,9 +8,9 @@ - [서비스 소개](#-서비스-소개) - [기술 스택](#-기술-스택) - [프로젝트 설치 및 실행법](#-프로젝트-설치-및-실행법) -- [프로젝트 구조](#-프로젝트-구조) -- [주요기능](#-주요기능) -- [Developer](#-developer) +- [프로젝트 구조 ](#-프로젝트-구조 ) +- [주요 기능](#-주요기능) +- [Developer](#-Developer) ## 💁‍♀️ 서비스 소개 diff --git a/src/.DS_Store b/src/.DS_Store new file mode 100644 index 00000000..8329e17c Binary files /dev/null and b/src/.DS_Store differ diff --git a/src/main/.DS_Store b/src/main/.DS_Store new file mode 100644 index 00000000..7cd02eba Binary files /dev/null and b/src/main/.DS_Store differ diff --git a/src/main/java/com/sounganization/botanify/common/config/TransactionConfig.java b/src/main/java/com/sounganization/botanify/common/config/TransactionConfig.java new file mode 100644 index 00000000..d5e336c9 --- /dev/null +++ b/src/main/java/com/sounganization/botanify/common/config/TransactionConfig.java @@ -0,0 +1,19 @@ +package com.sounganization.botanify.common.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.support.TransactionTemplate; + +@Configuration +public class TransactionConfig { + + @Bean + public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) { + TransactionTemplate template = new TransactionTemplate(transactionManager); + template.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED); + template.setTimeout(30); + return template; + } +} diff --git a/src/main/java/com/sounganization/botanify/common/config/async/AsyncConfig.java b/src/main/java/com/sounganization/botanify/common/config/async/AsyncConfig.java new file mode 100644 index 00000000..1cae5af8 --- /dev/null +++ b/src/main/java/com/sounganization/botanify/common/config/async/AsyncConfig.java @@ -0,0 +1,36 @@ +package com.sounganization.botanify.common.config.async; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.AsyncConfigurer; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; + +@Configuration +@EnableAsync +@Slf4j +public class AsyncConfig implements AsyncConfigurer { + + @Bean(name = "chatThreadPoolTaskExecutor") + public Executor getAsyncExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(5); + executor.setMaxPoolSize(10); + executor.setQueueCapacity(25); + executor.setThreadNamePrefix("ChatAsync-"); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.initialize(); + return executor; + } + + @Override + public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { + return (ex, method, params) -> { + log.error("비동기 메서드 {}에서 예외 발생: {}", method.getName(), ex.getMessage()); + }; + } +} diff --git a/src/main/java/com/sounganization/botanify/common/config/redis/RedisConfig.java b/src/main/java/com/sounganization/botanify/common/config/redis/RedisConfig.java index 785a1d37..fd8f6acf 100644 --- a/src/main/java/com/sounganization/botanify/common/config/redis/RedisConfig.java +++ b/src/main/java/com/sounganization/botanify/common/config/redis/RedisConfig.java @@ -1,6 +1,7 @@ package com.sounganization.botanify.common.config.redis; import com.sounganization.botanify.common.config.websocket.ChatMessageListener; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; @@ -8,13 +9,16 @@ import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @Configuration @ConditionalOnProperty(name = "spring.redis.enabled", havingValue = "true", matchIfMissing = true) +@Slf4j public class RedisConfig { @Value("${spring.redis.master.host}") @@ -46,13 +50,36 @@ public RedisTemplate redisTemplate() { return redisTemplate; } + @Bean + public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) { + StringRedisTemplate template = new StringRedisTemplate(connectionFactory); + template.setEnableTransactionSupport(true); + return template; + } + + @Bean + public ThreadPoolTaskExecutor redisThreadPoolTaskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(10); + executor.setMaxPoolSize(20); + executor.setQueueCapacity(100); + executor.setThreadNamePrefix("RedisExecutor-"); + executor.initialize(); + return executor; + } + @Bean public RedisMessageListenerContainer redisMessageListenerContainer( RedisConnectionFactory connectionFactory, ChatMessageListener chatMessageListener) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); - container.addMessageListener(chatMessageListener, new PatternTopic("chat_room_*")); + container.setTaskExecutor(redisThreadPoolTaskExecutor()); + container.setRecoveryInterval(5000L); + container.addMessageListener(chatMessageListener, new PatternTopic("chat_room_broadcast_*")); + + log.info("최적화된 설정으로 Redis message listener container 구성"); return container; } } diff --git a/src/main/java/com/sounganization/botanify/common/config/websocket/ChatMessageListener.java b/src/main/java/com/sounganization/botanify/common/config/websocket/ChatMessageListener.java index f392f16f..52083115 100644 --- a/src/main/java/com/sounganization/botanify/common/config/websocket/ChatMessageListener.java +++ b/src/main/java/com/sounganization/botanify/common/config/websocket/ChatMessageListener.java @@ -1,29 +1,67 @@ package com.sounganization.botanify.common.config.websocket; import com.fasterxml.jackson.databind.ObjectMapper; -import com.sounganization.botanify.common.config.websocket.service.WebSocketChatService; import com.sounganization.botanify.domain.chat.dto.req.ChatMessageReqDto; +import com.sounganization.botanify.domain.chat.entity.ChatMessage; +import com.sounganization.botanify.domain.chat.service.ChatMessageService; +import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; -import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; -@Component +import java.nio.charset.StandardCharsets; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +@Service @RequiredArgsConstructor @Slf4j public class ChatMessageListener implements MessageListener { private final ObjectMapper objectMapper; - private final WebSocketChatService webSocketChatService; + private final ChatMessageService chatMessageService; + private final Set processedMessages = ConcurrentHashMap.newKeySet(); @Override - public void onMessage(Message message, byte[] pattern) { + public void onMessage(@NonNull Message message, byte[] pattern) { try { - String messageBody = new String(message.getBody()); - ChatMessageReqDto chatMessage = objectMapper.readValue(messageBody, ChatMessageReqDto.class); - webSocketChatService.handleChatMessage(chatMessage); + String messageContent = new String(message.getBody(), StandardCharsets.UTF_8); + if (!processedMessages.add(messageContent)) { + log.debug("Listener를 통해 이미 처리된 메시지입니다. 건너뜁니다."); + return; + } + + ChatMessageReqDto chatMessage = objectMapper.readValue(messageContent, ChatMessageReqDto.class); + + if (chatMessage.source() != ChatMessageReqDto.MessageSource.WEBSOCKET) { + log.debug("WebSocket 메시지가 아니므로 건너뜁니다."); + return; + } + + ChatMessage savedMessage = chatMessageService.findExistingMessage( + chatMessage.roomId(), + chatMessage.senderId(), + chatMessage.content() + ); + + if (savedMessage != null) { + chatMessageService.markMessageAsDelivered(savedMessage.getId()); + log.debug("브로드캐스트 후 메시지가 전달됨으로 표시되었습니다 - messageId: {}", savedMessage.getId()); + } + + removeProcessedMessageAfterDelay(messageContent); + } catch (Exception e) { - log.error("채팅 메시지 처리 중 오류 발생: {}", e.getMessage()); + log.error("Redis Pub/Sub에서 메시지 처리 실패", e); } } -} \ No newline at end of file + + private void removeProcessedMessageAfterDelay(String messageContent) { + CompletableFuture.delayedExecutor(5, TimeUnit.SECONDS).execute(() -> { + processedMessages.remove(messageContent); + }); + } +} diff --git a/src/main/java/com/sounganization/botanify/common/config/websocket/WebSocketConfig.java b/src/main/java/com/sounganization/botanify/common/config/websocket/WebSocketConfig.java index 86ab2d93..27c15def 100644 --- a/src/main/java/com/sounganization/botanify/common/config/websocket/WebSocketConfig.java +++ b/src/main/java/com/sounganization/botanify/common/config/websocket/WebSocketConfig.java @@ -2,6 +2,7 @@ import com.sounganization.botanify.common.config.websocket.handler.ChatWebSocketHandler; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; @@ -10,8 +11,8 @@ @Configuration @EnableWebSocket @RequiredArgsConstructor +@Slf4j public class WebSocketConfig implements WebSocketConfigurer { - private final ChatWebSocketHandler chatWebSocketHandler; @Override diff --git a/src/main/java/com/sounganization/botanify/common/config/websocket/handler/ChatWebSocketHandler.java b/src/main/java/com/sounganization/botanify/common/config/websocket/handler/ChatWebSocketHandler.java index a9ef3dce..b1fc0c82 100644 --- a/src/main/java/com/sounganization/botanify/common/config/websocket/handler/ChatWebSocketHandler.java +++ b/src/main/java/com/sounganization/botanify/common/config/websocket/handler/ChatWebSocketHandler.java @@ -17,40 +17,41 @@ @Slf4j @RequiredArgsConstructor public class ChatWebSocketHandler extends TextWebSocketHandler { - private final ObjectMapper objectMapper; private final WebSocketChatService webSocketChatService; private final ConnectionFailureHandler connectionFailureHandler; - @Override - public void afterConnectionEstablished(WebSocketSession session) { - log.info("새로운 WebSocket 연결이 열렸습니다. 세션 ID: {}", session.getId()); - } - @Override protected void handleTextMessage(@NonNull WebSocketSession session, @NonNull TextMessage message) throws Exception { String payload = message.getPayload(); + log.info("WebSocket 메시지를 수신했습니다: {}", payload); + ChatMessageReqDto chatMessage = objectMapper.readValue(payload, ChatMessageReqDto.class); try { switch (chatMessage.type()) { case ENTER: + log.info("사용자 {}이(가) 방 {}에 입장합니다.", chatMessage.senderId(), chatMessage.roomId()); webSocketChatService.handleEnterRoom(session, chatMessage.roomId(), chatMessage.senderId()); break; case TALK: if (!session.isOpen()) { + log.warn("메시지를 보내는 중 세션이 종료되었습니다."); connectionFailureHandler.handleConnectionFailure(chatMessage); return; } + log.info("사용자 {}이(가) 방 {}에서 TALK 메시지를 처리 중입니다.", + chatMessage.senderId(), chatMessage.roomId()); webSocketChatService.handleChatMessage(chatMessage); break; case LEAVE: + log.info("사용자 {}이(가) 방 {}에서 나갑니다.", chatMessage.senderId(), chatMessage.roomId()); webSocketChatService.handleLeaveRoom(chatMessage.roomId(), chatMessage.senderId()); break; } } catch (Exception e) { + log.error("메시지 처리 중 오류 발생: ", e); handleError(session, e); - // 메시지 처리 중 오류 발생시 연결 실패로 처리 if (chatMessage.type() == ChatMessageReqDto.MessageType.TALK) { connectionFailureHandler.handleConnectionFailure(chatMessage); } diff --git a/src/main/java/com/sounganization/botanify/common/config/websocket/handler/ConnectionFailureHandler.java b/src/main/java/com/sounganization/botanify/common/config/websocket/handler/ConnectionFailureHandler.java index 40f74138..b1f0ee53 100644 --- a/src/main/java/com/sounganization/botanify/common/config/websocket/handler/ConnectionFailureHandler.java +++ b/src/main/java/com/sounganization/botanify/common/config/websocket/handler/ConnectionFailureHandler.java @@ -54,7 +54,8 @@ private void retryMessageDelivery(ChatMessage message) { convertToDtoMessageType(message.getType()), message.getChatRoom().getId(), message.getSenderId(), - message.getContent() + message.getContent(), + ChatMessageReqDto.MessageSource.WEBSOCKET ); try { diff --git a/src/main/java/com/sounganization/botanify/common/config/websocket/service/MessageBroadcastService.java b/src/main/java/com/sounganization/botanify/common/config/websocket/service/MessageBroadcastService.java index 652f3da0..52d0a489 100644 --- a/src/main/java/com/sounganization/botanify/common/config/websocket/service/MessageBroadcastService.java +++ b/src/main/java/com/sounganization/botanify/common/config/websocket/service/MessageBroadcastService.java @@ -6,32 +6,68 @@ import com.sounganization.botanify.domain.chat.dto.req.ChatMessageReqDto; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import org.springframework.web.socket.WebSocketSession; +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; @Service @RequiredArgsConstructor @Slf4j public class MessageBroadcastService { - private final ObjectMapper objectMapper; + private final StringRedisTemplate redisTemplate; private final Map> chatRoomSessions = new ConcurrentHashMap<>(); + private final Set processedMessageIds = ConcurrentHashMap.newKeySet(); public void broadcastMessage(Long roomId, ChatMessageReqDto message) { + String messageId = generateMessageId(roomId, message); + + if (!processedMessageIds.add(messageId)) { + log.debug("이미 브로드캐스트된 메시지입니다. 건너뜁니다: {}", messageId); + return; + } + try { - String messageJson = objectMapper.writeValueAsString(message); - WebSocketUtils.broadcastMessageToRoom(roomId, messageJson, chatRoomSessions); + log.debug("방 {}에 메시지를 브로드캐스트 중입니다.", roomId); + redisTemplate.convertAndSend( + "chat_room_broadcast_" + roomId, + objectMapper.writeValueAsString(message) + ); + + removeProcessedMessageIdAfterDelay(messageId); } catch (JsonProcessingException e) { - log.error("메시지 변환 중 오류 발생: {}", e.getMessage()); + log.error("Redis에 메시지 게시 실패", e); + processedMessageIds.remove(messageId); } } + private String generateMessageId(Long roomId, ChatMessageReqDto message) { + return String.format("%d:%d:%d:%s", + roomId, + message.senderId(), + message.content().hashCode(), + LocalDateTime.now().truncatedTo(ChronoUnit.SECONDS) + ); + } + + private void removeProcessedMessageIdAfterDelay(String messageId) { + CompletableFuture.delayedExecutor(5, TimeUnit.SECONDS).execute(() -> { + processedMessageIds.remove(messageId); + }); + } + public void addSession(Long roomId, Long userId, WebSocketSession session) { chatRoomSessions.computeIfAbsent(roomId, k -> new ConcurrentHashMap<>()) .put(userId, session); + log.debug("roomId: {}, userId: {}에 대한 세션을 추가했습니다.", roomId, userId); } public void removeSession(Long roomId, Long userId) { @@ -40,10 +76,12 @@ public void removeSession(Long roomId, Long userId) { WebSocketSession session = roomSessions.remove(userId); if (session != null) { WebSocketUtils.closeSession(session); + log.debug("roomId: {}, userId: {}에 대한 세션을 제거했습니다.", roomId, userId); } if (roomSessions.isEmpty()) { chatRoomSessions.remove(roomId); + log.debug("빈 방을 제거했습니다: {}", roomId); } } } -} +} \ No newline at end of file diff --git a/src/main/java/com/sounganization/botanify/common/config/websocket/service/WebSocketChatService.java b/src/main/java/com/sounganization/botanify/common/config/websocket/service/WebSocketChatService.java index 89908872..a8377890 100644 --- a/src/main/java/com/sounganization/botanify/common/config/websocket/service/WebSocketChatService.java +++ b/src/main/java/com/sounganization/botanify/common/config/websocket/service/WebSocketChatService.java @@ -3,41 +3,59 @@ import com.sounganization.botanify.domain.chat.dto.req.ChatMessageReqDto; import com.sounganization.botanify.domain.chat.entity.ChatMessage; import com.sounganization.botanify.domain.chat.service.ChatMessageService; -import com.sounganization.botanify.domain.chat.service.ChatRoomService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.web.socket.WebSocketSession; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + @Service @RequiredArgsConstructor @Slf4j public class WebSocketChatService { private final MessageBroadcastService messageBroadcastService; - private final ChatRoomService chatRoomService; private final ChatMessageService chatMessageService; public void handleChatMessage(ChatMessageReqDto chatMessage) { - // WebSocket을 통해 처음 메시지가 올 때만 저장 및 Redis 발행 - if (chatMessage.type() == ChatMessageReqDto.MessageType.TALK) { - // 저장된 메시지를 변수에 할당하여 사용 - ChatMessage savedMessage = chatMessageService.saveMessage( + log.info("handleChatMessage 호출됨, 메시지: {}", chatMessage); + + if (chatMessage.type() != ChatMessageReqDto.MessageType.TALK || + chatMessage.source() != ChatMessageReqDto.MessageSource.WEBSOCKET) { + log.warn("메시지가 거부되었습니다: 잘못된 type 또는 source입니다. Type: {}, Source: {}", + chatMessage.type(), chatMessage.source()); + return; + } + + try { + log.info("Room: {}, sender: {}에 대한 메시지 생성 중", + chatMessage.roomId(), chatMessage.senderId()); + + ChatMessage message = chatMessageService.createMessage( chatMessage.roomId(), chatMessage.senderId(), chatMessage.content() ); - // 저장 성공 시에만 브로드캐스트 실행 - if (savedMessage != null) { - messageBroadcastService.broadcastMessage(chatMessage.roomId(), chatMessage); + + if (message != null) { + log.info("메시지가 생성되었습니다. 저장 시도 중"); + Future future = chatMessageService.saveMessageAsync(message); + ChatMessage savedMessage = future.get(5, TimeUnit.SECONDS); + + if (savedMessage != null) { + log.info("메시지가 저장되었습니다. 방 {}에 브로드캐스트 중", chatMessage.roomId()); + messageBroadcastService.broadcastMessage(chatMessage.roomId(), chatMessage); + } } - } else { - // Redis 구독을 통해 받은 메시지는 브로드캐스트만 수행 - messageBroadcastService.broadcastMessage(chatMessage.roomId(), chatMessage); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + log.error("채팅 메시지 처리 실패: ", e); } } public void handleEnterRoom(WebSocketSession session, Long roomId, Long userId) { - chatRoomService.getChatRoom(roomId, userId); messageBroadcastService.addSession(roomId, userId, session); log.info("사용자 {}가 채팅방 {}에 입장했습니다.", userId, roomId); } diff --git a/src/main/java/com/sounganization/botanify/domain/chat/components/ChatFailureHandler.java b/src/main/java/com/sounganization/botanify/domain/chat/components/ChatFailureHandler.java index f1fa6849..8a2c82e0 100644 --- a/src/main/java/com/sounganization/botanify/domain/chat/components/ChatFailureHandler.java +++ b/src/main/java/com/sounganization/botanify/domain/chat/components/ChatFailureHandler.java @@ -6,7 +6,6 @@ import com.sounganization.botanify.domain.chat.repository.ChatMessageRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import java.util.List; @@ -18,7 +17,6 @@ public class ChatFailureHandler { private final ChatMessageRepository chatMessageRepository; private final WebSocketChatService webSocketChatService; - @EventListener public void handleRedisFailure(ChatMessageReqDto message) { log.error("Redis 연결 실패 - Fallback 모드로 전환"); webSocketChatService.handleChatMessage(message); @@ -43,7 +41,8 @@ private void retryMessageDelivery(ChatMessage message) { ChatMessageReqDto.MessageType.TALK, message.getChatRoom().getId(), message.getSenderId(), - message.getContent() + message.getContent(), + ChatMessageReqDto.MessageSource.WEBSOCKET ); try { diff --git a/src/main/java/com/sounganization/botanify/domain/chat/dto/req/ChatMessageReqDto.java b/src/main/java/com/sounganization/botanify/domain/chat/dto/req/ChatMessageReqDto.java index c423965a..a213b878 100644 --- a/src/main/java/com/sounganization/botanify/domain/chat/dto/req/ChatMessageReqDto.java +++ b/src/main/java/com/sounganization/botanify/domain/chat/dto/req/ChatMessageReqDto.java @@ -4,9 +4,15 @@ public record ChatMessageReqDto( MessageType type, Long roomId, Long senderId, - String content + String content, + MessageSource source ) { public enum MessageType { ENTER, TALK, LEAVE } + + public enum MessageSource { + WEBSOCKET, + REDIS + } } diff --git a/src/main/java/com/sounganization/botanify/domain/chat/entity/ChatMessage.java b/src/main/java/com/sounganization/botanify/domain/chat/entity/ChatMessage.java index 1aac2c17..d072f0f5 100644 --- a/src/main/java/com/sounganization/botanify/domain/chat/entity/ChatMessage.java +++ b/src/main/java/com/sounganization/botanify/domain/chat/entity/ChatMessage.java @@ -15,6 +15,9 @@ @Builder @AllArgsConstructor @NoArgsConstructor +@Table(indexes = { + @Index(name = "idx_chat_message_unique", columnList = "room_id,sender_id,content,created_at", unique = true) +}) public class ChatMessage extends Timestamped { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) @@ -46,6 +49,9 @@ public void setExpirationDate() { @Builder.Default private Boolean delivered = false; + @Version + private Long version; + public void markAsDelivered() { this.delivered = true; } diff --git a/src/main/java/com/sounganization/botanify/domain/chat/event/ChatMessageEvent.java b/src/main/java/com/sounganization/botanify/domain/chat/event/ChatMessageEvent.java deleted file mode 100644 index 5d421850..00000000 --- a/src/main/java/com/sounganization/botanify/domain/chat/event/ChatMessageEvent.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.sounganization.botanify.domain.chat.event; - -import com.sounganization.botanify.domain.chat.dto.req.ChatMessageReqDto; -import lombok.Getter; -import lombok.RequiredArgsConstructor; - -@Getter -@RequiredArgsConstructor -public class ChatMessageEvent { - private final ChatMessageReqDto message; -} diff --git a/src/main/java/com/sounganization/botanify/domain/chat/repository/ChatMessageCustomRepository.java b/src/main/java/com/sounganization/botanify/domain/chat/repository/ChatMessageCustomRepository.java index 6ab0e624..1dc40bd1 100644 --- a/src/main/java/com/sounganization/botanify/domain/chat/repository/ChatMessageCustomRepository.java +++ b/src/main/java/com/sounganization/botanify/domain/chat/repository/ChatMessageCustomRepository.java @@ -1,19 +1,16 @@ package com.sounganization.botanify.domain.chat.repository; import com.sounganization.botanify.domain.chat.entity.ChatMessage; -import com.sounganization.botanify.domain.chat.entity.ChatRoom; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import java.time.LocalDateTime; import java.util.List; +import java.util.Optional; public interface ChatMessageCustomRepository { - List findMessagesByRoomId(Long roomId); - List findRecentMessages(ChatRoom room, LocalDateTime after); - List findMessagesWithRoomByRoomId(Long roomId); Page findMessagesByRoomIdWithPagination(Long roomId, Pageable pageable); int softDeleteMessagesOlderThan(LocalDateTime cutoffDate, int batchSize); - long countActiveMessagesByRoomId(Long roomId); List findUndeliveredMessages(); + Optional findDuplicateMessage(Long roomId, Long senderId, String content, LocalDateTime since); } diff --git a/src/main/java/com/sounganization/botanify/domain/chat/repository/ChatMessageCustomRepositoryImpl.java b/src/main/java/com/sounganization/botanify/domain/chat/repository/ChatMessageCustomRepositoryImpl.java index f30a390a..21fa78d3 100644 --- a/src/main/java/com/sounganization/botanify/domain/chat/repository/ChatMessageCustomRepositoryImpl.java +++ b/src/main/java/com/sounganization/botanify/domain/chat/repository/ChatMessageCustomRepositoryImpl.java @@ -2,9 +2,7 @@ import com.querydsl.jpa.impl.JPAQueryFactory; import com.sounganization.botanify.domain.chat.entity.ChatMessage; -import com.sounganization.botanify.domain.chat.entity.ChatRoom; import com.sounganization.botanify.domain.chat.entity.QChatMessage; -import com.sounganization.botanify.domain.chat.entity.QChatRoom; import lombok.RequiredArgsConstructor; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageImpl; @@ -13,49 +11,13 @@ import java.time.LocalDateTime; import java.util.List; +import java.util.Optional; @Repository @RequiredArgsConstructor public class ChatMessageCustomRepositoryImpl implements ChatMessageCustomRepository { private final JPAQueryFactory jpaQueryFactory; - @Override - public List findMessagesByRoomId(Long roomId) { - QChatMessage message = QChatMessage.chatMessage; - - return jpaQueryFactory - .selectFrom(message) - .where(message.chatRoom.id.eq(roomId)) - .orderBy(message.createdAt.asc()) - .fetch(); - } - - @Override - public List findRecentMessages(ChatRoom room, LocalDateTime after) { - QChatMessage message = QChatMessage.chatMessage; - - return jpaQueryFactory - .selectFrom(message) - .where(message.chatRoom.eq(room) - .and(message.createdAt.after(after))) - .orderBy(message.createdAt.asc()) - .fetch(); - } - - @Override - public List findMessagesWithRoomByRoomId(Long roomId) { - QChatMessage message = QChatMessage.chatMessage; - QChatRoom room = QChatRoom.chatRoom; - - return jpaQueryFactory - .selectFrom(message) - .join(message.chatRoom, room) - .fetchJoin() - .where(message.chatRoom.id.eq(roomId)) - .orderBy(message.createdAt.asc()) - .fetch(); - } - @Override public Page findMessagesByRoomIdWithPagination(Long roomId, Pageable pageable) { QChatMessage message = QChatMessage.chatMessage; @@ -74,7 +36,7 @@ public Page findMessagesByRoomIdWithPagination(Long roomId, Pageabl .where(message.chatRoom.id.eq(roomId)) .fetchOne(); - return new PageImpl<>(messages, pageable, total); + return new PageImpl<>(messages, pageable, total != null ? total : 0L); } @Override @@ -115,17 +77,17 @@ public List findUndeliveredMessages() { } @Override - public long countActiveMessagesByRoomId(Long roomId) { + public Optional findDuplicateMessage(Long roomId, Long senderId, String content, LocalDateTime since) { QChatMessage message = QChatMessage.chatMessage; - Long count = jpaQueryFactory - .select(message.count()) - .from(message) + return Optional.ofNullable(jpaQueryFactory + .selectFrom(message) .where(message.chatRoom.id.eq(roomId) + .and(message.senderId.eq(senderId)) + .and(message.content.eq(content)) + .and(message.createdAt.goe(since)) .and(message.deletedYn.isFalse())) - .fetchOne(); - - return count != null ? count : 0L; - + .orderBy(message.createdAt.desc()) + .fetchFirst()); } } diff --git a/src/main/java/com/sounganization/botanify/domain/chat/repository/ChatRoomCustomRepository.java b/src/main/java/com/sounganization/botanify/domain/chat/repository/ChatRoomCustomRepository.java index d2089659..5947ba26 100644 --- a/src/main/java/com/sounganization/botanify/domain/chat/repository/ChatRoomCustomRepository.java +++ b/src/main/java/com/sounganization/botanify/domain/chat/repository/ChatRoomCustomRepository.java @@ -5,13 +5,10 @@ import org.springframework.data.domain.Pageable; import java.time.LocalDateTime; -import java.util.List; import java.util.Optional; public interface ChatRoomCustomRepository { - List findRoomsByUserId(Long userId); Optional findRoomByUsers(Long senderUserId, Long receiverUserId); - List findRoomsWithMessagesById(Long userId); Page findRoomsByUserIdWithPagination(Long userId, Pageable pageable); int softDeleteEmptyRoomsOlderThan(LocalDateTime cutoffDate); } diff --git a/src/main/java/com/sounganization/botanify/domain/chat/repository/ChatRoomCustomRepositoryImpl.java b/src/main/java/com/sounganization/botanify/domain/chat/repository/ChatRoomCustomRepositoryImpl.java index c926af10..4799da75 100644 --- a/src/main/java/com/sounganization/botanify/domain/chat/repository/ChatRoomCustomRepositoryImpl.java +++ b/src/main/java/com/sounganization/botanify/domain/chat/repository/ChatRoomCustomRepositoryImpl.java @@ -20,17 +20,6 @@ public class ChatRoomCustomRepositoryImpl implements ChatRoomCustomRepository { private final JPAQueryFactory jpaQueryFactory; - @Override - public List findRoomsByUserId(Long userId) { - QChatRoom chatRoom = QChatRoom.chatRoom; - - return jpaQueryFactory - .selectFrom(chatRoom) - .where(chatRoom.senderUserId.eq(userId) - .or(chatRoom.receiverUserId.eq(userId))) - .fetch(); - } - @Override public Optional findRoomByUsers(Long senderUserId, Long receiverUserId) { QChatRoom chatRoom = QChatRoom.chatRoom; @@ -44,21 +33,6 @@ public Optional findRoomByUsers(Long senderUserId, Long receiverUserId return Optional.ofNullable(result); } - @Override - public List findRoomsWithMessagesById(Long userId) { - QChatRoom chatRoom = QChatRoom.chatRoom; - QChatMessage message = QChatMessage.chatMessage; - - return jpaQueryFactory - .selectFrom(chatRoom) - .distinct() - .leftJoin(chatRoom.messages, message) - .fetchJoin() - .where(chatRoom.senderUserId.eq(userId) - .or(chatRoom.receiverUserId.eq(userId))) - .fetch(); - } - @Override public Page findRoomsByUserIdWithPagination(Long userId, Pageable pageable) { QChatRoom chatRoom = QChatRoom.chatRoom; @@ -79,7 +53,7 @@ public Page findRoomsByUserIdWithPagination(Long userId, Pageable page .or(chatRoom.receiverUserId.eq(userId))) .fetchOne(); - return new PageImpl<>(rooms, pageable, total); + return new PageImpl<>(rooms, pageable, total != null ? total : 0L); } @Override diff --git a/src/main/java/com/sounganization/botanify/domain/chat/service/ChatMessageService.java b/src/main/java/com/sounganization/botanify/domain/chat/service/ChatMessageService.java index e6a65b20..e5507dbd 100644 --- a/src/main/java/com/sounganization/botanify/domain/chat/service/ChatMessageService.java +++ b/src/main/java/com/sounganization/botanify/domain/chat/service/ChatMessageService.java @@ -1,24 +1,26 @@ package com.sounganization.botanify.domain.chat.service; -import com.fasterxml.jackson.databind.ObjectMapper; import com.sounganization.botanify.common.exception.CustomException; import com.sounganization.botanify.common.exception.ExceptionStatus; -import com.sounganization.botanify.domain.chat.components.ChatFailureHandler; -import com.sounganization.botanify.domain.chat.dto.req.ChatMessageReqDto; import com.sounganization.botanify.domain.chat.entity.ChatMessage; import com.sounganization.botanify.domain.chat.entity.ChatRoom; -import com.sounganization.botanify.domain.chat.event.ChatMessageEvent; import com.sounganization.botanify.domain.chat.repository.ChatMessageRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.context.ApplicationEventPublisher; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionTemplate; +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Future; @Service @RequiredArgsConstructor @@ -26,12 +28,32 @@ public class ChatMessageService { private final ChatMessageRepository chatMessageRepository; private final ChatRoomService chatRoomService; + private final TransactionTemplate transactionTemplate; private final StringRedisTemplate redisTemplate; - private final ObjectMapper objectMapper; - private final ApplicationEventPublisher eventPublisher; @Transactional - public ChatMessage saveMessage(Long roomId, Long senderId, String content) { + public ChatMessage createMessage(Long roomId, Long senderId, String content) { + log.debug("메시지 생성 중 - roomId: {}, senderId: {}", roomId, senderId); + + Optional existingMessage = chatMessageRepository + .findDuplicateMessage(roomId, senderId, content, LocalDateTime.now().minusSeconds(10)); + + if (existingMessage.isPresent()) { + log.debug("중복 메시지를 발견했습니다. 기존 메시지를 반환합니다."); + return existingMessage.get(); + } + + String dedupeKey = String.format("msg:dedupe:%d:%d:%d", + roomId, senderId, content.hashCode()); + + Boolean isNew = redisTemplate.opsForValue() + .setIfAbsent(dedupeKey, "1", Duration.ofSeconds(10)); + + if (Boolean.FALSE.equals(isNew)) { + log.debug("Redis에서 중복 메시지가 감지되었습니다."); + return null; + } + ChatRoom chatRoom = chatRoomService.getChatRoom(roomId, senderId); if (!chatRoom.getSenderUserId().equals(senderId) && @@ -39,43 +61,62 @@ public ChatMessage saveMessage(Long roomId, Long senderId, String content) { throw new CustomException(ExceptionStatus.UNAUTHORIZED_CHAT_ACCESS); } - ChatMessage message = ChatMessage.builder() + return ChatMessage.builder() .type(ChatMessage.MessageType.TALK) .senderId(senderId) .content(content) .chatRoom(chatRoom) + .delivered(false) .build(); + } - // Redis로 메시지 발행 + @Transactional + public void markMessageAsDelivered(Long messageId) { try { - ChatMessageReqDto messageDto = new ChatMessageReqDto( - ChatMessageReqDto.MessageType.TALK, - roomId, - senderId, - content - ); - - redisTemplate.convertAndSend( - "chat_room_" + roomId, - objectMapper.writeValueAsString(messageDto) - ); + transactionTemplate.execute(status -> { + chatMessageRepository.findById(messageId).ifPresent(message -> { + message.markAsDelivered(); + chatMessageRepository.save(message); + log.debug("메시지 {}를 전달됨으로 표시했습니다.", messageId); + }); + return null; + }); } catch (Exception e) { - log.error("메시지 발행 중 오류 발생: {}", e.getMessage()); - // Redis 실패 시 이벤트 발행 - eventPublisher.publishEvent(new ChatMessageEvent(new ChatMessageReqDto( - ChatMessageReqDto.MessageType.TALK, - roomId, - senderId, - content - ))); + log.error("메시지를 전달됨으로 표시하는 데 실패했습니다: {}", e.getMessage(), e); } + } + + @Transactional(readOnly = true) + public ChatMessage findExistingMessage(Long roomId, Long senderId, String content) { + return chatMessageRepository.findDuplicateMessage( + roomId, + senderId, + content, + LocalDateTime.now().minusSeconds(30) + ).orElse(null); + } - // 비동기적으로 DB에 저장 - return CompletableFuture.supplyAsync(() -> chatMessageRepository.save(message)) - .exceptionally(throwable -> { - log.error("메시지 저장 중 오류 발생: {}", throwable.getMessage()); - return message; - }).join(); + @Async("chatThreadPoolTaskExecutor") + public Future saveMessageAsync(ChatMessage message) { + return CompletableFuture.supplyAsync(() -> { + try { + return transactionTemplate.execute(status -> { + try { + ChatMessage saved = chatMessageRepository.save(message); + log.info("메시지가 비동기적으로 저장되었습니다. roomId: {}", + message.getChatRoom().getId()); + return saved; + } catch (Exception e) { + log.error("메시지 저장 중 오류 발생: {}", e.getMessage(), e); + status.setRollbackOnly(); + throw e; + } + }); + } catch (Exception e) { + log.error("메시지 저장 실패", e); + throw new CompletionException(e); + } + }); } @Transactional(readOnly = true) diff --git a/src/main/resources/.DS_Store b/src/main/resources/.DS_Store new file mode 100644 index 00000000..25b6478e Binary files /dev/null and b/src/main/resources/.DS_Store differ diff --git a/src/main/resources/static/.DS_Store b/src/main/resources/static/.DS_Store new file mode 100644 index 00000000..c93fa26d Binary files /dev/null and b/src/main/resources/static/.DS_Store differ diff --git "a/src/main/resources/static/images/ERD_\354\265\234\354\242\205.jpeg" "b/src/main/resources/static/images/ERD_\354\265\234\354\242\205.jpeg" new file mode 100644 index 00000000..fb4c89a2 Binary files /dev/null and "b/src/main/resources/static/images/ERD_\354\265\234\354\242\205.jpeg" differ diff --git a/src/test/java/com/sounganization/botanify/domain/chat/ChatFailureHandlerTest.java b/src/test/java/com/sounganization/botanify/domain/chat/ChatFailureHandlerTest.java index b410ed23..54efe26b 100644 --- a/src/test/java/com/sounganization/botanify/domain/chat/ChatFailureHandlerTest.java +++ b/src/test/java/com/sounganization/botanify/domain/chat/ChatFailureHandlerTest.java @@ -38,7 +38,8 @@ void handleRedisFailure_ShouldDelegateToWebSocketService() { ChatMessageReqDto.MessageType.TALK, 1L, 1L, - "Test message" + "Test message", + ChatMessageReqDto.MessageSource.WEBSOCKET ); // When diff --git a/src/test/java/com/sounganization/botanify/domain/chat/ConnectionFailureHandlerTest.java b/src/test/java/com/sounganization/botanify/domain/chat/ConnectionFailureHandlerTest.java index fe0ef861..0aaba959 100644 --- a/src/test/java/com/sounganization/botanify/domain/chat/ConnectionFailureHandlerTest.java +++ b/src/test/java/com/sounganization/botanify/domain/chat/ConnectionFailureHandlerTest.java @@ -48,7 +48,8 @@ void handleConnectionFailure_ShouldSaveMessage() { ChatMessageReqDto.MessageType.TALK, 1L, 1L, - "Test message" + "Test message", + ChatMessageReqDto.MessageSource.WEBSOCKET ); when(chatRoomRepository.findById(1L))