Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/CDE-319-checkup-de-logskbart' in…
Browse files Browse the repository at this point in the history
…to CDE-319-checkup-de-logskbart
  • Loading branch information
SamuelQuetin committed Dec 20, 2023
2 parents b6a0f17 + 6807adc commit 5fefb00
Showing 1 changed file with 13 additions and 21 deletions.
34 changes: 13 additions & 21 deletions src/main/java/fr/abes/kbart2kafka/service/FileService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,15 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import fr.abes.kbart2kafka.dto.Header;
import fr.abes.kbart2kafka.dto.LigneKbartDto;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.logging.log4j.ThreadContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

Expand Down Expand Up @@ -70,59 +67,54 @@ public void loadFile(File fichier, String kbartHeader) {
private void executeMultiThread(File fichier, String kbartHeader) throws IOException {
// Compteur du nombre de lignes dans le kbart
int lineCounter = 0;
// Création du header et ajout du nombre total de lignes
Header kafkaHeader = new Header(fichier.getName());
try (BufferedReader buff = new BufferedReader(new FileReader(fichier))) {
List<String> fileContent = buff.lines().toList();
Integer nbLignesFichier = fileContent.size() - 1;
log.debug("Début d'envoi de "+ nbLignesFichier + " lignes du fichier");
for (String ligneKbart : fileContent) {
lineCounter++;
if (!ligneKbart.contains(kbartHeader)) {
lineCounter++;
// Crée un nouvel objet dto, set les différentes parties et envoi au service topicProducer
String[] tsvElementsOnOneLine = ligneKbart.split("\t");
LigneKbartDto ligneKbartDto = constructDto(tsvElementsOnOneLine);


final int finalLineCounter = lineCounter;
executor.execute(() -> {
try {
ThreadContext.put("package", fichier.getName());
List<org.apache.kafka.common.header.Header> headers = new ArrayList<>();
headers.add(new RecordHeader("FileName", kafkaHeader.getFileName().getBytes(StandardCharsets.UTF_8)));
headers.add(new RecordHeader("FileName", fichier.getName().getBytes(StandardCharsets.UTF_8)));
headers.add(new RecordHeader("nbCurrentLines", String.valueOf(finalLineCounter).getBytes()));
headers.add(new RecordHeader("nbLinesTotal", String.valueOf(nbLignesFichier).getBytes()));
ProducerRecord<String, String> record = new ProducerRecord<>(topicKbart, new Random().nextInt(nbThread), "", mapper.writeValueAsString(ligneKbartDto), headers);
ProducerRecord<String, String> record = new ProducerRecord<>(topicKbart, new Random().nextInt(nbThread), fichier.getName(), mapper.writeValueAsString(ligneKbartDto), headers);
CompletableFuture<SendResult<String, String>> result = kafkaTemplate.executeInTransaction(kt -> kt.send(record));
assert result != null;
result.whenComplete((sr, ex) -> {
try {
log.debug("Message envoyé : {}", mapper.writeValueAsString(result.get().getProducerRecord().value()));
// log.debug("Message envoyé : {}", mapper.writeValueAsString(result.get().getProducerRecord().value()));
mapper.writeValueAsString(result.get().getProducerRecord().value());
} catch (JsonProcessingException | InterruptedException | ExecutionException e) {
log.warn("Erreur dans le mapping du résultat de l'envoi dans le topic pour la ligne " + ligneKbartDto);
}
});
} catch (JsonProcessingException e) {
sendErrorToKafka("erreur de mapping des données au chargement de la ligne " + finalLineCounter, e, kafkaHeader);
sendErrorToKafka("erreur de mapping des données au chargement de la ligne " + finalLineCounter, e, fichier.getName());
throw new RuntimeException(e);
}
});
}
}

} catch (IOException ex) {
sendErrorToKafka("erreur de lecture du fichier", ex, kafkaHeader);
sendErrorToKafka("erreur de lecture du fichier", ex, fichier.getName());
} finally {
executor.shutdown();
}
}

private void sendErrorToKafka(String errorMessage, Exception exception, Header kafkaHeader) {
log.info("Envoi erreur");
Message<String> message = MessageBuilder
.withPayload(errorMessage + exception.getMessage())
.setHeader(KafkaHeaders.TOPIC, topicErrors)
.setHeader("FileName", kafkaHeader.getFileName())
.build();
kafkaTemplate.send(message);
private void sendErrorToKafka(String errorMessage, Exception exception, String filename) {
log.debug("Envoi erreur");
kafkaTemplate.send(new ProducerRecord<>(topicErrors, new Random().nextInt(nbThread), filename, errorMessage + exception.getMessage()));
}

/**
Expand Down

0 comments on commit 5fefb00

Please sign in to comment.