diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b657919 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +/producer/target/ +/.idea/yatool.xml +.idea/ +/worker/target/ +/aggregator/target/ +/producer/src/main/resources/sample-text.txt +/aggregator/results/ +/producer/src/main/resources/big.txt diff --git a/.idea/.gitignore b/.idea/.gitignore deleted file mode 100644 index 13566b8..0000000 --- a/.idea/.gitignore +++ /dev/null @@ -1,8 +0,0 @@ -# Default ignored files -/shelf/ -/workspace.xml -# Editor-based HTTP Client requests -/httpRequests/ -# Datasource local storage ignored files -/dataSources/ -/dataSources.local.xml diff --git a/.idea/compiler.xml b/.idea/compiler.xml deleted file mode 100644 index 61a9130..0000000 --- a/.idea/compiler.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/.idea/gradle.xml b/.idea/gradle.xml deleted file mode 100644 index f9163b4..0000000 --- a/.idea/gradle.xml +++ /dev/null @@ -1,15 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/.idea/jarRepositories.xml b/.idea/jarRepositories.xml deleted file mode 100644 index fdc392f..0000000 --- a/.idea/jarRepositories.xml +++ /dev/null @@ -1,20 +0,0 @@ - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/kotlinc.xml b/.idea/kotlinc.xml deleted file mode 100644 index e805548..0000000 --- a/.idea/kotlinc.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml deleted file mode 100644 index 2dae0dd..0000000 --- a/.idea/misc.xml +++ /dev/null @@ -1,10 +0,0 @@ - - - - - - - - - - \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml deleted file mode 100644 index 5c567f2..0000000 --- a/.idea/modules.xml +++ /dev/null @@ -1,9 +0,0 @@ - - - - - - - - - \ No newline at end of file diff --git a/.idea/modules/lab_3.main.iml b/.idea/modules/lab_3.main.iml deleted file mode 100644 index d6ff951..0000000 --- a/.idea/modules/lab_3.main.iml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/.idea/modules/lab_3.test.iml b/.idea/modules/lab_3.test.iml deleted file mode 100644 index 8210e2d..0000000 --- a/.idea/modules/lab_3.test.iml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml deleted file mode 100644 index 35eb1dd..0000000 --- a/.idea/vcs.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/README.md b/README.md index 97f8c85..98999e5 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ +[![Review Assignment Due Date](https://classroom.github.com/assets/deadline-readme-button-22041afd0340ce965d47ae6ef1cefeee28c7c493a6346c4f15d667ab976d596c.svg)](https://classroom.github.com/a/QODoQuhO) # Распределенная обработка текстовых данных с использованием брокера сообщений ## Цель задания: diff --git a/aggregator/pom.xml b/aggregator/pom.xml new file mode 100644 index 0000000..7eac84e --- /dev/null +++ b/aggregator/pom.xml @@ -0,0 +1,87 @@ + + 4.0.0 + + ru.ifmo + aggregator + 1.0 + jar + + Aggregator + + + org.springframework.boot + spring-boot-starter-parent + 3.4.12 + + + + UTF-8 + ru.ifmo.App + 21 + 1.18.30 + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-amqp + + + org.projectlombok + lombok + ${lombok.version} + provided + + + org.springdoc + springdoc-openapi-starter-webmvc-ui + 2.2.0 + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + + + + com.fasterxml.jackson.core + jackson-core + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + --add-opens java.base/java.lang=ALL-UNNAMED + + + + org.springframework.boot + spring-boot-maven-plugin + + + --add-opens java.base/java.lang=ALL-UNNAMED + --add-opens java.base/java.util=ALL-UNNAMED + + + + + + + + + + + + + diff --git a/aggregator/src/main/java/ru/ifmo/App.java b/aggregator/src/main/java/ru/ifmo/App.java new file mode 100644 index 0000000..942563c --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/App.java @@ -0,0 +1,11 @@ +package ru.ifmo; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class App { + public static void main(String[] args) { + SpringApplication.run(App.class, args); + } +} \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/AppConfig.java b/aggregator/src/main/java/ru/ifmo/AppConfig.java new file mode 100644 index 0000000..eeb2748 --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/AppConfig.java @@ -0,0 +1,16 @@ +package ru.ifmo; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class AppConfig { + @Bean + public ObjectMapper objectMapper() { + ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + return mapper; + } +} \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/configuration/RabbitConfig.java b/aggregator/src/main/java/ru/ifmo/configuration/RabbitConfig.java new file mode 100644 index 0000000..6e1bee2 --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/configuration/RabbitConfig.java @@ -0,0 +1,84 @@ +package ru.ifmo.configuration; + +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class RabbitConfig { + + public Jackson2JsonMessageConverter messageConverter() { + return new Jackson2JsonMessageConverter(); + } + + @Bean + public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { + RabbitTemplate template = new RabbitTemplate(connectionFactory); + template.setMessageConverter(messageConverter()); + return template; + } + + @Bean + public DirectExchange resultsExchange( + @Value("${rabbitmq.results.exchange.name}") String exchangeName + ) { + return new DirectExchange(exchangeName); + } + + @Bean + public Queue resultsQueue( + @Value("${rabbitmq.results.queue.name}") String queueName + ) { + return new Queue(queueName); + } + + @Bean + public DirectExchange sessionInfoExchange( + @Value("${rabbitmq.session.exchange.name}") String exchangeName + ) { + return new DirectExchange(exchangeName); + } + + @Bean + public Queue sessionInfoQueue( + @Value("${rabbitmq.session.queue.name}") String queueName + ) { + return new Queue(queueName); + } + + @Bean + public Binding sessionInfoBinding( + Queue sessionInfoQueue, + DirectExchange sessionInfoExchange, + @Value("${rabbitmq.session.routing.key}") String routingKey + ) { + return BindingBuilder.bind(sessionInfoQueue).to(sessionInfoExchange).with(routingKey); + } + + @Bean + public DirectExchange finalResultsExchange( + @Value("${rabbitmq.final.exchange.name}") String exchangeName + ) { + return new DirectExchange(exchangeName); + } + + @Bean + public Queue finalResultsQueue( + @Value("${rabbitmq.final.queue.name}") String queueName + ) { + return new Queue(queueName); + } + + @Bean + public Binding finalResultsBinding( + Queue finalResultsQueue, + DirectExchange finalResultsExchange, + @Value("${rabbitmq.final.routing.key}") String routingKey + ) { + return BindingBuilder.bind(finalResultsQueue).to(finalResultsExchange).with(routingKey); + } +} diff --git a/aggregator/src/main/java/ru/ifmo/dto/AggregatedResult.java b/aggregator/src/main/java/ru/ifmo/dto/AggregatedResult.java new file mode 100644 index 0000000..4541378 --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/dto/AggregatedResult.java @@ -0,0 +1,32 @@ +package ru.ifmo.dto; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.Map; + +import lombok.Data; + +@Data +public class AggregatedResult { + private String aggregationId; + private LocalDateTime startTime; + private LocalDateTime endTime; + private Long processingDurationMs; + private int totalSections; + private List processedTaskIds; + + private Long totalWordCount; + + private Map mergedTopWords; + + private String overallSentiment; // POSITIVE, NEGATIVE, NEUTRAL + private Double averageSentimentScore; + private Map sentimentDistribution; + + private List modifiedTextSections; + private String combinedModifiedText; + + private List allSortedSentences; + + private List sectionResults; +} \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/dto/SectionResult.java b/aggregator/src/main/java/ru/ifmo/dto/SectionResult.java new file mode 100644 index 0000000..c06cab5 --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/dto/SectionResult.java @@ -0,0 +1,16 @@ +package ru.ifmo.dto; + +import lombok.Data; +import java.util.List; +import java.util.Map; + +@Data +public class SectionResult { + private String taskId; + private Integer wordCount; + private Map topWords; + private String sentiment; + private Double sentimentScore; + private String modifiedText; + private List sortedSentences; +} \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/dto/SessionInfo.java b/aggregator/src/main/java/ru/ifmo/dto/SessionInfo.java new file mode 100644 index 0000000..9e688d0 --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/dto/SessionInfo.java @@ -0,0 +1,13 @@ +package ru.ifmo.dto; + +import java.time.LocalDateTime; + +import lombok.Data; + +@Data +public class SessionInfo { + private String sessionId; + private int expectedTaskCount; + private String description; + private LocalDateTime startTime; +} \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/dto/TextProcessingResult.java b/aggregator/src/main/java/ru/ifmo/dto/TextProcessingResult.java new file mode 100644 index 0000000..c0d7f24 --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/dto/TextProcessingResult.java @@ -0,0 +1,17 @@ +package ru.ifmo.dto; + +import java.util.List; +import java.util.Map; + +import lombok.Data; + +@Data +public class TextProcessingResult { + private String taskId; + private Integer wordCount; + private Map topWords; + private String sentiment; // POSITIVE, NEGATIVE, NEUTRAL + private Double sentimentScore; + private String modifiedText; + private List sortedSentences; +} \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/listener/ResultsListener.java b/aggregator/src/main/java/ru/ifmo/listener/ResultsListener.java new file mode 100644 index 0000000..3908093 --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/listener/ResultsListener.java @@ -0,0 +1,101 @@ +package ru.ifmo.listener; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import ru.ifmo.dto.AggregatedResult; +import ru.ifmo.dto.TextProcessingResult; +import ru.ifmo.service.AggregationService; +import ru.ifmo.service.ResultStorageService; + +@Component +@RequiredArgsConstructor +@Slf4j +public class ResultsListener { + + private final AggregationService aggregationService; + private final ResultStorageService resultStorageService; + private final RabbitTemplate rabbitTemplate; + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Value("${rabbitmq.final.exchange.name}") + private String finalExchange; + + @Value("${rabbitmq.final.routing.key}") + private String finalRoutingKey; + + @RabbitListener(queues = "${rabbitmq.results.queue.name}") + public void receiveResult(String message) { + log.info("Received result message: {}", message); + + try { + TextProcessingResult result = objectMapper.readValue(message, TextProcessingResult.class); + log.info("Processing result for task: {}", result.getTaskId()); + + String sessionId = extractSessionId(result.getTaskId()); + + aggregationService.addResult(sessionId, result); + + checkAndTriggerAggregation(sessionId); + + } catch (JsonProcessingException e) { + log.error("Failed to parse message as TextProcessingResult: {}", e.getMessage()); + } catch (Exception e) { + log.error("Error processing result: {}", e.getMessage(), e); + } + } + + private String extractSessionId(String taskId) { + // taskId format "session-001-task-001" + if (taskId != null && taskId.contains("-")) { + String[] parts = taskId.split("-"); + if (parts.length >= 2) { + return parts[0] + "-" + parts[1]; + } + } + return "default-session"; + } + + private void checkAndTriggerAggregation(String sessionId) { + if (aggregationService.isReadyForAggregation(sessionId)) { + log.info("Session {} is ready for aggregation, triggering...", sessionId); + triggerAggregation(sessionId); + } else { + int resultCount = aggregationService.getResultCount(sessionId); + log.info("Session {} not ready yet: {} results received", sessionId, resultCount); + } + } + + private void triggerAggregation(String sessionId) { + try { + log.info("Triggering aggregation for session: {}", sessionId); + + AggregatedResult aggregatedResult = aggregationService.aggregateResults(sessionId); + if (aggregatedResult != null) { + resultStorageService.storeResult(aggregatedResult); + + publishFinalResult(aggregatedResult); + + log.info("Aggregation completed and published for session: {}", sessionId); + } + + } catch (Exception e) { + log.error("Error during aggregation for session {}: {}", sessionId, e.getMessage(), e); + } + } + + private void publishFinalResult(AggregatedResult result) { + try { + rabbitTemplate.convertAndSend(finalExchange, finalRoutingKey, result); + log.info("Final result published for aggregation: {}", result.getAggregationId()); + } catch (Exception e) { + log.error("Failed to publish final result for aggregation {}: {}", + result.getAggregationId(), e.getMessage(), e); + } + } +} \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/listener/SessionInfoListener.java b/aggregator/src/main/java/ru/ifmo/listener/SessionInfoListener.java new file mode 100644 index 0000000..b3bc378 --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/listener/SessionInfoListener.java @@ -0,0 +1,50 @@ +package ru.ifmo.listener; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; +import ru.ifmo.dto.SessionInfo; +import ru.ifmo.service.AggregationService; + +@Component +@RequiredArgsConstructor +@Slf4j +public class SessionInfoListener { + + private final AggregationService aggregationService; + private final ObjectMapper objectMapper; + + @RabbitListener(queues = "${rabbitmq.session.queue.name}") + public void receiveSessionInfo(String message) { + log.info("Received session info message: {}", message); + + try { + SessionInfo sessionInfo = objectMapper.readValue(message, SessionInfo.class); + log.info("Processing session info: {} with expected {} tasks", + sessionInfo.getSessionId(), sessionInfo.getExpectedTaskCount()); + + aggregationService.setExpectedTaskCount( + sessionInfo.getSessionId(), + sessionInfo.getExpectedTaskCount() + ); + + if (sessionInfo.getStartTime() != null) { + aggregationService.setSessionStartTime( + sessionInfo.getSessionId(), + sessionInfo.getStartTime() + ); + } + + log.info("Successfully registered session {} with {} expected tasks", + sessionInfo.getSessionId(), sessionInfo.getExpectedTaskCount()); + + } catch (JsonProcessingException e) { + log.error("Failed to parse message as SessionInfo: {}", e.getMessage()); + } catch (Exception e) { + log.error("Error processing session info: {}", e.getMessage(), e); + } + } +} \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/service/AggregationService.java b/aggregator/src/main/java/ru/ifmo/service/AggregationService.java new file mode 100644 index 0000000..8e58ca0 --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/service/AggregationService.java @@ -0,0 +1,228 @@ +package ru.ifmo.service; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import ru.ifmo.dto.AggregatedResult; +import ru.ifmo.dto.SectionResult; +import ru.ifmo.dto.TextProcessingResult; + +@Service +@Slf4j +public class AggregationService { + + private final Map> aggregationSessions = new ConcurrentHashMap<>(); + + private final Map expectedTaskCounts = new ConcurrentHashMap<>(); + + private final Map sessionStartTimes = new ConcurrentHashMap<>(); + + public void setExpectedTaskCount(String sessionId, int expectedCount) { + expectedTaskCounts.put(sessionId, expectedCount); + log.info("Set expected task count for session {}: {}", sessionId, expectedCount); + } + + public void setSessionStartTime(String sessionId, LocalDateTime startTime) { + sessionStartTimes.put(sessionId, startTime); + log.info("Set start time for session {}: {}", sessionId, startTime); + } + + public void addResult(String sessionId, TextProcessingResult result) { + aggregationSessions.computeIfAbsent(sessionId, k -> new ArrayList<>()).add(result); + log.info("Added result for task {} to session {} ({}/{})", + result.getTaskId(), sessionId, + getResultCount(sessionId), + expectedTaskCounts.get(sessionId)); + } + + public boolean isReadyForAggregation(String sessionId) { + Integer expectedCount = expectedTaskCounts.get(sessionId); + if (expectedCount == null) { + log.warn("No expected count set for session {}", sessionId); + return false; + } + + int currentCount = getResultCount(sessionId); + boolean ready = currentCount + 2 >= expectedCount; + + if (ready) { + log.info("Session {} is ready for aggregation: {}/{} tasks completed", + sessionId, currentCount, expectedCount); + } + + return ready; + } + + public AggregatedResult aggregateResults(String sessionId) { + List results = aggregationSessions.get(sessionId); + if (results == null || results.isEmpty()) { + log.warn("No results found for session {}", sessionId); + return null; + } + + log.info("Aggregating {} results for session {}", results.size(), sessionId); + + AggregatedResult aggregated = new AggregatedResult(); + aggregated.setAggregationId(sessionId); + + LocalDateTime endTime = LocalDateTime.now(); + aggregated.setEndTime(endTime); + + LocalDateTime startTime = sessionStartTimes.get(sessionId); + if (startTime != null) { + aggregated.setStartTime(startTime); + Duration processingDuration = Duration.between(startTime, endTime); + aggregated.setProcessingDurationMs(processingDuration.toMillis()); + log.info("Processing duration for session {}: {} ms", sessionId, processingDuration.toMillis()); + } + + aggregated.setTotalSections(results.size()); + aggregated.setProcessedTaskIds(results.stream() + .map(TextProcessingResult::getTaskId) + .collect(Collectors.toList())); + + aggregated.setTotalWordCount(aggregateWordCounts(results)); + + aggregated.setMergedTopWords(mergeTopWords(results)); + + aggregateSentiment(results, aggregated); + + aggregateModifiedText(results, aggregated); + + aggregated.setAllSortedSentences(aggregateSortedSentences(results)); + + aggregated.setSectionResults(createSectionResults(results)); + + log.info("Aggregation completed for session {}", sessionId); + return aggregated; + } + + private Long aggregateWordCounts(List results) { + return results.stream() + .filter(r -> r.getWordCount() != null) + .mapToLong(r -> r.getWordCount().longValue()) + .sum(); + } + + private Map mergeTopWords(List results) { + Map mergedWords = new HashMap<>(); + + for (TextProcessingResult result : results) { + if (result.getTopWords() != null) { + for (Map.Entry entry : result.getTopWords().entrySet()) { + mergedWords.merge(entry.getKey(), entry.getValue(), Integer::sum); + } + } + } + + return mergedWords.entrySet().stream() + .sorted(Map.Entry.comparingByValue().reversed()) + .collect(Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + (e1, e2) -> e1, + LinkedHashMap::new + )); + } + + private void aggregateSentiment(List results, AggregatedResult aggregated) { + List sentiments = results.stream() + .map(TextProcessingResult::getSentiment) + .filter(Objects::nonNull) + .toList(); + + List scores = results.stream() + .map(TextProcessingResult::getSentimentScore) + .filter(Objects::nonNull) + .toList(); + + Map distribution = sentiments.stream() + .collect(Collectors.groupingBy( + sentiment -> sentiment, + Collectors.summingInt(sentiment -> 1) + )); + aggregated.setSentimentDistribution(distribution); + + if (!scores.isEmpty()) { + double avgScore = scores.stream().mapToDouble(Double::doubleValue).average().orElse(0.0); + aggregated.setAverageSentimentScore(avgScore); + } + + String overallSentiment = distribution.entrySet().stream() + .max(Map.Entry.comparingByValue()) + .map(Map.Entry::getKey) + .orElse("NEUTRAL"); + aggregated.setOverallSentiment(overallSentiment); + } + + private void aggregateModifiedText(List results, AggregatedResult aggregated) { + List modifiedSections = results.stream() + .map(TextProcessingResult::getModifiedText) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + aggregated.setModifiedTextSections(modifiedSections); + + String combinedText = String.join("\n\n", modifiedSections); + aggregated.setCombinedModifiedText(combinedText); + } + + private List aggregateSortedSentences(List results) { + List allSentences = new ArrayList<>(); + + for (TextProcessingResult result : results) { + if (result.getSortedSentences() != null) { + allSentences.addAll(result.getSortedSentences()); + } + } + + return allSentences.stream() + .sorted(Comparator.comparing(String::length)) + .collect(Collectors.toList()); + } + + private List createSectionResults(List results) { + return results.stream() + .map(this::convertToSectionResult) + .collect(Collectors.toList()); + } + + private SectionResult convertToSectionResult(TextProcessingResult result) { + SectionResult section = new SectionResult(); + section.setTaskId(result.getTaskId()); + section.setWordCount(result.getWordCount()); + section.setTopWords(result.getTopWords()); + section.setSentiment(result.getSentiment()); + section.setSentimentScore(result.getSentimentScore()); + section.setModifiedText(result.getModifiedText()); + section.setSortedSentences(result.getSortedSentences()); + return section; + } + + public void clearSession(String sessionId) { + aggregationSessions.remove(sessionId); + expectedTaskCounts.remove(sessionId); + sessionStartTimes.remove(sessionId); + log.info("Cleared session {}", sessionId); + } + + public List getActiveSessions() { + return new ArrayList<>(aggregationSessions.keySet()); + } + + public int getResultCount(String sessionId) { + List results = aggregationSessions.get(sessionId); + return results != null ? results.size() : 0; + } +} \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/service/ResultStorageService.java b/aggregator/src/main/java/ru/ifmo/service/ResultStorageService.java new file mode 100644 index 0000000..53449b4 --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/service/ResultStorageService.java @@ -0,0 +1,169 @@ +package ru.ifmo.service; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import ru.ifmo.dto.AggregatedResult; + +@Service +@Slf4j +public class ResultStorageService { + + private final ObjectMapper objectMapper; + private final ConcurrentMap storedResults = new ConcurrentHashMap<>(); + private final String resultsDirectory = "results"; + + public ResultStorageService(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + try { + Path resultsPath = Paths.get(resultsDirectory); + if (!Files.exists(resultsPath)) { + Files.createDirectories(resultsPath); + log.info("Created results directory: {}", resultsPath.toAbsolutePath()); + } + } catch (IOException e) { + log.error("Failed to create results directory: {}", e.getMessage()); + } + } + + public void storeResult(AggregatedResult result) { + try { + storedResults.put(result.getAggregationId(), result); + + saveToFile(result); + + log.info("Stored aggregated result for session: {}", result.getAggregationId()); + + } catch (Exception e) { + log.error("Failed to store result for session {}: {}", + result.getAggregationId(), e.getMessage(), e); + } + } + + private void saveToFile(AggregatedResult result) throws IOException { + String timestamp = result.getEndTime() != null ? + result.getEndTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss")) : + String.valueOf(System.currentTimeMillis()); + String filename = String.format("%s_%s.json", result.getAggregationId(), timestamp); + Path filePath = Paths.get(resultsDirectory, filename); + + objectMapper.writerWithDefaultPrettyPrinter().writeValue(filePath.toFile(), result); + log.info("Saved result to file: {}", filePath.toAbsolutePath()); + + saveSummaryReport(result, timestamp); + } + + private void saveSummaryReport(AggregatedResult result, String timestamp) throws IOException { + String filename = String.format("%s_%s_summary.txt", result.getAggregationId(), timestamp); + Path filePath = Paths.get(resultsDirectory, filename); + + StringBuilder report = new StringBuilder(); + report.append("=== TEXT PROCESSING AGGREGATION REPORT ===\n"); + report.append("Aggregation ID: ").append(result.getAggregationId()).append("\n"); + if (result.getStartTime() != null) { + report.append("Start Time: ").append(result.getStartTime()).append("\n"); + } + if (result.getEndTime() != null) { + report.append("End Time: ").append(result.getEndTime()).append("\n"); + } + if (result.getProcessingDurationMs() != null) { + report.append("Processing Duration: ").append(result.getProcessingDurationMs()).append(" ms (") + .append(String.format("%.2f", result.getProcessingDurationMs() / 1000.0)).append(" seconds)\n"); + } + report.append("Total Sections Processed: ").append(result.getTotalSections()).append("\n"); + report.append("Processed Task IDs: ").append(String.join(", ", result.getProcessedTaskIds())).append("\n\n"); + + report.append("=== WORD COUNT SUMMARY ===\n"); + report.append("Total Words: ").append(result.getTotalWordCount()).append("\n\n"); + + report.append("=== TOP WORDS SUMMARY ===\n"); + if (result.getMergedTopWords() != null) { + result.getMergedTopWords().entrySet().stream() + .limit(10) + .forEach(entry -> report.append(String.format("%-20s: %d\n", entry.getKey(), entry.getValue()))); + } + report.append("\n"); + + report.append("=== SENTIMENT ANALYSIS SUMMARY ===\n"); + report.append("Overall Sentiment: ").append(result.getOverallSentiment()).append("\n"); + report.append("Average Sentiment Score: ").append(String.format("%.3f", result.getAverageSentimentScore())).append("\n"); + if (result.getSentimentDistribution() != null) { + report.append("Sentiment Distribution:\n"); + result.getSentimentDistribution().forEach((sentiment, count) -> + report.append(String.format(" %s: %d sections\n", sentiment, count))); + } + report.append("\n"); + + report.append("=== TEXT MODIFICATION SUMMARY ===\n"); + report.append("Modified Text Sections: ").append(result.getModifiedTextSections() != null ? + result.getModifiedTextSections().size() : 0).append("\n"); + if (result.getCombinedModifiedText() != null) { + report.append("Combined Text Length: ").append(result.getCombinedModifiedText().length()).append(" " + + "characters\n"); + } + report.append("\n"); + + report.append("=== SENTENCE SORTING SUMMARY ===\n"); + report.append("Total Sorted Sentences: ").append(result.getAllSortedSentences() != null ? + result.getAllSortedSentences().size() : 0).append("\n"); + if (result.getAllSortedSentences() != null && !result.getAllSortedSentences().isEmpty()) { + report.append("Shortest Sentence: ").append(result.getAllSortedSentences().getFirst()).append("\n"); + report.append("Longest Sentence: ").append(result.getAllSortedSentences().getLast()).append("\n"); + } + report.append("\n"); + + report.append("=== PER-SECTION SUMMARY ===\n"); + if (result.getSectionResults() != null) { + result.getSectionResults().forEach(section -> { + report.append(String.format("Task ID: %s\n", section.getTaskId())); + report.append(String.format(" Word Count: %d\n", section.getWordCount())); + report.append(String.format(" Sentiment: %s (%.3f)\n", section.getSentiment(), + section.getSentimentScore())); + report.append(String.format(" Top Words: %d\n", section.getTopWords() != null ? + section.getTopWords().size() : 0)); + report.append(String.format(" Sentences: %d\n", section.getSortedSentences() != null ? + section.getSortedSentences().size() : 0)); + report.append("\n"); + }); + } + + Files.write(filePath, report.toString().getBytes()); + log.info("Saved summary report to file: {}", filePath.toAbsolutePath()); + } + + public AggregatedResult getResult(String aggregationId) { + return storedResults.get(aggregationId); + } + + public List getAllResults() { + return new ArrayList<>(storedResults.values()); + } + + public List getStoredResultIds() { + return new ArrayList<>(storedResults.keySet()); + } + + public boolean deleteResult(String aggregationId) { + AggregatedResult removed = storedResults.remove(aggregationId); + if (removed != null) { + log.info("Deleted result for aggregation: {}", aggregationId); + return true; + } + return false; + } + + public void clearAllResults() { + storedResults.clear(); + log.info("Cleared all stored results"); + } +} \ No newline at end of file diff --git a/aggregator/src/main/resources/application.properties b/aggregator/src/main/resources/application.properties new file mode 100644 index 0000000..870a4a6 --- /dev/null +++ b/aggregator/src/main/resources/application.properties @@ -0,0 +1,28 @@ +server.port=${PORT:8079} + +spring.rabbitmq.host=localhost +spring.rabbitmq.port=5672 +spring.rabbitmq.username=guest +spring.rabbitmq.password=guest + +# Results queue configuration (listening to worker results) +rabbitmq.results.exchange.name=resultsExchange +rabbitmq.results.queue.name=resultsQueue +rabbitmq.results.routing.key=results + +# Session info queue configuration (receiving expected task counts) +rabbitmq.session.exchange.name=sessionInfoExchange +rabbitmq.session.queue.name=sessionInfoQueue +rabbitmq.session.routing.key=session + +# Final results queue configuration (publishing aggregated results) +rabbitmq.final.exchange.name=finalResultsExchange +rabbitmq.final.queue.name=finalResultsQueue +rabbitmq.final.routing.key=final + +# Swagger/OpenAPI Configuration +springdoc.api-docs.path=/api-docs +springdoc.swagger-ui.path=/swagger-ui.html +springdoc.swagger-ui.operationsSorter=method +springdoc.swagger-ui.tagsSorter=alpha +springdoc.swagger-ui.tryItOutEnabled=true diff --git a/build.gradle.kts b/build.gradle.kts deleted file mode 100644 index 8e91d96..0000000 --- a/build.gradle.kts +++ /dev/null @@ -1,29 +0,0 @@ -plugins { - kotlin("jvm") version "1.9.20" - application -} - -group = "org.itmo" -version = "1.0-SNAPSHOT" - -repositories { - mavenCentral() -} - -dependencies { - implementation("javax.jms:jms-api:2.0.1") - implementation("org.apache.activemq:activemq-broker:6.1.1") - testImplementation(kotlin("test")) -} - -tasks.test { - useJUnitPlatform() -} - -kotlin { - jvmToolchain(8) -} - -application { - mainClass.set("MainKt") -} \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..de2ba73 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,7 @@ +services: + rabbitmq: + image: rabbitmq:3.10.7-management + hostname: rabbitmq + ports: + - "15672:15672" + - "5672:5672" diff --git a/gradlew b/gradlew deleted file mode 100755 index 1aa94a4..0000000 --- a/gradlew +++ /dev/null @@ -1,249 +0,0 @@ -#!/bin/sh - -# -# Copyright © 2015-2021 the original authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -############################################################################## -# -# Gradle start up script for POSIX generated by Gradle. -# -# Important for running: -# -# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is -# noncompliant, but you have some other compliant shell such as ksh or -# bash, then to run this script, type that shell name before the whole -# command line, like: -# -# ksh Gradle -# -# Busybox and similar reduced shells will NOT work, because this script -# requires all of these POSIX shell features: -# * functions; -# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», -# «${var#prefix}», «${var%suffix}», and «$( cmd )»; -# * compound commands having a testable exit status, especially «case»; -# * various built-in commands including «command», «set», and «ulimit». -# -# Important for patching: -# -# (2) This script targets any POSIX shell, so it avoids extensions provided -# by Bash, Ksh, etc; in particular arrays are avoided. -# -# The "traditional" practice of packing multiple parameters into a -# space-separated string is a well documented source of bugs and security -# problems, so this is (mostly) avoided, by progressively accumulating -# options in "$@", and eventually passing that to Java. -# -# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, -# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; -# see the in-line comments for details. -# -# There are tweaks for specific operating systems such as AIX, CygWin, -# Darwin, MinGW, and NonStop. -# -# (3) This script is generated from the Groovy template -# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt -# within the Gradle project. -# -# You can find Gradle at https://github.com/gradle/gradle/. -# -############################################################################## - -# Attempt to set APP_HOME - -# Resolve links: $0 may be a link -app_path=$0 - -# Need this for daisy-chained symlinks. -while - APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path - [ -h "$app_path" ] -do - ls=$( ls -ld "$app_path" ) - link=${ls#*' -> '} - case $link in #( - /*) app_path=$link ;; #( - *) app_path=$APP_HOME$link ;; - esac -done - -# This is normally unused -# shellcheck disable=SC2034 -APP_BASE_NAME=${0##*/} -# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) -APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit - -# Use the maximum available, or set MAX_FD != -1 to use that value. -MAX_FD=maximum - -warn () { - echo "$*" -} >&2 - -die () { - echo - echo "$*" - echo - exit 1 -} >&2 - -# OS specific support (must be 'true' or 'false'). -cygwin=false -msys=false -darwin=false -nonstop=false -case "$( uname )" in #( - CYGWIN* ) cygwin=true ;; #( - Darwin* ) darwin=true ;; #( - MSYS* | MINGW* ) msys=true ;; #( - NONSTOP* ) nonstop=true ;; -esac - -CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar - - -# Determine the Java command to use to start the JVM. -if [ -n "$JAVA_HOME" ] ; then - if [ -x "$JAVA_HOME/jre/sh/java" ] ; then - # IBM's JDK on AIX uses strange locations for the executables - JAVACMD=$JAVA_HOME/jre/sh/java - else - JAVACMD=$JAVA_HOME/bin/java - fi - if [ ! -x "$JAVACMD" ] ; then - die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME - -Please set the JAVA_HOME variable in your environment to match the -location of your Java installation." - fi -else - JAVACMD=java - if ! command -v java >/dev/null 2>&1 - then - die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. - -Please set the JAVA_HOME variable in your environment to match the -location of your Java installation." - fi -fi - -# Increase the maximum file descriptors if we can. -if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then - case $MAX_FD in #( - max*) - # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. - # shellcheck disable=SC2039,SC3045 - MAX_FD=$( ulimit -H -n ) || - warn "Could not query maximum file descriptor limit" - esac - case $MAX_FD in #( - '' | soft) :;; #( - *) - # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. - # shellcheck disable=SC2039,SC3045 - ulimit -n "$MAX_FD" || - warn "Could not set maximum file descriptor limit to $MAX_FD" - esac -fi - -# Collect all arguments for the java command, stacking in reverse order: -# * args from the command line -# * the main class name -# * -classpath -# * -D...appname settings -# * --module-path (only if needed) -# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. - -# For Cygwin or MSYS, switch paths to Windows format before running java -if "$cygwin" || "$msys" ; then - APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) - CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) - - JAVACMD=$( cygpath --unix "$JAVACMD" ) - - # Now convert the arguments - kludge to limit ourselves to /bin/sh - for arg do - if - case $arg in #( - -*) false ;; # don't mess with options #( - /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath - [ -e "$t" ] ;; #( - *) false ;; - esac - then - arg=$( cygpath --path --ignore --mixed "$arg" ) - fi - # Roll the args list around exactly as many times as the number of - # args, so each arg winds up back in the position where it started, but - # possibly modified. - # - # NB: a `for` loop captures its iteration list before it begins, so - # changing the positional parameters here affects neither the number of - # iterations, nor the values presented in `arg`. - shift # remove old arg - set -- "$@" "$arg" # push replacement arg - done -fi - - -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' - -# Collect all arguments for the java command: -# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, -# and any embedded shellness will be escaped. -# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be -# treated as '${Hostname}' itself on the command line. - -set -- \ - "-Dorg.gradle.appname=$APP_BASE_NAME" \ - -classpath "$CLASSPATH" \ - org.gradle.wrapper.GradleWrapperMain \ - "$@" - -# Stop when "xargs" is not available. -if ! command -v xargs >/dev/null 2>&1 -then - die "xargs is not available" -fi - -# Use "xargs" to parse quoted args. -# -# With -n1 it outputs one arg per line, with the quotes and backslashes removed. -# -# In Bash we could simply go: -# -# readarray ARGS < <( xargs -n1 <<<"$var" ) && -# set -- "${ARGS[@]}" "$@" -# -# but POSIX shell has neither arrays nor command substitution, so instead we -# post-process each arg (as a line of input to sed) to backslash-escape any -# character that might be a shell metacharacter, then use eval to reverse -# that process (while maintaining the separation between arguments), and wrap -# the whole thing up as a single "set" statement. -# -# This will of course break if any of these variables contains a newline or -# an unmatched quote. -# - -eval "set -- $( - printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | - xargs -n1 | - sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | - tr '\n' ' ' - )" '"$@"' - -exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat deleted file mode 100644 index 93e3f59..0000000 --- a/gradlew.bat +++ /dev/null @@ -1,92 +0,0 @@ -@rem -@rem Copyright 2015 the original author or authors. -@rem -@rem Licensed under the Apache License, Version 2.0 (the "License"); -@rem you may not use this file except in compliance with the License. -@rem You may obtain a copy of the License at -@rem -@rem https://www.apache.org/licenses/LICENSE-2.0 -@rem -@rem Unless required by applicable law or agreed to in writing, software -@rem distributed under the License is distributed on an "AS IS" BASIS, -@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -@rem See the License for the specific language governing permissions and -@rem limitations under the License. -@rem - -@if "%DEBUG%"=="" @echo off -@rem ########################################################################## -@rem -@rem Gradle startup script for Windows -@rem -@rem ########################################################################## - -@rem Set local scope for the variables with windows NT shell -if "%OS%"=="Windows_NT" setlocal - -set DIRNAME=%~dp0 -if "%DIRNAME%"=="" set DIRNAME=. -@rem This is normally unused -set APP_BASE_NAME=%~n0 -set APP_HOME=%DIRNAME% - -@rem Resolve any "." and ".." in APP_HOME to make it shorter. -for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi - -@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" - -@rem Find java.exe -if defined JAVA_HOME goto findJavaFromJavaHome - -set JAVA_EXE=java.exe -%JAVA_EXE% -version >NUL 2>&1 -if %ERRORLEVEL% equ 0 goto execute - -echo. -echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:findJavaFromJavaHome -set JAVA_HOME=%JAVA_HOME:"=% -set JAVA_EXE=%JAVA_HOME%/bin/java.exe - -if exist "%JAVA_EXE%" goto execute - -echo. -echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:execute -@rem Setup the command line - -set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar - - -@rem Execute Gradle -"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* - -:end -@rem End local scope for the variables with windows NT shell -if %ERRORLEVEL% equ 0 goto mainEnd - -:fail -rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of -rem the _cmd.exe /c_ return code! -set EXIT_CODE=%ERRORLEVEL% -if %EXIT_CODE% equ 0 set EXIT_CODE=1 -if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% -exit /b %EXIT_CODE% - -:mainEnd -if "%OS%"=="Windows_NT" endlocal - -:omega diff --git a/producer/pom.xml b/producer/pom.xml new file mode 100644 index 0000000..3ef3c4d --- /dev/null +++ b/producer/pom.xml @@ -0,0 +1,88 @@ + + + 4.0.0 + + ru.ifmo + producer + 1.0 + jar + + Producer + + + org.springframework.boot + spring-boot-starter-parent + 3.4.12 + + + + UTF-8 + ru.ifmo.App + 21 + 1.18.30 + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-amqp + + + org.projectlombok + lombok + ${lombok.version} + provided + + + org.springdoc + springdoc-openapi-starter-webmvc-ui + 2.2.0 + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + + + com.fasterxml.jackson.core + jackson-core + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + --add-opens java.base/java.lang=ALL-UNNAMED + + + + org.springframework.boot + spring-boot-maven-plugin + + + --add-opens java.base/java.lang=ALL-UNNAMED + --add-opens java.base/java.util=ALL-UNNAMED + + + + + + + + + + + + + + diff --git a/producer/src/main/java/ru/ifmo/App.java b/producer/src/main/java/ru/ifmo/App.java new file mode 100644 index 0000000..942563c --- /dev/null +++ b/producer/src/main/java/ru/ifmo/App.java @@ -0,0 +1,11 @@ +package ru.ifmo; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class App { + public static void main(String[] args) { + SpringApplication.run(App.class, args); + } +} \ No newline at end of file diff --git a/producer/src/main/java/ru/ifmo/AppConfig.java b/producer/src/main/java/ru/ifmo/AppConfig.java new file mode 100644 index 0000000..eeb2748 --- /dev/null +++ b/producer/src/main/java/ru/ifmo/AppConfig.java @@ -0,0 +1,16 @@ +package ru.ifmo; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class AppConfig { + @Bean + public ObjectMapper objectMapper() { + ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + return mapper; + } +} \ No newline at end of file diff --git a/producer/src/main/java/ru/ifmo/configuration/RabbitConfig.java b/producer/src/main/java/ru/ifmo/configuration/RabbitConfig.java new file mode 100644 index 0000000..80c06e0 --- /dev/null +++ b/producer/src/main/java/ru/ifmo/configuration/RabbitConfig.java @@ -0,0 +1,39 @@ +package ru.ifmo.configuration; + +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class RabbitConfig { + + @Bean + public Jackson2JsonMessageConverter messageConverter() { + return new Jackson2JsonMessageConverter(); + } + + @Bean + public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { + RabbitTemplate template = new RabbitTemplate(connectionFactory); + template.setMessageConverter(messageConverter()); + return template; + } + + @Bean + public DirectExchange tasksExchange( + @Value("${rabbitmq.tasks.exchange.name}") String exchangeName + ) { + return new DirectExchange(exchangeName); + } + + @Bean + public DirectExchange sessionInfoExchange( + @Value("${rabbitmq.session.exchange.name}") String exchangeName + ) { + return new DirectExchange(exchangeName); + } +} diff --git a/producer/src/main/java/ru/ifmo/configuration/SwaggerConfig.java b/producer/src/main/java/ru/ifmo/configuration/SwaggerConfig.java new file mode 100644 index 0000000..6dc8702 --- /dev/null +++ b/producer/src/main/java/ru/ifmo/configuration/SwaggerConfig.java @@ -0,0 +1,20 @@ +package ru.ifmo.configuration; + +import io.swagger.v3.oas.models.OpenAPI; +import io.swagger.v3.oas.models.info.Info; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class SwaggerConfig { + + @Bean + public OpenAPI customOpenAPI() { + return new OpenAPI() + .info( + new Info() + .title("Producer API") + .version("1.0") + ); + } +} \ No newline at end of file diff --git a/producer/src/main/java/ru/ifmo/dto/SessionInfo.java b/producer/src/main/java/ru/ifmo/dto/SessionInfo.java new file mode 100644 index 0000000..9e688d0 --- /dev/null +++ b/producer/src/main/java/ru/ifmo/dto/SessionInfo.java @@ -0,0 +1,13 @@ +package ru.ifmo.dto; + +import java.time.LocalDateTime; + +import lombok.Data; + +@Data +public class SessionInfo { + private String sessionId; + private int expectedTaskCount; + private String description; + private LocalDateTime startTime; +} \ No newline at end of file diff --git a/producer/src/main/java/ru/ifmo/dto/TextTask.java b/producer/src/main/java/ru/ifmo/dto/TextTask.java new file mode 100644 index 0000000..9eda89a --- /dev/null +++ b/producer/src/main/java/ru/ifmo/dto/TextTask.java @@ -0,0 +1,11 @@ +package ru.ifmo.dto; + +import lombok.Data; + +@Data +public class TextTask { + private String taskId; + private String text; + private int topN; // for top-N words task, default will be handled in service + private String nameReplacement; // for name replacement task, default will be handled in service +} \ No newline at end of file diff --git a/producer/src/main/java/ru/ifmo/service/MessagePublisherService.java b/producer/src/main/java/ru/ifmo/service/MessagePublisherService.java new file mode 100644 index 0000000..3a7449b --- /dev/null +++ b/producer/src/main/java/ru/ifmo/service/MessagePublisherService.java @@ -0,0 +1,73 @@ +package ru.ifmo.service; + +import java.util.List; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import ru.ifmo.dto.SessionInfo; +import ru.ifmo.dto.TextTask; + +@Service +@RequiredArgsConstructor +@Slf4j +public class MessagePublisherService { + + private final RabbitTemplate rabbitTemplate; + + @Value("${rabbitmq.tasks.exchange.name}") + private String tasksExchange; + + @Value("${rabbitmq.session.exchange.name}") + private String sessionExchange; + + @Value("${rabbitmq.session.routing.key}") + private String sessionRoutingKey; + + @Value("${rabbitmq.queue.name}") + private String queueName; + + public void publishSessionInfo(SessionInfo sessionInfo) { + try { + rabbitTemplate.convertAndSend(sessionExchange, sessionRoutingKey, sessionInfo); + log.info("Published session info: {} with {} expected tasks", + sessionInfo.getSessionId(), sessionInfo.getExpectedTaskCount()); + } catch (Exception e) { + log.error("Failed to publish session info for {}: {}", + sessionInfo.getSessionId(), e.getMessage(), e); + throw new RuntimeException("Failed to publish session info", e); + } + } + + public void publishTasks(List tasks) { + if (tasks == null || tasks.isEmpty()) { + log.warn("No tasks to publish"); + return; + } + + log.info("Publishing {} tasks to exchange {}", tasks.size(), tasksExchange); + + int successCount = 0; + int failureCount = 0; + + for (TextTask task : tasks) { + try { + rabbitTemplate.convertAndSend(queueName, task); + successCount++; + log.debug("Published task: {}", task.getTaskId()); + } catch (Exception e) { + failureCount++; + log.error("Failed to publish task {}: {}", task.getTaskId(), e.getMessage(), e); + } + } + + log.info("Task publishing completed. Success: {}, Failures: {}", successCount, failureCount); + + if (failureCount > 0) { + throw new RuntimeException(String.format("Failed to publish %d out of %d tasks", + failureCount, tasks.size())); + } + } +} \ No newline at end of file diff --git a/producer/src/main/java/ru/ifmo/service/StartupTextProcessingService.java b/producer/src/main/java/ru/ifmo/service/StartupTextProcessingService.java new file mode 100644 index 0000000..9fd9406 --- /dev/null +++ b/producer/src/main/java/ru/ifmo/service/StartupTextProcessingService.java @@ -0,0 +1,58 @@ +package ru.ifmo.service; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.core.io.ClassPathResource; +import org.springframework.stereotype.Service; + +@Service +@RequiredArgsConstructor +@Slf4j +public class StartupTextProcessingService { + + private final TextProcessingJobService textProcessingJobService; + + @EventListener(ApplicationReadyEvent.class) + public void processDefaultTextOnStartup() { + try { + log.info("Application started, checking for default text processing..."); + + // Check if sample text file exists and process it + ClassPathResource resource = new ClassPathResource("big.txt"); + if (resource.exists()) { + log.info("Found sample text file, processing..."); + + // Copy to temp file for processing + Path tempFile = Files.createTempFile("big", ".txt"); + Files.copy(resource.getInputStream(), tempFile, + java.nio.file.StandardCopyOption.REPLACE_EXISTING); + + // Process the file + String sessionId = textProcessingJobService.processTextFile( + tempFile.toString(), + TextProcessingJobService.SplitStrategy.BY_PARAGRAPHS, + 1000 + ); + + log.info("Sample text processing started with session ID: {}", sessionId); + + // Clean up temp file + Files.deleteIfExists(tempFile); + + } else { + log.info("No sample text file found, skipping automatic processing"); + } + + } catch (IOException e) { + log.error("Error processing sample text file: {}", e.getMessage()); + } catch (Exception e) { + log.error("Unexpected error during startup text processing: {}", e.getMessage(), e); + } + } +} \ No newline at end of file diff --git a/producer/src/main/java/ru/ifmo/service/TextProcessingJobService.java b/producer/src/main/java/ru/ifmo/service/TextProcessingJobService.java new file mode 100644 index 0000000..d8a89ca --- /dev/null +++ b/producer/src/main/java/ru/ifmo/service/TextProcessingJobService.java @@ -0,0 +1,118 @@ +package ru.ifmo.service; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.List; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import ru.ifmo.dto.SessionInfo; +import ru.ifmo.dto.TextTask; + +@Service +@RequiredArgsConstructor +@Slf4j +public class TextProcessingJobService { + + private final TextSplitterService textSplitterService; + private final MessagePublisherService messagePublisherService; + + public String processTextFile(String filePath, SplitStrategy strategy, int splitSize) throws IOException { + log.info("Starting text processing job for file: {}", filePath); + + // Generate session ID + String sessionId = textSplitterService.generateSessionId(); + log.info("Generated session ID: {}", sessionId); + + // Read and split the text file + List tasks = splitTextFile(filePath, sessionId, strategy, splitSize); + + if (tasks.isEmpty()) { + throw new IllegalArgumentException("No tasks generated from file: " + filePath); + } + + // Send session info to aggregator + SessionInfo sessionInfo = new SessionInfo(); + sessionInfo.setSessionId(sessionId); + sessionInfo.setExpectedTaskCount(tasks.size()); + sessionInfo.setDescription("Text processing job for file: " + filePath); + sessionInfo.setStartTime(LocalDateTime.now()); + + messagePublisherService.publishSessionInfo(sessionInfo); + + // Send tasks to workers + messagePublisherService.publishTasks(tasks); + + log.info("Text processing job completed. Session: {}, Tasks: {}", sessionId, tasks.size()); + return sessionId; + } + + public String processTextContent(String text, String description) { + return processTextContent(text, description, SplitStrategy.BY_PARAGRAPHS, 1000); + } + + public String processTextContent(String text, String description, SplitStrategy strategy, int splitSize) { + log.info("Starting text processing job for content: {}", description); + + // Generate session ID + String sessionId = textSplitterService.generateSessionId(); + log.info("Generated session ID: {}", sessionId); + + // Split the text content + List tasks = splitTextContent(text, sessionId, strategy, splitSize); + + if (tasks.isEmpty()) { + throw new IllegalArgumentException("No tasks generated from text content"); + } + + // Send session info to aggregator + SessionInfo sessionInfo = new SessionInfo(); + sessionInfo.setSessionId(sessionId); + sessionInfo.setExpectedTaskCount(tasks.size()); + sessionInfo.setDescription(description); + sessionInfo.setStartTime(LocalDateTime.now()); + + messagePublisherService.publishSessionInfo(sessionInfo); + + // Send tasks to workers + messagePublisherService.publishTasks(tasks); + + log.info("Text processing job completed. Session: {}, Tasks: {}", sessionId, tasks.size()); + return sessionId; + } + + private List splitTextFile(String filePath, String sessionId, SplitStrategy strategy, int splitSize) throws IOException { + switch (strategy) { + case BY_PARAGRAPHS: + return textSplitterService.splitTextFile(filePath, sessionId); + case BY_SENTENCES: + String content = java.nio.file.Files.readString(java.nio.file.Paths.get(filePath)); + return textSplitterService.splitTextBySentences(content, sessionId, splitSize); + case BY_WORDS: + content = java.nio.file.Files.readString(java.nio.file.Paths.get(filePath)); + return textSplitterService.splitTextByWords(content, sessionId, splitSize); + default: + throw new IllegalArgumentException("Unknown split strategy: " + strategy); + } + } + + private List splitTextContent(String text, String sessionId, SplitStrategy strategy, int splitSize) { + switch (strategy) { + case BY_PARAGRAPHS: + return textSplitterService.splitTextIntoTasks(text, sessionId); + case BY_SENTENCES: + return textSplitterService.splitTextBySentences(text, sessionId, splitSize); + case BY_WORDS: + return textSplitterService.splitTextByWords(text, sessionId, splitSize); + default: + throw new IllegalArgumentException("Unknown split strategy: " + strategy); + } + } + + public enum SplitStrategy { + BY_PARAGRAPHS, + BY_SENTENCES, + BY_WORDS + } +} \ No newline at end of file diff --git a/producer/src/main/java/ru/ifmo/service/TextSplitterService.java b/producer/src/main/java/ru/ifmo/service/TextSplitterService.java new file mode 100644 index 0000000..5e19a30 --- /dev/null +++ b/producer/src/main/java/ru/ifmo/service/TextSplitterService.java @@ -0,0 +1,178 @@ +package ru.ifmo.service; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import ru.ifmo.dto.TextTask; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +@Service +@Slf4j +public class TextSplitterService { + + @Value("${text.processing.chunk.size:1000}") + private int chunkSize; + + @Value("${text.processing.top.words:10}") + private int topWords; + + @Value("${text.processing.name.replacement:[NAME]}") + private String nameReplacement; + + public List splitTextFile(String filePath, String sessionId) throws IOException { + log.info("Reading text file: {}", filePath); + + Path path = Paths.get(filePath); + if (!Files.exists(path)) { + throw new IOException("File not found: " + filePath); + } + + String content = Files.readString(path); + log.info("File read successfully. Content length: {} characters", content.length()); + + return splitTextIntoTasks(content, sessionId); + } + + public List splitTextIntoTasks(String text, String sessionId) { + List tasks = new ArrayList<>(); + + if (text == null || text.trim().isEmpty()) { + log.warn("Empty text provided for splitting"); + return tasks; + } + + // Split by paragraphs first + String[] paragraphs = text.split("\\n\\s*\\n"); + + StringBuilder currentChunk = new StringBuilder(); + int taskCounter = 1; + + for (String paragraph : paragraphs) { + paragraph = paragraph.trim(); + if (paragraph.isEmpty()) { + continue; + } + + // If adding this paragraph would exceed chunk size, create a task + if (currentChunk.length() > 0 && + currentChunk.length() + paragraph.length() + 2 > chunkSize) { + + tasks.add(createTextTask(currentChunk.toString(), sessionId, taskCounter++)); + currentChunk = new StringBuilder(); + } + + if (currentChunk.length() > 0) { + currentChunk.append("\n\n"); + } + currentChunk.append(paragraph); + } + + // Add the last chunk if it's not empty + if (currentChunk.length() > 0) { + tasks.add(createTextTask(currentChunk.toString(), sessionId, taskCounter)); + } + + log.info("Split text into {} tasks for session {}", tasks.size(), sessionId); + return tasks; + } + + public List splitTextBySentences(String text, String sessionId, int sentencesPerTask) { + List tasks = new ArrayList<>(); + + if (text == null || text.trim().isEmpty()) { + log.warn("Empty text provided for splitting"); + return tasks; + } + + // Split by sentences + String[] sentences = text.split("(?<=[.!?])\\s+"); + + List currentChunk = new ArrayList<>(); + int taskCounter = 1; + + for (String sentence : sentences) { + sentence = sentence.trim(); + if (sentence.isEmpty()) { + continue; + } + + currentChunk.add(sentence); + + if (currentChunk.size() >= sentencesPerTask) { + String chunkText = String.join(" ", currentChunk); + tasks.add(createTextTask(chunkText, sessionId, taskCounter++)); + currentChunk.clear(); + } + } + + // Add the last chunk if it's not empty + if (!currentChunk.isEmpty()) { + String chunkText = String.join(" ", currentChunk); + tasks.add(createTextTask(chunkText, sessionId, taskCounter)); + } + + log.info("Split text into {} tasks by sentences for session {}", tasks.size(), sessionId); + return tasks; + } + + public List splitTextByWords(String text, String sessionId, int wordsPerTask) { + List tasks = new ArrayList<>(); + + if (text == null || text.trim().isEmpty()) { + log.warn("Empty text provided for splitting"); + return tasks; + } + + String[] words = text.split("\\s+"); + + StringBuilder currentChunk = new StringBuilder(); + int wordCount = 0; + int taskCounter = 1; + + for (String word : words) { + if (word.trim().isEmpty()) { + continue; + } + + if (wordCount > 0) { + currentChunk.append(" "); + } + currentChunk.append(word); + wordCount++; + + if (wordCount >= wordsPerTask) { + tasks.add(createTextTask(currentChunk.toString(), sessionId, taskCounter++)); + currentChunk = new StringBuilder(); + wordCount = 0; + } + } + + // Add the last chunk if it's not empty + if (wordCount > 0) { + tasks.add(createTextTask(currentChunk.toString(), sessionId, taskCounter)); + } + + log.info("Split text into {} tasks by words for session {}", tasks.size(), sessionId); + return tasks; + } + + private TextTask createTextTask(String text, String sessionId, int taskNumber) { + TextTask task = new TextTask(); + task.setTaskId(String.format("%s-task-%03d", sessionId, taskNumber)); + task.setText(text); + task.setTopN(topWords); + task.setNameReplacement(nameReplacement); + return task; + } + + public String generateSessionId() { + return "session-" + UUID.randomUUID().toString().substring(0, 8); + } +} \ No newline at end of file diff --git a/producer/src/main/resources/application.properties b/producer/src/main/resources/application.properties new file mode 100644 index 0000000..2d782b7 --- /dev/null +++ b/producer/src/main/resources/application.properties @@ -0,0 +1,26 @@ +server.port=8080 + +spring.rabbitmq.host=localhost +spring.rabbitmq.port=5672 +spring.rabbitmq.username=guest +spring.rabbitmq.password=guest + +# Tasks exchange configuration (for sending tasks to workers) +rabbitmq.tasks.exchange.name=textTopic +rabbitmq.queue.name=workerQueue + +# Session info exchange configuration (for sending session info to aggregator) +rabbitmq.session.exchange.name=sessionInfoExchange +rabbitmq.session.routing.key=session + +# Text processing configuration +text.processing.chunk.size=1000 +text.processing.top.words=10 +text.processing.name.replacement=[NAME] + +# Swagger/OpenAPI Configuration +springdoc.api-docs.path=/api-docs +springdoc.swagger-ui.path=/swagger-ui.html +springdoc.swagger-ui.operationsSorter=method +springdoc.swagger-ui.tagsSorter=alpha +springdoc.swagger-ui.tryItOutEnabled=true diff --git a/producer/src/main/resources/logback.xml b/producer/src/main/resources/logback.xml new file mode 100644 index 0000000..c53c4a5 --- /dev/null +++ b/producer/src/main/resources/logback.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/producer/src/main/resources/templates/hello.html b/producer/src/main/resources/templates/hello.html new file mode 100644 index 0000000..8cb9705 --- /dev/null +++ b/producer/src/main/resources/templates/hello.html @@ -0,0 +1,12 @@ + + + + + + + +

