From 28e1d403e3715487c5ebeebd268f79777e48bde0 Mon Sep 17 00:00:00 2001 From: SamuelQuetin Date: Wed, 6 Dec 2023 09:05:02 +0100 Subject: [PATCH 1/8] ajout log --- .../java/fr/abes/kbart2kafka/service/FileService.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/main/java/fr/abes/kbart2kafka/service/FileService.java b/src/main/java/fr/abes/kbart2kafka/service/FileService.java index 6e7ce9d..e8c6868 100644 --- a/src/main/java/fr/abes/kbart2kafka/service/FileService.java +++ b/src/main/java/fr/abes/kbart2kafka/service/FileService.java @@ -17,7 +17,10 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import java.io.*; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; @@ -72,6 +75,7 @@ private void executeMultiThread(File fichier, String kbartHeader) throws IOExcep try (BufferedReader buff = new BufferedReader(new FileReader(fichier))) { List fileContent = buff.lines().toList(); Integer nbLignesFichier = fileContent.size() - 1; + log.debug("Début d'envoi de "+ nbLignesFichier + " lignes du fichier"); for (String ligneKbart : fileContent) { if (!ligneKbart.contains(kbartHeader)) { lineCounter++; @@ -88,9 +92,11 @@ private void executeMultiThread(File fichier, String kbartHeader) throws IOExcep headers.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLignesFichier).getBytes())); ProducerRecord record = new ProducerRecord<>(topicKbart, new Random().nextInt(nbThread), "", mapper.writeValueAsString(ligneKbartDto), headers); CompletableFuture> result = kafkaTemplate.executeInTransaction(kt -> kt.send(record)); + assert result != null; result.whenComplete((sr, ex) -> { try { - log.debug("Message envoyé : {}", mapper.writeValueAsString(result.get().getProducerRecord().value())); +// log.debug("Message envoyé : {}", mapper.writeValueAsString(result.get().getProducerRecord().value())); + mapper.writeValueAsString(result.get().getProducerRecord().value()); } catch (JsonProcessingException | InterruptedException | ExecutionException e) { log.warn("Erreur dans le mapping du résultat de l'envoi dans le topic pour la ligne " + ligneKbartDto); } From 9141f769ea5b7972d581de4effb9868551387c96 Mon Sep 17 00:00:00 2001 From: SamuelQuetin Date: Fri, 15 Dec 2023 11:29:44 +0100 Subject: [PATCH 2/8] Maj gitignore Ajout vrai nb line --- .gitignore | 1 + src/main/java/fr/abes/kbart2kafka/service/FileService.java | 6 +++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 777ee3e..68d7042 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +*.tsv *.properties hibernate.cfg.xml HELP.md diff --git a/src/main/java/fr/abes/kbart2kafka/service/FileService.java b/src/main/java/fr/abes/kbart2kafka/service/FileService.java index 6e7ce9d..217818d 100644 --- a/src/main/java/fr/abes/kbart2kafka/service/FileService.java +++ b/src/main/java/fr/abes/kbart2kafka/service/FileService.java @@ -17,7 +17,10 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import java.io.*; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; @@ -85,6 +88,7 @@ private void executeMultiThread(File fichier, String kbartHeader) throws IOExcep try { List headers = new ArrayList<>(); headers.add(new RecordHeader("FileName", kafkaHeader.getFileName().getBytes(StandardCharsets.UTF_8))); + headers.add(new RecordHeader("nbCurrentLines", String.valueOf(finalLineCounter).getBytes())); headers.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLignesFichier).getBytes())); ProducerRecord record = new ProducerRecord<>(topicKbart, new Random().nextInt(nbThread), "", mapper.writeValueAsString(ligneKbartDto), headers); CompletableFuture> result = kafkaTemplate.executeInTransaction(kt -> kt.send(record)); From 4c6c20d1c193c7b9a491cfed09f737dc00197d36 Mon Sep 17 00:00:00 2001 From: SamuelQuetin Date: Mon, 18 Dec 2023 11:52:55 +0100 Subject: [PATCH 3/8] Gestion threadContext pas fini --- src/main/java/fr/abes/kbart2kafka/Kbart2kafkaApplication.java | 3 +++ src/main/java/fr/abes/kbart2kafka/service/FileService.java | 1 + src/main/java/fr/abes/kbart2kafka/utils/CheckFiles.java | 3 +++ 3 files changed, 7 insertions(+) diff --git a/src/main/java/fr/abes/kbart2kafka/Kbart2kafkaApplication.java b/src/main/java/fr/abes/kbart2kafka/Kbart2kafkaApplication.java index 8644476..46379f5 100644 --- a/src/main/java/fr/abes/kbart2kafka/Kbart2kafkaApplication.java +++ b/src/main/java/fr/abes/kbart2kafka/Kbart2kafkaApplication.java @@ -5,6 +5,7 @@ import fr.abes.kbart2kafka.service.FileService; import fr.abes.kbart2kafka.utils.CheckFiles; import lombok.extern.slf4j.Slf4j; +import org.apache.logging.log4j.ThreadContext; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; @@ -42,12 +43,14 @@ public void run(String... args) throws IOException { log.error("Message envoyé : {}", "Le chemin d'accès au fichier tsv n'a pas été trouvé dans les paramètres de l'application"); } else { log.info("Debut envois kafka de : " + args[0]); + ThreadContext.put("package", args[0]); // Récupération du chemin d'accès au fichier File tsvFile = new File(args[0]); // Appelle du service de vérification de fichier try { CheckFiles.verifyFile(tsvFile, kbartHeader); } catch (IllegalFileFormatException | IllegalProviderException e) { + log.error(e.getMessage()); throw new RuntimeException(e); } service.loadFile(tsvFile, kbartHeader); diff --git a/src/main/java/fr/abes/kbart2kafka/service/FileService.java b/src/main/java/fr/abes/kbart2kafka/service/FileService.java index 217818d..247b75f 100644 --- a/src/main/java/fr/abes/kbart2kafka/service/FileService.java +++ b/src/main/java/fr/abes/kbart2kafka/service/FileService.java @@ -92,6 +92,7 @@ private void executeMultiThread(File fichier, String kbartHeader) throws IOExcep headers.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLignesFichier).getBytes())); ProducerRecord record = new ProducerRecord<>(topicKbart, new Random().nextInt(nbThread), "", mapper.writeValueAsString(ligneKbartDto), headers); CompletableFuture> result = kafkaTemplate.executeInTransaction(kt -> kt.send(record)); + assert result != null; result.whenComplete((sr, ex) -> { try { log.debug("Message envoyé : {}", mapper.writeValueAsString(result.get().getProducerRecord().value())); diff --git a/src/main/java/fr/abes/kbart2kafka/utils/CheckFiles.java b/src/main/java/fr/abes/kbart2kafka/utils/CheckFiles.java index 3ec2017..ddf4bb8 100644 --- a/src/main/java/fr/abes/kbart2kafka/utils/CheckFiles.java +++ b/src/main/java/fr/abes/kbart2kafka/utils/CheckFiles.java @@ -3,6 +3,7 @@ import fr.abes.kbart2kafka.exception.IllegalFileFormatException; import fr.abes.kbart2kafka.exception.IllegalProviderException; import lombok.extern.slf4j.Slf4j; +import org.apache.logging.log4j.ThreadContext; import java.io.BufferedReader; import java.io.File; @@ -59,7 +60,9 @@ public static void detectTabulations(File file) throws IOException, IllegalFileF try (BufferedReader reader = new BufferedReader(new FileReader(file))) { String line; while ((line = reader.readLine()) != null) { + String nbLine = ""; if (!line.contains("\t")) { + ThreadContext.put("package", file.getName() + ";" + nbLine); log.error("Message envoyé : {}", "Le fichier ne contient pas de tabulation"); throw new IllegalFileFormatException("Le fichier ne contient pas de tabulation"); } From 5f55a68225940894002dbf09c9df001dda172bf5 Mon Sep 17 00:00:00 2001 From: SamuelQuetin Date: Tue, 19 Dec 2023 09:36:35 +0100 Subject: [PATCH 4/8] fix after merge --- src/main/java/fr/abes/kbart2kafka/service/FileService.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/main/java/fr/abes/kbart2kafka/service/FileService.java b/src/main/java/fr/abes/kbart2kafka/service/FileService.java index c0f2a45..f0b01e4 100644 --- a/src/main/java/fr/abes/kbart2kafka/service/FileService.java +++ b/src/main/java/fr/abes/kbart2kafka/service/FileService.java @@ -6,14 +6,10 @@ import jakarta.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.SendResult; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -84,7 +80,7 @@ private void executeMultiThread(File fichier, String kbartHeader) throws IOExcep executor.execute(() -> { try { List headers = new ArrayList<>(); - headers.add(new RecordHeader("FileName", kafkaHeader.getFileName().getBytes(StandardCharsets.UTF_8))); + headers.add(new RecordHeader("FileName", fichier.getName().getBytes(StandardCharsets.UTF_8))); headers.add(new RecordHeader("nbCurrentLines", String.valueOf(finalLineCounter).getBytes())); headers.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLignesFichier).getBytes())); ProducerRecord record = new ProducerRecord<>(topicKbart, new Random().nextInt(nbThread), fichier.getName(), mapper.writeValueAsString(ligneKbartDto), headers); From d9d7792602a782f6ddd4f4925a15d9848b697f48 Mon Sep 17 00:00:00 2001 From: pierre-maraval Date: Tue, 19 Dec 2023 10:52:17 +0100 Subject: [PATCH 5/8] CDE-328 : Fix : Correction regexp pour controle nom du fichier --- src/main/java/fr/abes/kbart2kafka/utils/CheckFiles.java | 3 ++- src/test/java/fr/abes/kbart2kafka/utils/CheckFilesTest.java | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/fr/abes/kbart2kafka/utils/CheckFiles.java b/src/main/java/fr/abes/kbart2kafka/utils/CheckFiles.java index 3ec2017..8dd4dfc 100644 --- a/src/main/java/fr/abes/kbart2kafka/utils/CheckFiles.java +++ b/src/main/java/fr/abes/kbart2kafka/utils/CheckFiles.java @@ -14,7 +14,8 @@ public class CheckFiles { public static void detectFileName(File file) throws IllegalFileFormatException { String filename = file.getName(); - if(!filename.matches("([a-zA-Z0-9\\-_]+_[a-zA-Z0-9\\-]+_)+(\\d{4}-\\d{2}-\\d{2})+(_FORCE)?+(.tsv)$")){ + //if(!filename.matches("([a-zA-Z0-9\\-_]+_[a-zA-Z0-9\\-]+_)+(\\d{4}-\\d{2}-\\d{2})+(_FORCE)?+(.tsv)$")){ + if (!filename.matches("([a-zA-Z0-9\\-]+_){3}(\\d{4}-\\d{2}-\\d{2})+(_FORCE)?+(.tsv)$")) { log.error("Message envoyé : {}", "Le nom du fichier n'est pas correct"); throw new IllegalFileFormatException("Le nom du fichier "+ filename +" n'est pas correct"); } diff --git a/src/test/java/fr/abes/kbart2kafka/utils/CheckFilesTest.java b/src/test/java/fr/abes/kbart2kafka/utils/CheckFilesTest.java index 3d26134..caf897c 100644 --- a/src/test/java/fr/abes/kbart2kafka/utils/CheckFilesTest.java +++ b/src/test/java/fr/abes/kbart2kafka/utils/CheckFilesTest.java @@ -41,10 +41,10 @@ void isFileWithTSVExtension() throws IllegalFileFormatException { @Test void detectFileName() throws IllegalFileFormatException { - this.file = new File("test_test_test_test1_1234-12-12.tsv"); + this.file = new File("test_test_test_1234-12-12.tsv"); CheckFiles.detectFileName(file); - this.file2 = new File("test_test_test_test1_1234-12-12_FORCE.tsv"); + this.file2 = new File("test_test_test_1234-12-12_FORCE.tsv"); CheckFiles.detectFileName(file2); for(String name : Lists.newArrayList("123", "test_1234-12-12.tsv", "test_test_134-12-12.tsv", "test_test_1344-12-12.tsvf", "test_test_1344-12-123.tsv", "test_test_test_test1_1234-12-12_force.tsv")) { From ef14e8214bc648ce50903927f6e0d3e091740775 Mon Sep 17 00:00:00 2001 From: SamuelQuetin Date: Tue, 19 Dec 2023 11:31:40 +0100 Subject: [PATCH 6/8] modif thread context --- src/main/java/fr/abes/kbart2kafka/service/FileService.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/fr/abes/kbart2kafka/service/FileService.java b/src/main/java/fr/abes/kbart2kafka/service/FileService.java index f0b01e4..387d6ef 100644 --- a/src/main/java/fr/abes/kbart2kafka/service/FileService.java +++ b/src/main/java/fr/abes/kbart2kafka/service/FileService.java @@ -7,6 +7,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.logging.log4j.ThreadContext; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; @@ -70,8 +71,8 @@ private void executeMultiThread(File fichier, String kbartHeader) throws IOExcep List fileContent = buff.lines().toList(); Integer nbLignesFichier = fileContent.size() - 1; for (String ligneKbart : fileContent) { + lineCounter++; if (!ligneKbart.contains(kbartHeader)) { - lineCounter++; // Crée un nouvel objet dto, set les différentes parties et envoi au service topicProducer String[] tsvElementsOnOneLine = ligneKbart.split("\t"); LigneKbartDto ligneKbartDto = constructDto(tsvElementsOnOneLine); @@ -79,6 +80,7 @@ private void executeMultiThread(File fichier, String kbartHeader) throws IOExcep final int finalLineCounter = lineCounter; executor.execute(() -> { try { + ThreadContext.put("package", fichier.getName()); List headers = new ArrayList<>(); headers.add(new RecordHeader("FileName", fichier.getName().getBytes(StandardCharsets.UTF_8))); headers.add(new RecordHeader("nbCurrentLines", String.valueOf(finalLineCounter).getBytes())); From 2ea92c7e9b6aaad805bd91b00fa918e123c61004 Mon Sep 17 00:00:00 2001 From: pierre-maraval Date: Tue, 19 Dec 2023 11:33:15 +0100 Subject: [PATCH 7/8] CDE-328 : suppression commentaire --- src/main/java/fr/abes/kbart2kafka/utils/CheckFiles.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/fr/abes/kbart2kafka/utils/CheckFiles.java b/src/main/java/fr/abes/kbart2kafka/utils/CheckFiles.java index 8dd4dfc..189dde3 100644 --- a/src/main/java/fr/abes/kbart2kafka/utils/CheckFiles.java +++ b/src/main/java/fr/abes/kbart2kafka/utils/CheckFiles.java @@ -14,7 +14,6 @@ public class CheckFiles { public static void detectFileName(File file) throws IllegalFileFormatException { String filename = file.getName(); - //if(!filename.matches("([a-zA-Z0-9\\-_]+_[a-zA-Z0-9\\-]+_)+(\\d{4}-\\d{2}-\\d{2})+(_FORCE)?+(.tsv)$")){ if (!filename.matches("([a-zA-Z0-9\\-]+_){3}(\\d{4}-\\d{2}-\\d{2})+(_FORCE)?+(.tsv)$")) { log.error("Message envoyé : {}", "Le nom du fichier n'est pas correct"); throw new IllegalFileFormatException("Le nom du fichier "+ filename +" n'est pas correct"); From b6a0f17702ed71f494d0548d91545f119a7b2f45 Mon Sep 17 00:00:00 2001 From: SamuelQuetin Date: Wed, 20 Dec 2023 08:22:40 +0100 Subject: [PATCH 8/8] Gestion threadContext --- src/main/java/fr/abes/kbart2kafka/utils/CheckFiles.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/java/fr/abes/kbart2kafka/utils/CheckFiles.java b/src/main/java/fr/abes/kbart2kafka/utils/CheckFiles.java index ddf4bb8..3ec2017 100644 --- a/src/main/java/fr/abes/kbart2kafka/utils/CheckFiles.java +++ b/src/main/java/fr/abes/kbart2kafka/utils/CheckFiles.java @@ -3,7 +3,6 @@ import fr.abes.kbart2kafka.exception.IllegalFileFormatException; import fr.abes.kbart2kafka.exception.IllegalProviderException; import lombok.extern.slf4j.Slf4j; -import org.apache.logging.log4j.ThreadContext; import java.io.BufferedReader; import java.io.File; @@ -60,9 +59,7 @@ public static void detectTabulations(File file) throws IOException, IllegalFileF try (BufferedReader reader = new BufferedReader(new FileReader(file))) { String line; while ((line = reader.readLine()) != null) { - String nbLine = ""; if (!line.contains("\t")) { - ThreadContext.put("package", file.getName() + ";" + nbLine); log.error("Message envoyé : {}", "Le fichier ne contient pas de tabulation"); throw new IllegalFileFormatException("Le fichier ne contient pas de tabulation"); }