Skip to content

Commit

Permalink
Feat: 카프카로 인스턴스 간 채팅메시지 동기화
Browse files Browse the repository at this point in the history
  • Loading branch information
jzakka committed Feb 19, 2024
1 parent fa85cf1 commit 6812fc4
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 5 deletions.
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ dependencies {
testImplementation 'org.springframework.restdocs:spring-restdocs-mockmvc'
asciidoctorExt 'org.springframework.restdocs:spring-restdocs-asciidoctor'

implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'

// 구글 지오코딩 api
implementation 'com.google.maps:google-maps-services:2.2.0'

Expand Down
28 changes: 27 additions & 1 deletion infra/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
version: '3'
networks:
kafka-net:
driver: bridge
services:
redis:
image: redis
Expand All @@ -17,4 +20,27 @@ services:
- ./provision/mysql/init:/docker-entrypoint-initdb.d
command:
- --character-set-server=utf8mb4
- --collation-server=utf8mb4_unicode_ci
- --collation-server=utf8mb4_unicode_ci
zookeeper:
image: 'wurstmeister/zookeeper'
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
networks:
- kafka-net
kafka:
image: wurstmeister/kafka
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "chats:1:1"
ports:
- "9092:9092"
depends_on:
- zookeeper
networks:
- kafka-net
2 changes: 1 addition & 1 deletion src/main/java/com/stoury/config/sse/SseEmitters.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private SseEmitter chatRoomEmitter(Long roomId) {
return emitter;
}

public void broadCast(Long roomId, ChatMessageResponse chatMessage) {
public void broadcast(Long roomId, ChatMessageResponse chatMessage) {
Long roomIdNotNull = Objects.requireNonNull(roomId, ROOM_ID_NULL_MESSAGE);

SseEmitter emitter = get(roomIdNotNull);
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/com/stoury/service/ChatService.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.stoury.repository.ChatMessageRepository;
import com.stoury.repository.ChatRoomRepository;
import com.stoury.repository.MemberRepository;
import com.stoury.service.kafka.KafkaProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.domain.PageRequest;
Expand All @@ -35,6 +36,7 @@ public class ChatService {
private final ChatMessageRepository chatMessageRepository;
private final SseEmitters sseEmitters;
private final ApplicationEventPublisher eventPublisher;
private final KafkaProducer kafkaProducer;

@Transactional
public ChatRoomResponse createChatRoom(Long senderId, Long receiverId) {
Expand Down Expand Up @@ -89,7 +91,8 @@ public ChatMessageResponse sendChatMessage(Long memberId, Long chatRoomId, Strin
}

ChatMessageResponse chatMessage = publishChatMessageSaveEvent(memberId, chatRoomId, textContent);
sseEmitters.broadCast(chatRoomId, chatMessage);

kafkaProducer.produce(chatMessage);
return chatMessage;
}

Expand Down
19 changes: 19 additions & 0 deletions src/main/java/com/stoury/service/kafka/KafkaConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.stoury.service.kafka;

import com.stoury.config.sse.SseEmitters;
import com.stoury.dto.chat.ChatMessageResponse;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class KafkaConsumer {
private final SseEmitters sseEmitters;

@KafkaListener(topics = "chats", containerGroup = "${spring.kafka.consumer.group-id}")
public void broadcast(@Payload ChatMessageResponse chatMessage) {
sseEmitters.broadcast(chatMessage.chatRoomId(), chatMessage);
}
}
31 changes: 31 additions & 0 deletions src/main/java/com/stoury/service/kafka/KafkaProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.stoury.service.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.stoury.exception.chat.ChatMessageSendException;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
public static final String TOPIC_NAME="chats";

public <T> void produce(T message) {
String rawMessageJson = convertToString(message);
kafkaTemplate.send(TOPIC_NAME, rawMessageJson);
}

private <T> String convertToString(T message) {
String rawMessageJson = null;
try {
rawMessageJson = objectMapper.writeValueAsString(message);
} catch (JsonProcessingException e) {
throw new ChatMessageSendException(e);
}
return rawMessageJson;
}
}
18 changes: 18 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,24 @@ server:
http-only: true
timeout: 3600
spring:
application:
name: Stoury
kafka:
topic-name: chats
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: 1
consumer:
bootstrap-servers: localhost:9092
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
group-id: ${spring.application.name}:${spring.application.instance_id:${random.value}}
properties:
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
spring.json.value.default.type: com.stoury.dto.chat.ChatMessageResponse
data:
redis:
port: 8379
Expand Down
7 changes: 5 additions & 2 deletions src/test/groovy/com/stoury/service/ChatServiceTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import com.stoury.config.sse.SseEmitters
import com.stoury.domain.ChatMessage
import com.stoury.domain.ChatRoom
import com.stoury.domain.Member
import com.stoury.dto.chat.ChatMessageResponse
import com.stoury.event.ChatMessageSaveEvent
import com.stoury.exception.authentication.NotAuthorizedException
import com.stoury.repository.ChatMessageRepository
import com.stoury.repository.ChatRoomRepository
import com.stoury.repository.MemberRepository
import com.stoury.service.kafka.KafkaProducer
import org.springframework.context.ApplicationEventPublisher
import spock.lang.Specification

Expand All @@ -20,7 +22,8 @@ class ChatServiceTest extends Specification {
def chatMessageRepository = Mock(ChatMessageRepository)
def sseEmitters = Mock(SseEmitters)
def eventPublisher = Mock(ApplicationEventPublisher)
def chatService = new ChatService(memerRepository, chatRoomRepository, chatMessageRepository, sseEmitters, eventPublisher)
def kafkaProducer = Mock(KafkaProducer)
def chatService = new ChatService(memerRepository, chatRoomRepository, chatMessageRepository, sseEmitters, eventPublisher,kafkaProducer)

def "채팅방 개설"() {
given:
Expand Down Expand Up @@ -128,7 +131,7 @@ class ChatServiceTest extends Specification {
when:
chatService.sendChatMessage(1, 2, "Hi!")
then:
1 * sseEmitters.broadCast(_,_)
1 * kafkaProducer.produce(_ as ChatMessageResponse)
}

def "채팅전송 불가, 내가 참여한 채팅방이 아님"() {
Expand Down

0 comments on commit 6812fc4

Please sign in to comment.