Merge pull request #70 from abes-esr/CDE-319-checkup-de-logskbart
CDE-319 checkup of the kbart logs
EryneKL authored Dec 20, 2023
2 parents 746915b + 5fefb00 commit dfaa6cb
Showing 3 changed files with 17 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
*.tsv
*.properties
hibernate.cfg.xml
HELP.md
3 changes: 3 additions & 0 deletions src/main/java/fr/abes/kbart2kafka/Kbart2kafkaApplication.java
@@ -5,6 +5,7 @@
import fr.abes.kbart2kafka.service.FileService;
import fr.abes.kbart2kafka.utils.CheckFiles;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.ThreadContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
@@ -42,12 +43,14 @@ public void run(String... args) throws IOException {
log.error("Message envoyé : {}", "Le chemin d'accès au fichier tsv n'a pas été trouvé dans les paramètres de l'application");
} else {
log.info("Debut envois kafka de : " + args[0]);
ThreadContext.put("package", args[0]);
// Récupération du chemin d'accès au fichier
File tsvFile = new File(args[0]);
// Appelle du service de vérification de fichier
try {
CheckFiles.verifyFile(tsvFile, kbartHeader);
} catch (IllegalFileFormatException | IllegalProviderException e) {
log.error(e.getMessage());
throw new RuntimeException(e);
}
service.loadFile(tsvFile, kbartHeader);
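The new ThreadContext.put("package", args[0]) call tags every log event emitted from this thread with the name of the kbart file being processed, so the log4j2 configuration can route or label output per file. A minimal sketch of how the key surfaces in log output, assuming a PatternLayout that references %X{package} (the actual log4j2 configuration is not part of this diff):

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;

public class ThreadContextDemo {
    private static final Logger log = LogManager.getLogger(ThreadContextDemo.class);

    public static void main(String[] args) {
        // Same key as the commit; here the value is a stand-in file name.
        ThreadContext.put("package", "example_package.tsv");
        try {
            // With a layout such as "%d %-5p [%X{package}] %m%n", this line
            // comes out tagged with "example_package.tsv".
            log.info("processing file");
        } finally {
            // The context is thread-local: clear it so pooled threads
            // do not inherit a stale value.
            ThreadContext.clearAll();
        }
    }
}

Because the context is thread-local, it does not propagate into executor threads; FileService therefore repeats the ThreadContext.put inside each submitted task, as the next file shows.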
20 changes: 13 additions & 7 deletions src/main/java/fr/abes/kbart2kafka/service/FileService.java
@@ -6,18 +6,18 @@
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.logging.log4j.ThreadContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.*;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@@ -70,23 +70,29 @@ private void executeMultiThread(File fichier, String kbartHeader) throws IOException {
        try (BufferedReader buff = new BufferedReader(new FileReader(fichier))) {
            List<String> fileContent = buff.lines().toList();
            Integer nbLignesFichier = fileContent.size() - 1;
            log.debug("Starting send of " + nbLignesFichier + " lines of the file");
            for (String ligneKbart : fileContent) {
                lineCounter++;
                if (!ligneKbart.contains(kbartHeader)) {
                    lineCounter++;
                    // Create a new dto object, set its fields, and send it to the topicProducer service
                    String[] tsvElementsOnOneLine = ligneKbart.split("\t");
                    LigneKbartDto ligneKbartDto = constructDto(tsvElementsOnOneLine);

                    final int finalLineCounter = lineCounter;
                    executor.execute(() -> {
                        try {
                            ThreadContext.put("package", fichier.getName());
                            List<org.apache.kafka.common.header.Header> headers = new ArrayList<>();
                            headers.add(new RecordHeader("FileName", fichier.getName().getBytes(StandardCharsets.UTF_8)));
                            headers.add(new RecordHeader("nbCurrentLines", String.valueOf(finalLineCounter).getBytes()));
                            headers.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLignesFichier).getBytes()));
                            ProducerRecord<String, String> record = new ProducerRecord<>(topicKbart, new Random().nextInt(nbThread), fichier.getName(), mapper.writeValueAsString(ligneKbartDto), headers);
                            CompletableFuture<SendResult<String, String>> result = kafkaTemplate.executeInTransaction(kt -> kt.send(record));
                            assert result != null;
                            result.whenComplete((sr, ex) -> {
                                try {
                                    log.debug("Message sent: {}", mapper.writeValueAsString(result.get().getProducerRecord().value()));
                                    // log.debug("Message sent: {}", mapper.writeValueAsString(result.get().getProducerRecord().value()));
                                    mapper.writeValueAsString(result.get().getProducerRecord().value());
                                } catch (JsonProcessingException | InterruptedException | ExecutionException e) {
                                    log.warn("Error mapping the send result in the topic for line " + ligneKbartDto);
                                }
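Each record now leaves with three headers (FileName, nbCurrentLines, nbLinesTotal), so a downstream consumer can report per-file progress without parsing the payload. A hypothetical consumer-side sketch, assuming only the header names used above (no consumer is part of this PR):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public class KbartProgress {
    // Reads a UTF-8 string header from a consumed record, or null if absent.
    static String header(ConsumerRecord<String, String> rec, String key) {
        Header h = rec.headers().lastHeader(key);
        return h == null ? null : new String(h.value(), StandardCharsets.UTF_8);
    }

    // Builds a progress string such as "package.tsv: 12/340".
    static String progress(ConsumerRecord<String, String> rec) {
        return header(rec, "FileName") + ": "
                + header(rec, "nbCurrentLines") + "/"
                + header(rec, "nbLinesTotal");
    }
}

Two details worth noting in the producer code: new Random().nextInt(nbThread) pins each record to a random partition between 0 and nbThread - 1, which assumes the topic has at least nbThread partitions; and each send runs in its own kafkaTemplate.executeInTransaction call, so a failed line does not roll back the others.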
