Skip to content

Commit

Permalink
Merge pull request #68 from abes-esr/develop
Browse files Browse the repository at this point in the history
Merge develop dans main
  • Loading branch information
EryneKL authored Dec 19, 2023
2 parents f7c0ac6 + e1795fc commit ce5809b
Showing 1 changed file with 7 additions and 16 deletions.
23 changes: 7 additions & 16 deletions src/main/java/fr/abes/kbart2kafka/service/FileService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import fr.abes.kbart2kafka.dto.Header;
import fr.abes.kbart2kafka.dto.LigneKbartDto;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
Expand Down Expand Up @@ -67,8 +67,6 @@ public void loadFile(File fichier, String kbartHeader) {
private void executeMultiThread(File fichier, String kbartHeader) throws IOException {
// Compteur du nombre de lignes dans le kbart
int lineCounter = 0;
// Création du header et ajout du nombre total de lignes
Header kafkaHeader = new Header(fichier.getName());
try (BufferedReader buff = new BufferedReader(new FileReader(fichier))) {
List<String> fileContent = buff.lines().toList();
Integer nbLignesFichier = fileContent.size() - 1;
Expand All @@ -79,14 +77,12 @@ private void executeMultiThread(File fichier, String kbartHeader) throws IOExcep
String[] tsvElementsOnOneLine = ligneKbart.split("\t");
LigneKbartDto ligneKbartDto = constructDto(tsvElementsOnOneLine);


final int finalLineCounter = lineCounter;
executor.execute(() -> {
try {
List<org.apache.kafka.common.header.Header> headers = new ArrayList<>();
headers.add(new RecordHeader("FileName", kafkaHeader.getFileName().getBytes(StandardCharsets.UTF_8)));
headers.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLignesFichier).getBytes()));
ProducerRecord<String, String> record = new ProducerRecord<>(topicKbart, new Random().nextInt(nbThread), "", mapper.writeValueAsString(ligneKbartDto), headers);
ProducerRecord<String, String> record = new ProducerRecord<>(topicKbart, new Random().nextInt(nbThread), fichier.getName(), mapper.writeValueAsString(ligneKbartDto), headers);
CompletableFuture<SendResult<String, String>> result = kafkaTemplate.executeInTransaction(kt -> kt.send(record));
result.whenComplete((sr, ex) -> {
try {
Expand All @@ -96,28 +92,23 @@ private void executeMultiThread(File fichier, String kbartHeader) throws IOExcep
}
});
} catch (JsonProcessingException e) {
sendErrorToKafka("erreur de mapping des données au chargement de la ligne " + finalLineCounter, e, kafkaHeader);
sendErrorToKafka("erreur de mapping des données au chargement de la ligne " + finalLineCounter, e, fichier.getName());
throw new RuntimeException(e);
}
});
}
}

} catch (IOException ex) {
sendErrorToKafka("erreur de lecture du fichier", ex, kafkaHeader);
sendErrorToKafka("erreur de lecture du fichier", ex, fichier.getName());
} finally {
executor.shutdown();
}
}

private void sendErrorToKafka(String errorMessage, Exception exception, Header kafkaHeader) {
log.info("Envoi erreur");
Message<String> message = MessageBuilder
.withPayload(errorMessage + exception.getMessage())
.setHeader(KafkaHeaders.TOPIC, topicErrors)
.setHeader("FileName", kafkaHeader.getFileName())
.build();
kafkaTemplate.send(message);
private void sendErrorToKafka(String errorMessage, Exception exception, String filename) {
log.debug("Envoi erreur");
kafkaTemplate.send(new ProducerRecord<>(topicErrors, new Random().nextInt(nbThread), filename, errorMessage + exception.getMessage()));
}

/**
Expand Down

0 comments on commit ce5809b

Please sign in to comment.