Skip to content

Commit

Permalink
modif thread context
Browse files Browse the repository at this point in the history
  • Loading branch information
SamuelQuetin committed Dec 19, 2023
1 parent 5f55a68 commit ef14e82
Showing 1 changed file with 3 additions and 1 deletion.
4 changes: 3 additions & 1 deletion src/main/java/fr/abes/kbart2kafka/service/FileService.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.logging.log4j.ThreadContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
Expand Down Expand Up @@ -70,15 +71,16 @@ private void executeMultiThread(File fichier, String kbartHeader) throws IOExcep
List<String> fileContent = buff.lines().toList();
Integer nbLignesFichier = fileContent.size() - 1;
for (String ligneKbart : fileContent) {
lineCounter++;
if (!ligneKbart.contains(kbartHeader)) {
lineCounter++;
// Crée un nouvel objet dto, set les différentes parties et envoi au service topicProducer
String[] tsvElementsOnOneLine = ligneKbart.split("\t");
LigneKbartDto ligneKbartDto = constructDto(tsvElementsOnOneLine);

final int finalLineCounter = lineCounter;
executor.execute(() -> {
try {
ThreadContext.put("package", fichier.getName());
List<org.apache.kafka.common.header.Header> headers = new ArrayList<>();
headers.add(new RecordHeader("FileName", fichier.getName().getBytes(StandardCharsets.UTF_8)));
headers.add(new RecordHeader("nbCurrentLines", String.valueOf(finalLineCounter).getBytes()));
Expand Down

0 comments on commit ef14e82

Please sign in to comment.