Skip to content

Commit

Permalink
Merge pull request #71 from abes-esr/develop
Browse files Browse the repository at this point in the history
merge dev to test
  • Loading branch information
EryneKL authored Dec 20, 2023
2 parents ce5809b + dfaa6cb commit 11292c8
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 10 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
*.tsv
*.properties
hibernate.cfg.xml
HELP.md
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/fr/abes/kbart2kafka/Kbart2kafkaApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import fr.abes.kbart2kafka.service.FileService;
import fr.abes.kbart2kafka.utils.CheckFiles;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.ThreadContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
Expand Down Expand Up @@ -42,12 +43,14 @@ public void run(String... args) throws IOException {
log.error("Message envoyé : {}", "Le chemin d'accès au fichier tsv n'a pas été trouvé dans les paramètres de l'application");
} else {
log.info("Debut envois kafka de : " + args[0]);
ThreadContext.put("package", args[0]);
// Récupération du chemin d'accès au fichier
File tsvFile = new File(args[0]);
// Appelle du service de vérification de fichier
try {
CheckFiles.verifyFile(tsvFile, kbartHeader);
} catch (IllegalFileFormatException | IllegalProviderException e) {
log.error(e.getMessage());
throw new RuntimeException(e);
}
service.loadFile(tsvFile, kbartHeader);
Expand Down
20 changes: 13 additions & 7 deletions src/main/java/fr/abes/kbart2kafka/service/FileService.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.logging.log4j.ThreadContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.*;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -70,23 +70,29 @@ private void executeMultiThread(File fichier, String kbartHeader) throws IOExcep
try (BufferedReader buff = new BufferedReader(new FileReader(fichier))) {
List<String> fileContent = buff.lines().toList();
Integer nbLignesFichier = fileContent.size() - 1;
log.debug("Début d'envoi de "+ nbLignesFichier + " lignes du fichier");
for (String ligneKbart : fileContent) {
lineCounter++;
if (!ligneKbart.contains(kbartHeader)) {
lineCounter++;
// Crée un nouvel objet dto, set les différentes parties et envoi au service topicProducer
String[] tsvElementsOnOneLine = ligneKbart.split("\t");
LigneKbartDto ligneKbartDto = constructDto(tsvElementsOnOneLine);

final int finalLineCounter = lineCounter;
executor.execute(() -> {
try {
ThreadContext.put("package", fichier.getName());
List<org.apache.kafka.common.header.Header> headers = new ArrayList<>();
headers.add(new RecordHeader("FileName", fichier.getName().getBytes(StandardCharsets.UTF_8)));
headers.add(new RecordHeader("nbCurrentLines", String.valueOf(finalLineCounter).getBytes()));
headers.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLignesFichier).getBytes()));
ProducerRecord<String, String> record = new ProducerRecord<>(topicKbart, new Random().nextInt(nbThread), fichier.getName(), mapper.writeValueAsString(ligneKbartDto), headers);
CompletableFuture<SendResult<String, String>> result = kafkaTemplate.executeInTransaction(kt -> kt.send(record));
assert result != null;
result.whenComplete((sr, ex) -> {
try {
log.debug("Message envoyé : {}", mapper.writeValueAsString(result.get().getProducerRecord().value()));
// log.debug("Message envoyé : {}", mapper.writeValueAsString(result.get().getProducerRecord().value()));
mapper.writeValueAsString(result.get().getProducerRecord().value());
} catch (JsonProcessingException | InterruptedException | ExecutionException e) {
log.warn("Erreur dans le mapping du résultat de l'envoi dans le topic pour la ligne " + ligneKbartDto);
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/fr/abes/kbart2kafka/utils/CheckFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class CheckFiles {

public static void detectFileName(File file) throws IllegalFileFormatException {
String filename = file.getName();
if(!filename.matches("([a-zA-Z0-9\\-_]+_[a-zA-Z0-9\\-]+_)+(\\d{4}-\\d{2}-\\d{2})+(_FORCE)?+(.tsv)$")){
if (!filename.matches("([a-zA-Z0-9\\-]+_){3}(\\d{4}-\\d{2}-\\d{2})+(_FORCE)?+(.tsv)$")) {
log.error("Message envoyé : {}", "Le nom du fichier n'est pas correct");
throw new IllegalFileFormatException("Le nom du fichier "+ filename +" n'est pas correct");
}
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/fr/abes/kbart2kafka/utils/CheckFilesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ void isFileWithTSVExtension() throws IllegalFileFormatException {

@Test
void detectFileName() throws IllegalFileFormatException {
this.file = new File("test_test_test_test1_1234-12-12.tsv");
this.file = new File("test_test_test_1234-12-12.tsv");
CheckFiles.detectFileName(file);

this.file2 = new File("test_test_test_test1_1234-12-12_FORCE.tsv");
this.file2 = new File("test_test_test_1234-12-12_FORCE.tsv");
CheckFiles.detectFileName(file2);

for(String name : Lists.newArrayList("123", "test_1234-12-12.tsv", "test_test_134-12-12.tsv", "test_test_1344-12-12.tsvf", "test_test_1344-12-123.tsv", "test_test_test_test1_1234-12-12_force.tsv")) {
Expand Down

0 comments on commit 11292c8

Please sign in to comment.