Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
rabbitmq-db/
rabbitmq-logs

.gradle
build/
!gradle/wrapper/gradle-wrapper.jar
!**/src/main/**/build/
!**/src/test/**/build/

### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
out/
!**/src/main/**/out/
!**/src/test/**/out/

### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
bin/
!**/src/main/**/bin/
!**/src/test/**/bin/

### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/

### VS Code ###
.vscode/

### Mac OS ###
.DS_Store
Binary file removed .gradle/8.4/checksums/checksums.lock
Binary file not shown.
Binary file removed .gradle/8.4/checksums/md5-checksums.bin
Binary file not shown.
Binary file removed .gradle/8.4/checksums/sha1-checksums.bin
Binary file not shown.
Binary file not shown.
Empty file.
Binary file removed .gradle/8.4/executionHistory/executionHistory.bin
Binary file not shown.
Binary file removed .gradle/8.4/executionHistory/executionHistory.lock
Binary file not shown.
Binary file removed .gradle/8.4/fileChanges/last-build.bin
Binary file not shown.
Binary file removed .gradle/8.4/fileHashes/fileHashes.bin
Binary file not shown.
Binary file removed .gradle/8.4/fileHashes/fileHashes.lock
Binary file not shown.
Binary file removed .gradle/8.4/fileHashes/resourceHashesCache.bin
Binary file not shown.
Empty file removed .gradle/8.4/gc.properties
Empty file.
Binary file removed .gradle/buildOutputCleanup/buildOutputCleanup.lock
Binary file not shown.
2 changes: 0 additions & 2 deletions .gradle/buildOutputCleanup/cache.properties

This file was deleted.

Binary file removed .gradle/buildOutputCleanup/outputFiles.bin
Binary file not shown.
Binary file removed .gradle/file-system.probe
Binary file not shown.
Empty file removed .gradle/vcs-1/gc.properties
Empty file.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
[![Review Assignment Due Date](https://classroom.github.com/assets/deadline-readme-button-22041afd0340ce965d47ae6ef1cefeee28c7c493a6346c4f15d667ab976d596c.svg)](https://classroom.github.com/a/QODoQuhO)
# Распределенная обработка текстовых данных с использованием брокера сообщений

## Цель задания:
Expand Down
13 changes: 4 additions & 9 deletions build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
plugins {
kotlin("jvm") version "1.9.20"
// kotlin("jvm") version "1.9.20"
application
}

Expand All @@ -11,19 +11,14 @@ repositories {
}

dependencies {
implementation("javax.jms:jms-api:2.0.1")
implementation("org.apache.activemq:activemq-broker:6.1.1")
implementation("com.rabbitmq:amqp-client:5.20.0")
testImplementation(kotlin("test"))
}

tasks.test {
useJUnitPlatform()
}

kotlin {
jvmToolchain(8)
}

application {
mainClass.set("MainKt")
}
mainClass.set("Main")
}
8,343 changes: 8,343 additions & 0 deletions data/Herbert Frank - Dune.txt

Large diffs are not rendered by default.

30 changes: 30 additions & 0 deletions src/main/java/Broker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Connection;

class Broker implements AutoCloseable {
final static ConnectionFactory factory = new ConnectionFactory();
final Connection connection;

Broker() throws IOException, TimeoutException {
this.connection = factory.newConnection();
}
static {
factory.setHost("localhost");
}

BrokerQueue<Task> getTaskChannel() throws IOException {
return new BrokerQueue<Task>(connection, "task_queue");
}

BrokerQueue<Result> getResultChannel() throws IOException {
return new BrokerQueue<Result>(connection, "result_queue");
}

@Override
public void close() throws IOException {
connection.close();
}
}
38 changes: 38 additions & 0 deletions src/main/java/BrokerQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.TimeoutException;

class BrokerQueue<T extends Serializable> implements AutoCloseable {
final String queueName;
Channel ch;
BrokerQueue(Connection con, String queueName) throws IOException {
this.queueName = queueName;
this.ch = con.createChannel();
ch.queueDeclare(queueName, false, false, true, null);
}

void send(T result) throws IOException {
var serializer = new ByteSerializer<T>();
ch.basicPublish("", queueName, null, serializer.toBytes(result));
}

void receive(DeliverCallback callback) throws IOException {
ch.basicConsume(queueName, false, callback, consumerTag -> { });
}

void ack(long dtag) throws IOException {
ch.basicAck(dtag, false);
}

void requeue(long dtag) throws IOException {
ch.basicReject(dtag, true);
}

public void close() throws TimeoutException, IOException {
ch.close();
}
}
25 changes: 25 additions & 0 deletions src/main/java/ByteSerializer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class ByteSerializer<T extends Serializable> {
byte[] toBytes(T obj) throws IOException {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos)) {
oos.writeObject(obj);
return baos.toByteArray();
}
}

@SuppressWarnings("unchecked")
T fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bais)) {
return (T) ois.readObject();
}
}
}

55 changes: 53 additions & 2 deletions src/main/java/Main.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,56 @@
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;

public class Main {
public static void main(String[] args) {
System.out.println("Lab 3");
final static Path DUNE_TEXT = Path.of("data", "Herbert Frank - Dune.txt");

public static void main(String[] args) throws Exception {

// Warmup iteration
var results = processPath(DUNE_TEXT, 4, 512);

System.out.println(results.toString());
try (BufferedWriter writer = new BufferedWriter(new FileWriter("modifiedText.txt"))) {
writer.write("Modified text:\n");
writer.write(results.getModifiedText());
}

try (BufferedWriter writer = new BufferedWriter(new FileWriter("longestSequences.txt"))) {
writer.write("Longest sequences:\n");
for (String sent : results.getLongestSentences()) {
writer.write(sent + "\n\n");
}
}

// Additional runs to measure performance
for (var workers : new int[] {1, 2, 4}) {
for (var linesInBatch : new int[] {8, 32, 128, 512}) {
processPath(DUNE_TEXT, workers, linesInBatch);
}
}
}

public static Result processPath(Path inputFile, int nWorkers, int linesInBatch) throws Exception {
try (final var broker = new Broker()) {
final var prod = new Producer(broker);
final var workers = new WorkerPool(nWorkers, broker);
final var collector = new ResultsCollector(broker);

Instant start = Instant.now();
workers.start();
collector.start();
prod.emitTasksForFile(inputFile, linesInBatch);
workers.waitUntilProcessed();
collector.waitUntilProcessed();
Instant end = Instant.now();

Duration duration = Duration.between(start, end);
System.out.printf("Benchmark(workers=%d, linesInBatch=%d): %d ms\n",
nWorkers, linesInBatch, duration.toMillis());
return collector.getResults();
}
}
}
50 changes: 50 additions & 0 deletions src/main/java/Producer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class Producer {
final Broker broker;

Producer(Broker broker) {
this.broker = broker;
}

void emitTasksForFile(Path path, int nLinesBlockSize) {
try (Stream<String> stream = Files.lines(path);
var ch = broker.getTaskChannel()) {
stream.collect(blockCollector(nLinesBlockSize))
.forEach((nLines) -> { sendBatch(ch, nLines); });
ch.send(Task.stopSignal());
} catch (Exception x) {
System.err.format("Exception: %s%n", x);
}
}

private void sendBatch(BrokerQueue<Task> ch, List<String> nLines) {
try {
Task task = new Task(nLines.stream().collect(Collectors.joining("\n")));
ch.send(task);
} catch (IOException err) {
System.err.println(err);
}
}

private static Collector<String, List<List<String>>, List<List<String>>> blockCollector(int blockSize) {
return Collector.of(
ArrayList<List<String>>::new,
(list, value) -> {
List<String> block = (list.isEmpty() ? null : list.get(list.size() - 1));
if (block == null || block.size() == blockSize)
list.add(block = new ArrayList<>(blockSize));
block.add(value);
},
(r1, r2) -> { throw new UnsupportedOperationException("Parallel processing not supported"); }
);
}
}

Loading