-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #22 from mairess/add-messaging-kafka
Add messaging kafka
- Loading branch information
Showing
20 changed files
with
480 additions
and
33 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
68 changes: 68 additions & 0 deletions
68
src/main/java/com/maires/wnet/configuration/ProducerConfig.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
package com.maires.wnet.configuration; | ||
|
||
import java.util.Map; | ||
import org.apache.kafka.clients.admin.NewTopic; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.beans.factory.annotation.Value; | ||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties; | ||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.context.annotation.Configuration; | ||
import org.springframework.kafka.config.TopicBuilder; | ||
import org.springframework.kafka.core.DefaultKafkaProducerFactory; | ||
import org.springframework.kafka.core.KafkaTemplate; | ||
import org.springframework.kafka.core.ProducerFactory; | ||
|
||
|
||
/** | ||
* The type Producer config. | ||
*/ | ||
@Configuration | ||
public class ProducerConfig { | ||
|
||
private final KafkaProperties kafkaProperties; | ||
|
||
@Value("${kafka.topic}") | ||
private String newInstallationRequest; | ||
|
||
/** | ||
* Instantiates a new Producer config. | ||
* | ||
* @param kafkaProperties the kafka properties | ||
*/ | ||
@Autowired | ||
public ProducerConfig(KafkaProperties kafkaProperties) { | ||
this.kafkaProperties = kafkaProperties; | ||
} | ||
|
||
/** | ||
* Producer factory producer factory. | ||
* | ||
* @return the producer factory | ||
*/ | ||
@Bean | ||
public ProducerFactory<String, String> producerFactory() { | ||
Map<String, Object> properties = kafkaProperties.buildProducerProperties(); | ||
return new DefaultKafkaProducerFactory<>(properties); | ||
} | ||
|
||
/** | ||
* Kafka template kafka template. | ||
* | ||
* @return the kafka template | ||
*/ | ||
@Bean | ||
public KafkaTemplate<String, String> kafkaTemplate() { | ||
return new KafkaTemplate<>(producerFactory()); | ||
} | ||
|
||
/** | ||
* Messaging request topic build new topic. | ||
* | ||
* @return the new topic | ||
*/ | ||
@Bean | ||
public NewTopic messagingRequestTopicBuild() { | ||
return TopicBuilder.name(newInstallationRequest).partitions(1).replicas(1).build(); | ||
} | ||
|
||
} |
56 changes: 56 additions & 0 deletions
56
src/main/java/com/maires/wnet/consumer/NewInstallationConsumer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
package com.maires.wnet.consumer; | ||
|
||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.maires.wnet.controller.dto.MessagingNewInstallationDto; | ||
import com.maires.wnet.service.EmailService; | ||
import jakarta.mail.MessagingException; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.kafka.annotation.KafkaListener; | ||
import org.springframework.stereotype.Service; | ||
|
||
/** | ||
* The type New installation consumer. | ||
*/ | ||
@Service | ||
public class NewInstallationConsumer { | ||
|
||
private final EmailService emailService; | ||
private final ObjectMapper objectMapper; | ||
|
||
/** | ||
* Instantiates a new New installation consumer. | ||
* | ||
* @param emailService the email service | ||
* @param objectMapper the object mapper | ||
*/ | ||
@Autowired | ||
public NewInstallationConsumer(EmailService emailService, ObjectMapper objectMapper) { | ||
this.emailService = emailService; | ||
this.objectMapper = objectMapper; | ||
} | ||
|
||
/** | ||
* Consume installation message. | ||
* | ||
* @param message the message | ||
* @throws MessagingException the messaging exception | ||
* @throws JsonProcessingException the json processing exception | ||
*/ | ||
@KafkaListener( | ||
topics = "${kafka.topic}", | ||
groupId = "message-request-consumer1" | ||
) | ||
public void consumeNewInstallationMessage(String message) | ||
throws MessagingException, JsonProcessingException { | ||
|
||
MessagingNewInstallationDto messagingNewInstallationDto = objectMapper.readValue(message, | ||
MessagingNewInstallationDto.class); | ||
|
||
emailService.sendNewInstallationMail( | ||
messagingNewInstallationDto.customerMail(), | ||
messagingNewInstallationDto.customerName() | ||
); | ||
|
||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.