Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.gradle
.vscode
bin
build
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "text"]
path = text
url = git@gist.github.com:c95f3a864828f7f034b7a33d1676e420.git
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
[![Review Assignment Due Date](https://classroom.github.com/assets/deadline-readme-button-22041afd0340ce965d47ae6ef1cefeee28c7c493a6346c4f15d667ab976d596c.svg)](https://classroom.github.com/a/QODoQuhO)
# Распределенная обработка текстовых данных с использованием брокера сообщений

## Использование

1. `git submodule update --init`

1. `docker compose up`

<!-- 1. `gradle run` -->

## Цель задания:
Реализовать распределённую систему обработки текстовых данных, где секции текста рассылаются на обработку через брокер сообщений (message broker). Несколько воркеров параллельно обрабатывают секции и отправляют результаты на агрегатор. Воркеры выполняют набор задач

Expand Down
15 changes: 10 additions & 5 deletions build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
plugins {
kotlin("jvm") version "1.9.20"
java
application
}

Expand All @@ -11,8 +12,8 @@ repositories {
}

dependencies {
implementation("javax.jms:jms-api:2.0.1")
implementation("org.apache.activemq:activemq-broker:6.1.1")
implementation("org.apache.kafka:kafka-clients:3.6.1")
implementation("com.google.code.gson:gson:2.10.1")
testImplementation(kotlin("test"))
}

Expand All @@ -21,9 +22,13 @@ tasks.test {
}

kotlin {
jvmToolchain(8)
jvmToolchain(14)
}

application {
mainClass.set("MainKt")
}
mainClass.set("org.itmo.Main")
applicationDefaultJvmArgs = listOf(
"-Dfile.encoding=UTF-8",
"-Dconsole.encoding=UTF-8"
)
}
32 changes: 32 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Local development stack: a single Kafka broker plus a web UI.
services:

# Single Kafka node that takes both the broker and the controller role
# (see KAFKA_PROCESS_ROLES), so no separate coordination service is declared here.
broker:
image: apache/kafka:latest
container_name: broker
hostname: broker
ports:
# Publishes the PLAINTEXT_HOST listener to the host machine.
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
# All three listeners are mapped to the unencrypted PLAINTEXT protocol.
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
# Clients on the host are told to connect to localhost:9092;
# clients that resolve the "broker" hostname are told to use broker:19092.
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://localhost:9092,PLAINTEXT://broker:19092'
KAFKA_PROCESS_ROLES: 'broker,controller'
# The only controller quorum voter is this node (id 1) on port 29093.
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
# Replication factors and min-ISR values are set to 1 to match the single node defined above.
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR: 1

# Web UI published on host port 8080.
# NOTE(review): no cluster connection is configured in this file; presumably
# DYNAMIC_CONFIG_ENABLED lets the cluster be added from the UI at runtime - confirm against kafka-ui docs.
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
environment:
DYNAMIC_CONFIG_ENABLED: true
5 changes: 0 additions & 5 deletions src/main/java/Main.java

This file was deleted.

186 changes: 186 additions & 0 deletions src/main/java/org/itmo/FileProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package org.itmo;

import org.apache.kafka.clients.producer.KafkaProducer;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Reads a UTF-8 text file, splits it into sections and publishes one task of
 * every type listed in {@link #TASK_TYPES} per section.
 *
 * <p>Each task is sent to the topic whose name equals its task type, keyed by
 * the task id.
 */
public class FileProducer extends KafkaProducerBase {
    /**
     * Task types produced for every section. Each value is also used as the
     * name of the topic the task is sent to.
     */
    public static final String[] TASK_TYPES = {
        "WORD_COUNT",
        "TOP_N",
        "SENTIMENT",
        "NAME_REPLACE",
        "SORT_SENTENCES"
    };

    /** Value of N used for TOP_N tasks when the caller does not supply one. */
    private static final int DEFAULT_TOP_N = 10;

    /** Fragments are merged while the combined section stays shorter than this many chars. */
    private static final int MIN_SECTION_LENGTH = 100;

    /** A paragraph break: a newline, optional whitespace, then one or more newlines. */
    private static final Pattern PARAGRAPH_BREAK = Pattern.compile("\\n\\s*\\n+");

    /**
     * Whitespace that follows sentence-ending punctuation. The lookbehind keeps
     * the punctuation attached to its sentence, so merged sections still carry
     * their sentence boundaries.
     */
    private static final Pattern SENTENCE_BREAK = Pattern.compile("(?<=[.!?])\\s+");

    private final String filename;
    private final int topN;

    /**
     * Creates a producer that uses the default N (10) for TOP_N tasks.
     *
     * @param sharedProducer Kafka producer passed to the base class
     * @param filename       path of the text file to process
     */
    public FileProducer(KafkaProducer<String, byte[]> sharedProducer, String filename) {
        this(sharedProducer, filename, DEFAULT_TOP_N);
    }

    /**
     * Creates a producer with an explicit N for TOP_N tasks.
     *
     * @param sharedProducer Kafka producer passed to the base class
     * @param filename       path of the text file to process
     * @param topN           value of N attached to every TOP_N task
     */
    public FileProducer(KafkaProducer<String, byte[]> sharedProducer, String filename, int topN) {
        super(sharedProducer);
        this.filename = filename;
        this.topN = topN;
    }

    /**
     * Splits the file into sections and sends one task per task type for each
     * section, then flushes the producer.
     *
     * @return the number of sections the file was split into
     * @throws IOException if the path is invalid, the file is missing or not a
     *                     regular file, it cannot be read, or sending fails
     */
    public int prepareTextAndProduce() throws IOException {
        List<String> sections = readSections();
        System.out.println("[Producer] Split file into " + sections.size() + " sections");

        try {
            int sentTasks = 0;
            for (int sectionIndex = 0; sectionIndex < sections.size(); sectionIndex++) {
                String section = sections.get(sectionIndex);

                for (String taskType : TASK_TYPES) {
                    // Only TOP_N tasks carry the extra N parameter.
                    TaskMessage task = "TOP_N".equals(taskType)
                        ? new TaskMessage(taskType, section, sectionIndex, topN)
                        : new TaskMessage(taskType, section, sectionIndex);

                    byte[] messageBytes = MessageSerializer.serialize(task);
                    // The task id is the record key; the task type doubles as the topic name.
                    String key = task.getTaskId();
                    super.produce(taskType, key, messageBytes);
                    sentTasks++;
                }
            }

            // Wait until every buffered record has actually been sent.
            super.producer.flush();
            System.out.println("[Producer] All tasks sent to Kafka topics (" + sentTasks + " tasks total)");
        } catch (Exception e) {
            // Boundary catch: callers only deal with IOException; the cause is preserved.
            throw new IOException("Error while sending messages to Kafka: " + e.getMessage(), e);
        }

        return sections.size();
    }

    /**
     * Counts the sections the file would be split into, without sending anything.
     *
     * @return the number of sections
     * @throws IOException if the path is invalid, the file is missing or not a
     *                     regular file, or it cannot be read
     */
    public int countSections() throws IOException {
        return readSections().size();
    }

    /** Validates the configured path, reads the file as UTF-8 and splits it into sections. */
    private List<String> readSections() throws IOException {
        Path file;
        try {
            file = Paths.get(filename);
        } catch (InvalidPathException e) {
            throw new IOException("Failed to open file " + filename + "\n" + e.getMessage(), e);
        }
        if (!Files.isRegularFile(file)) {
            throw new IOException("File " + filename + " is not present or a directory");
        }

        String content = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        return splitIntoSections(content);
    }

    /**
     * Splits text into sections.
     *
     * <p>The text is split on paragraph breaks; if that yields fewer than two
     * fragments it is split into sentences instead. Blank fragments are dropped
     * and consecutive fragments are joined with a single space while the
     * combined length stays below {@link #MIN_SECTION_LENGTH}.
     *
     * @param text the full text to split
     * @return the sections in document order; never empty (text without any
     *         non-blank fragment yields a single section holding the trimmed text)
     */
    private List<String> splitIntoSections(String text) {
        String[] fragments = PARAGRAPH_BREAK.split(text);
        if (fragments.length < 2) {
            // No paragraph structure: fall back to sentence granularity.
            fragments = SENTENCE_BREAK.split(text);
        }

        List<String> sections = new ArrayList<>();
        StringBuilder current = new StringBuilder();

        for (String fragment : fragments) {
            String trimmed = fragment.trim();
            if (trimmed.isEmpty()) {
                continue;
            }

            boolean fitsInCurrent = current.length() > 0
                && current.length() + trimmed.length() < MIN_SECTION_LENGTH;
            if (fitsInCurrent) {
                current.append(' ').append(trimmed);
            } else {
                // Close the section built so far (if any) and start a new one.
                if (current.length() > 0) {
                    sections.add(current.toString());
                }
                current = new StringBuilder(trimmed);
            }
        }

        if (current.length() > 0) {
            sections.add(current.toString());
        }

        // Nothing but whitespace: keep the contract of returning at least one section.
        if (sections.isEmpty()) {
            sections.add(text.trim());
        }

        return sections;
    }
}
Loading