From fdf8d68da6267d3ee047feafc1046e29eb7836e0 Mon Sep 17 00:00:00 2001 From: Jaewon Lee <58386334+jaewonLeeKOR@users.noreply.github.com> Date: Sun, 7 Sep 2025 18:37:00 +0900 Subject: [PATCH] =?UTF-8?q?[LB-461]=20feat=20:=20=ED=8A=B8=EB=A0=88?= =?UTF-8?q?=EC=9D=B4=EC=8B=B1=20=EC=84=A4=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 6 +- .../common/kafka/config/KafkaConfig.java | 78 ------------------- .../kafka/config/KafkaListenerConfig.java | 22 ------ .../lgcms/streaming/config/KafkaConfig.java | 9 +++ .../streaming/config/ObjectMapperConfig.java | 15 ++++ .../consumer/EncodingConsumeService.java | 2 +- src/main/resources/application-local.yml | 17 ++-- 7 files changed, 40 insertions(+), 109 deletions(-) delete mode 100644 src/main/java/com/lgcms/streaming/common/kafka/config/KafkaConfig.java delete mode 100644 src/main/java/com/lgcms/streaming/common/kafka/config/KafkaListenerConfig.java create mode 100644 src/main/java/com/lgcms/streaming/config/KafkaConfig.java create mode 100644 src/main/java/com/lgcms/streaming/config/ObjectMapperConfig.java diff --git a/build.gradle b/build.gradle index eff0016..1094059 100644 --- a/build.gradle +++ b/build.gradle @@ -5,7 +5,7 @@ plugins { } group = 'com.lgcms' -version = '0.1' +version = '0.2' java { toolchain { @@ -42,6 +42,10 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-actuator' runtimeOnly 'io.micrometer:micrometer-registry-prometheus' + + implementation 'io.micrometer:micrometer-tracing-bridge-otel' // 트레이싱 + implementation 'io.opentelemetry:opentelemetry-exporter-otlp' // otel exporter + implementation 'io.github.openfeign:feign-micrometer' // feign 트레이싱 } dependencyManagement { diff --git a/src/main/java/com/lgcms/streaming/common/kafka/config/KafkaConfig.java b/src/main/java/com/lgcms/streaming/common/kafka/config/KafkaConfig.java deleted file mode 100644 index 4bfca84..0000000 --- a/src/main/java/com/lgcms/streaming/common/kafka/config/KafkaConfig.java +++ /dev/null @@ -1,78 +0,0 @@ -package com.lgcms.streaming.common.kafka.config; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import com.lgcms.streaming.common.kafka.utils.serializer.JsonKafkaDeserializer; -import com.lgcms.streaming.common.kafka.utils.serializer.JsonKafkaSerializer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.annotation.EnableKafka; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.core.*; -import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer; -import org.springframework.kafka.support.serializer.JsonDeserializer; - -import java.util.HashMap; -import java.util.Map; - -@Configuration -@EnableKafka -public class KafkaConfig { - @Value("${spring.kafka.bootstrap-servers}") - private String bootstrapServer; - @Value("${spring.kafka.consumer.group-id}") - private String groupId; - - @Bean - public ObjectMapper objectMapper() { - return new ObjectMapper() - .registerModule(new JavaTimeModule()); - } - - @Bean - public ProducerFactory producerFactory() { - Map config = new HashMap<>(); - config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); - config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonKafkaSerializer.class); - return new DefaultKafkaProducerFactory<>(config); - } - - @Bean - public KafkaTemplate kafkaTemplate() { - return new KafkaTemplate<>(producerFactory()); - } - - private Map commonConsumerProps() { - Map props = new HashMap<>(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); - props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class); - props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class); - props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); - return props; - } - - public ConsumerFactory consumerFactory(Class valueType) { - Map props = new HashMap<>(commonConsumerProps()); - return new DefaultKafkaConsumerFactory<>( - props, - new StringDeserializer(), - new JsonKafkaDeserializer<>(valueType) - ); - } - - public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(Class valueType) { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory(valueType)); - return factory; - } -} diff --git a/src/main/java/com/lgcms/streaming/common/kafka/config/KafkaListenerConfig.java b/src/main/java/com/lgcms/streaming/common/kafka/config/KafkaListenerConfig.java deleted file mode 100644 index bf87156..0000000 --- a/src/main/java/com/lgcms/streaming/common/kafka/config/KafkaListenerConfig.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.lgcms.streaming.common.kafka.config; - - -import com.lgcms.streaming.common.kafka.dto.KafkaEvent; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; - -@Configuration -public class KafkaListenerConfig { - - private final KafkaConfig kafkaConfig; - - public KafkaListenerConfig(KafkaConfig kafkaConfig) { - this.kafkaConfig = kafkaConfig; - } - - @Bean - public ConcurrentKafkaListenerContainerFactory defaultFactory() { - return kafkaConfig.kafkaListenerContainerFactory(KafkaEvent.class); - } -} \ No newline at end of file diff --git a/src/main/java/com/lgcms/streaming/config/KafkaConfig.java b/src/main/java/com/lgcms/streaming/config/KafkaConfig.java new file mode 100644 index 0000000..977fb44 --- /dev/null +++ b/src/main/java/com/lgcms/streaming/config/KafkaConfig.java @@ -0,0 +1,9 @@ +package com.lgcms.streaming.config; + +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; + +@Configuration +@EnableKafka +public class KafkaConfig { +} diff --git a/src/main/java/com/lgcms/streaming/config/ObjectMapperConfig.java b/src/main/java/com/lgcms/streaming/config/ObjectMapperConfig.java new file mode 100644 index 0000000..7d7b0e6 --- /dev/null +++ b/src/main/java/com/lgcms/streaming/config/ObjectMapperConfig.java @@ -0,0 +1,15 @@ +package com.lgcms.streaming.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class ObjectMapperConfig { + @Bean + public ObjectMapper objectMapper() { + return new ObjectMapper() + .registerModule(new JavaTimeModule()); + } +} diff --git a/src/main/java/com/lgcms/streaming/event/consumer/EncodingConsumeService.java b/src/main/java/com/lgcms/streaming/event/consumer/EncodingConsumeService.java index 8673e14..b108f24 100644 --- a/src/main/java/com/lgcms/streaming/event/consumer/EncodingConsumeService.java +++ b/src/main/java/com/lgcms/streaming/event/consumer/EncodingConsumeService.java @@ -18,7 +18,7 @@ public class EncodingConsumeService { private final StreamingService streamingService; private final KafkaEventFactory kafkaEventFactory; - @KafkaListener(topics = "ENCODING",containerFactory = "defaultFactory") + @KafkaListener(topics = "ENCODING") public void EncodingEvent(KafkaEvent event){ LectureEncodeDto lectureEncodeDto = kafkaEventFactory.convert(event, LectureEncodeDto.class); diff --git a/src/main/resources/application-local.yml b/src/main/resources/application-local.yml index b10f586..6129ca2 100644 --- a/src/main/resources/application-local.yml +++ b/src/main/resources/application-local.yml @@ -12,28 +12,31 @@ spring: kafka: bootstrap-servers: localhost:38123 - + template: + observation-enabled: true listener: - ack-mode: manual_immediate + observation-enabled: true + ack-mode: manual consumer: group-id: ${spring.application.name} - key-deserializer: org.apache.kafka.common.serialization.StringDeserializer - value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer + value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer enable-auto-commit: false - auto-offset-reset: latest + auto-offset-reset: earliest max-poll-records: 10 properties: + spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer + spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer spring.json.trusted.packages: "*" spring.json.use.type.headers: false + spring.json.value.default.type: com.lgcms.streaming.common.kafka.dto.KafkaEvent producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer properties: spring.json.add.type.headers: false - - cloud: aws: s3: