From afb4bfff7b7985cd56714e5ce33757953213a484 Mon Sep 17 00:00:00 2001 From: koreanMike513 Date: Fri, 17 Jan 2025 21:01:27 +0000 Subject: [PATCH] feat: Added kafka configs to Payment Service - added kafka consumer and producer configs - added kafka implementations - removed application.properties and added application.yml --- payment/build.gradle | 4 + .../payment/config/KafkaConsumerConfig.java | 57 ++++++++++++++ .../payment/config/KafkaProducerConfig.java | 78 +++++++++++++++++++ .../src/main/resources/application.properties | 1 - payment/src/main/resources/application.yml | 37 +++++++++ 5 files changed, 176 insertions(+), 1 deletion(-) create mode 100644 payment/src/main/java/com/f_lab/la_planete/payment/config/KafkaConsumerConfig.java create mode 100644 payment/src/main/java/com/f_lab/la_planete/payment/config/KafkaProducerConfig.java delete mode 100644 payment/src/main/resources/application.properties create mode 100644 payment/src/main/resources/application.yml diff --git a/payment/build.gradle b/payment/build.gradle index fd9838f..1ed502a 100644 --- a/payment/build.gradle +++ b/payment/build.gradle @@ -8,4 +8,8 @@ dependencies { testImplementation 'org.springframework.boot:spring-boot-starter-test' testRuntimeOnly 'org.junit.platform:junit-platform-launcher' + + // kafka 설정 추가 + implementation 'org.springframework.kafka:spring-kafka' + testImplementation 'org.springframework.kafka:spring-kafka' } diff --git a/payment/src/main/java/com/f_lab/la_planete/payment/config/KafkaConsumerConfig.java b/payment/src/main/java/com/f_lab/la_planete/payment/config/KafkaConsumerConfig.java new file mode 100644 index 0000000..07053fd --- /dev/null +++ b/payment/src/main/java/com/f_lab/la_planete/payment/config/KafkaConsumerConfig.java @@ -0,0 +1,57 @@ +package com.f_lab.la_planete.payment.config; + + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.support.serializer.JsonDeserializer; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +public class KafkaConsumerConfig { + + @Value("${spring.kafka.consumer.bootstrap-servers}") + private String BOOTSTRAP_SERVERS; + + @Value("${spring.kafka.consumer.enable-auto-commit}") + private boolean AUTO_COMMIT; + + @Value("${kafka.container.concurrency:3}") + private int CONCURRENCY; + + @Bean + public Map consumerConfig() { + Map consumerConfig = new HashMap<>(); + + consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + consumerConfig.put(JsonDeserializer.TRUSTED_PACKAGES, "com.f_lab.la_planete.core.*"); + consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT); + + return consumerConfig; + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + + factory.setConsumerFactory(consumerFactory()); + factory.setConcurrency(CONCURRENCY); + + return factory; + } + + @Bean + public ConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerConfig()); + } +} diff --git a/payment/src/main/java/com/f_lab/la_planete/payment/config/KafkaProducerConfig.java b/payment/src/main/java/com/f_lab/la_planete/payment/config/KafkaProducerConfig.java new file mode 100644 index 0000000..20e6d48 --- /dev/null +++ b/payment/src/main/java/com/f_lab/la_planete/payment/config/KafkaProducerConfig.java @@ -0,0 +1,78 @@ +package com.f_lab.la_planete.payment.config; + +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.TopicBuilder; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +public class KafkaProducerConfig { + + + @Value("${spring.kafka.producer.bootstrap-servers:localhost:9092}") + private String BOOTSTRAP_SERVERS; + + @Value("${spring.kafka.producer.ack:all}") + private String ACK; + + @Value("${spring.kafka.producer.enable.idempotence:true}") + private String IDEMPOTENCE; + + @Value("${payments.commands.topic.name}") + private String paymentProcessCommand; + + @Value("${payments.events.topic.name}") + private String paymentProcessedEvents; + + @Value("${kafka.topic.partitions:3}") + private int TOPIC_PARTITIONS; + + + @Bean + public Map producerConfig() { + Map config = new HashMap<>(); + + config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, IDEMPOTENCE); + config.put(ProducerConfig.ACKS_CONFIG, ACK); + + return config; + } + + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfig()); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public NewTopic foodsProcessFoodsReserveCommand() { + return TopicBuilder + .name(paymentProcessCommand) + .partitions(TOPIC_PARTITIONS) + .build(); + } + + @Bean + public NewTopic foodsFoodsReservationProcessedEvent() { + return TopicBuilder + .name(paymentProcessedEvents) + .partitions(TOPIC_PARTITIONS) + .build(); + } +} diff --git a/payment/src/main/resources/application.properties b/payment/src/main/resources/application.properties deleted file mode 100644 index 436e5e2..0000000 --- a/payment/src/main/resources/application.properties +++ /dev/null @@ -1 +0,0 @@ -spring.application.name=payment diff --git a/payment/src/main/resources/application.yml b/payment/src/main/resources/application.yml new file mode 100644 index 0000000..36a0682 --- /dev/null +++ b/payment/src/main/resources/application.yml @@ -0,0 +1,37 @@ +server.port: 0 + +spring: + application: + name: payments + + kafka: + producer: + bootstrap-servers: localhost:9092 + acks: all + enable: + idempotence: true + + consumer: + bootstrap-servers: localhost:9092 + group-id: payments + enable-auto-commit: false + properties: + spring.json.trusted.packages: com.f-lab.la_planete.core.* + +payments: + commands: + topic: + name: payments.payment-process-command + + events: + topic: + name: payments.payment-processed-event + + + +kafka: + topic: + partitions: 3 + + container: + concurrency: 3 \ No newline at end of file