From 3b5471dcc6c0f2374c8d0f811afead550f755ce5 Mon Sep 17 00:00:00 2001
From: "github-classroom[bot]" <66690702+github-classroom[bot]@users.noreply.github.com>
Date: Tue, 16 Dec 2025 16:16:11 +0000
Subject: [PATCH 1/3] Setting up GitHub Classroom Feedback

From c5c9fa2a051920f9ed4091dd4c07afe97caa54ba Mon Sep 17 00:00:00 2001
From: "github-classroom[bot]" <66690702+github-classroom[bot]@users.noreply.github.com>
Date: Tue, 16 Dec 2025 16:16:14 +0000
Subject: [PATCH 2/3] add deadline

---
 README.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/README.md b/README.md
index 97f8c85..98999e5 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,4 @@
+[![Review Assignment Due Date](https://classroom.github.com/assets/deadline-readme-button-22041afd0340ce965d47ae6ef1cefeee28c7c493a6346c4f15d667ab976d596c.svg)](https://classroom.github.com/a/QODoQuhO)
 # Distributed processing of text data using a message broker
 
 ## Assignment goal:

From 81fcca04c252f9ea87b277522d09b94de7022dfe Mon Sep 17 00:00:00 2001
From: Artemiy Romanov
Date: Sat, 10 Jan 2026 14:35:50 +0300
Subject: [PATCH 3/3] solution

---
 .gitignore                                    |   4 +
 .gitmodules                                   |   3 +
 README.md                                     |   8 +
 build.gradle.kts                              |  15 +-
 docker-compose.yml                            |  32 +++
 src/main/java/Main.java                       |   5 -
 src/main/java/org/itmo/FileProducer.java      | 186 +++++++++++++++++
 src/main/java/org/itmo/KafkaAggregator.java   | 196 ++++++++++++++++++
 .../java/org/itmo/KafkaConsumerWorker.java    | 180 ++++++++++++++++
 src/main/java/org/itmo/KafkaProducerBase.java |  33 +++
 .../java/org/itmo/KafkaSharedResources.java   |  33 +++
 src/main/java/org/itmo/Main.java              | 180 ++++++++++++++++
 src/main/java/org/itmo/MessageSerializer.java |  29 +++
 src/main/java/org/itmo/ResultMessage.java     |  94 +++++++++
 src/main/java/org/itmo/Summarizer.java        | 106 ++++++++++
 src/main/java/org/itmo/TaskMessage.java       |  69 ++++++
 src/main/java/org/itmo/TextProcessor.java     | 119 +++++++++++
 text                                          |   1 +
 18 files changed, 1283 insertions(+), 10 deletions(-)
 create mode 100644 .gitignore
 create mode 100644 .gitmodules
 create mode 100644 docker-compose.yml
 delete mode 100644 src/main/java/Main.java
 create mode 100644 src/main/java/org/itmo/FileProducer.java
 create mode 100644 src/main/java/org/itmo/KafkaAggregator.java
 create mode 100644 src/main/java/org/itmo/KafkaConsumerWorker.java
 create mode 100644 src/main/java/org/itmo/KafkaProducerBase.java
 create mode 100644 src/main/java/org/itmo/KafkaSharedResources.java
 create mode 100644 src/main/java/org/itmo/Main.java
 create mode 100644 src/main/java/org/itmo/MessageSerializer.java
 create mode 100644 src/main/java/org/itmo/ResultMessage.java
 create mode 100644 src/main/java/org/itmo/Summarizer.java
 create mode 100644 src/main/java/org/itmo/TaskMessage.java
 create mode 100644 src/main/java/org/itmo/TextProcessor.java
 create mode 160000 text

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..957aa0b
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+.gradle
+.vscode
+bin
+build
\ No newline at end of file
diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..c2d3716
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,3 @@
+[submodule "text"]
+	path = text
+	url = git@gist.github.com:c95f3a864828f7f034b7a33d1676e420.git
diff --git a/README.md b/README.md
index 98999e5..f9b4dac 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,14 @@
 [![Review Assignment Due Date](https://classroom.github.com/assets/deadline-readme-button-22041afd0340ce965d47ae6ef1cefeee28c7c493a6346c4f15d667ab976d596c.svg)](https://classroom.github.com/a/QODoQuhO)
 # Distributed processing of text data using a message broker
+## Usage
+
+1. `git submodule update --init`
+
+1. `docker compose up`
+
+1. `./gradlew run`
+
 
 ## Assignment goal:
 
 Implement a distributed text-processing system in which sections of the text are dispatched for processing through a message broker. Several workers process the sections in parallel and send their results to an aggregator. The workers carry out a set of tasks
diff --git a/build.gradle.kts b/build.gradle.kts
index 8e91d96..a3de1fb 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -1,5 +1,6 @@
 plugins {
     kotlin("jvm") version "1.9.20"
+    java
     application
 }
 
@@ -11,8 +12,8 @@ repositories {
 }
 
 dependencies {
-    implementation("javax.jms:jms-api:2.0.1")
-    implementation("org.apache.activemq:activemq-broker:6.1.1")
+    implementation("org.apache.kafka:kafka-clients:3.6.1")
+    implementation("com.google.code.gson:gson:2.10.1")
     testImplementation(kotlin("test"))
 }
 
@@ -21,9 +22,13 @@ tasks.test {
 }
 
 kotlin {
-    jvmToolchain(8)
+    jvmToolchain(14)
 }
 
 application {
-    mainClass.set("MainKt")
-}
\ No newline at end of file
+    mainClass.set("org.itmo.Main")
+    applicationDefaultJvmArgs = listOf(
+        "-Dfile.encoding=UTF-8",
+        "-Dconsole.encoding=UTF-8"
+    )
+}
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..b25a7f7
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,32 @@
+services:
+
+  broker:
+    image: apache/kafka:latest
+    container_name: broker
+    hostname: broker
+    ports:
+      - "9092:9092"
+    environment:
+      KAFKA_NODE_ID: 1
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
+      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://localhost:9092,PLAINTEXT://broker:19092'
+      KAFKA_PROCESS_ROLES: 'broker,controller'
+      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
+      KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
+      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
+      CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR: 1
+
+  kafka-ui:
+    container_name: kafka-ui
+    image: provectuslabs/kafka-ui:latest
+    ports:
+      - 8080:8080
+    environment:
+      DYNAMIC_CONFIG_ENABLED: true
diff --git a/src/main/java/Main.java b/src/main/java/Main.java
deleted file mode 100644
index 477e8b9..0000000
--- a/src/main/java/Main.java
+++ /dev/null
@@ -1,5 +0,0 @@
-public class Main {
-    public static void main(String[] args) {
-        System.out.println("Lab 3");
-    }
-}
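The compose file publishes two listeners: containers reach the broker at `broker:19092`, while host-side clients (the Gradle run above) use the advertised `localhost:9092`. A quick way to verify that wiring before running the pipeline is a one-off AdminClient probe; this is a sketch, not part of the patch, and the class name `BrokerSmokeTest` is ours:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Properties;

public class BrokerSmokeTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // listTopics() round-trips to the broker, so it fails fast
            // if the advertised listener is misconfigured.
            System.out.println("Topics: " + admin.listTopics().names().get());
        }
    }
}
```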
"NAME_REPLACE", + "SORT_SENTENCES" + }; + // public static final String[] TASK_TOPICS = { + // "task-topic-word-count", + // "task-topic-top-n", + // "task-topic-sentiment", + // "task-topic-name-replace", + // "task-topic-sort-sentences" + // }; + + public FileProducer(KafkaProducer sharedProducer, String filename) { + super(sharedProducer); + this.filename = filename; + this.topN = 10; + } + + public FileProducer(KafkaProducer sharedProducer, String filename, int topN) { + super(sharedProducer); + this.filename = filename; + this.topN = topN; + } + + /** + * Разбивает файл на секции (по параграфам) и отправляет задачи в Kafka topics + * @return количество созданных секций + */ + public int prepareTextAndProduce() throws IOException { + Path file; + + try { + file = Paths.get(this.filename); + if (!Files.isRegularFile(file)) { + throw new FileNotFoundException(); + } + } catch (InvalidPathException e) { + throw new IOException("Failed to open file " + filename + "\n" + e.getMessage()); + } catch (FileNotFoundException e) { + throw new IOException("File " + filename + " is not present or a directory"); + } + + // Читаем весь файл + String fileContent = new String(Files.readAllBytes(file), StandardCharsets.UTF_8); + + // Разбиваем на секции по параграфам (двойной перенос строки) или предложениям + List sections = splitIntoSections(fileContent); + + System.out.println("[Producer] Split file into " + sections.size() + " sections"); + + // Создаем и отправляем задачи для каждой секции + try { + int taskCounter = 0; + for (int sectionIndex = 0; sectionIndex < sections.size(); sectionIndex++) { + String section = sections.get(sectionIndex); + // System.out.println("[Producer] Processing section " + sectionIndex + " (length: " + section.length() + " chars)"); + + // Отправляем задачи всех типов для каждой секции + for (int taskTypeIndex = 0; taskTypeIndex < TASK_TYPES.length; taskTypeIndex++) { + String taskType = TASK_TYPES[taskTypeIndex]; + String taskTopic = TASK_TYPES[taskTypeIndex]; + + TaskMessage task; + if ("TOP_N".equals(taskType)) { + task = new TaskMessage(taskType, section, sectionIndex, topN); + } else { + task = new TaskMessage(taskType, section, sectionIndex); + } + + byte[] messageBytes = MessageSerializer.serialize(task); + // Use taskId as key for partition distribution + String key = task.getTaskId(); + taskCounter++; + // System.out.println("[Producer] Sending task #" + (taskCounter) + ": " + taskType + + // " for section " + sectionIndex + " (message size: " + messageBytes.length + " bytes)"); + super.produce(taskTopic, key, messageBytes); + } + } + + // Ждем завершения всех отправок + super.producer.flush(); + System.out.println("[Producer] All tasks sent to Kafka topics (" + (sections.size() * TASK_TYPES.length) + " tasks total)"); + + } catch (Exception e) { + throw new IOException("Error while sending messages to Kafka: " + e.getMessage(), e); + } + + return sections.size(); + } + + /** + * Подсчитывает количество секций в файле без разбиения + */ + public int countSections() throws IOException { + Path file; + + try { + file = Paths.get(this.filename); + if (!Files.isRegularFile(file)) { + throw new FileNotFoundException(); + } + } catch (InvalidPathException e) { + throw new IOException("Failed to open file " + filename + "\n" + e.getMessage()); + } catch (FileNotFoundException e) { + throw new IOException("File " + filename + " is not present or a directory"); + } + + // Читаем весь файл + String fileContent = new String(Files.readAllBytes(file), 
diff --git a/src/main/java/org/itmo/KafkaAggregator.java b/src/main/java/org/itmo/KafkaAggregator.java
new file mode 100644
index 0000000..04dc038
--- /dev/null
+++ b/src/main/java/org/itmo/KafkaAggregator.java
@@ -0,0 +1,196 @@
+package org.itmo;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+public class KafkaAggregator {
+    private final String resultTopic;
+    private final int expectedSections;
+
+    private final Map<String, Long> wordCounts = new ConcurrentHashMap<>();
+    private final Map<String, Map<String, Integer>> topWordsMap = new ConcurrentHashMap<>();
+    private final List<Double> sentimentScores = Collections.synchronizedList(new ArrayList<>());
+    private final Map<Integer, String> processedTexts = new ConcurrentHashMap<>();
+    private final Map<Integer, List<String>> sortedSentencesMap = new ConcurrentHashMap<>();
+
+    private final Set<String> completedTasks = ConcurrentHashMap.newKeySet();
+    private final CountDownLatch completionLatch;
+    private final AtomicBoolean running = new AtomicBoolean(true);
+    private KafkaConsumer<String, byte[]> sharedConsumer;
+
+    public KafkaAggregator(String resultTopic, int expectedSections) {
+        this.resultTopic = resultTopic;
+        this.expectedSections = expectedSections;
+
+        // Initialize latch for waiting all results
+        // Each task type should be executed for each section
+        this.completionLatch = new CountDownLatch(expectedSections * FileProducer.TASK_TYPES.length);
+    }
+
+    public void start(KafkaConsumer<String, byte[]> sharedConsumer) throws InterruptedException {
+        this.sharedConsumer = sharedConsumer;
+
+        while (running.get()) {
+            ConsumerRecords<String, byte[]> records = sharedConsumer.poll(Duration.ofMillis(100));
+
+            for (ConsumerRecord<String, byte[]> record : records) {
+                try {
+                    byte[] messageBody = record.value();
+                    ResultMessage result = MessageSerializer.deserializeResult(messageBody);
+                    String taskKey = result.getTaskType() + "_" + result.getSectionIndex();
+                    synchronized (this) {
+                        if (completedTasks.contains(taskKey)) {
+                            System.out.println("[Aggregator] Duplicate result ignored: " + taskKey);
+                            sharedConsumer.commitSync();
+                            continue;
+                        }
+                        completedTasks.add(taskKey);
+                        aggregateResult(result);
+                        System.out.println("[Aggregator] Aggregated result: " + result.getTaskType() +
+                            " (section " + result.getSectionIndex() +
+                            ", total completed: " + completedTasks.size() + ")");
+                        completionLatch.countDown();
+                        if (completionLatch.getCount() == 0) {
+                            sharedConsumer.commitSync();
+                            return;
+                        }
+                    }
+                    sharedConsumer.commitSync();
+                } catch (Exception e) {
+                    System.err.println("[Aggregator] Error processing result: " + e.getMessage());
+                    e.printStackTrace();
+                }
+            }
+            if (completionLatch.getCount() == 0) {
+                break;
+            }
+        }
+    }
+
+    public void shutdown() {
+        running.set(false);
+        if (sharedConsumer != null) {
+            sharedConsumer.wakeup();
+        }
+    }
+
+    private synchronized void aggregateResult(ResultMessage result) {
+        String taskType = result.getTaskType();
+        switch (taskType) {
+            case "WORD_COUNT":
+                if (result.getWordCount() != null) {
+                    wordCounts.merge(taskType, result.getWordCount(), Long::sum);
+                }
+                break;
+
+            case "TOP_N":
+                if (result.getTopWords() != null) {
+                    Map<String, Integer> aggregated = topWordsMap.computeIfAbsent(
+                        taskType, k -> new HashMap<>()
+                    );
+                    result.getTopWords().forEach((word, count) ->
+                        aggregated.merge(word, count, Integer::sum)
+                    );
+                }
+                break;
+
+            case "SENTIMENT":
+                if (result.getSentimentScore() != null) {
+                    sentimentScores.add(result.getSentimentScore());
+                }
+                break;
+
+            case "NAME_REPLACE":
+                if (result.getProcessedText() != null) {
+                    processedTexts.put(result.getSectionIndex(), result.getProcessedText());
+                }
+                break;
+
+            case "SORT_SENTENCES":
+                if (result.getSortedSentences() != null) {
+                    sortedSentencesMap.put(result.getSectionIndex(), result.getSortedSentences());
+                }
+                break;
+        }
+    }
+
+    public AggregatedResults getResults() {
+        return new AggregatedResults(
+            wordCounts.getOrDefault("WORD_COUNT", 0L),
+            getTopNWordsFromMap(topWordsMap.getOrDefault("TOP_N", new HashMap<>()), 10),
+            calculateAverageSentiment(),
+            combineProcessedTexts(),
+            combineSortedSentences()
+        );
+    }
+
+    private Map<String, Integer> getTopNWordsFromMap(Map<String, Integer> wordMap, int n) {
+        return wordMap.entrySet().stream()
+            .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
+            .limit(n)
+            .collect(Collectors.toMap(
+                Map.Entry::getKey,
+                Map.Entry::getValue,
+                (e1, e2) -> e1,
+                LinkedHashMap::new
+            ));
+    }
+
+    private double calculateAverageSentiment() {
+        if (sentimentScores.isEmpty()) {
+            return 0.0;
+        }
+        return sentimentScores.stream()
+            .mapToDouble(Double::doubleValue)
+            .average()
+            .orElse(0.0);
+    }
+
+    private String combineProcessedTexts() {
+        return processedTexts.entrySet().stream()
+            .sorted(Map.Entry.comparingByKey())
+            .map(Map.Entry::getValue)
+            .collect(Collectors.joining("\n\n"));
+    }
+
+    private List<String> combineSortedSentences() {
+        return sortedSentencesMap.entrySet().stream()
+            .sorted(Map.Entry.comparingByKey())
+            .flatMap(entry -> entry.getValue().stream())
+            .sorted(Comparator.comparingInt(String::length))
+            .collect(Collectors.toList());
+    }
+
+    public static class AggregatedResults {
+        private final long totalWordCount;
+        private final Map<String, Integer> topWords;
+        private final double averageSentiment;
+        private final String processedText;
+        private final List<String> sortedSentences;
+
+        public AggregatedResults(long totalWordCount, Map<String, Integer> topWords,
+                                 double averageSentiment, String processedText,
+                                 List<String> sortedSentences) {
+            this.totalWordCount = totalWordCount;
+            this.topWords = topWords;
+            this.averageSentiment = averageSentiment;
+            this.processedText = processedText;
+            this.sortedSentences = sortedSentences;
+        }
+
+        public long getTotalWordCount() { return totalWordCount; }
+        public Map<String, Integer> getTopWords() { return topWords; }
+        public double getAverageSentiment() { return averageSentiment; }
+        public String getProcessedText() { return processedText; }
+        public List<String> getSortedSentences() { return sortedSentences; }
+    }
+}
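The TOP_N branch merges per-section word counts before the final top-10 cut in `getResults()`; in isolation the merge step looks like this (illustration only, `TopNMergeDemo` is ours):

```java
import java.util.HashMap;
import java.util.Map;

public class TopNMergeDemo {
    public static void main(String[] args) {
        Map<String, Integer> aggregated = new HashMap<>(Map.of("war", 3, "peace", 2));
        Map<String, Integer> fromNextSection = Map.of("war", 4, "love", 1);
        // Same merge call as in aggregateResult(): counts for repeated words are summed.
        fromNextSection.forEach((word, count) -> aggregated.merge(word, count, Integer::sum));
        System.out.println(aggregated); // {war=7, peace=2, love=1} (iteration order may vary)
    }
}
```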
diff --git a/src/main/java/org/itmo/KafkaConsumerWorker.java b/src/main/java/org/itmo/KafkaConsumerWorker.java
new file mode 100644
index 0000000..bf43289
--- /dev/null
+++ b/src/main/java/org/itmo/KafkaConsumerWorker.java
@@ -0,0 +1,180 @@
+package org.itmo;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class KafkaConsumerWorker {
+    private final String bootstrapServers;
+    private final String taskTopic;
+    private final String resultTopic;
+    private final String workerId;
+    private final String consumerGroupId;
+    private final CountDownLatch readyLatch;
+    private final KafkaProducer<String, byte[]> sharedProducer;
+    private KafkaConsumer<String, byte[]> consumer;
+    private final AtomicBoolean running = new AtomicBoolean(true);
+    private static final int TOP_N_DEFAULT = 10;
+
+    public KafkaConsumerWorker(String bootstrapServers, String taskTopic, String resultTopic,
+                               String workerId, String consumerGroupId,
+                               KafkaProducer<String, byte[]> sharedProducer,
+                               CountDownLatch readyLatch) {
+        this.bootstrapServers = bootstrapServers;
+        this.taskTopic = taskTopic;
+        this.resultTopic = resultTopic;
+        this.workerId = workerId;
+        this.consumerGroupId = consumerGroupId;
+        this.sharedProducer = sharedProducer;
+        this.readyLatch = readyLatch;
+    }
+
+    public void start() {
+        Properties consumerProps = new Properties();
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
+        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300000);
+        consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
+        consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
+
+        consumer = new KafkaConsumer<>(consumerProps);
+        consumer.subscribe(Collections.singletonList(taskTopic));
+
+        System.out.println("[" + workerId + "] Worker started. Subscribed to topic: " + taskTopic);
+
+        if (readyLatch != null) {
+            readyLatch.countDown();
+        }
+
+        try {
+            while (running.get()) {
+                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(10000));
+
+                for (ConsumerRecord<String, byte[]> record : records) {
+                    long taskStartTime = System.currentTimeMillis();
+                    try {
+                        System.out.println("[" + workerId + "] Received record from topic: " + record.topic() +
+                            " partition: " + record.partition() +
+                            " offset: " + record.offset() +
+                            " (size: " + record.value().length + " bytes)");
+
+                        byte[] messageBody = record.value();
+                        TaskMessage task = MessageSerializer.deserializeTask(messageBody);
+                        ResultMessage result = processTask(task);
+                        long processingTime = System.currentTimeMillis() - taskStartTime;
+                        byte[] resultBytes = MessageSerializer.serialize(result);
+                        System.out.println("[" + workerId + "] Publishing result to topic: " + resultTopic);
+                        ProducerRecord<String, byte[]> resultRecord = new ProducerRecord<>(
+                            resultTopic,
+                            result.getTaskId(),
+                            resultBytes
+                        );
+                        sharedProducer.send(resultRecord);
+                        sharedProducer.flush();
+                        consumer.commitSync();
+                        System.out.println("[" + workerId + "] Task completed: " + task.getTaskType() +
+                            " (section " + task.getSectionIndex() + ") in " + processingTime + "ms");
+                    } catch (IllegalArgumentException e) {
+                        System.err.println("[" + workerId + "] Error message: " + e.getMessage());
+                        e.printStackTrace();
+                        System.err.flush();
+                    }
+                }
+            }
+            System.out.println("[" + workerId + "] Worker loop stopped");
+        } catch (org.apache.kafka.common.errors.WakeupException e) {
+            System.out.println("[" + workerId + "] Consumer wakeup requested");
+        } catch (Exception e) {
+            System.err.println("[" + workerId + "] Error message: " + e.getMessage());
+            e.printStackTrace();
+        } finally {
+            try {
+                if (consumer != null) {
+                    consumer.close();
+                }
+            } catch (Exception e) {
+                System.err.println("[" + workerId + "] Error closing consumer: " + e.getMessage());
+            }
+        }
+    }
+
+    public void shutdown() {
+        running.set(false);
+        if (consumer != null) {
+            consumer.wakeup();
+        }
+    }
+
+    private ResultMessage processTask(TaskMessage task) throws IllegalArgumentException {
+        ResultMessage result = new ResultMessage(
+            task.getTaskId(),
+            task.getTaskType(),
+            task.getSectionIndex()
+        );
+
+        String text = task.getTextSection();
+        if (text == null) {
+            text = "";
+        }
+
+        try {
+            switch (task.getTaskType()) {
+                case "WORD_COUNT":
+                    long wordCount = TextProcessor.countWords(text);
+                    result.setWordCount(wordCount);
+                    System.out.println("[" + workerId + "] WORD_COUNT result: " + wordCount + " words");
+                    break;
+
+                case "TOP_N":
+                    int topN = task.getTopN() > 0 ? task.getTopN() : TOP_N_DEFAULT;
+                    Map<String, Integer> topWords = TextProcessor.findTopNWords(text, topN);
+                    result.setTopWords(topWords);
+                    System.out.println("[" + workerId + "] TOP_N result: " + topWords.size() + " unique words");
+                    break;
+
+                case "SENTIMENT":
+                    double sentiment = TextProcessor.analyzeSentiment(text);
+                    result.setSentimentScore(sentiment);
+                    System.out.println("[" + workerId + "] SENTIMENT result: " + String.format("%.4f", sentiment));
+                    break;
+
+                case "NAME_REPLACE":
+                    String processedText = TextProcessor.replaceNames(text, "Parallels");
+                    result.setProcessedText(processedText);
+                    System.out.println("[" + workerId + "] NAME_REPLACE result: processed text length " + processedText.length() + " chars");
+                    break;
+
+                case "SORT_SENTENCES":
+                    List<String> sortedSentences = TextProcessor.sortSentencesByLength(text);
+                    result.setSortedSentences(sortedSentences);
+                    System.out.println("[" + workerId + "] SORT_SENTENCES result: " + sortedSentences.size() + " sentences");
+                    break;
+
+                default:
+                    throw new IllegalArgumentException("Unknown task type: " + task.getTaskType());
+            }
+        } catch (IllegalArgumentException e) {
+            e.printStackTrace();
+            throw e;
+        }
+
+        return result;
+    }
+}
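A possible standalone wiring for one worker (a sketch using the constructor above; `SingleWorkerRunner` and the shutdown hook are ours, not part of the patch). `shutdown()` flips the running flag and `wakeup()` makes the blocked `poll()` throw `WakeupException`, which `start()` already catches before closing the consumer:

```java
package org.itmo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class SingleWorkerRunner {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

        KafkaConsumerWorker worker = new KafkaConsumerWorker(
            "localhost:9092", "WORD_COUNT", "result-topic",
            "Worker-demo", "worker-group-WORD_COUNT",
            producer, new CountDownLatch(1));

        // Stop cleanly on Ctrl+C: poll() is interrupted via wakeup().
        Runtime.getRuntime().addShutdownHook(new Thread(worker::shutdown));
        worker.start(); // blocks until shutdown() is called
    }
}
```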
diff --git a/src/main/java/org/itmo/KafkaProducerBase.java b/src/main/java/org/itmo/KafkaProducerBase.java
new file mode 100644
index 0000000..4b108ff
--- /dev/null
+++ b/src/main/java/org/itmo/KafkaProducerBase.java
@@ -0,0 +1,33 @@
+package org.itmo;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+public abstract class KafkaProducerBase {
+    protected final KafkaProducer<String, byte[]> producer;
+
+    public KafkaProducerBase(KafkaProducer<String, byte[]> sharedProducer) {
+        this.producer = sharedProducer;
+    }
+
+    public void produce(String topic, String key, byte[] message) {
+        ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, key, message);
+        try {
+            producer.send(record, (metadata, exception) -> {
+                if (exception != null) {
+                    System.err.println("[KafkaProducer] Error sending message to topic " + topic + ": " + exception.getMessage());
+                    exception.printStackTrace();
+                } else {
+                    System.out.println("[KafkaProducer] Published message to topic: " + topic +
+                        " partition: " + metadata.partition() +
+                        " offset: " + metadata.offset() +
+                        " (size: " + message.length + " bytes)");
+                }
+            });
+        } catch (Exception e) {
+            System.err.println("[KafkaProducer] Exception while sending: " + e.getMessage());
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/src/main/java/org/itmo/KafkaSharedResources.java b/src/main/java/org/itmo/KafkaSharedResources.java
new file mode 100644
index 0000000..48196ae
--- /dev/null
+++ b/src/main/java/org/itmo/KafkaSharedResources.java
@@ -0,0 +1,33 @@
+package org.itmo;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.Properties;
+
+public class KafkaSharedResources {
+    private final KafkaProducer<String, byte[]> sharedProducer;
+
+    public KafkaSharedResources(String bootstrapServers) {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.put(ProducerConfig.ACKS_CONFIG, "all");
+        props.put(ProducerConfig.RETRIES_CONFIG, 3);
+        // At most one in-flight request, so retries cannot reorder records
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
+        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+
+        this.sharedProducer = new KafkaProducer<>(props);
+    }
+
+    public KafkaProducer<String, byte[]> getSharedProducer() {
+        return sharedProducer;
+    }
+
+    public void close() {
+        sharedProducer.close();
+    }
+}
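`KafkaProducerBase` declares no abstract methods, so a one-off send for testing can go through an empty anonymous subclass (sketch; `ProducerProbe` is ours, not part of the patch):

```java
package org.itmo;

import java.nio.charset.StandardCharsets;

public class ProducerProbe {
    public static void main(String[] args) {
        KafkaSharedResources resources = new KafkaSharedResources("localhost:9092");
        // Empty anonymous subclass: produce() logs delivery in its send callback.
        KafkaProducerBase probe = new KafkaProducerBase(resources.getSharedProducer()) {};
        probe.produce("WORD_COUNT", "probe", "hello".getBytes(StandardCharsets.UTF_8));
        resources.close(); // KafkaProducer.close() flushes pending records first
    }
}
```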
diff --git a/src/main/java/org/itmo/Main.java b/src/main/java/org/itmo/Main.java
new file mode 100644
index 0000000..cb9767e
--- /dev/null
+++ b/src/main/java/org/itmo/Main.java
@@ -0,0 +1,180 @@
+package org.itmo;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+public class Main {
+    private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
+    private static final String RESULT_TOPIC = "result-topic";
+    private static final int NUM_WORKERS = 3;
+    private static final String INPUT_FILE = "./text/warandpeace.txt";
+    private static final String OUTPUT_DIR = "./results";
+
+    public static void main(String[] args) {
+        String inputFile = args.length >= 1 ? args[0] : INPUT_FILE;
+        int numWorkers = NUM_WORKERS;
+        // Dummy call to force class loading; args[0] may be absent, so use inputFile
+        TextProcessor.countWords(inputFile);
+
+        if (args.length >= 2) {
+            try {
+                numWorkers = Integer.parseInt(args[1]);
+            } catch (NumberFormatException e) {
+                System.err.println("Invalid value for number of workers, using default: " + NUM_WORKERS);
+            }
+        }
+        try {
+            runFullPipeline(inputFile, numWorkers);
+        } catch (Exception e) {
+            System.err.println("Error: " + e.getMessage());
+            e.printStackTrace();
+            System.exit(1);
+        }
+    }
+
+    private static void runFullPipeline(String inputFile, int numWorkers) throws Exception {
+
+        KafkaSharedResources sharedResources = new KafkaSharedResources(KAFKA_BOOTSTRAP_SERVERS);
+        KafkaProducer<String, byte[]> sharedProducer = sharedResources.getSharedProducer();
+
+        Properties consumerProps = new Properties();
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVERS);
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "aggregator-group");
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
+        KafkaConsumer<String, byte[]> sharedAggregatorConsumer = new KafkaConsumer<>(consumerProps);
+        sharedAggregatorConsumer.subscribe(Collections.singletonList(RESULT_TOPIC));
+
+        FileProducer tempProducer = new FileProducer(sharedProducer, inputFile, 10);
+        int sectionsCount = tempProducer.countSections();
+        System.out.println("[Setup] File contains " + sectionsCount + " sections\n");
+
+        KafkaAggregator aggregator = new KafkaAggregator(
+            RESULT_TOPIC,
+            sectionsCount
+        );
+
+        final KafkaAggregator.AggregatedResults[] aggregatorResults = new KafkaAggregator.AggregatedResults[1];
+        final Exception[] aggregatorException = new Exception[1];
+
+        Thread aggregatorThread = new Thread(() -> {
+            try {
+                System.out.println("[Aggregator] Started. Waiting for results");
+                aggregator.start(sharedAggregatorConsumer);
+                aggregatorResults[0] = aggregator.getResults();
+            } catch (Exception e) {
+                System.err.println("[Aggregator] Error: " + e.getMessage());
+                e.printStackTrace();
+                aggregatorException[0] = e;
+            }
+        });
+        aggregatorThread.start();
+
+        Thread.sleep(500);
+
+        int totalWorkers = numWorkers * FileProducer.TASK_TYPES.length;
+        CountDownLatch workersReadyLatch = new CountDownLatch(totalWorkers);
+        List<KafkaConsumerWorker> workers = new ArrayList<>();
+        List<Thread> workerThreads = new ArrayList<>();
+
+        for (String taskTopic : FileProducer.TASK_TYPES) {
+            for (int i = 0; i < numWorkers; i++) {
+                final String topic = taskTopic;
+                final int workerNum = i + 1;
+                final String workerId = "Worker-" + topic + "-" + workerNum;
+                final String consumerGroupId = "worker-group-" + topic;
+
+                KafkaConsumerWorker worker = new KafkaConsumerWorker(
+                    KAFKA_BOOTSTRAP_SERVERS,
+                    topic,
+                    RESULT_TOPIC,
+                    workerId,
+                    consumerGroupId,
+                    sharedProducer,
+                    workersReadyLatch
+                );
+                workers.add(worker);
+
+                Thread workerThread = new Thread(() -> {
+                    try {
+                        worker.start();
+                    } catch (Exception e) {
+                        System.err.println("[" + workerId + "] Error: " + e.getMessage());
+                        e.printStackTrace();
+                        workersReadyLatch.countDown();
+                    }
+                });
+                workerThreads.add(workerThread);
+                workerThread.start();
+            }
+        }
+
+        System.out.println("Waiting for all " + totalWorkers + " workers to subscribe to topics\n");
+        boolean allReady = workersReadyLatch.await(10, java.util.concurrent.TimeUnit.SECONDS);
+        if (!allReady) {
+            System.err.println("Warning: Not all workers subscribed within timeout");
+        } else {
+            System.out.println("All workers successfully subscribed\n");
+        }
+
+        Thread producerThread = new Thread(() -> {
+            try {
+                System.out.println("[Producer] Reading file and splitting into sections");
+                FileProducer producer = new FileProducer(sharedProducer, inputFile, 10);
+                int actualSectionsCount = producer.prepareTextAndProduce();
+                System.out.println("[Producer] Finished. Sent " + actualSectionsCount + " sections.");
+            } catch (Exception e) {
+                System.err.println("[Producer] Error: " + e.getMessage());
+                e.printStackTrace();
+            }
+        });
+        producerThread.start();
Sent " + actualSectionsCount + " sections."); + } catch (Exception e) { + System.err.println("[Producer] Error: " + e.getMessage()); + e.printStackTrace(); + } + }); + producerThread.start(); + producerThread.join(); + aggregatorThread.join(); + + if (aggregatorException[0] != null) { + throw aggregatorException[0]; + } + + if (aggregatorResults[0] == null) { + throw new RuntimeException("Aggregator failed to get results"); + } + + Thread summarizerThread = new Thread(() -> { + try { + Summarizer.saveResults(aggregatorResults[0], OUTPUT_DIR); + } catch (Exception e) { + System.err.println("[Summarizer] Error: " + e.getMessage()); + e.printStackTrace(); + } + }); + summarizerThread.start(); + summarizerThread.join(); + + for (KafkaConsumerWorker worker : workers) { + worker.shutdown(); + } + aggregator.shutdown(); + Thread.sleep(2000); + for (Thread workerThread : workerThreads) { + try { + workerThread.join(5000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + sharedAggregatorConsumer.close(); + sharedResources.close(); + } +} diff --git a/src/main/java/org/itmo/MessageSerializer.java b/src/main/java/org/itmo/MessageSerializer.java new file mode 100644 index 0000000..89396c5 --- /dev/null +++ b/src/main/java/org/itmo/MessageSerializer.java @@ -0,0 +1,29 @@ +package org.itmo; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +public class MessageSerializer { + private static final Gson gson = new GsonBuilder().create(); + + public static byte[] serialize(TaskMessage message) { + String json = gson.toJson(message); + return json.getBytes(java.nio.charset.StandardCharsets.UTF_8); + } + + public static TaskMessage deserializeTask(byte[] data) { + String json = new String(data, java.nio.charset.StandardCharsets.UTF_8); + return gson.fromJson(json, TaskMessage.class); + } + + public static byte[] serialize(ResultMessage message) { + String json = gson.toJson(message); + return json.getBytes(java.nio.charset.StandardCharsets.UTF_8); + } + + public static ResultMessage deserializeResult(byte[] data) { + String json = new String(data, java.nio.charset.StandardCharsets.UTF_8); + return gson.fromJson(json, ResultMessage.class); + } +} + diff --git a/src/main/java/org/itmo/ResultMessage.java b/src/main/java/org/itmo/ResultMessage.java new file mode 100644 index 0000000..807ca2b --- /dev/null +++ b/src/main/java/org/itmo/ResultMessage.java @@ -0,0 +1,94 @@ +package org.itmo; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ResultMessage implements Serializable { + private String taskId; + private String taskType; + private int sectionIndex; + + private Long wordCount; + private Map topWords; + private Double sentimentScore; + private String processedText; + private List sortedSentences; + + public ResultMessage() { + this.topWords = new HashMap<>(); + } + + public ResultMessage(String taskId, String taskType, int sectionIndex) { + this(); + this.taskId = taskId; + this.taskType = taskType; + this.sectionIndex = sectionIndex; + } + + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + public String getTaskType() { + return taskType; + } + + public void setTaskType(String taskType) { + this.taskType = taskType; + } + + public int getSectionIndex() { + return sectionIndex; + } + + public void setSectionIndex(int sectionIndex) { + this.sectionIndex = sectionIndex; + } + + public Long getWordCount() { + 
diff --git a/src/main/java/org/itmo/ResultMessage.java b/src/main/java/org/itmo/ResultMessage.java
new file mode 100644
index 0000000..807ca2b
--- /dev/null
+++ b/src/main/java/org/itmo/ResultMessage.java
@@ -0,0 +1,94 @@
+package org.itmo;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ResultMessage implements Serializable {
+    private String taskId;
+    private String taskType;
+    private int sectionIndex;
+
+    private Long wordCount;
+    private Map<String, Integer> topWords;
+    private Double sentimentScore;
+    private String processedText;
+    private List<String> sortedSentences;
+
+    public ResultMessage() {
+        this.topWords = new HashMap<>();
+    }
+
+    public ResultMessage(String taskId, String taskType, int sectionIndex) {
+        this();
+        this.taskId = taskId;
+        this.taskType = taskType;
+        this.sectionIndex = sectionIndex;
+    }
+
+    public String getTaskId() {
+        return taskId;
+    }
+
+    public void setTaskId(String taskId) {
+        this.taskId = taskId;
+    }
+
+    public String getTaskType() {
+        return taskType;
+    }
+
+    public void setTaskType(String taskType) {
+        this.taskType = taskType;
+    }
+
+    public int getSectionIndex() {
+        return sectionIndex;
+    }
+
+    public void setSectionIndex(int sectionIndex) {
+        this.sectionIndex = sectionIndex;
+    }
+
+    public Long getWordCount() {
+        return wordCount;
+    }
+
+    public void setWordCount(Long wordCount) {
+        this.wordCount = wordCount;
+    }
+
+    public Map<String, Integer> getTopWords() {
+        return topWords;
+    }
+
+    public void setTopWords(Map<String, Integer> topWords) {
+        this.topWords = topWords;
+    }
+
+    public Double getSentimentScore() {
+        return sentimentScore;
+    }
+
+    public void setSentimentScore(Double sentimentScore) {
+        this.sentimentScore = sentimentScore;
+    }
+
+    public String getProcessedText() {
+        return processedText;
+    }
+
+    public void setProcessedText(String processedText) {
+        this.processedText = processedText;
+    }
+
+    public List<String> getSortedSentences() {
+        return sortedSentences;
+    }
+
+    public void setSortedSentences(List<String> sortedSentences) {
+        this.sortedSentences = sortedSentences;
+    }
+}
diff --git a/src/main/java/org/itmo/Summarizer.java b/src/main/java/org/itmo/Summarizer.java
new file mode 100644
index 0000000..ce2f6ce
--- /dev/null
+++ b/src/main/java/org/itmo/Summarizer.java
@@ -0,0 +1,106 @@
+package org.itmo;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+
+public class Summarizer {
+
+    public static void saveResults(KafkaAggregator.AggregatedResults results, String outputDir) throws IOException {
+        String timestamp = new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date());
+        if (!Files.exists(Paths.get(outputDir))) {
+            Files.createDirectories(Paths.get(outputDir));
+        }
+        saveJsonReport(results, outputDir + "/report_" + timestamp + ".json");
+        saveTextReport(results, outputDir + "/report_" + timestamp + ".txt");
+        if (results.getProcessedText() != null && !results.getProcessedText().isEmpty()) {
+            Files.write(
+                Paths.get(outputDir + "/processed_text_" + timestamp + ".txt"),
+                results.getProcessedText().getBytes(StandardCharsets.UTF_8)
+            );
+        }
+
+        if (results.getSortedSentences() != null && !results.getSortedSentences().isEmpty()) {
+            String sentencesText = String.join("\n", results.getSortedSentences());
+            Files.write(
+                Paths.get(outputDir + "/sorted_sentences_" + timestamp + ".txt"),
+                sentencesText.getBytes(StandardCharsets.UTF_8)
+            );
+        }
+
+        System.out.println("Results saved to directory: " + outputDir);
+    }
+
+    private static void saveJsonReport(KafkaAggregator.AggregatedResults results, String filename) throws IOException {
+        StringBuilder json = new StringBuilder();
+        json.append("{\n");
+        json.append("\t\"totalWordCount\": ").append(results.getTotalWordCount()).append(",\n");
+        json.append("\t\"averageSentiment\": ").append(results.getAverageSentiment()).append(",\n");
+        json.append("\t\"topWords\": {\n");
+
+        boolean first = true;
+        for (Map.Entry<String, Integer> entry : results.getTopWords().entrySet()) {
+            if (!first) {
+                json.append(",\n");
+            }
+            json.append("\t\t\"").append(entry.getKey()).append("\": ").append(entry.getValue());
+            first = false;
+        }
+
+        json.append("\n\t},\n");
+        json.append("\t\"sortedSentencesCount\": ").append(results.getSortedSentences().size()).append("\n");
+        json.append("}\n");
+
+        try (FileWriter writer = new FileWriter(filename, StandardCharsets.UTF_8)) {
+            writer.write(json.toString());
+        }
+
+        System.out.println("JSON report saved: " + filename);
+    }
+
+    private static void saveTextReport(KafkaAggregator.AggregatedResults results, String filename) throws IOException {
+        StringBuilder report = new StringBuilder();
+
+        report.append("1. WORD COUNT\n");
+        report.append("Total words: ").append(results.getTotalWordCount()).append("\n\n");
+
+        report.append("2. TOP WORDS\n");
+        int rank = 1;
+        for (Map.Entry<String, Integer> entry : results.getTopWords().entrySet()) {
+            report.append(String.format("%2d. %-30s : %d occurrences\n",
+                rank++, entry.getKey(), entry.getValue()));
+        }
+        report.append("\n");
+
+        report.append("3. SENTIMENT ANALYSIS\n");
+        report.append("Average sentiment score: ").append(String.format("%.4f", results.getAverageSentiment())).append("\n");
+        String sentimentLabel = results.getAverageSentiment() > 0.1 ? "POSITIVE" :
+                                results.getAverageSentiment() < -0.1 ? "NEGATIVE" : "NEUTRAL";
+        report.append("Sentiment label: ").append(sentimentLabel).append("\n\n");
+
+        report.append("4. TEXT PROCESSING SUMMARY\n");
+        report.append("Processed text length: ").append(
+            results.getProcessedText() != null ? results.getProcessedText().length() : 0
+        ).append(" characters\n");
+        report.append("Sorted sentences count: ").append(results.getSortedSentences().size()).append("\n");
+
+        Files.write(Paths.get(filename), report.toString().getBytes(StandardCharsets.UTF_8));
+        System.out.println("Text report saved: " + filename);
+    }
+}
diff --git a/src/main/java/org/itmo/TaskMessage.java b/src/main/java/org/itmo/TaskMessage.java
new file mode 100644
index 0000000..c4fd764
--- /dev/null
+++ b/src/main/java/org/itmo/TaskMessage.java
@@ -0,0 +1,69 @@
+package org.itmo;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+public class TaskMessage implements Serializable {
+    private String taskId;
+    private String taskType;
+    private String textSection;
+    private int sectionIndex;
+    private int topN;
+
+    public TaskMessage() {
+        this.taskId = UUID.randomUUID().toString();
+    }
+
+    public TaskMessage(String taskType, String textSection, int sectionIndex) {
+        this();
+        this.taskType = taskType;
+        this.textSection = textSection;
+        this.sectionIndex = sectionIndex;
+    }
+
+    public TaskMessage(String taskType, String textSection, int sectionIndex, int topN) {
+        this(taskType, textSection, sectionIndex);
+        this.topN = topN;
+    }
+
+    public String getTaskId() {
+        return taskId;
+    }
+
+    public void setTaskId(String taskId) {
+        this.taskId = taskId;
+    }
+
+    public String getTaskType() {
+        return taskType;
+    }
+
+    public void setTaskType(String taskType) {
+        this.taskType = taskType;
+    }
+
+    public String getTextSection() {
+        return textSection;
+    }
+
+    public void setTextSection(String textSection) {
+        this.textSection = textSection;
+    }
+
+    public int getSectionIndex() {
+        return sectionIndex;
+    }
+
+    public void setSectionIndex(int sectionIndex) {
+        this.sectionIndex = sectionIndex;
+    }
+
+    public int getTopN() {
+        return topN;
+    }
+
+    public void setTopN(int topN) {
+        this.topN = topN;
+    }
+}
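Note on defaults (illustration; `TopNDefaultDemo` is ours): the three-argument constructor leaves `topN` at Java's implicit 0, which `KafkaConsumerWorker` treats as "use TOP_N_DEFAULT":

```java
package org.itmo;

public class TopNDefaultDemo {
    public static void main(String[] args) {
        TaskMessage t = new TaskMessage("WORD_COUNT", "some text", 1);
        // topN was never set, so the int field keeps its default value 0,
        // and KafkaConsumerWorker falls back to TOP_N_DEFAULT (10).
        System.out.println(t.getTopN()); // 0
    }
}
```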
"красота", "мир", + "good", "great", "excellent", "wonderful", "beautiful", "happy", "joy", + "love", "success", "victory", "peace", "friend", "kind", "nice" + )); + + private static final Set NEGATIVE_WORDS = new HashSet<>(Arrays.asList( + "плохо", "ужасно", "грустно", "горе", "боль", "страх", "война", "зло", + "ненависть", "поражение", "беда", "проблема", "трудность", "страдание", + "bad", "terrible", "awful", "sad", "pain", "fear", "war", "evil", + "hate", "defeat", "trouble", "problem", "difficulty", "suffering" + )); + + private static final Pattern NAME_PATTERN = Pattern.compile( + "\\b[A-Z][a-z]+\\b" + ); + + public static long countWords(String text) { + if (text == null || text.trim().isEmpty()) { + return 0; + } + String[] words = text.trim().split("\\s+"); + return Arrays.stream(words) + .filter(word -> !word.isEmpty()) + .count(); + } + + public static Map findTopNWords(String text, int n) { + if (text == null || text.trim().isEmpty()) { + return new HashMap<>(); + } + String normalized = text.toLowerCase() + .replaceAll("[^\\p{L}\\p{N}\\s]", " ") + .trim(); + String[] words = normalized.split("\\s+"); + Map wordCount = new HashMap<>(); + for (String word : words) { + word = word.trim(); + if (!word.isEmpty() && word.length() > 2) { + wordCount.put(word, wordCount.getOrDefault(word, 0) + 1); + } + } + return wordCount.entrySet().stream() + .sorted(Map.Entry.comparingByValue().reversed()) + .limit(n) + .collect(Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + (e1, e2) -> e1, + LinkedHashMap::new + )); + } + + public static double analyzeSentiment(String text) { + if (text == null || text.trim().isEmpty()) { + return 0.0; + } + String normalized = text.toLowerCase() + .replaceAll("[^\\p{L}\\s]", " ") + .trim(); + String[] words = normalized.split("\\s+"); + + int positiveCount = 0; + int negativeCount = 0; + int totalWords = 0; + + for (String word : words) { + word = word.trim(); + if (!word.isEmpty()) { + totalWords++; + if (POSITIVE_WORDS.contains(word)) { + positiveCount++; + } else if (NEGATIVE_WORDS.contains(word)) { + negativeCount++; + } + } + } + if (totalWords == 0) { + return 0.0; + } + + return (double)(positiveCount - negativeCount) / Math.max(totalWords, 1); + } + + public static String replaceNames(String text, String replacement) { + if (text == null || text.trim().isEmpty()) { + return text; + } + + Matcher matcher = NAME_PATTERN.matcher(text); + String result = matcher.replaceAll(replacement); + + return result; + } + + public static List sortSentencesByLength(String text) { + if (text == null || text.trim().isEmpty()) { + return new ArrayList<>(); + } + + String[] sentences = text.split("[.!?]+\\s*"); + List sentenceList = Arrays.stream(sentences) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); + sentenceList.sort(Comparator.comparingInt(String::length)); + return sentenceList; + } +} + diff --git a/text b/text new file mode 160000 index 0000000..55f9027 --- /dev/null +++ b/text @@ -0,0 +1 @@ +Subproject commit 55f9027799b5b3c67e2f7cb3d6a7154f707ff08a