Skip to content

Commit 81fcca0

Browse files
Artemiy Romanovartemiyjjj
authored andcommitted
solution
1 parent c5c9fa2 commit 81fcca0

18 files changed

+1283
-10
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
.gradle
2+
.vscode
3+
bin
4+
build

.gitmodules

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[submodule "text"]
2+
path = text
3+
url = git@gist.github.com:c95f3a864828f7f034b7a33d1676e420.git

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
[![Review Assignment Due Date](https://classroom.github.com/assets/deadline-readme-button-22041afd0340ce965d47ae6ef1cefeee28c7c493a6346c4f15d667ab976d596c.svg)](https://classroom.github.com/a/QODoQuhO)
22
# Распределенная обработка текстовых данных с использованием брокера сообщений
33

4+
## Использование
5+
6+
1. `git submodule update`
7+
8+
1. `docker compose up`
9+
10+
<!-- 1. `gradle run` -->
11+
412
## Цель задания:
513
Реализовать распределённую систему обработки текстовых данных, где секции текста рассылаются на обработку через брокер сообщений (message broker). Несколько воркеров параллельно обрабатывают секции и отправляют результаты на агрегатор. Воркеры выполняют набор задач
614

build.gradle.kts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
plugins {
22
kotlin("jvm") version "1.9.20"
3+
java
34
application
45
}
56

@@ -11,8 +12,8 @@ repositories {
1112
}
1213

1314
dependencies {
14-
implementation("javax.jms:jms-api:2.0.1")
15-
implementation("org.apache.activemq:activemq-broker:6.1.1")
15+
implementation("org.apache.kafka:kafka-clients:3.6.1")
16+
implementation("com.google.code.gson:gson:2.10.1")
1617
testImplementation(kotlin("test"))
1718
}
1819

@@ -21,9 +22,13 @@ tasks.test {
2122
}
2223

2324
kotlin {
24-
jvmToolchain(8)
25+
jvmToolchain(14)
2526
}
2627

2728
application {
28-
mainClass.set("MainKt")
29-
}
29+
mainClass.set("org.itmo.Main")
30+
applicationDefaultJvmArgs = listOf(
31+
"-Dfile.encoding=UTF-8",
32+
"-Dconsole.encoding=UTF-8"
33+
)
34+
}

docker-compose.yml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
services:
2+
3+
broker:
4+
image: apache/kafka:latest
5+
container_name: broker
6+
hostname: broker
7+
ports:
8+
- "9092:9092"
9+
environment:
10+
KAFKA_NODE_ID: 1
11+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
12+
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://localhost:9092,PLAINTEXT://broker:19092'
13+
KAFKA_PROCESS_ROLES: 'broker,controller'
14+
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
15+
KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092'
16+
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
17+
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
18+
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
19+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
20+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
21+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
22+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
23+
KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR: 1
24+
KAFKA_SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR: 1
25+
26+
kafka-ui:
27+
container_name: kafka-ui
28+
image: provectuslabs/kafka-ui:latest
29+
ports:
30+
- 8080:8080
31+
environment:
32+
DYNAMIC_CONFIG_ENABLED: true

src/main/java/Main.java

Lines changed: 0 additions & 5 deletions
This file was deleted.
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package org.itmo;
2+
3+
import org.apache.kafka.clients.producer.KafkaProducer;
4+
5+
import java.io.FileNotFoundException;
6+
import java.io.IOException;
7+
import java.nio.charset.StandardCharsets;
8+
import java.nio.file.Files;
9+
import java.nio.file.InvalidPathException;
10+
import java.nio.file.Path;
11+
import java.nio.file.Paths;
12+
import java.util.ArrayList;
13+
import java.util.List;
14+
15+
public class FileProducer extends KafkaProducerBase {
16+
private final String filename;
17+
private final int topN;
18+
public static final String[] TASK_TYPES = {
19+
"WORD_COUNT",
20+
"TOP_N",
21+
"SENTIMENT",
22+
"NAME_REPLACE",
23+
"SORT_SENTENCES"
24+
};
25+
// public static final String[] TASK_TOPICS = {
26+
// "task-topic-word-count",
27+
// "task-topic-top-n",
28+
// "task-topic-sentiment",
29+
// "task-topic-name-replace",
30+
// "task-topic-sort-sentences"
31+
// };
32+
33+
public FileProducer(KafkaProducer<String, byte[]> sharedProducer, String filename) {
34+
super(sharedProducer);
35+
this.filename = filename;
36+
this.topN = 10;
37+
}
38+
39+
public FileProducer(KafkaProducer<String, byte[]> sharedProducer, String filename, int topN) {
40+
super(sharedProducer);
41+
this.filename = filename;
42+
this.topN = topN;
43+
}
44+
45+
/**
46+
* Разбивает файл на секции (по параграфам) и отправляет задачи в Kafka topics
47+
* @return количество созданных секций
48+
*/
49+
public int prepareTextAndProduce() throws IOException {
50+
Path file;
51+
52+
try {
53+
file = Paths.get(this.filename);
54+
if (!Files.isRegularFile(file)) {
55+
throw new FileNotFoundException();
56+
}
57+
} catch (InvalidPathException e) {
58+
throw new IOException("Failed to open file " + filename + "\n" + e.getMessage());
59+
} catch (FileNotFoundException e) {
60+
throw new IOException("File " + filename + " is not present or a directory");
61+
}
62+
63+
// Читаем весь файл
64+
String fileContent = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
65+
66+
// Разбиваем на секции по параграфам (двойной перенос строки) или предложениям
67+
List<String> sections = splitIntoSections(fileContent);
68+
69+
System.out.println("[Producer] Split file into " + sections.size() + " sections");
70+
71+
// Создаем и отправляем задачи для каждой секции
72+
try {
73+
int taskCounter = 0;
74+
for (int sectionIndex = 0; sectionIndex < sections.size(); sectionIndex++) {
75+
String section = sections.get(sectionIndex);
76+
// System.out.println("[Producer] Processing section " + sectionIndex + " (length: " + section.length() + " chars)");
77+
78+
// Отправляем задачи всех типов для каждой секции
79+
for (int taskTypeIndex = 0; taskTypeIndex < TASK_TYPES.length; taskTypeIndex++) {
80+
String taskType = TASK_TYPES[taskTypeIndex];
81+
String taskTopic = TASK_TYPES[taskTypeIndex];
82+
83+
TaskMessage task;
84+
if ("TOP_N".equals(taskType)) {
85+
task = new TaskMessage(taskType, section, sectionIndex, topN);
86+
} else {
87+
task = new TaskMessage(taskType, section, sectionIndex);
88+
}
89+
90+
byte[] messageBytes = MessageSerializer.serialize(task);
91+
// Use taskId as key for partition distribution
92+
String key = task.getTaskId();
93+
taskCounter++;
94+
// System.out.println("[Producer] Sending task #" + (taskCounter) + ": " + taskType +
95+
// " for section " + sectionIndex + " (message size: " + messageBytes.length + " bytes)");
96+
super.produce(taskTopic, key, messageBytes);
97+
}
98+
}
99+
100+
// Ждем завершения всех отправок
101+
super.producer.flush();
102+
System.out.println("[Producer] All tasks sent to Kafka topics (" + (sections.size() * TASK_TYPES.length) + " tasks total)");
103+
104+
} catch (Exception e) {
105+
throw new IOException("Error while sending messages to Kafka: " + e.getMessage(), e);
106+
}
107+
108+
return sections.size();
109+
}
110+
111+
/**
112+
* Подсчитывает количество секций в файле без разбиения
113+
*/
114+
public int countSections() throws IOException {
115+
Path file;
116+
117+
try {
118+
file = Paths.get(this.filename);
119+
if (!Files.isRegularFile(file)) {
120+
throw new FileNotFoundException();
121+
}
122+
} catch (InvalidPathException e) {
123+
throw new IOException("Failed to open file " + filename + "\n" + e.getMessage());
124+
} catch (FileNotFoundException e) {
125+
throw new IOException("File " + filename + " is not present or a directory");
126+
}
127+
128+
// Читаем весь файл
129+
String fileContent = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
130+
131+
// Разбиваем на секции по параграфам (двойной перенос строки) или предложениям
132+
List<String> sections = splitIntoSections(fileContent);
133+
134+
return sections.size();
135+
}
136+
137+
/**
138+
* Разбивает текст на секции по параграфам
139+
*/
140+
private List<String> splitIntoSections(String text) {
141+
List<String> sections = new ArrayList<>();
142+
143+
// Разбиваем по двойному переносу строки (параграфы)
144+
String[] paragraphs = text.split("\\n\\s*\\n+");
145+
146+
// Если получилось слишком мало параграфов, разбиваем по предложениям
147+
if (paragraphs.length < 2) {
148+
paragraphs = text.split("[.!?]+\\s+");
149+
}
150+
151+
// Объединяем очень короткие секции
152+
StringBuilder currentSection = new StringBuilder();
153+
int minSectionLength = 100; // минимальная длина секции
154+
155+
for (String paragraph : paragraphs) {
156+
String trimmed = paragraph.trim();
157+
if (trimmed.isEmpty()) {
158+
continue;
159+
}
160+
161+
if (currentSection.length() > 0 &&
162+
currentSection.length() + trimmed.length() < minSectionLength) {
163+
// Добавляем к текущей секции
164+
currentSection.append(" ").append(trimmed);
165+
} else {
166+
// Сохраняем предыдущую секцию и начинаем новую
167+
if (currentSection.length() > 0) {
168+
sections.add(currentSection.toString());
169+
}
170+
currentSection = new StringBuilder(trimmed);
171+
}
172+
}
173+
174+
// Добавляем последнюю секцию
175+
if (currentSection.length() > 0) {
176+
sections.add(currentSection.toString());
177+
}
178+
179+
// Если всё ещё пусто, используем весь текст как одну секцию
180+
if (sections.isEmpty()) {
181+
sections.add(text.trim());
182+
}
183+
184+
return sections;
185+
}
186+
}

0 commit comments

Comments
 (0)