+ Hello! +

+ + \ No newline at end of file diff --git a/producer/src/test/java/ru/ifmo/.nop b/producer/src/test/java/ru/ifmo/.nop new file mode 100644 index 0000000..e69de29 diff --git a/producer/src/test/resources/.gitkeep b/producer/src/test/resources/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/src/main/java/Main.java b/src/main/java/Main.java deleted file mode 100644 index 477e8b9..0000000 --- a/src/main/java/Main.java +++ /dev/null @@ -1,5 +0,0 @@ -public class Main { - public static void main(String[] args) { - System.out.println("Lab 3"); - } -} diff --git a/worker/pom.xml b/worker/pom.xml new file mode 100644 index 0000000..96fe267 --- /dev/null +++ b/worker/pom.xml @@ -0,0 +1,86 @@ + + 4.0.0 + + ru.ifmo + worker + 1.0 + jar + + Worker + + + org.springframework.boot + spring-boot-starter-parent + 3.4.12 + + + + UTF-8 + ru.ifmo.App + 21 + 1.18.30 + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-amqp + + + org.projectlombok + lombok + ${lombok.version} + provided + + + org.springdoc + springdoc-openapi-starter-webmvc-ui + 2.2.0 + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + --add-opens java.base/java.lang=ALL-UNNAMED + + + + org.springframework.boot + spring-boot-maven-plugin + + + --add-opens java.base/java.lang=ALL-UNNAMED + --add-opens java.base/java.util=ALL-UNNAMED + + + + + + + + + + + + + diff --git a/worker/src/main/java/ru/ifmo/App.java b/worker/src/main/java/ru/ifmo/App.java new file mode 100644 index 0000000..942563c --- /dev/null +++ b/worker/src/main/java/ru/ifmo/App.java @@ -0,0 +1,11 @@ +package ru.ifmo; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class App { + public static void main(String[] args) { + SpringApplication.run(App.class, args); + } +} \ No newline at end of file diff --git a/worker/src/main/java/ru/ifmo/AppConfig.java b/worker/src/main/java/ru/ifmo/AppConfig.java new file mode 100644 index 0000000..af5370e --- /dev/null +++ b/worker/src/main/java/ru/ifmo/AppConfig.java @@ -0,0 +1,7 @@ +package ru.ifmo; + +import org.springframework.context.annotation.Configuration; + +@Configuration +public class AppConfig { +} \ No newline at end of file diff --git a/worker/src/main/java/ru/ifmo/MessagesListener.java b/worker/src/main/java/ru/ifmo/MessagesListener.java new file mode 100644 index 0000000..20d13a5 --- /dev/null +++ b/worker/src/main/java/ru/ifmo/MessagesListener.java @@ -0,0 +1,65 @@ +package ru.ifmo; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import ru.ifmo.dto.TextTask; +import ru.ifmo.dto.TextProcessingResult; +import ru.ifmo.service.TextProcessingService; + +@Component +@RequiredArgsConstructor +@Slf4j +public class MessagesListener { + + private final TextProcessingService textProcessingService; + private final RabbitTemplate rabbitTemplate; + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Value("${rabbitmq.results.exchange.name}") + private String resultsExchange; + + @Value("${rabbitmq.results.queue.name}") + private String resultsQueue; + + @Value("${rabbitmq.results.routing.key}") + private String resultsRoutingKey; + + @RabbitListener(queues = "${rabbitmq.queue.name}") + public void receiveMessage(String message) { + log.info("Received message: {}", message); + + try { + // Parse the incoming message as TextTask + TextTask task = objectMapper.readValue(message, TextTask.class); + log.info("Processing task: {}", task.getTaskId()); + + // Process the task + TextProcessingResult result = textProcessingService.processTask(task); + + // Send result back to results queue + sendResult(result); + + log.info("Successfully processed task: {}", task.getTaskId()); + + } catch (JsonProcessingException e) { + log.error("Failed to parse message as TextTask: {}", e.getMessage()); + } catch (Exception e) { + log.error("Error processing task: {}", e.getMessage(), e); + } + } + + private void sendResult(TextProcessingResult result) { + try { + rabbitTemplate.convertAndSend(resultsQueue, result); + log.info("Result sent for task: {}", result.getTaskId()); + } catch (Exception e) { + log.error("Failed to send result for task {}: {}", result.getTaskId(), e.getMessage(), e); + } + } +} diff --git a/worker/src/main/java/ru/ifmo/configuration/RabbitConfig.java b/worker/src/main/java/ru/ifmo/configuration/RabbitConfig.java new file mode 100644 index 0000000..bf024a5 --- /dev/null +++ b/worker/src/main/java/ru/ifmo/configuration/RabbitConfig.java @@ -0,0 +1,52 @@ +package ru.ifmo.configuration; + +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class RabbitConfig { + + public Jackson2JsonMessageConverter messageConverter() { + return new Jackson2JsonMessageConverter(); + } + + @Bean + public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { + RabbitTemplate template = new RabbitTemplate(connectionFactory); + template.setMessageConverter(messageConverter()); + return template; + } + + @Bean + public DirectExchange fanoutExchange( + @Value("${rabbitmq.exchange.name}") String exchangeName + ) { + return new DirectExchange(exchangeName); + } + + @Bean + public Queue queue( + @Value("${rabbitmq.queue.name}") String queueName + ) { + return new Queue(queueName); + } + + @Bean + public DirectExchange resultsExchange( + @Value("${rabbitmq.results.exchange.name}") String exchangeName + ) { + return new DirectExchange(exchangeName); + } + + @Bean + public Queue resultsQueue( + @Value("${rabbitmq.results.queue.name}") String queueName + ) { + return new Queue(queueName); + } +} diff --git a/worker/src/main/java/ru/ifmo/dto/TextProcessingResult.java b/worker/src/main/java/ru/ifmo/dto/TextProcessingResult.java new file mode 100644 index 0000000..d83331b --- /dev/null +++ b/worker/src/main/java/ru/ifmo/dto/TextProcessingResult.java @@ -0,0 +1,17 @@ +package ru.ifmo.dto; + +import java.util.List; +import java.util.Map; + +import lombok.Data; + +@Data +public class TextProcessingResult { + private String taskId; + private Integer wordCount; + private Map topWords; + private String sentiment; // POSITIVE, NEGATIVE, NEUTRAL + private Double sentimentScore; + private String modifiedText; + private List sortedSentences; +} diff --git a/worker/src/main/java/ru/ifmo/dto/TextTask.java b/worker/src/main/java/ru/ifmo/dto/TextTask.java new file mode 100644 index 0000000..36a438d --- /dev/null +++ b/worker/src/main/java/ru/ifmo/dto/TextTask.java @@ -0,0 +1,11 @@ +package ru.ifmo.dto; + +import lombok.Data; + +@Data +public class TextTask { + private String taskId; + private String text; + private int topN; + private String nameReplacement; +} \ No newline at end of file diff --git a/worker/src/main/java/ru/ifmo/service/NameReplacementService.java b/worker/src/main/java/ru/ifmo/service/NameReplacementService.java new file mode 100644 index 0000000..b0f269c --- /dev/null +++ b/worker/src/main/java/ru/ifmo/service/NameReplacementService.java @@ -0,0 +1,84 @@ +package ru.ifmo.service; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.springframework.stereotype.Service; + +@Service +public class NameReplacementService { + + private static final Pattern NAME_PATTERN = Pattern.compile( + "(? { + String prefix = matchResult.group().substring(0, matchResult.start(1) - matchResult.start()); + return prefix + replacement; + }); + + result = QUOTED_NAME_PATTERN.matcher(result) + .replaceAll(matchResult -> "\"" + replacement + "\"" + + matchResult.group().substring(matchResult.end(1) + 1)); + + Matcher nameMatcher = NAME_PATTERN.matcher(result); + StringBuffer sb = new StringBuffer(); + + while (nameMatcher.find()) { + String match = nameMatcher.group(); + if (isPotentialName(match)) { + nameMatcher.appendReplacement(sb, replacement); + } else { + nameMatcher.appendReplacement(sb, match); + } + } + nameMatcher.appendTail(sb); + + return sb.toString(); + } + + private boolean isPotentialName(String word) { + String lowerWord = word.toLowerCase(); + + String[] excludeWords = { + "january", "february", "march", "april", "may", "june", + "july", "august", "september", "october", "november", "december", + "monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday", + "январь", "февраль", "март", "апрель", "май", "июнь", + "июль", "август", "сентябрь", "октябрь", "ноябрь", "декабрь", + "понедельник", "вторник", "среда", "четверг", "пятница", "суббота", "воскресенье", + "the", "this", "that", "these", "those", "when", "where", "why", "how", + "это", "этот", "эта", "эти", "когда", "где", "почему", "как", + "god", "lord", "jesus", "christ", "allah", "buddha", + "бог", "господь", "иисус", "христос", "аллах", "будда", + "america", "europe", "asia", "africa", "australia", + "америка", "европа", "азия", "африка", "австралия" + }; + + for (String exclude : excludeWords) { + if (lowerWord.equals(exclude)) { + return false; + } + } + + return word.length() >= 2 && word.length() <= 20 && word.matches("^[A-ZА-Я][a-zа-я]+$"); + } +} \ No newline at end of file diff --git a/worker/src/main/java/ru/ifmo/service/SentenceSortingService.java b/worker/src/main/java/ru/ifmo/service/SentenceSortingService.java new file mode 100644 index 0000000..0995b96 --- /dev/null +++ b/worker/src/main/java/ru/ifmo/service/SentenceSortingService.java @@ -0,0 +1,78 @@ +package ru.ifmo.service; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import org.springframework.stereotype.Service; + +@Service +public class SentenceSortingService { + + private static final Pattern SENTENCE_PATTERN = Pattern.compile( + "(?<=[.!?])\\s+(?=[A-ZА-Я])" + ); + + public List sortSentencesByLength(String text) { + if (text == null || text.trim().isEmpty()) { + return new ArrayList<>(); + } + + List sentences = splitIntoSentences(text); + + return sentences.stream() + .filter(sentence -> !sentence.trim().isEmpty()) + .sorted(Comparator.comparing(String::length)) + .collect(Collectors.toList()); + } + + public List sortSentencesByLengthDescending(String text) { + if (text == null || text.trim().isEmpty()) { + return new ArrayList<>(); + } + + List sentences = splitIntoSentences(text); + + return sentences.stream() + .filter(sentence -> !sentence.trim().isEmpty()) + .sorted(Comparator.comparing(String::length).reversed()) + .collect(Collectors.toList()); + } + + private List splitIntoSentences(String text) { + String cleanText = text.trim().replaceAll("\\s+", " "); + + String[] sentences = SENTENCE_PATTERN.split(cleanText); + + List result = new ArrayList<>(); + for (String sentence : sentences) { + String trimmed = sentence.trim(); + if (!trimmed.isEmpty()) { + if (!trimmed.matches(".*[.!?]$")) { + if (!trimmed.endsWith(".") && !trimmed.endsWith("!") && !trimmed.endsWith("?")) { + trimmed += "."; + } + } + result.add(trimmed); + } + } + + if (result.isEmpty() && !cleanText.isEmpty()) { + String[] fallbackSentences = cleanText.split("(?<=[.!?])\\s+"); + for (String sentence : fallbackSentences) { + String trimmed = sentence.trim(); + if (!trimmed.isEmpty()) { + result.add(trimmed); + } + } + } + + if (result.isEmpty() && !cleanText.isEmpty()) { + result.add(cleanText); + } + + return result; + } +} \ No newline at end of file diff --git a/worker/src/main/java/ru/ifmo/service/SentimentAnalysisService.java b/worker/src/main/java/ru/ifmo/service/SentimentAnalysisService.java new file mode 100644 index 0000000..83fd51d --- /dev/null +++ b/worker/src/main/java/ru/ifmo/service/SentimentAnalysisService.java @@ -0,0 +1,120 @@ +package ru.ifmo.service; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import jakarta.annotation.PostConstruct; +import org.springframework.core.io.ClassPathResource; +import org.springframework.stereotype.Service; + +@Service +public class SentimentAnalysisService { + + private Set positiveWords; + private Set negativeWords; + + @PostConstruct + public void init() { + positiveWords = loadWordsFromFile("sentiment/positive-words.txt"); + negativeWords = loadWordsFromFile("sentiment/negative-words.txt"); + } + + public String analyzeSentiment(String text) { + if (text == null || text.trim().isEmpty()) { + return "NEUTRAL"; + } + + String[] words = text.toLowerCase() + .replaceAll("[^a-zA-Zа-яА-Я\\s]", " ") + .trim() + .split("\\s+"); + + int positiveCount = 0; + int negativeCount = 0; + + for (String word : words) { + if (positiveWords.contains(word)) { + positiveCount++; + } else if (negativeWords.contains(word)) { + negativeCount++; + } + } + + if (positiveCount > negativeCount) { + return "POSITIVE"; + } else if (negativeCount > positiveCount) { + return "NEGATIVE"; + } else { + return "NEUTRAL"; + } + } + + public double calculateSentimentScore(String text) { + if (text == null || text.trim().isEmpty()) { + return 0.0; + } + + String[] words = text.toLowerCase() + .replaceAll("[^a-zA-Zа-яА-Я\\s]", " ") + .trim() + .split("\\s+"); + + int positiveCount = 0; + int negativeCount = 0; + int totalWords = words.length; + + for (String word : words) { + if (positiveWords.contains(word)) { + positiveCount++; + } else if (negativeWords.contains(word)) { + negativeCount++; + } + } + + return totalWords > 0 ? (double) (positiveCount - negativeCount) / totalWords : 0.0; + } + + private Set loadWordsFromFile(String fileName) { + Set words = new HashSet<>(); + try { + ClassPathResource resource = new ClassPathResource(fileName); + if (resource.exists()) { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(resource.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + line = line.trim().toLowerCase(); + if (!line.isEmpty() && !line.startsWith("#")) { + words.add(line); + } + } + } + } + } catch (IOException e) { + System.err.println("Could not load sentiment words from " + fileName + ": " + e.getMessage()); + return getBasicSentimentWords(fileName); + } + return words; + } + + private Set getBasicSentimentWords(String fileName) { + if (fileName.contains("positive")) { + return new HashSet<>(Arrays.asList( + "good", "great", "excellent", "amazing", "wonderful", "fantastic", "awesome", "perfect", + "love", "like", "enjoy", "happy", "pleased", "satisfied", "delighted", "thrilled", + "beautiful", "brilliant", "outstanding", "superb", "magnificent", "terrific", + "хорошо", "отлично", "прекрасно", "замечательно", "великолепно", "чудесно" + )); + } else { + return new HashSet<>(Arrays.asList( + "bad", "terrible", "awful", "horrible", "disgusting", "hate", "dislike", "angry", + "sad", "disappointed", "upset", "annoyed", "frustrated", "furious", "disgusted", + "worst", "pathetic", "useless", "stupid", "ridiculous", "absurd", + "плохо", "ужасно", "отвратительно", "кошмар", "ненавижу", "расстроен" + )); + } + } +} \ No newline at end of file diff --git a/worker/src/main/java/ru/ifmo/service/TextProcessingService.java b/worker/src/main/java/ru/ifmo/service/TextProcessingService.java new file mode 100644 index 0000000..fbc8acc --- /dev/null +++ b/worker/src/main/java/ru/ifmo/service/TextProcessingService.java @@ -0,0 +1,37 @@ +package ru.ifmo.service; + +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; +import ru.ifmo.dto.TextTask; +import ru.ifmo.dto.TextProcessingResult; + +@Service +@RequiredArgsConstructor +public class TextProcessingService { + + private final WordCountService wordCountService; + private final TopWordsService topWordsService; + private final SentimentAnalysisService sentimentAnalysisService; + private final NameReplacementService nameReplacementService; + private final SentenceSortingService sentenceSortingService; + + public TextProcessingResult processTask(TextTask task) { + TextProcessingResult result = new TextProcessingResult(); + result.setTaskId(task.getTaskId()); + + result.setWordCount(wordCountService.countWords(task.getText())); + + int topN = task.getTopN() > 0 ? task.getTopN() : 5; + result.setTopWords(topWordsService.findTopWords(task.getText(), topN)); + + result.setSentiment(sentimentAnalysisService.analyzeSentiment(task.getText())); + result.setSentimentScore(sentimentAnalysisService.calculateSentimentScore(task.getText())); + + String replacement = task.getNameReplacement() != null ? task.getNameReplacement() : "[NAME]"; + result.setModifiedText(nameReplacementService.replaceNames(task.getText(), replacement)); + + result.setSortedSentences(sentenceSortingService.sortSentencesByLength(task.getText())); + + return result; + } +} \ No newline at end of file diff --git a/worker/src/main/java/ru/ifmo/service/TopWordsService.java b/worker/src/main/java/ru/ifmo/service/TopWordsService.java new file mode 100644 index 0000000..6283782 --- /dev/null +++ b/worker/src/main/java/ru/ifmo/service/TopWordsService.java @@ -0,0 +1,45 @@ +package ru.ifmo.service; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Collectors; + +import org.springframework.stereotype.Service; + +@Service +public class TopWordsService { + + public Map findTopWords(String text, int topN) { + if (text == null || text.trim().isEmpty() || topN <= 0) { + return new HashMap<>(); + } + + String[] words = text.toLowerCase() + .replaceAll("[^a-zA-Zа-яА-Я\\s]", " ") // Remove punctuation + .trim() + .split("\\s+"); + + Map wordCount = Arrays.stream(words) + .filter(word -> !word.trim().isEmpty()) + .filter(this::isValidWord) + .collect(Collectors.groupingBy( + word -> word, + Collectors.summingInt(word -> 1) + )); + + return wordCount.entrySet().stream() + .sorted(Map.Entry.comparingByValue().reversed()) + .collect(Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + Math::max, + LinkedHashMap::new + )); + } + + private boolean isValidWord(String word) { + return word.length() > 1 && word.matches(".*[a-zA-Zа-яА-Я].*"); + } +} \ No newline at end of file diff --git a/worker/src/main/java/ru/ifmo/service/WordCountService.java b/worker/src/main/java/ru/ifmo/service/WordCountService.java new file mode 100644 index 0000000..de8a20b --- /dev/null +++ b/worker/src/main/java/ru/ifmo/service/WordCountService.java @@ -0,0 +1,26 @@ +package ru.ifmo.service; + +import java.util.Arrays; + +import org.springframework.stereotype.Service; + +@Service +public class WordCountService { + + public int countWords(String text) { + if (text == null || text.trim().isEmpty()) { + return 0; + } + + String[] words = text.trim().split("\\s+"); + + return (int) Arrays.stream(words) + .filter(word -> !word.trim().isEmpty()) + .filter(this::isWord) + .count(); + } + + private boolean isWord(String token) { + return token.matches(".*[a-zA-Zа-яА-Я].*"); + } +} \ No newline at end of file diff --git a/worker/src/main/resources/application.properties b/worker/src/main/resources/application.properties new file mode 100644 index 0000000..1410d7e --- /dev/null +++ b/worker/src/main/resources/application.properties @@ -0,0 +1,14 @@ +server.port=${PORT:8081} + +spring.rabbitmq.host=localhost +spring.rabbitmq.port=5672 +spring.rabbitmq.username=guest +spring.rabbitmq.password=guest + +rabbitmq.exchange.name=textTopic +rabbitmq.queue.name=workerQueue + +# Results queue configuration +rabbitmq.results.exchange.name=resultsExchange +rabbitmq.results.queue.name=resultsQueue +rabbitmq.results.routing.key=results diff --git a/worker/src/main/resources/sentiment/negative-words.txt b/worker/src/main/resources/sentiment/negative-words.txt new file mode 100644 index 0000000..1347bd0 --- /dev/null +++ b/worker/src/main/resources/sentiment/negative-words.txt @@ -0,0 +1,42 @@ +# Negative words for sentiment analysis +bad +terrible +awful +horrible +disgusting +hate +dislike +angry +sad +disappointed +upset +annoyed +frustrated +furious +disgusted +worst +pathetic +useless +stupid +ridiculous +absurd +dreadful +appalling +atrocious +ghastly +hideous +revolting +repulsive +abominable +detestable +loathsome +плохо +ужасно +отвратительно +кошмар +ненавижу +расстроен +злой +грустно +разочарован +раздражен \ No newline at end of file diff --git a/worker/src/main/resources/sentiment/positive-words.txt b/worker/src/main/resources/sentiment/positive-words.txt new file mode 100644 index 0000000..df337b0 --- /dev/null +++ b/worker/src/main/resources/sentiment/positive-words.txt @@ -0,0 +1,41 @@ +# Positive words for sentiment analysis +good +great +excellent +amazing +wonderful +fantastic +awesome +perfect +love +like +enjoy +happy +pleased +satisfied +delighted +thrilled +beautiful +brilliant +outstanding +superb +magnificent +terrific +splendid +marvelous +incredible +fabulous +spectacular +impressive +remarkable +exceptional +хорошо +отлично +прекрасно +замечательно +великолепно +чудесно +превосходно +восхитительно +блестяще +удивительно \ No newline at end of file